summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Formatter.py71
-rwxr-xr-xFortniteClient.py26
-rw-r--r--FortniteEvents.py4
-rw-r--r--Types.py50
-rw-r--r--persistence.py53
-rwxr-xr-xtgbot.py58
6 files changed, 210 insertions, 52 deletions
diff --git a/Formatter.py b/Formatter.py
index 07499e5..7241aca 100644
--- a/Formatter.py
+++ b/Formatter.py
@@ -1,5 +1,6 @@
from telebot import formatting
-import fortnitepy, typing
+import typing
+from Types import *
# Status
def formatFortniteStatus(fortniteStatus):
@@ -22,40 +23,56 @@ def __formatFortniteServiceStatus(fortniteServiceStatus):
__formatStatus(fortniteServiceStatus.status),
separator=': ')
-# Friend
-async def formatFriends(friends: typing.List[fortnitepy.Friend]):
- friends_formatted = [await __formatFriend(friend) for friend in friends]
+# User
+def formatUsers(users: typing.List[User]):
+ users_formatted = [formatUser(user) for user in users]
return formatting.format_text(
- '\n\n'.join(friends_formatted),
+ '\n\n'.join(users_formatted),
separator='\n')
-async def __formatFriend(friend: fortnitepy.Friend):
- stats = await friend.fetch_br_stats()
- return await formatUser(friend)
-
-# User
-async def formatUser(user: fortnitepy.User):
- stats = await user.fetch_br_stats()
- bp_level: float = await user.fetch_battlepass_level(season=29)
- combined_stats = stats.get_combined_stats()
- if 'keyboardmouse' in combined_stats:
- return __formatUserDevice(user, combined_stats['keyboardmouse'], bp_level)
- else:
- return __formatUserDevice(user, combined_stats['gamepad'], bp_level)
-
-def __formatUserDevice(user: fortnitepy.User, device_stats: dict, bp_level: float):
+def formatUser(user: User):
return formatting.format_text(
formatting.mbold("User: ") + user.display_name,
- formatting.mbold("External auth: ") + ', '.join([__formatExternalAuth(external_auth) for external_auth in user.external_auths]),
formatting.mbold("ID: ") + user.id,
- formatting.mbold("Level: ") + str(int(bp_level//1)),
- formatting.mbold("Matches played: ") + str(device_stats['matchesplayed']),
- formatting.mbold("Total kills: ") + str(device_stats['kills']),
- formatting.mbold("Wins: ") + str(device_stats['wins']),
separator='\n')
-def __formatExternalAuth(external_auth: fortnitepy.ExternalAuth):
- return '{} \({}\)'.format(external_auth.external_display_name, external_auth.type)
+# Stats
+def formatUserStatsList(stats: typing.List[UserStats]):
+ stats_formatted = [__formatStats(singleStats) for singleStats in stats]
+ return formatting.format_text(
+ '\n\n'.join(stats_formatted),
+ separator='\n')
+
+def formatUserStatsDifference(oldUserStats: typing.List[UserStats], newUserStats: typing.List[UserStats]):
+ stats_formatted = []
+
+ for stats in oldUserStats:
+ matched = [x for x in newUserStats if x.user_id == stats.user_id][0]
+ stats_formatted.append(__formatStatsDifference(stats, matched))
+
+ return formatting.format_text(
+ '\n\n'.join(stats_formatted),
+ separator='\n')
+
+def __formatStatsDifference(oldUserStats: UserStats, newUserStats: UserStats):
+ return formatting.format_text(
+ formatting.mbold("User: ") + oldUserStats.user_display_name,
+ formatting.mbold("ID: ") + oldUserStats.user_id,
+ formatting.mbold("Level: ") + "{} \(\+ {}\)".format(str(oldUserStats.level), str(newUserStats.level - oldUserStats.level)),
+ formatting.mbold("Matches played: ") + "{} \(\+ {}\)".format(str(oldUserStats.matches_played), str(newUserStats.matches_played - oldUserStats.matches_played)),
+ formatting.mbold("Total kills: ") + "{} \(\+ {}\)".format(str(oldUserStats.kills), str(newUserStats.kills - oldUserStats.kills)),
+ formatting.mbold("Wins: ") + "{} \(\+ {}\)".format(str(oldUserStats.wins), str(newUserStats.wins - oldUserStats.wins)),
+ separator='\n')
+
+def __formatStats(userStats: UserStats):
+ return formatting.format_text(
+ formatting.mbold("User: ") + userStats.user_display_name,
+ formatting.mbold("ID: ") + userStats.user_id,
+ formatting.mbold("Level: ") + str(userStats.level),
+ formatting.mbold("Matches played: ") + str(userStats.matches_played),
+ formatting.mbold("Total kills: ") + str(userStats.kills),
+ formatting.mbold("Wins: ") + str(userStats.wins),
+ separator='\n')
def formatFriendOnline(display_name: str, party_size: int):
if party_size == 1:
diff --git a/FortniteClient.py b/FortniteClient.py
index c9b9997..a8e6774 100755
--- a/FortniteClient.py
+++ b/FortniteClient.py
@@ -6,23 +6,30 @@ import os
import typing
from DeviceAuth import DeviceAuth
from FortniteEvents import *
+from Types import *
__fortnite_account_key__ = 'fornite-account-key'
class FortniteClient(fortnitepy.Client):
device_auth = DeviceAuth()
- observer = None
+ presenceObserver = None
+ clientInit = None
- def __init__(self, friendPresenceObserver: PresenceObserver):
- self.observer = friendPresenceObserver
+ def __init__(self, friendPresenceObserver: PresenceObserver, clientInit: ClientInit):
+ self.presenceObserver = friendPresenceObserver
+ self.clientInit = clientInit
if self.device_auth.device_auth_file_exists():
self.__auth_device_auth()
else:
self.__auth_authorization_code()
- def get_friends(self):
- return self.friends
+ async def get_friends(self) -> typing.List[User]:
+ return [User.from_fortnite_friend(friend) for friend in self.friends]
+
+ async def find_user(self, display_name: str):
+ user: fortnitepy.User = await self.fetch_user_by_display_name(display_name)
+ return User.from_fortnite_friend(user)
def __auth_authorization_code(self):
code = input("Enter authorization code (https://www.epicgames.com/id/api/redirect?clientId=3446cd72694c4a4485d81b77adbb2141&responseType=code):")
@@ -67,13 +74,12 @@ class FortniteClient(fortnitepy.Client):
print('----------------')
await self.generate_auth_details()
-
- # Accept pending friends
- for friend_request in self.incoming_pending_friends:
- await self.event_friend_request(friend_request)
+
+ # Call observers
+ await self.clientInit.on_event()
async def event_friend_request(self, request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]):
await IncomingFriendRequest.on_event(request)
async def event_friend_presence(self, before, after: fortnitepy.Presence):
- await FriendPresence.on_event(before, after, self.observer) \ No newline at end of file
+ await FriendPresence.on_event(before, after, self.presenceObserver) \ No newline at end of file
diff --git a/FortniteEvents.py b/FortniteEvents.py
index 7e34056..b0477f4 100644
--- a/FortniteEvents.py
+++ b/FortniteEvents.py
@@ -12,6 +12,10 @@ class PresenceObserver:
async def update(self, display_name: str, playing: bool, party_size: int) -> None:
pass
+class ClientInit:
+ async def on_event(self) -> None:
+ pass
+
class FriendPresence:
async def on_event(before, after: fortnitepy.Presence, observer: PresenceObserver):
if before is not None and after is not None:
diff --git a/Types.py b/Types.py
new file mode 100644
index 0000000..fc2e034
--- /dev/null
+++ b/Types.py
@@ -0,0 +1,50 @@
+import fortnitepy
+
+class UserStats:
+ user_id: str
+ user_display_name: str
+ level: int
+ matches_played: int
+ kills: int
+ wins: int
+
+class User:
+
+ id: str
+ display_name: str
+
+ __fortniteUser: fortnitepy.user.UserBase
+
+ def from_fortnite_friend(user: fortnitepy.user.UserBase):
+
+ if user is None:
+ return None
+
+ instance = User()
+
+ instance.id = user.id
+ instance.display_name = user.display_name
+ instance.__fortniteUser = user
+
+ return instance
+
+ async def fetch_stats(self) -> UserStats:
+ stats = await self.__fortniteUser.fetch_br_stats()
+ bp_level: float = await self.__fortniteUser.fetch_battlepass_level(season=29) # TODO
+ combined_stats = stats.get_combined_stats()
+ device_stats = {}
+ if 'keyboardmouse' in combined_stats:
+ device_stats = combined_stats['keyboardmouse']
+ else:
+ device_stats = combined_stats['gamepad']
+
+ stats = UserStats()
+
+ stats.user_id = self.id
+ stats.user_display_name = self.display_name
+ stats.level = int(bp_level//1)
+ stats.matches_played = device_stats['matchesplayed']
+ stats.kills = device_stats['kills']
+ stats.wins = device_stats['wins']
+
+ return stats \ No newline at end of file
diff --git a/persistence.py b/persistence.py
index 6070b69..92452ce 100644
--- a/persistence.py
+++ b/persistence.py
@@ -1,10 +1,9 @@
-import sqlite3
+import sqlite3, typing
+from Types import *
class UserRepository:
- conn = None
-
- def __init__(self, db_path):
+ def __init__(self):
self.__initialize()
def __initialize(self):
@@ -47,3 +46,49 @@ class UserRepository:
text = alias)
cur.execute(query)
connection.commit()
+
+class StatsRepository:
+
+ def __init__(self):
+ self.__initialize()
+
+ def __getConnection(self):
+ return sqlite3.connect('db.sqlite')
+
+ def __initialize(self):
+ cur = self.__getConnection().cursor()
+ cur.execute("CREATE TABLE IF NOT EXISTS stats(user_id TEXT, display_name TEXT, level INT, matches_played INT, kills INT, wins INT)")
+ cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS user_id_idx ON stats(user_id)")
+
+ async def putStats(self, user: User):
+ stats: UserStats = await user.fetch_stats()
+
+ connection = self.__getConnection()
+ cur = connection.cursor()
+ query = "INSERT OR REPLACE INTO stats(user_id, display_name, level, matches_played, kills, wins) VALUES('{user_id}', '{display_name}', {level}, {matches_played}, {kills}, {wins})".format(
+ user_id = user.id,
+ display_name = user.display_name,
+ level = stats.level,
+ matches_played = stats.matches_played,
+ kills = stats.kills,
+ wins = stats.wins)
+ cur.execute(query)
+ connection.commit()
+
+ def getStats(self) -> typing.List[UserStats]:
+ connection = self.__getConnection()
+ cur = connection.cursor()
+ query = "SELECT * FROM stats"
+ cur.execute(query)
+ result = cur.fetchall()
+ return [self.__mapFromDb(stats) for stats in result]
+
+ def __mapFromDb(self, record):
+ userStats = UserStats()
+ userStats.user_id = str(record[0])
+ userStats.user_display_name = str(record[1])
+ userStats.level = int(record[2])
+ userStats.matches_played = int(record[3])
+ userStats.kills = int(record[4])
+ userStats.wins = int(record[5])
+ return userStats \ No newline at end of file
diff --git a/tgbot.py b/tgbot.py
index 1a0e733..ba87df7 100755
--- a/tgbot.py
+++ b/tgbot.py
@@ -1,16 +1,26 @@
#!/usr/bin/python3
-import os, time, telebot, asyncio, telebot.async_telebot, nest_asyncio, sys, logging
+import os, time, telebot, asyncio, telebot.async_telebot, nest_asyncio, sys, logging, time
from FortniteStatusNotifier import *
from Formatter import *
from FortniteClient import *
from FortniteEvents import *
-from persistence import UserRepository
+from persistence import UserRepository, StatsRepository
# Check token in environment variables
if "TELEBOT_BOT_TOKEN" not in os.environ:
raise AssertionError("Please configure TELEBOT_BOT_TOKEN as environment variables")
+class ClientInitObserver(ClientInit):
+ async def on_event(self) -> None:
+ # Accept pending friends
+ for friend_request in fortniteClient.incoming_pending_friends:
+ await fortniteClient.event_friend_request(friend_request)
+
+ # Record user stats
+ if len(statsRepository.getStats()) == 0:
+ await record_user_stats()
+
class FortniteStatusObserver(Observer):
async def update(self, fortniteStatus) -> None:
await send_message_to_all(formatFortniteStatus(fortniteStatus))
@@ -43,9 +53,10 @@ class ExceptionHandler(telebot.ExceptionHandler):
bot = telebot.async_telebot.AsyncTeleBot(
token=os.environ["TELEBOT_BOT_TOKEN"],
exception_handler=ExceptionHandler())
-userRepository = UserRepository('db.sqlite')
+userRepository = UserRepository()
+statsRepository = StatsRepository()
fortniteStatusWrapper = FortniteStatusNotifier(FortniteStatusObserver())
-fortniteClient = FortniteClient(FortnitePresenceObserver())
+fortniteClient = FortniteClient(FortnitePresenceObserver(), ClientInitObserver())
@bot.message_handler(commands = ['start'])
async def startCommand(message: telebot.types.Message):
@@ -62,17 +73,30 @@ async def getStatus(message):
@bot.message_handler(commands = ['friends'])
async def getFriends(message):
- await reply(message, await formatFriends(fortniteClient.get_friends()))
+ friends = await fortniteClient.get_friends()
+ await reply(message, formatUsers(friends))
+
+@bot.message_handler(commands = ['stats'])
+async def getStats(message):
+ friends = await fortniteClient.get_friends()
+ stats = [await friend.fetch_stats() for friend in friends]
+ await reply(message, formatUserStatsList(stats))
+
+@bot.message_handler(commands = ['todaystats'])
+async def getTodayStats(message):
+ persisted_stats = statsRepository.getStats()
+ friends = await fortniteClient.get_friends()
+ current_stats = [await friend.fetch_stats() for friend in friends]
+ await reply(message, formatUserStatsDifference(persisted_stats, current_stats))
@bot.message_handler(commands = ['find'])
async def findUser(message):
arg = message.text.split()
if len(arg) > 1:
search_user_display_name = arg[1]
- users: typing.List[fortnitepy.User] = await fortniteClient.fetch_users_by_display_name(search_user_display_name)
- if (len(users) > 0):
- for user in users:
- await reply(message, await formatUser(user))
+ user: User = await fortniteClient.find_user(search_user_display_name)
+ if user is not None:
+ await reply(message, formatUser(user))
else:
await reply(message, 'User {} not found'.format(search_user_display_name))
else:
@@ -83,7 +107,6 @@ async def addUser(message):
arg = message.text.split()
if len(arg) > 1:
user_id = arg[1]
- print('Adding user with ID as friend {}'.format(user_id))
await fortniteClient.add_friend(user_id)
await reply(message, 'Send friend request successfully')
else:
@@ -107,6 +130,12 @@ async def reply(message, message_text):
message_text,
parse_mode='MarkdownV2')
+async def record_user_stats():
+ print('Recording user stats')
+ friends = await fortniteClient.get_friends()
+ for friend in friends:
+ await statsRepository.putStats(friend)
+
async def run_tgbot():
await bot.polling()
@@ -116,8 +145,15 @@ async def run_fortniteStatusWrapper():
async def run_fortniteClient():
fortniteClient.run()
+async def run_record_stats():
+ while True:
+ t = time.localtime()
+ if t.tm_hour == 5: # only at 05:00
+ await record_user_stats()
+ await asyncio.sleep(60 * 60 * 60) # 1 hour
+
async def run_all():
- await asyncio.gather(run_tgbot(), run_fortniteStatusWrapper(), run_fortniteClient())
+ await asyncio.gather(run_tgbot(), run_fortniteStatusWrapper(), run_fortniteClient(), run_record_stats())
if __name__ == '__main__':
nest_asyncio.apply()