dota2-match-calendar/sync_dota2_matches.py
Ching L 56a79c8f9d
All checks were successful
continuous-integration/drone/push Build is passing
改进TBD比赛处理机制 v3.6
- 放宽TBD事件匹配时间窗口从5分钟到1小时
- 新增自动删除过期TBD事件功能(超过2小时的过期事件)
- 删除不再需要的清理脚本 (cleanup_duplicates.py, delete_duplicates.py)
- 更新文档说明新功能

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-08 10:09:20 +08:00

1080 lines
47 KiB
Python

#!/usr/bin/env python3
"""
Dota 2 Tournament Calendar Sync v3
Fetches Tier 1 Dota 2 matches from Liquipedia and syncs them to Google Calendar
Features:
- Sync upcoming matches
- Update completed match results
- Update match times if they change
"""
import requests
from bs4 import BeautifulSoup
from google.oauth2 import service_account
from googleapiclient.discovery import build
from datetime import datetime, timedelta
import pytz
import re
import hashlib
import sys
import argparse
import time
class Dota2CalendarSync:
    """Sync Tier 1 Dota 2 matches scraped from Liquipedia into a Google Calendar.

    Constructing an instance authenticates with a service account (and exits
    the process on failure).  ``sync_matches_to_calendar`` is the entry point;
    the remaining public methods are the building blocks it uses.

    Event title conventions written and recognised by this class:
        ``Team1 vs Team2 [Tournament]``        scheduled match
        ``1-0 Team1 vs Team2 [Tournament]``    series in progress
        ``✓ 2-0 Team1 vs Team2 [Tournament]``  finished series
    """

    LIQUIPEDIA_URL = 'https://liquipedia.net/dota2/Liquipedia:Matches'
    USER_AGENT = 'Dota2CalendarSync/3.0 (https://github.com/youruser/dota2-calendar)'
    # A match entry is treated as Tier 1 when its text contains any of these.
    TIER1_KEYWORDS = (
        'TI2025', 'The International', 'Major', 'Premier',
        'Tier 1', 'DreamLeague', 'ESL One', 'PGL Major',
    )
    # Marker prepended to the title of a finished series.
    COMPLETED_MARK = '✓'
    # Pause after every successful write to stay clear of API rate limits.
    API_PAUSE_SECONDS = 0.2
    # A resolved match may claim a "TBD vs TBD" placeholder this close in time.
    TBD_MATCH_WINDOW_SECONDS = 3600
    # Unclaimed placeholders older than this are deleted.
    TBD_EXPIRY = timedelta(hours=2)

    # Optional completion mark followed by a score at the start of a title.
    # NOTE(review): the marker literal was empty in the reviewed source, so
    # other common check glyphs are accepted too in case older events use one.
    _SCORE_PREFIX_RE = re.compile(
        r'^\s*(?:[✓✔✅]\ufe0f?\s*)?((?:\d+|\?)[-:](?:\d+|\?))\s+'
    )
    _COMPLETED_RE = re.compile(r'^\s*[✓✔✅]')

    def __init__(self, credentials_file='credentials.json', calendar_id='primary'):
        """Store the configuration and authenticate immediately.

        Args:
            credentials_file: Path to the service-account JSON key.
            calendar_id: Target Google Calendar ID.
        """
        self.credentials_file = credentials_file
        self.calendar_id = calendar_id
        self.service = self._authenticate()

    def _authenticate(self):
        """Build the Calendar v3 client; exit the process with status 1 on failure."""
        try:
            credentials = service_account.Credentials.from_service_account_file(
                self.credentials_file,
                scopes=['https://www.googleapis.com/auth/calendar']
            )
            service = build('calendar', 'v3', credentials=credentials)
            print("✓ Successfully authenticated with Google Calendar")
            return service
        except Exception as e:
            print(f"✗ Authentication failed: {e}")
            sys.exit(1)

    # ------------------------------------------------------------------
    # Liquipedia scraping
    # ------------------------------------------------------------------

    def fetch_all_matches(self):
        """Fetch Tier 1 matches from Liquipedia.

        Returns:
            ``(upcoming, completed)`` lists of match dicts.  Both are empty
            when the HTTP request fails.
        """
        print("Fetching matches from Liquipedia...")
        try:
            response = requests.get(
                self.LIQUIPEDIA_URL,
                headers={'User-Agent': self.USER_AGENT},
                timeout=30,
            )
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"✗ Error fetching Liquipedia data: {e}")
            return [], []

        soup = BeautifulSoup(response.text, 'lxml')
        matches = []
        # Every match entry carries a span with a data-timestamp attribute.
        for timestamp_elem in soup.find_all('span', {'data-timestamp': True}):
            parent = timestamp_elem.find_parent('div')
            if parent is None:
                continue
            text_content = parent.get_text()
            if not any(keyword in text_content for keyword in self.TIER1_KEYWORDS):
                continue
            match_data = self._parse_match(parent, timestamp_elem)
            if match_data:
                matches.append(match_data)

        matches = self._remove_duplicates(matches)
        upcoming = [m for m in matches if not m.get('completed', False)]
        completed = [m for m in matches if m.get('completed', False)]
        print(f"✓ Found {len(upcoming)} upcoming matches")
        print(f"✓ Found {len(completed)} completed matches with results")
        return upcoming, completed

    def _parse_match(self, parent, timestamp_elem):
        """Build a match dict from one Liquipedia match element.

        Args:
            parent: The ``div`` enclosing the match entry.
            timestamp_elem: The span carrying ``data-timestamp``.

        Returns:
            A dict with ``datetime``, ``team1``, ``team2``, ``completed``,
            ``has_score``, ``id`` and, when found, ``score``, ``format``,
            ``tournament`` and ``winner``; ``None`` when the timestamp or the
            team names are missing or the entry cannot be parsed.
        """
        try:
            timestamp = timestamp_elem.get('data-timestamp')
            if not timestamp:
                return None
            match_data = {
                'datetime': datetime.fromtimestamp(int(timestamp), tz=pytz.UTC),
            }
            text = parent.get_text()

            teams = self._extract_teams(parent, text)
            if teams is not None:
                match_data['team1'], match_data['team2'] = teams

            score = self._extract_score(parent, text)

            match_format = self._extract_format(parent, text)
            if match_format is not None:
                match_data['format'] = match_format

            tournament = self._extract_tournament(parent, text)
            if tournament is not None:
                match_data['tournament'] = tournament

            self._apply_score_state(match_data, score)

            if 'team1' in match_data and 'team2' in match_data:
                match_data['id'] = self._generate_match_id(match_data)
                return match_data
        except Exception as e:
            # One malformed entry must not abort the scrape, but it should
            # not disappear silently either.
            print(f"⚠ Skipping unparseable match entry: {e}")
        return None

    @staticmethod
    def _team_name_element(block):
        """Return the element holding a team's name inside a ``block-team`` div.

        ``span.name`` is preferred; the first ``a`` tag with text is used when
        the span is missing or empty (icon links are empty ``a`` tags).
        """
        elem = block.find('span', class_='name')
        if elem is None or not elem.get_text().strip():
            for a_tag in block.find_all('a'):
                if a_tag.get_text().strip():
                    return a_tag
        return elem

    def _extract_teams(self, parent, text):
        """Return ``(team1, team2)`` from the HTML structure or a "vs" text match."""
        team_blocks = parent.find_all('div', class_='block-team')
        if len(team_blocks) >= 2:
            first = self._team_name_element(team_blocks[0])
            second = self._team_name_element(team_blocks[1])
            if first is not None and second is not None:
                return (
                    self._clean_team_name(first.get_text().strip()),
                    self._clean_team_name(second.get_text().strip()),
                )

        vs_match = re.search(r'([A-Za-z0-9\s\.\-_]+?)\s*vs\s*([A-Za-z0-9\s\.\-_]+)', text)
        if not vs_match:
            return None
        # Drop a leading time-zone suffix that the text fallback can capture.
        team1_raw = re.sub(r'^.*CEST?', '', vs_match.group(1).strip()).strip()
        team2_raw = vs_match.group(2).strip()
        return self._clean_team_name(team1_raw), self._clean_team_name(team2_raw)

    @staticmethod
    def _plausible_score(score1, score2):
        """Return True for a non-empty series score with at most 5 wins per side."""
        return 0 <= score1 <= 5 and 0 <= score2 <= 5 and (score1 + score2) > 0

    def _extract_score(self, parent, text):
        """Return ``(score1, score2)`` or ``None`` when no plausible score exists."""
        holder = parent.find('div', class_='match-info-header-scoreholder')
        if holder is not None:
            cells = holder.find_all('span', class_='match-info-header-scoreholder-score')
            if len(cells) >= 2:
                try:
                    pair = (int(cells[0].get_text().strip()),
                            int(cells[1].get_text().strip()))
                except ValueError:
                    pair = None
                if pair is not None and self._plausible_score(*pair):
                    return pair

        # Text fallback: only the dash form, so clock times are not matched.
        found = re.search(r'(\d{1,2})-(\d{1,2})', text)
        if found:
            pair = (int(found.group(1)), int(found.group(2)))
            if self._plausible_score(*pair):
                # Reject matches that are really part of a date (2025-01-14).
                context = text[max(0, found.start() - 5):found.end() + 5]
                if not re.search(r'\d{4}-\d{1,2}-\d{1,2}', context):
                    return pair
        return None

    @staticmethod
    def _extract_format(parent, text):
        """Return the series format (e.g. ``'Bo3'``) or ``None``."""
        format_elem = parent.find('span', class_='match-info-header-scoreholder-lower')
        if format_elem is not None:
            found = re.search(r'(Bo\d)', format_elem.get_text().strip())
        else:
            found = re.search(r'\(?(Bo\d)\)?', text)
        return found.group(1) if found else None

    @staticmethod
    def _ti2025_name(text):
        """Return the normalised TI 2025 tournament name, with round if present."""
        name = 'The International 2025'
        round_match = re.search(r'Round\s+\d+', text)
        if round_match:
            name += f" - {round_match.group(0)}"
        return name

    def _extract_tournament(self, parent, text):
        """Return the tournament name, or ``None`` when it cannot be determined."""
        tournament_elem = parent.find('div', class_='match-info-tournament')
        if tournament_elem is not None:
            tournament_text = tournament_elem.get_text().strip()
            tournament_text = re.sub(r'\+ Add details.*', '', tournament_text).strip()
            if 'TI2025' in tournament_text:
                return self._ti2025_name(tournament_text)
            return tournament_text

        if 'TI2025' in text:
            return self._ti2025_name(text)
        if 'Major' in text:
            major_match = re.search(r'[\w\s]+Major', text)
            if major_match:
                return major_match.group(0).strip()
        return None

    @staticmethod
    def _series_completed(score1, score2, match_format=None):
        """Decide whether a series score is final.

        Odd-length series (Bo1/Bo3/Bo5/...) end when one side holds a majority
        of the games; even-length series (Bo2) end once every game is played.
        Without a format, two wins for either side is taken as final.  An
        unrecognised format string is assumed final because a score exists.
        """
        if not match_format:
            return score1 >= 2 or score2 >= 2
        size = re.search(r'Bo(\d+)', match_format)
        if not size:
            return True
        games = int(size.group(1))
        if games % 2 == 0:
            return score1 + score2 >= games
        return max(score1, score2) > games // 2

    def _apply_score_state(self, match_data, score):
        """Fill ``score``, ``has_score``, ``completed`` and ``winner`` in place."""
        both_tbd = match_data.get('team1') == 'TBD' and match_data.get('team2') == 'TBD'
        if score is None or both_tbd:
            # A "TBD vs TBD" entry can never have a real score; anything
            # parsed for it is noise from the surrounding text.
            match_data['completed'] = False
            match_data['has_score'] = False
            return

        score1, score2 = score
        match_data['score'] = f"{score1}-{score2}"
        match_data['has_score'] = True  # set even while the series is running
        completed = self._series_completed(score1, score2, match_data.get('format'))
        match_data['completed'] = completed
        if not completed:
            return
        if score1 > score2:
            match_data['winner'] = match_data.get('team1', 'Unknown')
        elif score2 > score1:
            match_data['winner'] = match_data.get('team2', 'Unknown')
        else:
            match_data['winner'] = 'Draw'

    def _clean_team_name(self, name):
        """Collapse whitespace and strip trailing parentheses or date residue."""
        name = re.sub(r'\s+', ' ', name).strip()
        name = re.sub(r'\s*\(.*?\)\s*$', '', name)
        name = re.sub(r'^\d{4}-\d{2}-\d{2}.*', '', name).strip()
        name = re.sub(r'^\w+\s+\d+,\s+\d{4}.*', '', name).strip()
        return name

    def _generate_match_id(self, match_data):
        """Return a 16-hex-digit ID derived from teams and tournament.

        The start time is deliberately excluded so a rescheduled match keeps
        its ID; the date is used only when no tournament is known.
        """
        id_parts = [match_data[key] for key in ('team1', 'team2') if key in match_data]
        if 'tournament' in match_data:
            id_parts.append(match_data['tournament'])
        elif 'datetime' in match_data:
            id_parts.append(str(match_data['datetime'].date()))
        unique_string = '_'.join(id_parts)
        return hashlib.md5(unique_string.encode()).hexdigest()[:16]

    def _remove_duplicates(self, matches):
        """Drop matches without an ID and keep the last match seen per ID."""
        unique_matches = {}
        for match in matches:
            if match.get('id'):
                unique_matches[match['id']] = match
        return list(unique_matches.values())

    # ------------------------------------------------------------------
    # Event title helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _build_summary(team1, team2, tournament):
        """Return the event title for a scheduled match."""
        if tournament:
            return f"{team1} vs {team2} [{tournament}]"
        return f"{team1} vs {team2}"

    @classmethod
    def _split_score_prefix(cls, summary):
        """Split a title into ``(title_without_score, score_or_None)``."""
        found = cls._SCORE_PREFIX_RE.match(summary)
        if not found:
            return summary, None
        return summary[found.end():], found.group(1).replace(':', '-')

    @classmethod
    def _current_score(cls, summary):
        """Return the score shown in a title (prefix or legacy suffix), or None."""
        _, score = cls._split_score_prefix(summary)
        if score is None:
            legacy = re.search(r'\((\d+[-:]\d+)\)\s*$', summary)
            if legacy:
                score = legacy.group(1).replace(':', '-')
        return score

    @classmethod
    def _is_marked_completed(cls, summary):
        """Return True when a title already carries a completion marker."""
        return '[COMPLETED]' in summary or bool(cls._COMPLETED_RE.match(summary))

    @classmethod
    def _parse_summary(cls, summary):
        """Return ``(team1, team2, tournament)`` parsed from a title, or None.

        Completion markers and scores are removed first.  Both the current
        ``Team1 vs Team2 [Tournament]`` and the legacy
        ``Dota 2 - Tournament: Team1 vs Team2`` layouts are understood.
        """
        text = summary.replace('[COMPLETED] ', '')
        text, _ = cls._split_score_prefix(text)
        text = re.sub(r'\s*\([0-9\-\?]+\)\s*$', '', text)

        current = re.search(r'^(.*?)\s+vs\s+(.*?)\s*\[(.*?)\]$', text)
        if current:
            return tuple(part.strip() for part in current.groups())
        legacy = re.search(r'Dota 2 - (.*?):\s*(.*?)\s+vs\s+(.*?)$', text)
        if legacy:
            return (legacy.group(2).strip(), legacy.group(3).strip(),
                    legacy.group(1).strip())
        return None

    @classmethod
    def _needs_team_update(cls, summary, team1, team2, tournament):
        """Return True when a TBD title should be rewritten with known teams.

        A title that already equals the target (e.g. "OG vs TBD" while one
        side is still unknown) is left alone so it is not rewritten each run.
        """
        if 'TBD' not in summary or (team1 == 'TBD' and team2 == 'TBD'):
            return False
        base, _ = cls._split_score_prefix(summary.replace('[COMPLETED] ', ''))
        return base.strip() != cls._build_summary(team1, team2, tournament)

    @staticmethod
    def _looks_like_dota_event(summary):
        """Return True when a calendar title looks like one of our events."""
        return ('Dota 2' in summary
                or 'The International' in summary
                or 'TI2025' in summary
                or ('[' in summary and 'vs' in summary))

    @staticmethod
    def _event_start(event):
        """Return an event's start as an aware datetime, or None if unusable.

        All-day events only carry a date; they are interpreted as UTC midnight
        so they can be compared with aware datetimes.
        """
        start = event.get('start', {})
        raw = start.get('dateTime') or start.get('date')
        if not raw:
            return None
        try:
            parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        except ValueError:
            return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=pytz.UTC)
        return parsed

    @staticmethod
    def _utc_stamp():
        """Return the current UTC time formatted for event descriptions."""
        return datetime.now(pytz.UTC).strftime('%Y-%m-%d %H:%M UTC')

    # ------------------------------------------------------------------
    # Calendar access
    # ------------------------------------------------------------------

    def get_existing_events(self, days_back=7, days_ahead=30):
        """Index the Dota 2 events already present in the calendar.

        Args:
            days_back: How many days before now to include.
            days_ahead: How many days after now to include.

        Returns:
            A dict mapping match ID (taken from the ``ID:`` line of the
            description) to the event, plus the special key ``'_by_match'``
            mapping ``"team1_team2_tournament"`` to the event.  An empty dict
            (without ``'_by_match'``) is returned when the API call fails.
        """
        try:
            now = datetime.now(pytz.UTC)
            stamp = '%Y-%m-%dT%H:%M:%SZ'
            time_min = (now - timedelta(days=days_back)).strftime(stamp)
            time_max = (now + timedelta(days=days_ahead)).strftime(stamp)
            print("Checking existing events in calendar...")

            events = []
            page_token = None
            while True:
                page = self.service.events().list(
                    calendarId=self.calendar_id,
                    timeMin=time_min,
                    timeMax=time_max,
                    maxResults=500,
                    singleEvents=True,
                    orderBy='startTime',
                    pageToken=page_token
                ).execute()
                events.extend(page.get('items', []))
                page_token = page.get('nextPageToken')
                if not page_token:
                    break

            events_by_id = {}
            events_by_match = {}
            for event in events:
                summary = event.get('summary', '')
                if not self._looks_like_dota_event(summary):
                    continue
                id_match = re.search(r'ID:\s*([a-f0-9]+)', event.get('description', ''))
                if id_match:
                    events_by_id[id_match.group(1)] = event
                parsed = self._parse_summary(summary)
                if parsed:
                    events_by_match['_'.join(parsed)] = event

            print(f"✓ Found {len(events_by_id)} existing Dota 2 events")
            combined = dict(events_by_id)
            combined['_by_match'] = events_by_match
            return combined
        except Exception as e:
            print(f"✗ Error fetching calendar events: {e}")
            return {}

    def create_calendar_event(self, match_data):
        """Build (but do not insert) the Calendar event body for a match.

        Args:
            match_data: Match dict; ``id`` is required, everything else has a
                fallback (teams default to ``TBD``, start defaults to now).

        Returns:
            The event resource dict to pass to ``events().insert``.
        """
        team1 = match_data.get('team1', 'TBD')
        team2 = match_data.get('team2', 'TBD')
        tournament = match_data.get('tournament', '')

        description_parts = []
        if tournament:
            description_parts.append(f"Tournament: {tournament}")
        description_parts.append(f"Match: {team1} vs {team2}")
        if 'format' in match_data:
            description_parts.append(f"Format: {match_data['format']}")
        if match_data.get('completed'):
            description_parts.append(f"\n🏆 RESULT: {match_data.get('score', 'Unknown')}")
            description_parts.append(f"Winner: {match_data.get('winner', 'Unknown')}")
        description_parts.append(f"ID: {match_data['id']}")
        description_parts.append("\nSource: Liquipedia")

        start_time = match_data.get('datetime') or datetime.now(pytz.UTC)

        # Estimated duration in hours by series length; 2 when unknown.
        duration = 2
        match_format = match_data.get('format', '')
        for label, hours in (('Bo5', 4), ('Bo3', 3), ('Bo1', 1)):
            if label in match_format:
                duration = hours
                break
        end_time = start_time + timedelta(hours=duration)

        return {
            'summary': self._build_summary(team1, team2, tournament),
            'description': '\n'.join(description_parts),
            'start': {
                'dateTime': start_time.isoformat(),
                'timeZone': 'UTC',
            },
            'end': {
                'dateTime': end_time.isoformat(),
                'timeZone': 'UTC',
            },
            'reminders': {
                'useDefault': False,
                'overrides': [
                    {'method': 'popup', 'minutes': 30},
                ],
            },
            'colorId': '9',  # Blue
        }

    def _patch_event(self, event_id, mutate, payload, failure_label):
        """Fetch an event, let ``mutate(event, payload)`` edit it, and save it.

        Returns True on success; on any failure prints
        ``"<failure_label>: <error>"`` and returns False.
        """
        try:
            event = self.service.events().get(
                calendarId=self.calendar_id,
                eventId=event_id
            ).execute()
            mutate(event, payload)
            self.service.events().update(
                calendarId=self.calendar_id,
                eventId=event_id,
                body=event
            ).execute()
            return True
        except Exception as e:
            print(f"{failure_label}: {e}")
            return False

    @classmethod
    def _apply_time(cls, event, new_datetime):
        """Move an event to ``new_datetime`` keeping its duration; stamp the description."""
        start_dt = datetime.fromisoformat(event['start']['dateTime'].replace('Z', '+00:00'))
        end_dt = datetime.fromisoformat(event['end']['dateTime'].replace('Z', '+00:00'))
        duration = end_dt - start_dt
        event['start']['dateTime'] = new_datetime.isoformat()
        event['end']['dateTime'] = (new_datetime + duration).isoformat()

        stamp_line = f"Last updated: {cls._utc_stamp()}"
        description = event.get('description', '')
        if 'Last updated:' in description:
            description = re.sub(r'Last updated:.*', lambda _m: stamp_line, description)
        else:
            description += f"\n{stamp_line}"
        event['description'] = description

    @classmethod
    def _apply_teams(cls, event, match_data):
        """Rewrite title, ``Match:`` line and ``ID:`` line with the resolved teams."""
        team1 = match_data.get('team1', 'TBD')
        team2 = match_data.get('team2', 'TBD')
        tournament = match_data.get('tournament', '')
        description = event.get('description', '')

        # Callable replacements keep backslashes in names from being read as
        # regex escapes.
        match_line = f"Match: {team1} vs {team2}\n"
        description = re.sub(r'Match: .*?\n', lambda _m: match_line, description)
        if 'ID:' in description:
            id_text = f"ID: {match_data.get('id', '')}"
            description = re.sub(r'ID: [a-f0-9]+', lambda _m: id_text, description)

        stamp_line = f"Teams updated: {cls._utc_stamp()}"
        if 'Teams updated:' in description:
            description = re.sub(r'Teams updated:.*', lambda _m: stamp_line, description)
        else:
            description = description.replace('\nSource:', f"\n{stamp_line}\nSource:")

        event['summary'] = cls._build_summary(team1, team2, tournament)
        event['description'] = description

    @classmethod
    def _apply_score(cls, event, match_data):
        """Show an in-progress score in the title and description."""
        description = event.get('description', '')
        score_line = f"📊 CURRENT SCORE: {match_data.get('score', 'Unknown')}\n"
        if '📊 CURRENT SCORE:' in description:
            description = re.sub(r'📊 CURRENT SCORE:.*?\n', lambda _m: score_line, description)
        elif 'ID:' in description:
            description = description.replace('ID:', '\n' + score_line + 'ID:', 1)
        else:
            description += '\n' + score_line

        base, _ = cls._split_score_prefix(event.get('summary', ''))
        event['summary'] = f"{match_data.get('score', '?-?')} {base}"
        event['description'] = description

    @classmethod
    def _apply_result(cls, event, match_data):
        """Mark an event as finished: final score in the title, result in the description."""
        description = event.get('description', '')
        result_block = (f"🏆 RESULT: {match_data.get('score', 'Unknown')}\n"
                        f"Winner: {match_data.get('winner', 'Unknown')}\n")
        if '🏆 RESULT:' in description:
            description = re.sub(
                r'🏆 RESULT:.*?\n.*?Winner:.*?\n',
                lambda _m: result_block,
                description,
                flags=re.DOTALL
            )
        elif 'ID:' in description:
            description = description.replace('ID:', '\n' + result_block + 'ID:', 1)
        else:
            description += '\n' + result_block

        # Replace any earlier (in-progress or final) score instead of
        # stacking a second one in front of it.
        base, _ = cls._split_score_prefix(event.get('summary', ''))
        event['summary'] = f"{cls.COMPLETED_MARK} {match_data.get('score', '?-?')} {base}"
        event['description'] = description

    def update_event_time(self, event_id, new_datetime):
        """Move an existing event to a new start time; return True on success."""
        return self._patch_event(event_id, self._apply_time, new_datetime,
                                 "Error updating event time")

    def update_event_with_teams(self, event_id, match_data):
        """Replace TBD placeholders in an event with real team names; return True on success."""
        return self._patch_event(event_id, self._apply_teams, match_data,
                                 "Error updating event with teams")

    def update_event_with_score(self, event_id, match_data):
        """Write an in-progress score into an event; return True on success."""
        return self._patch_event(event_id, self._apply_score, match_data,
                                 "Error updating event score")

    def update_event_with_result(self, event_id, match_data):
        """Write the final result into an event; return True on success."""
        return self._patch_event(event_id, self._apply_result, match_data,
                                 "Error updating event")

    def check_time_difference(self, event_datetime, new_datetime):
        """Return True when two times differ by five minutes or more.

        Args:
            event_datetime: ISO-8601 string (``Z`` suffix allowed) or datetime.
            new_datetime: datetime to compare against.

        Naive values on either side are interpreted as UTC.
        """
        if isinstance(event_datetime, str):
            if event_datetime.endswith('Z'):
                event_datetime = event_datetime.replace('Z', '+00:00')
            event_dt = datetime.fromisoformat(event_datetime)
        else:
            event_dt = event_datetime

        if event_dt.tzinfo is None:
            event_dt = event_dt.replace(tzinfo=pytz.UTC)
        if new_datetime.tzinfo is None:
            new_datetime = new_datetime.replace(tzinfo=pytz.UTC)

        diff_minutes = abs((event_dt - new_datetime).total_seconds() / 60)
        return diff_minutes >= 5

    def delete_calendar_event(self, event_id):
        """Delete an event; return True on success, False (after printing) on error."""
        try:
            self.service.events().delete(
                calendarId=self.calendar_id,
                eventId=event_id
            ).execute()
            return True
        except Exception as e:
            print(f"Error deleting event: {e}")
            return False

    def _insert_event(self, match):
        """Insert a new event for ``match``; return True on success."""
        try:
            self.service.events().insert(
                calendarId=self.calendar_id,
                body=self.create_calendar_event(match)
            ).execute()
            return True
        except Exception as e:
            print(f"✗ Error adding {match.get('team1', 'TBD')} vs {match.get('team2', 'TBD')}: {e}")
            return False

    # ------------------------------------------------------------------
    # Sync orchestration
    # ------------------------------------------------------------------

    def _apply_change(self, stats, counter, dry_run, would_msg, done_msg, fail_msg,
                      action, *args):
        """Run (or, in a dry run, only announce) one calendar write.

        Increments ``stats[counter]`` on success or dry run and
        ``stats['errors']`` on failure.  ``fail_msg`` may be None when the
        action reports its own error.  Returns True unless the write failed.
        """
        if dry_run:
            print(would_msg)
            stats[counter] += 1
            return True
        if action(*args):
            print(done_msg)
            stats[counter] += 1
            time.sleep(self.API_PAUSE_SECONDS)
            return True
        if fail_msg:
            print(fail_msg)
        stats['errors'] += 1
        return False

    def _find_existing_event(self, existing_events, match):
        """Locate the calendar event for ``match``, or return None.

        Tried in order: match ID, exact ``team1_team2_tournament`` key, then a
        substring search that ignores team order.
        """
        team1 = match.get('team1', 'TBD')
        team2 = match.get('team2', 'TBD')
        tournament = match.get('tournament', '')
        by_match = existing_events.get('_by_match', {})

        event = existing_events.get(match.get('id'))
        if event:
            return event
        event = by_match.get(f"{team1}_{team2}_{tournament}")
        if event:
            return event

        forward, backward = f"{team1}_{team2}", f"{team2}_{team1}"
        for event_key, event in by_match.items():
            if (forward in event_key or backward in event_key) and tournament in event_key:
                print(f"  → Found existing match by teams: {team1} vs {team2}")
                return event
        return None

    def _find_tbd_placeholder(self, existing_events, match, claimed_tbd):
        """Find an unclaimed "TBD vs TBD" event of the same tournament near the match time."""
        match_time = match.get('datetime')
        if match_time is None:
            return None
        tournament = match.get('tournament', '')
        for event_key, event in existing_events.get('_by_match', {}).items():
            if 'TBD_TBD' not in event_key or tournament not in event_key:
                continue
            if event.get('id') in claimed_tbd:
                continue
            event_dt = self._event_start(event)
            if event_dt is None:
                continue
            gap = abs((event_dt - match_time).total_seconds())
            if gap < self.TBD_MATCH_WINDOW_SECONDS:
                print(f"  → Found TBD match to update: "
                      f"{match.get('team1', 'TBD')} vs {match.get('team2', 'TBD')}")
                print(f"    Time difference: {gap / 60:.0f} minutes")
                return event
        return None

    def _sync_score(self, existing, match, label, stats, dry_run):
        """Bring the score shown on an existing event up to date."""
        new_score = match.get('score', 'Unknown')
        if self._current_score(existing.get('summary', '')) == new_score:
            print(f"⊘ Score unchanged: {label} ({new_score})")
            stats['skipped'] += 1
        elif match.get('completed'):
            self._apply_change(
                stats, 'updated', dry_run,
                f"◯ Would update completed result: {label} - {new_score}",
                f"✓ Updated completed result: {label} - {new_score}",
                f"✗ Failed to update: {label}",
                self.update_event_with_result, existing['id'], match)
        else:
            self._apply_change(
                stats, 'updated', dry_run,
                f"◯ Would update in-progress score: {label} - {new_score}",
                f"📊 Updated in-progress score: {label} - {new_score}",
                f"✗ Failed to update score: {label}",
                self.update_event_with_score, existing['id'], match)

    def _sync_time(self, existing, match_time, label, stats, dry_run):
        """Move an existing event when the match has been rescheduled."""
        old_time = self._event_start(existing)
        if old_time is None or not self.check_time_difference(old_time, match_time):
            print(f"⊘ No change: {label}")
            stats['skipped'] += 1
            return
        detail = (f"\n   Old: {old_time.strftime('%Y-%m-%d %H:%M UTC')}"
                  f"\n   New: {match_time.strftime('%Y-%m-%d %H:%M UTC')}")
        self._apply_change(
            stats, 'time_updated', dry_run,
            f"◯ Would update time: {label}{detail}",
            f"⏰ Updated time: {label}{detail}",
            f"✗ Failed to update time: {label}",
            self.update_event_time, existing['id'], match_time)

    def _sync_upcoming(self, matches, existing_events, stats, claimed_tbd, now,
                       dry_run, update_results, update_times):
        """Add, re-title, re-score or re-time events for not-yet-finished matches."""
        print("\nProcessing upcoming matches...")
        print("-" * 30)
        # Matches from the last 12 hours are kept so running series with a
        # partial score are still processed.
        earliest = now - timedelta(hours=12)

        for match in matches:
            match_time = match.get('datetime', now)
            if match_time < earliest or not match.get('id'):
                continue
            team1 = match.get('team1', 'TBD')
            team2 = match.get('team2', 'TBD')
            tournament = match.get('tournament', '')
            label = f"{team1} vs {team2}"
            when = match_time.strftime('%Y-%m-%d %H:%M UTC')

            existing = self._find_existing_event(existing_events, match)
            if existing is None:
                existing = self._find_tbd_placeholder(existing_events, match, claimed_tbd)
            # The index is a snapshot taken before this run: a placeholder
            # already given to another match must not be reused.
            if existing is not None and existing.get('id') in claimed_tbd:
                existing = None

            if existing is None:
                self._apply_change(
                    stats, 'added', dry_run,
                    f"◯ Would add: {label} at {when}",
                    f"✓ Added: {label} at {when}",
                    None,
                    self._insert_event, match)
                continue

            summary = existing.get('summary', '')
            if self._needs_team_update(summary, team1, team2, tournament):
                changed = self._apply_change(
                    stats, 'updated', dry_run,
                    f"◯ Would update TBD match with teams: {label}",
                    f"✓ Updated TBD match with teams: {label}",
                    f"✗ Failed to update TBD match: {label}",
                    self.update_event_with_teams, existing.get('id'), match)
                if changed:
                    claimed_tbd.add(existing.get('id'))
            elif match.get('has_score') and update_results:
                self._sync_score(existing, match, label, stats, dry_run)
            elif update_times:
                self._sync_time(existing, match_time, label, stats, dry_run)
            else:
                print(f"⊘ Skipping (exists): {label}")
                stats['skipped'] += 1

    def _sync_completed(self, matches, existing_events, stats, dry_run):
        """Write final results onto the events of finished matches."""
        print("\nProcessing completed match results...")
        print("-" * 30)
        for match in matches:
            if not match.get('id'):
                continue
            label = f"{match.get('team1', 'TBD')} vs {match.get('team2', 'TBD')}"
            score = match.get('score', 'Unknown')

            existing = self._find_existing_event(existing_events, match)
            if existing is None:
                continue
            if self._is_marked_completed(existing.get('summary', '')):
                print(f"⊘ Already updated: {label} ({score})")
                continue
            self._apply_change(
                stats, 'updated', dry_run,
                f"◯ Would update result: {label} - {score}",
                f"✓ Updated result: {label} - {score}",
                f"✗ Failed to update: {label}",
                self.update_event_with_result, existing['id'], match)

    def _delete_expired_tbd(self, existing_events, claimed_tbd, now, stats, dry_run):
        """Delete (or, in a dry run, list) stale "TBD vs TBD" events nobody claimed."""
        print("\nChecking for expired TBD events to delete...")
        print("-" * 30)
        cutoff = now - self.TBD_EXPIRY
        for key, event in existing_events.items():
            if key == '_by_match':
                continue
            summary = event.get('summary', '')
            if 'TBD vs TBD' not in summary:
                continue
            event_id = event.get('id')
            if event_id in claimed_tbd:
                continue
            event_dt = self._event_start(event)
            if event_dt is None or event_dt >= cutoff:
                continue
            when = event_dt.strftime('%Y-%m-%d %H:%M UTC')
            self._apply_change(
                stats, 'deleted', dry_run,
                f"◯ Would delete expired TBD event: {summary} ({when})",
                f"🗑️ Deleted expired TBD event: {summary} ({when})",
                f"✗ Failed to delete TBD event: {summary}",
                self.delete_calendar_event, event_id)

    @staticmethod
    def _print_summary(stats, dry_run):
        """Print the end-of-run counters."""
        print("\n" + "=" * 50)
        print("Sync Summary")
        print("=" * 50)
        print(f"✓ Added: {stats['added']} matches")
        if stats['time_updated'] > 0:
            print(f"⏰ Time updated: {stats['time_updated']} matches")
        if stats['updated'] > 0:
            print(f"✓ Results updated: {stats['updated']} matches")
        if stats['deleted'] > 0:
            print(f"🗑️ Deleted: {stats['deleted']} expired TBD events")
        print(f"⊘ Skipped: {stats['skipped']} matches (no changes)")
        if stats['errors'] > 0:
            print(f"✗ Errors: {stats['errors']} matches")
        if dry_run:
            print("\n⚠ DRY RUN - No actual changes were made")
        print("\n✓ Sync complete!")

    def sync_matches_to_calendar(self, dry_run=False, update_results=True,
                                 update_times=True, delete_old_tbd=True):
        """Run one full sync from Liquipedia to the calendar.

        Args:
            dry_run: Report what would change without writing anything.
            update_results: Write in-progress scores and final results.
            update_times: Move events whose match time changed.
            delete_old_tbd: Remove expired, unclaimed "TBD vs TBD" events.

        The run is aborted when the calendar cannot be read, because every
        match would otherwise be inserted again as a duplicate.
        """
        print("\n" + "=" * 50)
        print("Starting Dota 2 Calendar Sync v3")
        print("=" * 50 + "\n")

        upcoming_matches, completed_matches = self.fetch_all_matches()
        if not upcoming_matches and not completed_matches:
            print("No matches found to sync")
            return

        existing_events = self.get_existing_events(days_back=7, days_ahead=30)
        if '_by_match' not in existing_events:
            print("✗ Could not read existing calendar events; "
                  "aborting so no duplicates are created")
            return

        stats = dict.fromkeys(
            ('added', 'skipped', 'updated', 'time_updated', 'errors', 'deleted'), 0)
        # IDs of placeholder events that were (or would be) given real teams.
        claimed_tbd = set()
        now = datetime.now(pytz.UTC)

        self._sync_upcoming(upcoming_matches, existing_events, stats, claimed_tbd,
                            now, dry_run, update_results, update_times)
        if update_results and completed_matches:
            self._sync_completed(completed_matches, existing_events, stats, dry_run)
        if delete_old_tbd:
            self._delete_expired_tbd(existing_events, claimed_tbd, now, stats, dry_run)

        self._print_summary(stats, dry_run)
def main():
    """Command-line entry point: parse options, print the banner, run one sync.

    Exits with status 0 when interrupted by the user and 1 on any other
    unhandled error.
    """
    cli = argparse.ArgumentParser(
        description='Sync Dota 2 Tier 1 matches from Liquipedia to Google Calendar v3'
    )
    cli.add_argument(
        '--calendar-id',
        default='primary',
        help='Google Calendar ID (default: primary).'
    )
    # The three on/off switches share the same shape; order is kept so the
    # generated --help text is unchanged.
    switches = (
        ('--dry-run', 'Perform a dry run without actually creating/updating events'),
        ('--no-results', 'Skip updating completed match results'),
        ('--no-time-updates', 'Skip updating match times'),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action='store_true', help=help_text)
    cli.add_argument(
        '--credentials',
        default='credentials.json',
        help='Path to Google service account credentials JSON file'
    )
    options = cli.parse_args()

    # Startup banner
    rule = "!" * 60
    print("\n" + rule)
    print("Dota 2 Calendar Sync v3")
    print("Features: Match sync, result updates, time change detection")
    print("Service Account: calendar-bot@tunpok.iam.gserviceaccount.com")
    print(rule + "\n")

    try:
        syncer = Dota2CalendarSync(
            credentials_file=options.credentials,
            calendar_id=options.calendar_id
        )
        syncer.sync_matches_to_calendar(
            dry_run=options.dry_run,
            update_results=not options.no_results,
            update_times=not options.no_time_updates,
            delete_old_tbd=True
        )
    except KeyboardInterrupt:
        print("\n\nSync cancelled by user")
        sys.exit(0)
    except Exception as e:
        print(f"\n✗ Fatal error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()