Compare commits

..

3 Commits

Author SHA1 Message Date
Ching L
d6c8221223 feat(api): 修改处理扫码的定时任务为添加商品
All checks were successful
continuous-integration/drone/push Build is passing
2024-09-25 16:27:16 +08:00
Ching L
057b71ee6f chore: Update docker-compose.yml with Redis and Bark configurations 2024-09-25 15:20:21 +08:00
Ching L
2f35ef2acb feat(api): 增加扫码后 bark 推送消息功能 2024-09-25 15:04:14 +08:00
2 changed files with 67 additions and 25 deletions

89
app.py
View File

@ -3,18 +3,20 @@ import json
import os
import re
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, jsonify, request
from pygrocy import EntityType, Grocy
import redis
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
import base64
import loguru
import redis
import requests
from barcode import BarcodeSpider
from recipe import get_recipe_from_xiachufang
import base64
logger = loguru.logger
# config = configparser.ConfigParser()
# config.read('config.ini')
# GROCY_URL = config.get('Grocy', 'GROCY_URL')
@ -37,6 +39,7 @@ GROCY_DEFAULT_BEST_BEFORE_DAYS = os.environ.get("GROCY_DEFAULT_BEST_BEFORE_DAYS"
X_RapidAPI_Key = os.environ.get("X_RapidAPI_Key")
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT")
BARK_DEVICE_KEY = os.environ.get("BARK_DEVICE_KEY")
app = Flask(__name__)
grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True)
@ -47,6 +50,19 @@ def get_locations():
return locations
def bark_push(title, text):
icon = 'https://styles.redditmedia.com/t5_10dd8f/styles/communityIcon_rwl05b14iveb1.png'
data = {
"title": title,
"body": text,
"device_key": BARK_DEVICE_KEY,
"icon": icon,
"group": "Grocy",
"url": ""
}
requests.post('http://bark.tunpok.com/push/', json=data)
def add_product(dict_good, location):
good_name = ""
if "description" in dict_good:
@ -212,17 +228,7 @@ def convert_image_to_base64(image_content):
return base64_image
@app.route("/")
def index():
return "Up and running!"
@app.route("/add", methods=["POST"])
def add():
data = request.json
location = data.get("location", "")
barcode = data.get("barcode", "")
def handle_add_product(barcode, location):
try:
product = grocy.product_by_barcode(barcode)
barcode_ = None
@ -237,29 +243,43 @@ def add():
grocy.add_product_by_barcode(barcode, amount, 0.0)
response_data = {"message": "Item added successfully"}
return jsonify(response_data), 200
return response_data, 200
except:
spider = BarcodeSpider(x_rapidapi_key=X_RapidAPI_Key)
good = spider.get_good(barcode)
if not good:
response_data = {"message": "Item not found"}
return jsonify(response_data), 400
return response_data, 400
try:
if add_product(good, location):
response_data = {"message": "New item added successfully"}
return jsonify(response_data), 200
return response_data, 200
else:
response_data = {"message": "Fail to add new item"}
return jsonify(response_data), 400
return response_data, 400
except Exception as e:
if hasattr(e, "message"):
error_message = e.message
else:
error_message = str(e)
response_data = {"message": error_message}
return jsonify(response_data), 400
return response_data, 400
@app.route("/")
def index():
return "Up and running!"
@app.route("/add", methods=["POST"])
def add():
data = request.json
location = data.get("location", "")
barcode = data.get("barcode", "")
response_data, status_code = handle_add_product(barcode, location)
return jsonify(response_data), status_code
@app.route("/consume", methods=["POST"])
def consume():
@ -351,21 +371,33 @@ create_consumer_group()
def consume_stream():
try:
# 读取 Stream 中的消息,设置阻塞时间为 0
messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000)
messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=5, block=1000)
if messages:
for stream, msgs in messages:
for msg_id, msg in msgs:
# 打印或处理消息
print(f"Received message ID: {msg_id}, data: {msg}")
logger.info(f"Processing message: {msg['data']}")
if not msg['data'].isnumeric():
logger.info(f"Skip non-numeric barcode: {msg['data']}")
continue
# 消费完成后确认消息
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
_, status_code = handle_add_product(msg['data'], '')
if status_code == 200:
# 消费完成后确认消息
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
bark_push('商品添加成功', '条形码 %s' % msg['data'])
else:
logger.error(f"Failed to process message: {msg['data']}")
bark_push('商品添加失败', '条形码 %s' % msg['data'])
except Exception as e:
print(f"Error consuming stream: {e}")
# TODO: 定时任务函数:从 Redis Stream 读取消费者组的未确认消息
# def check_unacked_messages()
@app.route('/barcode', methods=['POST'])
def add_to_stream():
try:
@ -373,11 +405,18 @@ def add_to_stream():
data = request.json
if not data:
return jsonify({"message": "No data provided"}), 400
if not data.get('data'):
return jsonify({"message": "No barcode data provided"}), 400
# 将数据添加到 Redis Stream 中
# 自动生成 ID ('*' 表示让 Redis 生成唯一 ID)
stream_id = r.xadd(STREAM_KEY, data)
if data['data'].isnumeric():
bark_push('扫码成功', '条形码 %s' % data['data'])
else:
bark_push('扫码成功', '二维码 %s' % data['data'])
return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200
except Exception as e:
@ -386,7 +425,7 @@ def add_to_stream():
# 初始化 APScheduler 并添加定时任务
scheduler = BackgroundScheduler()
scheduler.add_job(func=consume_stream, trigger="interval", seconds=5)
scheduler.add_job(func=consume_stream, trigger="interval", seconds=300)
scheduler.start()
# 确保程序退出时停止定时任务

View File

@ -11,5 +11,8 @@ services:
- GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE=2
- GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK=6
- GROCY_DEFAULT_BEST_BEFORE_DAYS=365
- REDIS_HOST=172.17.0.1
- REDIS_PORT=6379
- BARK_DEVICE_KEY=your-bark-device-key
ports:
- 9288:9288