From aaef526c6c37ed1b3d4016092a2937433cd4d7d7 Mon Sep 17 00:00:00 2001 From: Dmitrii Morozov Date: Tue, 7 May 2024 04:08:27 +0200 Subject: Minor refactoring, extract commands to separate classes --- tgbot.py | 114 +++++++++------------------------------------------------------ 1 file changed, 15 insertions(+), 99 deletions(-) (limited to 'tgbot.py') diff --git a/tgbot.py b/tgbot.py index 2044d41..1bab35e 100755 --- a/tgbot.py +++ b/tgbot.py @@ -6,10 +6,8 @@ from Formatter import * from FortniteClient import * from FortniteEvents import * from persistence import UserRepository, StatsRepository, PresenceRepository - -# Check token in environment variables -if "TELEBOT_BOT_TOKEN" not in os.environ: - raise AssertionError("Please configure TELEBOT_BOT_TOKEN as environment variables") +from TelegramBot import TelegramBot, CommandHandler +from Commands import * class ClientInitObserver(ClientInit): async def on_event(self) -> None: @@ -19,11 +17,11 @@ class ClientInitObserver(ClientInit): # Record user stats if len(statsRepository.getStats()) == 0: - await record_user_stats() + await recordStatsCommand.handle(None) class FortniteStatusObserver(Observer): async def update(self, fortniteStatus) -> None: - await send_message_to_all(formatFortniteStatus(fortniteStatus)) + await telegramBot.send_message_to_all(formatFortniteStatus(fortniteStatus)) class FortnitePresenceObserver(PresenceObserver): async def update(self, display_name: str, playing: bool, party_size: int) -> None: @@ -35,107 +33,25 @@ class FortnitePresenceObserver(PresenceObserver): prensenceRepository.setLastUserPresence(display_name, time.time()) async def __notifyFriendPlaying(self, display_name: str, party_size: int): - await send_message_to_all(formatFriendOnline(display_name, party_size)) - -class ExceptionHandler(telebot.ExceptionHandler): - def handle(self, exception): - logging.error('Exception happened: {}'.format(str(exception))) - sys.exit('Exiting with telebot exception') - return True + await telegramBot.send_message_to_all(formatFriendOnline(display_name, party_size)) -bot = telebot.async_telebot.AsyncTeleBot( - token=os.environ["TELEBOT_BOT_TOKEN"], - exception_handler=ExceptionHandler()) userRepository = UserRepository() statsRepository = StatsRepository() prensenceRepository = PresenceRepository() +telegramBot = TelegramBot(userRepository) fortniteStatusWrapper = FortniteStatusNotifier(FortniteStatusObserver()) fortniteClient = FortniteClient(FortnitePresenceObserver(), ClientInitObserver()) +recordStatsCommand = RecordStatsCommand(telegramBot, fortniteClient, statsRepository) -@bot.message_handler(commands = ['start']) -async def startCommand(message: telebot.types.Message): - if message.chat.type == 'private': - alias = message.chat.username - else: - alias = message.chat.title - userRepository.putUser(message.chat.id, alias) - await reply(message, 'This chat successfully registered to receive Fortnite updates') - -@bot.message_handler(commands = ['status']) -async def getStatus(message): - await reply(message, formatFortniteStatus(fortniteStatus.getStatus())) - -@bot.message_handler(commands = ['friends']) -async def getFriends(message): - friends = await fortniteClient.get_friends() - await reply(message, formatUsers(friends)) - -@bot.message_handler(commands = ['stats']) -async def getStats(message): - friends = await fortniteClient.get_friends() - stats = [await friend.fetch_stats() for friend in friends] - await reply(message, formatUserStatsList(stats)) - -@bot.message_handler(commands = ['todaystats']) -async def getTodayStats(message): - persisted_stats = statsRepository.getStats() - friends = await fortniteClient.get_friends() - current_stats = [await friend.fetch_stats() for friend in friends] - await reply(message, formatUserStatsDifference(persisted_stats, current_stats)) - -@bot.message_handler(commands = ['recordstats']) -async def recordStats(message): - await record_user_stats() - -@bot.message_handler(commands = ['find']) -async def findUser(message): - arg = message.text.split() - if len(arg) > 1: - search_user_display_name = arg[1] - user: User = await fortniteClient.find_user(search_user_display_name) - if user is not None: - await reply(message, formatUser(user)) - else: - await reply(message, 'User {} not found'.format(search_user_display_name)) - else: - await reply(message, 'Usage: /find username') - -@bot.message_handler(commands = ['add']) -async def addUser(message): - arg = message.text.split() - if len(arg) > 1: - user_id = arg[1] - await fortniteClient.add_friend(user_id) - await reply(message, 'Send friend request successfully') - else: - await reply(message, 'Usage: /add username') - -async def send_message_to_all(message_text: str): - for user in userRepository.getAllUsers(): - try: - await bot.send_message( - user[0], - message_text, - parse_mode='MarkdownV2' - ) - except Exception as error: - if 'bot was kicked from the group chat' in str(error): - userRepository.removeChat(user[0]) - -async def reply(message, message_text): - await bot.reply_to( - message, - message_text, - parse_mode='MarkdownV2') - -async def record_user_stats(): - print('Recording user stats') - friends = await fortniteClient.get_friends() - for friend in friends: - await statsRepository.putStats(friend) +telegramBot.register_command_handler('start', StartCommand(telegramBot, userRepository)) +telegramBot.register_command_handler('status', GetStatusCommand(telegramBot)) +telegramBot.register_command_handler('friends', GetFriendsCommand(telegramBot, fortniteClient)) +telegramBot.register_command_handler('stats', GetStatsCommand(telegramBot, fortniteClient)) +telegramBot.register_command_handler('todaystats', GetTodayStatsCommand(telegramBot, fortniteClient, statsRepository)) +telegramBot.register_command_handler('recordstats', recordStatsCommand) async def run_tgbot(): - await bot.polling() + await telegramBot.run() async def run_fortniteStatusWrapper(): await fortniteStatusWrapper.run() @@ -147,7 +63,7 @@ async def run_record_stats(): while True: t = time.localtime() if t.tm_hour == 5: # only at 05:00 - await record_user_stats() + await recordStatsCommand.handle(None) await asyncio.sleep(60 * 60) # 1 hour async def run_all(): -- cgit v1.2.3