"""Peewee models and Discord payload builders for Dota 2 match reports.

Match and player data is fetched through the OpenDota client, cached in a
local SQLite database, and turned into dictionaries shaped like Discord
webhook messages (``content`` + ``embeds``).
"""
import asyncio
import datetime
import json

import opendota
import peewee
from loguru import logger

import players
import utils
from image_generator import ImageGenerator

db = peewee.SqliteDatabase('dota.db')
hero_client = opendota.HeroesApi()
player_client = opendota.PlayersApi()
match_client = opendota.MatchesApi()
image_generator = ImageGenerator()

# Discord embed colours (decimal form of the hex RGB value).
_COLOR_WIN = 6732650     # 0x66BB6A
_COLOR_LOSS = 16724787   # 0xFF3333
_COLOR_IMAGE = 3447003   # 0x3498DB (blue)

# Timestamps are rendered with a fixed "+08:00" suffix, so the conversion
# must be done in UTC+8 explicitly rather than in the host's local zone.
_UTC8 = datetime.timezone(datetime.timedelta(hours=8))
_ISO_UTC8_FORMAT = '%Y-%m-%dT%H:%M:%S.000+08:00'


def _format_utc8(timestamp):
    """Render a Unix timestamp as an ISO-8601 string in UTC+8."""
    moment = datetime.datetime.fromtimestamp(timestamp, tz=_UTC8)
    return moment.strftime(_ISO_UTC8_FORMAT)


def _party_summary(party_size, duration):
    """Return the one-line summary: party label (if known) plus duration."""
    if party_size and party_size > 1:
        return f"{party_size}黑 {duration}"
    if party_size == 1:
        return f"单排 {duration}"
    return f"{duration}"


class BaseModel(peewee.Model):
    """Base class binding every model to the shared SQLite database."""

    class Meta:
        database = db


class Hero(BaseModel):
    """A Dota 2 hero as returned by the OpenDota heroes endpoint."""

    hero_id = peewee.IntegerField(primary_key=True)
    name = peewee.CharField()
    localized_name = peewee.CharField()
    primary_attr = peewee.CharField()
    attack_type = peewee.CharField()
    roles = peewee.CharField()  # comma-joined list of role names

    @classmethod
    def fetch_heroes(cls):
        """Fetch all heroes from OpenDota and insert the ones not stored yet.

        Uses ``get_or_create``, so rows that already exist are left as-is.
        """
        for hero in hero_client.get_heroes():
            cls.get_or_create(
                hero_id=hero.id,
                defaults={
                    'name': hero.name,
                    'localized_name': hero.localized_name,
                    'primary_attr': hero.primary_attr,
                    'attack_type': hero.attack_type,
                    'roles': ','.join(hero.roles),
                },
            )


class Match(BaseModel):
    """A played match, with the raw OpenDota response cached as JSON text."""

    match_id = peewee.IntegerField(primary_key=True)
    start_time = peewee.DateTimeField()
    duration = peewee.IntegerField()
    radiant_win = peewee.BooleanField()
    party_size = peewee.IntegerField(null=True)
    opendota_response = peewee.TextField(null=True)

    def serialize_match(self):
        """Return a plain-dict summary of this match.

        The OpenDota match details are fetched on first use and cached in
        ``opendota_response``; later calls read the cached JSON.

        Returns:
            dict with ``players``, scores, ``start_time``/``end_time``
            (ISO-8601, UTC+8), formatted ``duration``, ``radiant_win``,
            ``party_size`` and ``match_id``.

        Raises:
            Exception: whatever the fetch/serialise step raised, after
                logging it.
        """
        if not self.opendota_response:
            try:
                response = match_client.get_matches_by_match_id(self.match_id)
                m_dict = response.to_dict()
                for player in m_dict['players']:
                    # datetime objects are not JSON serialisable; store the
                    # last login as an integer Unix timestamp instead.
                    if player.get('last_login'):
                        player['last_login'] = int(player['last_login'].timestamp())
                self.opendota_response = json.dumps(m_dict)
                self.save()
            except Exception:
                logger.error('fail to get match %s' % self.match_id)
                raise

        md = json.loads(self.opendota_response)
        match_data = {
            'players': [players.serialize_player(player) for player in md['players']],
            'dire_score': md['dire_score'],
            'radiant_score': md['radiant_score'],
            'start_time': _format_utc8(md['start_time']),
            'end_time': _format_utc8(md['start_time'] + md['duration']),
            'duration': '%d:%02d:%02d' % utils.convert_seconds_to_hms(md['duration']),
            'radiant_win': md['radiant_win'],
            'party_size': self.party_size,
            'match_id': self.match_id,
        }
        if not self.party_size:
            # No party size recorded: count how many participants are
            # known friends and use that instead.
            player_account_ids = [
                player['account_id'] for player in md['players'] if player['account_id']
            ]
            match_data['party_size'] = (
                Friend.select().where(Friend.steam_id.in_(player_account_ids)).count()
            )
        return match_data


def _stored_party_size(match_id):
    """Return the party size stored for ``match_id``, or None if unknown."""
    stored = Match.get_or_none(Match.match_id == match_id)
    return stored.party_size if stored is not None else None


class Friend(BaseModel):
    """A tracked player, identified by Steam account id."""

    steam_id = peewee.IntegerField(primary_key=True)
    name = peewee.CharField()
    active = peewee.BooleanField(default=True)

    def get_recent_matches(self, limit=1):
        """Return this player's most recent matches from OpenDota.

        Best-effort: any client error is logged and an empty list returned.
        """
        try:
            return player_client.get_players_by_account_id_select_matches(
                self.steam_id, limit=limit
            )
        except Exception as e:
            logger.error('fail to get player %s recent matches. error: %s' % (self.steam_id, e))
            return []

    def serialize_recent_matches(self, limit=1):
        """Return the recent matches as a list of plain dicts.

        ``start_time``/``end_time`` are passed through as the numeric values
        from the API (end = start + duration).
        """
        serialized = []
        for match_ in self.get_recent_matches(limit=limit):
            # Player slots below 128 are on the Radiant side.
            is_radiant = match_.player_slot < 128
            serialized.append({
                'match_id': match_.match_id,
                'win': match_.radiant_win == is_radiant,
                'is_radiant': is_radiant,
                'kills': match_.kills,
                'deaths': match_.deaths,
                'assists': match_.assists,
                'party_size': match_.party_size,
                'start_time': match_.start_time,
                'end_time': match_.start_time + match_.duration,
                'duration': match_.duration,
                'average_rank': utils.get_ranking(match_.average_rank),
                'hero_id': match_.hero_id,
            })
        return serialized

    @classmethod
    def serialize_recent_matches_for_discord(cls, friends, limit=5):
        """Build a Discord message summarising the friends' recent matches.

        Args:
            friends: indexable collection of ``Friend`` rows; the first
                one's name is used in the heading.
            limit: maximum number of matches to report (capped at 10).

        Returns:
            ``{'content': str, 'embeds': [...]}`` with one embed per match
            (title = hero and K/D/A, description = party/duration/rank,
            colour by win/loss, ``timestamp`` = match end, ``url`` = OpenDota
            link), plus one trailing image-only embed when the report image
            could be generated. ``None`` when there are no matches.
        """
        limit = min(limit, 10)

        matches = []
        for friend in friends:
            matches.extend(friend.serialize_recent_matches(limit=limit))
        if not matches:
            return None

        # Latest match first.
        matches.sort(key=lambda m: m['start_time'], reverse=True)
        recent = matches[:limit]

        name = friends[0].name
        data = {
            'content': f'## {name}的战报',
            'embeds': [],
        }
        for match_ in recent:
            duration = '%d:%02d:%02d' % utils.convert_seconds_to_hms(match_['duration'])

            if match_['party_size'] is None:
                # The API did not report a party size: fall back to the
                # locally stored value, and to 0 only if that is unknown too.
                stored = _stored_party_size(match_['match_id'])
                match_['party_size'] = stored if stored is not None else 0

            summary = _party_summary(match_['party_size'], duration)
            if match_['average_rank']:
                summary += '\n' + match_['average_rank']

            hero = Hero.get(hero_id=match_['hero_id'])
            hero_name = utils.get_hero_chinese_name(hero.localized_name)
            data['embeds'].append({
                'title': (
                    f"{hero_name} {match_['kills']} 杀 "
                    f"{match_['deaths']} 死 {match_['assists']} 助 "
                ),
                'description': summary,
                'color': _COLOR_WIN if match_['win'] else _COLOR_LOSS,
                'fields': [],
                'timestamp': _format_utc8(match_['end_time']),
                'url': f"https://www.opendota.com/matches/{match_['match_id']}",
            })

        # Generate the image report. asyncio.run() starts a fresh event loop
        # and raises RuntimeError when one is already running in this thread;
        # any failure is logged and the text-only message is returned.
        image_url = None
        try:
            image_url = asyncio.run(
                image_generator.generate_recent_matches_image(name, recent)
            )
        except Exception as e:
            logger.error(f"生成最近比赛报告图片失败: {str(e)}")

        # On success the image is appended as an extra, image-only embed.
        # NOTE(review): with limit == 10 this yields 11 embeds; confirm that
        # stays within the embed-count limit of the Discord endpoint used.
        if image_url:
            data['embeds'].append({
                'image': {
                    'url': image_url
                },
                'color': _COLOR_IMAGE,
            })

        return data


def get_friends_recent_matches():
    """Record and serialise matches of active friends not seen before.

    For every active friend the latest match is fetched; matches that are
    not yet in the database are created and serialised.

    Returns:
        list of dicts as produced by ``Match.serialize_match``.
    """
    matches = []
    for friend in Friend.filter(active=True):
        for match_ in friend.get_recent_matches():
            already_known = Match.select().where(Match.match_id == match_.match_id).exists()
            if already_known:
                continue
            logger.info('create match, match info: %s' % match_.__dict__)
            match_obj = Match.create(
                match_id=match_.match_id,
                start_time=datetime.datetime.fromtimestamp(match_.start_time),
                duration=match_.duration,
                radiant_win=match_.radiant_win,
                party_size=match_.party_size,
            )
            matches.append(match_obj.serialize_match())
    return matches


def _player_line(player):
    """Format one scoreboard line (markdown) for a serialised player."""
    player_name = player['personaname']
    if player['nickname']:
        # Known friends are shown by nickname, in bold.
        player_name = f"**{player['nickname']}**"
    return (
        f"{player_name}(Lv.**{player['level']}** "
        f"{utils.get_hero_chinese_name(player['hero'])}): "
        f"**{player['kills']}** 杀 **{player['deaths']}** 死 "
        f"**{player['assists']}** 助 | "
        f"**{utils.shorten_digits(player['total_gold'])}** 经济 | "
        f"**{utils.shorten_digits(player['hero_damage'])}** 伤害 "
    )


def _team_lines(team_players):
    """Return scoreboard lines for one team, marking top gold and damage.

    The first player with the highest gold gets a money-bag prefix and the
    first with the highest hero damage a blood-drop prefix. An empty team
    yields an empty list.
    """
    lines = []
    top_gold = top_damage = 0
    top_gold_idx = top_damage_idx = 0
    for idx, player in enumerate(team_players):
        lines.append(_player_line(player))
        if top_gold < player['total_gold']:
            top_gold, top_gold_idx = player['total_gold'], idx
        if top_damage < player['hero_damage']:
            top_damage, top_damage_idx = player['hero_damage'], idx
    if lines:
        lines[top_gold_idx] = '💰' + lines[top_gold_idx]
        lines[top_damage_idx] = '🩸' + lines[top_damage_idx]
    return lines


def serialize_match_for_discord(match_):
    """Build a Discord message for one serialised match.

    Args:
        match_: dict as produced by ``Match.serialize_match``. Its
            ``party_size`` entry is filled in (mutated) when missing.

    Returns:
        ``{'content': str, 'tts': False, 'embeds': [embed]}`` where
        ``content`` is the two-team scoreboard in markdown and the single
        embed carries the team scores, the party summary with the friends'
        nicknames, an OpenDota link, the end timestamp, a win/loss colour
        and, when it could be generated, the report image.
    """
    participants = match_['players']
    party = [player['nickname'] for player in participants if player['nickname']]

    # The side of the first known friend decides the report's perspective;
    # with no known friend the Dire perspective is used.
    is_radiant = next(
        (player['is_radiant'] for player in participants if player['nickname']),
        False,
    )
    win = is_radiant == match_['radiant_win']

    if not match_['party_size']:
        # The stored value is nullable, so coerce "unknown" to 0 to keep
        # the numeric comparisons below safe.
        match_['party_size'] = _stored_party_size(match_['match_id']) or 0
    summary = _party_summary(match_['party_size'], match_['duration'])

    radiant = _team_lines([player for player in participants if player['is_radiant']])
    dire = _team_lines([player for player in participants if not player['is_radiant']])

    color = _COLOR_WIN if win else _COLOR_LOSS
    content = '## 天辉\n\n' + '\n'.join(radiant) + '\n\n## 夜魇\n\n' + '\n'.join(dire) + '\n'

    # A star marks the side the friends played on.
    radiant_indicator = '🌟 ' if is_radiant else ''
    dire_indicator = '' if is_radiant else ' 🌟'

    # Generate the match report image. asyncio.run() starts a fresh event
    # loop and raises RuntimeError when one is already running in this
    # thread; any failure is logged and the message is sent without image.
    image_url = None
    try:
        image_url = asyncio.run(image_generator.generate_match_report(match_))
    except Exception as e:
        logger.error(f"生成比赛报告图片失败: {str(e)}")

    data = {
        "content": content,
        "tts": False,
        "embeds": [
            {
                "color": color,
                "fields": [
                    {
                        "name": radiant_indicator + "天辉",
                        "value": match_['radiant_score'],
                        "inline": True
                    },
                    {
                        "name": "夜魇" + dire_indicator,
                        "value": match_['dire_score'],
                        "inline": True
                    },
                    {
                        "name": summary,
                        "value": f"{','.join(party)}"
                    }
                ],
                "author": {
                    "name": "opendota",
                    "url": "https://www.opendota.com/matches/%s" % match_['match_id']
                },
                "timestamp": match_['end_time']
            }
        ],
    }

    # Attach the image to the embed when it was generated successfully.
    if image_url:
        data["embeds"][0]["image"] = {
            "url": image_url
        }

    return data