diff --git a/app.py b/app.py index a60f5ab..fc76454 100644 --- a/app.py +++ b/app.py @@ -6,6 +6,10 @@ import re import requests from flask import Flask, jsonify, request from pygrocy import EntityType, Grocy +import redis +from apscheduler.schedulers.background import BackgroundScheduler +import atexit + from barcode import BarcodeSpider from recipe import get_recipe_from_xiachufang @@ -31,6 +35,8 @@ GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE = os.environ.get("GROCY_DEFAULT_QUANTITY GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK = os.environ.get("GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK") GROCY_DEFAULT_BEST_BEFORE_DAYS = os.environ.get("GROCY_DEFAULT_BEST_BEFORE_DAYS") X_RapidAPI_Key = os.environ.get("X_RapidAPI_Key") +REDIS_HOST = os.environ.get("REDIS_HOST") +REDIS_PORT = os.environ.get("REDIS_PORT") app = Flask(__name__) grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True) @@ -322,5 +328,70 @@ def add_recipe(): return jsonify(response_data), 200 +# 连接到 Redis +r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True) + +# 定义 Redis Stream 的键 +STREAM_KEY = "message_stream" +CONSUMER_GROUP = "consumer_group_1" +CONSUMER_NAME = "consumer_1" + +# 创建 Redis 消费者组(如果不存在) +def create_consumer_group(): + try: + r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True) + except redis.exceptions.ResponseError as e: + if "BUSYGROUP" in str(e): + # 消费者组已存在,不报错 + pass + +create_consumer_group() + +# 定时任务函数:从 Redis Stream 读取未消费的消息 +def consume_stream(): + try: + # 读取 Stream 中的消息,设置阻塞时间为 0 + messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000) + + if messages: + for stream, msgs in messages: + for msg_id, msg in msgs: + # 打印或处理消息 + print(f"Received message ID: {msg_id}, data: {msg}") + + # 消费完成后确认消息 + r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) + + except Exception as e: + print(f"Error consuming stream: {e}") + + +@app.route('/barcode', methods=['POST']) +def add_to_stream(): + try: + # 获取请求中的 JSON 数据 + data = request.json + if not data: + return jsonify({"message": "No data provided"}), 400 + + # 将数据添加到 Redis Stream 中 + # 自动生成 ID ('*' 表示让 Redis 生成唯一 ID) + stream_id = r.xadd(STREAM_KEY, data) + + return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200 + + except Exception as e: + return jsonify({"message": str(e)}), 500 + + +# 初始化 APScheduler 并添加定时任务 +scheduler = BackgroundScheduler() +scheduler.add_job(func=consume_stream, trigger="interval", seconds=5) +scheduler.start() + +# 确保程序退出时停止定时任务 +atexit.register(lambda: scheduler.shutdown()) + + if __name__ == "__main__": app.run(host="0.0.0.0", port=9288)