Ching 40129961a0
All checks were successful
continuous-integration/drone/push Build is passing
feat(models): 添加扫码日志记录功能
为了记录扫码操作,添加了一个新的方法add_log到ScanLog模型中。
2025-02-09 18:16:30 +08:00

577 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import json
import os
import re
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, jsonify, request, render_template
from pygrocy import EntityType, Grocy
import atexit
import base64
import mimetypes
import loguru
import redis
import requests
from barcode import BarcodeSpider, ShowApiSpider
from recipe import get_recipe_from_xiachufang
import models
logger = loguru.logger
# config = configparser.ConfigParser()
# config.read('config.ini')
# GROCY_URL = config.get('Grocy', 'GROCY_URL')
# GROCY_PORT = config.getint('Grocy', 'GROCY_PORT')
# GROCY_API = config.get('Grocy', 'GROCY_API')
# GROCY_DEFAULT_QUANTITY_UNIT_ID = config.getint('Grocy', 'GROCY_DEFAULT_QUANTITY_UNIT_ID')
# GROCY_DEFAULT_BEST_BEFORE_DAYS = config.get('Grocy', 'GROCY_DEFAULT_BEST_BEFORE_DAYS')
# GROCY_LOCATION = {}
# for key in config['GrocyLocation']:
# GROCY_LOCATION[key] = config.get('GrocyLocation', key)
# X_RapidAPI_Key = config.get('RapidAPI', 'X_RapidAPI_Key')
# Runtime configuration is read from environment variables; any variable that
# is not set yields None.
GROCY_URL = os.getenv("GROCY_URL")
GROCY_API_KEY = os.getenv("GROCY_API_KEY")
GROCY_PORT = os.getenv("GROCY_PORT")
GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE = os.getenv("GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE")
GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK = os.getenv("GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK")
GROCY_DEFAULT_BEST_BEFORE_DAYS = os.getenv("GROCY_DEFAULT_BEST_BEFORE_DAYS")
X_RapidAPI_Key = os.getenv("X_RapidAPI_Key")
ShowApi_AppKey = os.getenv("ShowApi_AppKey")
REDIS_HOST = os.getenv("REDIS_HOST")
REDIS_PORT = os.getenv("REDIS_PORT")
BARK_DEVICE_KEY = os.getenv("BARK_DEVICE_KEY")

# Flask application and Grocy API client shared by the whole module.
app = Flask(__name__)
grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True)

# Redis connection; decode_responses=True makes replies come back as str.
r = redis.StrictRedis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    db=0,
    decode_responses=True,
)

# Redis Stream key plus the consumer group / consumer name used to read it.
STREAM_KEY = "message_stream"
CONSUMER_GROUP = "consumer_group_1"
CONSUMER_NAME = "consumer_1"
# 创建 Redis 消费者组(如果不存在)
def create_consumer_group():
    """Create the Redis Stream consumer group if it does not exist yet.

    Issues XGROUP CREATE for ``STREAM_KEY`` / ``CONSUMER_GROUP`` starting at
    id ``0``; ``mkstream=True`` also creates the stream when it is missing.

    A BUSYGROUP reply (group already exists) is the expected steady state and
    is ignored. Any other ``ResponseError`` is logged instead of being dropped
    silently; it is not re-raised so that importing the module keeps working.
    """
    try:
        r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
    except redis.exceptions.ResponseError as e:
        if "BUSYGROUP" in str(e):
            # The consumer group already exists: nothing to do.
            return
        logger.error(f"Failed to create consumer group {CONSUMER_GROUP}: {e}")
create_consumer_group()
def get_locations():
    """Return the Grocy location objects fetched via the generic-objects API."""
    return grocy.get_generic_objects_for_type(EntityType.LOCATIONS)
def bark_push(title, text):
    """Send a push notification through the Bark server (best effort).

    Args:
        title: Notification title.
        text: Notification body.

    The request is bounded by a timeout, and delivery failures are logged and
    swallowed: a notification problem must not abort the scan or stream
    processing that triggered it.
    """
    icon = 'https://styles.redditmedia.com/t5_10dd8f/styles/communityIcon_rwl05b14iveb1.png'
    data = {
        "title": title,
        "body": text,
        "device_key": BARK_DEVICE_KEY,
        "icon": icon,
        "group": "Grocy",
        "url": "",
    }
    try:
        requests.post('http://bark.tunpok.com/push/', json=data, timeout=10)
    except requests.exceptions.RequestException as err:
        logger.warning(f"Bark push failed: {err}")
def add_product(dict_good, location):
    """Create a new Grocy product from scraped barcode data and stock one unit.

    Args:
        dict_good: Product record returned by a barcode spider. Keys read
            here: ``goodsName`` / ``description`` / ``description_cn`` (name),
            ``spec`` / ``specification`` (appended to the name), ``gpc``,
            ``gtin`` / ``code`` (barcode) and ``picfilename`` /
            ``picture_filename`` / ``img`` (picture URL).
        location: Name of the Grocy location for the product. When empty, the
            first location returned by Grocy is used.

    Returns:
        ``(added, good_name)``. ``added`` is False when the record has no
        usable name or no barcode; in that case nothing is written to Grocy.

    Raises:
        KeyError: if ``location`` is not a known Grocy location name.
        Errors raised by the Grocy client propagate to the caller.
    """
    good_name = ""
    if 'goodsName' in dict_good:
        good_name = dict_good['goodsName']
    elif "description" in dict_good:
        good_name = dict_good["description"]
    elif "description_cn" in dict_good:
        good_name = dict_good["description_cn"]
    if not good_name:
        return False, good_name

    if 'spec' in dict_good:
        good_name = good_name + " - " + dict_good['spec']
    elif 'specification' in dict_good:
        good_name = good_name + " - " + dict_good['specification']

    # Resolve the barcode before touching Grocy: without one the product could
    # be created but never linked to a barcode or stocked.
    barcode = None
    if 'gtin' in dict_good:
        barcode = dict_good['gtin']
    elif 'code' in dict_good:
        barcode = dict_good['code']
    if not barcode:
        return False, good_name

    locations = get_locations()
    if not location:
        location = locations[0]['name']
    location_map = {item['name']: item['id'] for item in locations}

    data_grocy = {
        "name": good_name,
        "description": "",
        "location_id": location_map[location],
        "qu_id_purchase": GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE,
        "qu_id_stock": GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK,
        "default_best_before_days": GROCY_DEFAULT_BEST_BEFORE_DAYS,
        "default_consume_location_id": location_map[location],
        "move_on_open": "1",
    }

    # A GPC classification, when present, overrides shelf life and location.
    if ("gpc" in dict_good) and dict_good["gpc"]:
        best_before_days, location_ = gpc_best_before_days_and_location(
            int(dict_good["gpc"]),
            locations)
        if best_before_days:
            data_grocy["default_best_before_days"] = best_before_days
        if location_:
            data_grocy["location_id"] = location_
            data_grocy["default_consume_location_id"] = location_

    # Create the product.
    response_grocy = grocy.add_generic(EntityType.PRODUCTS, data_grocy)
    product_id = int(response_grocy["created_object_id"])

    # Keep the raw scraped record in the "GDSInfo" userfield.
    grocy.set_userfields(
        EntityType.PRODUCTS,
        product_id,
        "GDSInfo",
        json.dumps(dict_good, ensure_ascii=False),
    )

    # Register the barcode as delivered, e.g. 06921168593910.
    grocy.add_generic(
        EntityType.PRODUCT_BARCODES,
        {"product_id": product_id, "barcode": barcode, "amount": 1.0},
    )
    # Also register the variant without leading zeros (EAN-13,
    # e.g. 6921168593910) so either scan form resolves to the product.
    if barcode.startswith("0"):
        stripped_barcode = barcode.lstrip("0")
        if stripped_barcode:
            grocy.add_generic(
                EntityType.PRODUCT_BARCODES,
                {"product_id": product_id, "barcode": stripped_barcode, "amount": 1.0},
            )

    # Attach a product picture when the record provides one.
    pic_url = ""
    if ("picfilename" in dict_good) and dict_good["picfilename"]:
        pic_url = dict_good["picfilename"]
    elif ("picture_filename" in dict_good) and dict_good["picture_filename"]:
        pic_url = dict_good["picture_filename"]
    elif 'img' in dict_good and dict_good['img']:
        pic_url = dict_good['img']
    if pic_url:
        try:
            # The User-Agent must be passed as ``headers``; the second
            # positional argument of requests.get is ``params``.
            response_img = requests.get(
                pic_url,
                headers={
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
                },
                timeout=30,
            )
            if response_img.status_code == 200:
                with open("img.png", "wb") as o:
                    o.write(response_img.content)
                # Upload only after the file has been closed and flushed.
                grocy.add_product_pic(product_id, "img.png")
        except requests.exceptions.RequestException as err:
            # The picture is optional: log and continue with stocking.
            logger.warning(f"Request error: {err}")

    # Finally book one unit into stock (amount 1.0, price 0.0).
    grocy.add_product_by_barcode(barcode, 1.0, 0.0)
    return True, good_name
def gpc_best_before_days_and_location(Code, locations):
    """Map a GPC brick code to a default shelf life and a Grocy location id.

    Shelf life (days) by GPC category, matched against the brick code and its
    three parent codes:

        7     50370000 / 50380000 fresh-cut fruits or vegetables,
              50350000 unprocessed fresh leafy vegetables
        14    50250000 unprocessed fresh fruit, 10000025 milk (perishable),
              10006970 milk substitutes (perishable),
              10000278 yogurt (perishable),
              10006979 yogurt substitutes (perishable)
        152   50270000 unprocessed frozen fruit,
              50310000 unprocessed shelf-stable fruit
        305   94000000 crops, 50000000 food/beverage/tobacco,
              10120000 pet care/food variety packs, 10110000 pet food/drinks
        1005  53000000 beauty/personal care/hygiene, 47100000 cleaning
              products, 47190000 cleaning/hygiene variety packs,
              51000000 healthcare, 10100000 pet care

    Args:
        Code: GPC brick code as an int.
        locations: Grocy location objects (dicts with ``name`` and ``id``).

    Returns:
        ``(best_before_days, location_id)``. ``best_before_days`` is the
        matching table key as a string, or None when no category matches.
        ``(None, None)`` is returned when ``Code`` is not in
        ``gpc_brick_code.json``, so callers can always unpack the result.

    Raises:
        KeyError: if the location name chosen for the category does not exist
            in ``locations``.
    """
    with open("gpc_brick_code.json", encoding="utf-8") as json_file:
        gpc_data = json.load(json_file)

    # Insertion order matters: the first table entry that matches wins.
    best_before_days = {
        "7": [
            50370000,
            50380000,
            50350000,
        ],
        "14": [
            50250000,
            10000025,
            10006970,
            10000278,
            10006979,
        ],
        "152": [
            50270000,
            50310000,
        ],
        "305": [
            94000000,
            50000000,
            10120000,
            10110000,
        ],
        "670": [],
        "1005": [
            53000000,
            47100000,
            47190000,
            51000000,
            10100000,
        ],
    }
    location_map = {item['name']: item['id'] for item in locations}

    for item in gpc_data["Schema"]:
        if item["Code"] != Code:
            continue

        # Pick a storage location from the top-level GPC titles; default to
        # the first Grocy location.
        title = item["Title-1"]
        location = locations[0]['name']
        if title == 'Food/Beverage/Tobacco':
            if item['Title-2'] in [
                'Bread/Bakery Products',
                'Confectionery/Sugar Sweetening Products',
            ]:
                location = '零食柜'
            else:
                location = '厨房'
        elif title in [
            'Beauty/Personal Care/Hygiene',
            'Cleaning/Hygiene Products',
        ]:
            location = '浴室'
        elif title == 'Healthcare':
            location = '药柜'
        location_id = location_map[location]

        codes = [item["Code"], item["Code-1"], item["Code-2"], item["Code-3"]]
        for day, filter_codes in best_before_days.items():
            if any(code in filter_codes for code in codes):
                return day, location_id
        return None, location_id

    # Unknown brick code: nothing to override.
    return None, None
def convert_image_to_base64(image_content):
    """Return the Base64 encoding of the given bytes as a text string."""
    return str(base64.b64encode(image_content), "utf-8")
def handle_add_product(barcode, location):
    """Stock one scan of ``barcode``, creating the product when it is unknown.

    The barcode is first looked up in Grocy and stocked there. If that fails
    for any reason, the barcode spiders are queried in order and the first hit
    is turned into a new product via ``add_product``.

    Args:
        barcode: Scanned barcode string.
        location: Grocy location name for a newly created product (may be
            empty).

    Returns:
        ``(response_data, status_code, product_name)`` where ``response_data``
        is a dict with a ``message`` key, ``status_code`` is 200 or 400, and
        ``product_name`` is None on failure.
    """
    try:
        product = grocy.product_by_barcode(barcode)
        amount = 1.0
        if product:
            for product_barcode in product.product_barcodes:
                if product_barcode.barcode == barcode:
                    # Honour the per-barcode amount when one is configured.
                    amount = getattr(product_barcode, "amount", None) or amount
                    break
        grocy.add_product_by_barcode(barcode, amount, 0.0)
        response_data = {"message": "Item added successfully"}
        return response_data, 200, product.name
    except Exception as e:
        # Not a bare except: KeyboardInterrupt / SystemExit must propagate.
        logger.info(f"Barcode {barcode} not stocked from Grocy ({e}); querying spiders")

    spiders = []
    if ShowApi_AppKey:
        # ShowApi_AppKey holds a comma-separated list; skip the spider when unset.
        spiders.append(ShowApiSpider(ShowApi_AppKey.split(",")))
    spiders.append(BarcodeSpider(x_rapidapi_key=X_RapidAPI_Key))

    good = None
    for spider in spiders:
        good = spider.get_good(barcode)
        if good:
            break
    if not good:
        response_data = {"message": "Item not found"}
        return response_data, 400, None

    try:
        added, good_name = add_product(good, location)
        if added:
            response_data = {"message": "New item added successfully"}
            return response_data, 200, good_name
        response_data = {"message": "Fail to add new item"}
        return response_data, 400, None
    except Exception as e:
        if hasattr(e, "message"):
            error_message = e.message
        else:
            error_message = str(e)
        response_data = {"message": error_message}
        return response_data, 400, None
@app.route("/")
def index():
    """Liveness endpoint: answers with a fixed status string."""
    status_text = "Up and running!"
    return status_text
@app.route("/add", methods=["POST"])
def add():
    """HTTP endpoint: add one unit of the posted barcode to stock.

    Expects a JSON object with ``barcode`` and optional ``location``.
    Responds with the ``message`` payload and status code produced by
    ``handle_add_product``; a body that is not a JSON object or has no
    barcode yields 400.
    """
    data = request.json
    if not isinstance(data, dict):
        return jsonify({"message": "Invalid JSON body"}), 400
    location = data.get("location", "")
    barcode = data.get("barcode", "")
    if not barcode:
        return jsonify({"message": "No barcode provided"}), 400
    # handle_add_product returns (payload, status, product_name); the name is
    # not part of the HTTP response.
    response_data, status_code, _product_name = handle_add_product(barcode, location)
    return jsonify(response_data), status_code
@app.route("/consume", methods=["POST"])
def consume():
    """HTTP endpoint: consume stock for the barcode given in the JSON body.

    Responds 200 on success; any exception is reported as a 400 whose
    ``message`` is the exception text.
    """
    try:
        scanned = request.json.get("barcode", "")
        grocy.consume_product_by_barcode(scanned)
        return jsonify({"message": "Item removed successfully"}), 200
    except Exception as exc:
        return jsonify({"message": str(exc)}), 400
def upload_image(url):
    """Download an image and re-host it through the compression service.

    Args:
        url: Image URL; any query string is dropped before downloading.

    Returns:
        The ``cdn_url`` reported by the compression service, or None when the
        download or upload fails. Network errors are logged and reported as
        None so that callers can fall back to another embedding strategy.
    """
    # Strip the query string; the last path segment is used to guess the MIME type.
    url = url.split('?')[0]
    image_name = url.split('/')[-1]
    mime_type = mimetypes.guess_type(image_name)[0] or 'image/jpeg'
    compress_url = 'https://upload-tinypng.tunpok.com/upload/'
    try:
        img = requests.get(url, timeout=30)
        if img.status_code != 200:
            # Do not push an error page through the compressor.
            return None
        files = {'file': ('img.jpeg', img.content, mime_type,)}
        resp = requests.post(compress_url, files=files, timeout=60)
        if resp.status_code == 200:
            return resp.json().get('cdn_url')
    except (requests.exceptions.RequestException, ValueError) as err:
        # ValueError covers a response body that is not valid JSON.
        logger.warning(f"Image upload failed for {url}: {err}")
    return None
def add_recipe_from_url(url):
    """Import a xiachufang recipe into Grocy.

    The first run of digits in ``url`` is used as the recipe number, e.g.
    https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6.
    A recipe whose Grocy name already contains that number is not imported
    again. Step pictures are re-hosted through ``upload_image``; when that
    yields nothing, the picture is embedded as a Base64 data URI.

    Args:
        url: Recipe page URL.

    Returns:
        ``(response_data, status_code, recipe_name)``. ``response_data`` is
        always a plain dict with a ``message`` key: this function runs in the
        scheduler thread, outside a Flask application context, so it must not
        build Flask responses. ``recipe_name`` is None on failure.
    """
    try:
        match = re.search(r'(\d+)', url)
        if not match:
            return {"message": "Invalid recipe url"}, 400, None
        recipe_number = match.group(1)

        resp = grocy.get_generic_objects_for_type(EntityType.RECIPES, ['name~%s' % recipe_number])
        if resp:
            return {"message": "Recipe already exists"}, 400, None

        xcf_recipe = get_recipe_from_xiachufang(url)

        # Collect the HTML fragments and join once at the end.
        parts = ["<a href=\"" + url + "\">" + '来源' + "</a>", "<br>"]
        if xcf_recipe['ingredients']:
            parts.append("<h2>用料</h2>\n")
            parts.append("<ul>\n")
            for ingredient in xcf_recipe["ingredients"]:
                parts.append(f'<li>{ingredient}</li>\n')
            parts.append("</ul>\n")
        if xcf_recipe['steps']:
            parts.append("<h2>做法</h2>\n")
            for number, step in enumerate(xcf_recipe["steps"], start=1):
                parts.append(f"<h3>步骤 {number}</h3>\n")
                parts.append(f'<p>{step[0]}</p>\n')
                if len(step) == 1:
                    # Text-only step: no picture to embed.
                    parts.append("<br>")
                    continue
                img_data = upload_image(step[1])
                if not img_data:
                    # Fallback: embed the picture inline as a data URI.
                    img = requests.get(step[1], timeout=30)
                    img_data = 'data:image/png;base64,' + convert_image_to_base64(img.content)
                parts.append(f'\n<img src=\"{img_data}\">\n\n')
                parts.append("<br>")

        data_grocy = {
            "name": xcf_recipe["name"] + " - " + recipe_number,
            "description": "".join(parts),
        }
        grocy.add_generic(EntityType.RECIPES, data_grocy)
        response_data = {"message": f"{data_grocy['name']}"}
        return response_data, 200, data_grocy['name']
    except Exception as e:
        error_message = str(e)
        response_data = {"message": error_message}
        return response_data, 400, None
@app.route("/add_recipe", methods=["POST"])
def add_recipe():
    """HTTP endpoint: queue a recipe import on the Redis Stream.

    Expects a JSON object with ``url`` and optional ``source`` (only
    ``xiachufang`` is accepted, which is also the default). The posted object
    is appended to the stream as-is; the actual import happens later in
    ``consume_stream``.
    """
    data = request.json
    if not isinstance(data, dict):
        # A JSON null / list / scalar body has no fields to read.
        return jsonify({"message": "Invalid JSON body"}), 400
    source = data.get("source", "xiachufang")
    url = data.get("url", "")
    if source != "xiachufang":
        response_data = {"message": "Invalid source"}
        return jsonify(response_data), 400
    if not url:
        response_data = {"message": "Invalid url"}
        return jsonify(response_data), 400
    stream_id = r.xadd(STREAM_KEY, data)
    return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200
def consume_product_by_barcode(barcode):
    """Consume stock for ``barcode`` and report the outcome.

    Returns:
        ``(response_data, status_code, product_name)``: 200 with the product
        name on success; 400 with None when the product is not found or when
        any exception occurs (its text becomes the message).
    """
    try:
        found = grocy.product_by_barcode(barcode)
        if found:
            grocy.consume_product_by_barcode(barcode)
            outcome = ({"message": "Item removed successfully"}, 200, found.name)
        else:
            outcome = ({"message": "Item not found"}, 400, None)
    except Exception as exc:
        outcome = ({"message": str(exc)}, 400, None)
    return outcome
# 定时任务函数:从 Redis Stream 读取未消费的消息
# Scan-mode commands. Each string is both the scanned payload that triggers
# the command and the Redis key under which the mode flag is stored.
_CONSUME_PRODUCT_FLAG_KEY = 'consume-product-flag'
_CONTINUOUS_CONSUME_FLAG_KEY = 'continuous-consume-flag'
_ADD_PRODUCT_FLAG_KEY = 'add-product-flag'
_MODE_FLAG_TTL_SECONDS = 60 * 5


def _handle_command_message(command):
    """Apply a non-numeric scan payload: switch the scan mode or skip it."""
    if command == _CONSUME_PRODUCT_FLAG_KEY:
        logger.info("Setting consume-product flag")
        r.set(_CONSUME_PRODUCT_FLAG_KEY, '1', ex=_MODE_FLAG_TTL_SECONDS)
    elif command == _CONTINUOUS_CONSUME_FLAG_KEY:
        logger.info("Setting continuous-consume flag")
        r.set(_CONTINUOUS_CONSUME_FLAG_KEY, '1', ex=_MODE_FLAG_TTL_SECONDS)
    elif command == _ADD_PRODUCT_FLAG_KEY:
        # Back to add mode: clear both consume flags.
        logger.info("Setting add-product flag")
        r.delete(_CONSUME_PRODUCT_FLAG_KEY)
        r.delete(_CONTINUOUS_CONSUME_FLAG_KEY)
    else:
        logger.info(f"Skip non-numeric barcode: {command}")


def _handle_barcode_message(msg_id, barcode):
    """Add or consume stock for one numeric barcode, depending on the mode flags."""
    consume_product_flag = r.get(_CONSUME_PRODUCT_FLAG_KEY)
    continuous_consume_flag = r.get(_CONTINUOUS_CONSUME_FLAG_KEY)

    if not (consume_product_flag or continuous_consume_flag):
        resp, status_code, good_name = handle_add_product(barcode, '')
        if status_code == 200:
            # Acknowledge only after the message was handled successfully.
            r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
            bark_push('%s 添加成功' % (good_name or ''), '条形码 %s' % barcode)
        else:
            logger.error(f"Failed to process message: {barcode} {resp}")
            bark_push('商品添加失败', '条形码 %s \n %s' % (barcode, resp.get('message')))
        return

    resp, status_code, good_name = consume_product_by_barcode(barcode)
    if status_code == 200:
        r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
        bark_push('%s 消耗成功' % good_name, '条形码 %s' % barcode)
    else:
        logger.error(f"Failed to process message: {barcode} {resp}")
        bark_push('商品消耗失败', '条形码 %s \n %s' % (barcode, resp.get('message')))
    if consume_product_flag:
        # The consume-product flag is cleared after one consume attempt; the
        # continuous flag is left in place until it expires or is reset.
        r.delete(_CONSUME_PRODUCT_FLAG_KEY)


def _handle_recipe_message(msg_id, url):
    """Import the recipe behind ``url`` and acknowledge the message on success."""
    resp, status_code, recipe_name = add_recipe_from_url(url)
    if status_code == 200:
        r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
        bark_push(f'{recipe_name} 添加成功', f'地址 {url}')
    else:
        logger.error(f"Failed to process message: {url} {resp}")
        bark_push('食谱添加失败', '%s 来源 %s' % (resp['message'], url))


def _process_stream_message(msg_id, msg):
    """Dispatch one stream entry by its ``data`` (scan) and ``url`` (recipe) fields."""
    if msg.get('url'):
        logger.info(f"Processing recipe message: {msg.get('url')}")
    elif msg.get('data'):
        logger.info(f"Processing message: {msg.get('data')}")
    else:
        logger.info(f"Processing message: {msg}")

    if msg.get('data'):
        if not msg['data'].isnumeric():
            _handle_command_message(msg['data'])
            # Commands and skipped payloads are fully handled at this point;
            # acknowledge them so they do not stay in the pending list.
            r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
            return
        _handle_barcode_message(msg_id, msg['data'])

    if msg.get('url'):
        _handle_recipe_message(msg_id, msg['url'])


def consume_stream():
    """Scheduled job: read and process new messages from the Redis Stream.

    Reads up to 10 not-yet-delivered entries for this consumer, blocking for
    at most 1000 ms. Messages are acknowledged after successful handling;
    failed barcode / recipe messages are left unacknowledged. An exception
    while handling one message is logged and does not stop the remaining
    messages of the batch from being processed.
    """
    try:
        messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000)
    except Exception as e:
        logger.exception(f"Error processing messages: {str(e)}")
        return

    for _stream, msgs in messages or []:
        for msg_id, msg in msgs:
            try:
                _process_stream_message(msg_id, msg)
            except Exception as e:
                logger.exception(f"Error processing messages: {str(e)}")
# TODO: scheduled job that re-reads the consumer group's unacknowledged
# (pending) messages from the Redis Stream
# def check_unacked_messages()
@app.route('/barcode', methods=['POST'])
def add_to_stream():
    """HTTP endpoint: queue a scanned payload on the Redis Stream.

    Expects a JSON object with a non-empty ``data`` field (a barcode or QR
    text). The object is appended to the stream with a Redis-generated id.
    Scan logging and the push notification run afterwards on a best-effort
    basis: once the message is queued the request is reported as successful,
    so a client does not retry and enqueue the same scan twice.

    Returns 400 for a missing body or ``data`` field and 500 when queuing
    fails.
    """
    try:
        data = request.json
        if not data:
            return jsonify({"message": "No data provided"}), 400
        if not isinstance(data, dict) or not data.get('data'):
            return jsonify({"message": "No barcode data provided"}), 400
        # Append to the stream; Redis generates the entry id.
        stream_id = r.xadd(STREAM_KEY, data)
    except Exception as e:
        return jsonify({"message": str(e)}), 500

    # JSON numbers are accepted too, hence the str() before isnumeric().
    scanned = str(data['data'])
    try:
        if scanned.isnumeric():
            models.ScanLog.add_log(scanned)
            bark_push('扫码成功', '条形码 %s' % scanned)
        else:
            bark_push('扫码成功', '二维码 %s' % scanned)
    except Exception as e:
        logger.warning(f"Scan {scanned} queued but logging/notification failed: {e}")
    return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200
@app.route('/scanlogs/today', methods=['GET'])
def get_today_scanlogs():
    """HTTP endpoint: return today's scan logs, serialized, with their count.

    Any exception is reported as a 500 with the exception text under ``error``.
    """
    try:
        entries = []
        for record in models.ScanLog.get_today_logs():
            entries.append(record.serialize())
        body = {'logs': entries, 'count': len(entries)}
        return jsonify(body), 200
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@app.route('/logs')
def show_logs():
    """Render the ``logs.html`` template."""
    page = render_template('logs.html')
    return page
# Start the APScheduler background scheduler; it runs consume_stream every
# 10 seconds.
scheduler = BackgroundScheduler()
scheduler.add_job(consume_stream, "interval", seconds=10)
scheduler.start()


def _stop_scheduler():
    """Shut the background scheduler down; registered to run at interpreter exit."""
    scheduler.shutdown()


# Make sure the scheduled job is stopped when the program exits.
atexit.register(_stop_scheduler)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=9288)