summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Formatter.py (renamed from FortniteStatusFormatter.py)60
-rwxr-xr-xFortniteClient.py17
-rw-r--r--FortniteEvents.py12
-rw-r--r--FortniteStatusNotifier.py (renamed from FortniteStatusWrapper.py)18
-rwxr-xr-xtgbot.py101
5 files changed, 96 insertions, 112 deletions
diff --git a/FortniteStatusFormatter.py b/Formatter.py
index 4aacd63..b87735b 100644
--- a/FortniteStatusFormatter.py
+++ b/Formatter.py
@@ -1,7 +1,14 @@
from telebot import formatting
-import fortnitepy
-import json
-# TODO rename
+import fortnitepy, typing
+
+# Status
+def formatFortniteStatus(fortniteStatus):
+ statuses = [__formatFortniteServiceStatus(serviceStatus) for serviceStatus in fortniteStatus.serviceStatuses]
+ return formatting.format_text(
+ formatting.mbold("Fortnite status"),
+ "",
+ '\n'.join(statuses),
+ separator='\n')
def __formatStatus(status):
if (status == True):
@@ -15,26 +22,20 @@ def __formatFortniteServiceStatus(fortniteServiceStatus):
__formatStatus(fortniteServiceStatus.status),
separator=': ')
-def formatFortniteStatus(fortniteStatus):
- statuses = [__formatFortniteServiceStatus(serviceStatus) for serviceStatus in fortniteStatus.serviceStatuses]
+# Friend
+async def formatFriends(friends: typing.List[fortnitepy.Friend]):
+ friends_formatted = [await __formatFriend(friend) for friend in friends]
return formatting.format_text(
- formatting.mbold("Fortnite status"),
- "",
- '\n'.join(statuses),
+ '\n\n'.join(friends_formatted),
separator='\n')
-def __formatFriend(friend):
- return friend.display_name
+async def __formatFriend(friend: fortnitepy.Friend):
+ stats = await friend.fetch_br_stats()
+ return await formatUser(friend)
-def formatFriends(friends):
- friends_formatted = [__formatFriend(friend) for friend in friends]
- return formatting.format_text(
- formatting.mbold("Registered friends:"),
- "",
- '\n'.join(friends_formatted),
- separator='\n')
-
-def formatUser(user: fortnitepy.User, stats: fortnitepy.StatsV2):
+# User
+async def formatUser(user: fortnitepy.User):
+ stats = await user.fetch_br_stats()
combined_stats = stats.get_combined_stats()
if 'keyboardmouse' in combined_stats:
return __formatUserDevice(user, combined_stats['keyboardmouse'])
@@ -44,14 +45,19 @@ def formatUser(user: fortnitepy.User, stats: fortnitepy.StatsV2):
def __formatUserDevice(user: fortnitepy.User, device_stats: dict):
return formatting.format_text(
formatting.mbold("User: ") + user.display_name,
+ formatting.mbold("External auth: ") + ', '.join([__formatExternalAuth(external_auth) for external_auth in user.external_auths]),
formatting.mbold("ID: ") + user.id,
- formatting.mbold("Matches played: ") + __numToStrSafe(device_stats['matchesplayed']),
- formatting.mbold("Total kills: ") + __numToStrSafe(device_stats['kills']),
- formatting.mbold("Wins: ") + __numToStrSafe(device_stats['wins']),
+ formatting.mbold("Matches played: ") + str(device_stats['matchesplayed']),
+ formatting.mbold("Total kills: ") + str(device_stats['kills']),
+ formatting.mbold("Wins: ") + str(device_stats['wins']),
separator='\n')
+def __formatExternalAuth(external_auth: fortnitepy.ExternalAuth):
+ return '{} \({}\)'.format(external_auth.external_display_name, external_auth.type)
-def __numToStrSafe(num):
- if num is not None:
- return str(num)
- else:
- return 'Not available' \ No newline at end of file
+def formatFriendOnline(display_name: str):
+ return formatting.format_text(
+ u'\u2b50',
+ formatting.mbold('{}'.format(display_name)),
+ 'is playing Fortnite\!',
+ u'\u2b50',
+ separator=' ') \ No newline at end of file
diff --git a/FortniteClient.py b/FortniteClient.py
index 7c5858c..c9b9997 100755
--- a/FortniteClient.py
+++ b/FortniteClient.py
@@ -12,17 +12,15 @@ __fortnite_account_key__ = 'fornite-account-key'
class FortniteClient(fortnitepy.Client):
device_auth = DeviceAuth()
- observers = []
+ observer = None
- def __init__(self):
+ def __init__(self, friendPresenceObserver: PresenceObserver):
+ self.observer = friendPresenceObserver
if self.device_auth.device_auth_file_exists():
self.__auth_device_auth()
else:
self.__auth_authorization_code()
- def attach(self, observer: any):
- self.observers.append(observer)
-
def get_friends(self):
return self.friends
@@ -74,15 +72,8 @@ class FortniteClient(fortnitepy.Client):
for friend_request in self.incoming_pending_friends:
await self.event_friend_request(friend_request)
- async def event_party_invite(self, invitation: fortnitepy.ReceivedPartyInvitation):
- await PartyInvite.on_event(invitation = invitation)
-
async def event_friend_request(self, request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]):
await IncomingFriendRequest.on_event(request)
async def event_friend_presence(self, before, after: fortnitepy.Presence):
- await FriendPresence.on_event(before, after, self.observers)
-
-# Debug
-#client = FortniteClient()
-#client.run() \ No newline at end of file
+ await FriendPresence.on_event(before, after, self.observer) \ No newline at end of file
diff --git a/FortniteEvents.py b/FortniteEvents.py
index 6e7aa5e..5be03f2 100644
--- a/FortniteEvents.py
+++ b/FortniteEvents.py
@@ -1,18 +1,7 @@
import fortnitepy
import typing
-class PartyInvite:
-
- async def on_event(invitation: fortnitepy.ReceivedPartyInvitation):
- print('Accepting party invitation from {}'.format(invitation.sender.display_name))
- clientParty = await invitation.accept()
- for partyMember in clientParty.members:
- if not self.get_friend(partyMember.id) and self.user.id != partyMember.id:
- print('Adding {} as friend'.format(partyMember.display_name))
- await partyMember.add()
-
class IncomingFriendRequest:
-
async def on_event(request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]):
if isinstance(request, fortnitepy.friend.IncomingPendingFriend):
incoming_request = typing.cast(fortnitepy.friend.IncomingPendingFriend, request)
@@ -24,7 +13,6 @@ class PresenceObserver:
pass
class FriendPresence:
-
async def on_event(before, after: fortnitepy.Presence, observers):
if before is not None and after is not None:
if before.playing != after.playing:
diff --git a/FortniteStatusWrapper.py b/FortniteStatusNotifier.py
index aa7ee12..1c40ec0 100644
--- a/FortniteStatusWrapper.py
+++ b/FortniteStatusNotifier.py
@@ -1,5 +1,6 @@
from telebot import formatting
import time
+import asyncio
from pythonFortniteStatus.FortniteStatus import *
__polling_interval__ = 5
@@ -10,15 +11,20 @@ class Observer:
async def update(self, fortniteStatus) -> None:
pass
-class FortniteStatusWrapper:
+class FortniteStatusNotifier:
- observers = []
+ observer = None
fortniteStatus = None
+
+ def __init__(self, observer: Observer):
+ self.observer = observer
async def run(self):
+ # Initialize status
+ self.fortniteStatus = fortniteStatus.getStatus()
while True:
await self.__readStatus()
- time.sleep(__polling_interval__)
+ await asyncio.sleep(__polling_interval__)
async def __readStatus(self):
serviceStatusTmp = fortniteStatus.getStatus()
@@ -28,8 +34,4 @@ class FortniteStatusWrapper:
async def __notify(self, fortniteStatus):
print("Fortnite status changed, notifying observers")
- for observer in self.observers:
- await observer.update(fortniteStatus)
-
- def attach(self, observer: Observer):
- self.observers.append(observer) \ No newline at end of file
+ await self.observer.update(fortniteStatus) \ No newline at end of file
diff --git a/tgbot.py b/tgbot.py
index 38b1789..5422cb6 100755
--- a/tgbot.py
+++ b/tgbot.py
@@ -1,25 +1,44 @@
#!/usr/bin/python3
-import os
-import time, threading, schedule
-import telebot
-import asyncio
-import telebot.async_telebot
-from FortniteStatusWrapper import *
-from FortniteStatusFormatter import *
+import os, time, telebot, asyncio, telebot.async_telebot, nest_asyncio
+from FortniteStatusNotifier import *
+from Formatter import *
from FortniteClient import *
from FortniteEvents import *
from persistence import UserRepository
-from datetime import datetime
+# Check token in environment variables
if "TELEBOT_BOT_TOKEN" not in os.environ:
raise AssertionError("Please configure TELEBOT_BOT_TOKEN as environment variables")
-#bot = telebot.TeleBot(os.environ["TELEBOT_BOT_TOKEN"])
+class FortniteStatusObserver(Observer):
+ async def update(self, fortniteStatus) -> None:
+ await send_message_to_all(formatFortniteStatus(fortniteStatus))
+
+class FortnitePresenceObserver(PresenceObserver):
+ # Map name -> last seen not playing timestamp seconds
+ statuses = {}
+
+ async def update(self, display_name: str, playing: bool) -> None:
+ print('FortnitePresenceObserver: {} playing = {}'.format(display_name, playing))
+ if playing:
+ if not display_name in self.statuses:
+ await self.__notifyFriendPlaying(display_name)
+ self.statuses[display_name] = time.time()
+ else:
+ diff = time.time() - self.statuses[display_name]
+ if diff > 60 * 60: # 60 minutes
+ self.__notifyFriendPlaying(display_name)
+ else:
+ self.statuses[display_name] = time.time()
+
+ async def __notifyFriendPlaying(self, display_name: str):
+ await send_message_to_all(formatFriendOnline(display_name))
+
bot = telebot.async_telebot.AsyncTeleBot(os.environ["TELEBOT_BOT_TOKEN"])
userRepository = UserRepository('db.sqlite')
-fortniteStatusWrapper = FortniteStatusWrapper()
-fortniteClient = FortniteClient()
+fortniteStatusWrapper = FortniteStatusNotifier(FortniteStatusObserver())
+fortniteClient = FortniteClient(FortnitePresenceObserver())
@bot.message_handler(commands = ['start'])
async def startCommand(message):
@@ -32,7 +51,7 @@ async def getStatus(message):
@bot.message_handler(commands = ['friends'])
async def getFriends(message):
- await reply(message, formatFriends(fortniteClient.get_friends()))
+ await reply(message, await formatFriends(fortniteClient.get_friends()))
@bot.message_handler(commands = ['find'])
async def findUser(message):
@@ -41,9 +60,11 @@ async def findUser(message):
search_user_display_name = arg[1]
print('Searching users by name {}'.format(search_user_display_name))
users: typing.List[fortnitepy.User] = await fortniteClient.fetch_users_by_display_name(search_user_display_name)
- for user in users:
- stats = await user.fetch_br_stats()
- await reply(message, formatUser(user, stats))
+ if (len(users) > 0):
+ for user in users:
+ await reply(message, await formatUser(user))
+ else:
+ await reply(message, 'User {} not found'.format(search_user_display_name))
else:
await reply(message, 'Usage: /find username')
@@ -58,31 +79,6 @@ async def addUser(message):
else:
await reply(message, 'Usage: /add username')
-class FortniteStatusObserver(Observer):
- async def update(self, fortniteStatus) -> None:
- await send_message_to_all(formatFortniteStatus(fortniteStatus))
-
-class FortnitePresenceObserver(PresenceObserver):
-
- # Map name -> last seen not playing timestamp seconds
- statuses = {}
-
- async def update(self, display_name: str, playing: bool) -> None:
- print('FortnitePresenceObserver: {} playing = {}'.format(display_name, playing))
- if playing:
- if not display_name in self.statuses:
- await self.__notifyFriendPlaying(display_name)
- self.statuses[display_name] = time.time()
- else:
- diff = time.time() - self.statuses[display_name]
- if diff > 60 * 60: # 60 minutes
- self.__notifyFriendPlaying(display_name)
- else:
- self.statuses[display_name] = time.time()
-
- async def __notifyFriendPlaying(self, display_name: str):
- await send_message_to_all('{} is online'.format(display_name))
-
async def send_message_to_all(message_text: str):
for user in userRepository.getAllUsers():
await bot.send_message(
@@ -97,19 +93,20 @@ async def reply(message, message_text):
message_text,
parse_mode='MarkdownV2')
-async def run_fortnite_client():
- await fortniteClient.run()
+async def run_tgbot():
+ await bot.polling()
-if __name__ == '__main__':
- fortniteStatusWrapper.attach(FortniteStatusObserver())
- fortniteClient.attach(FortnitePresenceObserver())
+async def run_fortniteStatusWrapper():
+ await fortniteStatusWrapper.run()
- loop = asyncio.get_event_loop()
+async def run_fortniteClient():
+ fortniteClient.run()
- tasks = [
- loop.create_task(bot.polling()),
- loop.create_task(fortniteStatusWrapper.run()),
- loop.create_task(run_fortnite_client())
- ]
- loop.run_until_complete(asyncio.wait(tasks))
+async def run_all():
+ await asyncio.gather(run_tgbot(), run_fortniteStatusWrapper(), run_fortniteClient())
+
+if __name__ == '__main__':
+ nest_asyncio.apply()
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(run_all())
loop.close() \ No newline at end of file