Compare commits

...

2 Commits

Author SHA1 Message Date
Ching
51b62df189 feat(api): 菜谱接口使用 redis streams TUN-124
All checks were successful
continuous-integration/drone/push Build is passing
为应用程序添加了从 URL 添加食谱的功能,允许用户通过提供来源和URL来添加食谱。如果成功添加,将会返回成功消息,否则将会返回错误消息。

该功能的实现包括以下步骤:
- 在`app.py`文件中添加了`add_recipe_from_url`函数,用于处理从URL添加食谱的逻辑。
- 在`/add_recipe`路由中,根据请求中的来源和URL调用`add_recipe_from_url`函数来添加食谱。
- 如果添加成功,将会返回成功消息和状态码200;否则,将会返回错误消息和适当的状态码。

该功能的实现还包括对Redis的连接和消费者组的创建,以及从Redis Stream中读取未消费的消息并进行处理。
2024-09-30 00:23:41 +08:00
Ching
11307820b4 fix(api): 修复上传图片失败的问题 2024-09-30 00:08:45 +08:00

86
app.py
View File

@ -8,6 +8,7 @@ from flask import Flask, jsonify, request
from pygrocy import EntityType, Grocy from pygrocy import EntityType, Grocy
import atexit import atexit
import base64 import base64
import mimetypes
import loguru import loguru
import redis import redis
import requests import requests
@ -45,6 +46,26 @@ app = Flask(__name__)
grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True) grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True)
# 连接到 Redis
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
# 定义 Redis Stream 的键
STREAM_KEY = "message_stream"
CONSUMER_GROUP = "consumer_group_1"
CONSUMER_NAME = "consumer_1"
# 创建 Redis 消费者组(如果不存在)
def create_consumer_group():
try:
r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
if "BUSYGROUP" in str(e):
# 消费者组已存在,不报错
pass
create_consumer_group()
def get_locations(): def get_locations():
locations = grocy.get_generic_objects_for_type(EntityType.LOCATIONS) locations = grocy.get_generic_objects_for_type(EntityType.LOCATIONS)
return locations return locations
@ -301,26 +322,22 @@ def consume():
def upload_image(url): def upload_image(url):
# download image # download image
url = url.split('?')[0]
image_name = url.split('/')[-1]
img = requests.get(url) img = requests.get(url)
mime_type = mimetypes.guess_type(image_name)[0]
if not mime_type:
mime_type = 'image/jpeg'
# compress image # compress image
compress_url = 'https://upload-tinypng.tunpok.com/upload/' compress_url = 'https://upload-tinypng.tunpok.com/upload/'
resp = requests.post(compress_url, files={'file': img.content}) files = {'file': ('img.jpeg', img.content, mime_type,)}
resp = requests.post(compress_url, files=files, )
if resp.status_code == 200: if resp.status_code == 200:
return resp.json()['cdn_url'] return resp.json()['cdn_url']
return None return None
@app.route("/add_recipe", methods=["POST"]) def add_recipe_from_url(url):
def add_recipe():
data = request.json
source = data.get("source", "xiachufang")
url = data.get("url", "")
if source != "xiachufang":
response_data = {"message": "Invalid source"}
return jsonify(response_data), 400
if not url:
response_data = {"message": "Invalid url"}
return jsonify(response_data), 400
try: try:
# https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6 # https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6
recipe_number = re.search(r'(\d+)', url).group(1) recipe_number = re.search(r'(\d+)', url).group(1)
@ -361,32 +378,28 @@ def add_recipe():
"description": description "description": description
} }
grocy.add_generic(EntityType.RECIPES, data_grocy) grocy.add_generic(EntityType.RECIPES, data_grocy)
response_data = {"message": f"Recipe {data_grocy['name']} added successfully"} response_data = {"message": f"{data_grocy['name']}"}
except Exception as e: except Exception as e:
error_message = str(e) error_message = str(e)
response_data = {"message": error_message} response_data = {"message": error_message}
return response_data, 400
@app.route("/add_recipe", methods=["POST"])
def add_recipe():
data = request.json
source = data.get("source", "xiachufang")
url = data.get("url", "")
if source != "xiachufang":
response_data = {"message": "Invalid source"}
return jsonify(response_data), 400
if not url:
response_data = {"message": "Invalid url"}
return jsonify(response_data), 400 return jsonify(response_data), 400
return jsonify(response_data), 200
stream_id = r.xadd(STREAM_KEY, data)
return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200
# 连接到 Redis
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
# 定义 Redis Stream 的键
STREAM_KEY = "message_stream"
CONSUMER_GROUP = "consumer_group_1"
CONSUMER_NAME = "consumer_1"
# 创建 Redis 消费者组(如果不存在)
def create_consumer_group():
try:
r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
if "BUSYGROUP" in str(e):
# 消费者组已存在,不报错
pass
create_consumer_group()
# 定时任务函数:从 Redis Stream 读取未消费的消息 # 定时任务函数:从 Redis Stream 读取未消费的消息
def consume_stream(): def consume_stream():
@ -399,6 +412,7 @@ def consume_stream():
for msg_id, msg in msgs: for msg_id, msg in msgs:
# 打印或处理消息 # 打印或处理消息
logger.info(f"Processing message: {msg['data']}") logger.info(f"Processing message: {msg['data']}")
if msg.get('data'):
if not msg['data'].isnumeric(): if not msg['data'].isnumeric():
logger.info(f"Skip non-numeric barcode: {msg['data']}") logger.info(f"Skip non-numeric barcode: {msg['data']}")
continue continue
@ -413,6 +427,14 @@ def consume_stream():
else: else:
logger.error(f"Failed to process message: {msg['data']} {resp}") logger.error(f"Failed to process message: {msg['data']} {resp}")
bark_push('商品添加失败', '条形码 %s' % msg['data']) bark_push('商品添加失败', '条形码 %s' % msg['data'])
if msg.get('url'):
resp, status_code = add_recipe_from_url(msg['url'])
if status_code == 200:
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
bark_push(resp['message'], '来源 %s' % msg['url'])
else:
logger.error(f"Failed to process message: {msg['url']} {resp}")
bark_push('食谱添加失败', '%s 来源 %s' % (resp['message'], msg['url']))
except Exception as e: except Exception as e:
logger.exception(f"Error processing messages: {str(e)}") logger.exception(f"Error processing messages: {str(e)}")