424 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import peewee
import opendota
import datetime
from loguru import logger
import json
import players
import utils
from image_generator import ImageGenerator
import asyncio
# SQLite database file backing every model declared in this module.
db = peewee.SqliteDatabase('dota.db')
# OpenDota API clients: hero metadata, player data and match details.
hero_client = opendota.HeroesApi()
player_client = opendota.PlayersApi()
match_client = opendota.MatchesApi()
# Module-level image generator instance.
# NOTE(review): the Discord serializers visible in this file construct their
# own ImageGenerator instead of using this one - confirm whether it is still needed.
image_generator = ImageGenerator()
class BaseModel(peewee.Model):
    """Common base class that binds every model to the module-level database."""

    class Meta:
        """peewee model options shared by all subclasses."""

        database = db
class Hero(BaseModel):
    """Hero metadata mirrored from the OpenDota heroes endpoint."""

    hero_id = peewee.IntegerField(primary_key=True)
    name = peewee.CharField()
    localized_name = peewee.CharField()
    primary_attr = peewee.CharField()
    attack_type = peewee.CharField()
    # Role names stored as a single comma-separated string.
    roles = peewee.CharField()

    @classmethod
    def fetch_heroes(cls):
        """Insert a row for every hero returned by the API that is not stored yet.

        Rows that already exist are left untouched: the fetched values are
        passed as ``defaults`` and therefore only apply on creation.
        """
        for api_hero in hero_client.get_heroes():
            creation_values = dict(
                name=api_hero.name,
                localized_name=api_hero.localized_name,
                primary_attr=api_hero.primary_attr,
                attack_type=api_hero.attack_type,
                roles=','.join(api_hero.roles),
            )
            cls.get_or_create(hero_id=api_hero.id, defaults=creation_values)
class Match(BaseModel):
    """A recorded match, with the raw OpenDota payload cached as JSON text."""

    match_id = peewee.IntegerField(primary_key=True)
    start_time = peewee.DateTimeField()
    duration = peewee.IntegerField()
    radiant_win = peewee.BooleanField()
    party_size = peewee.IntegerField(null=True)
    # JSON dump of the OpenDota match response; filled lazily by serialize_match().
    opendota_response = peewee.TextField(null=True)

    def serialize_match(self):
        """Return a plain dict describing this match.

        On first use the match details are fetched from OpenDota, dumped to
        JSON and saved in ``opendota_response``; later calls reuse the cache.

        Returns:
            dict with keys ``players``, ``dire_score``, ``radiant_score``,
            ``start_time``, ``end_time``, ``duration``, ``radiant_win``,
            ``party_size`` and ``match_id``.

        Raises:
            Exception: whatever the fetch / JSON dump raised; it is logged
            and re-raised unchanged.
        """
        if not self.opendota_response:
            try:
                fetched = match_client.get_matches_by_match_id(self.match_id)
                m_dict = fetched.to_dict()
                for player in m_dict['players']:
                    # datetime objects are not JSON serializable: store a
                    # Unix timestamp instead. The key may be absent.
                    if player.get('last_login'):
                        player['last_login'] = int(player['last_login'].timestamp())
                self.opendota_response = json.dumps(m_dict)
                self.save()
            except Exception:
                logger.error('fail to get match %s' % self.match_id)
                raise

        md = json.loads(self.opendota_response)

        # The output format hard-codes a "+08:00" suffix, so the wall-clock
        # part must be computed in UTC+8 explicitly rather than in whatever
        # timezone the host happens to run in.
        utc8 = datetime.timezone(datetime.timedelta(hours=8))
        iso_format = '%Y-%m-%dT%H:%M:%S.000+08:00'
        started = datetime.datetime.fromtimestamp(md['start_time'], tz=utc8)
        ended = datetime.datetime.fromtimestamp(md['start_time'] + md['duration'], tz=utc8)

        match_data = {
            'players': [players.serialize_player(player) for player in md['players']],
            'dire_score': md['dire_score'],
            'radiant_score': md['radiant_score'],
            'start_time': started.strftime(iso_format),
            'end_time': ended.strftime(iso_format),
            'duration': '%d:%02d:%02d' % utils.convert_seconds_to_hms(md['duration']),
            'radiant_win': md['radiant_win'],
            'party_size': self.party_size,
            'match_id': self.match_id,
        }
        if not self.party_size:
            # No stored party size: count how many players of this match are
            # known friends (anonymous players have no account_id).
            player_account_ids = [
                player['account_id'] for player in md['players'] if player['account_id']
            ]
            match_data['party_size'] = (
                Friend.select().where(Friend.steam_id.in_(player_account_ids)).count()
            )
        return match_data
class Friend(BaseModel):
    """A tracked player account, keyed by the id used for OpenDota player lookups."""

    steam_id = peewee.IntegerField(primary_key=True)
    name = peewee.CharField()
    active = peewee.BooleanField(default=True)
    rank_tier = peewee.IntegerField(null=True)

    @staticmethod
    def _format_utc8(timestamp):
        """Format a Unix timestamp as an ISO-8601 string in UTC+8.

        The wall-clock part is computed in UTC+8 explicitly so that it always
        agrees with the hard-coded "+08:00" suffix, whatever the host timezone.
        """
        utc8 = datetime.timezone(datetime.timedelta(hours=8))
        moment = datetime.datetime.fromtimestamp(timestamp, tz=utc8)
        return moment.strftime('%Y-%m-%dT%H:%M:%S.000+08:00')

    def get_recent_matches(self, limit=1):
        """Fetch this player's most recent matches from OpenDota.

        Returns an empty list (after logging) when the API call fails.
        """
        try:
            return player_client.get_players_by_account_id_select_matches(self.steam_id, limit=limit)
        except Exception as e:
            logger.error('fail to get player %s recent matches. error: %s' % (self.steam_id, e))
            return []

    def update_rank_tier(self):
        """Update player's rank tier from OpenDota API.

        Returns:
            ``(changed, old_rank_tier)``. ``(False, None)`` is returned when
            the API reports no rank tier or when the request fails.
        """
        try:
            player_info = player_client.get_players_by_account_id(self.steam_id)
            new_rank_tier = getattr(player_info, 'rank_tier', None) if player_info else None
            if not new_rank_tier:
                return False, None
            old_rank_tier = self.rank_tier
            self.rank_tier = new_rank_tier
            self.save()
            return old_rank_tier != self.rank_tier, old_rank_tier
        except Exception as e:
            logger.error(f'Failed to update rank tier for player {self.steam_id}. Error: {e}')
            return False, None

    def serialize_recent_matches(self, limit=1):
        """Return the player's recent matches as a list of plain dicts."""
        return [
            {
                'match_id': match_.match_id,
                # The player won when the winning side matches the player's side.
                'win': match_.radiant_win == (match_.player_slot < 128),
                # Slots below 128 are treated as the radiant side.
                'is_radiant': match_.player_slot < 128,
                'kills': match_.kills,
                'deaths': match_.deaths,
                'assists': match_.assists,
                'party_size': match_.party_size,
                'start_time': match_.start_time,
                'end_time': match_.start_time + match_.duration,
                'duration': match_.duration,
                'average_rank': utils.get_ranking(match_.average_rank),
                'hero_id': match_.hero_id,
            }
            for match_ in self.get_recent_matches(limit=limit)
        ]

    @classmethod
    async def serialize_recent_matches_for_discord(cls, friends, limit=5):
        """Build a Discord message payload summarising recent matches.

        Args:
            friends: Friend rows whose matches are merged into one report;
                the report is titled with the first friend's name.
            limit: matches fetched per friend; at most ``min(limit, 9)``
                match embeds are emitted.

        Returns:
            ``None`` when no matches were found, otherwise a dict with a
            ``content`` heading and an ``embeds`` list holding one embed per
            match (title, description, color, url, timestamp) plus, when the
            image could be generated, a final image-only embed.
        """
        matches = []
        for friend in friends:
            matches.extend(friend.serialize_recent_matches(limit=limit))
        if not matches:
            return None

        # Latest match first.
        matches.sort(key=lambda x: x['start_time'], reverse=True)
        name = friends[0].name
        data = {
            'content': f'## {name}的战报',
            'embeds': [],
        }

        # At most 9 match embeds so the optional image embed below still fits.
        for match_ in matches[:min(limit, 9)]:
            duration = '%d:%02d:%02d' % utils.convert_seconds_to_hms(match_['duration'])
            summary = f"{duration}"

            if match_['party_size'] is None:
                # The API did not report a party size: fall back to the value
                # stored with the Match row, if there is one.
                if Match.filter(match_id=match_['match_id']).exists():
                    match_['party_size'] = Match.get(match_id=match_['match_id']).party_size
                # Default to 0 only when nothing was recovered. (The previous
                # check was inverted and wiped out the recovered value.)
                if match_['party_size'] is None:
                    match_['party_size'] = 0

            if match_['party_size'] and match_['party_size'] > 1:
                summary = f"{match_['party_size']}黑 {duration}"
            elif match_['party_size'] and match_['party_size'] == 1:
                summary = f"单排 {duration}"
            if match_['average_rank']:
                summary += '\n' + match_['average_rank']

            end_time = cls._format_utc8(match_['end_time'])
            hero_name = utils.get_hero_chinese_name(Hero.get(hero_id=match_['hero_id']).localized_name)
            data['embeds'].append({
                # Labelled K/D/A so the three numbers cannot run together.
                'title': f"{hero_name} {match_['kills']} 杀 {match_['deaths']} 死 {match_['assists']} 助",
                'description': summary,
                'color': 6732650 if match_['win'] else 16724787,  # 66bb6a or FF3333
                'fields': [],
                'timestamp': end_time,
                'url': f"https://www.opendota.com/matches/{match_['match_id']}",
            })

        # Generate the image report; failure is logged and the text report is
        # still returned.
        image_url = None
        try:
            # Await the coroutine directly instead of using asyncio.run().
            image_generator = ImageGenerator()
            image_url = await image_generator.generate_recent_matches_image(name, matches[:limit])
        except Exception as e:
            logger.error(f"生成最近比赛报告图片失败: {str(e)}")

        # When an image was produced, attach it as a trailing embed.
        if image_url:
            data['embeds'].append({
                'image': {
                    'url': image_url
                },
                'color': 3447003  # blue
            })
        return data
def get_friends_recent_matches():
    """Record and serialize recent matches of active friends that are not stored yet.

    For each active friend the latest match(es) are fetched; every match that
    has no Match row yet is created and its serialized form collected.

    Returns:
        list of dicts produced by ``Match.serialize_match`` for the newly
        created matches (empty when nothing new was found).
    """
    serialized = []
    active_friends = Friend.filter(active=True)
    for friend in active_friends:
        for recent in friend.get_recent_matches():
            already_stored = Match.select().where(Match.match_id == recent.match_id).exists()
            if already_stored:
                # Already recorded earlier (possibly via another friend).
                continue
            logger.info('create match, match info: %s' % recent.__dict__)
            new_row = Match.create(
                match_id=recent.match_id,
                start_time=datetime.datetime.fromtimestamp(recent.start_time),
                duration=recent.duration,
                radiant_win=recent.radiant_win,
                party_size=recent.party_size,
            )
            serialized.append(new_row.serialize_match())
    return serialized
def _leader_index(values):
    """Return the index of the first strictly-largest value above 0 (0 if none)."""
    best, best_idx = 0, 0
    for idx, value in enumerate(values):
        if best < value:
            best, best_idx = value, idx
    return best_idx


def _describe_player(player):
    """Build the one-line markdown description of a single player."""
    player_name = player['personaname']
    if player['nickname']:
        # Tracked friends are shown by nickname, in bold.
        player_name = f"**{player['nickname']}**"
    return f"{player_name}Lv.**{player['level']}** {utils.get_hero_chinese_name(player['hero'])} **{player['kills']}** 杀 **{player['deaths']}** 死 **{player['assists']}** 助 **{utils.shorten_digits(player['total_gold'])}** 经济 **{utils.shorten_digits(player['hero_damage'])}** 伤害 "


def _describe_team(team_players):
    """Describe every player of one side and flag the gold and damage leaders.

    Returns an empty list for an empty side instead of failing on indexing.
    """
    lines = [_describe_player(player) for player in team_players]
    if not lines:
        return lines
    gold_idx = _leader_index([player['total_gold'] for player in team_players])
    damage_idx = _leader_index([player['hero_damage'] for player in team_players])
    # Gold marker first, then damage marker, so a player leading both reads
    # "🩸💰...".
    lines[gold_idx] = '💰' + lines[gold_idx]
    lines[damage_idx] = '🩸' + lines[damage_idx]
    return lines


async def serialize_match_for_discord(match_):
    """Build the Discord message payload for one serialized match.

    Args:
        match_: dict with the keys this function reads: ``players`` (each
            with ``nickname``, ``personaname``, ``is_radiant``, ``level``,
            ``hero``, ``kills``, ``deaths``, ``assists``, ``total_gold``,
            ``hero_damage``), ``radiant_win``, ``duration``, ``party_size``,
            ``match_id``, ``radiant_score``, ``dire_score`` and ``end_time``.
            ``party_size`` is filled in place when it is missing.

    Returns:
        dict with ``content`` (per-player lines grouped by side), ``tts`` and
        a single embed holding both scores, a summary field listing the
        party, an author link to the match page, the end timestamp and, when
        it could be generated, the report image.
    """
    party = [player['nickname'] for player in match_['players'] if player['nickname']]
    # The side of the first tracked friend decides win/loss; defaults to dire
    # when no player carries a nickname.
    is_radiant = next(
        (player['is_radiant'] for player in match_['players'] if player['nickname']),
        False,
    )
    win = is_radiant == match_['radiant_win']

    if not match_['party_size']:
        stored_party_size = None
        if Match.filter(match_id=match_['match_id']).exists():
            stored_party_size = Match.get(match_id=match_['match_id']).party_size
        # The stored value is nullable; normalise to 0 so the comparisons
        # below never see None.
        match_['party_size'] = stored_party_size or 0

    if match_['party_size'] > 1:
        summary = f"{match_['party_size']}黑 {match_['duration']}"
    elif match_['party_size'] == 1:
        summary = f"单排 {match_['duration']}"
    else:
        summary = f"{match_['duration']}"

    radiant = _describe_team([p for p in match_['players'] if p['is_radiant']])
    dire = _describe_team([p for p in match_['players'] if not p['is_radiant']])

    color = 6732650 if win else 16724787  # 66bb6a or FF3333
    content = '## 天辉\n\n' + '\n'.join(radiant) + '\n\n## 夜魇\n\n' + '\n'.join(dire) + '\n'

    # A star marks the side the tracked friends played on.
    radiant_indicator = '🌟 ' if is_radiant else ''
    dire_indicator = '' if is_radiant else ' 🌟'

    # Generate the match report image; failure is logged and the text report
    # is still returned.
    image_url = None
    try:
        # Await the coroutine directly instead of using asyncio.run().
        image_generator = ImageGenerator()
        image_url = await image_generator.generate_match_report(match_)
    except Exception as e:
        logger.error(f"生成比赛报告图片失败: {str(e)}")

    data = {
        "content": content,
        "tts": False,
        "embeds": [
            {
                "color": color,
                "fields": [
                    {
                        "name": radiant_indicator + "天辉",
                        "value": match_['radiant_score'],
                        "inline": True
                    },
                    {
                        "name": "夜魇" + dire_indicator,
                        "value": match_['dire_score'],
                        "inline": True
                    },
                    {
                        "name": summary,
                        # Comma-separated so nicknames do not run together.
                        "value": ','.join(party)
                    }
                ],
                "author": {
                    "name": "opendota",
                    "url": "https://www.opendota.com/matches/%s" % match_['match_id']
                },
                "timestamp": match_['end_time']
            }
        ],
    }

    # Attach the image to the embed when one was generated.
    if image_url:
        data["embeds"][0]["image"] = {
            "url": image_url
        }
    return data
def check_rank_changes_for_discord():
    """Check for rank changes among all active friends and format for Discord.

    Every active friend's rank tier is refreshed through
    ``Friend.update_rank_tier``; each friend whose tier changed yields one
    embed.

    Returns:
        ``None`` when no rank changed, otherwise a dict with a ``content``
        heading and one embed (title, description, color) per change.
    """
    embeds = []
    for friend in Friend.filter(active=True):
        changed, old_rank_tier = friend.update_rank_tier()
        if not changed or friend.rank_tier is None:
            continue
        # A friend without a previous tier is shown as "未校准" and counts as
        # tier 0 for the up/down comparison.
        old_rank = utils.get_ranking(old_rank_tier) if old_rank_tier else "未校准"
        new_rank = utils.get_ranking(friend.rank_tier)
        increased = friend.rank_tier > (old_rank_tier or 0)
        direction = "⬆️ 上升" if increased else "⬇️ 下降"
        embeds.append({
            'title': f"{friend.name} 的天梯等级变化",
            # Explicit separators: direction, old rank and new rank used to be
            # concatenated into one unreadable string.
            'description': f"{direction}: {old_rank} → {new_rank}",
            # Green if increased, red if decreased.
            'color': 6732650 if increased else 16724787,
        })

    if not embeds:
        return None
    return {
        'content': '## 天梯更新',
        'embeds': embeds,
    }