Ching L b358f25a6c
All checks were successful
continuous-integration/drone/push Build is passing
feat(api): 添加图片上传功能
为应用添加了图片上传功能,可以通过URL下载图片并进行压缩处理。
2024-09-29 18:20:29 +08:00

454 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding: utf-8
import json
import os
import re
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, jsonify, request
from pygrocy import EntityType, Grocy
import atexit
import base64
import loguru
import redis
import requests
from barcode import BarcodeSpider
from recipe import get_recipe_from_xiachufang
# Shared loguru logger used by the functions in this module.
logger = loguru.logger
# config = configparser.ConfigParser()
# config.read('config.ini')
# GROCY_URL = config.get('Grocy', 'GROCY_URL')
# GROCY_PORT = config.getint('Grocy', 'GROCY_PORT')
# GROCY_API = config.get('Grocy', 'GROCY_API')
# GROCY_DEFAULT_QUANTITY_UNIT_ID = config.getint('Grocy', 'GROCY_DEFAULT_QUANTITY_UNIT_ID')
# GROCY_DEFAULT_BEST_BEFORE_DAYS = config.get('Grocy', 'GROCY_DEFAULT_BEST_BEFORE_DAYS')
# GROCY_LOCATION = {}
# for key in config['GrocyLocation']:
# GROCY_LOCATION[key] = config.get('GrocyLocation', key)
# X_RapidAPI_Key = config.get('RapidAPI', 'X_RapidAPI_Key')
# Runtime configuration, read from environment variables.
# Each value is the raw string from the environment, or None when unset.
GROCY_URL = os.getenv("GROCY_URL")
GROCY_API_KEY = os.getenv("GROCY_API_KEY")
GROCY_PORT = os.getenv("GROCY_PORT")
GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE = os.getenv("GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE")
GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK = os.getenv("GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK")
GROCY_DEFAULT_BEST_BEFORE_DAYS = os.getenv("GROCY_DEFAULT_BEST_BEFORE_DAYS")
X_RapidAPI_Key = os.getenv("X_RapidAPI_Key")
REDIS_HOST = os.getenv("REDIS_HOST")
REDIS_PORT = os.getenv("REDIS_PORT")
BARK_DEVICE_KEY = os.getenv("BARK_DEVICE_KEY")
# Flask application object; the route decorators in this module register on it.
app = Flask(__name__)
# Grocy API client shared by every helper in this module (SSL verification enabled).
grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True)
def get_locations():
    """Return the location objects that Grocy reports for ``EntityType.LOCATIONS``."""
    return grocy.get_generic_objects_for_type(EntityType.LOCATIONS)
def bark_push(title, text):
    """Send a push notification through the Bark server.

    Delivery is best-effort: the request is bounded by a timeout and network
    failures are logged instead of raised, so a notification problem cannot
    abort the stock operation that triggered it.

    Args:
        title: Notification title.
        text: Notification body.
    """
    icon = 'https://styles.redditmedia.com/t5_10dd8f/styles/communityIcon_rwl05b14iveb1.png'
    data = {
        "title": title,
        "body": text,
        "device_key": BARK_DEVICE_KEY,
        "icon": icon,
        "group": "Grocy",
        "url": "",
    }
    try:
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        requests.post('http://bark.tunpok.com/push/', json=data, timeout=10)
    except requests.exceptions.RequestException as err:
        logger.warning(f"Bark push failed: {err}")
def add_product(dict_good, location):
    """Create a Grocy product from a barcode-lookup record and book one unit.

    Args:
        dict_good: Product record from the barcode lookup. Keys read here:
            ``description`` / ``description_cn``, ``specification``, ``gpc``,
            ``gtin`` and ``picfilename`` / ``picture_filename``.
        location: Name of the Grocy location for the product; a falsy value
            selects the first location Grocy returns.

    Returns:
        ``(True, name)`` after the product was created, or ``(False, "")``
        when the record carries no usable name.

    Raises:
        KeyError: If ``location`` is not a known Grocy location name or the
            record has no ``gtin``.
        Exception: Errors raised by the Grocy client are not caught here.
    """
    # Fall back to the Chinese description when the primary one is missing or empty.
    good_name = dict_good.get("description") or dict_good.get("description_cn") or ""
    if not good_name:
        return False, good_name
    if 'specification' in dict_good:
        good_name = good_name + " - " + dict_good['specification']

    locations = get_locations()
    if not location:
        location = locations[0]['name']
    location_map = {item['name']: item['id'] for item in locations}
    location_id = location_map[location]
    data_grocy = {
        "name": good_name,
        "description": "",
        "location_id": location_id,
        "qu_id_purchase": GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE,
        "qu_id_stock": GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK,
        "default_best_before_days": GROCY_DEFAULT_BEST_BEFORE_DAYS,
        "default_consume_location_id": location_id,
        "move_on_open": "1",
    }

    # A GPC category code, when present, may override shelf life and location.
    if dict_good.get("gpc"):
        best_before_days, location_ = gpc_best_before_days_and_location(
            int(dict_good["gpc"]),
            locations,
        )
        if best_before_days:
            data_grocy["default_best_before_days"] = best_before_days
        if location_:
            data_grocy["location_id"] = location_
            data_grocy["default_consume_location_id"] = location_

    # add product
    response_grocy = grocy.add_generic(EntityType.PRODUCTS, data_grocy)
    product_id = int(response_grocy["created_object_id"])

    # Keep the raw lookup record on the product as the "GDSInfo" userfield.
    grocy.set_userfields(
        EntityType.PRODUCTS,
        product_id,
        "GDSInfo",
        json.dumps(dict_good, ensure_ascii=False),
    )

    # add barcode, ex. 06921168593910
    gtin = dict_good["gtin"]
    grocy.add_generic(
        EntityType.PRODUCT_BARCODES,
        {"product_id": product_id, "barcode": gtin, "amount": 1.0},
    )
    # add barcode, EAN-13, ex. 6921168593910
    if gtin.startswith("0"):
        grocy.add_generic(
            EntityType.PRODUCT_BARCODES,
            {"product_id": product_id, "barcode": gtin.lstrip("0"), "amount": 1.0},
        )

    # add picture
    pic_url = dict_good.get("picfilename") or dict_good.get("picture_filename") or ""
    if pic_url:
        try:
            # The User-Agent must go in ``headers``; the second positional
            # argument of requests.get is ``params`` (the query string).
            response_img = requests.get(
                pic_url,
                headers={
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
                },
                timeout=30,
            )
            if response_img.status_code == 200:
                with open("img.png", "wb") as img_file:
                    img_file.write(response_img.content)
                # Upload only after the file has been closed and flushed.
                grocy.add_product_pic(product_id, "img.png")
        except requests.exceptions.RequestException as err:
            # A missing picture must not prevent the product from being stocked.
            logger.warning(f"Request error: {err}")

    # Book the first unit: amount 1.0, third argument 0.0.
    grocy.add_product_by_barcode(gtin, 1.0, 0.0)
    return True, good_name
def gpc_best_before_days_and_location(Code, locations):
    """Look up the default shelf life and storage location for a GPC code.

    Shelf life (days) by GPC category:
        7     50370000 / 50380000 family: fresh-cut fruits or vegetables,
              50350000 family: unprocessed fresh leaf vegetables
        14    50250000 family: unprocessed fresh fruit,
              10000025 brick: milk (perishable),
              10006970 brick: milk substitutes (perishable),
              10000278 brick: yogurt (perishable),
              10006979 brick: yogurt substitutes (perishable)
        152   50270000 family: unprocessed frozen fruit,
              50310000 family: unprocessed shelf-stable fruit
        305   94000000 family: grain crops,
              50000000 segment: food, beverage and tobacco,
              10120000 family: pet care/food variety packs,
              10110000 family: pet food or drinks
        1005  53000000 segment: beauty, personal care and hygiene,
              47100000 family: cleaning products,
              47190000 family: cleaning/hygiene variety packs,
              51000000 segment: healthcare,
              10100000 family: pet care

    Args:
        Code: GPC code (int) matched against the ``Code`` field of the entries
            in ``gpc_brick_code.json``.
        locations: Grocy location objects, each with ``name`` and ``id``.

    Returns:
        ``(days, location_id)``. ``days`` is the shelf life as a string, or
        ``None`` when no category matches. ``location_id`` is ``None`` when
        the chosen location name does not exist in ``locations``. An unknown
        ``Code`` yields ``(None, None)``.
    """
    with open("gpc_brick_code.json", encoding="utf-8") as json_file:
        gpc_data = json.load(json_file)

    # Insertion order matters: the first bucket containing any of the item's
    # codes wins, so shorter shelf lives take priority over broad segments.
    best_before_days = {
        "7": [50370000, 50380000, 50350000],
        "14": [50250000, 10000025, 10006970, 10000278, 10006979],
        "152": [50270000, 50310000],
        "305": [94000000, 50000000, 10120000, 10110000],
        "670": [],
        "1005": [53000000, 47100000, 47190000, 51000000, 10100000],
    }
    location_map = {item['name']: item['id'] for item in locations}
    default_location = locations[0]['name'] if locations else None

    for item in gpc_data["Schema"]:
        if item["Code"] != Code:
            continue

        title = item["Title-1"]
        location = default_location
        if title == 'Food/Beverage/Tobacco':
            if item.get('Title-2') in ['Bread/Bakery Products']:
                location = '零食柜'
            else:
                location = '厨房'
        elif title in ['Beauty/Personal Care/Hygiene', 'Cleaning/Hygiene Products']:
            location = '浴室'
        elif title == 'Healthcare':
            location = '药柜'

        # The hard-coded names may not exist in this Grocy instance; report
        # "no location" instead of raising KeyError.
        location_id = location_map.get(location)

        codes = [item.get(key) for key in ("Code", "Code-1", "Code-2", "Code-3")]
        for day, filter_codes in best_before_days.items():
            if any(code in filter_codes for code in codes):
                return day, location_id
        return None, location_id

    # Unknown code: still return a pair so callers can always unpack the result.
    return None, None
def convert_image_to_base64(image_content):
    """Encode raw image bytes as base64 and return the result as text."""
    encoded_bytes = base64.b64encode(image_content)
    return str(encoded_bytes, "utf-8")
def handle_add_product(barcode, location):
    """Book a purchase for ``barcode``, creating the product when that fails.

    The fast path asks Grocy for the product behind the barcode and books the
    amount configured on the matching barcode entry (default 1.0). If anything
    on that path raises, the barcode is looked up through ``BarcodeSpider``
    and a new product is created with ``add_product``.

    Args:
        barcode: Scanned barcode string.
        location: Grocy location name passed on to ``add_product``.

    Returns:
        ``(response_data, status_code, product_name)``. ``product_name`` is
        ``None`` whenever ``status_code`` is not 200.
    """
    try:
        product = grocy.product_by_barcode(barcode)
        barcode_ = None
        amount = 1.0
        if product:
            for product_barcode in product.product_barcodes:
                if product_barcode.barcode == barcode:
                    barcode_ = product_barcode
                    break
        if barcode_ and getattr(barcode_, "amount", None):
            amount = barcode_.amount
        grocy.add_product_by_barcode(barcode, amount, 0.0)
        response_data = {"message": "Item added successfully"}
        return response_data, 200, product.name
    except Exception as err:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt and
        # SystemExit are not swallowed; the cause is logged before falling
        # back to the online lookup.
        logger.info(f"Could not book barcode {barcode} directly ({err}); trying online lookup")

    spider = BarcodeSpider(x_rapidapi_key=X_RapidAPI_Key)
    good = spider.get_good(barcode)
    if not good:
        return {"message": "Item not found"}, 400, None

    try:
        added, good_name = add_product(good, location)
    except Exception as e:
        error_message = getattr(e, "message", str(e))
        return {"message": error_message}, 400, None

    if added:
        return {"message": "New item added successfully"}, 200, good_name
    return {"message": "Fail to add new item"}, 400, None
@app.route("/")
def index():
    """Liveness endpoint: answers with a fixed status text."""
    status_text = "Up and running!"
    return status_text
@app.route("/add", methods=["POST"])
def add():
    """HTTP endpoint: book a purchase for the barcode in the JSON body.

    Reads ``barcode`` and the optional ``location`` from the request JSON and
    returns the message and status code produced by ``handle_add_product``.
    """
    data = request.json
    location = data.get("location", "")
    barcode = data.get("barcode", "")
    # handle_add_product returns (body, status, product_name). Unpacking only
    # two values raised ValueError on every request; the name is not needed here.
    response_data, status_code, _ = handle_add_product(barcode, location)
    return jsonify(response_data), status_code
@app.route("/consume", methods=["POST"])
def consume():
    """HTTP endpoint: consume the product behind the barcode in the JSON body.

    Responds 200 on success; any exception is reported as a 400 whose message
    is the exception text.
    """
    try:
        payload = request.json
        grocy.consume_product_by_barcode(payload.get("barcode", ""))
    except Exception as exc:
        return jsonify({"message": str(exc)}), 400
    return jsonify({"message": "Item removed successfully"}), 200
def upload_image(url):
    """Download an image and re-host it through the compression service.

    Args:
        url: Address of the image to fetch.

    Returns:
        The ``cdn_url`` from the service's JSON response, or ``None`` when
        either the download or the upload does not answer with HTTP 200.

    Raises:
        requests.exceptions.RequestException: On network errors or timeouts.
    """
    # download image
    img = requests.get(url, timeout=30)
    if img.status_code != 200:
        # Do not forward an error page to the compression service as an image.
        return None
    # compress image
    compress_url = 'https://upload-tinypng.tunpok.com/upload/'
    resp = requests.post(compress_url, files={'file': img.content}, timeout=60)
    if resp.status_code == 200:
        return resp.json()['cdn_url']
    return None
@app.route("/add_recipe", methods=["POST"])
def add_recipe():
    """HTTP endpoint: import a xiachufang recipe into Grocy.

    Expects a JSON body with ``url`` and an optional ``source`` (only
    ``"xiachufang"`` is accepted). The first run of digits in the URL is
    appended to the recipe name and used to detect an earlier import.
    Ingredients and steps are rendered into an HTML description; step images
    are embedded as base64 data URIs.

    Returns:
        A JSON message with status 200 on success, 400 otherwise.
    """
    import html  # local import: only this endpoint builds HTML

    data = request.json
    source = data.get("source", "xiachufang")
    url = data.get("url", "")
    if source != "xiachufang":
        return jsonify({"message": "Invalid source"}), 400
    if not url:
        return jsonify({"message": "Invalid url"}), 400

    try:
        # https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6
        match = re.search(r'(\d+)', url)
        if match is None:
            # Report a clear message instead of an AttributeError on None.
            return jsonify({"message": "Invalid url"}), 400
        recipe_number = match.group(1)

        existing = grocy.get_generic_objects_for_type(
            EntityType.RECIPES, ['name~%s' % recipe_number])
        if existing:
            return jsonify({"message": "Recipe already exists"}), 400

        xcf_recipe = get_recipe_from_xiachufang(url)

        # The URL comes from the request and the text from a scraped page, so
        # both are escaped before being placed into HTML.
        # NOTE(review): assumes the scraper returns plain text, not markup - confirm.
        parts = [f'<a href="{html.escape(url, quote=True)}">来源</a>', "<br>"]
        if xcf_recipe['ingredients']:
            parts.append("<h2>用料</h2>\n")
            parts.append("<ul>\n")
            parts.extend(
                f'<li>{html.escape(str(ingredient))}</li>\n'
                for ingredient in xcf_recipe["ingredients"]
            )
            parts.append("</ul>\n")
        if xcf_recipe['steps']:
            parts.append("<h2>做法</h2>\n")
            for idx, step in enumerate(xcf_recipe["steps"], start=1):
                parts.append(f"<h3>步骤 {idx}</h3>\n")
                parts.append(f'<p>{html.escape(str(step[0]))}</p>\n')
                if len(step) > 1:
                    img = requests.get(step[1], timeout=30)
                    # Embed the picture only when the download succeeded.
                    if img.status_code == 200:
                        img_data = 'data:image/png;base64,' + convert_image_to_base64(img.content)
                        parts.append(f'\n<img src="{img_data}">\n\n')
                parts.append("<br>")

        data_grocy = {
            "name": xcf_recipe["name"] + " - " + recipe_number,
            "description": "".join(parts),
        }
        grocy.add_generic(EntityType.RECIPES, data_grocy)
        response_data = {"message": f"Recipe {data_grocy['name']} added successfully"}
    except Exception as e:
        return jsonify({"message": str(e)}), 400
    return jsonify(response_data), 200
# Redis client; decode_responses=True makes replies come back as str.
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
# Key of the Redis Stream, plus the consumer group and consumer name used to read it
STREAM_KEY = "message_stream"
CONSUMER_GROUP = "consumer_group_1"
CONSUMER_NAME = "consumer_1"
# Create the Redis consumer group (if it does not exist yet)
def create_consumer_group():
    """Create ``CONSUMER_GROUP`` on ``STREAM_KEY``, creating the stream if needed.

    A BUSYGROUP reply means the group already exists and is ignored. Any other
    Redis response error is logged rather than silently dropped.
    """
    try:
        r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
    except redis.exceptions.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            logger.error(f"Failed to create consumer group {CONSUMER_GROUP}: {e}")


create_consumer_group()
def _process_stream_message(msg_id, msg):
    """Handle one stream entry: add the product, notify, and ack on success."""
    barcode = msg.get('data', '')
    logger.info(f"Processing message: {barcode}")
    if not barcode.isnumeric():
        logger.info(f"Skip non-numeric barcode: {barcode}")
        # This entry can never be processed; acknowledge it so it does not
        # stay in the group's pending list forever.
        r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
        return
    resp, status_code, good_name = handle_add_product(barcode, '')
    if status_code == 200:
        # Acknowledge the message once it has been processed
        r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
        bark_push('%s 添加成功' % (good_name or ''), '条形码 %s' % barcode)
    else:
        # Left unacknowledged on purpose so the entry stays pending.
        logger.error(f"Failed to process message: {barcode} {resp}")
        bark_push('商品添加失败', '条形码 %s' % barcode)


# Scheduled job: read not-yet-delivered messages from the Redis Stream
def consume_stream():
    """Read up to 10 new stream entries and process each one independently.

    Entries are read with id ``'>'`` and are not delivered again by later
    reads, so a failure on one entry is logged and must not prevent the
    remaining entries of the same batch from being handled.
    """
    try:
        # Read new entries, waiting up to 1000 ms for them to arrive
        messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000)
    except Exception as e:
        logger.exception(f"Error processing messages: {str(e)}")
        return

    for _stream, msgs in messages or []:
        for msg_id, msg in msgs:
            try:
                _process_stream_message(msg_id, msg)
            except Exception as e:
                logger.exception(f"Error processing messages: {str(e)}")
# TODO: scheduled job that reads the consumer group's unacknowledged (pending) messages from the Redis Stream
# def check_unacked_messages()
@app.route('/barcode', methods=['POST'])
def add_to_stream():
    """HTTP endpoint: queue a scanned code on the Redis Stream.

    Expects a JSON object with a non-empty ``data`` field. The whole object is
    appended to ``STREAM_KEY`` and a push notification is sent.

    Returns:
        200 with the new stream id, 400 when the payload is missing, or 500
        when queuing fails.
    """
    try:
        # Get the JSON payload of the request
        data = request.json
        if not data:
            return jsonify({"message": "No data provided"}), 400
        if not data.get('data'):
            return jsonify({"message": "No barcode data provided"}), 400
        # A JSON number is a valid barcode too; normalise to text so
        # ``isnumeric`` below cannot fail after the entry has been queued.
        code = str(data['data'])
        # Append to the Redis Stream; Redis generates the entry id
        stream_id = r.xadd(STREAM_KEY, data)
    except Exception as e:
        return jsonify({"message": str(e)}), 500

    # The entry is queued at this point, so a notification failure is logged
    # and must not turn the response into an error.
    try:
        if code.isnumeric():
            bark_push('扫码成功', '条形码 %s' % code)
        else:
            bark_push('扫码成功', '二维码 %s' % code)
    except Exception as e:
        logger.warning(f"Scan notification failed: {e}")
    return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200
# Set up APScheduler and register the periodic stream consumer (every 10 seconds)
scheduler = BackgroundScheduler()
scheduler.add_job(consume_stream, "interval", seconds=10)
scheduler.start()
# Make sure the scheduler is stopped when the program exits
atexit.register(scheduler.shutdown)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=9288)