Ching L bb4ee378d9
All checks were successful
continuous-integration/drone/push Build is passing
feat: add method to recalculate streak from recent matches
- Add recalculate_streak_from_recent_matches to Friend class
- Fetches last 20 matches and recalculates win/loss streaks
- Sorts matches chronologically to ensure accurate calculation
2025-09-11 15:26:18 +08:00

606 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import peewee
import opendota
import datetime
from loguru import logger
import json
import players
import utils
from image_generator import ImageGenerator
import asyncio
# SQLite database file that backs every peewee model declared in this module.
db = peewee.SqliteDatabase('dota.db')
# OpenDota API clients used by the models and helper functions below.
hero_client = opendota.HeroesApi()
player_client = opendota.PlayersApi()
match_client = opendota.MatchesApi()
# Module-level image generator instance.
# NOTE(review): the Discord serializers in this file construct their own
# local ImageGenerator, so this instance may be unused here - confirm.
image_generator = ImageGenerator()
# Global list holding the win/loss streak updates collected by the most
# recent polling pass; starts out empty.
streak_updates = []
class BaseModel(peewee.Model):
    """Common base class that binds every model in this module to ``db``."""

    class Meta:
        # All subclasses use the module-level database handle.
        database = db
class Hero(BaseModel):
    """Hero metadata row, populated from the OpenDota heroes endpoint."""

    hero_id = peewee.IntegerField(primary_key=True)
    name = peewee.CharField()
    localized_name = peewee.CharField()
    primary_attr = peewee.CharField()
    attack_type = peewee.CharField()
    # Role names joined into a single comma-separated string.
    roles = peewee.CharField()

    @classmethod
    def fetch_heroes(cls):
        """Fetch the hero list from OpenDota and ``get_or_create`` one row per hero.

        Rows are keyed on ``hero_id``; the remaining columns are only passed as
        ``defaults`` for newly created rows.
        """
        for api_hero in hero_client.get_heroes():
            new_row_values = {
                'name': api_hero.name,
                'localized_name': api_hero.localized_name,
                'primary_attr': api_hero.primary_attr,
                'attack_type': api_hero.attack_type,
                'roles': ','.join(api_hero.roles),
            }
            cls.get_or_create(hero_id=api_hero.id, defaults=new_row_values)
class Match(BaseModel):
    """A match row plus the lazily cached raw OpenDota response for it."""

    match_id = peewee.IntegerField(primary_key=True)
    start_time = peewee.DateTimeField()
    duration = peewee.IntegerField()
    radiant_win = peewee.BooleanField()
    party_size = peewee.IntegerField(null=True)
    # JSON dump of the OpenDota match payload; filled in by serialize_match().
    opendota_response = peewee.TextField(null=True)

    def serialize_match(self):
        """Return a plain-dict summary of this match.

        If ``opendota_response`` is empty, the match is fetched from OpenDota,
        stored on the row as JSON and saved first.

        Returns:
            dict with ``players``, ``dire_score``, ``radiant_score``,
            ``start_time``/``end_time`` (ISO-8601 strings in UTC+8),
            ``duration`` (``H:MM:SS``), ``radiant_win``, ``party_size`` and
            ``match_id``.

        Raises:
            Exception: whatever the OpenDota fetch (or the save) raised; the
            failure is logged and re-raised unchanged.
        """
        if not self.opendota_response:
            try:
                match_ = match_client.get_matches_by_match_id(self.match_id)
                m_dict = match_.to_dict()
                for player in m_dict['players']:
                    if player['last_login']:
                        # datetime objects are not JSON serialisable; store a Unix timestamp.
                        player['last_login'] = int(player['last_login'].timestamp())
                self.opendota_response = json.dumps(m_dict)
                self.save()
            except Exception:
                logger.error('fail to get match %s' % self.match_id)
                raise  # bare raise keeps the original traceback

        md = json.loads(self.opendota_response)

        # The output strings carry a hard-coded "+08:00" suffix, so the wall
        # clock must be computed in UTC+8 explicitly instead of in whatever
        # timezone the host happens to use.
        utc8 = datetime.timezone(datetime.timedelta(hours=8))
        time_format = '%Y-%m-%dT%H:%M:%S.000+08:00'
        started_at = datetime.datetime.fromtimestamp(md['start_time'], tz=utc8)
        ended_at = datetime.datetime.fromtimestamp(md['start_time'] + md['duration'], tz=utc8)

        match_data = {
            'players': [players.serialize_player(player) for player in md['players']],
            'dire_score': md['dire_score'],
            'radiant_score': md['radiant_score'],
            'start_time': started_at.strftime(time_format),
            'end_time': ended_at.strftime(time_format),
            'duration': '%d:%02d:%02d' % utils.convert_seconds_to_hms(md['duration']),
            'radiant_win': md['radiant_win'],
            'party_size': self.party_size,
            'match_id': self.match_id,
        }
        if not self.party_size:
            # No stored party size: fall back to the number of known friends
            # among the match's players.
            player_account_ids = [player['account_id'] for player in md['players'] if player['account_id']]
            match_data['party_size'] = Friend.select().where(Friend.steam_id.in_(player_account_ids)).count()
        return match_data
class Friend(BaseModel):
    """A tracked player with a cached rank tier and win/loss streak counters."""

    steam_id = peewee.IntegerField(primary_key=True)
    name = peewee.CharField()
    active = peewee.BooleanField(default=True)
    rank_tier = peewee.IntegerField(null=True)
    win_streak = peewee.IntegerField(default=0)  # consecutive wins counter
    loss_streak = peewee.IntegerField(default=0)  # consecutive losses counter
    # ID of the last match counted into the streaks, used to avoid double counting.
    last_match_id = peewee.IntegerField(null=True)

    def get_recent_matches(self, limit=1):
        """Return up to ``limit`` recent matches from OpenDota, or ``[]`` on failure.

        Any exception from the API client is logged and swallowed.
        """
        try:
            return player_client.get_players_by_account_id_select_matches(self.steam_id, limit=limit)
        except Exception as e:
            logger.error('fail to get player %s recent matches. error: %s' % (self.steam_id, e))
            return []

    def update_rank_tier(self):
        """Update player's rank tier from OpenDota API.

        Returns:
            ``(changed, old_rank_tier)``. ``(False, None)`` is returned when the
            API reports no rank tier or when the request fails (logged).
        """
        try:
            player_info = player_client.get_players_by_account_id(self.steam_id)
            if player_info and hasattr(player_info, 'rank_tier') and player_info.rank_tier:
                old_rank_tier = self.rank_tier
                self.rank_tier = player_info.rank_tier
                self.save()
                return old_rank_tier != self.rank_tier, old_rank_tier
            return False, None
        except Exception as e:
            logger.error(f'Failed to update rank tier for player {self.steam_id}. Error: {e}')
            return False, None

    def serialize_recent_matches(self, limit=1):
        """Return the recent matches as a list of plain dicts (one per match)."""
        data = []
        for match_ in self.get_recent_matches(limit=limit):
            # Slots below 128 are treated as the Radiant side throughout this module.
            is_radiant = match_.player_slot < 128
            data.append({
                'match_id': match_.match_id,
                'win': match_.radiant_win == is_radiant,
                'is_radiant': is_radiant,
                'kills': match_.kills,
                'deaths': match_.deaths,
                'assists': match_.assists,
                'party_size': match_.party_size,
                'start_time': match_.start_time,
                'end_time': match_.start_time + match_.duration,
                'duration': match_.duration,
                'average_rank': utils.get_ranking(match_.average_rank),
                'hero_id': match_.hero_id,
            })
        return data

    @classmethod
    async def serialize_recent_matches_for_discord(cls, friends, limit=5):
        """Build a Discord message payload summarising the friends' recent matches.

        Args:
            friends: Friend instances whose matches are merged; the first one's
                name is used in the message heading.
            limit: matches requested per friend and maximum number reported
                (at most 9 match embeds are emitted).

        Returns:
            ``{'content': ..., 'embeds': [...]}`` with one embed per match
            (title, description, color, fields, timestamp, url) and, when the
            report image could be generated, a trailing image embed.
            ``None`` when no matches were found.
        """
        matches = []
        for friend in friends:
            matches_ = friend.serialize_recent_matches(limit=limit)
            if not matches_:
                continue
            matches.extend(matches_)
        if not matches:
            return None

        # Sort matches by start_time from latest to oldest.
        matches.sort(key=lambda x: x['start_time'], reverse=True)
        name = friends[0].name
        data = {
            'content': f'## {name}的战报',
            'embeds': [],
        }

        # The timestamp string carries a hard-coded "+08:00" suffix, so compute
        # the wall clock in UTC+8 explicitly rather than in the host timezone.
        utc8 = datetime.timezone(datetime.timedelta(hours=8))

        for match_ in matches[:min(limit, 9)]:
            duration = '%d:%02d:%02d' % utils.convert_seconds_to_hms(match_['duration'])
            summary = f"{duration}"

            if match_['party_size'] is None:
                # Recover the party size from the stored Match row when there
                # is one; otherwise (or when the stored value is NULL) use 0.
                stored = Match.get_or_none(match_id=match_['match_id'])
                match_['party_size'] = (stored.party_size if stored is not None else None) or 0

            # NOTE(review): the party summary below has no separator between the
            # size and the duration - confirm against the repository that this
            # is the intended text.
            if match_['party_size'] and match_['party_size'] > 1:
                summary = f"{match_['party_size']}{duration}"
            elif match_['party_size'] and match_['party_size'] == 1:
                summary = f"单排 {duration}"
            if match_['average_rank']:
                summary += '\n' + match_['average_rank']

            end_time = datetime.datetime.fromtimestamp(
                match_['end_time'], tz=utc8
            ).strftime('%Y-%m-%dT%H:%M:%S.000+08:00')
            hero_name = utils.get_hero_chinese_name(Hero.get(hero_id=match_['hero_id']).localized_name)
            data['embeds'].append({
                'title': f"{hero_name} {match_['kills']}{match_['deaths']}{match_['assists']}",
                'description': summary,
                'color': 6732650 if match_['win'] else 16724787,  # 66bb6a or FF3333
                'fields': [],
                'timestamp': end_time,
                'url': f"https://www.opendota.com/matches/{match_['match_id']}",
            })

        # Generate the image report; failure is logged and the text payload is
        # still returned.
        image_url = None
        try:
            # Await the coroutine directly instead of using asyncio.run().
            image_generator = ImageGenerator()
            image_url = await image_generator.generate_recent_matches_image(name, matches[:limit])
        except Exception as e:
            logger.error(f"生成最近比赛报告图片失败: {str(e)}")

        # When an image was produced, append it as the final embed.
        if image_url:
            data['embeds'].append({
                'image': {
                    'url': image_url
                },
                'color': 3447003  # blue
            })
        return data

    def recalculate_streak_from_recent_matches(self):
        """Rebuild the win/loss streak counters from the last 20 matches.

        Matches are replayed from oldest to newest, so the counters end up
        describing the streak that is current after the newest match.

        Returns:
            ``True`` when the counters were recomputed and saved, ``False``
            when no matches were available or an error occurred (logged).
        """
        try:
            recent_matches = self.get_recent_matches(limit=20)
            if not recent_matches:
                logger.warning(f"No recent matches found for {self.name}")
                return False

            # Oldest first (ascending start_time).
            ordered = sorted(recent_matches, key=lambda x: x.start_time)

            self.win_streak = 0
            self.loss_streak = 0
            for match_ in ordered:
                player_won = match_.radiant_win == (match_.player_slot < 128)
                if player_won:
                    self.win_streak += 1
                    self.loss_streak = 0
                else:
                    self.loss_streak += 1
                    self.win_streak = 0

            # Remember the newest match so update_streak() does not count it again.
            self.last_match_id = ordered[-1].match_id
            self.save()
            logger.info(f"Updated streak for {self.name}: {self.win_streak} wins, {self.loss_streak} losses")
            return True
        except Exception as e:
            logger.error(f"Failed to recalculate streak for {self.name}: {e}")
            return False

    def update_streak(self, match_id, win):
        """Apply one match result to the streak counters and save the row.

        Args:
            match_id: ID of the match being counted.
            win: truthy when this player won the match.

        Returns:
            ``None`` when ``match_id`` equals ``last_match_id`` (already
            counted); otherwise a dict with the new and previous counters and
            flags telling whether a streak of 3 or more was just broken.
        """
        # Do not count the same match twice.
        if self.last_match_id == match_id:
            return None

        old_win_streak = self.win_streak
        old_loss_streak = self.loss_streak
        if win:
            self.win_streak += 1
            self.loss_streak = 0  # a win resets the losing streak
        else:
            self.loss_streak += 1
            self.win_streak = 0  # a loss resets the winning streak
        self.last_match_id = match_id
        self.save()

        return {
            'name': self.name,
            'win': win,
            'win_streak': self.win_streak,
            'loss_streak': self.loss_streak,
            'win_streak_broken': not win and old_win_streak >= 3,
            'loss_streak_broken': win and old_loss_streak >= 3,
            'old_win_streak': old_win_streak,
            'old_loss_streak': old_loss_streak
        }
def _update_party_member_streaks(match_, match_obj, friend, active_friends):
    """Update streaks of the other active friends found in a party match.

    Loads the match details through ``match_obj.serialize_match()`` and, for
    every active friend other than ``friend`` whose ``steam_id`` is among the
    match's account IDs, applies the result via ``update_streak``. Non-empty
    results are appended to the global ``streak_updates`` list.
    """
    match_obj.serialize_match()  # populates opendota_response when it is empty
    if not match_obj.opendota_response:
        return
    details = json.loads(match_obj.opendota_response)
    account_ids = [p['account_id'] for p in details['players'] if p['account_id']]
    for teammate in active_friends:
        if teammate == friend or teammate.steam_id not in account_ids:
            continue
        # Only the first player entry matching this friend is used.
        for player in details['players']:
            if player['account_id'] == teammate.steam_id:
                teammate_won = match_.radiant_win == (player['player_slot'] < 128)
                teammate_info = teammate.update_streak(match_.match_id, teammate_won)
                if teammate_info:
                    streak_updates.append(teammate_info)
                break


def get_friends_recent_matches():
    """Poll every active friend's latest match and return the newly stored ones.

    Side effects: resets and refills the global ``streak_updates`` list, updates
    friends' streak counters, and creates a ``Match`` row for each match that is
    not stored yet.

    Returns:
        list of ``Match.serialize_match()`` dicts, one per newly created match.
    """
    global streak_updates  # module-level store for this pass's streak updates
    streak_updates = []
    new_matches = []
    seen_match_ids = set()  # matches already handled for an earlier friend in this pass
    active_friends = list(Friend.filter(active=True))

    for friend in active_friends:
        try:
            for match_ in friend.get_recent_matches():
                already_seen = match_.match_id in seen_match_ids
                seen_match_ids.add(match_.match_id)

                # The friend's own streak is updated whether or not the match
                # was already handled for somebody else.
                player_won = match_.radiant_win == (match_.player_slot < 128)
                streak_info = friend.update_streak(match_.match_id, player_won)
                if streak_info:
                    streak_updates.append(streak_info)
                if already_seen:
                    continue

                if Match.select().where(Match.match_id == match_.match_id).exists():
                    continue

                is_party = bool(match_.party_size and match_.party_size > 1)
                if not is_party:
                    logger.info('create match, match info: %s' % match_.__dict__)
                match_obj = Match.create(
                    match_id=match_.match_id,
                    start_time=datetime.datetime.fromtimestamp(match_.start_time),
                    duration=match_.duration,
                    radiant_win=match_.radiant_win,
                    party_size=match_.party_size,
                )
                if is_party:
                    # Party match: also update the other friends who played in it.
                    try:
                        _update_party_member_streaks(match_, match_obj, friend, active_friends)
                    except Exception as e:
                        logger.error(f'failed to get match details for {match_.match_id}: {e}')
                new_matches.append(match_obj.serialize_match())
        except Exception as e:
            logger.error(f'failed to get recent matches for friend {friend.name}: {e}')
    return new_matches
def _mark_team_leaders(lines, golds, damages):
    """Prefix one team's top-gold and top-damage lines with emoji, in place.

    ``golds`` and ``damages`` are parallel to ``lines``. Ties go to the first
    player; an empty team is left untouched.
    """
    if not lines:
        return
    indices = range(len(lines))
    gold_idx = max(indices, key=golds.__getitem__)
    damage_idx = max(indices, key=damages.__getitem__)
    lines[gold_idx] = '💰' + lines[gold_idx]
    lines[damage_idx] = '🩸' + lines[damage_idx]


async def serialize_match_for_discord(match_):
    """Build the Discord message payload for one serialized match.

    Args:
        match_: dict as produced by ``Match.serialize_match()``. Its
            ``party_size`` entry is filled in here when it is empty.

    Returns:
        dict with ``content`` (per-player lines grouped by side), ``tts`` and a
        single embed holding both scores, the party summary, an OpenDota link,
        the end timestamp and, when image generation succeeded, an ``image``.
    """
    party = [player['nickname'] for player in match_['players'] if player['nickname']]
    # The side of the first player that has a nickname decides "our" side.
    is_radiant = next(
        (player['is_radiant'] for player in match_['players'] if player['nickname']),
        False,
    )
    win = is_radiant == match_['radiant_win']

    if not match_['party_size']:
        # Fall back to the stored row; a missing row or a NULL party_size
        # becomes 0 so the numeric comparisons below cannot hit None.
        stored = Match.get_or_none(match_id=match_['match_id'])
        match_['party_size'] = (stored.party_size if stored is not None else None) or 0

    summary = f"{match_['duration']}"
    if match_['party_size'] > 1:
        summary = f"{match_['party_size']}{match_['duration']}"
    elif match_['party_size'] == 1:
        summary = f"单排 {match_['duration']}"

    radiant, radiant_gold, radiant_damage = [], [], []
    dire, dire_gold, dire_damage = [], [], []
    for player in match_['players']:
        player_name = player['personaname']
        if player['nickname']:
            player_name = f"**{player['nickname']}**"
        desc = f"{player_name}Lv.**{player['level']}** {utils.get_hero_chinese_name(player['hero'])} **{player['kills']}** 杀 **{player['deaths']}** 死 **{player['assists']}** 助 **{utils.shorten_digits(player['total_gold'])}** 经济 **{utils.shorten_digits(player['hero_damage'])}** 伤害 "
        if player['is_radiant']:
            radiant.append(desc)
            radiant_gold.append(player['total_gold'])
            radiant_damage.append(player['hero_damage'])
        else:
            dire.append(desc)
            dire_gold.append(player['total_gold'])
            dire_damage.append(player['hero_damage'])

    _mark_team_leaders(radiant, radiant_gold, radiant_damage)
    _mark_team_leaders(dire, dire_gold, dire_damage)

    color = 6732650 if win else 16724787  # 66bb6a or FF3333
    content = '## 天辉\n\n' + '\n'.join(radiant) + '\n\n## 夜魇\n\n' + '\n'.join(dire) + '\n'

    # A star marks the side our players were on.
    radiant_indicator = ''
    dire_indicator = ''
    if is_radiant:
        radiant_indicator = '🌟 '
    else:
        dire_indicator = ' 🌟'

    # Generate the match report image; failure is logged and the text payload
    # is still returned.
    image_url = None
    try:
        # Await the coroutine directly instead of using asyncio.run().
        image_generator = ImageGenerator()
        image_url = await image_generator.generate_match_report(match_)
    except Exception as e:
        logger.error(f"生成比赛报告图片失败: {str(e)}")

    data = {
        "content": content,
        "tts": False,
        "embeds": [
            {
                "color": color,
                "fields": [
                    {
                        "name": radiant_indicator + "天辉",
                        "value": match_['radiant_score'],
                        "inline": True
                    },
                    {
                        "name": "夜魇" + dire_indicator,
                        "value": match_['dire_score'],
                        "inline": True
                    },
                    {
                        "name": summary,
                        "value": f"{''.join(party)}"
                    }
                ],
                "author": {
                    "name": "opendota",
                    "url": "https://www.opendota.com/matches/%s" % match_['match_id']
                },
                "timestamp": match_['end_time']
            }
        ],
    }
    # Attach the image to the embed when one was generated.
    if image_url:
        data["embeds"][0]["image"] = {
            "url": image_url
        }
    return data
def check_rank_changes_for_discord():
    """Refresh every active friend's rank tier and report the changes.

    Returns:
        A Discord payload (``content`` plus one embed per changed friend), or
        ``None`` when nobody's rank tier changed.
    """
    embeds = []
    for friend in Friend.filter(active=True):
        changed, previous_tier = friend.update_rank_tier()
        if not changed or friend.rank_tier is None:
            continue

        # A missing previous tier is shown with a placeholder label and
        # compared as 0.
        if previous_tier:
            old_rank = utils.get_ranking(previous_tier)
        else:
            old_rank = "未校准"
        new_rank = utils.get_ranking(friend.rank_tier)

        if friend.rank_tier > (previous_tier or 0):
            direction, color = "⬆️ 上升", 6732650  # green
        else:
            direction, color = "⬇️ 下降", 16724787  # red

        embeds.append({
            'title': f"{friend.name} 的天梯等级变化",
            'description': f"{direction}{old_rank}{new_rank}",
            'color': color
        })

    if not embeds:
        return None
    return {
        'content': '## 天梯更新',
        'embeds': embeds
    }
def check_streaks():
    """Turn the pending global ``streak_updates`` into notification strings.

    Each update can yield messages for an ongoing win or loss streak of three
    or more, and for a win or loss streak of three or more that just ended.
    The global list is emptied afterwards so nothing is announced twice.

    Returns:
        list of notification strings (possibly empty).
    """
    global streak_updates
    messages = []
    for entry in streak_updates:
        # Ongoing win streak of three or more.
        if entry['win_streak'] >= 3:
            messages.append(f"🔥 **{entry['name']}** 正在**{entry['win_streak']}连胜**")
        # Ongoing loss streak of three or more.
        if entry['loss_streak'] >= 3:
            messages.append(f"💔 **{entry['name']}** 正在**{entry['loss_streak']}连败**")
        # A win streak was just ended.
        if entry['win_streak_broken'] and entry['old_win_streak'] >= 3:
            messages.append(f"⚡ **{entry['name']}** 的**{entry['old_win_streak']}连胜**被终结了!")
        # A loss streak was just ended.
        if entry['loss_streak_broken'] and entry['old_loss_streak'] >= 3:
            messages.append(f"🌈 **{entry['name']}** 终于结束了**{entry['old_loss_streak']}连败**")
    # Reset the pending list to avoid repeated notifications.
    streak_updates = []
    return messages