From 51b62df1899cc8eb8c03b9f8a14be891612ca03a Mon Sep 17 00:00:00 2001 From: Ching Date: Mon, 30 Sep 2024 00:23:41 +0800 Subject: [PATCH] =?UTF-8?q?feat(api):=20=E8=8F=9C=E8=B0=B1=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E4=BD=BF=E7=94=A8=20redis=20streams=20TUN-124?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 为应用程序添加了从 URL 添加食谱的功能,允许用户通过提供来源和URL来添加食谱。如果成功添加,将会返回成功消息,否则将会返回错误消息。 该功能的实现包括以下步骤: - 在`app.py`文件中添加了`add_recipe_from_url`函数,用于处理从URL添加食谱的逻辑。 - 在`/add_recipe`路由中,根据请求中的来源和URL调用`add_recipe_from_url`函数来添加食谱。 - 如果添加成功,将会返回成功消息和状态码200;否则,将会返回错误消息和适当的状态码。 该功能的实现还包括对Redis的连接和消费者组的创建,以及从Redis Stream中读取未消费的消息并进行处理。 --- app.py | 103 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 59 insertions(+), 44 deletions(-) diff --git a/app.py b/app.py index c2552be..7166913 100644 --- a/app.py +++ b/app.py @@ -46,6 +46,26 @@ app = Flask(__name__) grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True) +# 连接到 Redis +r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True) + +# 定义 Redis Stream 的键 +STREAM_KEY = "message_stream" +CONSUMER_GROUP = "consumer_group_1" +CONSUMER_NAME = "consumer_1" + +# 创建 Redis 消费者组(如果不存在) +def create_consumer_group(): + try: + r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True) + except redis.exceptions.ResponseError as e: + if "BUSYGROUP" in str(e): + # 消费者组已存在,不报错 + pass + +create_consumer_group() + + def get_locations(): locations = grocy.get_generic_objects_for_type(EntityType.LOCATIONS) return locations @@ -317,17 +337,7 @@ def upload_image(url): return None -@app.route("/add_recipe", methods=["POST"]) -def add_recipe(): - data = request.json - source = data.get("source", "xiachufang") - url = data.get("url", "") - if source != "xiachufang": - response_data = {"message": "Invalid source"} - return jsonify(response_data), 400 - if not url: - response_data = {"message": "Invalid url"} - return jsonify(response_data), 400 +def add_recipe_from_url(url): try: # https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6 recipe_number = re.search(r'(\d+)', url).group(1) @@ -368,32 +378,28 @@ def add_recipe(): "description": description } grocy.add_generic(EntityType.RECIPES, data_grocy) - response_data = {"message": f"Recipe {data_grocy['name']} added successfully"} + response_data = {"message": f"{data_grocy['name']}"} except Exception as e: error_message = str(e) response_data = {"message": error_message} + return response_data, 400 + + +@app.route("/add_recipe", methods=["POST"]) +def add_recipe(): + data = request.json + source = data.get("source", "xiachufang") + url = data.get("url", "") + if source != "xiachufang": + response_data = {"message": "Invalid source"} + return jsonify(response_data), 400 + if not url: + response_data = {"message": "Invalid url"} return jsonify(response_data), 400 - return jsonify(response_data), 200 + stream_id = r.xadd(STREAM_KEY, data) + return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200 -# 连接到 Redis -r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True) - -# 定义 Redis Stream 的键 -STREAM_KEY = "message_stream" -CONSUMER_GROUP = "consumer_group_1" -CONSUMER_NAME = "consumer_1" - -# 创建 Redis 消费者组(如果不存在) -def create_consumer_group(): - try: - r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True) - except redis.exceptions.ResponseError as e: - if "BUSYGROUP" in str(e): - # 消费者组已存在,不报错 - pass - -create_consumer_group() # 定时任务函数:从 Redis Stream 读取未消费的消息 def consume_stream(): @@ -406,20 +412,29 @@ def consume_stream(): for msg_id, msg in msgs: # 打印或处理消息 logger.info(f"Processing message: {msg['data']}") - if not msg['data'].isnumeric(): - logger.info(f"Skip non-numeric barcode: {msg['data']}") - continue + if msg.get('data'): + if not msg['data'].isnumeric(): + logger.info(f"Skip non-numeric barcode: {msg['data']}") + continue - resp, status_code, good_name = handle_add_product(msg['data'], '') - if not good_name: - good_name = '' - if status_code == 200: - # 消费完成后确认消息 - r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) - bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data']) - else: - logger.error(f"Failed to process message: {msg['data']} {resp}") - bark_push('商品添加失败', '条形码 %s' % msg['data']) + resp, status_code, good_name = handle_add_product(msg['data'], '') + if not good_name: + good_name = '' + if status_code == 200: + # 消费完成后确认消息 + r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) + bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data']) + else: + logger.error(f"Failed to process message: {msg['data']} {resp}") + bark_push('商品添加失败', '条形码 %s' % msg['data']) + if msg.get('url'): + resp, status_code = add_recipe_from_url(msg['url']) + if status_code == 200: + r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) + bark_push(resp['message'], '来源 %s' % msg['url']) + else: + logger.error(f"Failed to process message: {msg['url']} {resp}") + bark_push('食谱添加失败', '%s 来源 %s' % (resp['message'], msg['url'])) except Exception as e: logger.exception(f"Error processing messages: {str(e)}")