1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
import fortnitepy
import json
import os
import typing
import traceback
from fortnite_client.device_auth import DeviceAuth
from app_types import *
__fortnite_account_key__ = 'fornite-account-key'
# Intefaces for events
class FriendPresenceObserver:
async def update(self, display_name: str, playing: bool, party_size: int) -> None:
pass
class ClientInitObserver:
async def on_event(self, fortnite_client) -> None:
pass
class NewFriendObserver:
async def on_event(self, friend: User) -> None:
pass
async def on_failure(self, display_name) -> None:
pass
class FortniteClient(fortnitepy.Client):
__device_auth: DeviceAuth
__friend_presence_observer: FriendPresenceObserver
__client_init_observer: ClientInitObserver
__new_friend_observer: NewFriendObserver
__is_initialized: bool = False
def __init__(self, client_init_observer: ClientInitObserver, friend_presence_observer: FriendPresenceObserver, new_friend_observer: NewFriendObserver):
self.__device_auth = DeviceAuth()
self.__client_init_observer = client_init_observer
self.__friend_presence_observer = friend_presence_observer
self.__new_friend_observer = new_friend_observer
if self.__device_auth.device_auth_file_exists():
self.__auth_device_auth()
else:
self.__auth_authorization_code()
async def get_friends(self) -> typing.List[User]:
return [User.from_fortnite_friend(friend) for friend in self.friends]
async def find_user(self, display_name: str):
user: fortnitepy.User = await self.fetch_user_by_display_name(display_name)
return User.from_fortnite_friend(user)
def is_initialized(self):
return self.__is_initialized
def __auth_authorization_code(self):
code = input("Enter authorization code (https://www.epicgames.com/id/api/redirect?clientId=3446cd72694c4a4485d81b77adbb2141&responseType=code):")
super().__init__(
auth=fortnitepy.AuthorizationCodeAuth(
code = code
)
)
def __auth_device_auth(self):
device_auth_details = self.__device_auth.get_device_auth_details().get(__fortnite_account_key__, {})
super().__init__(
auth=fortnitepy.DeviceAuth(
**device_auth_details
)
)
# Generate auth details if none were supplied yet
async def __generate_auth_details(self):
if not self.__device_auth.device_auth_file_exists():
device_auth_data = await self.auth.generate_device_auth()
details = {
'device_id': device_auth_data['deviceId'],
'account_id': device_auth_data['accountId'],
'secret': device_auth_data['secret'],
}
self.auth.__dict__.update(details)
self.dispatch_event(
'device_auth_generate',
details,
__fortnite_account_key__
)
async def event_device_auth_generate(self, details, email):
self.__device_auth.store_device_auth_details(email, details)
async def event_ready(self):
self.__is_initialized = True
await self.__generate_auth_details()
await self.__client_init_observer.on_event(self)
async def event_friend_request(self, request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend]):
await IncomingFriendRequestEvent.on_event(request, self.__new_friend_observer)
async def event_friend_presence(self, before, after: fortnitepy.Presence):
await FriendPresenceEvent.on_event(before, after, self.__friend_presence_observer)
class FriendPresenceEvent:
async def on_event(before, after: fortnitepy.Presence, friend_presence_observer: FriendPresenceObserver):
if before is not None and after is not None:
if before.playing != after.playing:
print('FriendPresence changed for user {}, before {}, after {}'.format(after.friend.display_name, before.playing, after.playing))
party_size: int = 1
if after.has_properties:
party: fortnitepy.PresenceParty = after.party
if party is not None and party.playercount is not None:
party_size = int(party.playercount)
await friend_presence_observer.update(
after.friend.display_name,
after.playing,
party_size)
class IncomingFriendRequestEvent:
async def on_event(request: typing.Union[fortnitepy.friend.IncomingPendingFriend, fortnitepy.friend.OutgoingPendingFriend], new_friend_observer: NewFriendObserver):
if isinstance(request, fortnitepy.friend.IncomingPendingFriend):
incoming_request = typing.cast(fortnitepy.friend.IncomingPendingFriend, request)
print('Accepting friend request from {}'.format(incoming_request.display_name))
await incoming_request.accept()
accepted_friend = incoming_request.client.get_friend(request._id)
# Try fetch stats
try:
await IncomingFriendRequestEvent.__try_get_stats(accepted_friend)
await new_friend_observer.on_event(User.from_fortnite_friend(accepted_friend))
except:
print("An exception occurred while fetching user stats")
print(traceback.format_exc())
await accepted_friend.remove()
await new_friend_observer.on_failure(accepted_friend.display_name)
async def __try_get_stats(friend: fortnitepy.Friend):
await friend.fetch_br_stats()
|