From ee3e87a6ec19878d56e8f386b20c58e4d9b211b3 Mon Sep 17 00:00:00 2001 From: Dmitrii Morozov Date: Tue, 7 May 2024 17:15:25 +0200 Subject: Modules --- .gitmodules | 3 - app_types.py | 50 -------------- app_types/__init__.py | 50 ++++++++++++++ commands.py | 94 --------------------------- device_auth.py | 22 ------- formatter.py | 95 --------------------------- formatter/__init__.py | 95 +++++++++++++++++++++++++++ fortnite_client.py | 108 ------------------------------- fortnite_client/__init__.py | 106 ++++++++++++++++++++++++++++++ fortnite_client/device_auth.py | 22 +++++++ fortnite_client/fortnite_events.py | 35 ++++++++++ fortnite_events.py | 35 ---------- fortnite_status/__init__.py | 119 ++++++++++++++++++++++++++++++++++ fortnite_status_notifier.py | 37 ----------- main.py | 68 +++++++++++++++++++ persistence.py | 129 ------------------------------------- persistence/__init__.py | 128 ++++++++++++++++++++++++++++++++++++ pythonFortniteStatus | 1 - telegram_bot.py | 59 ----------------- telegram_bot/__init__.py | 59 +++++++++++++++++ telegram_bot/commands.py | 94 +++++++++++++++++++++++++++ tgbot.py | 68 ------------------- 22 files changed, 776 insertions(+), 701 deletions(-) delete mode 100644 .gitmodules delete mode 100644 app_types.py create mode 100644 app_types/__init__.py delete mode 100644 commands.py delete mode 100644 device_auth.py delete mode 100644 formatter.py create mode 100644 formatter/__init__.py delete mode 100755 fortnite_client.py create mode 100644 fortnite_client/__init__.py create mode 100644 fortnite_client/device_auth.py create mode 100644 fortnite_client/fortnite_events.py delete mode 100644 fortnite_events.py create mode 100644 fortnite_status/__init__.py delete mode 100644 fortnite_status_notifier.py create mode 100755 main.py delete mode 100644 persistence.py create mode 100644 persistence/__init__.py delete mode 160000 pythonFortniteStatus delete mode 100644 telegram_bot.py create mode 100644 telegram_bot/__init__.py create mode 100644 telegram_bot/commands.py delete mode 100755 tgbot.py diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index 136e707..0000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "pythonFortniteStatus"] - path = pythonFortniteStatus - url = https://git.snoopdesigns.site/python-fortnite-status.git diff --git a/app_types.py b/app_types.py deleted file mode 100644 index 699928a..0000000 --- a/app_types.py +++ /dev/null @@ -1,50 +0,0 @@ -import fortnitepy - -class UserStats: - user_id: str - user_display_name: str - level: int - matches_played: int - kills: int - wins: int - -class User: - - id: str - display_name: str - - __fortnite_user: fortnitepy.user.UserBase - - def from_fortnite_friend(user: fortnitepy.user.UserBase): - - if user is None: - return None - - instance = User() - - instance.id = user.id - instance.display_name = user.display_name - instance.__fortnite_user = user - - return instance - - async def fetch_stats(self) -> UserStats: - stats = await self.__fortnite_user.fetch_br_stats() - bp_level: float = await self.__fortnite_user.fetch_battlepass_level(season=29) # TODO - combined_stats = stats.get_combined_stats() - device_stats = {} - if 'keyboardmouse' in combined_stats: - device_stats = combined_stats['keyboardmouse'] - else: - device_stats = combined_stats['gamepad'] - - stats = UserStats() - - stats.user_id = self.id - stats.user_display_name = self.display_name - stats.level = int(bp_level//1) - stats.matches_played = device_stats['matchesplayed'] - stats.kills = device_stats['kills'] - stats.wins = device_stats['wins'] - - return stats \ No newline at end of file diff --git a/app_types/__init__.py b/app_types/__init__.py new file mode 100644 index 0000000..699928a --- /dev/null +++ b/app_types/__init__.py @@ -0,0 +1,50 @@ +import fortnitepy + +class UserStats: + user_id: str + user_display_name: str + level: int + matches_played: int + kills: int + wins: int + +class User: + + id: str + display_name: str + + __fortnite_user: fortnitepy.user.UserBase + + def from_fortnite_friend(user: fortnitepy.user.UserBase): + + if user is None: + return None + + instance = User() + + instance.id = user.id + instance.display_name = user.display_name + instance.__fortnite_user = user + + return instance + + async def fetch_stats(self) -> UserStats: + stats = await self.__fortnite_user.fetch_br_stats() + bp_level: float = await self.__fortnite_user.fetch_battlepass_level(season=29) # TODO + combined_stats = stats.get_combined_stats() + device_stats = {} + if 'keyboardmouse' in combined_stats: + device_stats = combined_stats['keyboardmouse'] + else: + device_stats = combined_stats['gamepad'] + + stats = UserStats() + + stats.user_id = self.id + stats.user_display_name = self.display_name + stats.level = int(bp_level//1) + stats.matches_played = device_stats['matchesplayed'] + stats.kills = device_stats['kills'] + stats.wins = device_stats['wins'] + + return stats \ No newline at end of file diff --git a/commands.py b/commands.py deleted file mode 100644 index feba4b8..0000000 --- a/commands.py +++ /dev/null @@ -1,94 +0,0 @@ -import telebot -from telegram_bot import * -from formatter import * -from persistence import * -from fortnite_client import FortniteClient -from pythonFortniteStatus.FortniteStatus import * - -class StartCommand(CommandHandler): - - __telegram_bot: TelegramBot - __user_repository: UserRepository - - def __init__(self, telegram_bot: TelegramBot, user_repository: UserRepository): - self.__telegram_bot = telegram_bot - self.__user_repository = user_repository - - async def handle(self, message: telebot.types.Message): - if message.chat.type == 'private': - alias = message.chat.username - else: - alias = message.chat.title - self.__user_repository.put_user(message.chat.id, alias) - await self.__telegram_bot.reply(message, 'This chat successfully registered to receive Fortnite updates') - -class GetStatusCommand(CommandHandler): - - __telegram_bot: TelegramBot - __fortnite_status: FortniteStatus - - def __init__(self, telegram_bot: TelegramBot): - self.__telegram_bot = telegram_bot - self.__fortnite_status = FortniteStatus() - - async def handle(self, message: telebot.types.Message): - await self.__telegram_bot.reply(message, format_fortnite_status(self.__fortnite_status.getStatus())) - -class GetFriendsCommand(CommandHandler): - - __telegram_bot: TelegramBot - __fortnite_client: FortniteClient - - def __init__(self, telegram_bot: TelegramBot, fortnite_client: FortniteClient): - self.__telegram_bot = telegram_bot - self.__fortnite_client = fortnite_client - - async def handle(self, message: telebot.types.Message): - friends = await self.__fortnite_client.get_friends() - await self.__telegram_bot.reply(message, format_users(friends)) - - -class GetStatsCommand(CommandHandler): - - __telegram_bot: TelegramBot - __fortnite_client: FortniteClient - - def __init__(self, telegram_bot: TelegramBot, fortnite_client: FortniteClient): - self.__telegram_bot = telegram_bot - self.__fortnite_client = fortnite_client - - async def handle(self, message: telebot.types.Message): - friends = await self.__fortnite_client.get_friends() - stats = [await friend.fetch_stats() for friend in friends] - await self.__telegram_bot.reply(message, format_user_stats_list(stats)) - -class GetTodayStatsCommand(CommandHandler): - - __telegram_bot: TelegramBot - __fortnite_client: FortniteClient - __stats_repository: StatsRepository - - def __init__(self, telegram_bot: TelegramBot, fortnite_client: FortniteClient, stats_repository: StatsRepository): - self.__telegram_bot = telegram_bot - self.__fortnite_client = fortnite_client - self.__stats_repository = stats_repository - - async def handle(self, message: telebot.types.Message): - persisted_stats = self.__stats_repository.get_stats() - friends = await self.__fortnite_client.get_friends() - current_stats = [await friend.fetch_stats() for friend in friends] - await self.__telegram_bot.reply(message, format_user_stats_difference(persisted_stats, current_stats)) - -class RecordStatsCommand(CommandHandler): - - __fortnite_client: FortniteClient - __stats_repository: StatsRepository - - def __init__(self, fortnite_client: FortniteClient, stats_repository: StatsRepository): - self.__fortnite_client = fortnite_client - self.__stats_repository = stats_repository - - async def handle(self, message: telebot.types.Message): - friends = await self.__fortnite_client.get_friends() - for friend in friends: - await self.__stats_repository.put_stats(friend) \ No newline at end of file diff --git a/device_auth.py b/device_auth.py deleted file mode 100644 index 4d46918..0000000 --- a/device_auth.py +++ /dev/null @@ -1,22 +0,0 @@ -import json -import os - -__filename__ = 'device-auth.json' - -class DeviceAuth: - - def device_auth_file_exists(self): - return os.path.isfile(__filename__) - - def get_device_auth_details(self): - if os.path.isfile(__filename__): - with open(__filename__, 'r') as fp: - return json.load(fp) - return {} - - def store_device_auth_details(self, email, details): - existing = self.get_device_auth_details() - existing[email] = details - - with open(__filename__, 'w') as fp: - json.dump(existing, fp) \ No newline at end of file diff --git a/formatter.py b/formatter.py deleted file mode 100644 index 9efc249..0000000 --- a/formatter.py +++ /dev/null @@ -1,95 +0,0 @@ -from telebot import formatting -import typing -from app_types import * - -# Status -def format_fortnite_status(fortnite_status): - statuses = [__format_fortnite_service_status(service_status) for service_status in fortnite_status.serviceStatuses] - return formatting.format_text( - formatting.mbold("Fortnite status"), - "", - '\n'.join(statuses), - separator='\n') - -def __format_status(status): - if (status == True): - return u'\u2705' - else: - return u'\u274c' - -def __format_fortnite_service_status(fortnite_service_status): - return formatting.format_text( - formatting.mbold(fortnite_service_status.serviceName), - __format_status(fortnite_service_status.status), - separator=': ') - -# User -def format_users(users: typing.List[User]): - users_formatted = [format_user(user) for user in users] - return formatting.format_text( - '\n\n'.join(users_formatted), - separator='\n') - -def format_user(user: User): - return formatting.format_text( - formatting.mbold("User: ") + user.display_name, - formatting.mbold("ID: ") + user.id, - separator='\n') - -# Stats -def format_user_stats_list(stats: typing.List[UserStats]): - stats_formatted = [__format_stats(single_stats) for single_stats in stats] - return formatting.format_text( - '\n\n'.join(stats_formatted), - separator='\n') - -def format_user_stats_difference(old_user_stats: typing.List[UserStats], new_user_stats: typing.List[UserStats]): - stats_formatted = [] - - for stats in old_user_stats: - matched = [x for x in new_user_stats if x.user_id == stats.user_id][0] - stats_formatted.append(__format_stats_difference(stats, matched)) - - return formatting.format_text( - '\n\n'.join(stats_formatted), - separator='\n') - -def __format_stats_difference(old_user_stats: UserStats, new_user_stats: UserStats): - return formatting.format_text( - formatting.mbold("User: ") + new_user_stats.user_display_name, - formatting.mbold("ID: ") + new_user_stats.user_id, - formatting.mbold("Level: ") + "{}{}".format(str(new_user_stats.level), __format_stat_difference(old_user_stats.level, new_user_stats.level)), - formatting.mbold("Matches played: ") + "{}{}".format(str(new_user_stats.matches_played), __format_stat_difference(old_user_stats.matches_played, new_user_stats.matches_played)), - formatting.mbold("Total kills: ") + "{}{}".format(str(new_user_stats.kills), __format_stat_difference(old_user_stats.kills, new_user_stats.kills)), - formatting.mbold("Wins: ") + "{}{}".format(str(new_user_stats.wins), __format_stat_difference(old_user_stats.wins, new_user_stats.wins)), - separator='\n') - -def __format_stat_difference(old_stat_value: int, new_stat_value: int): - if old_stat_value != new_stat_value: - return " \(\+ {}\)".format(str(new_stat_value - old_stat_value)) - else: - return "" - -def __format_stats(user_stats: UserStats): - return formatting.format_text( - formatting.mbold("User: ") + user_stats.user_display_name, - formatting.mbold("ID: ") + user_stats.user_id, - formatting.mbold("Level: ") + str(user_stats.level), - formatting.mbold("Matches played: ") + str(user_stats.matches_played), - formatting.mbold("Total kills: ") + str(user_stats.kills), - formatting.mbold("Wins: ") + str(user_stats.wins), - separator='\n') - -def format_friend_online(display_name: str, party_size: int): - if party_size == 1: - text = 'is playing Fortnite\!' - elif party_size == 2: - text = 'is playing Fortnite together with {} friend\!'.format(str(party_size - 1)) - elif party_size > 2: - text = 'is playing Fortnite together with {} friends\!'.format(str(party_size - 1)) - return formatting.format_text( - u'\u2b50', - formatting.mbold('{}'.format(display_name)), - text, - u'\u2b50', - separator=' ') \ No newline at end of file diff --git a/formatter/__init__.py b/formatter/__init__.py new file mode 100644 index 0000000..9efc249 --- /dev/null +++ b/formatter/__init__.py @@ -0,0 +1,95 @@ +from telebot import formatting +import typing +from app_types import * + +# Status +def format_fortnite_status(fortnite_status): + statuses = [__format_fortnite_service_status(service_status) for service_status in fortnite_status.serviceStatuses] + return formatting.format_text( + formatting.mbold("Fortnite status"), + "", + '\n'.join(statuses), + separator='\n') + +def __format_status(status): + if (status == True): + return u'\u2705' + else: + return u'\u274c' + +def __format_fortnite_service_status(fortnite_service_status): + return formatting.format_text( + formatting.mbold(fortnite_service_status.serviceName), + __format_status(fortnite_service_status.status), + separator=': ') + +# User +def format_users(users: typing.List[User]): + users_formatted = [format_user(user) for user in users] + return formatting.format_text( + '\n\n'.join(users_formatted), + separator='\n') + +def format_user(user: User): + return formatting.format_text( + formatting.mbold("User: ") + user.display_name, + formatting.mbold("ID: ") + user.id, + separator='\n') + +# Stats +def format_user_stats_list(stats: typing.List[UserStats]): + stats_formatted = [__format_stats(single_stats) for single_stats in stats] + return formatting.format_text( + '\n\n'.join(stats_formatted), + separator='\n') + +def format_user_stats_difference(old_user_stats: typing.List[UserStats], new_user_stats: typing.List[UserStats]): + stats_formatted = [] + + for stats in old_user_stats: + matched = [x for x in new_user_stats if x.user_id == stats.user_id][0] + stats_formatted.append(__format_stats_difference(stats, matched)) + + return formatting.format_text( + '\n\n'.join(stats_formatted), + separator='\n') + +def __format_stats_difference(old_user_stats: UserStats, new_user_stats: UserStats): + return formatting.format_text( + formatting.mbold("User: ") + new_user_stats.user_display_name, + formatting.mbold("ID: ") + new_user_stats.user_id, + formatting.mbold("Level: ") + "{}{}".format(str(new_user_stats.level), __format_stat_difference(old_user_stats.level, new_user_stats.level)), + formatting.mbold("Matches played: ") + "{}{}".format(str(new_user_stats.matches_played), __format_stat_difference(old_user_stats.matches_played, new_user_stats.matches_played)), + formatting.mbold("Total kills: ") + "{}{}".format(str(new_user_stats.kills), __format_stat_difference(old_user_stats.kills, new_user_stats.kills)), + formatting.mbold("Wins: ") + "{}{}".format(str(new_user_stats.wins), __format_stat_difference(old_user_stats.wins, new_user_stats.wins)), + separator='\n') + +def __format_stat_difference(old_stat_value: int, new_stat_value: int): + if old_stat_value != new_stat_value: + return " \(\+ {}\)".format(str(new_stat_value - old_stat_value)) + else: + return "" + +def __format_stats(user_stats: UserStats): + return formatting.format_text( + formatting.mbold("User: ") + user_stats.user_display_name, + formatting.mbold("ID: ") + user_stats.user_id, + formatting.mbold("Level: ") + str(user_stats.level), + formatting.mbold("Matches played: ") + str(user_stats.matches_played), + formatting.mbold("Total kills: ") + str(user_stats.kills), + formatting.mbold("Wins: ") + str(user_stats.wins), + separator='\n') + +def format_friend_online(display_name: str, party_size: int): + if party_size == 1: + text = 'is playing Fortnite\!' + elif party_size == 2: + text = 'is playing Fortnite together with {} friend\!'.format(str(party_size - 1)) + elif party_size > 2: + text = 'is playing Fortnite together with {} friends\!'.format(str(party_size - 1)) + return formatting.format_text( + u'\u2b50', + formatting.mbold('{}'.format(display_name)), + text, + u'\u2b50', + separator=' ') \ No newline at end of file diff --git a/fortnite_client.py b/fortnite_client.py deleted file mode 100755 index 16b2652..0000000 --- a/fortnite_client.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/python3 - -import fortnitepy -import json -import os -import typing -from device_auth import DeviceAuth -from app_types import * - -__fortnite_account_key__ = 'fornite-account-key' - -# Intefaces for events -class FriendPresenceObserver: - async def update(self, display_name: str, playing: bool, party_size: int) -> None: - pass - -class ClientInitObserver: - async def on_event(self, fortnite_client) -> None: - pass - -class FortniteClient(fortnitepy.Client): - - __device_auth: DeviceAuth - __friend_presence_observer: FriendPresenceObserver - __client_init_observer: ClientInitObserver - - def __init__(self, client_init_observer: ClientInitObserver, friend_presence_observer: FriendPresenceObserver): - self.__device_auth = DeviceAuth() - self.__client_init_observer = client_init_observer - self.__friend_presence_observer = friend_presence_observer - if self.__device_auth.device_auth_file_exists(): - self.__auth_device_auth() - else: - self.__auth_authorization_code() - - async def get_friends(self) -> typing.List[User]: - return [User.from_fortnite_friend(friend) for friend in self.friends] - - async def find_user(self, display_name: str): - user: fortnitepy.User = await self.fetch_user_by_display_name(display_name) - return User.from_fortnite_friend(user) - - def __auth_authorization_code(self): - code = input("Enter authorization code (https://www.epicgames.com/id/api/redirect?clientId=3446cd72694c4a4485d81b77adbb2141&responseType=code):") - super().__init__( - auth=fortnitepy.AuthorizationCodeAuth( - code = code - ) - ) - - def __auth_device_auth(self): - device_auth_details = self.__device_auth.get_device_auth_details().get(__fortnite_account_key__, {}) - super().__init__( - auth=fortnitepy.DeviceAuth( - **device_auth_details - ) - ) - - # Generate auth details if none were supplied yet - async def __generate_auth_details(self): - if not self.__device_auth.device_auth_file_exists(): - device_auth_data = await self.auth.generate_device_auth() - details = { - 'device_id': device_auth_data['deviceId'], - 'account_id': device_auth_data['accountId'], - 'secret': device_auth_data['secret'], - } - self.auth.__dict__.update(details) - self.dispatch_event( - 'device_auth_generate', - details, - __fortnite_account_key__ - ) - - async def event_device_auth_generate(self, details, email): - self.__device_auth.store_device_auth_details(email, details) - - async def event_ready(self): - await self.__generate_auth_details() - await self.__client_init_observer.on_event(self) - - async def event_friend_request(self, request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]): - await IncomingFriendRequestEvent.on_event(request) - - async def event_friend_presence(self, before, after: fortnitepy.Presence): - await FriendPresenceEvent.on_event(before, after, self.__friend_presence_observer) - -class FriendPresenceEvent: - async def on_event(before, after: fortnitepy.Presence, friend_presence_observer: FriendPresenceObserver): - if before is not None and after is not None: - if before.playing != after.playing: - print('FriendPresence changed for user {}, before {}, after {}'.format(after.friend.display_name, before.playing, after.playing)) - party_size: int = 1 - if after.has_properties: - party: fortnitepy.PresenceParty = after.party - if party is not None and party.playercount is not None: - party_size = int(party.playercount) - await friend_presence_observer.update( - after.friend.display_name, - after.playing, - party_size) - -class IncomingFriendRequestEvent: - async def on_event(request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]): - if isinstance(request, fortnitepy.friend.IncomingPendingFriend): - incoming_request = typing.cast(fortnitepy.friend.IncomingPendingFriend, request) - print('Accepting friend request from {}'.format(incoming_request.display_name)) - #await incoming_request.accept() \ No newline at end of file diff --git a/fortnite_client/__init__.py b/fortnite_client/__init__.py new file mode 100644 index 0000000..bac4e5d --- /dev/null +++ b/fortnite_client/__init__.py @@ -0,0 +1,106 @@ +import fortnitepy +import json +import os +import typing +from fortnite_client.device_auth import DeviceAuth +from app_types import * + +__fortnite_account_key__ = 'fornite-account-key' + +# Intefaces for events +class FriendPresenceObserver: + async def update(self, display_name: str, playing: bool, party_size: int) -> None: + pass + +class ClientInitObserver: + async def on_event(self, fortnite_client) -> None: + pass + +class FortniteClient(fortnitepy.Client): + + __device_auth: DeviceAuth + __friend_presence_observer: FriendPresenceObserver + __client_init_observer: ClientInitObserver + + def __init__(self, client_init_observer: ClientInitObserver, friend_presence_observer: FriendPresenceObserver): + self.__device_auth = DeviceAuth() + self.__client_init_observer = client_init_observer + self.__friend_presence_observer = friend_presence_observer + if self.__device_auth.device_auth_file_exists(): + self.__auth_device_auth() + else: + self.__auth_authorization_code() + + async def get_friends(self) -> typing.List[User]: + return [User.from_fortnite_friend(friend) for friend in self.friends] + + async def find_user(self, display_name: str): + user: fortnitepy.User = await self.fetch_user_by_display_name(display_name) + return User.from_fortnite_friend(user) + + def __auth_authorization_code(self): + code = input("Enter authorization code (https://www.epicgames.com/id/api/redirect?clientId=3446cd72694c4a4485d81b77adbb2141&responseType=code):") + super().__init__( + auth=fortnitepy.AuthorizationCodeAuth( + code = code + ) + ) + + def __auth_device_auth(self): + device_auth_details = self.__device_auth.get_device_auth_details().get(__fortnite_account_key__, {}) + super().__init__( + auth=fortnitepy.DeviceAuth( + **device_auth_details + ) + ) + + # Generate auth details if none were supplied yet + async def __generate_auth_details(self): + if not self.__device_auth.device_auth_file_exists(): + device_auth_data = await self.auth.generate_device_auth() + details = { + 'device_id': device_auth_data['deviceId'], + 'account_id': device_auth_data['accountId'], + 'secret': device_auth_data['secret'], + } + self.auth.__dict__.update(details) + self.dispatch_event( + 'device_auth_generate', + details, + __fortnite_account_key__ + ) + + async def event_device_auth_generate(self, details, email): + self.__device_auth.store_device_auth_details(email, details) + + async def event_ready(self): + await self.__generate_auth_details() + await self.__client_init_observer.on_event(self) + + async def event_friend_request(self, request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]): + await IncomingFriendRequestEvent.on_event(request) + + async def event_friend_presence(self, before, after: fortnitepy.Presence): + await FriendPresenceEvent.on_event(before, after, self.__friend_presence_observer) + +class FriendPresenceEvent: + async def on_event(before, after: fortnitepy.Presence, friend_presence_observer: FriendPresenceObserver): + if before is not None and after is not None: + if before.playing != after.playing: + print('FriendPresence changed for user {}, before {}, after {}'.format(after.friend.display_name, before.playing, after.playing)) + party_size: int = 1 + if after.has_properties: + party: fortnitepy.PresenceParty = after.party + if party is not None and party.playercount is not None: + party_size = int(party.playercount) + await friend_presence_observer.update( + after.friend.display_name, + after.playing, + party_size) + +class IncomingFriendRequestEvent: + async def on_event(request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]): + if isinstance(request, fortnitepy.friend.IncomingPendingFriend): + incoming_request = typing.cast(fortnitepy.friend.IncomingPendingFriend, request) + print('Accepting friend request from {}'.format(incoming_request.display_name)) + #await incoming_request.accept() \ No newline at end of file diff --git a/fortnite_client/device_auth.py b/fortnite_client/device_auth.py new file mode 100644 index 0000000..4d46918 --- /dev/null +++ b/fortnite_client/device_auth.py @@ -0,0 +1,22 @@ +import json +import os + +__filename__ = 'device-auth.json' + +class DeviceAuth: + + def device_auth_file_exists(self): + return os.path.isfile(__filename__) + + def get_device_auth_details(self): + if os.path.isfile(__filename__): + with open(__filename__, 'r') as fp: + return json.load(fp) + return {} + + def store_device_auth_details(self, email, details): + existing = self.get_device_auth_details() + existing[email] = details + + with open(__filename__, 'w') as fp: + json.dump(existing, fp) \ No newline at end of file diff --git a/fortnite_client/fortnite_events.py b/fortnite_client/fortnite_events.py new file mode 100644 index 0000000..2d252ad --- /dev/null +++ b/fortnite_client/fortnite_events.py @@ -0,0 +1,35 @@ +import fortnitepy +import typing +import time +from fortnite_client import * +from telegram_bot import * +from persistence import * +from formatter import * + +class ClientInitObserverImpl(ClientInitObserver): + async def on_event(self, fortnite_client: FortniteClient) -> None: + print('----------------') + print('FortniteClient ready as:') + print(fortnite_client.user.display_name) + print(fortnite_client.user.id) + print('----------------') + +class FriendPresenceObserverImpl(FriendPresenceObserver): + + __telegram_bot: TelegramBot + __presence_repository: PresenceRepository + + def __init__(self, telegram_bot: TelegramBot, presence_repository: PresenceRepository): + self.__telegram_bot = telegram_bot + self.__presence_repository = presence_repository + + async def update(self, display_name: str, playing: bool, party_size: int) -> None: + if playing: + last_presence = self.__presence_repository.get_last_user_presence(display_name) + diff = time.time() - last_presence + if diff > 60 * 60: # 60 minutes + await self.__notify_friend_playing(display_name, party_size) + self.__presence_repository.set_last_user_presence(display_name, time.time()) + + async def __notify_friend_playing(self, display_name: str, party_size: int): + await self.__telegram_bot.send_message_to_all(format_friend_online(display_name, party_size)) \ No newline at end of file diff --git a/fortnite_events.py b/fortnite_events.py deleted file mode 100644 index d9e143f..0000000 --- a/fortnite_events.py +++ /dev/null @@ -1,35 +0,0 @@ -import fortnitepy -import typing -import time -from fortnite_client import * -from telegram_bot import * -from persistence import PresenceRepository -from formatter import * - -class ClientInitObserverImpl(ClientInitObserver): - async def on_event(self, fortnite_client: FortniteClient) -> None: - print('----------------') - print('FortniteClient ready as:') - print(fortnite_client.user.display_name) - print(fortnite_client.user.id) - print('----------------') - -class FriendPresenceObserverImpl(FriendPresenceObserver): - - __telegram_bot: TelegramBot - __presence_repository: PresenceRepository - - def __init__(self, telegram_bot: TelegramBot, presence_repository: PresenceRepository): - self.__telegram_bot = telegram_bot - self.__presence_repository = presence_repository - - async def update(self, display_name: str, playing: bool, party_size: int) -> None: - if playing: - last_presence = self.__presence_repository.get_last_user_presence(display_name) - diff = time.time() - last_presence - if diff > 60 * 60: # 60 minutes - await self.__notify_friend_playing(display_name, party_size) - self.__presence_repository.set_last_user_presence(display_name, time.time()) - - async def __notify_friend_playing(self, display_name: str, party_size: int): - await self.__telegram_bot.send_message_to_all(format_friend_online(display_name, party_size)) \ No newline at end of file diff --git a/fortnite_status/__init__.py b/fortnite_status/__init__.py new file mode 100644 index 0000000..244502b --- /dev/null +++ b/fortnite_status/__init__.py @@ -0,0 +1,119 @@ +# Simple utility class which provides an access to Fortnite service status report. +# Usage: ForniteStatus().getStatus() + +import requests as req +import html5lib +from bs4 import BeautifulSoup + +# Polling interval in seconds +__polling_interval__ = 5 * 60 # 5 minutes + +class FortniteStatusObserver: + async def update(self, fortnite_status) -> None: + pass + +class FortniteStatus: + """ + Instantiate a FortniteStatus class. + """ + + class Status: + serviceStatuses = [] + + def __init__(self, serviceStatuses): + self.serviceStatuses = serviceStatuses + + def __eq__(self, other): + if not isinstance(other, FortniteStatus.Status): + return NotImplemented + return sorted(self.serviceStatuses) == sorted(other.serviceStatuses) + + def prettify(self): + return 'Fortnite services status:\n' + '\n'.join([serviceStatus.prettify() for serviceStatus in self.serviceStatuses]) + + class ServiceStatus: + serviceName = '' + status = False + + def __init__(self, serviceName, status): + self.serviceName = serviceName + self.status = status + + def __lt__(self, other): + if not isinstance(other, FortniteStatus.ServiceStatus): + return NotImplemented + return self.serviceName < other.serviceName + + def __eq__(self, other): + if not isinstance(other, FortniteStatus.ServiceStatus): + return NotImplemented + return self.serviceName == other.serviceName and self.status == other.status + + + def prettify(self): + return f'{self.serviceName}, {self.status}' + + def __findFortniteStatusHtmlComponent(self, html): + for component in html.findAll('div', {'class': 'component-container'}): + innerContainers = component.findAll('div', {'class': 'component-inner-container'}) + for innerContainer in innerContainers: + for names in innerContainer.findAll('span', {'class': 'name'}): + for name in names.findAll('span'): + if 'class' not in name.attrs and 'Fortnite' in name.text: + return component + + def __parseFortniteStatus(self, html): + component = self.__findFortniteStatusHtmlComponent(html) + serviceStatuses = [] + childContainer = component.find('div', {'class': 'child-components-container'}) + for innerContainer in childContainer.findAll('div', {'class': 'component-inner-container'}): + name = innerContainer.find('span', {'class': 'name'}).text.strip() + statusString = innerContainer.find('span', {'class': 'component-status'}).text.strip() + if statusString == 'Operational': + statusCode = True + else: + statusCode = False + serviceStatuses.append(self.ServiceStatus(name, statusCode)) + return self.Status(serviceStatuses) + + def getStatus(self): + webContent = req.get("https://status.epicgames.com/") + parsedHtml = BeautifulSoup(webContent.text, 'html5lib') + + return self.__parseFortniteStatus(parsedHtml) + + + + def printStatus(self): + """ + Prints a current Fortnite services status in stdout. + Example: + + """ + print(self.getStatus().prettify()) + +class FortniteStatusNotifier: + + __fortnite_status_observer: FortniteStatusObserver + __fortnite_status: FortniteStatus + __last_fortnite_status: any + + def __init__(self, fortnite_status_observer: FortniteStatusObserver): + self.__fortnite_status_observer = fortnite_status_observer + self.__fortnite_status = FortniteStatus() + + async def run(self): + # Initialize status + self.__last_fortnite_status = self.__fortnite_status.getStatus() + while True: + await self.__read_status() + await asyncio.sleep(__polling_interval__) + + async def __read_status(self): + service_status_tmp = self.__fortnite_status.getStatus() + if service_status_tmp != self.__last_fortnite_status: + await self.__notify(service_status_tmp) + self.__last_fortnite_status = service_status_tmp + + async def __notify(self, fortnite_status): + await self.__fortnite_status_observer.update(fortnite_status) \ No newline at end of file diff --git a/fortnite_status_notifier.py b/fortnite_status_notifier.py deleted file mode 100644 index edf3009..0000000 --- a/fortnite_status_notifier.py +++ /dev/null @@ -1,37 +0,0 @@ -from telebot import formatting -import time -import asyncio -from pythonFortniteStatus.FortniteStatus import * - -# Polling interval in seconds -__polling_interval__ = 5 * 60 # 5 minutes - -class FortniteStatusObserver: - async def update(self, fortnite_status) -> None: - pass - -class FortniteStatusNotifier: - - __fortnite_status_observer: FortniteStatusObserver - __fortnite_status: FortniteStatus - __last_fortnite_status: any - - def __init__(self, fortnite_status_observer: FortniteStatusObserver): - self.__fortnite_status_observer = fortnite_status_observer - self.__fortnite_status = FortniteStatus() - - async def run(self): - # Initialize status - self.__last_fortnite_status = self.__fortniteStatus.getStatus() - while True: - await self.__read_status() - await asyncio.sleep(__polling_interval__) - - async def __read_status(self): - service_status_tmp = self.__fortniteStatus.getStatus() - if service_status_tmp != self.__last_fortnite_status: - await self.__notify(service_status_tmp) - self.__last_fortnite_status = service_status_tmp - - async def __notify(self, fortnite_status): - await self.__fortnite_status_observer.update(fortnite_status) \ No newline at end of file diff --git a/main.py b/main.py new file mode 100755 index 0000000..f016f57 --- /dev/null +++ b/main.py @@ -0,0 +1,68 @@ +#!/usr/bin/python3 + +import os, time, telebot, asyncio, telebot.async_telebot, nest_asyncio, sys +from fortnite_status import * +from formatter import * +from fortnite_client import * +from fortnite_client.fortnite_events import * +from persistence import * +from telegram_bot import * +from telegram_bot.commands import * + +class FortniteStatusObserverImpl(FortniteStatusObserver): + + telegram_bot: TelegramBot + + def __init__(self, telegram_bot: TelegramBot): + self.__telegram_bot = telegram_bot + + async def update(self, fortnite_status) -> None: + await self.__telegram_bot.send_message_to_all(format_fortnite_status(fortnite_status)) + +user_repository = UserRepository() +stats_repository = StatsRepository() +presence_repository = PresenceRepository() +telegram_bot = TelegramBot(user_repository) +fortnite_status_notifier = FortniteStatusNotifier(FortniteStatusObserverImpl(telegram_bot)) + +fortnite_client = FortniteClient( + ClientInitObserverImpl(), + FriendPresenceObserverImpl(telegram_bot, presence_repository)) + +record_stats_command = RecordStatsCommand(fortnite_client, stats_repository) + +telegram_bot.register_command_handler('start', StartCommand(telegram_bot, user_repository)) +telegram_bot.register_command_handler('status', GetStatusCommand(telegram_bot)) +telegram_bot.register_command_handler('friends', GetFriendsCommand(telegram_bot, fortnite_client)) +telegram_bot.register_command_handler('stats', GetStatsCommand(telegram_bot, fortnite_client)) +telegram_bot.register_command_handler('todaystats', GetTodayStatsCommand(telegram_bot, fortnite_client, stats_repository)) +telegram_bot.register_command_handler('recordstats', record_stats_command) + +async def run_tgbot(): + await telegram_bot.run() + +async def run_fortnite_status_notifier(): + await fortnite_status_notifier.run() + +async def run_fortnite_client(): + fortnite_client.run() + +async def run_record_stats(): + while True: + t = time.localtime() + if t.tm_hour == 5: # only at 05:00 + await recordStatsCommand.handle(None) + await asyncio.sleep(60 * 60) # 1 hour + +async def run_all(): + await asyncio.gather( + run_tgbot(), + run_fortnite_status_notifier(), + run_fortnite_client(), + run_record_stats()) + +if __name__ == '__main__': + nest_asyncio.apply() + loop = asyncio.get_event_loop() + loop.run_until_complete(run_all()) + loop.close() \ No newline at end of file diff --git a/persistence.py b/persistence.py deleted file mode 100644 index 512836e..0000000 --- a/persistence.py +++ /dev/null @@ -1,129 +0,0 @@ -import sqlite3, typing -from app_types import * - -class UserRepository: - - def __init__(self): - self.__initialize() - - def __initialize(self): - cur = self.__get_connection().cursor() - cur.execute("CREATE TABLE IF NOT EXISTS user(chat_id INT, alias TEXT)") - cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS chat_id_idx ON user(chat_id)") - - def __get_connection(self): - return sqlite3.connect('db.sqlite') - - def get_user(self, chat_id): - connection = self.__get_connection() - cur = connection.cursor() - query = "select * from user where chat_id = {chat_id}".format(chat_id = chat_id) - cur.execute(query) - return cur.fetchone() - - def get_all_users(self): - connection = self.__get_connection() - cur = connection.cursor() - query = "select * from user" - cur.execute(query) - return cur.fetchall() - - def remove_chat(self, chat_id): - connection = self.__get_connection() - cur = connection.cursor() - query = "DELETE FROM user where chat_id = {chat_id}".format( - chat_id = chat_id) - cur.execute(query) - connection.commit() - - - def put_user(self, chat_id, alias): - if not self.get_user(chat_id): - connection = self.__get_connection() - cur = connection.cursor() - query = "INSERT INTO user(chat_id, alias) VALUES({chat_id}, '{text}')".format( - chat_id = chat_id, - text = alias) - cur.execute(query) - connection.commit() - -class StatsRepository: - - def __init__(self): - self.__initialize() - - def __get_connection(self): - return sqlite3.connect('db.sqlite') - - def __initialize(self): - cur = self.__get_connection().cursor() - cur.execute("CREATE TABLE IF NOT EXISTS stats(user_id TEXT, display_name TEXT, level INT, matches_played INT, kills INT, wins INT)") - cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS user_id_idx ON stats(user_id)") - - async def put_stats(self, user: User): - stats: UserStats = await user.fetch_stats() - - connection = self.__get_connection() - cur = connection.cursor() - query = "INSERT OR REPLACE INTO stats(user_id, display_name, level, matches_played, kills, wins) VALUES('{user_id}', '{display_name}', {level}, {matches_played}, {kills}, {wins})".format( - user_id = user.id, - display_name = user.display_name, - level = stats.level, - matches_played = stats.matches_played, - kills = stats.kills, - wins = stats.wins) - cur.execute(query) - connection.commit() - - def get_stats(self) -> typing.List[UserStats]: - connection = self.__get_connection() - cur = connection.cursor() - query = "SELECT * FROM stats" - cur.execute(query) - result = cur.fetchall() - return [self.__map_from_db(stats) for stats in result] - - def __map_from_db(self, record): - user_stats = UserStats() - user_stats.user_id = str(record[0]) - user_stats.user_display_name = str(record[1]) - user_stats.level = int(record[2]) - user_stats.matches_played = int(record[3]) - user_stats.kills = int(record[4]) - user_stats.wins = int(record[5]) - return user_stats - -class PresenceRepository: - - def __init__(self): - self.__initialize() - - def __get_connection(self): - return sqlite3.connect('db.sqlite') - - def __initialize(self): - cur = self.__get_connection().cursor() - cur.execute("CREATE TABLE IF NOT EXISTS user_presence(display_name TEXT, last_online INT)") - cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS display_name_idx ON user_presence(display_name)") - - def get_last_user_presence(self, display_name): - connection = self.__get_connection() - cur = connection.cursor() - query = "select last_online from user_presence where display_name = '{display_name}'".format(display_name = display_name) - cur.execute(query) - user_presence = cur.fetchone() - - if user_presence: - return user_presence[0] - else: - return 0 - - def set_last_user_presence(self, display_name: str, last_online: int): - connection = self.__get_connection() - cur = connection.cursor() - query = "INSERT OR REPLACE INTO user_presence(display_name, last_online) VALUES('{display_name}', {last_online})".format( - display_name = display_name, - last_online = last_online) - cur.execute(query) - connection.commit() - \ No newline at end of file diff --git a/persistence/__init__.py b/persistence/__init__.py new file mode 100644 index 0000000..9d9220e --- /dev/null +++ b/persistence/__init__.py @@ -0,0 +1,128 @@ +import sqlite3, typing +from app_types import * + +class UserRepository: + + def __init__(self): + self.__initialize() + + def __initialize(self): + cur = self.__get_connection().cursor() + cur.execute("CREATE TABLE IF NOT EXISTS user(chat_id INT, alias TEXT)") + cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS chat_id_idx ON user(chat_id)") + + def __get_connection(self): + return sqlite3.connect('db.sqlite') + + def get_user(self, chat_id): + connection = self.__get_connection() + cur = connection.cursor() + query = "select * from user where chat_id = {chat_id}".format(chat_id = chat_id) + cur.execute(query) + return cur.fetchone() + + def get_all_users(self): + connection = self.__get_connection() + cur = connection.cursor() + query = "select * from user" + cur.execute(query) + return cur.fetchall() + + def remove_chat(self, chat_id): + connection = self.__get_connection() + cur = connection.cursor() + query = "DELETE FROM user where chat_id = {chat_id}".format( + chat_id = chat_id) + cur.execute(query) + connection.commit() + + + def put_user(self, chat_id, alias): + if not self.get_user(chat_id): + connection = self.__get_connection() + cur = connection.cursor() + query = "INSERT INTO user(chat_id, alias) VALUES({chat_id}, '{text}')".format( + chat_id = chat_id, + text = alias) + cur.execute(query) + connection.commit() + +class StatsRepository: + + def __init__(self): + self.__initialize() + + def __get_connection(self): + return sqlite3.connect('db.sqlite') + + def __initialize(self): + cur = self.__get_connection().cursor() + cur.execute("CREATE TABLE IF NOT EXISTS stats(user_id TEXT, display_name TEXT, level INT, matches_played INT, kills INT, wins INT)") + cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS user_id_idx ON stats(user_id)") + + async def put_stats(self, user: User): + stats: UserStats = await user.fetch_stats() + + connection = self.__get_connection() + cur = connection.cursor() + query = "INSERT OR REPLACE INTO stats(user_id, display_name, level, matches_played, kills, wins) VALUES('{user_id}', '{display_name}', {level}, {matches_played}, {kills}, {wins})".format( + user_id = user.id, + display_name = user.display_name, + level = stats.level, + matches_played = stats.matches_played, + kills = stats.kills, + wins = stats.wins) + cur.execute(query) + connection.commit() + + def get_stats(self) -> typing.List[UserStats]: + connection = self.__get_connection() + cur = connection.cursor() + query = "SELECT * FROM stats" + cur.execute(query) + result = cur.fetchall() + return [self.__map_from_db(stats) for stats in result] + + def __map_from_db(self, record): + user_stats = UserStats() + user_stats.user_id = str(record[0]) + user_stats.user_display_name = str(record[1]) + user_stats.level = int(record[2]) + user_stats.matches_played = int(record[3]) + user_stats.kills = int(record[4]) + user_stats.wins = int(record[5]) + return user_stats + +class PresenceRepository: + + def __init__(self): + self.__initialize() + + def __get_connection(self): + return sqlite3.connect('db.sqlite') + + def __initialize(self): + cur = self.__get_connection().cursor() + cur.execute("CREATE TABLE IF NOT EXISTS user_presence(display_name TEXT, last_online INT)") + cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS display_name_idx ON user_presence(display_name)") + + def get_last_user_presence(self, display_name): + connection = self.__get_connection() + cur = connection.cursor() + query = "select last_online from user_presence where display_name = '{display_name}'".format(display_name = display_name) + cur.execute(query) + user_presence = cur.fetchone() + + if user_presence: + return user_presence[0] + else: + return 0 + + def set_last_user_presence(self, display_name: str, last_online: int): + connection = self.__get_connection() + cur = connection.cursor() + query = "INSERT OR REPLACE INTO user_presence(display_name, last_online) VALUES('{display_name}', {last_online})".format( + display_name = display_name, + last_online = last_online) + cur.execute(query) + connection.commit() \ No newline at end of file diff --git a/pythonFortniteStatus b/pythonFortniteStatus deleted file mode 160000 index 657eff0..0000000 --- a/pythonFortniteStatus +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 657eff0289301ece411b07751113b6a36c6ce79b diff --git a/telegram_bot.py b/telegram_bot.py deleted file mode 100644 index 26d3071..0000000 --- a/telegram_bot.py +++ /dev/null @@ -1,59 +0,0 @@ -import telebot -import os -import logging -import traceback -import sys -from telebot.async_telebot import AsyncTeleBot -from persistence import UserRepository - -class CommandHandler: - async def handle(self, message: telebot.types.Message): - pass - -class TelegramBot: - __bot: AsyncTeleBot - __user_repository: UserRepository - - def __init__(self, user_repository: UserRepository): - self.__user_repository = user_repository - - # Check token in environment variables - if "TELEBOT_BOT_TOKEN" not in os.environ: - raise AssertionError("Please configure TELEBOT_BOT_TOKEN as environment variables") - - self.__bot = telebot.async_telebot.AsyncTeleBot( - token=os.environ["TELEBOT_BOT_TOKEN"], - exception_handler=ExceptionHandler()) - - async def run(self): - await self.__bot.polling() - - def register_command_handler(self, command: str, command_handler: CommandHandler): - self.__bot.register_message_handler( - command_handler.handle, - commands=[ command ]) - - async def send_message_to_all(self, message_text: str): - for user in self.__user_repository.get_all_users(): - try: - await self.__bot.send_message( - user[0], - message_text, - parse_mode='MarkdownV2' - ) - except Exception as error: - if 'bot was kicked from the group chat' in str(error): - self.__user_repository.remove_chat(user[0]) - - async def reply(self, message, message_text): - await self.__bot.reply_to( - message, - message_text, - parse_mode='MarkdownV2') - -class ExceptionHandler(telebot.ExceptionHandler): - def handle(self, exception): - logging.error('Exception happened: {}'.format(str(exception))) - print(traceback.format_exc()) - sys.exit('Exiting with telebot exception') - return True \ No newline at end of file diff --git a/telegram_bot/__init__.py b/telegram_bot/__init__.py new file mode 100644 index 0000000..ab9f61d --- /dev/null +++ b/telegram_bot/__init__.py @@ -0,0 +1,59 @@ +import telebot +import os +import logging +import traceback +import sys +from telebot.async_telebot import AsyncTeleBot +from persistence import * + +class CommandHandler: + async def handle(self, message: telebot.types.Message): + pass + +class TelegramBot: + __bot: AsyncTeleBot + __user_repository: UserRepository + + def __init__(self, user_repository: UserRepository): + self.__user_repository = user_repository + + # Check token in environment variables + if "TELEBOT_BOT_TOKEN" not in os.environ: + raise AssertionError("Please configure TELEBOT_BOT_TOKEN as environment variables") + + self.__bot = telebot.async_telebot.AsyncTeleBot( + token=os.environ["TELEBOT_BOT_TOKEN"], + exception_handler=ExceptionHandler()) + + async def run(self): + await self.__bot.polling() + + def register_command_handler(self, command: str, command_handler: CommandHandler): + self.__bot.register_message_handler( + command_handler.handle, + commands=[ command ]) + + async def send_message_to_all(self, message_text: str): + for user in self.__user_repository.get_all_users(): + try: + await self.__bot.send_message( + user[0], + message_text, + parse_mode='MarkdownV2' + ) + except Exception as error: + if 'bot was kicked from the group chat' in str(error): + self.__user_repository.remove_chat(user[0]) + + async def reply(self, message, message_text): + await self.__bot.reply_to( + message, + message_text, + parse_mode='MarkdownV2') + +class ExceptionHandler(telebot.ExceptionHandler): + def handle(self, exception): + logging.error('Exception happened: {}'.format(str(exception))) + print(traceback.format_exc()) + sys.exit('Exiting with telebot exception') + return True \ No newline at end of file diff --git a/telegram_bot/commands.py b/telegram_bot/commands.py new file mode 100644 index 0000000..cafde03 --- /dev/null +++ b/telegram_bot/commands.py @@ -0,0 +1,94 @@ +import telebot +from telegram_bot import * +from formatter import * +from persistence import * +from fortnite_client import * +from fortnite_status import * + +class StartCommand(CommandHandler): + + __telegram_bot: TelegramBot + __user_repository: UserRepository + + def __init__(self, telegram_bot: TelegramBot, user_repository: UserRepository): + self.__telegram_bot = telegram_bot + self.__user_repository = user_repository + + async def handle(self, message: telebot.types.Message): + if message.chat.type == 'private': + alias = message.chat.username + else: + alias = message.chat.title + self.__user_repository.put_user(message.chat.id, alias) + await self.__telegram_bot.reply(message, 'This chat successfully registered to receive Fortnite updates') + +class GetStatusCommand(CommandHandler): + + __telegram_bot: TelegramBot + __fortnite_status: FortniteStatus + + def __init__(self, telegram_bot: TelegramBot): + self.__telegram_bot = telegram_bot + self.__fortnite_status = FortniteStatus() + + async def handle(self, message: telebot.types.Message): + await self.__telegram_bot.reply(message, format_fortnite_status(self.__fortnite_status.getStatus())) + +class GetFriendsCommand(CommandHandler): + + __telegram_bot: TelegramBot + __fortnite_client: FortniteClient + + def __init__(self, telegram_bot: TelegramBot, fortnite_client: FortniteClient): + self.__telegram_bot = telegram_bot + self.__fortnite_client = fortnite_client + + async def handle(self, message: telebot.types.Message): + friends = await self.__fortnite_client.get_friends() + await self.__telegram_bot.reply(message, format_users(friends)) + + +class GetStatsCommand(CommandHandler): + + __telegram_bot: TelegramBot + __fortnite_client: FortniteClient + + def __init__(self, telegram_bot: TelegramBot, fortnite_client: FortniteClient): + self.__telegram_bot = telegram_bot + self.__fortnite_client = fortnite_client + + async def handle(self, message: telebot.types.Message): + friends = await self.__fortnite_client.get_friends() + stats = [await friend.fetch_stats() for friend in friends] + await self.__telegram_bot.reply(message, format_user_stats_list(stats)) + +class GetTodayStatsCommand(CommandHandler): + + __telegram_bot: TelegramBot + __fortnite_client: FortniteClient + __stats_repository: StatsRepository + + def __init__(self, telegram_bot: TelegramBot, fortnite_client: FortniteClient, stats_repository: StatsRepository): + self.__telegram_bot = telegram_bot + self.__fortnite_client = fortnite_client + self.__stats_repository = stats_repository + + async def handle(self, message: telebot.types.Message): + persisted_stats = self.__stats_repository.get_stats() + friends = await self.__fortnite_client.get_friends() + current_stats = [await friend.fetch_stats() for friend in friends] + await self.__telegram_bot.reply(message, format_user_stats_difference(persisted_stats, current_stats)) + +class RecordStatsCommand(CommandHandler): + + __fortnite_client: FortniteClient + __stats_repository: StatsRepository + + def __init__(self, fortnite_client: FortniteClient, stats_repository: StatsRepository): + self.__fortnite_client = fortnite_client + self.__stats_repository = stats_repository + + async def handle(self, message: telebot.types.Message): + friends = await self.__fortnite_client.get_friends() + for friend in friends: + await self.__stats_repository.put_stats(friend) \ No newline at end of file diff --git a/tgbot.py b/tgbot.py deleted file mode 100755 index 6fbd07c..0000000 --- a/tgbot.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/python3 - -import os, time, telebot, asyncio, telebot.async_telebot, nest_asyncio, sys -from fortnite_status_notifier import * -from formatter import * -from fortnite_client import * -from fortnite_events import * -from persistence import UserRepository, StatsRepository, PresenceRepository -from telegram_bot import TelegramBot, CommandHandler -from commands import * - -class FortniteStatusObserverImpl(FortniteStatusObserver): - - telegram_bot: TelegramBot - - def __init__(self, telegram_bot: TelegramBot): - self.__telegram_bot = telegram_bot - - async def update(self, fortnite_status) -> None: - await self.__telegram_bot.send_message_to_all(format_fortnite_status(fortnite_status)) - -user_repository = UserRepository() -stats_repository = StatsRepository() -presence_repository = PresenceRepository() -telegram_bot = TelegramBot(user_repository) -fortnite_status_notifier = FortniteStatusNotifier(FortniteStatusObserverImpl(telegram_bot)) - -fortnite_client = FortniteClient( - ClientInitObserverImpl(), - FriendPresenceObserverImpl(telegram_bot, presence_repository)) - -record_stats_command = RecordStatsCommand(fortnite_client, stats_repository) - -telegram_bot.register_command_handler('start', StartCommand(telegram_bot, user_repository)) -telegram_bot.register_command_handler('status', GetStatusCommand(telegram_bot)) -telegram_bot.register_command_handler('friends', GetFriendsCommand(telegram_bot, fortnite_client)) -telegram_bot.register_command_handler('stats', GetStatsCommand(telegram_bot, fortnite_client)) -telegram_bot.register_command_handler('todaystats', GetTodayStatsCommand(telegram_bot, fortnite_client, stats_repository)) -telegram_bot.register_command_handler('recordstats', record_stats_command) - -async def run_tgbot(): - await telegram_bot.run() - -async def run_fortnite_status_notifier(): - await fortnite_status_notifier.run() - -async def run_fortnite_client(): - fortnite_client.run() - -async def run_record_stats(): - while True: - t = time.localtime() - if t.tm_hour == 5: # only at 05:00 - await recordStatsCommand.handle(None) - await asyncio.sleep(60 * 60) # 1 hour - -async def run_all(): - await asyncio.gather( - run_tgbot(), - run_fortnite_status_notifier(), - run_fortnite_client(), - run_record_stats()) - -if __name__ == '__main__': - nest_asyncio.apply() - loop = asyncio.get_event_loop() - loop.run_until_complete(run_all()) - loop.close() \ No newline at end of file -- cgit v1.2.3