1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
import fortnitepy
import typing
import time
from fortnite_client import *
from telegram_bot import *
from persistence import *
from formatter import *
class ClientInitObserverImpl(ClientInitObserver):
    """Observer whose ``on_event`` prints a banner identifying the client's account."""

    async def on_event(self, fortnite_client: FortniteClient) -> None:
        """Print the display name and id of ``fortnite_client.user`` to stdout.

        The two values are framed by a separator line above and below, with a
        fixed 'FortniteClient ready as:' heading in between.
        """
        rule = '----------------'
        print(rule)
        print('FortniteClient ready as:')
        account = fortnite_client.user
        print(account.display_name)
        print(account.id)
        print(rule)
class FriendPresenceObserverImpl(FriendPresenceObserver):
    """Observer that announces over Telegram when a friend is seen playing.

    An announcement is only sent when more than one hour separates the current
    update from the last presence recorded for that friend.
    """

    __telegram_bot: TelegramBot
    __presence_repository: PresenceRepository

    def __init__(self, telegram_bot: TelegramBot, presence_repository: PresenceRepository):
        """Keep references to the bot used for sending and the presence store."""
        self.__presence_repository = presence_repository
        self.__telegram_bot = telegram_bot

    async def update(self, display_name: str, playing: bool, party_size: int) -> None:
        """Handle a presence update for the friend called ``display_name``.

        Updates with a falsy ``playing`` are ignored. Otherwise the stored
        presence value is compared with the current ``time.time()``; if the gap
        exceeds the threshold the friend is announced, and the stored value is
        then overwritten with a fresh ``time.time()`` reading.
        """
        if not playing:
            return

        # presumably the value previously written by set_last_user_presence
        # (a time.time() reading) -- verify against PresenceRepository.
        previous = self.__presence_repository.get_last_user_presence(display_name)
        elapsed = time.time() - previous
        threshold = 60 * 60  # 60 minutes
        if elapsed > threshold:
            await self.__notify_friend_playing(display_name, party_size)

        # NOTE(review): the nesting of this call cannot be recovered from the
        # flattened source; it is placed at the `if playing` level so the stored
        # value refreshes on every playing update -- confirm.
        self.__presence_repository.set_last_user_presence(display_name, time.time())

    async def __notify_friend_playing(self, display_name: str, party_size: int):
        """Send the ``format_friend_online`` message through the Telegram bot."""
        text = format_friend_online(display_name, party_size)
        await self.__telegram_bot.send_message_to_all(text)
class NewFriendObserverImpl(NewFriendObserver):
    """Observer that reacts to new-friend events and reports them over Telegram."""

    __telegram_bot: TelegramBot
    __stats_repository: StatsRepository

    def __init__(self, telegram_bot: TelegramBot, stats_repository: StatsRepository):
        """Keep references to the bot used for sending and the stats store."""
        self.__stats_repository = stats_repository
        self.__telegram_bot = telegram_bot

    async def on_event(self, friend: User) -> None:
        """Pass ``friend`` to the stats repository, then send the new-friend message.

        The message is only sent after ``put_stats`` has completed.
        """
        await self.__stats_repository.put_stats(friend)
        announcement = format_new_friend(friend.display_name)
        await self.__telegram_bot.send_message_to_all(announcement)

    async def on_failure(self, display_name) -> None:
        """Send the ``format_failed_new_friend`` message for ``display_name``.

        Presumably invoked when a friend could not be added -- verify against
        the code that drives ``NewFriendObserver``.
        """
        notice = format_failed_new_friend(display_name)
        await self.__telegram_bot.send_message_to_all(notice)
|