Files

580 lines
25 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import discord
from discord import app_commands
from discord.ext import commands, tasks
from typing import Optional
from datetime import datetime, timedelta
import asyncio
from repositories.football import FootballRepository
from repositories.config import ConfigRepository
from services.football_api import FootballApiService
class FootballCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.repo = FootballRepository()
self.config_repo = ConfigRepository()
self.api_service = None # Init lazily
self._notified_upcoming = set()
self._notified_result = set()
self.check_matches.start()
async def _get_api(self, guild_id: int):
key = await self.config_repo.get(guild_id, "FOOTBALL_API_KEY", "")
if not key:
return None
return FootballApiService(key)
async def interaction_check(self, interaction: discord.Interaction) -> bool:
# Check if service is enabled (handled by main bot logic usually, but here specifically for this Cog)
# Check channel restriction
allowed_str = await self.config_repo.get(interaction.guild_id, "CHANNEL_FOOTBALL_IDS", "")
if not allowed_str:
return True # No restriction
allowed_ids = [int(x.strip()) for x in allowed_str.split(',') if x.strip().isdigit()]
if not allowed_ids:
return True # Empty or invalid config implies no restriction
if interaction.channel_id not in allowed_ids:
await interaction.response.send_message(f"⚠️ commands are only allowed in: {', '.join(f'<#{cid}>' for cid in allowed_ids)}", ephemeral=True)
return False
return True
# --- Slash Commands Group ---
fb_group = app_commands.Group(name="fb", description="Football services commands")
@fb_group.command(name="history", description="View team match history")
@app_commands.describe(team="Team name", season="Year (e.g. 2023). Default: current season matches")
async def history_slash(self, interaction: discord.Interaction, team: str, season: Optional[int] = None):
await interaction.response.defer()
api = await self._get_api(interaction.guild_id)
if not api: return await interaction.followup.send("Missing API Key.")
team_data = await api.search_team(team)
if not team_data: return await interaction.followup.send("Team not found.")
matches = await api.get_team_history(team_data['id'], season=season)
if not matches: return await interaction.followup.send("No history found.")
# Summary Stats
wins = draws = losses = 0
target_id = team_data['id']
history_text = []
for m in matches[:15]: # Show last 15
home = m['homeTeam']
away = m['awayTeam']
score_h = m['score']['fullTime']['home']
score_a = m['score']['fullTime']['away']
# Outcome
is_home = (home['id'] == target_id)
goals_for = score_h if is_home else score_a
goals_against = score_a if is_home else score_h
result = ""
if goals_for > goals_against:
result = ""
wins += 1
elif goals_for < goals_against:
result = ""
losses += 1
else:
draws += 1
opponent = away['name'] if is_home else home['name']
date_str = m['utcDate'][:10]
history_text.append(f"{result} vs **{opponent}** ({goals_for}-{goals_against}) ` {date_str} `")
embed = discord.Embed(title=f"📜 History: {team_data['name']}", color=discord.Color.blue())
if season: embed.description = f"**Season {season}**"
embed.add_field(name="Form (Last 15)", value="\n".join(history_text) if history_text else "No matches", inline=False)
embed.add_field(name="Summary (This Set)", value=f"Win: {wins} | Draw: {draws} | Loss: {losses}", inline=False)
await interaction.followup.send(embed=embed)
class StandingsView(discord.ui.View):
def __init__(self, cog, guild_id: int):
super().__init__(timeout=180)
self.cog = cog
self.guild_id = guild_id
self.add_item(self.LeagueSelect(cog, guild_id))
class LeagueSelect(discord.ui.Select):
def __init__(self, cog, guild_id):
self.cog = cog
self.guild_id = guild_id
# Common Leagues Options
options = [
discord.SelectOption(label="Premier League", value="PL", description="England", emoji="🏴󠁧󠁢󠁥󠁮󠁧󠁿"),
discord.SelectOption(label="La Liga", value="PD", description="Spain", emoji="🇪🇸"),
discord.SelectOption(label="Bundesliga", value="BL1", description="Germany", emoji="🇩🇪"),
discord.SelectOption(label="Serie A", value="SA", description="Italy", emoji="🇮🇹"),
discord.SelectOption(label="Ligue 1", value="FL1", description="France", emoji="🇫🇷"),
discord.SelectOption(label="Champions League", value="CL", description="Europe", emoji="🇪🇺"),
]
super().__init__(placeholder="Select a league...", min_values=1, max_values=1, options=options)
async def callback(self, interaction: discord.Interaction):
await interaction.response.defer()
league_code = self.values[0]
embed = await self.cog.build_standings_embed(self.guild_id, league_code)
if embed:
await interaction.edit_original_response(embed=embed, view=self.view)
else:
await interaction.followup.send(f"Could not load data for {league_code}", ephemeral=True)
def _clean_name(self, name: str) -> str:
import re
# Remove common prefixes like "1. ", "1 "
# Remove common suffixes like " FC", " AFC", etc. (case insensitive)
# 1. Remove numbering prefix (e.g. "1. FC Koln" -> "FC Koln")
name = re.sub(r'^\d+[\.\s]+', '', name)
# 2. Remove common suffixes
# \b ensures whole word match
suffixes = [
"FC", "AFC", "CF", "SC", "BV", "NV", "AC", "AS", "Hotspur"
]
pattern = r'\b(' + '|'.join(suffixes) + r')\b'
clean = re.sub(pattern, '', name, flags=re.IGNORECASE)
# 3. Cleanup extra spaces and special chars
clean = clean.replace("&", "")
clean = re.sub(r'\s+', ' ', clean).strip()
return clean
def _is_interested(self, team_name: str, interested_list: list) -> bool:
if not interested_list: return False
# Normalize Data Name
# e.g. "1. FC Köln" -> "Köln"
raw_clean = self._clean_name(team_name).lower()
# Aliases Map (Normalized keys)
aliases = {
"man utd": ["manchester united"],
"man use": ["manchester united"],
"man city": ["manchester city"],
"psg": ["paris saint-germain"],
"spurs": ["tottenham"],
"wolves": ["wolverhampton wanderers"],
"brighton": ["brighton hove albion"],
"leverkusen": ["bayer leverkusen"],
"dortmund": ["borussia dortmund"],
"bayern": ["bayern munich", "bayern munchen"],
"inter": ["inter milano", "internazionale"],
"milan": ["ac milan"],
"real": ["real madrid"],
"atletico": ["atletico madrid", "my atletico"],
"barca": ["fc barcelona"],
"koln": ["1. fc koln", "fc koln"]
}
from thefuzz import fuzz
for interested in interested_list:
# Normalize Configured Name
user_clean = self._clean_name(interested).lower()
# 1. Direct Comparison (Clean vs Clean)
if user_clean == raw_clean: return True
if user_clean in raw_clean: return True
# 2. Check Aliases
for alias, targets in aliases.items():
alias_clean = self._clean_name(alias).lower()
if user_clean == alias_clean:
for t in targets:
if self._clean_name(t).lower() == raw_clean: return True
# 3. Fuzzy Matching
# token_set_ratio handles "Chelsea" vs "Chelsea FC" -> 100
# "Man City" vs "Manchester City" -> High
ratio = fuzz.token_set_ratio(user_clean, raw_clean)
if ratio >= 80:
return True
# 4. Fallback: Raw substring
if interested.lower() in team_name.lower(): return True
return False
async def build_standings_embed(self, guild_id: int, league_code: str) -> Optional[discord.Embed]:
api = await self._get_api(guild_id)
if not api: return None
data = await api.get_standings(league_code)
if not data or 'standings' not in data: return None
table = None
for s in data['standings']:
if s['type'] == 'TOTAL':
table = s['table']
break
if not table: return None
comp_name = data.get('competition', {}).get('name', league_code)
# Interested Teams
interested_config = await self.config_repo.get(guild_id, "FOOTBALL_TEAMS", "")
interested_teams = [t.strip().lower() for t in interested_config.split(',')] if interested_config else []
# ANSI Table Construction
lines = []
# Header: Hạng(3) CLB(15) Tr(2) T-H-B(7) HS(3) Đ(3)
header = f"\u001b[1;37m{'Hạng':<4} {'CLB':<16} {'Tr':<2} {'T-H-B':<7} {'HS':<3} {'Đ':<3}\u001b[0m"
lines.append(header)
lines.append("\u001b[0;30m" + "-" * 42 + "\u001b[0m")
for team in table[:25]:
pos = str(team['position'])
name = team['team']['name']
# Use Clean Name for display (Shorter)
display_name = self._clean_name(name)
# Smart Truncate
if len(display_name) > 15:
display_name = display_name[:14] + ""
played = str(team['playedGames'])
won = str(team['won'])
draw = str(team['draw'])
lost = str(team['lost'])
pts = str(team['points'])
gd = str(team['goalDifference'])
wdl = f"{won}-{draw}-{lost}"
# Default Coloring
style = "\u001b[0;37m"
suffix = "\u001b[0m"
is_interested = self._is_interested(name, interested_teams)
if is_interested:
# Blue Background to simulate highlight
style = "\u001b[0;37;44m"
elif int(pos) <= 4:
style = "\u001b[0;32m" # Green Text
elif int(pos) >= 18:
style = "\u001b[0;31m" # Red Text
line = f"{style}{pos:<4} {display_name:<16} {played:<2} {wdl:<7} {gd:<3} {pts:<3}{suffix}"
lines.append(line)
embed = discord.Embed(title=f"🏆 {comp_name}", color=discord.Color.from_rgb(44, 47, 51))
embed.description = f"```ansi\n{chr(10).join(lines)}\n```"
embed.set_footer(text="Xanh Dương: Đội của bạn | Xanh Lá: Top 4 | Đỏ: Nhóm xuống hạng")
return embed
@fb_group.command(name="standings", description="View league standings")
@app_commands.describe(league_code="League Code (e.g. PL, PD, SA, BL1, FL1, CL)")
async def standings_slash(self, interaction: discord.Interaction, league_code: str = "PL"):
await interaction.response.defer()
embed = await self.build_standings_embed(interaction.guild_id, league_code)
if not embed:
return await interaction.followup.send(f"Could not load standings for `{league_code}`. Check API Key or Code.")
view = self.StandingsView(self, interaction.guild_id)
await interaction.followup.send(embed=embed, view=view)
@fb_group.command(name="schedule", description="View upcoming match schedule")
@app_commands.describe(league="Filter by league code (e.g. PL, PD)")
async def schedule_slash(self, interaction: discord.Interaction, league: Optional[str] = None):
# Default to 10 days to fit within typical API limits while showing "upcoming" context
days = 10
await interaction.response.defer()
api = await self._get_api(interaction.guild_id)
if not api:
return await interaction.followup.send("⚠️ API Key missing. Configure in Dashboard.")
today = datetime.now()
end_date = today + timedelta(days=days)
matches = await api.get_matches_range(
today.strftime("%Y-%m-%d"),
end_date.strftime("%Y-%m-%d")
)
if not matches:
return await interaction.followup.send("Không tìm thấy trận đấu nào.")
# Filter Logic (Leagues)
config_leagues = await self.config_repo.get(interaction.guild_id, "FOOTBALL_LEAGUES", "")
target_leagues = set()
if config_leagues:
target_leagues.update([l.strip().lower() for l in config_leagues.split(',') if l.strip()])
if league: target_leagues.add(league.lower())
filtered = []
if target_leagues:
for m in matches:
comp_name = m.get('competition', {}).get('name', '').lower()
comp_code = m.get('competition', {}).get('code', '').lower()
if any(t in comp_name or t == comp_code for t in target_leagues):
filtered.append(m)
else:
filtered = matches # No filter = All matches (might be too many, but API usually limits free tier anyway)
if not filtered:
return await interaction.followup.send("Không có trận đấu nào trong các giải đã chọn.")
# Sort by time
filtered.sort(key=lambda x: x['utcDate'])
# Grid Layout using Embed Fields
# Title: Lịch Thi Đấu
embed = discord.Embed(title=f"📅 Lịch Thi Đấu ({len(filtered)} trận)", color=discord.Color.blue())
count = 0
for m in filtered[:18]: # Limit 18 matches (Discord limit 25 fields, let's keep it safe)
home = m['homeTeam']['name']
away = m['awayTeam']['name']
# Parse Time
dt = datetime.fromisoformat(m['utcDate'].replace('Z', '+00:00'))
# Format: 22:00 22/01
time_str = dt.strftime('%H:%M %d/%m')
status = m['status']
# Icon/State
state_text = f"🕐 {time_str}"
score_text = "vs"
if status in ['FINISHED', 'IN_PLAY', 'PAUSED']:
s_home = m['score']['fullTime']['home']
s_away = m['score']['fullTime']['away']
score_text = f"{s_home} - {s_away}"
if status == 'FINISHED':
state_text = f"✅ KT {dt.strftime('%d/%m')}"
else:
state_text = f"🔴 Phút {m.get('minute', '?')}"
# Highlight Interested Teams
interested_config = await self.config_repo.get(interaction.guild_id, "FOOTBALL_TEAMS", "")
interested_teams = [t.strip().lower() for t in interested_config.split(',')] if interested_config else []
is_interested = any(it in home.lower() for it in interested_teams) or \
any(it in away.lower() for it in interested_teams)
# Card Title
# Use Icons if possible, else Bold
# 🏠 Man Utd vs 🚌 Man City
# If interested, add Star
prefix = "" if is_interested else ""
field_name = f"{prefix}{home} {score_text} {away}"
field_val = f"{state_text}\n🏆 {m['competition']['code']}"
embed.add_field(name=field_name, value=field_val, inline=True)
count += 1
if len(filtered) > 18:
embed.set_footer(text=f"Còn {len(filtered) - 18} trận đấu khác...")
await interaction.followup.send(embed=embed)
@fb_group.command(name="sub", description="Subscribe to team updates")
async def sub_slash(self, interaction: discord.Interaction, team_name: str):
await interaction.response.defer()
api = await self._get_api(interaction.guild_id)
if not api: return await interaction.followup.send("Missing API Key.")
team = await api.search_team(team_name)
if not team:
return await interaction.followup.send(f"❌ Could not find team: {team_name}")
success = await self.repo.add_subscription(interaction.guild_id, interaction.channel_id, team['name'], team['id'])
if success:
await interaction.followup.send(f"✅ Subscribed to **{team['name']}** updates in this channel.")
else:
await interaction.followup.send(f"⚠️ Already subscribed to {team['name']}.")
@fb_group.command(name="unsub", description="Unsubscribe from team")
async def unsub_slash(self, interaction: discord.Interaction, team_name: str):
await interaction.response.defer()
success = await self.repo.remove_subscription(interaction.guild_id, team_name)
msg = f"✅ Unsubscribed from {team_name}." if success else "❌ Subscription not found."
await interaction.followup.send(msg)
@fb_group.command(name="list", description="List subscriptions")
async def list_slash(self, interaction: discord.Interaction):
# 1. DB Subs
subs = await self.repo.get_guild_subscriptions(interaction.guild_id)
lines = []
if subs:
for s in subs:
lines.append(f"• **{s.team_name}** (<#{s.channel_id}>) [Manual]")
# 2. Config Subs
cfg_teams_str = await self.config_repo.get(interaction.guild_id, "FOOTBALL_TEAMS", "")
cfg_channels_str = await self.config_repo.get(interaction.guild_id, "CHANNEL_FOOTBALL_IDS", "")
if cfg_teams_str and cfg_channels_str:
teams = [t.strip() for t in cfg_teams_str.split(',') if t.strip()]
channels = [c.strip() for c in cfg_channels_str.split(',') if c.strip().isdigit()]
if teams and channels:
channel_mentions = ", ".join([f"<#{c}>" for c in channels])
for t in teams:
lines.append(f"• **{t}** ({channel_mentions}) [Config]")
if not lines:
return await interaction.response.send_message("No football subscriptions in this server.")
msg = "**Subscriptions:**\n" + "\n".join(lines)
await interaction.response.send_message(msg)
@tasks.loop(minutes=2)
async def check_matches(self):
# 1. Fetch Subscription Data
explicit_subs = await self.repo.get_all_subscriptions()
# 2. Identify Active Guilds (from DB subs or just iterate known guilds if possible,
# but for efficiency we can just check guilds that have *some* config or sub)
# For simplicity, let's mix: Guilds with explicit subs + Guilds the bot is in (to check config)
active_guild_ids = set(s.guild_id for s in explicit_subs)
# Also add guilds where bot is present (to check for Config-only users)
for g in self.bot.guilds:
active_guild_ids.add(g.id)
for gid in active_guild_ids:
api = await self._get_api(gid)
if not api: continue
matches = await api.get_matches_today()
if not matches: continue
# --- Build Effective Subscription List for this Guild ---
effective_subs = []
# A. Explicit Subs (Database)
guild_explicit = [s for s in explicit_subs if s.guild_id == gid]
for s in guild_explicit:
# We normalize to a simple structure
effective_subs.append({
"team_name": s.team_name,
"team_id": s.team_id,
"channel_id": s.channel_id
})
# B. Implicit Subs (Config)
# Fetch Configs
cfg_teams_str = await self.config_repo.get(gid, "FOOTBALL_TEAMS", "")
cfg_channels_str = await self.config_repo.get(gid, "CHANNEL_FOOTBALL_IDS", "")
if cfg_teams_str and cfg_channels_str:
# Parse Teams
teams = [t.strip() for t in cfg_teams_str.split(',') if t.strip()]
# Parse Channels
channels = [int(c.strip()) for c in cfg_channels_str.split(',') if c.strip().isdigit()]
# Create combinatorial subs (All Configured Teams -> All Configured Channels)
for t in teams:
for c in channels:
# Avoid duplicates if already in explicit subs
if any(s['team_name'].lower() == t.lower() and s['channel_id'] == c for s in effective_subs):
continue
effective_subs.append({
"team_name": t, # Name based matching
"team_id": None, # ID might be unknown without search, rely on name match
"channel_id": c
})
if not effective_subs: continue
for m in matches:
mid = m['id']
home = m['homeTeam']['name']
away = m['awayTeam']['name']
# home_id = m['homeTeam']['id']
# away_id = m['awayTeam']['id']
status = m['status']
match_time = datetime.fromisoformat(m['utcDate'].replace('Z', '+00:00'))
# Filter Subs relevant to this match
relevant_subs = []
for s in effective_subs:
# Match by Name (Flexible) because Config might only have Name
if self._is_interested(home, [s['team_name']]) or self._is_interested(away, [s['team_name']]):
relevant_subs.append(s)
if not relevant_subs: continue
# 1. Upcoming Notification
if status == 'TIMED':
now = datetime.now(match_time.tzinfo) # Use match timezone
diff = (match_time - now).total_seconds()
# 9 to 11 minutes before
if 540 <= diff <= 660:
if mid not in self._notified_upcoming:
await self._notify(relevant_subs, m, "UPCOMING")
self._notified_upcoming.add(mid)
# 2. Result Notification
if status == 'FINISHED':
if mid not in self._notified_result:
await self._notify(relevant_subs, m, "FINISHED")
self._notified_result.add(mid)
async def _notify(self, subs, match, type):
home = match['homeTeam']['name']
away = match['awayTeam']['name']
if type == "UPCOMING":
title = "⚽ Trận đấu sắp diễn ra!"
desc = f"**{home}** vs **{away}**\nBắt đầu trong 10 phút!"
color = discord.Color.orange()
elif type == "FINISHED":
title = "🏁 Kết quả trận đấu"
score = f"{match['score']['fullTime']['home']} - {match['score']['fullTime']['away']}"
desc = f"**{home}** {score} **{away}**\nTrận đấu đã kết thúc."
color = discord.Color.gold()
embed = discord.Embed(title=title, description=desc, color=color)
# Deduplicate channels
# handle both obj and dict (during migration/hybrid state if not fully replaced)
target_channels = set()
for s in subs:
if isinstance(s, dict):
target_channels.add(s['channel_id'])
else:
target_channels.add(s.channel_id)
for cid in target_channels:
channel = self.bot.get_channel(cid)
if channel:
try:
await channel.send(embed=embed)
except: pass
@check_matches.before_loop
async def before_check_matches(self):
await self.bot.wait_until_ready()
async def setup(bot):
await bot.add_cog(FootballCog(bot))