Compare commits

..

No commits in common. "51b62df1899cc8eb8c03b9f8a14be891612ca03a" and "f83f22962271f88fe56d196369a308b589d02f83" have entirely different histories.

112
app.py
View File

@ -8,7 +8,6 @@ from flask import Flask, jsonify, request
from pygrocy import EntityType, Grocy from pygrocy import EntityType, Grocy
import atexit import atexit
import base64 import base64
import mimetypes
import loguru import loguru
import redis import redis
import requests import requests
@ -46,26 +45,6 @@ app = Flask(__name__)
grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True) grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True)
# 连接到 Redis
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
# 定义 Redis Stream 的键
STREAM_KEY = "message_stream"
CONSUMER_GROUP = "consumer_group_1"
CONSUMER_NAME = "consumer_1"
# 创建 Redis 消费者组(如果不存在)
def create_consumer_group():
try:
r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
if "BUSYGROUP" in str(e):
# 消费者组已存在,不报错
pass
create_consumer_group()
def get_locations(): def get_locations():
locations = grocy.get_generic_objects_for_type(EntityType.LOCATIONS) locations = grocy.get_generic_objects_for_type(EntityType.LOCATIONS)
return locations return locations
@ -322,22 +301,26 @@ def consume():
def upload_image(url): def upload_image(url):
# download image # download image
url = url.split('?')[0]
image_name = url.split('/')[-1]
img = requests.get(url) img = requests.get(url)
mime_type = mimetypes.guess_type(image_name)[0]
if not mime_type:
mime_type = 'image/jpeg'
# compress image # compress image
compress_url = 'https://upload-tinypng.tunpok.com/upload/' compress_url = 'https://upload-tinypng.tunpok.com/upload/'
files = {'file': ('img.jpeg', img.content, mime_type,)} resp = requests.post(compress_url, files={'file': img.content})
resp = requests.post(compress_url, files=files, )
if resp.status_code == 200: if resp.status_code == 200:
return resp.json()['cdn_url'] return resp.json()['cdn_url']
return None return None
def add_recipe_from_url(url): @app.route("/add_recipe", methods=["POST"])
def add_recipe():
data = request.json
source = data.get("source", "xiachufang")
url = data.get("url", "")
if source != "xiachufang":
response_data = {"message": "Invalid source"}
return jsonify(response_data), 400
if not url:
response_data = {"message": "Invalid url"}
return jsonify(response_data), 400
try: try:
# https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6 # https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6
recipe_number = re.search(r'(\d+)', url).group(1) recipe_number = re.search(r'(\d+)', url).group(1)
@ -378,28 +361,32 @@ def add_recipe_from_url(url):
"description": description "description": description
} }
grocy.add_generic(EntityType.RECIPES, data_grocy) grocy.add_generic(EntityType.RECIPES, data_grocy)
response_data = {"message": f"{data_grocy['name']}"} response_data = {"message": f"Recipe {data_grocy['name']} added successfully"}
except Exception as e: except Exception as e:
error_message = str(e) error_message = str(e)
response_data = {"message": error_message} response_data = {"message": error_message}
return response_data, 400
@app.route("/add_recipe", methods=["POST"])
def add_recipe():
data = request.json
source = data.get("source", "xiachufang")
url = data.get("url", "")
if source != "xiachufang":
response_data = {"message": "Invalid source"}
return jsonify(response_data), 400
if not url:
response_data = {"message": "Invalid url"}
return jsonify(response_data), 400 return jsonify(response_data), 400
return jsonify(response_data), 200
stream_id = r.xadd(STREAM_KEY, data)
return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200
# 连接到 Redis
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
# 定义 Redis Stream 的键
STREAM_KEY = "message_stream"
CONSUMER_GROUP = "consumer_group_1"
CONSUMER_NAME = "consumer_1"
# 创建 Redis 消费者组(如果不存在)
def create_consumer_group():
try:
r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
if "BUSYGROUP" in str(e):
# 消费者组已存在,不报错
pass
create_consumer_group()
# 定时任务函数:从 Redis Stream 读取未消费的消息 # 定时任务函数:从 Redis Stream 读取未消费的消息
def consume_stream(): def consume_stream():
@ -412,29 +399,20 @@ def consume_stream():
for msg_id, msg in msgs: for msg_id, msg in msgs:
# 打印或处理消息 # 打印或处理消息
logger.info(f"Processing message: {msg['data']}") logger.info(f"Processing message: {msg['data']}")
if msg.get('data'): if not msg['data'].isnumeric():
if not msg['data'].isnumeric(): logger.info(f"Skip non-numeric barcode: {msg['data']}")
logger.info(f"Skip non-numeric barcode: {msg['data']}") continue
continue
resp, status_code, good_name = handle_add_product(msg['data'], '') resp, status_code, good_name = handle_add_product(msg['data'], '')
if not good_name: if not good_name:
good_name = '' good_name = ''
if status_code == 200: if status_code == 200:
# 消费完成后确认消息 # 消费完成后确认消息
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data']) bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data'])
else: else:
logger.error(f"Failed to process message: {msg['data']} {resp}") logger.error(f"Failed to process message: {msg['data']} {resp}")
bark_push('商品添加失败', '条形码 %s' % msg['data']) bark_push('商品添加失败', '条形码 %s' % msg['data'])
if msg.get('url'):
resp, status_code = add_recipe_from_url(msg['url'])
if status_code == 200:
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
bark_push(resp['message'], '来源 %s' % msg['url'])
else:
logger.error(f"Failed to process message: {msg['url']} {resp}")
bark_push('食谱添加失败', '%s 来源 %s' % (resp['message'], msg['url']))
except Exception as e: except Exception as e:
logger.exception(f"Error processing messages: {str(e)}") logger.exception(f"Error processing messages: {str(e)}")