from playwright.async_api import async_playwright from jinja2 import Environment, FileSystemLoader import utils import json import os import datetime from loguru import logger class ImageGenerator: def __init__(self): self.env = Environment(loader=FileSystemLoader('templates')) # 加载英雄数据 with open('heroes.json', 'r', encoding='utf-8') as f: self.heroes_data = json.load(f) def get_hero_image_url(self, hero_id): """根据英雄id获取图片URL""" hero_info = self.heroes_data.get(str(hero_id)) if hero_info: return f"https://cdn.cloudflare.steamstatic.com{hero_info['img']}" return None async def generate_match_report(self, match_data): """ 根据match_data生成比赛报告图片 match_data应该是dota.py中serialize_match方法返回的格式 """ # 处理数据,标记最高经济和最高伤害的玩家 radiant_players = [] dire_players = [] # 分离天辉和夜魇玩家 for player in match_data['players']: player_data = { 'name': player['personaname'] if player['personaname'] else '-', 'is_friend': bool(player.get('nickname', '')), 'hero': player['hero'], 'level': player['level'], 'kills': player['kills'], 'deaths': player['deaths'], 'assists': player['assists'], 'total_gold': player['total_gold'], 'hero_damage': player['hero_damage'], 'hero_img': self.get_hero_image_url(player['hero_id']), 'highest_gold': False, 'highest_damage': False, 'rank': player.get('rank'), 'rank_tier': player.get('rank_tier'), } # 如果是好友,使用昵称 if player.get('nickname'): player_data['name'] = player['nickname'] # 转换英雄名称为中文 player_data['hero'] = utils.get_hero_chinese_name(player['hero']) if player['is_radiant']: radiant_players.append(player_data) else: dire_players.append(player_data) # 找出天辉最高经济和伤害 if radiant_players: max_gold_player = max(radiant_players, key=lambda p: int(p['total_gold'])) max_damage_player = max(radiant_players, key=lambda p: int(p['hero_damage'])) for player in radiant_players: if player == max_gold_player: player['highest_gold'] = True if player == max_damage_player: player['highest_damage'] = True # 找出夜魇最高经济和伤害 if dire_players: max_gold_player = max(dire_players, key=lambda p: int(p['total_gold'])) max_damage_player = max(dire_players, key=lambda p: int(p['hero_damage'])) for player in dire_players: if player == max_gold_player: player['highest_gold'] = True if player == max_damage_player: player['highest_damage'] = True # 短数字 for player in radiant_players: player['total_gold'] = utils.shorten_digits(player['total_gold']) player['hero_damage'] = utils.shorten_digits(player['hero_damage']) for player in dire_players: player['total_gold'] = utils.shorten_digits(player['total_gold']) player['hero_damage'] = utils.shorten_digits(player['hero_damage']) end_time = match_data.get('end_time', '') # '2025-03-05T01:04:30.000+08:00' get 01:04:30 end_time = end_time.split('T')[1].split('.')[0] # 准备模板数据 template_data = { 'radiant_players': radiant_players, 'dire_players': dire_players, 'radiant_score': match_data.get('radiant_score', 0), 'dire_score': match_data.get('dire_score', 0), 'duration': match_data.get('duration', 0), 'radiant_win': match_data.get('radiant_win', False), 'start_time': match_data.get('start_time', ''), 'end_time': end_time, } # 渲染模板 template = self.env.get_template('match_report.html') html_content = template.render(**template_data) # 使用Playwright生成图片 async with async_playwright() as playwright: browser = await playwright.chromium.launch() page = await browser.new_page() await page.set_content(html_content) await page.set_viewport_size({"width": 800, "height": 800}) # 等待内容完全加载 await page.wait_for_timeout(1000) # 调整截图高度以适应内容 body_height = await page.evaluate('document.body.scrollHeight') body_width = await page.evaluate('document.body.offsetWidth') await page.set_viewport_size({"width": body_width, "height": body_height}) # 设置更高的设备像素比以获得更清晰的图像 await page.evaluate('''() => { window.devicePixelRatio = 2; }''') # 截图 image_path = f"match_report_{match_data.get('match_id')}.png" await page.screenshot(path=image_path, full_page=True) await browser.close() # 上传图片 image_url = utils.upload_image(image_path, image_path) # 删除本地文件 try: os.remove(image_path) except Exception as e: logger.warning(f"删除本地图片文件失败: {str(e)}") return image_url return None async def generate_recent_matches_image(self, player_name, matches): """ 生成玩家最近比赛的图片报告 Args: player_name: 玩家名称 matches: 比赛数据列表 Returns: str: 上传后的图片URL """ # 处理比赛数据 processed_matches = [] for match in matches: # 获取英雄图片 hero_id = str(match['hero_id']) hero_img = f"https://cdn.dota2.com/apps/dota2/images/heroes/{self.heroes_data[hero_id]['name'].replace('npc_dota_hero_', '')}_full.png" # 格式化时间 end_time = datetime.datetime.fromtimestamp(match['end_time']).strftime('%Y-%m-%d %H:%M') # 格式化时长 duration_hms = utils.convert_seconds_to_hms(match['duration']) duration_formatted = f"{duration_hms[0]}:{duration_hms[1]:02d}:{duration_hms[2]:02d}" # 获取英雄中文名 hero_name = utils.get_hero_chinese_name(self.heroes_data[hero_id]['name']) processed_match = { 'win': match['win'], 'kills': match['kills'], 'deaths': match['deaths'], 'assists': match['assists'], 'hero_img': hero_img, 'hero_name': hero_name, 'duration_formatted': duration_formatted, 'end_time_formatted': end_time, 'party_size': match['party_size'], 'average_rank': match['average_rank'] } processed_matches.append(processed_match) # 渲染模板 template = self.env.get_template('recent_matches.html') html_content = template.render( player_name=player_name, matches=processed_matches ) # 使用Playwright生成图片 async with async_playwright() as playwright: browser = await playwright.chromium.launch() page = await browser.new_page() await page.set_content(html_content) await page.set_viewport_size({"width": 800, "height": 800}) # 等待内容完全加载 await page.wait_for_timeout(1000) # 调整截图高度以适应内容 body_height = await page.evaluate('document.body.scrollHeight') body_width = await page.evaluate('document.body.offsetWidth') await page.set_viewport_size({"width": body_width, "height": body_height}) # 设置更高的设备像素比以获得更清晰的图像 await page.evaluate('''() => { window.devicePixelRatio = 2; }''') # 截图 image_path = f"recent_matches_{player_name.replace(' ', '_')}_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.png" await page.screenshot(path=image_path, full_page=True) await browser.close() # 上传图片 image_url = utils.upload_image(image_path, image_path) # 删除本地文件 try: os.remove(image_path) except Exception as e: logger.warning(f"删除本地图片文件失败: {str(e)}") return image_url