diff --git a/app.py b/app.py index 7166913..09f8f80 100644 --- a/app.py +++ b/app.py @@ -401,8 +401,25 @@ def add_recipe(): return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200 +def consume_product_by_barcode(barcode): + try: + product = grocy.product_by_barcode(barcode) + if not product: + response_data = {"message": "Item not found"} + return response_data, 400, None + grocy.consume_product_by_barcode(barcode) + response_data = {"message": "Item removed successfully"} + return response_data, 200, product.name + except Exception as e: + error_message = str(e) + response_data = {"message": error_message} + return response_data, 400, None + + # 定时任务函数:从 Redis Stream 读取未消费的消息 def consume_stream(): + consume_product_flag_key = 'consume-product-flag' + continuous_consume_flag_key = 'continuous-consume-flag' try: # 读取 Stream 中的消息,设置阻塞时间为 0 messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000) @@ -414,19 +431,43 @@ def consume_stream(): logger.info(f"Processing message: {msg['data']}") if msg.get('data'): if not msg['data'].isnumeric(): - logger.info(f"Skip non-numeric barcode: {msg['data']}") + if msg['data'] == consume_product_flag_key: + # 如果消息内容为 'consume-product',则设置消耗商品标志位 + logger.info(f"Setting consume-product flag") + r.set(consume_product_flag_key, '1', ex=60 * 5) + elif msg['data'] == continuous_consume_flag_key: + # 如果消息内容为 'continuous-consume',则设置连续消耗标志位 + logger.info(f"Setting continuous-consume flag") + r.set(continuous_consume_flag_key, '1', ex=60 * 5) + else: + logger.info(f"Skip non-numeric barcode: {msg['data']}") continue - resp, status_code, good_name = handle_add_product(msg['data'], '') - if not good_name: - good_name = '' - if status_code == 200: - # 消费完成后确认消息 - r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) - bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data']) + consume_product_flag = r.get(consume_product_flag_key) + continuous_consume_flag = r.get(continuous_consume_flag_key) + if not (consume_product_flag or continuous_consume_flag): + resp, status_code, good_name = handle_add_product(msg['data'], '') + if not good_name: + good_name = '' + if status_code == 200: + # 消费完成后确认消息 + r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) + bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data']) + else: + logger.error(f"Failed to process message: {msg['data']} {resp}") + bark_push('商品添加失败', '条形码 %s' % msg['data']) else: - logger.error(f"Failed to process message: {msg['data']} {resp}") - bark_push('商品添加失败', '条形码 %s' % msg['data']) + resp, status_code, good_name = consume_product_by_barcode(msg['data']) + if status_code == 200: + r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) + bark_push('%s 消耗成功' % good_name, '条形码 %s' % msg['data']) + else: + logger.error(f"Failed to process message: {msg['data']} {resp}") + bark_push('商品消耗失败', '条形码 %s' % msg['data']) + if consume_product_flag_key: + r.delete(consume_product_flag_key) + + if msg.get('url'): resp, status_code = add_recipe_from_url(msg['url']) if status_code == 200: