- Added run_sso_script.sh for remote script execution over SSH
- Added sso_script.py with Django-based SSO management functions
- Implemented XDP Ultra redeem code generation
- Added batch product binding and update capabilities
- Included comprehensive documentation and usage examples
- Added automatic cleanup of remote temporary files
289 lines
9.2 KiB
Python
289 lines
9.2 KiB
Python
import os
|
|
import sys
|
|
import django
|
|
|
|
# --- Critical Django bootstrap ---
# Make the project root importable and select the settings module, then
# initialise Django. This runs before the project imports that follow.
project_path = '/srv/sso'
if project_path not in sys.path:
    sys.path.append(project_path)

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'sso.settings')
django.setup()
|
|
|
|
import canton.models
|
|
from canton.models import *
|
|
import bson
|
|
import uuid
|
|
from utils import const
|
|
import hydrogen.flex_engine
|
|
import requests
|
|
from django.utils.timezone import get_current_timezone, localtime, now
|
|
import datetime
|
|
import utils.const
|
|
import django.conf
|
|
|
|
|
|
# --- 功能函数定义 ---
|
|
def get_engine(table_name):
    """Return the FlexEngine for schema ``table_name`` of miniapp 44328.

    The engine is requested with ACL checking disabled.
    """
    engine_options = {
        'miniapp_id': 44328,
        'schema_name': table_name,
        'enable_acl_check': False,
    }
    return hydrogen.flex_engine.FlexEngine.get_engine_by_name(**engine_options)
|
|
|
|
|
|
def gen_code(info_id, count, print_only=False):
    """Create ``count`` redeem codes attached to one license-info record.

    Every code is a fresh UUID4 string stored in the ``license_redeem_code``
    schema with status ``'valid'``, the validity window copied from the
    license-info record, and a DBRef pointing back at that record.

    Args:
        info_id: Id of the ``license_info`` record the codes belong to.
        count: Number of codes to create.
        print_only: When true, print each code right after it is created and
            skip the CSV export.

    Returns:
        None. Unless ``print_only`` is set, the codes are appended to
        ``/tmp/<info name without spaces>-<YYYYMMDD>.csv`` and that path is
        printed.
    """
    info_engine = get_engine('license_info')
    code_engine = get_engine('license_redeem_code')
    info = info_engine.get_by_id(info_id)

    # All codes are generated up front, before the first record is written.
    codes = [str(uuid.uuid4()) for _ in range(count)]

    for new_code in codes:
        record = dict(
            code=new_code,
            status='valid',
            valid_from=info['valid_from'],
            valid_until=info['valid_until'],
            license_info=bson.dbref.DBRef(
                info_engine.collection.name,
                info['_id']),
            created_by=27068886,
            _read_perm=[const.HYDROGEN_ACL_CREATED_BY_TPL],
            _write_perm=[],
        )
        code_engine.create(**record)
        if print_only:
            print(new_code)

    if print_only:
        return

    # Save the codes to a CSV file (append mode, one code per line).
    name_part = info['name'].replace(' ', '')
    date_part = localtime().strftime('%Y%m%d')
    fn = '/tmp/%s-%s.csv' % (name_part, date_part)
    with open(fn, 'a') as f:
        f.writelines(new_code + '\n' for new_code in codes)

    print(fn)
|
|
|
|
|
|
def issue_license(code, psn):
    """Redeem a license code for a device and issue the matching license.

    Looks up a redeem code that is still in the valid status, builds a
    license request from the license-info record it references (keeping only
    features whose ``applicable_devices`` contains ``'CP02'``), posts it to
    ``settings.TANGZHI_LICENSE_ISSUE_URL`` and, on success, writes the PSN
    onto the redeem code and stores a snapshot of the redeem code on the
    issued device-license record.

    Args:
        code: Redeem code string to look up.
        psn: Serial number of the device the license is issued for.

    Returns:
        The device-license record, with every feature annotated with its
        ``tags``, on success. ``False`` when no valid redeem code matches.
        ``None`` when no feature applies or the issuing service answers
        with a non-OK status.
    """
    code_engine = get_engine('license_redeem_code')
    matches = code_engine.filter(
        code=code,
        status=utils.const.TANGZHI_LICENSE_REDEEM_CODE_STATUS_VALID)
    # Indexing an empty result raises IndexError; previously that escaped
    # before the "not found" check below had a chance to run.
    try:
        redeem_code = matches[0]
    except IndexError:
        return False
    if not redeem_code:
        return False

    license_info = code_engine.dereference(redeem_code['license_info'])
    feature_engine = get_engine('license_feature')

    features = []
    for feature_id in license_info['license_feature']:
        feature = feature_engine.get(feature_id=feature_id)
        if feature and 'CP02' in feature['applicable_devices']:
            features.append({
                'feature_id': feature_id,
                'applicable_devices': feature['applicable_devices'],
            })
    if not features:
        return

    license_body = {
        'psn': psn,
        'features': features,
        'signing_key': 'IFANR_IOT_KEY',
    }
    license_data = {
        'license': license_body,
        'enable_cloud_push': True,
    }

    license_type = license_info['license_type']
    if license_type == utils.const.TANGZHI_LICENSE_TYPE_TIMED:
        if license_info.get('expiry_days'):
            # Sample the clock once so the window is exactly expiry_days long.
            issued_at = now()
            license_body['not_before'] = utils.timestamp_of(issued_at)
            license_body['not_after'] = utils.timestamp_of(
                issued_at + datetime.timedelta(
                    days=license_info['expiry_days']))
        else:
            license_body['not_before'] = license_info['not_before']
            license_body['not_after'] = license_info['not_after']
    elif license_type == utils.const.TANGZHI_LICENSE_TYPE_COUNTER:
        license_body['max_usage'] = license_info['max_usage']
    elif license_type == utils.const.TANGZHI_LICENSE_TYPE_RELATIVE_TIME:
        license_body['validity_hours'] = license_info['validity_hours']

    resp = requests.post(
        django.conf.settings.TANGZHI_LICENSE_ISSUE_URL,
        json=license_data)
    if resp.status_code != utils.const.HTTP_OK:
        print('Failed to issue license: %s, data: %s' % (resp.text, license_data))
        return

    device_license_engine = get_engine(
        django.conf.settings.IOT_DEVICE_LICENSE_SCHEMA_NAME)
    # The snapshot is taken before the PSN is written onto the redeem code.
    snapshot = {
        'license_redeem_code': code_engine.document_2_dict(
            redeem_code, dereference=True)
    }
    # NOTE(review): only ``psn`` is written here; the code's status is left
    # unchanged - confirm that something else marks the code as used.
    code_engine.update_by_id(redeem_code['_id'], psn=psn)

    # Parse the response once and reuse the id for both calls below.
    license_id = resp.json()['license']['id']
    device_license_engine.update_by_id(license_id, snapshot=snapshot)

    resp_data = device_license_engine.get_by_id(license_id)
    for feature in resp_data['license']['features']:
        feature['tags'] = feature_engine.get(
            feature_id=feature['feature_id'])['tags']

    return resp_data
|
|
|
|
|
|
def bind_product(product_id, batch_id, start, stop):
    """Bind a product to the serial numbers of a batch range.

    A serial number may be (re)bound only when its current product is the
    empty-named product or already the target product.

    Args:
        product_id: Id of the ``ProductInfo`` to bind.
        batch_id: Batch id passed to ``SerialNumber.generate_serial_number``.
        start: First sequence number of the range.
        stop: Last sequence number of the range.

    Returns:
        ``serialize()`` of the last matched serial number, read after the
        update.

    Raises:
        ValueError: If the product does not exist, no serial number matches
            the range, or some serial numbers are bound to another product.
    """
    products = canton.models.ProductInfo.objects
    # The product with an empty name is what the check below treats as
    # "not bound yet".
    null_product = products.filter(name='').first()
    new_product = products.filter(id=product_id).first()
    if new_product is None:
        # Without this guard the update below would write product=None to
        # every serial number in the range.
        raise ValueError("商品不存在: product_id=%s" % product_id)

    generated = canton.models.SerialNumber.generate_serial_number(
        batch_id=batch_id,
        start=start,
        stop=stop,
        sign=False,
    )
    psn__in = [_data['psn'] for _data in generated]

    sns = canton.models.SerialNumber.objects.filter(psn__in=psn__in)
    if not sns.exists():
        # Previously this surfaced as AttributeError on ``None.serialize()``.
        raise ValueError(
            "未找到序列号: batch_id=%s, start=%s, stop=%s"
            % (batch_id, start, stop))

    # Materialise the offending PSNs so the message lists all of them; a
    # QuerySet's repr is truncated.
    fail_psns = list(
        sns.filter(~Q(product=null_product) & ~Q(product=new_product))
        .values_list('psn', flat=True))
    if fail_psns:
        raise ValueError("部分设备已绑定 %s" % fail_psns)

    sns.update(product=new_product)
    return sns.last().serialize()
|
|
|
|
|
|
def update_product(new_product_id, batch_id, start, stop):
    """Re-bind the serial numbers of a batch range to a different product.

    The call is rejected when any serial number in the range is already
    bound to the new product.

    Args:
        new_product_id: Id of the ``ProductInfo`` to switch to.
        batch_id: Batch id passed to ``SerialNumber.generate_serial_number``.
        start: First sequence number of the range.
        stop: Last sequence number of the range.

    Returns:
        ``serialize()`` of the last matched serial number, read after the
        update.

    Raises:
        ValueError: If the product does not exist, no serial number matches
            the range, or some serial numbers already use the new product.
    """
    new_product = canton.models.ProductInfo.objects.filter(
        id=new_product_id).first()
    if new_product is None:
        # Without this guard the update below would write product=None to
        # every serial number in the range.
        raise ValueError("商品不存在: product_id=%s" % new_product_id)

    generated = canton.models.SerialNumber.generate_serial_number(
        batch_id=batch_id,
        start=start,
        stop=stop,
        sign=False,
    )
    psn__in = [_data['psn'] for _data in generated]

    sns = canton.models.SerialNumber.objects.filter(psn__in=psn__in)
    if not sns.exists():
        # Previously this surfaced as AttributeError on ``None.serialize()``.
        raise ValueError(
            "未找到序列号: batch_id=%s, start=%s, stop=%s"
            % (batch_id, start, stop))

    # Materialise the offending PSNs so the message lists all of them; a
    # QuerySet's repr is truncated.
    fail_psns = list(
        sns.filter(product=new_product).values_list('psn', flat=True))
    if fail_psns:
        raise ValueError("部分设备已绑定为当前商品 %s" % fail_psns)

    sns.update(product=new_product)
    return sns.last().serialize()
|
|
|
|
|
|
# --- 交互式菜单功能 ---
|
|
def create_xdp_ultra_redeem_code():
    """Prompt for a quantity and generate that many XDP Ultra redeem codes.

    Reads the quantity from standard input, rejects non-numeric or
    non-positive values with a message, then calls ``gen_code`` in
    print-only mode. Any exception raised while generating is reported on
    the console rather than propagated.
    """
    print("\n=== 创建 XDP Ultra Redeem Code ===")
    raw_count = input("请输入要生成的数量: ")

    # Only the integer parse is guarded by ValueError, so a ValueError raised
    # during generation is not misreported as invalid numeric input.
    try:
        count = int(raw_count)
    except ValueError:
        print("错误: 请输入有效的数字")
        return

    if count <= 0:
        print("数量必须大于 0")
        return

    print(f"\n开始生成 {count} 个 redeem code...")
    # License-info id used for the "ultra purchase" codes.
    info_id = '677758f12c450457fad77408'
    try:
        gen_code(info_id, count, True)
    except Exception as e:
        # Top-level boundary of the interactive menu: report and return.
        print(f"执行出错: {e}")
        return

    print(f"\n成功生成 {count} 个 redeem code")
|
|
|
|
|
|
def batch_bind_product():
    """Prompt for a product and batch range, then bind via ``bind_product``.

    Four integers are read from standard input in order: product id, batch
    id, start and stop sequence numbers. A ``ValueError`` (from parsing or
    from the bind itself) is reported as an input error; any other exception
    is reported as an execution error. Nothing is propagated.
    """
    print("\n=== 批次绑定商品 ===")
    prompts = (
        "请输入商品ID (product_id): ",
        "请输入批次ID (batch_id): ",
        "请输入开始流水号 (start): ",
        "请输入结束流水号 (stop): ",
    )
    try:
        # Prompts are asked one at a time; the first bad value aborts the rest.
        product_id, batch_id, start, stop = [
            int(input(prompt)) for prompt in prompts
        ]

        print(f"\n开始绑定商品...")
        print(f"商品ID: {product_id}, 批次ID: {batch_id}, Start: {start}, Stop: {stop}")

        outcome = bind_product(product_id, batch_id, start, stop)
        print(f"\n绑定成功!")
        print(f"结果: {outcome}")
    except ValueError as err:
        print(f"输入错误: {err}")
    except Exception as err:
        print(f"执行出错: {err}")
|
|
|
|
|
|
def batch_update_product():
    """Prompt for a product and batch range, then call ``update_product``.

    Four integers are read from standard input in order: new product id,
    batch id, start and stop sequence numbers. A ``ValueError`` (from parsing
    or from the update itself) is reported as an input error; any other
    exception is reported as an execution error. Nothing is propagated.
    """
    print("\n=== 批次更新商品 ===")
    prompts = (
        "请输入新商品ID (new_product_id): ",
        "请输入批次ID (batch_id): ",
        "请输入开始流水号 (start): ",
        "请输入结束流水号 (stop): ",
    )
    try:
        # Prompts are asked one at a time; the first bad value aborts the rest.
        new_product_id, batch_id, start, stop = [
            int(input(prompt)) for prompt in prompts
        ]

        print(f"\n开始更新商品...")
        print(f"新商品ID: {new_product_id}, 批次ID: {batch_id}, Start: {start}, Stop: {stop}")

        outcome = update_product(new_product_id, batch_id, start, stop)
        print(f"\n更新成功!")
        print(f"结果: {outcome}")
    except ValueError as err:
        print(f"输入错误: {err}")
    except Exception as err:
        print(f"执行出错: {err}")
|
|
|
|
|
|
def show_menu():
    """Print the feature menu of the interactive script to standard output."""
    divider = "=" * 50
    menu_lines = (
        "\n" + divider,
        "SSO 交互式脚本",
        divider,
        "请选择要执行的功能:",
        "1. 创建 XDP Ultra Redeem Code",
        "2. 批次绑定商品",
        "3. 批次更新商品",
        "0. 退出",
        divider,
    )
    for line in menu_lines:
        print(line)
|
|
|
|
|
|
def main():
    """Run the interactive menu loop until the operator chooses to stop.

    Each round shows the menu, reads a choice and runs the matching action.
    Choice ``'0'`` exits immediately. After any other choice, including an
    invalid one, the operator is asked whether to continue; anything other
    than ``y`` ends the loop.
    """
    print("--- 开始执行交互式 Django 脚本 ---\n")

    actions = {
        '1': create_xdp_ultra_redeem_code,
        '2': batch_bind_product,
        '3': batch_update_product,
    }

    while True:
        show_menu()
        choice = input("\n请输入功能编号: ").strip()

        if choice == '0':
            print("\n退出脚本...")
            break

        action = actions.get(choice)
        if action is None:
            print("\n无效的选择,请重新输入")
        else:
            action()

        # Ask whether to keep going; '0' has already left the loop above.
        answer = input("\n是否继续执行其他功能? (y/n): ").strip().lower()
        if answer != 'y':
            print("\n退出脚本...")
            break

    print("\n--- 脚本执行完毕 ---")
|
|
|
|
|
|
# Run the interactive menu only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|
|
|
|