feat(api): 修改处理扫码的定时任务为添加商品
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
057b71ee6f
commit
d6c8221223
70
app.py
70
app.py
@ -3,18 +3,20 @@ import json
|
||||
import os
|
||||
import re
|
||||
|
||||
import requests
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from flask import Flask, jsonify, request
|
||||
from pygrocy import EntityType, Grocy
|
||||
import redis
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
import atexit
|
||||
import base64
|
||||
import loguru
|
||||
import redis
|
||||
import requests
|
||||
|
||||
|
||||
from barcode import BarcodeSpider
|
||||
from recipe import get_recipe_from_xiachufang
|
||||
import base64
|
||||
|
||||
logger = loguru.logger
|
||||
# config = configparser.ConfigParser()
|
||||
# config.read('config.ini')
|
||||
# GROCY_URL = config.get('Grocy', 'GROCY_URL')
|
||||
@ -47,6 +49,7 @@ def get_locations():
|
||||
locations = grocy.get_generic_objects_for_type(EntityType.LOCATIONS)
|
||||
return locations
|
||||
|
||||
|
||||
def bark_push(title, text):
|
||||
icon = 'https://styles.redditmedia.com/t5_10dd8f/styles/communityIcon_rwl05b14iveb1.png'
|
||||
data = {
|
||||
@ -59,6 +62,7 @@ def bark_push(title, text):
|
||||
}
|
||||
requests.post('http://bark.tunpok.com/push/', json=data)
|
||||
|
||||
|
||||
def add_product(dict_good, location):
|
||||
good_name = ""
|
||||
if "description" in dict_good:
|
||||
@ -224,17 +228,7 @@ def convert_image_to_base64(image_content):
|
||||
return base64_image
|
||||
|
||||
|
||||
@app.route("/")
|
||||
def index():
|
||||
return "Up and running!"
|
||||
|
||||
|
||||
@app.route("/add", methods=["POST"])
|
||||
def add():
|
||||
data = request.json
|
||||
location = data.get("location", "")
|
||||
barcode = data.get("barcode", "")
|
||||
|
||||
def handle_add_product(barcode, location):
|
||||
try:
|
||||
product = grocy.product_by_barcode(barcode)
|
||||
barcode_ = None
|
||||
@ -249,28 +243,42 @@ def add():
|
||||
grocy.add_product_by_barcode(barcode, amount, 0.0)
|
||||
|
||||
response_data = {"message": "Item added successfully"}
|
||||
return jsonify(response_data), 200
|
||||
return response_data, 200
|
||||
except:
|
||||
spider = BarcodeSpider(x_rapidapi_key=X_RapidAPI_Key)
|
||||
|
||||
good = spider.get_good(barcode)
|
||||
if not good:
|
||||
response_data = {"message": "Item not found"}
|
||||
return jsonify(response_data), 400
|
||||
return response_data, 400
|
||||
try:
|
||||
if add_product(good, location):
|
||||
response_data = {"message": "New item added successfully"}
|
||||
return jsonify(response_data), 200
|
||||
return response_data, 200
|
||||
else:
|
||||
response_data = {"message": "Fail to add new item"}
|
||||
return jsonify(response_data), 400
|
||||
return response_data, 400
|
||||
except Exception as e:
|
||||
if hasattr(e, "message"):
|
||||
error_message = e.message
|
||||
else:
|
||||
error_message = str(e)
|
||||
response_data = {"message": error_message}
|
||||
return jsonify(response_data), 400
|
||||
return response_data, 400
|
||||
|
||||
@app.route("/")
|
||||
def index():
|
||||
return "Up and running!"
|
||||
|
||||
|
||||
@app.route("/add", methods=["POST"])
|
||||
def add():
|
||||
data = request.json
|
||||
location = data.get("location", "")
|
||||
barcode = data.get("barcode", "")
|
||||
|
||||
response_data, status_code = handle_add_product(barcode, location)
|
||||
return jsonify(response_data), status_code
|
||||
|
||||
|
||||
@app.route("/consume", methods=["POST"])
|
||||
@ -363,21 +371,33 @@ create_consumer_group()
|
||||
def consume_stream():
|
||||
try:
|
||||
# 读取 Stream 中的消息,设置阻塞时间为 0
|
||||
messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000)
|
||||
messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=5, block=1000)
|
||||
|
||||
if messages:
|
||||
for stream, msgs in messages:
|
||||
for msg_id, msg in msgs:
|
||||
# 打印或处理消息
|
||||
print(f"Received message ID: {msg_id}, data: {msg}")
|
||||
logger.info(f"Processing message: {msg['data']}")
|
||||
if not msg['data'].isnumeric():
|
||||
logger.info(f"Skip non-numeric barcode: {msg['data']}")
|
||||
continue
|
||||
|
||||
# 消费完成后确认消息
|
||||
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
|
||||
_, status_code = handle_add_product(msg['data'], '')
|
||||
if status_code == 200:
|
||||
# 消费完成后确认消息
|
||||
r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
|
||||
bark_push('商品添加成功', '条形码 %s' % msg['data'])
|
||||
else:
|
||||
logger.error(f"Failed to process message: {msg['data']}")
|
||||
bark_push('商品添加失败', '条形码 %s' % msg['data'])
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error consuming stream: {e}")
|
||||
|
||||
|
||||
# TODO: 定时任务函数:从 Redis Stream 读取消费者组的未确认消息
|
||||
# def check_unacked_messages()
|
||||
|
||||
@app.route('/barcode', methods=['POST'])
|
||||
def add_to_stream():
|
||||
try:
|
||||
@ -405,7 +425,7 @@ def add_to_stream():
|
||||
|
||||
# 初始化 APScheduler 并添加定时任务
|
||||
scheduler = BackgroundScheduler()
|
||||
scheduler.add_job(func=consume_stream, trigger="interval", seconds=5)
|
||||
scheduler.add_job(func=consume_stream, trigger="interval", seconds=300)
|
||||
scheduler.start()
|
||||
|
||||
# 确保程序退出时停止定时任务
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user