# coding = utf-8
import atexit
import base64
import json
import mimetypes
import os
import re

import loguru
import redis
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, jsonify, request, render_template
from pygrocy import EntityType, Grocy

from barcode import BarcodeSpider, ShowApiSpider
from recipe import get_recipe_from_xiachufang
import models

logger = loguru.logger

# config = configparser.ConfigParser()
# config.read('config.ini')
# GROCY_URL = config.get('Grocy', 'GROCY_URL')
# GROCY_PORT = config.getint('Grocy', 'GROCY_PORT')
# GROCY_API = config.get('Grocy', 'GROCY_API')
# GROCY_DEFAULT_QUANTITY_UNIT_ID = config.getint('Grocy', 'GROCY_DEFAULT_QUANTITY_UNIT_ID')
# GROCY_DEFAULT_BEST_BEFORE_DAYS = config.get('Grocy', 'GROCY_DEFAULT_BEST_BEFORE_DAYS')
# GROCY_LOCATION = {}
# for key in config['GrocyLocation']:
#     GROCY_LOCATION[key] = config.get('GrocyLocation', key)
# X_RapidAPI_Key = config.get('RapidAPI', 'X_RapidAPI_Key')

# Read the configuration from the environment.
GROCY_URL = os.environ.get("GROCY_URL")
GROCY_API_KEY = os.environ.get("GROCY_API_KEY")
GROCY_PORT = os.environ.get("GROCY_PORT")
GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE = os.environ.get("GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE")
GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK = os.environ.get("GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK")
GROCY_DEFAULT_BEST_BEFORE_DAYS = os.environ.get("GROCY_DEFAULT_BEST_BEFORE_DAYS")
X_RapidAPI_Key = os.environ.get("X_RapidAPI_Key")
ShowApi_AppKey = os.environ.get("ShowApi_AppKey")
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = os.environ.get("REDIS_PORT")
BARK_DEVICE_KEY = os.environ.get("BARK_DEVICE_KEY")

# Upper bound (seconds) for every outgoing HTTP call made with `requests`,
# so a stalled remote host cannot block the scheduler thread forever.
HTTP_TIMEOUT = 15

# Redis keys used as mode flags by the stream consumer.
CONSUME_PRODUCT_FLAG_KEY = 'consume-product-flag'
CONTINUOUS_CONSUME_FLAG_KEY = 'continuous-consume-flag'
ADD_PRODUCT_FLAG_KEY = 'add-product-flag'

app = Flask(__name__)
grocy = Grocy(GROCY_URL, GROCY_API_KEY, GROCY_PORT, verify_ssl=True)

# Connect to Redis.
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)

# Redis Stream key and consumer identity.
STREAM_KEY = "message_stream"
CONSUMER_GROUP = "consumer_group_1"
CONSUMER_NAME = "consumer_1"


# Create the Redis consumer group (if it does not exist yet).
def create_consumer_group():
    """Create CONSUMER_GROUP on STREAM_KEY, creating the stream if needed.

    A BUSYGROUP reply (group already exists) is ignored; any other Redis
    response error is re-raised instead of being silently swallowed.
    """
    try:
        r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id='0', mkstream=True)
    except redis.exceptions.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise


create_consumer_group()


def get_locations():
    """Return all Grocy location objects as returned by the generic API."""
    return grocy.get_generic_objects_for_type(EntityType.LOCATIONS)


def bark_push(title, text):
    """Send a Bark push notification (best effort).

    Network failures are logged and swallowed so that a failed notification
    never aborts the work that triggered it.
    """
    icon = 'https://styles.redditmedia.com/t5_10dd8f/styles/communityIcon_rwl05b14iveb1.png'
    data = {
        "title": title,
        "body": text,
        "device_key": BARK_DEVICE_KEY,
        "icon": icon,
        "group": "Grocy",
        "url": "",
    }
    try:
        requests.post('http://bark.tunpok.com/push/', json=data, timeout=HTTP_TIMEOUT)
    except requests.exceptions.RequestException as err:
        logger.warning(f"Bark push failed: {err}")


def _first_present(mapping, keys, default=None):
    """Return the value of the first key of *keys* that exists in *mapping*."""
    for key in keys:
        if key in mapping:
            return mapping[key]
    return default


def _first_truthy(mapping, keys, default=None):
    """Return the first truthy value found in *mapping* under *keys*."""
    for key in keys:
        value = mapping.get(key)
        if value:
            return value
    return default


def add_product(dict_good, location):
    """Create a Grocy product from a barcode-lookup result and stock one unit.

    Args:
        dict_good: product info dict returned by one of the barcode spiders.
        location: Grocy location name; falsy means "use the first location".

    Returns:
        ``(True, name)`` when the product was created, ``(False, name)`` when
        the lookup result carries no usable name or no barcode (nothing is
        created in Grocy in that case).
    """
    good_name = _first_present(dict_good, ('goodsName', 'description', 'description_cn'), "")
    if not good_name:
        return False, good_name
    spec = _first_present(dict_good, ('spec', 'specification'))
    if spec:
        good_name = good_name + " - " + spec

    # Validate the barcode before touching Grocy so that a lookup result
    # without one does not leave a half-created product behind.
    barcode = _first_present(dict_good, ('gtin', 'code'))
    if not barcode:
        return False, good_name
    barcode = str(barcode)

    locations = get_locations()
    if not location:
        location = locations[0]['name']
    location_map = {item['name']: item['id'] for item in locations}

    data_grocy = {
        "name": good_name,
        "description": "",
        "location_id": location_map[location],
        "qu_id_purchase": GROCY_DEFAULT_QUANTITY_UNIT_ID_PURCHASE,
        "qu_id_stock": GROCY_DEFAULT_QUANTITY_UNIT_ID_STOCK,
        "default_best_before_days": GROCY_DEFAULT_BEST_BEFORE_DAYS,
        "default_consume_location_id": location_map[location],
        "move_on_open": "1",
    }

    if dict_good.get("gpc"):
        best_before_days, location_ = gpc_best_before_days_and_location(
            int(dict_good["gpc"]), locations)
        if best_before_days:
            data_grocy["default_best_before_days"] = best_before_days
        if location_:
            data_grocy["location_id"] = location_
            data_grocy["default_consume_location_id"] = location_

    # add product
    response_grocy = grocy.add_generic(EntityType.PRODUCTS, data_grocy)
    product_id = int(response_grocy["created_object_id"])

    # add gds info
    grocy.set_userfields(
        EntityType.PRODUCTS,
        product_id,
        "GDSInfo",
        json.dumps(dict_good, ensure_ascii=False),
    )

    # add barcode, ex. 06921168593910
    grocy.add_generic(
        EntityType.PRODUCT_BARCODES,
        {"product_id": product_id, "barcode": barcode, "amount": 1.0},
    )
    # add barcode, EAN-13, ex. 6921168593910
    stripped = barcode.lstrip("0")
    if stripped and stripped != barcode:
        grocy.add_generic(
            EntityType.PRODUCT_BARCODES,
            {"product_id": product_id, "barcode": stripped, "amount": 1.0},
        )

    # add picture
    pic_url = _first_truthy(dict_good, ('picfilename', 'picture_filename', 'img'), "")
    if pic_url:
        try:
            # The User-Agent must go in `headers`; the second positional
            # argument of requests.get is `params` (query string).
            response_img = requests.get(
                pic_url,
                headers={
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
                },
                timeout=HTTP_TIMEOUT,
            )
            if response_img.status_code == 200:
                with open("img.png", "wb") as o:
                    o.write(response_img.content)
                grocy.add_product_pic(product_id, "img.png")
        except requests.exceptions.RequestException as err:
            logger.warning(f"Request error: {err}")

    grocy.add_product_by_barcode(barcode, 1.0, 0.0)
    return True, good_name


def gpc_best_before_days_and_location(Code, locations):
    """Map a GPC code to a default best-before period and a location id.

    Best-before days -> categories (labels translated from the original note):
      7     50370000 (family, fresh-cut fruit or vegetables),
            50380000 (family, fresh-cut fruit or vegetables),
            50350000 (family, unprocessed (fresh) leaf vegetables)
      14    50250000 (family, unprocessed (fresh) fruit),
            10000025 (brick, milk (perishable)),
            10006970 (brick, milk substitutes (perishable)),
            10000278 (brick, yogurt (perishable)),
            10006979 (brick, yogurt substitutes (perishable))
      152   50270000 (family, unprocessed frozen fruit),
            50310000 (family, unprocessed shelf-stable fruit)
      305   94000000 (family, crops), 50000000 (segment, food/beverage/tobacco),
            10120000 (family, pet care/food variety packs),
            10110000 (family, pet food or drinks)
      1005  53000000 (segment, beauty/personal care/hygiene),
            47100000 (family, cleaning products),
            47190000 (family, cleaning/hygiene variety packs),
            51000000 (segment, healthcare), 10100000 (family, pet care)

    Args:
        Code: integer GPC code to look up in ``gpc_brick_code.json``.
        locations: Grocy location objects (dicts with ``name`` and ``id``).

    Returns:
        ``(days, location_id)``. ``days`` is the matching period as a string,
        or None when no period matches. Both values are None when *Code* is
        not in the schema. A hard-coded location name missing from
        *locations* falls back to the first location.
    """
    with open("gpc_brick_code.json", encoding="utf-8") as json_file:
        gpc_data = json.load(json_file)

    # Insertion order matters: the first period whose codes match wins,
    # so the shorter (more specific) periods are listed first.
    best_before_days = {
        "7": [50370000, 50380000, 50350000],
        "14": [50250000, 10000025, 10006970, 10000278, 10006979],
        "152": [50270000, 50310000],
        "305": [94000000, 50000000, 10120000, 10110000],
        "670": [],
        "1005": [53000000, 47100000, 47190000, 51000000, 10100000],
    }
    location_map = {item['name']: item['id'] for item in locations}

    for item in gpc_data["Schema"]:
        if item["Code"] != Code:
            continue

        title = item["Title-1"]
        location = locations[0]['name']
        if title == 'Food/Beverage/Tobacco':
            if item['Title-2'] in [
                'Bread/Bakery Products',
                'Confectionery/Sugar Sweetening Products',
            ]:
                location = '零食柜'
            else:
                location = '厨房'
        elif title in [
            'Beauty/Personal Care/Hygiene',
            'Cleaning/Hygiene Products',
        ]:
            location = '浴室'
        elif title == 'Healthcare':
            location = '药柜'
        location_id = location_map.get(location, locations[0]['id'])

        # The entry's own code plus its Code-1/2/3 fields
        # (presumably its parent levels - confirm against the JSON schema).
        codes = [item["Code"], item["Code-1"], item["Code-2"], item["Code-3"]]
        for day, filter_codes in best_before_days.items():
            if any(code in filter_codes for code in codes):
                return day, location_id
        return None, location_id

    # Unknown code: return a pair so callers can always unpack the result.
    return None, None


def convert_image_to_base64(image_content):
    """Return *image_content* (bytes) as a base64-encoded text string."""
    return base64.b64encode(image_content).decode("utf-8")


def handle_add_product(barcode, location):
    """Add one unit of the product identified by *barcode* to stock.

    When Grocy already knows the barcode, stock is booked with the amount
    stored on that barcode (default 1.0). Otherwise the barcode spiders are
    queried and a new product is created from the first hit.

    Returns:
        ``(response_data, status_code, product_name)``; *product_name* is
        None on failure.
    """
    try:
        product = grocy.product_by_barcode(barcode)
    except Exception as e:
        # Any lookup failure is treated as "not known to Grocy yet".
        logger.info(f"Barcode {barcode} not found in Grocy: {e}")
        product = None

    if product:
        try:
            amount = 1.0
            for product_barcode in product.product_barcodes:
                if product_barcode.barcode == barcode:
                    if getattr(product_barcode, "amount", None):
                        amount = product_barcode.amount
                    break
            grocy.add_product_by_barcode(barcode, amount, 0.0)
            return {"message": "Item added successfully"}, 200, product.name
        except Exception as e:
            # Do not fall through to the spider path here: the product
            # exists, and creating it again would produce a duplicate.
            return {"message": str(e)}, 400, None

    spiders = []
    if ShowApi_AppKey:
        spiders.append(ShowApiSpider(ShowApi_AppKey.split(",")))
    spiders.append(BarcodeSpider(x_rapidapi_key=X_RapidAPI_Key))

    good = None
    for spider in spiders:
        good = spider.get_good(barcode)
        if good:
            break
    if not good:
        return {"message": "Item not found"}, 400, None

    try:
        added, good_name = add_product(good, location)
    except Exception as e:
        return {"message": getattr(e, "message", str(e))}, 400, None
    if added:
        return {"message": "New item added successfully"}, 200, good_name
    return {"message": "Fail to add new item"}, 400, None


@app.route("/")
def index():
    """Liveness probe."""
    return "Up and running!"


@app.route("/add", methods=["POST"])
def add():
    """HTTP endpoint: add the product for the posted barcode to stock."""
    data = request.json
    location = data.get("location", "")
    barcode = data.get("barcode", "")
    # handle_add_product returns three values; the product name is not
    # part of this endpoint's response.
    response_data, status_code, _good_name = handle_add_product(barcode, location)
    return jsonify(response_data), status_code


@app.route("/consume", methods=["POST"])
def consume():
    """HTTP endpoint: consume one unit of the product for the posted barcode."""
    try:
        data = request.json
        barcode = data.get("barcode", "")
        grocy.consume_product_by_barcode(barcode)
        return jsonify({"message": "Item removed successfully"}), 200
    except Exception as e:
        return jsonify({"message": str(e)}), 400


def upload_image(url):
    """Download the image at *url*, push it through the compression service.

    Returns:
        The ``cdn_url`` reported by the service, or None when the download or
        the upload fails (callers fall back to inlining the image).
    """
    # download image
    url = url.split('?')[0]
    image_name = url.split('/')[-1]
    mime_type = mimetypes.guess_type(image_name)[0]
    if not mime_type:
        mime_type = 'image/jpeg'
    # compress image
    compress_url = 'https://upload-tinypng.tunpok.com/upload/'
    try:
        img = requests.get(url, timeout=HTTP_TIMEOUT)
        files = {'file': ('img.jpeg', img.content, mime_type,)}
        resp = requests.post(compress_url, files=files, timeout=HTTP_TIMEOUT)
        if resp.status_code == 200:
            return resp.json().get('cdn_url')
    except (requests.exceptions.RequestException, ValueError) as err:
        logger.warning(f"Image upload failed for {url}: {err}")
    return None


def add_recipe_from_url(url):
    """Import a xiachufang recipe into Grocy.

    Returns:
        ``(response_data, status_code, recipe_name)`` where *response_data*
        is a plain dict. This function runs in the scheduler thread, outside
        any Flask application context, so it must not call ``jsonify``.
    """
    try:
        # https://www.xiachufang.com/recipe/107141278/?recipe_type=1&page_scene=6
        match = re.search(r'(\d+)', url)
        if not match:
            return {"message": "Invalid url"}, 400, None
        recipe_number = match.group(1)

        resp = grocy.get_generic_objects_for_type(
            EntityType.RECIPES, ['name~%s' % recipe_number])
        if resp:
            return {"message": "Recipe already exists"}, 400, None

        xcf_recipe = get_recipe_from_xiachufang(url)

        # NOTE(review): the literals below look like they lost their markup
        # in this copy of the file (empty strings around the source label,
        # img_data computed but never interpolated). They are reproduced as
        # found; restore the original markup before relying on the output.
        parts = ["" + '来源' + "", "\n"]
        if xcf_recipe['ingredients']:
            parts.append("\n\n用料\n\n\n")
            parts.append("\n")
        if xcf_recipe['steps']:
            parts.append("\n\n做法\n\n\n")
            for idx, step in enumerate(xcf_recipe["steps"]):
                parts.append(f"\n\n步骤 {idx+1}\n\n\n")
                parts.append(f'\n\n{step[0]}\n\n\n')
                if len(step) == 1:
                    parts.append("\n")
                    continue
                # Prefer the hosted copy; inline the image as a data URI
                # when the upload service is unavailable.
                img = upload_image(step[1])
                if img:
                    img_data = img
                else:
                    img = requests.get(step[1], timeout=HTTP_TIMEOUT)
                    img_data = 'data:image/png;base64,' + convert_image_to_base64(img.content)
                parts.append('\n\n\n')
                parts.append("\n")
        description = "".join(parts)

        data_grocy = {
            "name": xcf_recipe["name"] + " - " + recipe_number,
            "description": description,
        }
        grocy.add_generic(EntityType.RECIPES, data_grocy)
        return {"message": f"{data_grocy['name']}"}, 200, data_grocy['name']
    except Exception as e:
        return {"message": str(e)}, 400, None


@app.route("/add_recipe", methods=["POST"])
def add_recipe():
    """HTTP endpoint: queue a recipe URL on the Redis stream for import."""
    data = request.json
    source = data.get("source", "xiachufang")
    url = data.get("url", "")
    if source != "xiachufang":
        return jsonify({"message": "Invalid source"}), 400
    if not url:
        return jsonify({"message": "Invalid url"}), 400
    stream_id = r.xadd(STREAM_KEY, data)
    return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200


def consume_product_by_barcode(barcode):
    """Consume one unit of the product identified by *barcode*.

    Returns:
        ``(response_data, status_code, product_name)``; *product_name* is
        None on failure.
    """
    try:
        product = grocy.product_by_barcode(barcode)
        if not product:
            return {"message": "Item not found"}, 400, None
        grocy.consume_product_by_barcode(barcode)
        return {"message": "Item removed successfully"}, 200, product.name
    except Exception as e:
        return {"message": str(e)}, 400, None


def _handle_control_message(data):
    """Apply a non-numeric stream payload: switch mode flags or skip it."""
    if data == CONSUME_PRODUCT_FLAG_KEY:
        # One-shot consume mode, valid for five minutes.
        logger.info("Setting consume-product flag")
        r.set(CONSUME_PRODUCT_FLAG_KEY, '1', ex=60 * 5)
    elif data == CONTINUOUS_CONSUME_FLAG_KEY:
        # Continuous consume mode, valid for five minutes.
        logger.info("Setting continuous-consume flag")
        r.set(CONTINUOUS_CONSUME_FLAG_KEY, '1', ex=60 * 5)
    elif data == ADD_PRODUCT_FLAG_KEY:
        # Back to add mode: clear both consume flags.
        logger.info("Setting add-product flag")
        r.delete(CONSUME_PRODUCT_FLAG_KEY)
        r.delete(CONTINUOUS_CONSUME_FLAG_KEY)
    else:
        logger.info(f"Skip non-numeric barcode: {data}")


def _process_barcode_message(msg_id, barcode):
    """Add or consume the product for *barcode*, depending on the mode flags."""
    consume_product_flag = r.get(CONSUME_PRODUCT_FLAG_KEY)
    continuous_consume_flag = r.get(CONTINUOUS_CONSUME_FLAG_KEY)

    if not (consume_product_flag or continuous_consume_flag):
        resp, status_code, good_name = handle_add_product(barcode, '')
        if not good_name:
            good_name = ''
        if status_code == 200:
            # Acknowledge the message once it has been handled.
            r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
            bark_push('%s 添加成功' % good_name, '条形码 %s' % barcode)
        else:
            logger.error(f"Failed to process message: {barcode} {resp}")
            bark_push('商品添加失败', '条形码 %s \n %s' % (barcode, resp.get('message')))
        return

    resp, status_code, good_name = consume_product_by_barcode(barcode)
    if status_code == 200:
        r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
        bark_push('%s 消耗成功' % good_name, '条形码 %s' % barcode)
    else:
        logger.error(f"Failed to process message: {barcode} {resp}")
        bark_push('商品消耗失败', '条形码 %s \n %s' % (barcode, resp.get('message')))
    # The one-shot flag covers a single scan; the continuous flag stays
    # until it expires or add mode is selected.
    if consume_product_flag:
        r.delete(CONSUME_PRODUCT_FLAG_KEY)


def _process_recipe_message(msg_id, url):
    """Import the recipe at *url* and report the outcome."""
    resp, status_code, recipe_name = add_recipe_from_url(url)
    if status_code == 200:
        r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
        bark_push(f'{recipe_name} 添加成功', f'地址 {url}')
    else:
        logger.error(f"Failed to process message: {url} {resp}")
        bark_push('食谱添加失败', '%s 来源 %s' % (resp['message'], url))


def _process_message(msg_id, msg):
    """Dispatch one stream entry to the control/barcode/recipe handlers."""
    if msg.get('url'):
        logger.info(f"Processing recipe message: {msg.get('url')}")
    elif msg.get('data'):
        logger.info(f"Processing message: {msg.get('data')}")
    else:
        logger.info(f"Processing message: {msg}")

    data = msg.get('data')
    if data:
        if not data.isnumeric():
            _handle_control_message(data)
            return
        _process_barcode_message(msg_id, data)

    if msg.get('url'):
        _process_recipe_message(msg_id, msg['url'])


# Scheduled job: read not-yet-delivered messages from the Redis Stream.
def consume_stream():
    """Read up to 10 new stream messages and process each one.

    Blocks for at most one second waiting for messages. Each message is
    processed in isolation: entries read with '>' are not delivered again
    by later '>' reads, so one failing message must not discard the rest
    of the batch.
    """
    try:
        messages = r.xreadgroup(
            CONSUMER_GROUP, CONSUMER_NAME, {STREAM_KEY: '>'}, count=10, block=1000)
    except Exception as e:
        logger.exception(f"Error processing messages: {str(e)}")
        return

    for _stream, msgs in messages or []:
        for msg_id, msg in msgs:
            try:
                _process_message(msg_id, msg)
            except Exception as e:
                logger.exception(f"Error processing messages: {str(e)}")


# TODO: scheduled job: read the consumer group's unacknowledged messages from the Redis Stream
# def check_unacked_messages()


@app.route('/barcode', methods=['POST'])
def add_to_stream():
    """HTTP endpoint: queue a scanned code on the Redis stream.

    Responds 400 when the body or its ``data`` field is missing, 500 when
    the message cannot be queued, and 200 once it is queued. The scan log
    and push notification that follow are best effort: their failure is
    logged but does not turn an already-queued message into an error
    response (which would invite duplicate retries).
    """
    try:
        # JSON payload of the request.
        data = request.json
        if not data:
            return jsonify({"message": "No data provided"}), 400
        if not data.get('data'):
            return jsonify({"message": "No barcode data provided"}), 400

        # Append to the Redis Stream; Redis generates the entry id.
        stream_id = r.xadd(STREAM_KEY, data)
    except Exception as e:
        return jsonify({"message": str(e)}), 500

    try:
        # str() guards against a numeric JSON value, which has no isnumeric().
        scanned = str(data['data'])
        if scanned.isnumeric():
            models.ScanLog.add_log(data['data'])
            bark_push('扫码成功', '条形码 %s' % data['data'])
        else:
            bark_push('扫码成功', '二维码 %s' % data['data'])
    except Exception as e:
        logger.warning(f"Post-enqueue bookkeeping failed: {e}")

    return jsonify({"message": "Data added to Redis Stream", "stream_id": stream_id}), 200


@app.route('/scanlogs/today', methods=['GET'])
def get_today_scanlogs():
    """HTTP endpoint: return today's scan logs and their count as JSON."""
    try:
        # Fetch today's logs and serialize them.
        serialized_logs = [log.serialize() for log in models.ScanLog.get_today_logs()]
        return jsonify({
            'logs': serialized_logs,
            'count': len(serialized_logs)
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/logs')
def show_logs():
    """Render the scan-log page."""
    return render_template('logs.html')


# Initialise APScheduler and register the stream-consumer job.
scheduler = BackgroundScheduler()
scheduler.add_job(func=consume_stream, trigger="interval", seconds=10)
scheduler.start()

# Make sure the scheduler is stopped when the process exits.
atexit.register(scheduler.shutdown)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=9288)