from flask import Flask, request, jsonify, send_from_directory from datetime import datetime, timedelta, timezone from zoneinfo import ZoneInfo import sqlite3 import os import json app = Flask(__name__, static_folder='static') DB_PATH = 'din.db' # 东八区时区 TZ = ZoneInfo('Asia/Shanghai') def now_tz(): """获取东八区当前时间""" return datetime.now(TZ) def init_db(): """初始化数据库""" conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute(''' CREATE TABLE IF NOT EXISTS din_records ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, content TEXT, year INTEGER, month INTEGER, week INTEGER, day INTEGER ) ''') conn.commit() conn.close() def get_db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def get_week_number(date): """获取ISO周数""" return date.isocalendar()[1] # ========== API 路由 ========== @app.route('/api/din', methods=['POST']) def create_din(): """创建新的 din 记录""" data = request.get_json() or {} content = data.get('content', '').strip() now = now_tz() conn = get_db() c = conn.cursor() c.execute(''' INSERT INTO din_records (created_at, content, year, month, week, day) VALUES (?, ?, ?, ?, ?, ?) ''', (now, content, now.year, now.month, get_week_number(now), now.day)) conn.commit() record_id = c.lastrowid conn.close() return jsonify({ 'id': record_id, 'created_at': now.isoformat(), 'content': content }) @app.route('/api/din', methods=['GET']) def get_dins(): """获取 din 记录列表""" limit = request.args.get('limit', 100, type=int) conn = get_db() c = conn.cursor() c.execute(''' SELECT * FROM din_records ORDER BY created_at DESC LIMIT ? ''', (limit,)) records = [dict(row) for row in c.fetchall()] conn.close() return jsonify(records) @app.route('/api/din/', methods=['PUT']) def update_din(record_id): """更新 din 记录内容""" data = request.get_json() or {} content = data.get('content', '').strip() conn = get_db() c = conn.cursor() c.execute('UPDATE din_records SET content = ? WHERE id = ?', (content, record_id)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/din/', methods=['DELETE']) def delete_din(record_id): """删除 din 记录""" conn = get_db() c = conn.cursor() c.execute('DELETE FROM din_records WHERE id = ?', (record_id,)) conn.commit() conn.close() return jsonify({'success': True}) @app.route('/api/stats', methods=['GET']) def get_stats(): """获取统计数据""" conn = get_db() c = conn.cursor() now = now_tz() today = now.day this_week = get_week_number(now) this_month = now.month this_year = now.year # 今日统计 c.execute('SELECT COUNT(*) FROM din_records WHERE year = ? AND day = ?', (this_year, today)) today_count = c.fetchone()[0] # 本周统计 c.execute('SELECT COUNT(*) FROM din_records WHERE year = ? AND week = ?', (this_year, this_week)) week_count = c.fetchone()[0] # 本月统计 c.execute('SELECT COUNT(*) FROM din_records WHERE year = ? AND month = ?', (this_year, this_month)) month_count = c.fetchone()[0] # 总计 c.execute('SELECT COUNT(*) FROM din_records') total_count = c.fetchone()[0] # 昨日同比 yesterday = now - timedelta(days=1) c.execute('SELECT COUNT(*) FROM din_records WHERE year = ? AND day = ?', (yesterday.year, yesterday.day)) yesterday_count = c.fetchone()[0] # 上周同比 last_week = now - timedelta(weeks=1) c.execute('SELECT COUNT(*) FROM din_records WHERE year = ? AND week = ?', (last_week.year, get_week_number(last_week))) last_week_count = c.fetchone()[0] # 上月同比 last_month = now.replace(month=this_month-1) if this_month > 1 else now.replace(year=this_year-1, month=12) c.execute('SELECT COUNT(*) FROM din_records WHERE year = ? AND month = ?', (last_month.year, last_month.month)) last_month_count = c.fetchone()[0] conn.close() return jsonify({ 'today': today_count, 'week': week_count, 'month': month_count, 'total': total_count, 'yesterday': yesterday_count, 'last_week': last_week_count, 'last_month': last_month_count, 'day_growth': calculate_growth(today_count, yesterday_count), 'week_growth': calculate_growth(week_count, last_week_count), 'month_growth': calculate_growth(month_count, last_month_count) }) def calculate_growth(current, previous): """计算增长率""" if previous == 0: return 100 if current > 0 else 0 return round((current - previous) / previous * 100, 1) # 加载成就配置 ACHIEVEMENTS_FILE = 'achievements.json' def load_achievements_config(): """加载成就配置""" if os.path.exists(ACHIEVEMENTS_FILE): with open(ACHIEVEMENTS_FILE, 'r', encoding='utf-8') as f: return json.load(f) return [] def check_achievement(ach, stats): """检查单个成就是否解锁""" cond = ach.get('condition', {}) cond_type = cond.get('type', '') if cond_type == 'total': return stats['total'] >= cond.get('min', 0) elif cond_type == 'streak': return stats['streak'] >= cond.get('min', 0) elif cond_type == 'day_max': return stats.get('day_max', 0) >= cond.get('min', 0) elif cond_type == 'content_count': return stats.get('content_count', 0) >= cond.get('min', 0) elif cond_type == 'early_time': return stats.get('has_early', False) elif cond_type == 'late_time': return stats.get('has_late', False) return False def get_achievement_stats(conn): """获取成就统计信息""" c = conn.cursor() stats = {} # 总计 c.execute('SELECT COUNT(*) FROM din_records') stats['total'] = c.fetchone()[0] # 连续天数 c.execute('SELECT DISTINCT year, day FROM din_records ORDER BY created_at DESC') days = c.fetchall() stats['streak'] = calculate_streak(days) # 单日最高记录 c.execute(''' SELECT COUNT(*) as cnt FROM din_records GROUP BY year, day ORDER BY cnt DESC LIMIT 1 ''') result = c.fetchone() stats['day_max'] = result[0] if result else 0 # 有描述的记录数 c.execute('SELECT COUNT(*) FROM din_records WHERE content IS NOT NULL AND content != ""') stats['content_count'] = c.fetchone()[0] # 早晚记录检查(东八区) # SQLite datetime 存储的是 UTC,需要 +8 小时转换为东八区 c.execute('SELECT created_at FROM din_records') rows = c.fetchall() hours = [] for row in rows: try: # 解析时间并转换为东八区 dt_str = row[0] if 'T' in dt_str: # ISO format with timezone dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00')) else: # SQLite default format dt = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S') dt = dt.replace(tzinfo=timezone.utc) # 转换为东八区 dt_cn = dt.astimezone(TZ) hours.append(dt_cn.hour) except Exception as e: print(f"Parse time error: {e}, value: {dt_str}") continue stats['has_early'] = any(h < 6 for h in hours) stats['has_late'] = any(h >= 23 for h in hours) return stats @app.route('/api/achievements', methods=['GET']) def get_achievements(): """获取成就数据""" conn = get_db() # 获取统计数据 stats = get_achievement_stats(conn) conn.close() # 加载成就配置 config = load_achievements_config() # 检查每个成就 achievements = [] for ach in config: achievements.append({ 'id': ach['id'], 'name': ach['name'], 'desc': ach['desc'], 'icon': ach['icon'], 'unlocked': check_achievement(ach, stats) }) return jsonify({ 'achievements': achievements, 'unlocked_count': sum(1 for a in achievements if a['unlocked']), 'total_count': len(achievements), 'current_streak': stats['streak'] }) def calculate_streak(days): """计算连续天数""" if not days: return 0 # 转换为 date 对象 date_set = set() for row in days: try: d = datetime(row['year'], 1, 1, tzinfo=TZ) + timedelta(days=row['day']-1) date_set.add(d.date()) except: continue sorted_days = sorted(date_set, reverse=True) if not sorted_days: return 0 streak = 1 today = now_tz().date() # 检查今天或昨天是否有记录 if sorted_days[0] != today and (today - sorted_days[0]).days > 1: return 0 for i in range(1, len(sorted_days)): if (sorted_days[i-1] - sorted_days[i]).days == 1: streak += 1 else: break return streak # ========== 静态文件 ========== @app.route('/') def index(): return send_from_directory('static', 'index.html') @app.route('/') def static_files(path): return send_from_directory('static', path) if __name__ == '__main__': init_db() app.run(host='0.0.0.0', port=5000, debug=True)