feat(api): 添加消耗商品标志位和连续消耗标志位
Some checks reported errors
continuous-integration/drone/push Build was killed

为了支持消耗商品和连续消耗功能,添加了消耗商品标志位和连续消耗标志位。当接收到相应的消息时,设置对应的标志位,并在处理商品消耗时进行判断。如果标志位存在,则执行相应的操作,否则跳过处理。
This commit is contained in:
Ching 2024-10-07 18:45:24 +08:00
parent 51b62df189
commit 36c34d4170

41
app.py
View File

@ -401,8 +401,25 @@ def add_recipe():
return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200 return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200
def consume_product_by_barcode(barcode):
try:
product = grocy.product_by_barcode(barcode)
if not product:
response_data = {"message": "Item not found"}
return response_data, 400, None
grocy.consume_product_by_barcode(barcode)
response_data = {"message": "Item removed successfully"}
return response_data, 200, product.name
except Exception as e:
error_message = str(e)
response_data = {"message": error_message}
return response_data, 400, None
# 定时任务函数:从 Redis Stream 读取未消费的消息 # 定时任务函数:从 Redis Stream 读取未消费的消息
def consume_stream(): def consume_stream():
consume_product_flag_key = 'consume-product-flag'
continuous_consume_flag_key = 'continuous-consume-flag'
try: try:
# 读取 Stream 中的消息,设置阻塞时间为 0 # 读取 Stream 中的消息,设置阻塞时间为 0
messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000) messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000)
@ -414,9 +431,21 @@ def consume_stream():
logger.info(f"Processing message: {msg['data']}") logger.info(f"Processing message: {msg['data']}")
if msg.get('data'): if msg.get('data'):
if not msg['data'].isnumeric(): if not msg['data'].isnumeric():
if msg['data'] == consume_product_flag_key:
# 如果消息内容为 'consume-product',则设置消耗商品标志位
logger.info(f"Setting consume-product flag")
r.set(consume_product_flag_key, '1', ex=60 * 5)
elif msg['data'] == continuous_consume_flag_key:
# 如果消息内容为 'continuous-consume',则设置连续消耗标志位
logger.info(f"Setting continuous-consume flag")
r.set(continuous_consume_flag_key, '1', ex=60 * 5)
else:
logger.info(f"Skip non-numeric barcode: {msg['data']}") logger.info(f"Skip non-numeric barcode: {msg['data']}")
continue continue
consume_product_flag = r.get(consume_product_flag_key)
continuous_consume_flag = r.get(continuous_consume_flag_key)
if not (consume_product_flag or continuous_consume_flag):
resp, status_code, good_name = handle_add_product(msg['data'], '') resp, status_code, good_name = handle_add_product(msg['data'], '')
if not good_name: if not good_name:
good_name = '' good_name = ''
@ -427,6 +456,18 @@ def consume_stream():
else: else:
logger.error(f"Failed to process message: {msg['data']} {resp}") logger.error(f"Failed to process message: {msg['data']} {resp}")
bark_push('商品添加失败', '条形码 %s' % msg['data']) bark_push('商品添加失败', '条形码 %s' % msg['data'])
else:
resp, status_code, good_name = consume_product_by_barcode(msg['data'])
if status_code == 200:
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
bark_push('%s 消耗成功' % good_name, '条形码 %s' % msg['data'])
else:
logger.error(f"Failed to process message: {msg['data']} {resp}")
bark_push('商品消耗失败', '条形码 %s' % msg['data'])
if consume_product_flag_key:
r.delete(consume_product_flag_key)
if msg.get('url'): if msg.get('url'):
resp, status_code = add_recipe_from_url(msg['url']) resp, status_code = add_recipe_from_url(msg['url'])
if status_code == 200: if status_code == 200: