- Add function to update merchant Product Bonus Coupon configuration - Support modifying bonus mappings and validity periods - Add interactive menu option for configuration updates
448 lines
16 KiB
Python
448 lines
16 KiB
Python
import os
|
|
import sys
|
|
import django
|
|
|
|
# --- 关键的 Django 设置 ---
|
|
project_path = '/srv/sso'
|
|
if project_path not in sys.path:
|
|
sys.path.append(project_path)
|
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'sso.settings')
|
|
django.setup()
|
|
|
|
import canton.models
|
|
from canton.models import *
|
|
import bson
|
|
import uuid
|
|
from utils import const
|
|
import hydrogen.flex_engine
|
|
import requests
|
|
from django.utils.timezone import get_current_timezone, localtime, now
|
|
import datetime
|
|
import utils.const
|
|
import django.conf
|
|
import utils.dataintel
|
|
|
|
|
|
# --- 功能函数定义 ---
|
|
def get_engine(table_name):
|
|
return hydrogen.flex_engine.FlexEngine.get_engine_by_name(
|
|
miniapp_id=44328,
|
|
schema_name=table_name,
|
|
enable_acl_check=False)
|
|
|
|
|
|
def gen_code(info_id, count, print_only=False):
|
|
info_engine = get_engine('license_info')
|
|
code_engine = get_engine('license_redeem_code')
|
|
|
|
info = info_engine.get_by_id(info_id)
|
|
|
|
uuids = []
|
|
for x in range(count):
|
|
uuids.append(str(uuid.uuid4()))
|
|
|
|
for uid in uuids:
|
|
code_engine.create(
|
|
code=uid,
|
|
status='valid',
|
|
valid_from=info['valid_from'],
|
|
valid_until=info['valid_until'],
|
|
license_info=bson.dbref.DBRef(
|
|
info_engine.collection.name,
|
|
info['_id']),
|
|
created_by=27068886,
|
|
_read_perm=[const.HYDROGEN_ACL_CREATED_BY_TPL],
|
|
_write_perm=[],
|
|
)
|
|
if print_only:
|
|
print(uid)
|
|
if print_only:
|
|
return
|
|
# save uuids to csv
|
|
fn = '/tmp/%s-%s.csv' % (info['name'].replace(' ', ''), localtime().strftime('%Y%m%d'))
|
|
with open(fn, 'a') as f:
|
|
for uid in uuids:
|
|
f.write(uid + '\n')
|
|
|
|
print(fn)
|
|
|
|
|
|
def issue_license(code, psn):
|
|
code_engine = get_engine('license_redeem_code')
|
|
code = code_engine.filter(code=code, status=utils.const.TANGZHI_LICENSE_REDEEM_CODE_STATUS_VALID)[0]
|
|
if not code:
|
|
return False
|
|
code_engine = get_engine('license_redeem_code')
|
|
license_info = code_engine.dereference(code['license_info'])
|
|
feature_engine = get_engine('license_feature')
|
|
license_data = {
|
|
'license': {
|
|
'psn': psn,
|
|
'features': [],
|
|
'signing_key': 'IFANR_IOT_KEY',
|
|
},
|
|
'enable_cloud_push': True}
|
|
for feature_id in license_info['license_feature']:
|
|
feature = feature_engine.get(feature_id=feature_id)
|
|
if feature and 'CP02' in feature['applicable_devices']:
|
|
license_data['license']['features'].append({
|
|
'feature_id': feature_id,
|
|
'applicable_devices': feature['applicable_devices'],
|
|
})
|
|
if not license_data['license']['features']:
|
|
return
|
|
if license_info['license_type'] == utils.const.TANGZHI_LICENSE_TYPE_TIMED:
|
|
if license_info.get('expiry_days'):
|
|
license_data['license']['not_before'] = utils.timestamp_of(now())
|
|
license_data['license']['not_after'] = utils.timestamp_of(
|
|
now() + datetime.timedelta(
|
|
days=license_info['expiry_days']))
|
|
else:
|
|
license_data['license']['not_before'] = license_info['not_before']
|
|
license_data['license']['not_after'] = license_info['not_after']
|
|
elif license_info['license_type'] == utils.const.TANGZHI_LICENSE_TYPE_COUNTER:
|
|
license_data['license']['max_usage'] = license_info['max_usage']
|
|
elif license_info['license_type'] == utils.const.TANGZHI_LICENSE_TYPE_RELATIVE_TIME:
|
|
license_data['license']['validity_hours'] = license_info['validity_hours']
|
|
|
|
resp = requests.post(
|
|
django.conf.settings.TANGZHI_LICENSE_ISSUE_URL,
|
|
json=license_data)
|
|
if resp.status_code != utils.const.HTTP_OK:
|
|
print('Failed to issue license: %s, data: %s' % (resp.text, license_data))
|
|
return
|
|
device_license_engine = get_engine(
|
|
django.conf.settings.IOT_DEVICE_LICENSE_SCHEMA_NAME)
|
|
snapshot = {
|
|
'license_redeem_code': code_engine.document_2_dict(code, dereference=True)
|
|
}
|
|
code_engine.update_by_id(
|
|
code['_id'],
|
|
psn=psn)
|
|
device_license_engine.update_by_id(
|
|
resp.json()['license']['id'],
|
|
snapshot=snapshot)
|
|
|
|
resp_data = device_license_engine.get_by_id(
|
|
resp.json()['license']['id'])
|
|
for feature in resp_data['license']['features']:
|
|
feature['tags'] = feature_engine.get(feature_id=feature['feature_id'])['tags']
|
|
|
|
return resp_data
|
|
|
|
|
|
def bind_product(product_id, batch_id, start, stop):
|
|
"""绑定商品到序列号批次"""
|
|
# 绑定商品
|
|
null_product = canton.models.ProductInfo.objects.filter(name='').first()
|
|
new_product = canton.models.ProductInfo.objects.filter(id=product_id).first()
|
|
|
|
psn__in = [_data['psn'] for _data in canton.models.SerialNumber.generate_serial_number(**{
|
|
'batch_id': batch_id,
|
|
'start': start,
|
|
'stop': stop,
|
|
'sign': False,
|
|
})]
|
|
|
|
sns = SerialNumber.objects.filter(psn__in=psn__in)
|
|
|
|
fail_sns = sns.filter(~Q(product=null_product) & ~Q(product=new_product))
|
|
if fail_sns:
|
|
raise ValueError("部分设备已绑定 %s" % fail_sns.values_list('psn', flat=True))
|
|
|
|
sns.update(product=new_product)
|
|
return sns.last().serialize()
|
|
|
|
|
|
def update_product(new_product_id, batch_id, start, stop):
|
|
"""更新商品绑定"""
|
|
# 更新商品
|
|
new_product = canton.models.ProductInfo.objects.filter(id=new_product_id).first()
|
|
|
|
psn__in = [_data['psn'] for _data in canton.models.SerialNumber.generate_serial_number(**{
|
|
'batch_id': batch_id,
|
|
'start': start,
|
|
'stop': stop,
|
|
'sign': False,
|
|
})]
|
|
|
|
sns = SerialNumber.objects.filter(psn__in=psn__in)
|
|
|
|
fail_sns = sns.filter(product=new_product)
|
|
if fail_sns:
|
|
raise ValueError("部分设备已绑定为当前商品 %s" % fail_sns.values_list('psn', flat=True))
|
|
|
|
sns.update(product=new_product)
|
|
return sns.last().serialize()
|
|
|
|
|
|
def filter_xdp_device_info(keyword):
|
|
engine = get_engine('device_info')
|
|
try:
|
|
if len(keyword) == 4:
|
|
result = engine.filter(passcode=keyword)[0]
|
|
else:
|
|
result = engine.filter(psn=keyword)[0]
|
|
except IndexError:
|
|
result = None
|
|
print("未找到设备信息")
|
|
return
|
|
|
|
# 定义 RSA 相关的字段
|
|
rsa_keys = ['rsa_public_key', 'rsa_private_key']
|
|
# 定义不需要在表格中展示的字段
|
|
exclude_fields = ['_read_perm', '_write_perm', 'created_by', 'mcu_uid']
|
|
|
|
# 分离 RSA 字段和其他字段
|
|
rsa_data = {}
|
|
table_data = {}
|
|
for key, value in result.items():
|
|
if key in rsa_keys:
|
|
rsa_data[key] = value
|
|
elif key in ['created_at', 'updated_at']:
|
|
table_data[key] = utils.dataintel.strf_time(utils.timestamp_to_local_datetime(value))
|
|
elif key not in exclude_fields:
|
|
table_data[key] = value
|
|
|
|
# 计算表格宽度
|
|
max_key_len = max(len(str(k)) for k in table_data.keys())
|
|
max_val_len = min(80, max(len(str(v)[:80]) for v in table_data.values()))
|
|
|
|
# 打印表格
|
|
print("\n" + "=" * (max_key_len + max_val_len + 7))
|
|
print(f"| {'field'.ljust(max_key_len)} | {'value'.ljust(max_val_len)} |")
|
|
print("|" + "-" * (max_key_len + 2) + "|" + "-" * (max_val_len + 2) + "|")
|
|
|
|
for key, value in table_data.items():
|
|
val_str = str(value)
|
|
if len(val_str) > 80:
|
|
val_str = val_str[:77] + "..."
|
|
print(f"| {str(key).ljust(max_key_len)} | {val_str.ljust(max_val_len)} |")
|
|
|
|
print("=" * (max_key_len + max_val_len + 7))
|
|
|
|
# 打印 RSA keys
|
|
for key, value in rsa_data.items():
|
|
print(f"\n--- {key} ---")
|
|
print(value)
|
|
|
|
|
|
|
|
# --- 交互式菜单功能 ---
|
|
def create_xdp_ultra_redeem_code():
|
|
"""创建 xdp ultra redeem code"""
|
|
print("\n=== 创建 XDP Ultra Redeem Code ===")
|
|
count = input("请输入要生成的数量: ")
|
|
try:
|
|
count = int(count)
|
|
if count <= 0:
|
|
print("数量必须大于 0")
|
|
return
|
|
print(f"\n开始生成 {count} 个 redeem code...")
|
|
# ultra 购买
|
|
info_id = '677758f12c450457fad77408'
|
|
gen_code(info_id, count, True)
|
|
print(f"\n成功生成 {count} 个 redeem code")
|
|
except ValueError:
|
|
print("错误: 请输入有效的数字")
|
|
except Exception as e:
|
|
print(f"执行出错: {e}")
|
|
|
|
|
|
def batch_bind_product():
|
|
"""批次绑定商品"""
|
|
print("\n=== 批次绑定商品 ===")
|
|
try:
|
|
product_id = int(input("请输入商品ID (product_id): "))
|
|
batch_id = int(input("请输入批次ID (batch_id): "))
|
|
start = int(input("请输入开始流水号 (start): "))
|
|
stop = int(input("请输入结束流水号 (stop): "))
|
|
|
|
print(f"\n开始绑定商品...")
|
|
print(f"商品ID: {product_id}, 批次ID: {batch_id}, Start: {start}, Stop: {stop}")
|
|
|
|
result = bind_product(product_id, batch_id, start, stop)
|
|
print(f"\n绑定成功!")
|
|
print(f"结果: {result}")
|
|
except ValueError as e:
|
|
print(f"输入错误: {e}")
|
|
except Exception as e:
|
|
print(f"执行出错: {e}")
|
|
|
|
|
|
def batch_update_product():
|
|
"""批次更新商品"""
|
|
print("\n=== 批次更新商品 ===")
|
|
try:
|
|
new_product_id = int(input("请输入新商品ID (new_product_id): "))
|
|
batch_id = int(input("请输入批次ID (batch_id): "))
|
|
start = int(input("请输入开始流水号 (start): "))
|
|
stop = int(input("请输入结束流水号 (stop): "))
|
|
|
|
print(f"\n开始更新商品...")
|
|
print(f"新商品ID: {new_product_id}, 批次ID: {batch_id}, Start: {start}, Stop: {stop}")
|
|
|
|
result = update_product(new_product_id, batch_id, start, stop)
|
|
print(f"\n更新成功!")
|
|
print(f"结果: {result}")
|
|
except ValueError as e:
|
|
print(f"输入错误: {e}")
|
|
except Exception as e:
|
|
print(f"执行出错: {e}")
|
|
|
|
|
|
def query_xdp_device_info():
|
|
"""查询 XDP 设备信息"""
|
|
print("\n=== 查询 XDP 设备信息 ===")
|
|
keyword = input("请输入 PSN 或 Passcode (4位): ").strip()
|
|
if not keyword:
|
|
print("错误: 请输入有效的关键字")
|
|
return
|
|
filter_xdp_device_info(keyword)
|
|
|
|
|
|
def update_merchant_config_bonus():
|
|
"""修改商户配置中的 bonus 和有效期"""
|
|
import json
|
|
print("\n=== 修改商户 Product Bonus Coupon 配置 ===")
|
|
try:
|
|
merchant = canton.models.Merchant.objects.get(id=1489)
|
|
print(f"商户名称: {merchant.name}")
|
|
|
|
merchant_config = canton.models.MerchantConfig.objects.filter(
|
|
merchant=merchant,
|
|
status=utils.const.CANTON_MERCHANT_CONFIG_STATUS_ACTIVE,
|
|
config_type=utils.const.CANTON_MERCHANT_CONFIG_TYPE_PRODUCT_BONUS_COUPON).last()
|
|
|
|
if not merchant_config:
|
|
print("未找到有效的 Product Bonus Coupon 配置")
|
|
return
|
|
|
|
component = merchant_config.component[0]
|
|
|
|
print("\n当前配置:")
|
|
print(f" bonus: {component.get('bonus', {})}")
|
|
valid_from = component.get('valid_from')
|
|
valid_until = component.get('valid_until')
|
|
if valid_from:
|
|
print(f" valid_from: {valid_from} ({datetime.datetime.fromtimestamp(valid_from)})")
|
|
if valid_until:
|
|
print(f" valid_until: {valid_until} ({datetime.datetime.fromtimestamp(valid_until)})")
|
|
|
|
print("\n请选择要修改的内容:")
|
|
print("1. 修改 bonus")
|
|
print("2. 修改 valid_from 和 valid_until")
|
|
print("3. 修改全部")
|
|
modify_choice = input("请选择 (1/2/3): ").strip()
|
|
|
|
if modify_choice in ['1', '3']:
|
|
print("\n当前 bonus:", component.get('bonus', {}))
|
|
print("请输入新的 bonus (JSON 格式):")
|
|
print("示例: {\"13365\": [3341, 3343], \"13410\": [3341, 3343]}")
|
|
bonus_input = input("新 bonus: ").strip()
|
|
if bonus_input:
|
|
new_bonus = json.loads(bonus_input)
|
|
component['bonus'] = new_bonus
|
|
print(f"bonus 已更新为: {new_bonus}")
|
|
|
|
if modify_choice in ['2', '3']:
|
|
print("\n请输入新的有效期:")
|
|
print("格式: YYYY-MM-DD HH:MM:SS 或时间戳")
|
|
|
|
valid_from_input = input("valid_from (留空保持不变): ").strip()
|
|
if valid_from_input:
|
|
if valid_from_input.isdigit():
|
|
component['valid_from'] = int(valid_from_input)
|
|
else:
|
|
dt = datetime.datetime.strptime(valid_from_input, "%Y-%m-%d %H:%M:%S")
|
|
component['valid_from'] = int(dt.timestamp())
|
|
print(f"valid_from 已更新为: {component['valid_from']} ({datetime.datetime.fromtimestamp(component['valid_from'])})")
|
|
|
|
valid_until_input = input("valid_until (留空保持不变): ").strip()
|
|
if valid_until_input:
|
|
if valid_until_input.isdigit():
|
|
component['valid_until'] = int(valid_until_input)
|
|
else:
|
|
dt = datetime.datetime.strptime(valid_until_input, "%Y-%m-%d %H:%M:%S")
|
|
component['valid_until'] = int(dt.timestamp())
|
|
print(f"valid_until 已更新为: {component['valid_until']} ({datetime.datetime.fromtimestamp(component['valid_until'])})")
|
|
|
|
merchant_config.component[0] = component
|
|
|
|
print("\n更新后的配置:")
|
|
print(f" bonus: {component.get('bonus', {})}")
|
|
print(f" valid_from: {component.get('valid_from')} ({datetime.datetime.fromtimestamp(component.get('valid_from', 0))})")
|
|
print(f" valid_until: {component.get('valid_until')} ({datetime.datetime.fromtimestamp(component.get('valid_until', 0))})")
|
|
|
|
confirm = input("\n确认保存? (y/n): ").strip().lower()
|
|
if confirm == 'y':
|
|
merchant_config.save()
|
|
print("配置已保存!")
|
|
else:
|
|
print("已取消保存")
|
|
|
|
except canton.models.Merchant.DoesNotExist:
|
|
print("未找到指定的商户")
|
|
except json.JSONDecodeError as e:
|
|
print(f"JSON 解析错误: {e}")
|
|
except ValueError as e:
|
|
print(f"输入错误: {e}")
|
|
except Exception as e:
|
|
print(f"执行出错: {e}")
|
|
|
|
|
|
def show_menu():
|
|
"""显示功能菜单"""
|
|
print("\n" + "="*50)
|
|
print("SSO 交互式脚本")
|
|
print("="*50)
|
|
print("请选择要执行的功能:")
|
|
print("1. 创建 XDP Ultra Redeem Code")
|
|
print("2. 批次绑定商品")
|
|
print("3. 批次更新商品")
|
|
print("4. 查询 XDP 设备信息")
|
|
print("5. 修改商户 Product Bonus Coupon 配置")
|
|
print("0. 退出")
|
|
print("="*50)
|
|
|
|
|
|
def main():
|
|
"""主函数"""
|
|
print("--- 开始执行交互式 Django 脚本 ---\n")
|
|
|
|
while True:
|
|
show_menu()
|
|
choice = input("\n请输入功能编号: ").strip()
|
|
|
|
if choice == '1':
|
|
create_xdp_ultra_redeem_code()
|
|
elif choice == '2':
|
|
batch_bind_product()
|
|
elif choice == '3':
|
|
batch_update_product()
|
|
elif choice == '4':
|
|
query_xdp_device_info()
|
|
elif choice == '5':
|
|
update_merchant_config_bonus()
|
|
elif choice == '0':
|
|
print("\n退出脚本...")
|
|
break
|
|
else:
|
|
print("\n无效的选择,请重新输入")
|
|
|
|
# 询问是否继续
|
|
if choice != '0':
|
|
continue_choice = input("\n是否继续执行其他功能? (y/n): ").strip().lower()
|
|
if continue_choice != 'y':
|
|
print("\n退出脚本...")
|
|
break
|
|
|
|
print("\n--- 脚本执行完毕 ---")
|
|
|
|
|
|
# 执行主函数
|
|
if __name__ == '__main__':
|
|
main()
|
|
|
|
|