feat(api): 菜谱接口使用 redis streams TUN-124
All checks were successful
continuous-integration/drone/push Build is passing

为应用程序添加了从 URL 添加食谱的功能,允许用户通过提供来源和URL来添加食谱。如果成功添加,将会返回成功消息,否则将会返回错误消息。

该功能的实现包括以下步骤:
- 在`app.py`文件中添加了`add_recipe_from_url`函数,用于处理从URL添加食谱的逻辑。
- 在`/add_recipe`路由中,根据请求中的来源和URL调用`add_recipe_from_url`函数来添加食谱。
- 如果添加成功,将会返回成功消息和状态码200;否则,将会返回错误消息和适当的状态码。

该功能的实现还包括对Redis的连接和消费者组的创建,以及从Redis Stream中读取未消费的消息并进行处理。
This commit is contained in:
Ching 2024-09-30 00:23:41 +08:00
parent 11307820b4
commit 51b62df189

103
app.py
View File

@ -46,6 +46,26 @@ app = Flask(__name__)
grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True)
# 连接到 Redis
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
# 定义 Redis Stream 的键
STREAM_KEY = "message_stream"
CONSUMER_GROUP = "consumer_group_1"
CONSUMER_NAME = "consumer_1"
# 创建 Redis 消费者组(如果不存在)
def create_consumer_group():
try:
r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
if "BUSYGROUP" in str(e):
# 消费者组已存在,不报错
pass
create_consumer_group()
def get_locations():
locations = grocy.get_generic_objects_for_type(EntityType.LOCATIONS)
return locations
@ -317,17 +337,7 @@ def upload_image(url):
return None
@app.route("/add_recipe", methods=["POST"])
def add_recipe():
data = request.json
source = data.get("source", "xiachufang")
url = data.get("url", "")
if source != "xiachufang":
response_data = {"message": "Invalid source"}
return jsonify(response_data), 400
if not url:
response_data = {"message": "Invalid url"}
return jsonify(response_data), 400
def add_recipe_from_url(url):
try:
# https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6
recipe_number = re.search(r'(\d+)', url).group(1)
@ -368,32 +378,28 @@ def add_recipe():
"description": description
}
grocy.add_generic(EntityType.RECIPES, data_grocy)
response_data = {"message": f"Recipe {data_grocy['name']} added successfully"}
response_data = {"message": f"{data_grocy['name']}"}
except Exception as e:
error_message = str(e)
response_data = {"message": error_message}
return response_data, 400
@app.route("/add_recipe", methods=["POST"])
def add_recipe():
data = request.json
source = data.get("source", "xiachufang")
url = data.get("url", "")
if source != "xiachufang":
response_data = {"message": "Invalid source"}
return jsonify(response_data), 400
if not url:
response_data = {"message": "Invalid url"}
return jsonify(response_data), 400
return jsonify(response_data), 200
stream_id = r.xadd(STREAM_KEY, data)
return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200
# 连接到 Redis
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
# 定义 Redis Stream 的键
STREAM_KEY = "message_stream"
CONSUMER_GROUP = "consumer_group_1"
CONSUMER_NAME = "consumer_1"
# 创建 Redis 消费者组(如果不存在)
def create_consumer_group():
try:
r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
if "BUSYGROUP" in str(e):
# 消费者组已存在,不报错
pass
create_consumer_group()
# 定时任务函数:从 Redis Stream 读取未消费的消息
def consume_stream():
@ -406,20 +412,29 @@ def consume_stream():
for msg_id, msg in msgs:
# 打印或处理消息
logger.info(f"Processing message: {msg['data']}")
if not msg['data'].isnumeric():
logger.info(f"Skip non-numeric barcode: {msg['data']}")
continue
if msg.get('data'):
if not msg['data'].isnumeric():
logger.info(f"Skip non-numeric barcode: {msg['data']}")
continue
resp, status_code, good_name = handle_add_product(msg['data'], '')
if not good_name:
good_name = ''
if status_code == 200:
# 消费完成后确认消息
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data'])
else:
logger.error(f"Failed to process message: {msg['data']} {resp}")
bark_push('商品添加失败', '条形码 %s' % msg['data'])
resp, status_code, good_name = handle_add_product(msg['data'], '')
if not good_name:
good_name = ''
if status_code == 200:
# 消费完成后确认消息
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data'])
else:
logger.error(f"Failed to process message: {msg['data']} {resp}")
bark_push('商品添加失败', '条形码 %s' % msg['data'])
if msg.get('url'):
resp, status_code = add_recipe_from_url(msg['url'])
if status_code == 200:
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
bark_push(resp['message'], '来源 %s' % msg['url'])
else:
logger.error(f"Failed to process message: {msg['url']} {resp}")
bark_push('食谱添加失败', '%s 来源 %s' % (resp['message'], msg['url']))
except Exception as e:
logger.exception(f"Error processing messages: {str(e)}")