From d6c822122376d4905dff89d4a72e36bfaf954a4e Mon Sep 17 00:00:00 2001 From: Ching L Date: Wed, 25 Sep 2024 16:22:37 +0800 Subject: [PATCH] =?UTF-8?q?feat(api):=20=E4=BF=AE=E6=94=B9=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=89=AB=E7=A0=81=E7=9A=84=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E4=B8=BA=E6=B7=BB=E5=8A=A0=E5=95=86=E5=93=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 70 +++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/app.py b/app.py index 19fe458..1301ddb 100644 --- a/app.py +++ b/app.py @@ -3,18 +3,20 @@ import json import os import re -import requests +from apscheduler.schedulers.background import BackgroundScheduler from flask import Flask, jsonify, request from pygrocy import EntityType, Grocy -import redis -from apscheduler.schedulers.background import BackgroundScheduler import atexit +import base64 +import loguru +import redis +import requests from barcode import BarcodeSpider from recipe import get_recipe_from_xiachufang -import base64 +logger = loguru.logger # config = configparser.ConfigParser() # config.read('config.ini') # GROCY_URL = config.get('Grocy', 'GROCY_URL') @@ -47,6 +49,7 @@ def get_locations(): locations = grocy.get_generic_objects_for_type(EntityType.LOCATIONS) return locations + def bark_push(title, text): icon = 'https://styles.redditmedia.com/t5_10dd8f/styles/communityIcon_rwl05b14iveb1.png' data = { @@ -59,6 +62,7 @@ def bark_push(title, text): } requests.post('http://bark.tunpok.com/push/', json=data) + def add_product(dict_good, location): good_name = "" if "description" in dict_good: @@ -224,17 +228,7 @@ def convert_image_to_base64(image_content): return base64_image -@app.route("/") -def index(): - return "Up and running!" - - -@app.route("/add", methods=["POST"]) -def add(): - data = request.json - location = data.get("location", "") - barcode = data.get("barcode", "") - +def handle_add_product(barcode, location): try: product = grocy.product_by_barcode(barcode) barcode_ = None @@ -249,29 +243,43 @@ def add(): grocy.add_product_by_barcode(barcode, amount, 0.0) response_data = {"message": "Item added successfully"} - return jsonify(response_data), 200 + return response_data, 200 except: spider = BarcodeSpider(x_rapidapi_key=X_RapidAPI_Key) good = spider.get_good(barcode) if not good: response_data = {"message": "Item not found"} - return jsonify(response_data), 400 + return response_data, 400 try: if add_product(good, location): response_data = {"message": "New item added successfully"} - return jsonify(response_data), 200 + return response_data, 200 else: response_data = {"message": "Fail to add new item"} - return jsonify(response_data), 400 + return response_data, 400 except Exception as e: if hasattr(e, "message"): error_message = e.message else: error_message = str(e) response_data = {"message": error_message} - return jsonify(response_data), 400 + return response_data, 400 +@app.route("/") +def index(): + return "Up and running!" + + +@app.route("/add", methods=["POST"]) +def add(): + data = request.json + location = data.get("location", "") + barcode = data.get("barcode", "") + + response_data, status_code = handle_add_product(barcode, location) + return jsonify(response_data), status_code + @app.route("/consume", methods=["POST"]) def consume(): @@ -363,21 +371,33 @@ create_consumer_group() def consume_stream(): try: # 读取 Stream 中的消息,设置阻塞时间为 0 - messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000) + messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=5, block=1000) if messages: for stream, msgs in messages: for msg_id, msg in msgs: # 打印或处理消息 - print(f"Received message ID: {msg_id}, data: {msg}") + logger.info(f"Processing message: {msg['data']}") + if not msg['data'].isnumeric(): + logger.info(f"Skip non-numeric barcode: {msg['data']}") + continue - # 消费完成后确认消息 - r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) + _, status_code = handle_add_product(msg['data'], '') + if status_code == 200: + # 消费完成后确认消息 + r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) + bark_push('商品添加成功', '条形码 %s' % msg['data']) + else: + logger.error(f"Failed to process message: {msg['data']}") + bark_push('商品添加失败', '条形码 %s' % msg['data']) except Exception as e: print(f"Error consuming stream: {e}") +# TODO: 定时任务函数:从 Redis Stream 读取消费者组的未确认消息 +# def check_unacked_messages() + @app.route('/barcode', methods=['POST']) def add_to_stream(): try: @@ -405,7 +425,7 @@ def add_to_stream(): # 初始化 APScheduler 并添加定时任务 scheduler = BackgroundScheduler() -scheduler.add_job(func=consume_stream, trigger="interval", seconds=5) +scheduler.add_job(func=consume_stream, trigger="interval", seconds=300) scheduler.start() # 确保程序退出时停止定时任务