From 36c34d41703a1d32901e38ecf4e548998b3bcd3a Mon Sep 17 00:00:00 2001 From: Ching Date: Mon, 7 Oct 2024 18:45:24 +0800 Subject: [PATCH] =?UTF-8?q?feat(api):=20=E6=B7=BB=E5=8A=A0=E6=B6=88?= =?UTF-8?q?=E8=80=97=E5=95=86=E5=93=81=E6=A0=87=E5=BF=97=E4=BD=8D=E5=92=8C?= =?UTF-8?q?=E8=BF=9E=E7=BB=AD=E6=B6=88=E8=80=97=E6=A0=87=E5=BF=97=E4=BD=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 为了支持消耗商品和连续消耗功能,添加了消耗商品标志位和连续消耗标志位。当接收到相应的消息时,设置对应的标志位,并在处理商品消耗时进行判断。如果标志位存在,则执行相应的操作,否则跳过处理。 --- app.py | 61 ++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/app.py b/app.py index 7166913..09f8f80 100644 --- a/app.py +++ b/app.py @@ -401,8 +401,25 @@ def add_recipe(): return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200 +def consume_product_by_barcode(barcode): + try: + product = grocy.product_by_barcode(barcode) + if not product: + response_data = {"message": "Item not found"} + return response_data, 400, None + grocy.consume_product_by_barcode(barcode) + response_data = {"message": "Item removed successfully"} + return response_data, 200, product.name + except Exception as e: + error_message = str(e) + response_data = {"message": error_message} + return response_data, 400, None + + # 定时任务函数:从 Redis Stream 读取未消费的消息 def consume_stream(): + consume_product_flag_key = 'consume-product-flag' + continuous_consume_flag_key = 'continuous-consume-flag' try: # 读取 Stream 中的消息,设置阻塞时间为 0 messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000) @@ -414,19 +431,43 @@ def consume_stream(): logger.info(f"Processing message: {msg['data']}") if msg.get('data'): if not msg['data'].isnumeric(): - logger.info(f"Skip non-numeric barcode: {msg['data']}") + if msg['data'] == consume_product_flag_key: + # 如果消息内容为 'consume-product',则设置消耗商品标志位 + logger.info(f"Setting consume-product flag") + r.set(consume_product_flag_key, '1', ex=60 * 5) + elif msg['data'] == continuous_consume_flag_key: + # 如果消息内容为 'continuous-consume',则设置连续消耗标志位 + logger.info(f"Setting continuous-consume flag") + r.set(continuous_consume_flag_key, '1', ex=60 * 5) + else: + logger.info(f"Skip non-numeric barcode: {msg['data']}") continue - resp, status_code, good_name = handle_add_product(msg['data'], '') - if not good_name: - good_name = '' - if status_code == 200: - # 消费完成后确认消息 - r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) - bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data']) + consume_product_flag = r.get(consume_product_flag_key) + continuous_consume_flag = r.get(continuous_consume_flag_key) + if not (consume_product_flag or continuous_consume_flag): + resp, status_code, good_name = handle_add_product(msg['data'], '') + if not good_name: + good_name = '' + if status_code == 200: + # 消费完成后确认消息 + r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) + bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data']) + else: + logger.error(f"Failed to process message: {msg['data']} {resp}") + bark_push('商品添加失败', '条形码 %s' % msg['data']) else: - logger.error(f"Failed to process message: {msg['data']} {resp}") - bark_push('商品添加失败', '条形码 %s' % msg['data']) + resp, status_code, good_name = consume_product_by_barcode(msg['data']) + if status_code == 200: + r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) + bark_push('%s 消耗成功' % good_name, '条形码 %s' % msg['data']) + else: + logger.error(f"Failed to process message: {msg['data']} {resp}") + bark_push('商品消耗失败', '条形码 %s' % msg['data']) + if consume_product_flag_key: + r.delete(consume_product_flag_key) + + if msg.get('url'): resp, status_code = add_recipe_from_url(msg['url']) if status_code == 200: