diff options
-rw-r--r-- | Formatter.py (renamed from FortniteStatusFormatter.py) | 60 | ||||
-rwxr-xr-x | FortniteClient.py | 17 | ||||
-rw-r--r-- | FortniteEvents.py | 12 | ||||
-rw-r--r-- | FortniteStatusNotifier.py (renamed from FortniteStatusWrapper.py) | 18 | ||||
-rwxr-xr-x | tgbot.py | 101 |
5 files changed, 96 insertions, 112 deletions
diff --git a/FortniteStatusFormatter.py b/Formatter.py index 4aacd63..b87735b 100644 --- a/FortniteStatusFormatter.py +++ b/Formatter.py @@ -1,7 +1,14 @@ from telebot import formatting -import fortnitepy -import json -# TODO rename +import fortnitepy, typing + +# Status +def formatFortniteStatus(fortniteStatus): + statuses = [__formatFortniteServiceStatus(serviceStatus) for serviceStatus in fortniteStatus.serviceStatuses] + return formatting.format_text( + formatting.mbold("Fortnite status"), + "", + '\n'.join(statuses), + separator='\n') def __formatStatus(status): if (status == True): @@ -15,26 +22,20 @@ def __formatFortniteServiceStatus(fortniteServiceStatus): __formatStatus(fortniteServiceStatus.status), separator=': ') -def formatFortniteStatus(fortniteStatus): - statuses = [__formatFortniteServiceStatus(serviceStatus) for serviceStatus in fortniteStatus.serviceStatuses] +# Friend +async def formatFriends(friends: typing.List[fortnitepy.Friend]): + friends_formatted = [await __formatFriend(friend) for friend in friends] return formatting.format_text( - formatting.mbold("Fortnite status"), - "", - '\n'.join(statuses), + '\n\n'.join(friends_formatted), separator='\n') -def __formatFriend(friend): - return friend.display_name +async def __formatFriend(friend: fortnitepy.Friend): + stats = await friend.fetch_br_stats() + return await formatUser(friend) -def formatFriends(friends): - friends_formatted = [__formatFriend(friend) for friend in friends] - return formatting.format_text( - formatting.mbold("Registered friends:"), - "", - '\n'.join(friends_formatted), - separator='\n') - -def formatUser(user: fortnitepy.User, stats: fortnitepy.StatsV2): +# User +async def formatUser(user: fortnitepy.User): + stats = await user.fetch_br_stats() combined_stats = stats.get_combined_stats() if 'keyboardmouse' in combined_stats: return __formatUserDevice(user, combined_stats['keyboardmouse']) @@ -44,14 +45,19 @@ def formatUser(user: fortnitepy.User, stats: fortnitepy.StatsV2): def __formatUserDevice(user: fortnitepy.User, device_stats: dict): return formatting.format_text( formatting.mbold("User: ") + user.display_name, + formatting.mbold("External auth: ") + ', '.join([__formatExternalAuth(external_auth) for external_auth in user.external_auths]), formatting.mbold("ID: ") + user.id, - formatting.mbold("Matches played: ") + __numToStrSafe(device_stats['matchesplayed']), - formatting.mbold("Total kills: ") + __numToStrSafe(device_stats['kills']), - formatting.mbold("Wins: ") + __numToStrSafe(device_stats['wins']), + formatting.mbold("Matches played: ") + str(device_stats['matchesplayed']), + formatting.mbold("Total kills: ") + str(device_stats['kills']), + formatting.mbold("Wins: ") + str(device_stats['wins']), separator='\n') +def __formatExternalAuth(external_auth: fortnitepy.ExternalAuth): + return '{} \({}\)'.format(external_auth.external_display_name, external_auth.type) -def __numToStrSafe(num): - if num is not None: - return str(num) - else: - return 'Not available'
\ No newline at end of file +def formatFriendOnline(display_name: str): + return formatting.format_text( + u'\u2b50', + formatting.mbold('{}'.format(display_name)), + 'is playing Fortnite\!', + u'\u2b50', + separator=' ')
\ No newline at end of file diff --git a/FortniteClient.py b/FortniteClient.py index 7c5858c..c9b9997 100755 --- a/FortniteClient.py +++ b/FortniteClient.py @@ -12,17 +12,15 @@ __fortnite_account_key__ = 'fornite-account-key' class FortniteClient(fortnitepy.Client): device_auth = DeviceAuth() - observers = [] + observer = None - def __init__(self): + def __init__(self, friendPresenceObserver: PresenceObserver): + self.observer = friendPresenceObserver if self.device_auth.device_auth_file_exists(): self.__auth_device_auth() else: self.__auth_authorization_code() - def attach(self, observer: any): - self.observers.append(observer) - def get_friends(self): return self.friends @@ -74,15 +72,8 @@ class FortniteClient(fortnitepy.Client): for friend_request in self.incoming_pending_friends: await self.event_friend_request(friend_request) - async def event_party_invite(self, invitation: fortnitepy.ReceivedPartyInvitation): - await PartyInvite.on_event(invitation = invitation) - async def event_friend_request(self, request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]): await IncomingFriendRequest.on_event(request) async def event_friend_presence(self, before, after: fortnitepy.Presence): - await FriendPresence.on_event(before, after, self.observers) - -# Debug -#client = FortniteClient() -#client.run()
\ No newline at end of file + await FriendPresence.on_event(before, after, self.observer)
\ No newline at end of file diff --git a/FortniteEvents.py b/FortniteEvents.py index 6e7aa5e..5be03f2 100644 --- a/FortniteEvents.py +++ b/FortniteEvents.py @@ -1,18 +1,7 @@ import fortnitepy import typing -class PartyInvite: - - async def on_event(invitation: fortnitepy.ReceivedPartyInvitation): - print('Accepting party invitation from {}'.format(invitation.sender.display_name)) - clientParty = await invitation.accept() - for partyMember in clientParty.members: - if not self.get_friend(partyMember.id) and self.user.id != partyMember.id: - print('Adding {} as friend'.format(partyMember.display_name)) - await partyMember.add() - class IncomingFriendRequest: - async def on_event(request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]): if isinstance(request, fortnitepy.friend.IncomingPendingFriend): incoming_request = typing.cast(fortnitepy.friend.IncomingPendingFriend, request) @@ -24,7 +13,6 @@ class PresenceObserver: pass class FriendPresence: - async def on_event(before, after: fortnitepy.Presence, observers): if before is not None and after is not None: if before.playing != after.playing: diff --git a/FortniteStatusWrapper.py b/FortniteStatusNotifier.py index aa7ee12..1c40ec0 100644 --- a/FortniteStatusWrapper.py +++ b/FortniteStatusNotifier.py @@ -1,5 +1,6 @@ from telebot import formatting import time +import asyncio from pythonFortniteStatus.FortniteStatus import * __polling_interval__ = 5 @@ -10,15 +11,20 @@ class Observer: async def update(self, fortniteStatus) -> None: pass -class FortniteStatusWrapper: +class FortniteStatusNotifier: - observers = [] + observer = None fortniteStatus = None + + def __init__(self, observer: Observer): + self.observer = observer async def run(self): + # Initialize status + self.fortniteStatus = fortniteStatus.getStatus() while True: await self.__readStatus() - time.sleep(__polling_interval__) + await asyncio.sleep(__polling_interval__) async def __readStatus(self): serviceStatusTmp = fortniteStatus.getStatus() @@ -28,8 +34,4 @@ class FortniteStatusWrapper: async def __notify(self, fortniteStatus): print("Fortnite status changed, notifying observers") - for observer in self.observers: - await observer.update(fortniteStatus) - - def attach(self, observer: Observer): - self.observers.append(observer)
\ No newline at end of file + await self.observer.update(fortniteStatus)
\ No newline at end of file @@ -1,25 +1,44 @@ #!/usr/bin/python3 -import os -import time, threading, schedule -import telebot -import asyncio -import telebot.async_telebot -from FortniteStatusWrapper import * -from FortniteStatusFormatter import * +import os, time, telebot, asyncio, telebot.async_telebot, nest_asyncio +from FortniteStatusNotifier import * +from Formatter import * from FortniteClient import * from FortniteEvents import * from persistence import UserRepository -from datetime import datetime +# Check token in environment variables if "TELEBOT_BOT_TOKEN" not in os.environ: raise AssertionError("Please configure TELEBOT_BOT_TOKEN as environment variables") -#bot = telebot.TeleBot(os.environ["TELEBOT_BOT_TOKEN"]) +class FortniteStatusObserver(Observer): + async def update(self, fortniteStatus) -> None: + await send_message_to_all(formatFortniteStatus(fortniteStatus)) + +class FortnitePresenceObserver(PresenceObserver): + # Map name -> last seen not playing timestamp seconds + statuses = {} + + async def update(self, display_name: str, playing: bool) -> None: + print('FortnitePresenceObserver: {} playing = {}'.format(display_name, playing)) + if playing: + if not display_name in self.statuses: + await self.__notifyFriendPlaying(display_name) + self.statuses[display_name] = time.time() + else: + diff = time.time() - self.statuses[display_name] + if diff > 60 * 60: # 60 minutes + self.__notifyFriendPlaying(display_name) + else: + self.statuses[display_name] = time.time() + + async def __notifyFriendPlaying(self, display_name: str): + await send_message_to_all(formatFriendOnline(display_name)) + bot = telebot.async_telebot.AsyncTeleBot(os.environ["TELEBOT_BOT_TOKEN"]) userRepository = UserRepository('db.sqlite') -fortniteStatusWrapper = FortniteStatusWrapper() -fortniteClient = FortniteClient() +fortniteStatusWrapper = FortniteStatusNotifier(FortniteStatusObserver()) +fortniteClient = FortniteClient(FortnitePresenceObserver()) @bot.message_handler(commands = ['start']) async def startCommand(message): @@ -32,7 +51,7 @@ async def getStatus(message): @bot.message_handler(commands = ['friends']) async def getFriends(message): - await reply(message, formatFriends(fortniteClient.get_friends())) + await reply(message, await formatFriends(fortniteClient.get_friends())) @bot.message_handler(commands = ['find']) async def findUser(message): @@ -41,9 +60,11 @@ async def findUser(message): search_user_display_name = arg[1] print('Searching users by name {}'.format(search_user_display_name)) users: typing.List[fortnitepy.User] = await fortniteClient.fetch_users_by_display_name(search_user_display_name) - for user in users: - stats = await user.fetch_br_stats() - await reply(message, formatUser(user, stats)) + if (len(users) > 0): + for user in users: + await reply(message, await formatUser(user)) + else: + await reply(message, 'User {} not found'.format(search_user_display_name)) else: await reply(message, 'Usage: /find username') @@ -58,31 +79,6 @@ async def addUser(message): else: await reply(message, 'Usage: /add username') -class FortniteStatusObserver(Observer): - async def update(self, fortniteStatus) -> None: - await send_message_to_all(formatFortniteStatus(fortniteStatus)) - -class FortnitePresenceObserver(PresenceObserver): - - # Map name -> last seen not playing timestamp seconds - statuses = {} - - async def update(self, display_name: str, playing: bool) -> None: - print('FortnitePresenceObserver: {} playing = {}'.format(display_name, playing)) - if playing: - if not display_name in self.statuses: - await self.__notifyFriendPlaying(display_name) - self.statuses[display_name] = time.time() - else: - diff = time.time() - self.statuses[display_name] - if diff > 60 * 60: # 60 minutes - self.__notifyFriendPlaying(display_name) - else: - self.statuses[display_name] = time.time() - - async def __notifyFriendPlaying(self, display_name: str): - await send_message_to_all('{} is online'.format(display_name)) - async def send_message_to_all(message_text: str): for user in userRepository.getAllUsers(): await bot.send_message( @@ -97,19 +93,20 @@ async def reply(message, message_text): message_text, parse_mode='MarkdownV2') -async def run_fortnite_client(): - await fortniteClient.run() +async def run_tgbot(): + await bot.polling() -if __name__ == '__main__': - fortniteStatusWrapper.attach(FortniteStatusObserver()) - fortniteClient.attach(FortnitePresenceObserver()) +async def run_fortniteStatusWrapper(): + await fortniteStatusWrapper.run() - loop = asyncio.get_event_loop() +async def run_fortniteClient(): + fortniteClient.run() - tasks = [ - loop.create_task(bot.polling()), - loop.create_task(fortniteStatusWrapper.run()), - loop.create_task(run_fortnite_client()) - ] - loop.run_until_complete(asyncio.wait(tasks)) +async def run_all(): + await asyncio.gather(run_tgbot(), run_fortniteStatusWrapper(), run_fortniteClient()) + +if __name__ == '__main__': + nest_asyncio.apply() + loop = asyncio.get_event_loop() + loop.run_until_complete(run_all()) loop.close()
\ No newline at end of file |