Compare commits
2 Commits
f83f229622
...
51b62df189
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51b62df189 | ||
|
|
11307820b4 |
112
app.py
112
app.py
@ -8,6 +8,7 @@ from flask import Flask, jsonify, request
|
||||
from pygrocy import EntityType, Grocy
|
||||
import atexit
|
||||
import base64
|
||||
import mimetypes
|
||||
import loguru
|
||||
import redis
|
||||
import requests
|
||||
@ -45,6 +46,26 @@ app = Flask(__name__)
|
||||
grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True)
|
||||
|
||||
|
||||
# 连接到 Redis
|
||||
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
|
||||
|
||||
# 定义 Redis Stream 的键
|
||||
STREAM_KEY = "message_stream"
|
||||
CONSUMER_GROUP = "consumer_group_1"
|
||||
CONSUMER_NAME = "consumer_1"
|
||||
|
||||
# 创建 Redis 消费者组(如果不存在)
|
||||
def create_consumer_group():
|
||||
try:
|
||||
r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
|
||||
except redis.exceptions.ResponseError as e:
|
||||
if "BUSYGROUP" in str(e):
|
||||
# 消费者组已存在,不报错
|
||||
pass
|
||||
|
||||
create_consumer_group()
|
||||
|
||||
|
||||
def get_locations():
|
||||
locations = grocy.get_generic_objects_for_type(EntityType.LOCATIONS)
|
||||
return locations
|
||||
@ -301,26 +322,22 @@ def consume():
|
||||
|
||||
def upload_image(url):
|
||||
# download image
|
||||
url = url.split('?')[0]
|
||||
image_name = url.split('/')[-1]
|
||||
img = requests.get(url)
|
||||
mime_type = mimetypes.guess_type(image_name)[0]
|
||||
if not mime_type:
|
||||
mime_type = 'image/jpeg'
|
||||
# compress image
|
||||
compress_url = 'https://upload-tinypng.tunpok.com/upload/'
|
||||
resp = requests.post(compress_url, files={'file': img.content})
|
||||
files = {'file': ('img.jpeg', img.content, mime_type,)}
|
||||
resp = requests.post(compress_url, files=files, )
|
||||
if resp.status_code == 200:
|
||||
return resp.json()['cdn_url']
|
||||
return None
|
||||
|
||||
|
||||
@app.route("/add_recipe", methods=["POST"])
|
||||
def add_recipe():
|
||||
data = request.json
|
||||
source = data.get("source", "xiachufang")
|
||||
url = data.get("url", "")
|
||||
if source != "xiachufang":
|
||||
response_data = {"message": "Invalid source"}
|
||||
return jsonify(response_data), 400
|
||||
if not url:
|
||||
response_data = {"message": "Invalid url"}
|
||||
return jsonify(response_data), 400
|
||||
def add_recipe_from_url(url):
|
||||
try:
|
||||
# https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6
|
||||
recipe_number = re.search(r'(\d+)', url).group(1)
|
||||
@ -361,32 +378,28 @@ def add_recipe():
|
||||
"description": description
|
||||
}
|
||||
grocy.add_generic(EntityType.RECIPES, data_grocy)
|
||||
response_data = {"message": f"Recipe {data_grocy['name']} added successfully"}
|
||||
response_data = {"message": f"{data_grocy['name']}"}
|
||||
except Exception as e:
|
||||
error_message = str(e)
|
||||
response_data = {"message": error_message}
|
||||
return response_data, 400
|
||||
|
||||
|
||||
@app.route("/add_recipe", methods=["POST"])
|
||||
def add_recipe():
|
||||
data = request.json
|
||||
source = data.get("source", "xiachufang")
|
||||
url = data.get("url", "")
|
||||
if source != "xiachufang":
|
||||
response_data = {"message": "Invalid source"}
|
||||
return jsonify(response_data), 400
|
||||
if not url:
|
||||
response_data = {"message": "Invalid url"}
|
||||
return jsonify(response_data), 400
|
||||
return jsonify(response_data), 200
|
||||
|
||||
stream_id = r.xadd(STREAM_KEY, data)
|
||||
return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200
|
||||
|
||||
# 连接到 Redis
|
||||
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
|
||||
|
||||
# 定义 Redis Stream 的键
|
||||
STREAM_KEY = "message_stream"
|
||||
CONSUMER_GROUP = "consumer_group_1"
|
||||
CONSUMER_NAME = "consumer_1"
|
||||
|
||||
# 创建 Redis 消费者组(如果不存在)
|
||||
def create_consumer_group():
|
||||
try:
|
||||
r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
|
||||
except redis.exceptions.ResponseError as e:
|
||||
if "BUSYGROUP" in str(e):
|
||||
# 消费者组已存在,不报错
|
||||
pass
|
||||
|
||||
create_consumer_group()
|
||||
|
||||
# 定时任务函数:从 Redis Stream 读取未消费的消息
|
||||
def consume_stream():
|
||||
@ -399,20 +412,29 @@ def consume_stream():
|
||||
for msg_id, msg in msgs:
|
||||
# 打印或处理消息
|
||||
logger.info(f"Processing message: {msg['data']}")
|
||||
if not msg['data'].isnumeric():
|
||||
logger.info(f"Skip non-numeric barcode: {msg['data']}")
|
||||
continue
|
||||
if msg.get('data'):
|
||||
if not msg['data'].isnumeric():
|
||||
logger.info(f"Skip non-numeric barcode: {msg['data']}")
|
||||
continue
|
||||
|
||||
resp, status_code, good_name = handle_add_product(msg['data'], '')
|
||||
if not good_name:
|
||||
good_name = ''
|
||||
if status_code == 200:
|
||||
# 消费完成后确认消息
|
||||
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
|
||||
bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data'])
|
||||
else:
|
||||
logger.error(f"Failed to process message: {msg['data']} {resp}")
|
||||
bark_push('商品添加失败', '条形码 %s' % msg['data'])
|
||||
resp, status_code, good_name = handle_add_product(msg['data'], '')
|
||||
if not good_name:
|
||||
good_name = ''
|
||||
if status_code == 200:
|
||||
# 消费完成后确认消息
|
||||
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
|
||||
bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data'])
|
||||
else:
|
||||
logger.error(f"Failed to process message: {msg['data']} {resp}")
|
||||
bark_push('商品添加失败', '条形码 %s' % msg['data'])
|
||||
if msg.get('url'):
|
||||
resp, status_code = add_recipe_from_url(msg['url'])
|
||||
if status_code == 200:
|
||||
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
|
||||
bark_push(resp['message'], '来源 %s' % msg['url'])
|
||||
else:
|
||||
logger.error(f"Failed to process message: {msg['url']} {resp}")
|
||||
bark_push('食谱添加失败', '%s 来源 %s' % (resp['message'], msg['url']))
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Error processing messages: {str(e)}")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user