import os import sys import django # --- 关键的 Django 设置 --- project_path = '/srv/sso' if project_path not in sys.path: sys.path.append(project_path) os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'sso.settings') django.setup() import canton.models from canton.models import * import bson import uuid from utils import const import hydrogen.flex_engine import requests from django.utils.timezone import get_current_timezone, localtime, now import datetime import utils.const import django.conf import utils.dataintel # --- 功能函数定义 --- def get_engine(table_name): return hydrogen.flex_engine.FlexEngine.get_engine_by_name( miniapp_id=44328, schema_name=table_name, enable_acl_check=False) def gen_code(info_id, count, print_only=False): info_engine = get_engine('license_info') code_engine = get_engine('license_redeem_code') info = info_engine.get_by_id(info_id) uuids = [] for x in range(count): uuids.append(str(uuid.uuid4())) for uid in uuids: code_engine.create( code=uid, status='valid', valid_from=info['valid_from'], valid_until=info['valid_until'], license_info=bson.dbref.DBRef( info_engine.collection.name, info['_id']), created_by=27068886, _read_perm=[const.HYDROGEN_ACL_CREATED_BY_TPL], _write_perm=[], ) if print_only: print(uid) if print_only: return # save uuids to csv fn = '/tmp/%s-%s.csv' % (info['name'].replace(' ', ''), localtime().strftime('%Y%m%d')) with open(fn, 'a') as f: for uid in uuids: f.write(uid + '\n') print(fn) def issue_license(code, psn): code_engine = get_engine('license_redeem_code') code = code_engine.filter(code=code, status=utils.const.TANGZHI_LICENSE_REDEEM_CODE_STATUS_VALID)[0] if not code: return False code_engine = get_engine('license_redeem_code') license_info = code_engine.dereference(code['license_info']) feature_engine = get_engine('license_feature') license_data = { 'license': { 'psn': psn, 'features': [], 'signing_key': 'IFANR_IOT_KEY', }, 'enable_cloud_push': True} for feature_id in license_info['license_feature']: feature = feature_engine.get(feature_id=feature_id) if feature and 'CP02' in feature['applicable_devices']: license_data['license']['features'].append({ 'feature_id': feature_id, 'applicable_devices': feature['applicable_devices'], }) if not license_data['license']['features']: return if license_info['license_type'] == utils.const.TANGZHI_LICENSE_TYPE_TIMED: if license_info.get('expiry_days'): license_data['license']['not_before'] = utils.timestamp_of(now()) license_data['license']['not_after'] = utils.timestamp_of( now() + datetime.timedelta( days=license_info['expiry_days'])) else: license_data['license']['not_before'] = license_info['not_before'] license_data['license']['not_after'] = license_info['not_after'] elif license_info['license_type'] == utils.const.TANGZHI_LICENSE_TYPE_COUNTER: license_data['license']['max_usage'] = license_info['max_usage'] elif license_info['license_type'] == utils.const.TANGZHI_LICENSE_TYPE_RELATIVE_TIME: license_data['license']['validity_hours'] = license_info['validity_hours'] resp = requests.post( django.conf.settings.TANGZHI_LICENSE_ISSUE_URL, json=license_data) if resp.status_code != utils.const.HTTP_OK: print('Failed to issue license: %s, data: %s' % (resp.text, license_data)) return device_license_engine = get_engine( django.conf.settings.IOT_DEVICE_LICENSE_SCHEMA_NAME) snapshot = { 'license_redeem_code': code_engine.document_2_dict(code, dereference=True) } code_engine.update_by_id( code['_id'], psn=psn) device_license_engine.update_by_id( resp.json()['license']['id'], snapshot=snapshot) resp_data = device_license_engine.get_by_id( resp.json()['license']['id']) for feature in resp_data['license']['features']: feature['tags'] = feature_engine.get(feature_id=feature['feature_id'])['tags'] return resp_data def bind_product(product_id, batch_id, start, stop): """绑定商品到序列号批次""" # 绑定商品 null_product = canton.models.ProductInfo.objects.filter(name='').first() new_product = canton.models.ProductInfo.objects.filter(id=product_id).first() psn__in = [_data['psn'] for _data in canton.models.SerialNumber.generate_serial_number(**{ 'batch_id': batch_id, 'start': start, 'stop': stop, 'sign': False, })] sns = SerialNumber.objects.filter(psn__in=psn__in) fail_sns = sns.filter(~Q(product=null_product) & ~Q(product=new_product)) if fail_sns: raise ValueError("部分设备已绑定 %s" % fail_sns.values_list('psn', flat=True)) sns.update(product=new_product) return sns.last().serialize() def update_product(new_product_id, batch_id, start, stop): """更新商品绑定""" # 更新商品 new_product = canton.models.ProductInfo.objects.filter(id=new_product_id).first() psn__in = [_data['psn'] for _data in canton.models.SerialNumber.generate_serial_number(**{ 'batch_id': batch_id, 'start': start, 'stop': stop, 'sign': False, })] sns = SerialNumber.objects.filter(psn__in=psn__in) fail_sns = sns.filter(product=new_product) if fail_sns: raise ValueError("部分设备已绑定为当前商品 %s" % fail_sns.values_list('psn', flat=True)) sns.update(product=new_product) return sns.last().serialize() def filter_xdp_device_info(keyword): engine = get_engine('device_info') try: if len(keyword) == 4: result = engine.filter(passcode=keyword)[0] else: result = engine.filter(psn=keyword)[0] except IndexError: result = None print("未找到设备信息") return # 定义 RSA 相关的字段 rsa_keys = ['rsa_public_key', 'rsa_private_key'] # 定义不需要在表格中展示的字段 exclude_fields = ['_read_perm', '_write_perm', 'created_by', 'mcu_uid'] # 分离 RSA 字段和其他字段 rsa_data = {} table_data = {} for key, value in result.items(): if key in rsa_keys: rsa_data[key] = value elif key in ['created_at', 'updated_at']: table_data[key] = utils.dataintel.strf_time(utils.timestamp_to_local_datetime(value)) elif key not in exclude_fields: table_data[key] = value # 计算表格宽度 max_key_len = max(len(str(k)) for k in table_data.keys()) max_val_len = min(80, max(len(str(v)[:80]) for v in table_data.values())) # 打印表格 print("\n" + "=" * (max_key_len + max_val_len + 7)) print(f"| {'field'.ljust(max_key_len)} | {'value'.ljust(max_val_len)} |") print("|" + "-" * (max_key_len + 2) + "|" + "-" * (max_val_len + 2) + "|") for key, value in table_data.items(): val_str = str(value) if len(val_str) > 80: val_str = val_str[:77] + "..." print(f"| {str(key).ljust(max_key_len)} | {val_str.ljust(max_val_len)} |") print("=" * (max_key_len + max_val_len + 7)) # 打印 RSA keys for key, value in rsa_data.items(): print(f"\n--- {key} ---") print(value) # --- 交互式菜单功能 --- def create_xdp_ultra_redeem_code(): """创建 xdp ultra redeem code""" print("\n=== 创建 XDP Ultra Redeem Code ===") count = input("请输入要生成的数量: ") try: count = int(count) if count <= 0: print("数量必须大于 0") return print(f"\n开始生成 {count} 个 redeem code...") # ultra 购买 info_id = '677758f12c450457fad77408' gen_code(info_id, count, True) print(f"\n成功生成 {count} 个 redeem code") except ValueError: print("错误: 请输入有效的数字") except Exception as e: print(f"执行出错: {e}") def batch_bind_product(): """批次绑定商品""" print("\n=== 批次绑定商品 ===") try: product_id = int(input("请输入商品ID (product_id): ")) batch_id = int(input("请输入批次ID (batch_id): ")) start = int(input("请输入开始流水号 (start): ")) stop = int(input("请输入结束流水号 (stop): ")) print(f"\n开始绑定商品...") print(f"商品ID: {product_id}, 批次ID: {batch_id}, Start: {start}, Stop: {stop}") result = bind_product(product_id, batch_id, start, stop) print(f"\n绑定成功!") print(f"结果: {result}") except ValueError as e: print(f"输入错误: {e}") except Exception as e: print(f"执行出错: {e}") def batch_update_product(): """批次更新商品""" print("\n=== 批次更新商品 ===") try: new_product_id = int(input("请输入新商品ID (new_product_id): ")) batch_id = int(input("请输入批次ID (batch_id): ")) start = int(input("请输入开始流水号 (start): ")) stop = int(input("请输入结束流水号 (stop): ")) print(f"\n开始更新商品...") print(f"新商品ID: {new_product_id}, 批次ID: {batch_id}, Start: {start}, Stop: {stop}") result = update_product(new_product_id, batch_id, start, stop) print(f"\n更新成功!") print(f"结果: {result}") except ValueError as e: print(f"输入错误: {e}") except Exception as e: print(f"执行出错: {e}") def query_xdp_device_info(): """查询 XDP 设备信息""" print("\n=== 查询 XDP 设备信息 ===") keyword = input("请输入 PSN 或 Passcode (4位): ").strip() if not keyword: print("错误: 请输入有效的关键字") return filter_xdp_device_info(keyword) def show_menu(): """显示功能菜单""" print("\n" + "="*50) print("SSO 交互式脚本") print("="*50) print("请选择要执行的功能:") print("1. 创建 XDP Ultra Redeem Code") print("2. 批次绑定商品") print("3. 批次更新商品") print("4. 查询 XDP 设备信息") print("0. 退出") print("="*50) def main(): """主函数""" print("--- 开始执行交互式 Django 脚本 ---\n") while True: show_menu() choice = input("\n请输入功能编号: ").strip() if choice == '1': create_xdp_ultra_redeem_code() elif choice == '2': batch_bind_product() elif choice == '3': batch_update_product() elif choice == '4': query_xdp_device_info() elif choice == '0': print("\n退出脚本...") break else: print("\n无效的选择,请重新输入") # 询问是否继续 if choice != '0': continue_choice = input("\n是否继续执行其他功能? (y/n): ").strip().lower() if continue_choice != 'y': print("\n退出脚本...") break print("\n--- 脚本执行完毕 ---") # 执行主函数 if __name__ == '__main__': main()