From 3089066696ce90eb1a4c0b381e9fc414ec00db85 Mon Sep 17 00:00:00 2001 From: ue86388 Date: Tue, 16 Apr 2024 15:28:02 +0200 Subject: User statistics --- Formatter.py | 71 ++++++++++++++++++++++++++++++++++--------------------- FortniteClient.py | 26 ++++++++++++-------- FortniteEvents.py | 4 ++++ Types.py | 50 +++++++++++++++++++++++++++++++++++++++ persistence.py | 53 +++++++++++++++++++++++++++++++++++++---- tgbot.py | 58 ++++++++++++++++++++++++++++++++++++--------- 6 files changed, 210 insertions(+), 52 deletions(-) create mode 100644 Types.py diff --git a/Formatter.py b/Formatter.py index 07499e5..7241aca 100644 --- a/Formatter.py +++ b/Formatter.py @@ -1,5 +1,6 @@ from telebot import formatting -import fortnitepy, typing +import typing +from Types import * # Status def formatFortniteStatus(fortniteStatus): @@ -22,40 +23,56 @@ def __formatFortniteServiceStatus(fortniteServiceStatus): __formatStatus(fortniteServiceStatus.status), separator=': ') -# Friend -async def formatFriends(friends: typing.List[fortnitepy.Friend]): - friends_formatted = [await __formatFriend(friend) for friend in friends] +# User +def formatUsers(users: typing.List[User]): + users_formatted = [formatUser(user) for user in users] return formatting.format_text( - '\n\n'.join(friends_formatted), + '\n\n'.join(users_formatted), separator='\n') -async def __formatFriend(friend: fortnitepy.Friend): - stats = await friend.fetch_br_stats() - return await formatUser(friend) - -# User -async def formatUser(user: fortnitepy.User): - stats = await user.fetch_br_stats() - bp_level: float = await user.fetch_battlepass_level(season=29) - combined_stats = stats.get_combined_stats() - if 'keyboardmouse' in combined_stats: - return __formatUserDevice(user, combined_stats['keyboardmouse'], bp_level) - else: - return __formatUserDevice(user, combined_stats['gamepad'], bp_level) - -def __formatUserDevice(user: fortnitepy.User, device_stats: dict, bp_level: float): +def formatUser(user: User): return formatting.format_text( formatting.mbold("User: ") + user.display_name, - formatting.mbold("External auth: ") + ', '.join([__formatExternalAuth(external_auth) for external_auth in user.external_auths]), formatting.mbold("ID: ") + user.id, - formatting.mbold("Level: ") + str(int(bp_level//1)), - formatting.mbold("Matches played: ") + str(device_stats['matchesplayed']), - formatting.mbold("Total kills: ") + str(device_stats['kills']), - formatting.mbold("Wins: ") + str(device_stats['wins']), separator='\n') -def __formatExternalAuth(external_auth: fortnitepy.ExternalAuth): - return '{} \({}\)'.format(external_auth.external_display_name, external_auth.type) +# Stats +def formatUserStatsList(stats: typing.List[UserStats]): + stats_formatted = [__formatStats(singleStats) for singleStats in stats] + return formatting.format_text( + '\n\n'.join(stats_formatted), + separator='\n') + +def formatUserStatsDifference(oldUserStats: typing.List[UserStats], newUserStats: typing.List[UserStats]): + stats_formatted = [] + + for stats in oldUserStats: + matched = [x for x in newUserStats if x.user_id == stats.user_id][0] + stats_formatted.append(__formatStatsDifference(stats, matched)) + + return formatting.format_text( + '\n\n'.join(stats_formatted), + separator='\n') + +def __formatStatsDifference(oldUserStats: UserStats, newUserStats: UserStats): + return formatting.format_text( + formatting.mbold("User: ") + oldUserStats.user_display_name, + formatting.mbold("ID: ") + oldUserStats.user_id, + formatting.mbold("Level: ") + "{} \(\+ {}\)".format(str(oldUserStats.level), str(newUserStats.level - oldUserStats.level)), + formatting.mbold("Matches played: ") + "{} \(\+ {}\)".format(str(oldUserStats.matches_played), str(newUserStats.matches_played - oldUserStats.matches_played)), + formatting.mbold("Total kills: ") + "{} \(\+ {}\)".format(str(oldUserStats.kills), str(newUserStats.kills - oldUserStats.kills)), + formatting.mbold("Wins: ") + "{} \(\+ {}\)".format(str(oldUserStats.wins), str(newUserStats.wins - oldUserStats.wins)), + separator='\n') + +def __formatStats(userStats: UserStats): + return formatting.format_text( + formatting.mbold("User: ") + userStats.user_display_name, + formatting.mbold("ID: ") + userStats.user_id, + formatting.mbold("Level: ") + str(userStats.level), + formatting.mbold("Matches played: ") + str(userStats.matches_played), + formatting.mbold("Total kills: ") + str(userStats.kills), + formatting.mbold("Wins: ") + str(userStats.wins), + separator='\n') def formatFriendOnline(display_name: str, party_size: int): if party_size == 1: diff --git a/FortniteClient.py b/FortniteClient.py index c9b9997..a8e6774 100755 --- a/FortniteClient.py +++ b/FortniteClient.py @@ -6,23 +6,30 @@ import os import typing from DeviceAuth import DeviceAuth from FortniteEvents import * +from Types import * __fortnite_account_key__ = 'fornite-account-key' class FortniteClient(fortnitepy.Client): device_auth = DeviceAuth() - observer = None + presenceObserver = None + clientInit = None - def __init__(self, friendPresenceObserver: PresenceObserver): - self.observer = friendPresenceObserver + def __init__(self, friendPresenceObserver: PresenceObserver, clientInit: ClientInit): + self.presenceObserver = friendPresenceObserver + self.clientInit = clientInit if self.device_auth.device_auth_file_exists(): self.__auth_device_auth() else: self.__auth_authorization_code() - def get_friends(self): - return self.friends + async def get_friends(self) -> typing.List[User]: + return [User.from_fortnite_friend(friend) for friend in self.friends] + + async def find_user(self, display_name: str): + user: fortnitepy.User = await self.fetch_user_by_display_name(display_name) + return User.from_fortnite_friend(user) def __auth_authorization_code(self): code = input("Enter authorization code (https://www.epicgames.com/id/api/redirect?clientId=3446cd72694c4a4485d81b77adbb2141&responseType=code):") @@ -67,13 +74,12 @@ class FortniteClient(fortnitepy.Client): print('----------------') await self.generate_auth_details() - - # Accept pending friends - for friend_request in self.incoming_pending_friends: - await self.event_friend_request(friend_request) + + # Call observers + await self.clientInit.on_event() async def event_friend_request(self, request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]): await IncomingFriendRequest.on_event(request) async def event_friend_presence(self, before, after: fortnitepy.Presence): - await FriendPresence.on_event(before, after, self.observer) \ No newline at end of file + await FriendPresence.on_event(before, after, self.presenceObserver) \ No newline at end of file diff --git a/FortniteEvents.py b/FortniteEvents.py index 7e34056..b0477f4 100644 --- a/FortniteEvents.py +++ b/FortniteEvents.py @@ -12,6 +12,10 @@ class PresenceObserver: async def update(self, display_name: str, playing: bool, party_size: int) -> None: pass +class ClientInit: + async def on_event(self) -> None: + pass + class FriendPresence: async def on_event(before, after: fortnitepy.Presence, observer: PresenceObserver): if before is not None and after is not None: diff --git a/Types.py b/Types.py new file mode 100644 index 0000000..fc2e034 --- /dev/null +++ b/Types.py @@ -0,0 +1,50 @@ +import fortnitepy + +class UserStats: + user_id: str + user_display_name: str + level: int + matches_played: int + kills: int + wins: int + +class User: + + id: str + display_name: str + + __fortniteUser: fortnitepy.user.UserBase + + def from_fortnite_friend(user: fortnitepy.user.UserBase): + + if user is None: + return None + + instance = User() + + instance.id = user.id + instance.display_name = user.display_name + instance.__fortniteUser = user + + return instance + + async def fetch_stats(self) -> UserStats: + stats = await self.__fortniteUser.fetch_br_stats() + bp_level: float = await self.__fortniteUser.fetch_battlepass_level(season=29) # TODO + combined_stats = stats.get_combined_stats() + device_stats = {} + if 'keyboardmouse' in combined_stats: + device_stats = combined_stats['keyboardmouse'] + else: + device_stats = combined_stats['gamepad'] + + stats = UserStats() + + stats.user_id = self.id + stats.user_display_name = self.display_name + stats.level = int(bp_level//1) + stats.matches_played = device_stats['matchesplayed'] + stats.kills = device_stats['kills'] + stats.wins = device_stats['wins'] + + return stats \ No newline at end of file diff --git a/persistence.py b/persistence.py index 6070b69..92452ce 100644 --- a/persistence.py +++ b/persistence.py @@ -1,10 +1,9 @@ -import sqlite3 +import sqlite3, typing +from Types import * class UserRepository: - conn = None - - def __init__(self, db_path): + def __init__(self): self.__initialize() def __initialize(self): @@ -47,3 +46,49 @@ class UserRepository: text = alias) cur.execute(query) connection.commit() + +class StatsRepository: + + def __init__(self): + self.__initialize() + + def __getConnection(self): + return sqlite3.connect('db.sqlite') + + def __initialize(self): + cur = self.__getConnection().cursor() + cur.execute("CREATE TABLE IF NOT EXISTS stats(user_id TEXT, display_name TEXT, level INT, matches_played INT, kills INT, wins INT)") + cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS user_id_idx ON stats(user_id)") + + async def putStats(self, user: User): + stats: UserStats = await user.fetch_stats() + + connection = self.__getConnection() + cur = connection.cursor() + query = "INSERT OR REPLACE INTO stats(user_id, display_name, level, matches_played, kills, wins) VALUES('{user_id}', '{display_name}', {level}, {matches_played}, {kills}, {wins})".format( + user_id = user.id, + display_name = user.display_name, + level = stats.level, + matches_played = stats.matches_played, + kills = stats.kills, + wins = stats.wins) + cur.execute(query) + connection.commit() + + def getStats(self) -> typing.List[UserStats]: + connection = self.__getConnection() + cur = connection.cursor() + query = "SELECT * FROM stats" + cur.execute(query) + result = cur.fetchall() + return [self.__mapFromDb(stats) for stats in result] + + def __mapFromDb(self, record): + userStats = UserStats() + userStats.user_id = str(record[0]) + userStats.user_display_name = str(record[1]) + userStats.level = int(record[2]) + userStats.matches_played = int(record[3]) + userStats.kills = int(record[4]) + userStats.wins = int(record[5]) + return userStats \ No newline at end of file diff --git a/tgbot.py b/tgbot.py index 1a0e733..ba87df7 100755 --- a/tgbot.py +++ b/tgbot.py @@ -1,16 +1,26 @@ #!/usr/bin/python3 -import os, time, telebot, asyncio, telebot.async_telebot, nest_asyncio, sys, logging +import os, time, telebot, asyncio, telebot.async_telebot, nest_asyncio, sys, logging, time from FortniteStatusNotifier import * from Formatter import * from FortniteClient import * from FortniteEvents import * -from persistence import UserRepository +from persistence import UserRepository, StatsRepository # Check token in environment variables if "TELEBOT_BOT_TOKEN" not in os.environ: raise AssertionError("Please configure TELEBOT_BOT_TOKEN as environment variables") +class ClientInitObserver(ClientInit): + async def on_event(self) -> None: + # Accept pending friends + for friend_request in fortniteClient.incoming_pending_friends: + await fortniteClient.event_friend_request(friend_request) + + # Record user stats + if len(statsRepository.getStats()) == 0: + await record_user_stats() + class FortniteStatusObserver(Observer): async def update(self, fortniteStatus) -> None: await send_message_to_all(formatFortniteStatus(fortniteStatus)) @@ -43,9 +53,10 @@ class ExceptionHandler(telebot.ExceptionHandler): bot = telebot.async_telebot.AsyncTeleBot( token=os.environ["TELEBOT_BOT_TOKEN"], exception_handler=ExceptionHandler()) -userRepository = UserRepository('db.sqlite') +userRepository = UserRepository() +statsRepository = StatsRepository() fortniteStatusWrapper = FortniteStatusNotifier(FortniteStatusObserver()) -fortniteClient = FortniteClient(FortnitePresenceObserver()) +fortniteClient = FortniteClient(FortnitePresenceObserver(), ClientInitObserver()) @bot.message_handler(commands = ['start']) async def startCommand(message: telebot.types.Message): @@ -62,17 +73,30 @@ async def getStatus(message): @bot.message_handler(commands = ['friends']) async def getFriends(message): - await reply(message, await formatFriends(fortniteClient.get_friends())) + friends = await fortniteClient.get_friends() + await reply(message, formatUsers(friends)) + +@bot.message_handler(commands = ['stats']) +async def getStats(message): + friends = await fortniteClient.get_friends() + stats = [await friend.fetch_stats() for friend in friends] + await reply(message, formatUserStatsList(stats)) + +@bot.message_handler(commands = ['todaystats']) +async def getTodayStats(message): + persisted_stats = statsRepository.getStats() + friends = await fortniteClient.get_friends() + current_stats = [await friend.fetch_stats() for friend in friends] + await reply(message, formatUserStatsDifference(persisted_stats, current_stats)) @bot.message_handler(commands = ['find']) async def findUser(message): arg = message.text.split() if len(arg) > 1: search_user_display_name = arg[1] - users: typing.List[fortnitepy.User] = await fortniteClient.fetch_users_by_display_name(search_user_display_name) - if (len(users) > 0): - for user in users: - await reply(message, await formatUser(user)) + user: User = await fortniteClient.find_user(search_user_display_name) + if user is not None: + await reply(message, formatUser(user)) else: await reply(message, 'User {} not found'.format(search_user_display_name)) else: @@ -83,7 +107,6 @@ async def addUser(message): arg = message.text.split() if len(arg) > 1: user_id = arg[1] - print('Adding user with ID as friend {}'.format(user_id)) await fortniteClient.add_friend(user_id) await reply(message, 'Send friend request successfully') else: @@ -107,6 +130,12 @@ async def reply(message, message_text): message_text, parse_mode='MarkdownV2') +async def record_user_stats(): + print('Recording user stats') + friends = await fortniteClient.get_friends() + for friend in friends: + await statsRepository.putStats(friend) + async def run_tgbot(): await bot.polling() @@ -116,8 +145,15 @@ async def run_fortniteStatusWrapper(): async def run_fortniteClient(): fortniteClient.run() +async def run_record_stats(): + while True: + t = time.localtime() + if t.tm_hour == 5: # only at 05:00 + await record_user_stats() + await asyncio.sleep(60 * 60 * 60) # 1 hour + async def run_all(): - await asyncio.gather(run_tgbot(), run_fortniteStatusWrapper(), run_fortniteClient()) + await asyncio.gather(run_tgbot(), run_fortniteStatusWrapper(), run_fortniteClient(), run_record_stats()) if __name__ == '__main__': nest_asyncio.apply() -- cgit v1.2.3