# coding = utf-8 import json import os import re from apscheduler.schedulers.background import BackgroundScheduler from flask import Flask, jsonify, request from pygrocy import EntityType, Grocy import atexit import base64 import mimetypes import loguru import redis import requests from barcode import BarcodeSpider from recipe import get_recipe_from_xiachufang logger = loguru.logger # config = configparser.ConfigParser() # config.read('config.ini') # GROCY_URL = config.get('Grocy', 'GROCY_URL') # GROCY_PORT = config.getint('Grocy', 'GROCY_PORT') # GROCY_API = config.get('Grocy', 'GROCY_API') # GROCY_DEFAULT_QUANTITY_UNIT_ID = config.getint('Grocy', 'GROCY_DEFAULT_QUANTITY_UNIT_ID') # GROCY_DEFAULT_BEST_BEFORE_DAYS = config.get('Grocy', 'GROCY_DEFAULT_BEST_BEFORE_DAYS') # GROCY_LOCATION = {} # for key in config['GrocyLocation']: # GROCY_LOCATION[key] = config.get('GrocyLocation', key) # X_RapidAPI_Key = config.get('RapidAPI', 'X_RapidAPI_Key') # get config from environment GROCY_URL = os.environ.get("GROCY_URL") GROCY_API_KEY = os.environ.get("GROCY_API_KEY") GROCY_PORT = os.environ.get("GROCY_PORT") GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE = os.environ.get("GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE") GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK = os.environ.get("GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK") GROCY_DEFAULT_BEST_BEFORE_DAYS = os.environ.get("GROCY_DEFAULT_BEST_BEFORE_DAYS") X_RapidAPI_Key = os.environ.get("X_RapidAPI_Key") REDIS_HOST = os.environ.get("REDIS_HOST") REDIS_PORT = os.environ.get("REDIS_PORT") BARK_DEVICE_KEY = os.environ.get("BARK_DEVICE_KEY") app = Flask(__name__) grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True) # 连接到 Redis r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True) # 定义 Redis Stream 的键 STREAM_KEY = "message_stream" CONSUMER_GROUP = "consumer_group_1" CONSUMER_NAME = "consumer_1" # 创建 Redis 消费者组(如果不存在) def create_consumer_group(): try: r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True) except redis.exceptions.ResponseError as e: if "BUSYGROUP" in str(e): # 消费者组已存在,不报错 pass create_consumer_group() def get_locations(): locations = grocy.get_generic_objects_for_type(EntityType.LOCATIONS) return locations def bark_push(title, text): icon = 'https://styles.redditmedia.com/t5_10dd8f/styles/communityIcon_rwl05b14iveb1.png' data = { "title": title, "body": text, "device_key": BARK_DEVICE_KEY, "icon": icon, "group": "Grocy", "url": "" } requests.post('http://bark.tunpok.com/push/', json=data) def add_product(dict_good, location): good_name = "" if "description" in dict_good: good_name = dict_good["description"] elif "description_cn" in dict_good: good_name = dict_good["description_cn"] if not good_name: return False, good_name if 'specification' in dict_good: good_name = good_name + " - " + dict_good['specification'] locations = get_locations() if not location: location = locations[0]['name'] location_map = {item['name']: item['id'] for item in locations} data_grocy = { "name": good_name, "description": "", "location_id": location_map[location], "qu_id_purchase": GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE, "qu_id_stock": GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK, "default_best_before_days": GROCY_DEFAULT_BEST_BEFORE_DAYS, "default_consume_location_id": location_map[location], "move_on_open": "1", } if ("gpc" in dict_good) and dict_good["gpc"]: best_before_days, location_ = gpc_best_before_days_and_location( int(dict_good["gpc"]), locations) if best_before_days: data_grocy["default_best_before_days"] = best_before_days if location_: data_grocy["location_id"] = location_ data_grocy["default_consume_location_id"] = location_ # add product response_grocy = grocy.add_generic(EntityType.PRODUCTS, data_grocy) # # add gds info grocy.set_userfields( EntityType.PRODUCTS, int(response_grocy["created_object_id"]), "GDSInfo", json.dumps(dict_good, ensure_ascii=False), ) # add barcode, ex. 06921168593910 data_barcode = { "product_id": int(response_grocy["created_object_id"]), "barcode": dict_good["gtin"], "amount": 1.0, } grocy.add_generic(EntityType.PRODUCT_BARCODES, data_barcode) # add barcode, EAN-13, ex. 6921168593910 if dict_good["gtin"].startswith("0"): data_barcode = { "product_id": int(response_grocy["created_object_id"]), "barcode": dict_good["gtin"].lstrip("0"), "amount": 1.0, } grocy.add_generic(EntityType.PRODUCT_BARCODES, data_barcode) # add picture pic_url = "" if ("picfilename" in dict_good) and dict_good["picfilename"]: pic_url = dict_good["picfilename"] elif ("picture_filename" in dict_good) and dict_good["picture_filename"]: pic_url = dict_good["picture_filename"] if pic_url: try: response_img = requests.get( pic_url, { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36" }, ) if response_img.status_code == 200: image_data = response_img.content with open("img.png", "wb") as o: # output_data = remove(image_data) o.write(image_data) grocy.add_product_pic(int(response_grocy["created_object_id"]), "img.png") except requests.exceptions.RequestException as err: print("Request error:", err) grocy.add_product_by_barcode(dict_good["gtin"], 1.0, 0.0) return True, good_name def gpc_best_before_days_and_location(Code, locations): """ 保质期(天) 类别 7 50370000(中类,鲜切水果或蔬菜), 50380000(中类,鲜切水果或蔬菜), 50350000(中类,未处理或未加工的(新鲜)叶菜类蔬菜) 14 50250000(中类,未处理或未加工的(新鲜)水果), 10000025(细类,牛奶(易腐坏)), 10006970(细类,牛奶替代品(易腐坏)), 10000278(细类,酸奶(易腐坏)), 10006979(细类,酸奶替代品(易腐坏)) 152 50270000(中类,未处理或未加工的冷冻水果), 50310000(中类,未处理或未加工的耐储存水果) 305 94000000(中类,粮食作物), 50000000(大类,食品、饮料和烟草), 10120000(中类,宠物护理用品或食品组合装), 10110000(中类,宠物食品或饮品) 1005 53000000(大类,美容、个人护理和卫生用品), 47100000(中类,清洁产品), 47190000(中类,清洁和卫生产品组合装), 51000000(大类,医疗保健), 10100000(中类,宠物护理用品) """ with open("gpc_brick_code.json") as json_file: gpc_data = json.load(json_file) best_before_days = {} best_before_days["7"] = [ 50370000, 50380000, 50350000, ] best_before_days["14"] = [ 50250000, 10000025, 10006970, 10000278, 10006979, ] best_before_days["152"] = [ 50270000, 50310000, ] best_before_days["305"] = [ 94000000, 50000000, 10120000, 10110000, ] best_before_days["670"] = [] best_before_days["1005"] = [ 53000000, 47100000, 47190000, 51000000, 10100000, ] location_map = {item['name']: item['id'] for item in locations} for item in gpc_data["Schema"]: if item["Code"] == Code: title = item["Title-1"] location = locations[0]['name'] if title == 'Food/Beverage/Tobacco': if item['Title-2'] in [ 'Bread/Bakery Products', ]: location = '零食柜' else: location = '厨房' elif title in [ 'Beauty/Personal Care/Hygiene', 'Cleaning/Hygiene Products', ]: location = '浴室' elif title == 'Healthcare': location = '药柜' codes = [item["Code"], item["Code-1"], item["Code-2"], item["Code-3"]] for day, filter_codes in best_before_days.items(): if any(code in filter_codes for code in codes): return day, location_map[location] return None, location_map[location] def convert_image_to_base64(image_content): base64_image = base64.b64encode(image_content).decode("utf-8") return base64_image def handle_add_product(barcode, location): try: product = grocy.product_by_barcode(barcode) barcode_ = None amount = 1.0 if product: for product_barcode in product.product_barcodes: if product_barcode.barcode == barcode: barcode_ = product_barcode break if barcode_: if hasattr(barcode_, "amount") and barcode_.amount: amount = barcode_.amount grocy.add_product_by_barcode(barcode, amount, 0.0) response_data = {"message": "Item added successfully"} return response_data, 200, product.name except: spider = BarcodeSpider(x_rapidapi_key=X_RapidAPI_Key) good = spider.get_good(barcode) if not good: response_data = {"message": "Item not found"} return response_data, 400, None try: added, good_name = add_product(good, location) if added: response_data = {"message": "New item added successfully"} return response_data, 200, good_name else: response_data = {"message": "Fail to add new item"} return response_data, 400, None except Exception as e: if hasattr(e, "message"): error_message = e.message else: error_message = str(e) response_data = {"message": error_message} return response_data, 400, None @app.route("/") def index(): return "Up and running!" @app.route("/add", methods=["POST"]) def add(): data = request.json location = data.get("location", "") barcode = data.get("barcode", "") response_data, status_code = handle_add_product(barcode, location) return jsonify(response_data), status_code @app.route("/consume", methods=["POST"]) def consume(): try: data = request.json barcode = data.get("barcode", "") grocy.consume_product_by_barcode(barcode) response_data = {"message": "Item removed successfully"} return jsonify(response_data), 200 except Exception as e: error_message = str(e) response_data = {"message": error_message} return jsonify(response_data), 400 def upload_image(url): # download image url = url.split('?')[0] image_name = url.split('/')[-1] img = requests.get(url) mime_type = mimetypes.guess_type(image_name)[0] if not mime_type: mime_type = 'image/jpeg' # compress image compress_url = 'https://upload-tinypng.tunpok.com/upload/' files = {'file': ('img.jpeg', img.content, mime_type,)} resp = requests.post(compress_url, files=files, ) if resp.status_code == 200: return resp.json()['cdn_url'] return None def add_recipe_from_url(url): try: # https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6 recipe_number = re.search(r'(\d+)', url).group(1) resp = grocy.get_generic_objects_for_type(EntityType.RECIPES, ['name~%s' % recipe_number]) if resp: response_data = {"message": "Recipe already exists"} return jsonify(response_data), 400, None xcf_recipe = get_recipe_from_xiachufang(url) description = "" + '来源' + "" description += "
" if xcf_recipe['ingredients']: description += "

用料

\n" description += "\n" if xcf_recipe['steps']: description += "

做法

\n" for idx, step in enumerate(xcf_recipe["steps"]): description += f"

步骤 {idx+1}

\n" description += f'

{step[0]}

\n' if len(step) == 1: description += "
" continue # img = requests.get(step[1]) # img_data = 'data:image/png;base64,' + convert_image_to_base64(img.content) img = upload_image(step[1]) if img: img_data = img else: img = requests.get(step[1]) img_data = 'data:image/png;base64,' + convert_image_to_base64(img.content) description += f'\n\n\n' description += "
" data_grocy = { "name": xcf_recipe["name"] + " - " + recipe_number, "description": description } grocy.add_generic(EntityType.RECIPES, data_grocy) response_data = {"message": f"{data_grocy['name']}"} return response_data, 200, data_grocy['name'] except Exception as e: error_message = str(e) response_data = {"message": error_message} return response_data, 400, None @app.route("/add_recipe", methods=["POST"]) def add_recipe(): data = request.json source = data.get("source", "xiachufang") url = data.get("url", "") if source != "xiachufang": response_data = {"message": "Invalid source"} return jsonify(response_data), 400 if not url: response_data = {"message": "Invalid url"} return jsonify(response_data), 400 stream_id = r.xadd(STREAM_KEY, data) return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200 def consume_product_by_barcode(barcode): try: product = grocy.product_by_barcode(barcode) if not product: response_data = {"message": "Item not found"} return response_data, 400, None grocy.consume_product_by_barcode(barcode) response_data = {"message": "Item removed successfully"} return response_data, 200, product.name except Exception as e: error_message = str(e) response_data = {"message": error_message} return response_data, 400, None # 定时任务函数:从 Redis Stream 读取未消费的消息 def consume_stream(): consume_product_flag_key = 'consume-product-flag' continuous_consume_flag_key = 'continuous-consume-flag' add_product_flag_key = 'add-product-flag' try: # 读取 Stream 中的消息,设置阻塞时间为 0 messages = r.xreadgroup(CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000) if messages: for stream, msgs in messages: for msg_id, msg in msgs: # 打印或处理消息 if msg.get('url'): logger.info(f"Processing recipe message: {msg.get('url')}") elif msg.get('data'): logger.info(f"Processing message: {msg.get('data')}") else: logger.info(f"Processing message: {msg}") if msg.get('data'): if not msg['data'].isnumeric(): if msg['data'] == consume_product_flag_key: # 如果消息内容为 'consume-product',则设置消耗商品标志位 logger.info(f"Setting consume-product flag") r.set(consume_product_flag_key, '1', ex=60 * 5) elif msg['data'] == continuous_consume_flag_key: # 如果消息内容为 'continuous-consume',则设置连续消耗标志位 logger.info(f"Setting continuous-consume flag") r.set(continuous_consume_flag_key, '1', ex=60 * 5) elif msg['data'] == add_product_flag_key: # 如果消息内容为 'add-product',则设置添加商品标志位 logger.info(f"Setting add-product flag") r.delete(consume_product_flag_key) r.delete(continuous_consume_flag_key) else: logger.info(f"Skip non-numeric barcode: {msg['data']}") continue consume_product_flag = r.get(consume_product_flag_key) continuous_consume_flag = r.get(continuous_consume_flag_key) if not (consume_product_flag or continuous_consume_flag): resp, status_code, good_name = handle_add_product(msg['data'], '') if not good_name: good_name = '' if status_code == 200: # 消费完成后确认消息 r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) bark_push('%s 添加成功' % good_name, '条形码 %s' % msg['data']) else: logger.error(f"Failed to process message: {msg['data']} {resp}") bark_push('商品添加失败', '条形码 %s' % msg['data']) else: resp, status_code, good_name = consume_product_by_barcode(msg['data']) if status_code == 200: r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) bark_push('%s 消耗成功' % good_name, '条形码 %s' % msg['data']) else: logger.error(f"Failed to process message: {msg['data']} {resp}") bark_push('商品消耗失败', '条形码 %s' % msg['data']) if consume_product_flag_key: r.delete(consume_product_flag_key) if msg.get('url'): resp, status_code, recipe_name = add_recipe_from_url(msg['url']) if status_code == 200: r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id) bark_push(f'{recipe_name} 添加成功', f'地址 {msg["url"]}') else: logger.error(f"Failed to process message: {msg['url']} {resp}") bark_push('食谱添加失败', '%s 来源 %s' % (resp['message'], msg['url'])) except Exception as e: logger.exception(f"Error processing messages: {str(e)}") # TODO: 定时任务函数:从 Redis Stream 读取消费者组的未确认消息 # def check_unacked_messages() @app.route('/barcode', methods=['POST']) def add_to_stream(): try: # 获取请求中的 JSON 数据 data = request.json if not data: return jsonify({"message": "No data provided"}), 400 if not data.get('data'): return jsonify({"message": "No barcode data provided"}), 400 # 将数据添加到 Redis Stream 中 # 自动生成 ID ('*' 表示让 Redis 生成唯一 ID) stream_id = r.xadd(STREAM_KEY, data) if data['data'].isnumeric(): bark_push('扫码成功', '条形码 %s' % data['data']) else: bark_push('扫码成功', '二维码 %s' % data['data']) return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200 except Exception as e: return jsonify({"message": str(e)}), 500 # 初始化 APScheduler 并添加定时任务 scheduler = BackgroundScheduler() scheduler.add_job(func=consume_stream, trigger="interval", seconds=10) scheduler.start() # 确保程序退出时停止定时任务 atexit.register(lambda: scheduler.shutdown()) if __name__ == "__main__": app.run(host="0.0.0.0", port=9288)