"""fortnitepy client wrapper that forwards selected client events to observers."""

import fortnitepy
import json
import os
import typing
import traceback
from fortnite_client.device_auth import DeviceAuth
from app_types import *

# Key under which this account's device-auth details are stored and looked up.
# NOTE: the spelling is part of the persisted data; do not "fix" it.
__fortnite_account_key__ = 'fornite-account-key'


# Interfaces for events
class FriendPresenceObserver:
    """Receiver for friend presence changes; the default implementation is a no-op."""

    async def update(self, display_name: str, playing: bool, party_size: int) -> None:
        """Handle a presence change of the friend named ``display_name``."""
        pass


class ClientInitObserver:
    """Receiver for the client-ready notification; the default implementation is a no-op."""

    async def on_event(self, fortnite_client) -> None:
        """Handle the client having become ready."""
        pass


class NewFriendObserver:
    """Receiver for accepted friend requests; the default implementation is a no-op."""

    async def on_event(self, friend: User) -> None:
        """Handle a newly accepted friend."""
        pass

    async def on_failure(self, display_name) -> None:
        """Handle a friend request that was accepted but could not be processed."""
        pass


class FortniteClient(fortnitepy.Client):
    """``fortnitepy.Client`` that authenticates itself and notifies observers.

    Authentication uses stored device-auth details when the device-auth file
    exists; otherwise an authorization code is requested on standard input.
    """

    __device_auth: DeviceAuth
    __friend_presence_observer: FriendPresenceObserver
    __client_init_observer: ClientInitObserver
    __new_friend_observer: NewFriendObserver
    __is_initialized: bool = False

    def __init__(self,
                 client_init_observer: ClientInitObserver,
                 friend_presence_observer: FriendPresenceObserver,
                 new_friend_observer: NewFriendObserver):
        """Store the observers and initialise the underlying client.

        Args:
            client_init_observer: notified from ``event_ready``.
            friend_presence_observer: notified from ``event_friend_presence``.
            new_friend_observer: notified from ``event_friend_request``.
        """
        self.__device_auth = DeviceAuth()
        self.__client_init_observer = client_init_observer
        self.__friend_presence_observer = friend_presence_observer
        self.__new_friend_observer = new_friend_observer

        # Both helpers call super().__init__() with the matching auth object.
        if self.__device_auth.device_auth_file_exists():
            self.__auth_device_auth()
        else:
            self.__auth_authorization_code()

    async def get_friends(self) -> typing.List[User]:
        """Return the client's friends converted with ``User.from_fortnite_friend``."""
        return [User.from_fortnite_friend(friend) for friend in self.friends]

    async def find_user(self, display_name: str):
        """Fetch a user by display name and convert it with ``User.from_fortnite_friend``.

        NOTE(review): the lookup result is passed on unchecked; confirm how
        ``User.from_fortnite_friend`` behaves when no user was found.
        """
        user: fortnitepy.User = await self.fetch_user_by_display_name(display_name)
        return User.from_fortnite_friend(user)

    def is_initialized(self) -> bool:
        """Return ``True`` once ``event_ready`` has run."""
        return self.__is_initialized

    def __auth_authorization_code(self):
        """Initialise the client with an authorization code read from stdin."""
        code = input(
            "Enter authorization code "
            "(https://www.epicgames.com/id/api/redirect"
            "?clientId=3446cd72694c4a4485d81b77adbb2141&responseType=code):"
        )
        super().__init__(
            auth=fortnitepy.AuthorizationCodeAuth(
                code=code
            )
        )

    def __auth_device_auth(self):
        """Initialise the client with the stored device-auth details.

        NOTE(review): when the stored details contain no entry for the account
        key, ``fortnitepy.DeviceAuth`` is called without arguments; confirm
        whether that case can occur.
        """
        stored = self.__device_auth.get_device_auth_details()
        device_auth_details = stored.get(__fortnite_account_key__, {})
        super().__init__(
            auth=fortnitepy.DeviceAuth(
                **device_auth_details
            )
        )

    # Generate auth details if none were supplied yet
    async def __generate_auth_details(self):
        """Create device-auth details and dispatch them for storage.

        Does nothing when the device-auth file already exists.
        """
        if self.__device_auth.device_auth_file_exists():
            return

        device_auth_data = await self.auth.generate_device_auth()
        details = {
            'device_id': device_auth_data['deviceId'],
            'account_id': device_auth_data['accountId'],
            'secret': device_auth_data['secret'],
        }
        self.auth.__dict__.update(details)
        # Handled by event_device_auth_generate below.
        self.dispatch_event(
            'device_auth_generate',
            details,
            __fortnite_account_key__
        )

    async def event_device_auth_generate(self, details, email):
        """Store generated device-auth details.

        ``email`` is the storage key; ``__generate_auth_details`` passes the
        module-level account key here.
        """
        self.__device_auth.store_device_auth_details(email, details)

    async def event_ready(self):
        """Mark the client initialised, ensure auth details exist, notify the observer."""
        self.__is_initialized = True
        await self.__generate_auth_details()
        await self.__client_init_observer.on_event(self)

    async def event_friend_request(
            self,
            request: typing.Union[fortnitepy.friend.IncomingPendingFriend,
                                  fortnitepy.friend.OutgoingPendingFriend]):
        """Delegate a friend request to ``IncomingFriendRequestEvent``."""
        await IncomingFriendRequestEvent.on_event(request, self.__new_friend_observer)

    async def event_friend_presence(self, before, after: fortnitepy.Presence):
        """Delegate a presence update to ``FriendPresenceEvent``."""
        await FriendPresenceEvent.on_event(before, after, self.__friend_presence_observer)


class FriendPresenceEvent:
    """Translates presence updates into ``FriendPresenceObserver.update`` calls."""

    @staticmethod
    async def on_event(before,
                       after: fortnitepy.Presence,
                       friend_presence_observer: FriendPresenceObserver):
        """Notify the observer when a friend's ``playing`` state changes.

        Nothing happens when either presence is ``None`` or when ``playing``
        is the same before and after.

        NOTE(review): the observer is notified only on a ``playing`` change;
        confirm that party-size-only changes are meant to stay silent.

        Args:
            before: previous presence, or ``None``.
            after: new presence, or ``None``.
            friend_presence_observer: receiver of the update.
        """
        if before is None or after is None:
            return
        if before.playing == after.playing:
            return

        print(f'FriendPresence changed for user {after.friend.display_name}, '
              f'before {before.playing}, after {after.playing}')

        # Default to a party of one when no party information is available.
        party_size: int = 1
        if after.has_properties:
            party: fortnitepy.PresenceParty = after.party
            if party is not None and party.playercount is not None:
                party_size = int(party.playercount)

        await friend_presence_observer.update(
            after.friend.display_name,
            after.playing,
            party_size)


class IncomingFriendRequestEvent:
    """Accepts incoming friend requests and reports the outcome to an observer."""

    @staticmethod
    async def on_event(
            request: typing.Union[fortnitepy.friend.IncomingPendingFriend,
                                  fortnitepy.friend.OutgoingPendingFriend],
            new_friend_observer: NewFriendObserver):
        """Accept an incoming request, fetch the friend's stats and notify the observer.

        Outgoing requests are ignored. If fetching stats or the success
        notification raises, the friend is removed again and
        ``on_failure`` is called with the friend's display name.

        Args:
            request: the pending friend request delivered by the client.
            new_friend_observer: receiver of the success/failure notification.
        """
        if not isinstance(request, fortnitepy.friend.IncomingPendingFriend):
            return

        print(f'Accepting friend request from {request.display_name}')
        accepted = await request.accept()

        # Prefer the client's cached friend object; fall back to whatever
        # accept() returned in case the cache lookup comes back empty.
        accepted_friend = request.client.get_friend(request.id)
        if accepted_friend is None:
            accepted_friend = accepted
        if accepted_friend is None:
            print("Accepted friend could not be resolved")
            await new_friend_observer.on_failure(request.display_name)
            return

        # Try fetch stats
        try:
            await IncomingFriendRequestEvent.__try_get_stats(accepted_friend)
            await new_friend_observer.on_event(User.from_fortnite_friend(accepted_friend))
        except Exception:
            # Deliberately not a bare except: task cancellation and interpreter
            # shutdown must propagate instead of triggering a friend removal.
            print("An exception occurred while fetching user stats")
            print(traceback.format_exc())
            await accepted_friend.remove()
            await new_friend_observer.on_failure(accepted_friend.display_name)

    @staticmethod
    async def __try_get_stats(friend: fortnitepy.Friend):
        """Fetch the friend's Battle Royale stats; any error propagates to the caller."""
        await friend.fetch_br_stats()