"""Peewee models for a barcode scanner: a product catalogue and a scan log."""

# The wildcard import is kept first so that the explicit standard-library
# imports below take precedence over any same-named symbol it might bring in.
from peewee import *
from datetime import datetime, time
import json

# Create database instance
db = SqliteDatabase('scanner.db')

# Keys probed, in priority order, when looking for a product's display name.
_NAME_KEYS = ('goodsName', 'goods_name', 'name', 'description_cn', 'description')

# Name reported when a product record exists but carries no usable name
# (the literal means "not available yet").
_NAME_PLACEHOLDER = '暂无'


def _extract_name(raw_info):
    """Return the display name found in a stored ``product_info`` JSON string.

    The first key of ``_NAME_KEYS`` present in the decoded object wins and its
    value is returned as-is (even if it is ``None`` or empty).  When the text
    is not valid JSON, does not decode to a JSON object, or contains none of
    the known keys, ``_NAME_PLACEHOLDER`` is returned instead of raising.
    """
    try:
        info = json.loads(raw_info)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers a non-string
        # value such as None.
        return _NAME_PLACEHOLDER

    # add_product() serializes whatever it is given, so the stored JSON may be
    # a list, string, number or null rather than an object.
    if not isinstance(info, dict):
        return _NAME_PLACEHOLDER

    for key in _NAME_KEYS:
        if key in info:
            return info[key]
    return _NAME_PLACEHOLDER


class BaseModel(Model):
    """Base class binding every model to the shared ``db`` instance."""

    class Meta:
        database = db


class BarcodeDB(BaseModel):
    """Product catalogue entry keyed by a unique barcode."""

    barcode = CharField(unique=True)
    # JSON-encoded product information, written by add_product().
    product_info = TextField()

    class Meta:
        table_name = 'barcode_db'

    @classmethod
    def add_product(cls, barcode, product_info):
        """Insert the product for ``barcode`` or update its stored info.

        Args:
            barcode: Barcode string identifying the product.
            product_info: JSON-serializable value; it is stored as the output
                of ``json.dumps``.

        An existing row is only saved again when the serialized info differs
        from what is already stored.
        """
        serialized = json.dumps(product_info)
        try:
            product = cls.get(cls.barcode == barcode)
        except cls.DoesNotExist:
            # Add new product
            cls.create(barcode=barcode, product_info=serialized)
            return

        # Product already exists: update only if the info actually changed.
        if product.product_info != serialized:
            product.product_info = serialized
            product.save()


class ScanLog(BaseModel):
    """One scan event: which barcode was scanned and when."""

    barcode = CharField()
    # Defaults to the local (naive) time at which the row is created.
    scan_at = DateTimeField(default=datetime.now)

    class Meta:
        table_name = 'scan_logs'

    @classmethod
    def get_today_logs(cls):
        """Return a query selecting the logs whose ``scan_at`` falls on today.

        "Today" is the current local date, from 00:00:00 up to and including
        23:59:59.999999.
        """
        today = datetime.now().date()
        today_start = datetime.combine(today, time.min)  # Start of day (00:00:00)
        today_end = datetime.combine(today, time.max)  # End of day (23:59:59.999999)

        # Query logs for today
        return cls.select().where(cls.scan_at.between(today_start, today_end))

    def serialize(self):
        """Return this log as a dict with keys ``barcode``, ``name``, ``scan_at``.

        ``name`` is ``None`` when the barcode has no ``BarcodeDB`` entry.  When
        an entry exists, it is the value of the first known name key in the
        stored product info, or the placeholder ``'暂无'`` when no name can be
        extracted.  ``scan_at`` is returned unchanged, not converted to a
        string.
        """
        # Try to get product info from BarcodeDB
        try:
            product = BarcodeDB.get(BarcodeDB.barcode == self.barcode)
        except BarcodeDB.DoesNotExist:
            name = None
        else:
            name = _extract_name(product.product_info)

        return {
            'barcode': self.barcode,
            'name': name,
            'scan_at': self.scan_at,
        }

    @classmethod
    def add_log(cls, barcode):
        """Record a scan of ``barcode``; ``scan_at`` takes its field default."""
        cls.create(barcode=barcode)


# Create tables
def create_tables():
    """Create the tables for ``BarcodeDB`` and ``ScanLog``."""
    with db:
        db.create_tables([BarcodeDB, ScanLog])


if __name__ == '__main__':
    create_tables()