feat: Tái cấu trúc bot sang kiến trúc cog, thêm hỗ trợ đa máy chủ, giới thiệu tính năng đăng ký bóng đá, giao diện web và quản lý cấu hình.

This commit is contained in:
2026-01-16 17:26:42 +07:00
parent 8c38357c28
commit b24365927a
39 changed files with 3864 additions and 997 deletions
+503
View File
@@ -0,0 +1,503 @@
import discord
from discord import app_commands
from discord.ext import commands, tasks
from typing import Optional
from datetime import datetime, timedelta
import asyncio
from repositories.football import FootballRepository
from repositories.config import ConfigRepository
from services.football_api import FootballApiService
class FootballCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.repo = FootballRepository()
self.config_repo = ConfigRepository()
self.api_service = None # Init lazily
self._notified_upcoming = set()
self._notified_result = set()
self.check_matches.start()
async def _get_api(self, guild_id: int):
key = await self.config_repo.get(guild_id, "FOOTBALL_API_KEY", "")
if not key:
return None
return FootballApiService(key)
async def interaction_check(self, interaction: discord.Interaction) -> bool:
# Check if service is enabled (handled by main bot logic usually, but here specifically for this Cog)
# Check channel restriction
allowed_str = await self.config_repo.get(interaction.guild_id, "CHANNEL_FOOTBALL_IDS", "")
if not allowed_str:
return True # No restriction
allowed_ids = [int(x.strip()) for x in allowed_str.split(',') if x.strip().isdigit()]
if not allowed_ids:
return True # Empty or invalid config implies no restriction
if interaction.channel_id not in allowed_ids:
await interaction.response.send_message(f"⚠️ commands are only allowed in: {', '.join(f'<#{cid}>' for cid in allowed_ids)}", ephemeral=True)
return False
return True
# --- Slash Commands Group ---
fb_group = app_commands.Group(name="fb", description="Football services commands")
@fb_group.command(name="history", description="View team match history")
@app_commands.describe(team="Team name", season="Year (e.g. 2023). Default: current season matches")
async def history_slash(self, interaction: discord.Interaction, team: str, season: Optional[int] = None):
await interaction.response.defer()
api = await self._get_api(interaction.guild_id)
if not api: return await interaction.followup.send("Missing API Key.")
team_data = await api.search_team(team)
if not team_data: return await interaction.followup.send("Team not found.")
matches = await api.get_team_history(team_data['id'], season=season)
if not matches: return await interaction.followup.send("No history found.")
# Summary Stats
wins = draws = losses = 0
target_id = team_data['id']
history_text = []
for m in matches[:15]: # Show last 15
home = m['homeTeam']
away = m['awayTeam']
score_h = m['score']['fullTime']['home']
score_a = m['score']['fullTime']['away']
# Outcome
is_home = (home['id'] == target_id)
goals_for = score_h if is_home else score_a
goals_against = score_a if is_home else score_h
result = ""
if goals_for > goals_against:
result = ""
wins += 1
elif goals_for < goals_against:
result = ""
losses += 1
else:
draws += 1
opponent = away['name'] if is_home else home['name']
date_str = m['utcDate'][:10]
history_text.append(f"{result} vs **{opponent}** ({goals_for}-{goals_against}) ` {date_str} `")
embed = discord.Embed(title=f"📜 History: {team_data['name']}", color=discord.Color.blue())
if season: embed.description = f"**Season {season}**"
embed.add_field(name="Form (Last 15)", value="\n".join(history_text) if history_text else "No matches", inline=False)
embed.add_field(name="Summary (This Set)", value=f"Win: {wins} | Draw: {draws} | Loss: {losses}", inline=False)
await interaction.followup.send(embed=embed)
class StandingsView(discord.ui.View):
def __init__(self, cog, guild_id: int):
super().__init__(timeout=180)
self.cog = cog
self.guild_id = guild_id
self.add_item(self.LeagueSelect(cog, guild_id))
class LeagueSelect(discord.ui.Select):
def __init__(self, cog, guild_id):
self.cog = cog
self.guild_id = guild_id
# Common Leagues Options
options = [
discord.SelectOption(label="Premier League", value="PL", description="England", emoji="🏴󠁧󠁢󠁥󠁮󠁧󠁿"),
discord.SelectOption(label="La Liga", value="PD", description="Spain", emoji="🇪🇸"),
discord.SelectOption(label="Bundesliga", value="BL1", description="Germany", emoji="🇩🇪"),
discord.SelectOption(label="Serie A", value="SA", description="Italy", emoji="🇮🇹"),
discord.SelectOption(label="Ligue 1", value="FL1", description="France", emoji="🇫🇷"),
discord.SelectOption(label="Champions League", value="CL", description="Europe", emoji="🇪🇺"),
]
super().__init__(placeholder="Select a league...", min_values=1, max_values=1, options=options)
async def callback(self, interaction: discord.Interaction):
await interaction.response.defer()
league_code = self.values[0]
embed = await self.cog.build_standings_embed(self.guild_id, league_code)
if embed:
await interaction.edit_original_response(embed=embed, view=self.view)
else:
await interaction.followup.send(f"Could not load data for {league_code}", ephemeral=True)
def _clean_name(self, name: str) -> str:
import re
# Remove common prefixes like "1. ", "1 "
# Remove common suffixes like " FC", " AFC", etc. (case insensitive)
# 1. Remove numbering prefix (e.g. "1. FC Koln" -> "FC Koln")
name = re.sub(r'^\d+[\.\s]+', '', name)
# 2. Remove common suffixes
# \b ensures whole word match
suffixes = [
"FC", "AFC", "CF", "SC", "BV", "NV", "AC", "AS", "Hotspur"
]
pattern = r'\b(' + '|'.join(suffixes) + r')\b'
clean = re.sub(pattern, '', name, flags=re.IGNORECASE)
# 3. Cleanup extra spaces and special chars
clean = clean.replace("&", "")
clean = re.sub(r'\s+', ' ', clean).strip()
return clean
def _is_interested(self, team_name: str, interested_list: list) -> bool:
if not interested_list: return False
# Normalize Data Name
# e.g. "1. FC Köln" -> "Köln"
raw_clean = self._clean_name(team_name).lower()
# Aliases Map (Normalized keys)
aliases = {
"man utd": ["manchester united"],
"man use": ["manchester united"],
"man city": ["manchester city"],
"psg": ["paris saint-germain"],
"spurs": ["tottenham"],
"wolves": ["wolverhampton wanderers"],
"brighton": ["brighton hove albion"],
"leverkusen": ["bayer leverkusen"],
"dortmund": ["borussia dortmund"],
"bayern": ["bayern munich", "bayern munchen"],
"inter": ["inter milano", "internazionale"],
"milan": ["ac milan"],
"real": ["real madrid"],
"atletico": ["atletico madrid", "my atletico"],
"barca": ["fc barcelona"],
"koln": ["1. fc koln", "fc koln"]
}
for interested in interested_list:
# Normalize Configured Name
# e.g. "1. FC Köln" -> "koln"
user_clean = self._clean_name(interested).lower()
# 1. Direct Comparison (Clean vs Clean)
# "koln" == "koln"
if user_clean == raw_clean: return True
if user_clean in raw_clean: return True # "chelsea" in "chelsea"
# 2. Check Aliases (Bidirectional check)
# If user saved "Man Utd", check if matches "Manchester United"
# If user saved "Manchester United", check if matches "Man Utd"
for alias, targets in aliases.items():
alias_clean = self._clean_name(alias).lower()
# Check if 'interested' is an alias
if user_clean == alias_clean:
# Does the real team name match any target?
for t in targets:
if self._clean_name(t).lower() == raw_clean: return True
# Check if current team 'raw_clean' is effectively one of the targets
# And user has the alias
# (Complex, sticking to simple match first)
pass
# 3. Fallback: Raw substring
if interested.lower() in team_name.lower(): return True
return False
async def build_standings_embed(self, guild_id: int, league_code: str) -> Optional[discord.Embed]:
api = await self._get_api(guild_id)
if not api: return None
data = await api.get_standings(league_code)
if not data or 'standings' not in data: return None
table = None
for s in data['standings']:
if s['type'] == 'TOTAL':
table = s['table']
break
if not table: return None
comp_name = data.get('competition', {}).get('name', league_code)
# Interested Teams
interested_config = await self.config_repo.get(guild_id, "FOOTBALL_TEAMS", "")
interested_teams = [t.strip().lower() for t in interested_config.split(',')] if interested_config else []
# ANSI Table Construction
lines = []
# Header: Hạng(3) CLB(15) Tr(2) T-H-B(7) HS(3) Đ(3)
header = f"\u001b[1;37m{'Hạng':<4} {'CLB':<16} {'Tr':<2} {'T-H-B':<7} {'HS':<3} {'Đ':<3}\u001b[0m"
lines.append(header)
lines.append("\u001b[0;30m" + "-" * 42 + "\u001b[0m")
for team in table[:25]:
pos = str(team['position'])
name = team['team']['name']
# Use Clean Name for display (Shorter)
display_name = self._clean_name(name)
# Smart Truncate
if len(display_name) > 15:
display_name = display_name[:14] + ""
played = str(team['playedGames'])
won = str(team['won'])
draw = str(team['draw'])
lost = str(team['lost'])
pts = str(team['points'])
gd = str(team['goalDifference'])
wdl = f"{won}-{draw}-{lost}"
# Default Coloring
style = "\u001b[0;37m"
suffix = "\u001b[0m"
is_interested = self._is_interested(name, interested_teams)
if is_interested:
# Blue Background to simulate highlight
style = "\u001b[0;37;44m"
elif int(pos) <= 4:
style = "\u001b[0;32m" # Green Text
elif int(pos) >= 18:
style = "\u001b[0;31m" # Red Text
line = f"{style}{pos:<4} {display_name:<16} {played:<2} {wdl:<7} {gd:<3} {pts:<3}{suffix}"
lines.append(line)
embed = discord.Embed(title=f"🏆 {comp_name}", color=discord.Color.from_rgb(44, 47, 51))
embed.description = f"```ansi\n{chr(10).join(lines)}\n```"
embed.set_footer(text="Xanh Dương: Đội của bạn | Xanh Lá: Top 4 | Đỏ: Nhóm xuống hạng")
return embed
@fb_group.command(name="standings", description="View league standings")
@app_commands.describe(league_code="League Code (e.g. PL, PD, SA, BL1, FL1, CL)")
async def standings_slash(self, interaction: discord.Interaction, league_code: str = "PL"):
await interaction.response.defer()
embed = await self.build_standings_embed(interaction.guild_id, league_code)
if not embed:
return await interaction.followup.send(f"Could not load standings for `{league_code}`. Check API Key or Code.")
view = self.StandingsView(self, interaction.guild_id)
await interaction.followup.send(embed=embed, view=view)
@fb_group.command(name="schedule", description="View upcoming match schedule")
@app_commands.describe(league="Filter by league code (e.g. PL, PD)")
async def schedule_slash(self, interaction: discord.Interaction, league: Optional[str] = None):
# Default to 10 days to fit within typical API limits while showing "upcoming" context
days = 10
await interaction.response.defer()
api = await self._get_api(interaction.guild_id)
if not api:
return await interaction.followup.send("⚠️ API Key missing. Configure in Dashboard.")
today = datetime.now()
end_date = today + timedelta(days=days)
matches = await api.get_matches_range(
today.strftime("%Y-%m-%d"),
end_date.strftime("%Y-%m-%d")
)
if not matches:
return await interaction.followup.send("Không tìm thấy trận đấu nào.")
# Filter Logic (Leagues)
config_leagues = await self.config_repo.get(interaction.guild_id, "FOOTBALL_LEAGUES", "")
target_leagues = set()
if config_leagues:
target_leagues.update([l.strip().lower() for l in config_leagues.split(',') if l.strip()])
if league: target_leagues.add(league.lower())
filtered = []
if target_leagues:
for m in matches:
comp_name = m.get('competition', {}).get('name', '').lower()
comp_code = m.get('competition', {}).get('code', '').lower()
if any(t in comp_name or t == comp_code for t in target_leagues):
filtered.append(m)
else:
filtered = matches # No filter = All matches (might be too many, but API usually limits free tier anyway)
if not filtered:
return await interaction.followup.send("Không có trận đấu nào trong các giải đã chọn.")
# Sort by time
filtered.sort(key=lambda x: x['utcDate'])
# Grid Layout using Embed Fields
# Title: Lịch Thi Đấu
embed = discord.Embed(title=f"📅 Lịch Thi Đấu ({len(filtered)} trận)", color=discord.Color.blue())
count = 0
for m in filtered[:18]: # Limit 18 matches (Discord limit 25 fields, let's keep it safe)
home = m['homeTeam']['name']
away = m['awayTeam']['name']
# Parse Time
dt = datetime.fromisoformat(m['utcDate'].replace('Z', '+00:00'))
# Format: 22:00 22/01
time_str = dt.strftime('%H:%M %d/%m')
status = m['status']
# Icon/State
state_text = f"🕐 {time_str}"
score_text = "vs"
if status in ['FINISHED', 'IN_PLAY', 'PAUSED']:
s_home = m['score']['fullTime']['home']
s_away = m['score']['fullTime']['away']
score_text = f"{s_home} - {s_away}"
if status == 'FINISHED':
state_text = f"✅ KT {dt.strftime('%d/%m')}"
else:
state_text = f"🔴 Phút {m.get('minute', '?')}"
# Highlight Interested Teams
interested_config = await self.config_repo.get(interaction.guild_id, "FOOTBALL_TEAMS", "")
interested_teams = [t.strip().lower() for t in interested_config.split(',')] if interested_config else []
is_interested = any(it in home.lower() for it in interested_teams) or \
any(it in away.lower() for it in interested_teams)
# Card Title
# Use Icons if possible, else Bold
# 🏠 Man Utd vs 🚌 Man City
# If interested, add Star
prefix = "" if is_interested else ""
field_name = f"{prefix}{home} {score_text} {away}"
field_val = f"{state_text}\n🏆 {m['competition']['code']}"
embed.add_field(name=field_name, value=field_val, inline=True)
count += 1
if len(filtered) > 18:
embed.set_footer(text=f"Còn {len(filtered) - 18} trận đấu khác...")
await interaction.followup.send(embed=embed)
@fb_group.command(name="sub", description="Subscribe to team updates")
async def sub_slash(self, interaction: discord.Interaction, team_name: str):
await interaction.response.defer()
api = await self._get_api(interaction.guild_id)
if not api: return await interaction.followup.send("Missing API Key.")
team = await api.search_team(team_name)
if not team:
return await interaction.followup.send(f"❌ Could not find team: {team_name}")
success = await self.repo.add_subscription(interaction.guild_id, interaction.channel_id, team['name'], team['id'])
if success:
await interaction.followup.send(f"✅ Subscribed to **{team['name']}** updates in this channel.")
else:
await interaction.followup.send(f"⚠️ Already subscribed to {team['name']}.")
@fb_group.command(name="unsub", description="Unsubscribe from team")
async def unsub_slash(self, interaction: discord.Interaction, team_name: str):
await interaction.response.defer()
success = await self.repo.remove_subscription(interaction.guild_id, team_name)
msg = f"✅ Unsubscribed from {team_name}." if success else "❌ Subscription not found."
await interaction.followup.send(msg)
@fb_group.command(name="list", description="List subscriptions")
async def list_slash(self, interaction: discord.Interaction):
subs = await self.repo.get_guild_subscriptions(interaction.guild_id)
if not subs:
return await interaction.response.send_message("No football subscriptions in this server.")
msg = "**Subscriptions:**\n" + "\n".join([f"{s.team_name} (<#{s.channel_id}>)" for s in subs])
await interaction.response.send_message(msg)
@tasks.loop(minutes=2)
async def check_matches(self):
subs = await self.repo.get_all_subscriptions()
if not subs: return
guild_ids = set(s.guild_id for s in subs)
for gid in guild_ids:
api = await self._get_api(gid)
if not api: continue
matches = await api.get_matches_today()
if not matches: continue
guild_subs = [s for s in subs if s.guild_id == gid]
for m in matches:
mid = m['id']
home_id = m['homeTeam']['id']
away_id = m['awayTeam']['id']
status = m['status']
match_time = datetime.fromisoformat(m['utcDate'].replace('Z', '+00:00'))
relevant_subs = [s for s in guild_subs if s.team_id in [home_id, away_id]]
if not relevant_subs: continue
# 1. Upcoming Notification
if status == 'TIMED':
now = datetime.utcnow().replace(tzinfo=match_time.tzinfo)
diff = (match_time - now).total_seconds()
if 540 <= diff <= 660:
if mid not in self._notified_upcoming:
await self._notify(relevant_subs, m, "UPCOMING")
self._notified_upcoming.add(mid)
# 2. Result Notification
if status == 'FINISHED':
if mid not in self._notified_result:
await self._notify(relevant_subs, m, "FINISHED")
self._notified_result.add(mid)
async def _notify(self, subs, match, type):
home = match['homeTeam']['name']
away = match['awayTeam']['name']
if type == "UPCOMING":
title = "⚽ Trận đấu sắp diễn ra!"
desc = f"**{home}** vs **{away}**\nBắt đầu trong 10 phút!"
color = discord.Color.orange()
elif type == "FINISHED":
title = "🏁 Kết quả trận đấu"
score = f"{match['score']['fullTime']['home']} - {match['score']['fullTime']['away']}"
desc = f"**{home}** {score} **{away}**\nTrận đấu đã kết thúc."
color = discord.Color.gold()
embed = discord.Embed(title=title, description=desc, color=color)
target_channels = set(s.channel_id for s in subs)
for cid in target_channels:
channel = self.bot.get_channel(cid)
if channel:
try:
await channel.send(embed=embed)
except: pass
@check_matches.before_loop
async def before_check_matches(self):
await self.bot.wait_until_ready()
async def setup(bot):
await bot.add_cog(FootballCog(bot))
+131
View File
@@ -0,0 +1,131 @@
import discord
from discord.ext import commands
from repositories import HomeDebtRepository
from repositories.config import ConfigRepository
from repositories.feature_toggle import FeatureToggleRepository
from utils.common import format_vnd
class HomeDebtCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.home_debt_repo = HomeDebtRepository()
self.config_repo = ConfigRepository()
self.feature_repo = FeatureToggleRepository()
async def get_allowed_channel_ids(self, guild_id: int) -> list[int]:
config_value = await self.config_repo.get(guild_id, "CHANNEL_HOME_DEBT_ID", "")
ids = []
if config_value:
for x in config_value.split(','):
try:
ids.append(int(x.strip()))
except ValueError:
pass
return ids
async def is_enabled(self, guild_id: int) -> bool:
return await self.feature_repo.get(guild_id, "home_debt")
async def is_correct_channel(self, ctx):
"""Kiểm tra xem command có được thực hiện trong đúng channel không"""
if not await self.is_enabled(ctx.guild.id):
return False
allowed_ids = await self.get_allowed_channel_ids(ctx.guild.id)
# New Rule: If no channels configured, allow ALL channels
if not allowed_ids:
return True
return ctx.channel.id in allowed_ids
@commands.command(name="hdadd", description="Thêm khoản chi tiêu mới")
async def add(self, ctx, amount: int):
"""Vì home chỉ có 2 người nên sẽ tự động thêm khoản chi tiêu cho người còn lại"""
try:
if not await self.is_correct_channel(ctx):
return
# Get info other user from home_debt table for THIS GUILD
other_user = await self.home_debt_repo.get_other(ctx.guild.id, ctx.author.id)
if not other_user:
await ctx.send("Không tìm thấy người dùng còn lại trong Guild này!")
return
other_user.value += round(amount / 2)
resp = await self.home_debt_repo.update_home_debt(other_user)
if not resp:
await ctx.send("Có lỗi xảy ra khi cập nhật dữ liệu")
return
await ctx.send(f"Đã thêm {format_vnd(round(amount * 1000 / 2))} cho {ctx.author.name}. Số dư hiện tại là {format_vnd(other_user.value * 1000)}")
except Exception as e:
await ctx.send(f"Có lỗi xảy ra: {str(e)}")
@commands.command(name="hdcheck", description="Kiểm tra số dư của bạn")
async def home_debt_check(self, ctx):
"""Kiểm tra số dư của mọi người"""
try:
if not await self.is_correct_channel(ctx):
return
# Get info user from home_debt table for THIS GUILD
users = await self.home_debt_repo.get_all(ctx.guild.id)
embed = discord.Embed(title=f"Số dư của mọi người tại {ctx.guild.name}", color=discord.Color.blue())
for user in users:
value = format_vnd(user.value * 1000)
member = ctx.guild.get_member(user.user_id)
name = member.display_name if member else f"User {user.user_id}"
embed.add_field(name=name, value=f"{value}", inline=False)
await ctx.send(embed=embed)
except Exception as e:
await ctx.send(f"Có lỗi xảy ra: {str(e)}")
@commands.command(name="hdvay", description="Vay nợ")
async def vay(self, ctx, amount: int):
"""Vay nợ"""
try:
if not await self.is_correct_channel(ctx):
return
# Get info user from home_debt table for THIS GUILD
user = await self.home_debt_repo.get(ctx.guild.id, ctx.author.id)
if not user:
user = await self.home_debt_repo.create_home_debt(ctx.guild.id, ctx.author.id, 0)
user.value += amount
resp = await self.home_debt_repo.update_home_debt(user)
if not resp:
await ctx.send("Có lỗi xảy ra khi cập nhật dữ liệu")
return
# Send message to user
await ctx.send(f"Đã vay {format_vnd(amount * 1000)} bởi {ctx.author.name}")
except Exception as e:
await ctx.send(f"Có lỗi xảy ra: {str(e)}")
@commands.command(name="hdtra", description="Trả nợ")
async def tra(self, ctx, amount: int):
"""Trả nợ"""
try:
if not await self.is_correct_channel(ctx):
return
# Get info user from home_debt table for THIS GUILD
user = await self.home_debt_repo.get(ctx.guild.id, ctx.author.id)
if not user:
user = await self.home_debt_repo.create_home_debt(ctx.guild.id, ctx.author.id, 0)
user.value -= amount
resp = await self.home_debt_repo.update_home_debt(user)
if not resp:
await ctx.send("Có lỗi xảy ra khi cập nhật dữ liệu")
return
# Send message to user
await ctx.send(f"Đã trả {format_vnd(amount * 1000)} từ {ctx.author.name}")
except Exception as e:
await ctx.send(f"Có lỗi xảy ra: {str(e)}")
async def setup(bot):
await bot.add_cog(HomeDebtCog(bot))
+388
View File
@@ -0,0 +1,388 @@
import discord
import asyncio
from discord.ext import commands
from typing import Set, Dict, List, Optional
from datetime import datetime
from repositories.noi_tu import NoiTuRepository
from repositories.score import ScoreRepository
from repositories.config import ConfigRepository
from repositories.feature_toggle import FeatureToggleRepository
# Game state class
class NoiTuGame:
def __init__(self):
self.is_active = False
self.current_word = ""
self.used_words: Set[str] = set()
self.last_player_id = None
self.last_player_name = None
self.last_message_time = None
self.timeout_task = None
self.channel = None
self.timer_message = None
self.timer_task = None
self.start_time = None
self.lock = asyncio.Lock()
class NoiTuCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.noi_tu_repo = NoiTuRepository()
self.score_repo = ScoreRepository()
self.config_repo = ConfigRepository()
self.feature_repo = FeatureToggleRepository()
self.games: Dict[int, NoiTuGame] = {}
def get_game_for_channel(self, channel_id: int) -> NoiTuGame:
if channel_id not in self.games:
self.games[channel_id] = NoiTuGame()
return self.games[channel_id]
async def is_enabled(self, guild_id: int) -> bool:
return await self.feature_repo.get(guild_id, "noi_tu")
async def get_allowed_channel_ids(self, guild_id: int) -> List[int]:
config = await self.config_repo.get(guild_id, "CHANNEL_NOI_TU_IDS", "")
ids = []
if config:
for x in config.split(','):
try:
ids.append(int(x.strip()))
except ValueError:
pass
return ids
async def get_admin_ids(self, guild_id: int) -> List[int]:
config = await self.config_repo.get(guild_id, "ADMIN_IDS", "")
ids = []
if config:
for x in config.split(','):
try:
ids.append(int(x.strip()))
except ValueError:
pass
return ids
async def is_correct_channel(self, ctx) -> bool:
if not await self.is_enabled(ctx.guild.id):
return False
allowed_ids = await self.get_allowed_channel_ids(ctx.guild.id)
# New Rule: If no channels configured, allow ALL channels
if not allowed_ids:
return True
return ctx.channel.id in allowed_ids
async def is_admin(self, ctx) -> bool:
admin_ids = await self.get_admin_ids(ctx.guild.id)
return ctx.author.id in admin_ids
# Helper functions
def get_first_word(self, word: str) -> str:
return word.strip().split()[0] if word else ''
def get_last_word(self, word: str) -> str:
return word.strip().split()[-1] if word else ''
def is_valid(self, prev, next: str) -> bool:
return self.get_last_word(prev) == self.get_first_word(next)
def format_time_remaining(self, seconds: int) -> str:
if seconds <= 0:
return "⏰ Hết thời gian!"
return f"⏰ Còn lại: {seconds} giây"
async def update_timer_message(self, game: NoiTuGame):
start_time = game.last_message_time
if not start_time:
return
for remaining in range(30, -1, -1):
if not game.is_active or not game.timer_message:
break
try:
if len(game.timer_message.embeds) == 0:
break
embed = game.timer_message.embeds[0]
embed.title = self.format_time_remaining(remaining)
if remaining <= 5:
embed.color = discord.Color.red()
elif remaining <= 10:
embed.color = discord.Color.orange()
elif remaining <= 20:
embed.color = discord.Color.yellow()
else:
embed.color = discord.Color.blue()
await game.timer_message.edit(embed=embed)
if remaining <= 0:
break
await asyncio.sleep(1)
except Exception as e:
print(f"Error updating timer: {e}")
break
async def game_timeout(self, game: NoiTuGame):
try:
for i in range(30):
if not game.is_active:
return
await asyncio.sleep(1)
if game.is_active and game.last_message_time:
time_diff = datetime.now() - game.last_message_time
if time_diff.total_seconds() >= 30:
embed = discord.Embed(
title="⏰ Hết thời gian!",
description=f"Không ai trả lời trong 30 giây.\n"
f"Từ cuối cùng: **{game.current_word}**\n"
f"Trò chơi kết thúc!",
color=discord.Color.orange()
)
if game.last_player_name:
embed.add_field(
name="👑 Người chiến thắng",
value=f"**{game.last_player_name}** - Từ cuối: **{game.current_word}**",
inline=False
)
if len(game.used_words) > 2 and game.channel and game.channel.guild:
# Update score for the guild
await self.score_repo.upsert_or_increment_point(game.channel.guild.id, game.last_player_id, game.last_player_name, 1)
if game.channel:
await game.channel.send(embed=embed)
self.reset_game(game)
except asyncio.CancelledError:
pass
def reset_game(self, game: NoiTuGame):
game.is_active = False
game.current_word = ""
game.used_words.clear()
game.last_player_id = None
game.last_player_name = None
game.channel = None
game.last_message_time = None
game.timeout_task = None
game.timer_message = None
game.timer_task = None
game.start_time = None
@commands.command(name='start')
async def start_game(self, ctx):
if not await self.is_correct_channel(ctx):
return
game = self.get_game_for_channel(ctx.channel.id)
if game.is_active:
await ctx.send("❌ Trò chơi đã đang diễn ra!")
return
start_word = await self.noi_tu_repo.get_random_word()
if not start_word:
await ctx.send("❌ Không có từ nào trong cơ sở dữ liệu!")
return
game.is_active = True
game.current_word = start_word
game.used_words = {start_word}
game.last_player_id = None
game.last_player_name = None
game.channel = ctx.channel
game.last_message_time = datetime.now()
game.start_time = datetime.now()
game.timeout_task = asyncio.create_task(self.game_timeout(game))
embed = discord.Embed(
title="🎮 Trò chơi Nối Từ đã bắt đầu!",
description=f"Từ đầu tiên: **{start_word}**\n\n"
f"📝 **Luật chơi:**\n"
f"• Mỗi từ gồm 2 từ ghép tiếng Việt (VD: 'âm cao', 'cao độ')\n"
f"• Từ đầu của từ mới phải trùng với từ cuối của từ trước\n"
f"• Không được lặp lại từ đã dùng\n"
f"• Thời gian trả lời tối đa: 30 giây\n\n"
f"⏰ Thời gian bắt đầu: {datetime.now().strftime('%H:%M:%S')}",
color=discord.Color.green()
)
await ctx.send(embed=embed)
@commands.command(name='end')
async def end_game(self, ctx):
if not await self.is_correct_channel(ctx):
return
game = self.get_game_for_channel(ctx.channel.id)
if not game.is_active:
await ctx.send("❌ Không có trò chơi nào đang diễn ra!")
return
if game.timeout_task:
game.timeout_task.cancel()
if game.timer_task:
game.timer_task.cancel()
game_duration = ""
if game.start_time:
duration = datetime.now() - game.start_time
minutes = int(duration.total_seconds() // 60)
seconds = int(duration.total_seconds() % 60)
game_duration = f"{minutes} phút {seconds} giây"
embed = discord.Embed(
title="🏁 Trò chơi Nối Từ đã kết thúc!",
description=f"📊 **Thống kê:**\n"
f"• Số từ đã sử dụng: {len(game.used_words)}\n"
f"• Từ cuối cùng: {game.current_word if game.current_word else 'N/A'}\n"
f"• Thời gian chơi: {game_duration}",
color=discord.Color.red()
)
if game.last_player_name:
embed.add_field(
name="👑 Người chiến thắng",
value=f"**{game.last_player_name}** - Từ cuối: **{game.current_word}**",
inline=False
)
await ctx.send(embed=embed)
self.reset_game(game)
@commands.command(name='add')
async def add_word(self, ctx, *, word: str):
if not await self.is_correct_channel(ctx):
return
if not await self.is_admin(ctx):
await ctx.send("❌ Chỉ admin mới có thể thêm từ!")
return
if not await self.noi_tu_repo.is_valid_word(word):
await ctx.send("❌ Từ phải có đúng 2 từ ghép!")
return
success = await self.noi_tu_repo.add(word)
if success:
embed = discord.Embed(
title="✅ Thêm từ thành công!",
description=f"Từ: **{word}**",
color=discord.Color.green()
)
await ctx.send(embed=embed)
else:
if await self.noi_tu_repo.is_exist(word):
await ctx.send(f"❌ Từ '{word}' đã tồn tại trong cơ sở dữ liệu!")
else:
await ctx.send("❌ Có lỗi xảy ra khi thêm từ!")
@commands.command(name='remove')
async def remove_word(self, ctx, *, word: str):
if not await self.is_correct_channel(ctx):
return
if not await self.is_admin(ctx):
await ctx.send("❌ Chỉ admin mới có thể xóa từ!")
return
if not await self.noi_tu_repo.is_exist(word):
await ctx.send(f"❌ Từ '{word}' không tồn tại trong cơ sở dữ liệu!")
return
success = await self.noi_tu_repo.remove(word)
if success:
embed = discord.Embed(
title="✅ Xóa từ thành công!",
description=f"Đã xóa từ: **{word}**",
color=discord.Color.green()
)
await ctx.send(embed=embed)
else:
await ctx.send("❌ Có lỗi xảy ra khi xóa từ!")
@commands.Cog.listener()
async def on_message(self, message):
if message.author.bot or not message.guild:
return
# Check if enabled for this guild
if not await self.is_enabled(message.guild.id):
return
# Check channel
allowed_ids = await self.get_allowed_channel_ids(message.guild.id)
if message.channel.id not in allowed_ids:
return
game = self.get_game_for_channel(message.channel.id)
if not game.is_active:
return
word = message.content.strip().lower()
if len(word.split()) != 2:
return
if not await self.noi_tu_repo.is_valid_word(word):
return
if game.last_player_id == message.author.id:
return
if word in game.used_words:
await message.add_reaction('')
await message.channel.send(f"❌ Từ '{word}' đã được sử dụng!")
return
if game.current_word:
if not self.is_valid(game.current_word, word):
return
if not await self.noi_tu_repo.is_exist(word):
await message.add_reaction('')
return
async with game.lock:
if not self.is_valid(game.current_word, word):
return
await message.add_reaction('')
game.current_word = word
game.used_words.add(word)
game.last_player_id = message.author.id
game.last_player_name = message.author.display_name
game.last_message_time = datetime.now()
if game.timeout_task:
game.timeout_task.cancel()
game.timeout_task = asyncio.create_task(self.game_timeout(game))
if game.timer_task:
game.timer_task.cancel()
next_hint = self.get_last_word(word).upper()
embed = discord.Embed(
title="⏰ Còn lại: 30 giây",
color=discord.Color.blue()
)
game.timer_message = await message.channel.send(embed=embed)
game.timer_task = asyncio.create_task(self.update_timer_message(game))
async def setup(bot):
await bot.add_cog(NoiTuCog(bot))
+57
View File
@@ -0,0 +1,57 @@
import discord
from discord.ext import commands
from repositories import ScoreRepository
from repositories.feature_toggle import FeatureToggleRepository
class ScoreCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.score_repo = ScoreRepository()
self.feature_repo = FeatureToggleRepository()
async def is_enabled(self, guild_id: int) -> bool:
return await self.feature_repo.get(guild_id, "score")
@commands.command(name="ncheck", description="Kiểm tra thông tin tài khoản của bạn")
async def score_check(self, ctx: commands.Context):
"""Kiểm tra thông tin tài khoản của bạn"""
if not await self.is_enabled(ctx.guild.id):
return
# Check if user is registered in database. If not, create a new one
score = await self.score_repo.get(ctx.guild.id, ctx.author.id)
if not score:
await ctx.send("Bạn chưa có tài khoản score")
await self.score_repo.create(ctx.guild.id, ctx.author.id, 0)
score = await self.score_repo.get(ctx.guild.id, ctx.author.id)
# Kiểm tra lại sau khi tạo
if not score:
await ctx.send("Có lỗi xảy ra khi tạo tài khoản score")
return
await ctx.send(f"Thông tin tài khoản của bạn: {score.point}")
@commands.command(name="rank", description="list rank")
async def list_rank(self, ctx: commands.Context):
"""Kiểm tra rank"""
if not await self.is_enabled(ctx.guild.id):
return
points = await self.score_repo.get_all(ctx.guild.id)
msg = "```\n"
msg += f"{'No.':<4} {'Name':<20} {'Win':>8}\n"
msg += "-" * 34 + "\n"
for i, b in enumerate(points, start=1):
msg += f"{i:<4} {b.user_name:<20} {b.point:>8}\n"
msg += "```"
await ctx.send(msg)
async def incr(self, guild_id, user_id, user_name, amount):
if not await self.is_enabled(guild_id):
return
await self.score_repo.upsert_or_increment_point(guild_id, user_id, user_name, amount)
async def setup(bot):
await bot.add_cog(ScoreCog(bot))