#!/usr/bin/python3
"""Telegram bot that relays Fortnite status and friend-presence updates.

Chats register with /start.  Registered chats are notified by the two
observer classes below; the remaining commands query the Fortnite client
on demand.  Requires the TELEBOT_BOT_TOKEN environment variable.
"""
import os
import re
import time
import threading
import schedule
import telebot
import asyncio
import telebot.async_telebot
from FortniteStatusWrapper import *
from FortniteStatusFormatter import *
from FortniteClient import *
from FortniteEvents import *
from persistence import UserRepository
from datetime import datetime

if "TELEBOT_BOT_TOKEN" not in os.environ:
    raise AssertionError("Please configure TELEBOT_BOT_TOKEN as environment variables")

bot = telebot.async_telebot.AsyncTeleBot(os.environ["TELEBOT_BOT_TOKEN"])
userRepository = UserRepository('db.sqlite')
fortniteStatusWrapper = FortniteStatusWrapper()
fortniteClient = FortniteClient()

# Characters Telegram requires to be backslash-escaped in MarkdownV2 text.
_MARKDOWN_V2_SPECIALS = re.compile(r'([_*\[\]()~`>#+\-=|{}.!\\])')

# Event loop driving the bot; assigned in the __main__ section so observers
# invoked from other threads know where to schedule their sends.
_main_loop = None

# Strong references to in-flight notification tasks: the event loop itself
# only keeps weak references, so an unreferenced task may be collected.
_pending_notifications = set()


def _escape_markdown_v2(text: str) -> str:
    """Return *text* with every MarkdownV2 special character escaped."""
    return _MARKDOWN_V2_SPECIALS.sub(r'\\\1', text)


def _report_notification_result(future) -> None:
    """Drop the finished send from the pending set and print any failure."""
    _pending_notifications.discard(future)
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        print('Failed to send notification: {!r}'.format(error))


def _schedule(coro) -> None:
    """Run coroutine *coro* on the bot's event loop from synchronous code.

    The observer callbacks are plain functions, so they cannot ``await``
    the AsyncTeleBot coroutines; calling them without scheduling would
    create a coroutine object that never executes.
    """
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is not None:
        # Called from a callback executing inside the event loop thread.
        pending = running_loop.create_task(coro)
    elif _main_loop is not None:
        # Called from another thread: hand the coroutine over thread-safely.
        pending = asyncio.run_coroutine_threadsafe(coro, _main_loop)
    else:
        # No loop available (e.g. module imported but not started).
        coro.close()
        print('Notification dropped: event loop is not running')
        return

    _pending_notifications.add(pending)
    pending.add_done_callback(_report_notification_result)


def _broadcast(text: str) -> None:
    """Schedule a MarkdownV2 message with *text* to every registered chat."""
    for user in userRepository.getAllUsers():
        # The first column of each stored user row is used as the chat id.
        _schedule(bot.send_message(user[0], text, parse_mode='MarkdownV2'))


@bot.message_handler(commands=['start'])
async def startCommand(message):
    """Register the chat that sent /start and confirm the registration."""
    userRepository.putUser(message.chat.id)
    await bot.reply_to(message, "This chat successfully registered to receive Fortnite updates!")


@bot.message_handler(commands=['status'])
async def getStatus(message):
    """Reply to /status with the formatted Fortnite status."""
    # The original called `fortniteStatus.getStatus()`, a name this file never
    # defines; the module-level wrapper instance is used instead.
    # NOTE(review): assumes FortniteStatusWrapper exposes getStatus() - confirm.
    await bot.reply_to(
        message,
        formatFortniteStatus(fortniteStatusWrapper.getStatus()),
        parse_mode='MarkdownV2')


@bot.message_handler(commands=['friends'])
async def getFriends(message):
    """Reply to /friends with the formatted friend list."""
    await bot.reply_to(
        message,
        formatFriends(fortniteClient.get_friends()),
        parse_mode='MarkdownV2')


@bot.message_handler(commands=['find'])
async def findUser(message):
    """Reply to '/find <name>' with one message per matching user."""
    arg = message.text.split()
    if len(arg) <= 1:
        await bot.reply_to(
            message,
            'Usage: /find username',
            parse_mode='MarkdownV2')
        return

    search_user_display_name = arg[1]
    print('Searching users by name {}'.format(search_user_display_name))
    users: typing.List[fortnitepy.User] = await fortniteClient.fetch_users_by_display_name(search_user_display_name)
    if not users:
        # Without this the command would finish without any feedback.
        await bot.reply_to(message, 'No users found')
        return

    for user in users:
        stats = await user.fetch_br_stats()
        await bot.reply_to(
            message,
            formatUser(user, stats),
            parse_mode='MarkdownV2')


@bot.message_handler(commands=['add'])
async def addUser(message):
    """Handle '/add <id>': send a friend request to the given user id."""
    arg = message.text.split()
    if len(arg) <= 1:
        await bot.reply_to(
            message,
            'Usage: /add username',
            parse_mode='MarkdownV2')
        return

    user_id = arg[1]
    print('Adding user with ID as friend {}'.format(user_id))
    await fortniteClient.add_friend(user_id)
    await bot.reply_to(
        message,
        'Send friend request successfully',
        parse_mode='MarkdownV2')


class FortniteStatusObserver(Observer):
    """Forwards every status update to all registered chats."""

    def update(self, fortniteStatus) -> None:
        """Broadcast the formatted *fortniteStatus* to every registered chat."""
        _broadcast(formatFortniteStatus(fortniteStatus))


class FortnitePresenceObserver(PresenceObserver):
    """Announces friends that start playing after a quiet period."""

    # Map name -> last seen not playing timestamp seconds
    statuses = {}

    def update(self, display_name: str, playing: bool) -> None:
        """Record the presence change and notify chats when appropriate.

        A friend seen playing for the first time is announced immediately.
        A known friend is announced when more than 60 minutes have passed
        since the stored timestamp.
        """
        print('FortnitePresenceObserver: {} playing = {}'.format(display_name, playing))
        if playing:
            if display_name not in self.statuses:
                self.__notifyFriendPlaying(display_name)
                self.statuses[display_name] = time.time()
            else:
                diff = time.time() - self.statuses[display_name]
                # NOTE(review): the timestamp is not refreshed here, so every
                # further "playing" update past the threshold notifies again.
                if diff > 60 * 60:  # 60 minutes
                    self.__notifyFriendPlaying(display_name)
        else:
            self.statuses[display_name] = time.time()

    def __notifyFriendPlaying(self, display_name: str):
        """Tell every registered chat that *display_name* is online."""
        # Display names are arbitrary text; unescaped MarkdownV2 specials
        # would make Telegram reject the whole message.
        _broadcast('{} is online'.format(_escape_markdown_v2(display_name)))


async def run_bot():
    """Poll Telegram for updates."""
    await bot.polling()


async def run_client():
    """Run the Fortnite client."""
    await fortniteClient.run()


if __name__ == '__main__':
    fortniteStatusWrapper.attach(FortniteStatusObserver())
    fortniteClient.attach(FortnitePresenceObserver())

    # The default loop is kept (rather than asyncio.run) because the bot and
    # client objects were created at import time.
    # NOTE(review): presumably they may be bound to this loop - confirm
    # before switching to asyncio.run().
    loop = asyncio.get_event_loop()
    _main_loop = loop
    loop.create_task(run_bot())
    loop.create_task(run_client())
    loop.run_forever()