All checks were successful
continuous-integration/drone Build is passing
- Added win field to serialize_match_for_discord return data - Display victory/defeat emoji and text at start of match messages - Extract win status directly from data instead of inferring from embed color
193 lines
6.4 KiB
Python
193 lines
6.4 KiB
Python
import discord
|
||
from discord.ext import tasks
|
||
from loguru import logger
|
||
|
||
import dota
|
||
import utils
|
||
|
||
import sentry_sdk
|
||
|
||
import datetime
|
||
import asyncio
|
||
|
||
sentry_sdk.init(
|
||
dsn="https://272f1e4ecb3847ac8d24be796515e558@o4506942768021504.ingest.us.sentry.io/4506986058743808",
|
||
)
|
||
|
||
|
||
# formatter = logging.Formatter('%(levelname)s %(name)s %(asctime)s %(message)s', '%Y-%m-%d %H:%M:%S')
|
||
# log_handler = logging.FileHandler(utils.logger_file)
|
||
# log_handler.setFormatter(formatter)
|
||
# logger = logging.getLogger(__name__)
|
||
# logger.addHandler(log_handler)
|
||
# logger.propagate = False
|
||
|
||
logger.info('start bot')
|
||
|
||
# bot = discord.Bot(proxy='http://127.0.0.1:1235')
|
||
bot = discord.Bot()
|
||
|
||
channel_id = 1152167937852055552
|
||
|
||
@tasks.loop(minutes=1)
|
||
async def send_message(channel):
|
||
if utils.is_game_time():
|
||
send_message.change_interval(minutes=2)
|
||
else:
|
||
send_message.change_interval(minutes=15)
|
||
|
||
try:
|
||
matches = dota.get_friends_recent_matches()
|
||
# 获取连胜连败消息
|
||
streak_notifications = dota.check_streaks()
|
||
except Exception as e:
|
||
logger.error(f"Error in send_message task: {e}")
|
||
return
|
||
|
||
# 用于标记是否是第一场比赛
|
||
first_match = True
|
||
|
||
for match_ in matches:
|
||
data = await dota.serialize_match_for_discord(match_)
|
||
logger.info(f"sending match {match_['match_id']}, {data}")
|
||
try:
|
||
# 从embed中提取朋友信息,并添加胜负状态
|
||
friends_info = ""
|
||
win_status = "✅ **胜利** " if data.get('win') else "❌ **失败** "
|
||
|
||
if data['embeds'] and len(data['embeds']) > 0:
|
||
embed = data['embeds'][0]
|
||
if 'fields' in embed and len(embed['fields']) > 2:
|
||
# 第三个field包含了朋友信息
|
||
field = embed['fields'][2]
|
||
if 'value' in field and field['value']:
|
||
friends_info = f"{win_status}**{field['value']}** 的比赛:\n\n"
|
||
|
||
# 将朋友信息放在内容开头,连胜连败消息只在第一场比赛时添加
|
||
content = data['content']
|
||
if first_match and streak_notifications:
|
||
streak_msg = '\n'.join(streak_notifications) + '\n\n'
|
||
content = friends_info + streak_msg + content
|
||
first_match = False # 标记已经处理过第一场比赛
|
||
else:
|
||
content = friends_info + content
|
||
|
||
# 发送比赛结果
|
||
await channel.send(content=content, embeds=[discord.Embed.from_dict(embed) for embed in data['embeds']])
|
||
|
||
except Exception as e:
|
||
logger.error(f"send match error {e}")
|
||
|
||
@bot.command(description="获取最近战绩", name='recent_matches')
|
||
async def get_friends_recent_matches(ctx, name, match_count=5):
|
||
await ctx.defer()
|
||
logger.info(f"get_friends_recent_matches {name} {match_count}")
|
||
friends = dota.Friend.filter(name=name)
|
||
match_count = int(match_count)
|
||
if friends.count() == 0:
|
||
await ctx.respond(content=f'找不到 {name} 的信息')
|
||
return
|
||
data = await dota.Friend.serialize_recent_matches_for_discord(friends, match_count)
|
||
if not data:
|
||
await ctx.respond(content=f'找不到 {name} 的战绩')
|
||
return
|
||
try:
|
||
await ctx.respond(content=data['content'], embeds=[discord.Embed.from_dict(embed) for embed in data['embeds']])
|
||
except Exception as e:
|
||
logger.error(f"send recent_matches error {e}")
|
||
|
||
@bot.command(description='获取朋友', name='list_friends')
|
||
async def get_friends(ctx):
|
||
logger.info(f'get_friends')
|
||
friends = dota.Friend.select()
|
||
msg = ''
|
||
if friends.count() == 0:
|
||
msg = '没有朋友'
|
||
else:
|
||
for friend in friends:
|
||
msg += f'{friend.name} {friend.steam_id} {friend.active}\n'
|
||
await ctx.respond(content=msg)
|
||
|
||
@bot.command(description='修改朋友信息', name='mod_friend')
|
||
async def mod_friend(ctx, steam_id, name):
|
||
logger.info(f'mod_friend {steam_id} {name}')
|
||
friend = dota.Friend.get_or_none(steam_id=steam_id)
|
||
if friend:
|
||
friend.name = name
|
||
friend.save()
|
||
await ctx.respond(content=f'修改成功 {steam_id} {name}')
|
||
else:
|
||
await ctx.respond(content=f'找不到 {steam_id}')
|
||
|
||
@bot.command(description='添加朋友', name='add_friend')
|
||
async def add_friend(ctx, steam_id, name):
|
||
logger.info(f'add_friend {steam_id} {name}')
|
||
friend = dota.Friend.get_or_none(steam_id=steam_id)
|
||
if friend:
|
||
await ctx.respond(content=f'已经存在 {steam_id} {name}')
|
||
else:
|
||
friend = dota.Friend.create(steam_id=steam_id, name=name, active=True)
|
||
await ctx.respond(content=f'添加成功 {steam_id} {name}')
|
||
|
||
@bot.command(description='禁用朋友', name='deactivate_friend')
|
||
async def deactivate_friend(ctx, steam_id):
|
||
logger.info(f'deactivate_friend {steam_id}')
|
||
friend = dota.Friend.get_or_none(steam_id=steam_id)
|
||
if friend:
|
||
friend.active = False
|
||
friend.save()
|
||
await ctx.respond(content=f'禁用成功 {steam_id}')
|
||
else:
|
||
await ctx.respond(content=f'找不到 {steam_id}')
|
||
|
||
@bot.command(description='启用朋友', name='activate_friend')
|
||
async def activate_friend(ctx, steam_id):
|
||
logger.info(f'activate_friend {steam_id}')
|
||
friend = dota.Friend.get_or_none(steam_id=steam_id)
|
||
if friend:
|
||
friend.active = True
|
||
friend.save()
|
||
await ctx.respond(content=f'启用成功 {steam_id}')
|
||
else:
|
||
await ctx.respond(content=f'找不到 {steam_id}')
|
||
|
||
@tasks.loop(minutes=1)
|
||
async def heartbeat():
|
||
utils.heartbeat()
|
||
|
||
@tasks.loop(hours=24)
|
||
async def check_rank_changes(channel):
|
||
logger.info("Checking for rank changes")
|
||
try:
|
||
data = await dota.check_rank_changes_for_discord()
|
||
if data:
|
||
logger.info(f"Sending rank changes: {data}")
|
||
await channel.send(content=data['content'], embeds=[discord.Embed.from_dict(embed) for embed in data['embeds']])
|
||
except Exception as e:
|
||
logger.error(f"Error checking rank changes: {e}")
|
||
sentry_sdk.capture_exception(e)
|
||
|
||
@check_rank_changes.before_loop
|
||
async def before_check_rank_changes():
|
||
# 等待到晚上9点再开始第一次运行
|
||
now = datetime.datetime.now()
|
||
target_time = now.replace(hour=21, minute=0, second=0, microsecond=0)
|
||
if now >= target_time:
|
||
target_time = target_time + datetime.timedelta(days=1)
|
||
|
||
seconds_until_target = (target_time - now).total_seconds()
|
||
await asyncio.sleep(seconds_until_target)
|
||
|
||
@bot.event
|
||
async def on_ready():
|
||
logger.info(f"We have logged in as {bot.user}")
|
||
|
||
channel = bot.get_channel(channel_id)
|
||
send_message.start(channel)
|
||
heartbeat.start()
|
||
|
||
# 启动天梯检查任务
|
||
check_rank_changes.start(channel)
|
||
|
||
bot.run('MTE1MjE2NTc3NDMwNDIyMzI2Mg.GEi-17.VvuIkRy_cFD9XF6wtTagY95LKEbTxKaxy-FxGw') # 这里替换成你自己的 token
|