From cfa79cbbaf42a8f74a2cd4bca4d1d495b4d597f1 Mon Sep 17 00:00:00 2001 From: Dmitrii Morozov Date: Tue, 7 May 2024 16:50:38 +0200 Subject: Python code style --- Formatter.py | 95 -------------------------------------- FortniteClient.py | 108 -------------------------------------------- FortniteEvents.py | 35 -------------- FortniteStatusNotifier.py | 37 --------------- TelegramBot.py | 59 ------------------------ Types.py | 50 -------------------- app_types.py | 50 ++++++++++++++++++++ commands.py | 12 ++--- formatter.py | 95 ++++++++++++++++++++++++++++++++++++++ fortnite_client.py | 108 ++++++++++++++++++++++++++++++++++++++++++++ fortnite_events.py | 35 ++++++++++++++ fortnite_status_notifier.py | 37 +++++++++++++++ persistence.py | 68 ++++++++++++++-------------- telegram_bot.py | 59 ++++++++++++++++++++++++ tgbot.py | 10 ++-- 15 files changed, 429 insertions(+), 429 deletions(-) delete mode 100644 Formatter.py delete mode 100755 FortniteClient.py delete mode 100644 FortniteEvents.py delete mode 100644 FortniteStatusNotifier.py delete mode 100644 TelegramBot.py delete mode 100644 Types.py create mode 100644 app_types.py create mode 100644 formatter.py create mode 100755 fortnite_client.py create mode 100644 fortnite_events.py create mode 100644 fortnite_status_notifier.py create mode 100644 telegram_bot.py diff --git a/Formatter.py b/Formatter.py deleted file mode 100644 index fa7a316..0000000 --- a/Formatter.py +++ /dev/null @@ -1,95 +0,0 @@ -from telebot import formatting -import typing -from Types import * - -# Status -def format_fortnite_status(fortnite_status): - statuses = [__format_fortnite_service_status(service_status) for service_status in fortnite_status.serviceStatuses] - return formatting.format_text( - formatting.mbold("Fortnite status"), - "", - '\n'.join(statuses), - separator='\n') - -def __format_status(status): - if (status == True): - return u'\u2705' - else: - return u'\u274c' - -def __format_fortnite_service_status(fortnite_service_status): - return formatting.format_text( - formatting.mbold(fortnite_service_status.serviceName), - __format_status(fortnite_service_status.status), - separator=': ') - -# User -def format_users(users: typing.List[User]): - users_formatted = [format_user(user) for user in users] - return formatting.format_text( - '\n\n'.join(users_formatted), - separator='\n') - -def format_user(user: User): - return formatting.format_text( - formatting.mbold("User: ") + user.display_name, - formatting.mbold("ID: ") + user.id, - separator='\n') - -# Stats -def format_user_stats_list(stats: typing.List[UserStats]): - stats_formatted = [__format_stats(single_stats) for single_stats in stats] - return formatting.format_text( - '\n\n'.join(stats_formatted), - separator='\n') - -def format_user_stats_difference(old_user_stats: typing.List[UserStats], new_user_stats: typing.List[UserStats]): - stats_formatted = [] - - for stats in old_user_stats: - matched = [x for x in new_user_stats if x.user_id == stats.user_id][0] - stats_formatted.append(__format_stats_difference(stats, matched)) - - return formatting.format_text( - '\n\n'.join(stats_formatted), - separator='\n') - -def __format_stats_difference(old_user_stats: UserStats, new_user_stats: UserStats): - return formatting.format_text( - formatting.mbold("User: ") + new_user_stats.user_display_name, - formatting.mbold("ID: ") + new_user_stats.user_id, - formatting.mbold("Level: ") + "{}{}".format(str(new_user_stats.level), __format_stat_difference(old_user_stats.level, new_user_stats.level)), - formatting.mbold("Matches played: ") + "{}{}".format(str(new_user_stats.matches_played), __format_stat_difference(old_user_stats.matches_played, new_user_stats.matches_played)), - formatting.mbold("Total kills: ") + "{}{}".format(str(new_user_stats.kills), __format_stat_difference(old_user_stats.kills, new_user_stats.kills)), - formatting.mbold("Wins: ") + "{}{}".format(str(new_user_stats.wins), __format_stat_difference(old_user_stats.wins, new_user_stats.wins)), - separator='\n') - -def __format_stat_difference(old_stat_value: int, new_stat_value: int): - if old_stat_value != new_stat_value: - return " \(\+ {}\)".format(str(new_stat_value - old_stat_value)) - else: - return "" - -def __format_stats(user_stats: UserStats): - return formatting.format_text( - formatting.mbold("User: ") + user_stats.user_display_name, - formatting.mbold("ID: ") + user_stats.user_id, - formatting.mbold("Level: ") + str(user_stats.level), - formatting.mbold("Matches played: ") + str(user_stats.matches_played), - formatting.mbold("Total kills: ") + str(user_stats.kills), - formatting.mbold("Wins: ") + str(user_stats.wins), - separator='\n') - -def format_friend_online(display_name: str, party_size: int): - if party_size == 1: - text = 'is playing Fortnite\!' - elif party_size == 2: - text = 'is playing Fortnite together with {} friend\!'.format(str(party_size - 1)) - elif party_size > 2: - text = 'is playing Fortnite together with {} friends\!'.format(str(party_size - 1)) - return formatting.format_text( - u'\u2b50', - formatting.mbold('{}'.format(display_name)), - text, - u'\u2b50', - separator=' ') \ No newline at end of file diff --git a/FortniteClient.py b/FortniteClient.py deleted file mode 100755 index c52aacb..0000000 --- a/FortniteClient.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/python3 - -import fortnitepy -import json -import os -import typing -from device_auth import DeviceAuth -from Types import * - -__fortnite_account_key__ = 'fornite-account-key' - -# Intefaces for events -class FriendPresenceObserver: - async def update(self, display_name: str, playing: bool, party_size: int) -> None: - pass - -class ClientInitObserver: - async def on_event(self, fortniteClient) -> None: - pass - -class FortniteClient(fortnitepy.Client): - - __device_auth: DeviceAuth - __friendPresenceObserver: FriendPresenceObserver - __clientInitObserver: ClientInitObserver - - def __init__(self, clientInitObserver: ClientInitObserver, friendPresenceObserver: FriendPresenceObserver): - self.__device_auth = DeviceAuth() - self.__clientInitObserver = clientInitObserver - self.__friendPresenceObserver = friendPresenceObserver - if self.__device_auth.device_auth_file_exists(): - self.__auth_device_auth() - else: - self.__auth_authorization_code() - - async def get_friends(self) -> typing.List[User]: - return [User.from_fortnite_friend(friend) for friend in self.friends] - - async def find_user(self, display_name: str): - user: fortnitepy.User = await self.fetch_user_by_display_name(display_name) - return User.from_fortnite_friend(user) - - def __auth_authorization_code(self): - code = input("Enter authorization code (https://www.epicgames.com/id/api/redirect?clientId=3446cd72694c4a4485d81b77adbb2141&responseType=code):") - super().__init__( - auth=fortnitepy.AuthorizationCodeAuth( - code = code - ) - ) - - def __auth_device_auth(self): - device_auth_details = self.__device_auth.get_device_auth_details().get(__fortnite_account_key__, {}) - super().__init__( - auth=fortnitepy.DeviceAuth( - **device_auth_details - ) - ) - - # Generate auth details if none were supplied yet - async def __generate_auth_details(self): - if not self.__device_auth.device_auth_file_exists(): - device_auth_data = await self.auth.generate_device_auth() - details = { - 'device_id': device_auth_data['deviceId'], - 'account_id': device_auth_data['accountId'], - 'secret': device_auth_data['secret'], - } - self.auth.__dict__.update(details) - self.dispatch_event( - 'device_auth_generate', - details, - __fortnite_account_key__ - ) - - async def event_device_auth_generate(self, details, email): - self.__device_auth.store_device_auth_details(email, details) - - async def event_ready(self): - await self.__generate_auth_details() - await self.__clientInitObserver.on_event(self) - - async def event_friend_request(self, request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]): - await IncomingFriendRequestEvent.on_event(request) - - async def event_friend_presence(self, before, after: fortnitepy.Presence): - await FriendPresenceEvent.on_event(before, after, self.__friendPresenceObserver) - -class FriendPresenceEvent: - async def on_event(before, after: fortnitepy.Presence, friendPresenceObserver: FriendPresenceObserver): - if before is not None and after is not None: - if before.playing != after.playing: - print('FriendPresence changed for user {}, before {}, after {}'.format(after.friend.display_name, before.playing, after.playing)) - party_size: int = 1 - if after.has_properties: - party: fortnitepy.PresenceParty = after.party - if party is not None and party.playercount is not None: - party_size = int(party.playercount) - await friendPresenceObserver.update( - after.friend.display_name, - after.playing, - party_size) - -class IncomingFriendRequestEvent: - async def on_event(request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]): - if isinstance(request, fortnitepy.friend.IncomingPendingFriend): - incoming_request = typing.cast(fortnitepy.friend.IncomingPendingFriend, request) - print('Accepting friend request from {}'.format(incoming_request.display_name)) - #await incoming_request.accept() \ No newline at end of file diff --git a/FortniteEvents.py b/FortniteEvents.py deleted file mode 100644 index c096818..0000000 --- a/FortniteEvents.py +++ /dev/null @@ -1,35 +0,0 @@ -import fortnitepy -import typing -import time -from FortniteClient import * -from TelegramBot import * -from persistence import PresenceRepository -from Formatter import * - -class ClientInitObserverImpl(ClientInitObserver): - async def on_event(self, fortniteClient: FortniteClient) -> None: - print('----------------') - print('FortniteClient ready as:') - print(fortniteClient.user.display_name) - print(fortniteClient.user.id) - print('----------------') - -class FriendPresenceObserverImpl(FriendPresenceObserver): - - __telegramBot: TelegramBot - __presenceRepository: PresenceRepository - - def __init__(self, telegramBot: TelegramBot, presenceRepository: PresenceRepository): - self.__telegramBot = telegramBot - self.__presenceRepository = presenceRepository - - async def update(self, display_name: str, playing: bool, party_size: int) -> None: - if playing: - last_presence = self.__presenceRepository.getLastUserPresence(display_name) - diff = time.time() - last_presence - if diff > 60 * 60: # 60 minutes - await self.__notifyFriendPlaying(display_name, party_size) - self.__presenceRepository.setLastUserPresence(display_name, time.time()) - - async def __notifyFriendPlaying(self, display_name: str, party_size: int): - await self.__telegramBot.send_message_to_all(format_friend_online(display_name, party_size)) \ No newline at end of file diff --git a/FortniteStatusNotifier.py b/FortniteStatusNotifier.py deleted file mode 100644 index 7932380..0000000 --- a/FortniteStatusNotifier.py +++ /dev/null @@ -1,37 +0,0 @@ -from telebot import formatting -import time -import asyncio -from pythonFortniteStatus.FortniteStatus import * - -# Polling interval in seconds -__polling_interval__ = 5 * 60 # 5 minutes - -class FortniteStatusObserver: - async def update(self, fortnite_status) -> None: - pass - -class FortniteStatusNotifier: - - __fortniteStatusObserver: FortniteStatusObserver - __fortniteStatus: FortniteStatus - __lastFortniteStatus: any - - def __init__(self, fortniteStatusObserver: FortniteStatusObserver): - self.__fortniteStatusObserver = fortniteStatusObserver - self.__fortniteStatus = FortniteStatus() - - async def run(self): - # Initialize status - self.__lastFortniteStatus = self.__fortniteStatus.getStatus() - while True: - await self.__readStatus() - await asyncio.sleep(__polling_interval__) - - async def __readStatus(self): - serviceStatusTmp = self.__fortniteStatus.getStatus() - if serviceStatusTmp != self.__lastFortniteStatus: - await self.__notify(serviceStatusTmp) - self.__lastFortniteStatus = serviceStatusTmp - - async def __notify(self, fortniteStatus): - await self.__fortniteStatusObserver.update(fortniteStatus) \ No newline at end of file diff --git a/TelegramBot.py b/TelegramBot.py deleted file mode 100644 index 50db950..0000000 --- a/TelegramBot.py +++ /dev/null @@ -1,59 +0,0 @@ -import telebot -import os -import logging -import traceback -import sys -from telebot.async_telebot import AsyncTeleBot -from persistence import UserRepository - -class CommandHandler: - async def handle(self, message: telebot.types.Message): - pass - -class TelegramBot: - __bot: AsyncTeleBot - __userRepository: UserRepository - - def __init__(self, userRepository: UserRepository): - self.__userRepository = userRepository - - # Check token in environment variables - if "TELEBOT_BOT_TOKEN" not in os.environ: - raise AssertionError("Please configure TELEBOT_BOT_TOKEN as environment variables") - - self.__bot = telebot.async_telebot.AsyncTeleBot( - token=os.environ["TELEBOT_BOT_TOKEN"], - exception_handler=ExceptionHandler()) - - async def run(self): - await self.__bot.polling() - - def register_command_handler(self, command: str, command_handler: CommandHandler): - self.__bot.register_message_handler( - command_handler.handle, - commands=[ command ]) - - async def send_message_to_all(self, message_text: str): - for user in self.__userRepository.getAllUsers(): - try: - await self.__bot.send_message( - user[0], - message_text, - parse_mode='MarkdownV2' - ) - except Exception as error: - if 'bot was kicked from the group chat' in str(error): - self.__userRepository.removeChat(user[0]) - - async def reply(self, message, message_text): - await self.__bot.reply_to( - message, - message_text, - parse_mode='MarkdownV2') - -class ExceptionHandler(telebot.ExceptionHandler): - def handle(self, exception): - logging.error('Exception happened: {}'.format(str(exception))) - print(traceback.format_exc()) - sys.exit('Exiting with telebot exception') - return True \ No newline at end of file diff --git a/Types.py b/Types.py deleted file mode 100644 index fc2e034..0000000 --- a/Types.py +++ /dev/null @@ -1,50 +0,0 @@ -import fortnitepy - -class UserStats: - user_id: str - user_display_name: str - level: int - matches_played: int - kills: int - wins: int - -class User: - - id: str - display_name: str - - __fortniteUser: fortnitepy.user.UserBase - - def from_fortnite_friend(user: fortnitepy.user.UserBase): - - if user is None: - return None - - instance = User() - - instance.id = user.id - instance.display_name = user.display_name - instance.__fortniteUser = user - - return instance - - async def fetch_stats(self) -> UserStats: - stats = await self.__fortniteUser.fetch_br_stats() - bp_level: float = await self.__fortniteUser.fetch_battlepass_level(season=29) # TODO - combined_stats = stats.get_combined_stats() - device_stats = {} - if 'keyboardmouse' in combined_stats: - device_stats = combined_stats['keyboardmouse'] - else: - device_stats = combined_stats['gamepad'] - - stats = UserStats() - - stats.user_id = self.id - stats.user_display_name = self.display_name - stats.level = int(bp_level//1) - stats.matches_played = device_stats['matchesplayed'] - stats.kills = device_stats['kills'] - stats.wins = device_stats['wins'] - - return stats \ No newline at end of file diff --git a/app_types.py b/app_types.py new file mode 100644 index 0000000..699928a --- /dev/null +++ b/app_types.py @@ -0,0 +1,50 @@ +import fortnitepy + +class UserStats: + user_id: str + user_display_name: str + level: int + matches_played: int + kills: int + wins: int + +class User: + + id: str + display_name: str + + __fortnite_user: fortnitepy.user.UserBase + + def from_fortnite_friend(user: fortnitepy.user.UserBase): + + if user is None: + return None + + instance = User() + + instance.id = user.id + instance.display_name = user.display_name + instance.__fortnite_user = user + + return instance + + async def fetch_stats(self) -> UserStats: + stats = await self.__fortnite_user.fetch_br_stats() + bp_level: float = await self.__fortnite_user.fetch_battlepass_level(season=29) # TODO + combined_stats = stats.get_combined_stats() + device_stats = {} + if 'keyboardmouse' in combined_stats: + device_stats = combined_stats['keyboardmouse'] + else: + device_stats = combined_stats['gamepad'] + + stats = UserStats() + + stats.user_id = self.id + stats.user_display_name = self.display_name + stats.level = int(bp_level//1) + stats.matches_played = device_stats['matchesplayed'] + stats.kills = device_stats['kills'] + stats.wins = device_stats['wins'] + + return stats \ No newline at end of file diff --git a/commands.py b/commands.py index 4cf330b..feba4b8 100644 --- a/commands.py +++ b/commands.py @@ -1,8 +1,8 @@ import telebot -from TelegramBot import * -from Formatter import * +from telegram_bot import * +from formatter import * from persistence import * -from FortniteClient import FortniteClient +from fortnite_client import FortniteClient from pythonFortniteStatus.FortniteStatus import * class StartCommand(CommandHandler): @@ -19,7 +19,7 @@ class StartCommand(CommandHandler): alias = message.chat.username else: alias = message.chat.title - self.__user_repository.putUser(message.chat.id, alias) + self.__user_repository.put_user(message.chat.id, alias) await self.__telegram_bot.reply(message, 'This chat successfully registered to receive Fortnite updates') class GetStatusCommand(CommandHandler): @@ -74,7 +74,7 @@ class GetTodayStatsCommand(CommandHandler): self.__stats_repository = stats_repository async def handle(self, message: telebot.types.Message): - persisted_stats = self.__stats_repository.getStats() + persisted_stats = self.__stats_repository.get_stats() friends = await self.__fortnite_client.get_friends() current_stats = [await friend.fetch_stats() for friend in friends] await self.__telegram_bot.reply(message, format_user_stats_difference(persisted_stats, current_stats)) @@ -91,4 +91,4 @@ class RecordStatsCommand(CommandHandler): async def handle(self, message: telebot.types.Message): friends = await self.__fortnite_client.get_friends() for friend in friends: - await self.__stats_repository.putStats(friend) \ No newline at end of file + await self.__stats_repository.put_stats(friend) \ No newline at end of file diff --git a/formatter.py b/formatter.py new file mode 100644 index 0000000..9efc249 --- /dev/null +++ b/formatter.py @@ -0,0 +1,95 @@ +from telebot import formatting +import typing +from app_types import * + +# Status +def format_fortnite_status(fortnite_status): + statuses = [__format_fortnite_service_status(service_status) for service_status in fortnite_status.serviceStatuses] + return formatting.format_text( + formatting.mbold("Fortnite status"), + "", + '\n'.join(statuses), + separator='\n') + +def __format_status(status): + if (status == True): + return u'\u2705' + else: + return u'\u274c' + +def __format_fortnite_service_status(fortnite_service_status): + return formatting.format_text( + formatting.mbold(fortnite_service_status.serviceName), + __format_status(fortnite_service_status.status), + separator=': ') + +# User +def format_users(users: typing.List[User]): + users_formatted = [format_user(user) for user in users] + return formatting.format_text( + '\n\n'.join(users_formatted), + separator='\n') + +def format_user(user: User): + return formatting.format_text( + formatting.mbold("User: ") + user.display_name, + formatting.mbold("ID: ") + user.id, + separator='\n') + +# Stats +def format_user_stats_list(stats: typing.List[UserStats]): + stats_formatted = [__format_stats(single_stats) for single_stats in stats] + return formatting.format_text( + '\n\n'.join(stats_formatted), + separator='\n') + +def format_user_stats_difference(old_user_stats: typing.List[UserStats], new_user_stats: typing.List[UserStats]): + stats_formatted = [] + + for stats in old_user_stats: + matched = [x for x in new_user_stats if x.user_id == stats.user_id][0] + stats_formatted.append(__format_stats_difference(stats, matched)) + + return formatting.format_text( + '\n\n'.join(stats_formatted), + separator='\n') + +def __format_stats_difference(old_user_stats: UserStats, new_user_stats: UserStats): + return formatting.format_text( + formatting.mbold("User: ") + new_user_stats.user_display_name, + formatting.mbold("ID: ") + new_user_stats.user_id, + formatting.mbold("Level: ") + "{}{}".format(str(new_user_stats.level), __format_stat_difference(old_user_stats.level, new_user_stats.level)), + formatting.mbold("Matches played: ") + "{}{}".format(str(new_user_stats.matches_played), __format_stat_difference(old_user_stats.matches_played, new_user_stats.matches_played)), + formatting.mbold("Total kills: ") + "{}{}".format(str(new_user_stats.kills), __format_stat_difference(old_user_stats.kills, new_user_stats.kills)), + formatting.mbold("Wins: ") + "{}{}".format(str(new_user_stats.wins), __format_stat_difference(old_user_stats.wins, new_user_stats.wins)), + separator='\n') + +def __format_stat_difference(old_stat_value: int, new_stat_value: int): + if old_stat_value != new_stat_value: + return " \(\+ {}\)".format(str(new_stat_value - old_stat_value)) + else: + return "" + +def __format_stats(user_stats: UserStats): + return formatting.format_text( + formatting.mbold("User: ") + user_stats.user_display_name, + formatting.mbold("ID: ") + user_stats.user_id, + formatting.mbold("Level: ") + str(user_stats.level), + formatting.mbold("Matches played: ") + str(user_stats.matches_played), + formatting.mbold("Total kills: ") + str(user_stats.kills), + formatting.mbold("Wins: ") + str(user_stats.wins), + separator='\n') + +def format_friend_online(display_name: str, party_size: int): + if party_size == 1: + text = 'is playing Fortnite\!' + elif party_size == 2: + text = 'is playing Fortnite together with {} friend\!'.format(str(party_size - 1)) + elif party_size > 2: + text = 'is playing Fortnite together with {} friends\!'.format(str(party_size - 1)) + return formatting.format_text( + u'\u2b50', + formatting.mbold('{}'.format(display_name)), + text, + u'\u2b50', + separator=' ') \ No newline at end of file diff --git a/fortnite_client.py b/fortnite_client.py new file mode 100755 index 0000000..16b2652 --- /dev/null +++ b/fortnite_client.py @@ -0,0 +1,108 @@ +#!/usr/bin/python3 + +import fortnitepy +import json +import os +import typing +from device_auth import DeviceAuth +from app_types import * + +__fortnite_account_key__ = 'fornite-account-key' + +# Intefaces for events +class FriendPresenceObserver: + async def update(self, display_name: str, playing: bool, party_size: int) -> None: + pass + +class ClientInitObserver: + async def on_event(self, fortnite_client) -> None: + pass + +class FortniteClient(fortnitepy.Client): + + __device_auth: DeviceAuth + __friend_presence_observer: FriendPresenceObserver + __client_init_observer: ClientInitObserver + + def __init__(self, client_init_observer: ClientInitObserver, friend_presence_observer: FriendPresenceObserver): + self.__device_auth = DeviceAuth() + self.__client_init_observer = client_init_observer + self.__friend_presence_observer = friend_presence_observer + if self.__device_auth.device_auth_file_exists(): + self.__auth_device_auth() + else: + self.__auth_authorization_code() + + async def get_friends(self) -> typing.List[User]: + return [User.from_fortnite_friend(friend) for friend in self.friends] + + async def find_user(self, display_name: str): + user: fortnitepy.User = await self.fetch_user_by_display_name(display_name) + return User.from_fortnite_friend(user) + + def __auth_authorization_code(self): + code = input("Enter authorization code (https://www.epicgames.com/id/api/redirect?clientId=3446cd72694c4a4485d81b77adbb2141&responseType=code):") + super().__init__( + auth=fortnitepy.AuthorizationCodeAuth( + code = code + ) + ) + + def __auth_device_auth(self): + device_auth_details = self.__device_auth.get_device_auth_details().get(__fortnite_account_key__, {}) + super().__init__( + auth=fortnitepy.DeviceAuth( + **device_auth_details + ) + ) + + # Generate auth details if none were supplied yet + async def __generate_auth_details(self): + if not self.__device_auth.device_auth_file_exists(): + device_auth_data = await self.auth.generate_device_auth() + details = { + 'device_id': device_auth_data['deviceId'], + 'account_id': device_auth_data['accountId'], + 'secret': device_auth_data['secret'], + } + self.auth.__dict__.update(details) + self.dispatch_event( + 'device_auth_generate', + details, + __fortnite_account_key__ + ) + + async def event_device_auth_generate(self, details, email): + self.__device_auth.store_device_auth_details(email, details) + + async def event_ready(self): + await self.__generate_auth_details() + await self.__client_init_observer.on_event(self) + + async def event_friend_request(self, request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]): + await IncomingFriendRequestEvent.on_event(request) + + async def event_friend_presence(self, before, after: fortnitepy.Presence): + await FriendPresenceEvent.on_event(before, after, self.__friend_presence_observer) + +class FriendPresenceEvent: + async def on_event(before, after: fortnitepy.Presence, friend_presence_observer: FriendPresenceObserver): + if before is not None and after is not None: + if before.playing != after.playing: + print('FriendPresence changed for user {}, before {}, after {}'.format(after.friend.display_name, before.playing, after.playing)) + party_size: int = 1 + if after.has_properties: + party: fortnitepy.PresenceParty = after.party + if party is not None and party.playercount is not None: + party_size = int(party.playercount) + await friend_presence_observer.update( + after.friend.display_name, + after.playing, + party_size) + +class IncomingFriendRequestEvent: + async def on_event(request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]): + if isinstance(request, fortnitepy.friend.IncomingPendingFriend): + incoming_request = typing.cast(fortnitepy.friend.IncomingPendingFriend, request) + print('Accepting friend request from {}'.format(incoming_request.display_name)) + #await incoming_request.accept() \ No newline at end of file diff --git a/fortnite_events.py b/fortnite_events.py new file mode 100644 index 0000000..d9e143f --- /dev/null +++ b/fortnite_events.py @@ -0,0 +1,35 @@ +import fortnitepy +import typing +import time +from fortnite_client import * +from telegram_bot import * +from persistence import PresenceRepository +from formatter import * + +class ClientInitObserverImpl(ClientInitObserver): + async def on_event(self, fortnite_client: FortniteClient) -> None: + print('----------------') + print('FortniteClient ready as:') + print(fortnite_client.user.display_name) + print(fortnite_client.user.id) + print('----------------') + +class FriendPresenceObserverImpl(FriendPresenceObserver): + + __telegram_bot: TelegramBot + __presence_repository: PresenceRepository + + def __init__(self, telegram_bot: TelegramBot, presence_repository: PresenceRepository): + self.__telegram_bot = telegram_bot + self.__presence_repository = presence_repository + + async def update(self, display_name: str, playing: bool, party_size: int) -> None: + if playing: + last_presence = self.__presence_repository.get_last_user_presence(display_name) + diff = time.time() - last_presence + if diff > 60 * 60: # 60 minutes + await self.__notify_friend_playing(display_name, party_size) + self.__presence_repository.set_last_user_presence(display_name, time.time()) + + async def __notify_friend_playing(self, display_name: str, party_size: int): + await self.__telegram_bot.send_message_to_all(format_friend_online(display_name, party_size)) \ No newline at end of file diff --git a/fortnite_status_notifier.py b/fortnite_status_notifier.py new file mode 100644 index 0000000..edf3009 --- /dev/null +++ b/fortnite_status_notifier.py @@ -0,0 +1,37 @@ +from telebot import formatting +import time +import asyncio +from pythonFortniteStatus.FortniteStatus import * + +# Polling interval in seconds +__polling_interval__ = 5 * 60 # 5 minutes + +class FortniteStatusObserver: + async def update(self, fortnite_status) -> None: + pass + +class FortniteStatusNotifier: + + __fortnite_status_observer: FortniteStatusObserver + __fortnite_status: FortniteStatus + __last_fortnite_status: any + + def __init__(self, fortnite_status_observer: FortniteStatusObserver): + self.__fortnite_status_observer = fortnite_status_observer + self.__fortnite_status = FortniteStatus() + + async def run(self): + # Initialize status + self.__last_fortnite_status = self.__fortniteStatus.getStatus() + while True: + await self.__read_status() + await asyncio.sleep(__polling_interval__) + + async def __read_status(self): + service_status_tmp = self.__fortniteStatus.getStatus() + if service_status_tmp != self.__last_fortnite_status: + await self.__notify(service_status_tmp) + self.__last_fortnite_status = service_status_tmp + + async def __notify(self, fortnite_status): + await self.__fortnite_status_observer.update(fortnite_status) \ No newline at end of file diff --git a/persistence.py b/persistence.py index f8d7097..512836e 100644 --- a/persistence.py +++ b/persistence.py @@ -1,5 +1,5 @@ import sqlite3, typing -from Types import * +from app_types import * class UserRepository: @@ -7,29 +7,29 @@ class UserRepository: self.__initialize() def __initialize(self): - cur = self.__getConnection().cursor() + cur = self.__get_connection().cursor() cur.execute("CREATE TABLE IF NOT EXISTS user(chat_id INT, alias TEXT)") cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS chat_id_idx ON user(chat_id)") - def __getConnection(self): + def __get_connection(self): return sqlite3.connect('db.sqlite') - def getUser(self, chat_id): - connection = self.__getConnection() + def get_user(self, chat_id): + connection = self.__get_connection() cur = connection.cursor() query = "select * from user where chat_id = {chat_id}".format(chat_id = chat_id) cur.execute(query) return cur.fetchone() - def getAllUsers(self): - connection = self.__getConnection() + def get_all_users(self): + connection = self.__get_connection() cur = connection.cursor() query = "select * from user" cur.execute(query) return cur.fetchall() - def removeChat(self, chat_id): - connection = self.__getConnection() + def remove_chat(self, chat_id): + connection = self.__get_connection() cur = connection.cursor() query = "DELETE FROM user where chat_id = {chat_id}".format( chat_id = chat_id) @@ -37,9 +37,9 @@ class UserRepository: connection.commit() - def putUser(self, chat_id, alias): - if not self.getUser(chat_id): - connection = self.__getConnection() + def put_user(self, chat_id, alias): + if not self.get_user(chat_id): + connection = self.__get_connection() cur = connection.cursor() query = "INSERT INTO user(chat_id, alias) VALUES({chat_id}, '{text}')".format( chat_id = chat_id, @@ -52,18 +52,18 @@ class StatsRepository: def __init__(self): self.__initialize() - def __getConnection(self): + def __get_connection(self): return sqlite3.connect('db.sqlite') def __initialize(self): - cur = self.__getConnection().cursor() + cur = self.__get_connection().cursor() cur.execute("CREATE TABLE IF NOT EXISTS stats(user_id TEXT, display_name TEXT, level INT, matches_played INT, kills INT, wins INT)") cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS user_id_idx ON stats(user_id)") - async def putStats(self, user: User): + async def put_stats(self, user: User): stats: UserStats = await user.fetch_stats() - connection = self.__getConnection() + connection = self.__get_connection() cur = connection.cursor() query = "INSERT OR REPLACE INTO stats(user_id, display_name, level, matches_played, kills, wins) VALUES('{user_id}', '{display_name}', {level}, {matches_played}, {kills}, {wins})".format( user_id = user.id, @@ -75,39 +75,39 @@ class StatsRepository: cur.execute(query) connection.commit() - def getStats(self) -> typing.List[UserStats]: - connection = self.__getConnection() + def get_stats(self) -> typing.List[UserStats]: + connection = self.__get_connection() cur = connection.cursor() query = "SELECT * FROM stats" cur.execute(query) result = cur.fetchall() - return [self.__mapFromDb(stats) for stats in result] + return [self.__map_from_db(stats) for stats in result] - def __mapFromDb(self, record): - userStats = UserStats() - userStats.user_id = str(record[0]) - userStats.user_display_name = str(record[1]) - userStats.level = int(record[2]) - userStats.matches_played = int(record[3]) - userStats.kills = int(record[4]) - userStats.wins = int(record[5]) - return userStats + def __map_from_db(self, record): + user_stats = UserStats() + user_stats.user_id = str(record[0]) + user_stats.user_display_name = str(record[1]) + user_stats.level = int(record[2]) + user_stats.matches_played = int(record[3]) + user_stats.kills = int(record[4]) + user_stats.wins = int(record[5]) + return user_stats class PresenceRepository: def __init__(self): self.__initialize() - def __getConnection(self): + def __get_connection(self): return sqlite3.connect('db.sqlite') def __initialize(self): - cur = self.__getConnection().cursor() + cur = self.__get_connection().cursor() cur.execute("CREATE TABLE IF NOT EXISTS user_presence(display_name TEXT, last_online INT)") cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS display_name_idx ON user_presence(display_name)") - def getLastUserPresence(self, display_name): - connection = self.__getConnection() + def get_last_user_presence(self, display_name): + connection = self.__get_connection() cur = connection.cursor() query = "select last_online from user_presence where display_name = '{display_name}'".format(display_name = display_name) cur.execute(query) @@ -118,8 +118,8 @@ class PresenceRepository: else: return 0 - def setLastUserPresence(self, display_name: str, last_online: int): - connection = self.__getConnection() + def set_last_user_presence(self, display_name: str, last_online: int): + connection = self.__get_connection() cur = connection.cursor() query = "INSERT OR REPLACE INTO user_presence(display_name, last_online) VALUES('{display_name}', {last_online})".format( display_name = display_name, diff --git a/telegram_bot.py b/telegram_bot.py new file mode 100644 index 0000000..26d3071 --- /dev/null +++ b/telegram_bot.py @@ -0,0 +1,59 @@ +import telebot +import os +import logging +import traceback +import sys +from telebot.async_telebot import AsyncTeleBot +from persistence import UserRepository + +class CommandHandler: + async def handle(self, message: telebot.types.Message): + pass + +class TelegramBot: + __bot: AsyncTeleBot + __user_repository: UserRepository + + def __init__(self, user_repository: UserRepository): + self.__user_repository = user_repository + + # Check token in environment variables + if "TELEBOT_BOT_TOKEN" not in os.environ: + raise AssertionError("Please configure TELEBOT_BOT_TOKEN as environment variables") + + self.__bot = telebot.async_telebot.AsyncTeleBot( + token=os.environ["TELEBOT_BOT_TOKEN"], + exception_handler=ExceptionHandler()) + + async def run(self): + await self.__bot.polling() + + def register_command_handler(self, command: str, command_handler: CommandHandler): + self.__bot.register_message_handler( + command_handler.handle, + commands=[ command ]) + + async def send_message_to_all(self, message_text: str): + for user in self.__user_repository.get_all_users(): + try: + await self.__bot.send_message( + user[0], + message_text, + parse_mode='MarkdownV2' + ) + except Exception as error: + if 'bot was kicked from the group chat' in str(error): + self.__user_repository.remove_chat(user[0]) + + async def reply(self, message, message_text): + await self.__bot.reply_to( + message, + message_text, + parse_mode='MarkdownV2') + +class ExceptionHandler(telebot.ExceptionHandler): + def handle(self, exception): + logging.error('Exception happened: {}'.format(str(exception))) + print(traceback.format_exc()) + sys.exit('Exiting with telebot exception') + return True \ No newline at end of file diff --git a/tgbot.py b/tgbot.py index a0e248b..6fbd07c 100755 --- a/tgbot.py +++ b/tgbot.py @@ -1,12 +1,12 @@ #!/usr/bin/python3 import os, time, telebot, asyncio, telebot.async_telebot, nest_asyncio, sys -from FortniteStatusNotifier import * -from Formatter import * -from FortniteClient import * -from FortniteEvents import * +from fortnite_status_notifier import * +from formatter import * +from fortnite_client import * +from fortnite_events import * from persistence import UserRepository, StatsRepository, PresenceRepository -from TelegramBot import TelegramBot, CommandHandler +from telegram_bot import TelegramBot, CommandHandler from commands import * class FortniteStatusObserverImpl(FortniteStatusObserver): -- cgit v1.2.3