# coding = utf-8 import json import os import re from apscheduler.schedulers.background import BackgroundScheduler from flask import Flask, jsonify, request from pygrocy import EntityType, Grocy import atexit import base64 import loguru import redis import requests from barcode import BarcodeSpider from recipe import get_recipe_from_xiachufang logger = loguru.logger # config = configparser.ConfigParser() # config.read('config.ini') # GROCY_URL = config.get('Grocy', 'GROCY_URL') # GROCY_PORT = config.getint('Grocy', 'GROCY_PORT') # GROCY_API = config.get('Grocy', 'GROCY_API') # GROCY_DEFAULT_QUANTITY_UNIT_ID = config.getint('Grocy', 'GROCY_DEFAULT_QUANTITY_UNIT_ID') # GROCY_DEFAULT_BEST_BEFORE_DAYS = config.get('Grocy', 'GROCY_DEFAULT_BEST_BEFORE_DAYS') # GROCY_LOCATION = {} # for key in config['GrocyLocation']: # GROCY_LOCATION[key] = config.get('GrocyLocation', key) # X_RapidAPI_Key = config.get('RapidAPI', 'X_RapidAPI_Key') # get config from environment GROCY_URL = os.environ.get("GROCY_URL") GROCY_API_KEY = os.environ.get("GROCY_API_KEY") GROCY_PORT = os.environ.get("GROCY_PORT") GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE = os.environ.get("GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE") GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK = os.environ.get("GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK") GROCY_DEFAULT_BEST_BEFORE_DAYS = os.environ.get("GROCY_DEFAULT_BEST_BEFORE_DAYS") X_RapidAPI_Key = os.environ.get("X_RapidAPI_Key") REDIS_HOST = os.environ.get("REDIS_HOST") REDIS_PORT = os.environ.get("REDIS_PORT") BARK_DEVICE_KEY = os.environ.get("BARK_DEVICE_KEY") app = Flask(__name__) grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True) def get_locations(): locations = grocy.get_generic_objects_for_type(EntityType.LOCATIONS) return locations def bark_push(title, text): icon = 'https://styles.redditmedia.com/t5_10dd8f/styles/communityIcon_rwl05b14iveb1.png' data = { "title": title, "body": text, "device_key": BARK_DEVICE_KEY, "icon": icon, "group": "Grocy", "url": "" } requests.post('http://bark.tunpok.com/push/', json=data) def add_product(dict_good, location): good_name = "" if "description" in dict_good: good_name = dict_good["description"] elif "description_cn" in dict_good: good_name = dict_good["description_cn"] if not good_name: return False if 'specification' in dict_good: good_name = good_name + " - " + dict_good['specification'] locations = get_locations() if not location: location = locations[0]['name'] location_map = {item['name']: item['id'] for item in locations} data_grocy = { "name": good_name, "description": "", "location_id": location_map[location], "qu_id_purchase": GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE, "qu_id_stock": GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK, "default_best_before_days": GROCY_DEFAULT_BEST_BEFORE_DAYS, "default_consume_location_id": location_map[location], "move_on_open": "1", } if ("gpc" in dict_good) and dict_good["gpc"]: best_before_days, location_ = gpc_best_before_days_and_location( int(dict_good["gpc"]), locations) if best_before_days: data_grocy["default_best_before_days"] = best_before_days if location_: data_grocy["location_id"] = location_ data_grocy["default_consume_location_id"] = location_ # add product response_grocy = grocy.add_generic(EntityType.PRODUCTS, data_grocy) # # add gds info grocy.set_userfields( EntityType.PRODUCTS, int(response_grocy["created_object_id"]), "GDSInfo", json.dumps(dict_good, ensure_ascii=False), ) # add barcode, ex. 06921168593910 data_barcode = { "product_id": int(response_grocy["created_object_id"]), "barcode": dict_good["gtin"], } grocy.add_generic(EntityType.PRODUCT_BARCODES, data_barcode) # add barcode, EAN-13, ex. 6921168593910 if dict_good["gtin"].startswith("0"): data_barcode = { "product_id": int(response_grocy["created_object_id"]), "barcode": dict_good["gtin"].lstrip("0"), } grocy.add_generic(EntityType.PRODUCT_BARCODES, data_barcode) # add picture pic_url = "" if ("picfilename" in dict_good) and dict_good["picfilename"]: pic_url = dict_good["picfilename"] elif ("picture_filename" in dict_good) and dict_good["picture_filename"]: pic_url = dict_good["picture_filename"] if pic_url: try: response_img = requests.get( pic_url, { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36" }, ) if response_img.status_code == 200: image_data = response_img.content with open("img.png", "wb") as o: # output_data = remove(image_data) o.write(image_data) grocy.add_product_pic(int(response_grocy["created_object_id"]), "img.png") except requests.exceptions.RequestException as err: print("Request error:", err) grocy.add_product_by_barcode(dict_good["gtin"], 1.0, 0.0) return True def gpc_best_before_days_and_location(Code, locations): """ 保质期(天) 类别 7 50370000(中类,鲜切水果或蔬菜), 50380000(中类,鲜切水果或蔬菜), 50350000(中类,未处理或未加工的(新鲜)叶菜类蔬菜) 14 50250000(中类,未处理或未加工的(新鲜)水果), 10000025(细类,牛奶(易腐坏)), 10006970(细类,牛奶替代品(易腐坏)), 10000278(细类,酸奶(易腐坏)), 10006979(细类,酸奶替代品(易腐坏)) 152 50270000(中类,未处理或未加工的冷冻水果), 50310000(中类,未处理或未加工的耐储存水果) 305 94000000(中类,粮食作物), 50000000(大类,食品、饮料和烟草), 10120000(中类,宠物护理用品或食品组合装), 10110000(中类,宠物食品或饮品) 1005 53000000(大类,美容、个人护理和卫生用品), 47100000(中类,清洁产品), 47190000(中类,清洁和卫生产品组合装), 51000000(大类,医疗保健), 10100000(中类,宠物护理用品) """ with open("gpc_brick_code.json") as json_file: gpc_data = json.load(json_file) best_before_days = {} best_before_days["7"] = [ 50370000, 50380000, 50350000, ] best_before_days["14"] = [ 50250000, 10000025, 10006970, 10000278, 10006979, ] best_before_days["152"] = [ 50270000, 50310000, ] best_before_days["305"] = [ 94000000, 50000000, 10120000, 10110000, ] best_before_days["670"] = [] best_before_days["1005"] = [ 53000000, 47100000, 47190000, 51000000, 10100000, ] location_map = {item['name']: item['id'] for item in locations} for item in gpc_data["Schema"]: if item["Code"] == Code: title = item["Title-1"] location = locations[0]['name'] if title == 'Food/Beverage/Tobacco': if item['Title-2'] in [ 'Bread/Bakery Products', ]: location = '零食柜' else: location = '厨房' elif title in [ 'Beauty/Personal Care/Hygiene', 'Cleaning/Hygiene Products', ]: location = '浴室' elif title == 'Healthcare': location = '药柜' codes = [item["Code"], item["Code-1"], item["Code-2"], item["Code-3"]] for day, filter_codes in best_before_days.items(): if any(code in filter_codes for code in codes): return day, location_map[location] return None, location_map[location] def convert_image_to_base64(image_content): base64_image = base64.b64encode(image_content).decode("utf-8") return base64_image def handle_add_product(barcode, location): try: product = grocy.product_by_barcode(barcode) barcode_ = None amount = 1.0 if product: for product_barcode in product.product_barcodes: if product_barcode.barcode == barcode: barcode_ = product_barcode break if barcode_: amount = barcode_.amount grocy.add_product_by_barcode(barcode, amount, 0.0) response_data = {"message": "Item added successfully"} return response_data, 200 except: spider = BarcodeSpider(x_rapidapi_key=X_RapidAPI_Key) good = spider.get_good(barcode) if not good: response_data = {"message": "Item not found"} return response_data, 400 try: if add_product(good, location): response_data = {"message": "New item added successfully"} return response_data, 200 else: response_data = {"message": "Fail to add new item"} return response_data, 400 except Exception as e: if hasattr(e, "message"): error_message = e.message else: error_message = str(e) response_data = {"message": error_message} return response_data, 400 @app.route("/") def index(): return "Up and running!" @app.route("/add", methods=["POST"]) def add(): data = request.json location = data.get("location", "") barcode = data.get("barcode", "") response_data, status_code = handle_add_product(barcode, location) return jsonify(response_data), status_code @app.route("/consume", methods=["POST"]) def consume(): try: data = request.json barcode = data.get("barcode", "") grocy.consume_product_by_barcode(barcode) response_data = {"message": "Item removed successfully"} return jsonify(response_data), 200 except Exception as e: error_message = str(e) response_data = {"message": error_message} return jsonify(response_data), 400 @app.route("/add_recipe", methods=["POST"]) def add_recipe(): data = request.json source = data.get("source", "xiachufang") url = data.get("url", "") if source != "xiachufang": response_data = {"message": "Invalid source"} return jsonify(response_data), 400 if not url: response_data = {"message": "Invalid url"} return jsonify(response_data), 400 try: # https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6 recipe_number = re.search(r'(\d+)', url).group(1) resp = grocy.get_generic_objects_for_type(EntityType.RECIPES, ['name~%s' % recipe_number]) if resp: response_data = {"message": "Recipe already exists"} return jsonify(response_data), 400 xcf_recipe = get_recipe_from_xiachufang(url) description = "" + '来源' + "" description += "
" if xcf_recipe['ingredients']: description += "

用料

\n" description += "\n" if xcf_recipe['steps']: description += "

做法

\n" for idx, step in enumerate(xcf_recipe["steps"]): description += f"

步骤 {idx+1}

\n" description += f'

{step[0]}

\n' if len(step) == 1: description += "
" continue img = requests.get(step[1]) img_data = 'data:image/png;base64,' + convert_image_to_base64(img.content) description += f'\n\n\n' description += "
" data_grocy = { "name": xcf_recipe["name"] + " - " + recipe_number, "description": description } grocy.add_generic(EntityType.RECIPES, data_grocy) response_data = {"message": f"Recipe {data_grocy['name']} added successfully"} except Exception as e: error_message = str(e) response_data = {"message": error_message} return jsonify(response_data), 400 return jsonify(response_data), 200 # 连接到 Redis r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True) # 定义 Redis Stream 的键 STREAM_KEY = "message_stream" CONSUMER_GROUP = "consumer_group_1" CONSUMER_NAME = "consumer_1" # 创建 Redis 消费者组(如果不存在) def create_consumer_group(): try: r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True) except redis.exceptions.ResponseError as e: if "BUSYGROUP" in str(e): # 消费者组已存在,不报错 pass create_consumer_group() # 定时任务函数:从 Redis Stream 读取未消费的消息 def consume_stream(): try: # 读取 Stream 中的消息,设置阻塞时间为 0 messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000) if messages: for stream, msgs in messages: for msg_id, msg in msgs: # 打印或处理消息 logger.info(f"Processing message: {msg['data']}") if not msg['data'].isnumeric(): logger.info(f"Skip non-numeric barcode: {msg['data']}") continue resp, status_code = handle_add_product(msg['data'], '') if status_code == 200: # 消费完成后确认消息 r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) bark_push('商品添加成功', '条形码 %s' % msg['data']) else: logger.error(f"Failed to process message: {msg['data']} {resp}") bark_push('商品添加失败', '条形码 %s' % msg['data']) except Exception as e: logger.exception(f"Error processing messages: {str(e)}") # TODO: 定时任务函数:从 Redis Stream 读取消费者组的未确认消息 # def check_unacked_messages() @app.route('/barcode', methods=['POST']) def add_to_stream(): try: # 获取请求中的 JSON 数据 data = request.json if not data: return jsonify({"message": "No data provided"}), 400 if not data.get('data'): return jsonify({"message": "No barcode data provided"}), 400 # 将数据添加到 Redis Stream 中 # 自动生成 ID ('*' 表示让 Redis 生成唯一 ID) stream_id = r.xadd(STREAM_KEY, data) if data['data'].isnumeric(): bark_push('扫码成功', '条形码 %s' % data['data']) else: bark_push('扫码成功', '二维码 %s' % data['data']) return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200 except Exception as e: return jsonify({"message": str(e)}), 500 # 初始化 APScheduler 并添加定时任务 scheduler = BackgroundScheduler() scheduler.add_job(func=consume_stream, trigger="interval", seconds=300) scheduler.start() # 确保程序退出时停止定时任务 atexit.register(lambda: scheduler.shutdown()) if __name__ == "__main__": app.run(host="0.0.0.0", port=9288)