feat(api): 增加 barcode 接口;增加 redis streams 功能

使用 redis streams 处理 barcode
This commit is contained in:
Ching 2024-09-22 23:03:34 +08:00
parent b38fc00072
commit c46ea5bd16

71
app.py
View File

@ -6,6 +6,10 @@ import re
import requests
from flask import Flask, jsonify, request
from pygrocy import EntityType, Grocy
import redis
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
from barcode import BarcodeSpider
from recipe import get_recipe_from_xiachufang
@ -31,6 +35,8 @@ GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE = os.environ.get("GROCY_DEFAULT_QUANTITY
GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK = os.environ.get("GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK")
GROCY_DEFAULT_BEST_BEFORE_DAYS = os.environ.get("GROCY_DEFAULT_BEST_BEFORE_DAYS")
X_RapidAPI_Key = os.environ.get("X_RapidAPI_Key")
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT")
app = Flask(__name__)
grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True)
@ -322,5 +328,70 @@ def add_recipe():
return jsonify(response_data), 200
# 连接到 Redis
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
# 定义 Redis Stream 的键
STREAM_KEY = "message_stream"
CONSUMER_GROUP = "consumer_group_1"
CONSUMER_NAME = "consumer_1"
# 创建 Redis 消费者组(如果不存在)
def create_consumer_group():
try:
r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
if "BUSYGROUP" in str(e):
# 消费者组已存在,不报错
pass
create_consumer_group()
# 定时任务函数:从 Redis Stream 读取未消费的消息
def consume_stream():
try:
# 读取 Stream 中的消息,设置阻塞时间为 0
messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000)
if messages:
for stream, msgs in messages:
for msg_id, msg in msgs:
# 打印或处理消息
print(f"Received message ID: {msg_id}, data: {msg}")
# 消费完成后确认消息
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
except Exception as e:
print(f"Error consuming stream: {e}")
@app.route('/barcode', methods=['POST'])
def add_to_stream():
try:
# 获取请求中的 JSON 数据
data = request.json
if not data:
return jsonify({"message": "No data provided"}), 400
# 将数据添加到 Redis Stream 中
# 自动生成 ID ('*' 表示让 Redis 生成唯一 ID)
stream_id = r.xadd(STREAM_KEY, data)
return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200
except Exception as e:
return jsonify({"message": str(e)}), 500
# 初始化 APScheduler 并添加定时任务
scheduler = BackgroundScheduler()
scheduler.add_job(func=consume_stream, trigger="interval", seconds=5)
scheduler.start()
# 确保程序退出时停止定时任务
atexit.register(lambda: scheduler.shutdown())
if __name__ == "__main__":
app.run(host="0.0.0.0", port=9288)