349 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import peewee
import opendota
import datetime
from loguru import logger
import json
import players
import utils
from image_generator import ImageGenerator
db = peewee.SqliteDatabase('dota.db')
hero_client = opendota.HeroesApi()
player_client = opendota.PlayersApi()
match_client = opendota.MatchesApi()
image_generator = ImageGenerator()
class BaseModel(peewee.Model):
class Meta:
database = db
class Hero(BaseModel):
hero_id = peewee.IntegerField(primary_key=True)
name = peewee.CharField()
localized_name = peewee.CharField()
primary_attr = peewee.CharField()
attack_type = peewee.CharField()
roles = peewee.CharField()
@classmethod
def fetch_heroes(cls):
heroes = hero_client.get_heroes()
for hero in heroes:
cls.get_or_create(
hero_id=hero.id,
defaults={
'name': hero.name,
'localized_name': hero.localized_name,
'primary_attr': hero.primary_attr,
'attack_type': hero.attack_type,
'roles': ','.join(hero.roles),
}
)
class Match(BaseModel):
match_id = peewee.IntegerField(primary_key=True)
start_time = peewee.DateTimeField()
duration = peewee.IntegerField()
radiant_win = peewee.BooleanField()
party_size = peewee.IntegerField(null=True)
opendota_response = peewee.TextField(null=True)
def serialize_match(self):
if not self.opendota_response:
try:
match_ = match_client.get_matches_by_match_id(self.match_id)
m_dict = match_.to_dict()
for player in m_dict['players']:
if player['last_login']:
# datatime obj to timestamp
player['last_login'] = int(player['last_login'].timestamp())
self.opendota_response = json.dumps(m_dict)
self.save()
except Exception as e:
logger.error('fail to get match %s' % self.match_id)
raise e
md = json.loads(self.opendota_response)
match_data = {
'players': [players.serialize_player(player) for player in md['players']],
'dire_score': md['dire_score'],
'radiant_score': md['radiant_score'],
# isoformat utc+8
'start_time': datetime.datetime.fromtimestamp(md['start_time']).strftime('%Y-%m-%dT%H:%M:%S.000+08:00'),
'end_time': datetime.datetime.fromtimestamp(md['start_time'] + md['duration']).strftime('%Y-%m-%dT%H:%M:%S.000+08:00'),
'duration': '%d:%02d:%02d' % utils.convert_seconds_to_hms(md['duration']),
'radiant_win': md['radiant_win'],
'party_size': self.party_size,
'match_id': self.match_id,
}
if not self.party_size:
player_account_ids = [player['account_id'] for player in md['players'] if player['account_id']]
match_data['party_size'] = Friend.select().where(Friend.steam_id.in_(player_account_ids)).count()
return match_data
class Friend(BaseModel):
steam_id = peewee.IntegerField(primary_key=True)
name = peewee.CharField()
active = peewee.BooleanField(default=True)
def get_recent_matches(self, limit=1):
try:
return player_client.get_players_by_account_id_select_matches(self.steam_id, limit=limit)
except Exception as e:
logger.error('fail to get player %s recent matches. error: %s' % (self.steam_id, e))
return []
def serialize_recent_matches(self, limit=1):
matches = self.get_recent_matches(limit=limit)
data = []
for match_ in matches:
data.append({
'match_id': match_.match_id,
'win': match_.radiant_win == (match_.player_slot < 128),
'is_radiant': match_.player_slot < 128,
'kills': match_.kills,
'deaths': match_.deaths,
'assists': match_.assists,
'party_size': match_.party_size,
'start_time': match_.start_time,
'end_time': match_.start_time + match_.duration,
'duration': match_.duration,
'average_rank': utils.get_ranking(match_.average_rank),
'hero_id': match_.hero_id,
})
return data
@classmethod
def serialize_recent_matches_for_discord(cls, friends, limit=5):
# {
# "content": "## 水哥的战报\n",
# "embeds": [
# {
# "description": "3黑 00:34:23",
# "fields": [],
# "title": "2 杀 5 死 3 助 ",
# "color": 6732650,
# "url": "https://www.opendota.com/matches/7335993790",
# "timestamp": "2023-09-12T16:00:00.000Z"
# }
# ],
# }
matches = []
if limit > 10:
limit = 10
for friend in friends:
matches_ = friend.serialize_recent_matches(limit=limit)
if not matches_:
continue
matches.extend(matches_)
if not matches:
return None
# sort matches by start_time from latest to oldest
matches.sort(key=lambda x: x['start_time'], reverse=True)
name = friends[0].name
data = {
'content': f'## {name}的战报',
'embeds': [],
}
for match_ in matches[:limit]:
duration = '%d:%02d:%02d' % utils.convert_seconds_to_hms(match_['duration'])
summary = f"{duration}"
if match_['party_size'] == None:
if Match.filter(match_id=match_['match_id']).exists():
match_['party_size'] = Match.get(match_id=match_['match_id']).party_size
if match_['party_size']:
match_['party_size'] = 0
if match_['party_size'] and match_['party_size'] > 1:
summary = f"{match_['party_size']}{duration}"
elif match_['party_size'] and match_['party_size'] == 1:
summary = f"单排 {duration}"
if match_['average_rank']:
summary += '\n' + match_['average_rank']
# start_time = datetime.datetime.fromtimestamp(match_['start_time']).strftime('%Y-%m-%dT%H:%M:%S.000+08:00')
end_time = datetime.datetime.fromtimestamp(match_['end_time']).strftime('%Y-%m-%dT%H:%M:%S.000+08:00')
hero_name = Hero.get(hero_id=match_['hero_id']).localized_name
data['embeds'].append({
'title': f"{hero_name} {match_['kills']}{match_['deaths']}{match_['assists']}",
'description': summary,
'color': 6732650 if match_['win'] else 16724787, # 66bb6a or FF3333
'fields': [],
'timestamp': end_time,
'url': f"https://www.opendota.com/matches/{match_['match_id']}",
})
return data
def get_friends_recent_matches():
matches = []
for friend in Friend.filter(active=True):
for match_ in friend.get_recent_matches():
if not Match.select().where(Match.match_id == match_.match_id).exists():
logger.info('create match, match info: %s' % match_.__dict__)
match_obj = Match.create(
match_id=match_.match_id,
start_time=datetime.datetime.fromtimestamp(match_.start_time),
duration=match_.duration,
radiant_win=match_.radiant_win,
party_size=match_.party_size,
)
matches.append(match_obj.serialize_match())
return matches
def serialize_match_for_discord(match_):
# {
# "content": "## 天辉\n\n我LV23 大鱼人): 2 杀 5 死 3 助 12345 经济 13442 伤害\n我LV23 大鱼人): 2 杀 5 死 3 助 12345 经济 13442 伤害\n我LV23 大鱼人): 2 杀 5 死 3 助 12345 经济 13442 伤害\n我LV23 大鱼人): 2 杀 5 死 3 助 12345 经济 13442 伤害\n我LV23 大鱼人): 2 杀 5 死 3 助 12345 经济 13442 伤害\n\n## 夜魇\n\n我LV23 大鱼人): 2 杀 5 死 3 助 12345 经济 13442 伤害\n我LV23 大鱼人): 2 杀 5 死 3 助 12345 经济 13442 伤害\n我LV23 大鱼人): 2 杀 5 死 3 助 12345 经济 13442 伤害\n我LV23 大鱼人): 2 杀 5 死 3 助 12345 经济 13442 伤害\n我LV23 大鱼人): 2 杀 5 死 3 助 12345 经济 13442 伤害\n",
# "tts": false,
# "embeds": [
# {
# "id": 652627557,
# "color": 6732650,
# "fields": [
# {
# "id": 878517961,
# "name": "天辉",
# "value": "23",
# "inline": true
# },
# {
# "id": 159867313,
# "name": "夜魇 ",
# "value": "23",
# "inline": true
# },
# {
# "id": 202767768,
# "name": "3黑 受风,小金 00:34:23 赢",
# "value": ""
# }
# ],
# "author": {
# "name": "opendota",
# "url": "https://www.opendota.com/matches/7335993790"
# },
# "timestamp": "2022-01-01T13:22:00.000Z"
# }
# ],
# "components": [],
# "actions": {}
# }
party = [player['nickname'] for player in match_['players'] if player['nickname']]
is_radiant = False
for player in match_['players']:
if player['nickname']:
is_radiant = player['is_radiant']
break
win = is_radiant == match_['radiant_win']
summary = f"{match_['duration']}"
if not match_['party_size']:
if Match.filter(match_id=match_['match_id']).exists():
match_['party_size'] = Match.get(match_id=match_['match_id']).party_size
else:
match_['party_size'] = 0
if match_['party_size'] > 1:
summary = f"{match_['party_size']}{match_['duration']}"
elif match_['party_size'] == 1:
summary = f"单排 {match_['duration']}"
radiant = []
dire = []
radiant_highest_gold = 0
radiant_highest_gold_idx = 0
radiant_highest_damage = 0
radiant_highest_damage_idx = 0
dire_highest_gold = 0
dire_highest_gold_idx = 0
dire_highest_damage = 0
dire_highest_damage_idx = 0
for player in match_['players']:
player_name = player['personaname']
if player['nickname']:
player_name = f"**{player['nickname']}**"
desc = f"{player_name}Lv.**{player['level']}** {utils.get_hero_chinese_name(player['hero'])} **{player['kills']}** 杀 **{player['deaths']}** 死 **{player['assists']}** 助 **{utils.shorten_digits(player['total_gold'])}** 经济 **{utils.shorten_digits(player['hero_damage'])}** 伤害 "
if player['is_radiant']:
radiant.append(desc)
if radiant_highest_gold < player['total_gold']:
radiant_highest_gold = player['total_gold']
radiant_highest_gold_idx = len(radiant) - 1
if radiant_highest_damage < player['hero_damage']:
radiant_highest_damage = player['hero_damage']
radiant_highest_damage_idx = len(radiant) - 1
else:
dire.append(desc)
if dire_highest_gold < player['total_gold']:
dire_highest_gold = player['total_gold']
dire_highest_gold_idx = len(dire) - 1
if dire_highest_damage < player['hero_damage']:
dire_highest_damage = player['hero_damage']
dire_highest_damage_idx = len(dire) - 1
radiant[radiant_highest_gold_idx] = '💰' + radiant[radiant_highest_gold_idx]
radiant[radiant_highest_damage_idx] = '🩸' + radiant[radiant_highest_damage_idx]
dire[dire_highest_gold_idx] = '💰' + dire[dire_highest_gold_idx]
dire[dire_highest_damage_idx] = '🩸'+ dire[dire_highest_damage_idx]
color = 6732650 if win else 16724787 # 66bb6a or FF3333
content = '## 天辉\n\n' + '\n'.join(radiant) + '\n\n## 夜魇\n\n' + '\n'.join(dire) + '\n'
radiant_indicator = ''
dire_indicator = ''
if is_radiant:
radiant_indicator = '🌟 '
else:
dire_indicator = ' 🌟'
# 生成比赛报告图片
try:
image_url = image_generator.generate_match_report(match_)
except Exception as e:
logger.error(f"生成比赛报告图片失败: {str(e)}")
image_url = None
data = {
"content": content,
"tts": False,
"embeds": [
{
"color": color,
"fields": [
{
"name": radiant_indicator + "天辉",
"value": match_['radiant_score'],
"inline": True
},
{
"name": "夜魇" + dire_indicator,
"value": match_['dire_score'],
"inline": True
},
{
"name": summary,
"value": f"{''.join(party)}"
}
],
"author": {
"name": "opendota",
"url": "https://www.opendota.com/matches/%s" % match_['match_id']
},
"timestamp": match_['end_time']
}
],
}
# 如果成功生成了图片添加到embeds中
if image_url:
data["embeds"][0]["image"] = {
"url": image_url
}
return data