summaryrefslogtreecommitdiff
path: root/fortnite_status/__init__.py
blob: ee21ad64138da83c4ceda8adcc63a8a8cc9be49e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# Simple utility classes that provide access to the Fortnite service status report.
# Usage: FortniteStatus().get_status()

import requests as req
import html5lib
import asyncio
from bs4 import BeautifulSoup

# Polling interval in seconds: how long FortniteStatusNotifier.run() sleeps
# between two consecutive status checks.
__polling_interval__ = 5 * 60 # 5 minutes

class FortniteStatusObserver:
    """
    Base class for objects that want to be told about Fortnite status changes.

    FortniteStatusNotifier awaits update() on the observer it was given;
    subclasses override update() to react.
    """

    async def update(self, fortnite_status) -> None:
        """Receive the new status. The base implementation does nothing."""
        return None

class FortniteStatus:
    """
    Scrape the Epic Games status page and report the state of Fortnite services.

    Usage:
        FortniteStatus().get_status()    # returns a FortniteStatus.Status
        FortniteStatus().print_status()  # prints the same report to stdout
    """

    # Public status page that get_status() downloads and parses.
    STATUS_URL = "https://status.epicgames.com/"

    class Status:
        """Snapshot of the state of every Fortnite service found on the page."""

        def __init__(self, service_statuses):
            # List of FortniteStatus.ServiceStatus, kept in the order given.
            self.service_statuses = service_statuses

        def __eq__(self, other):
            """Return True when both snapshots hold equal entries, in any order."""
            if not isinstance(other, FortniteStatus.Status):
                return NotImplemented
            return sorted(self.service_statuses) == sorted(other.service_statuses)

        # Value-compared and mutable, hence unhashable. Python already does this
        # implicitly when __eq__ is defined; it is spelled out for readers.
        __hash__ = None

        def __repr__(self):
            return f'Status({self.service_statuses!r})'

        def prettify(self):
            """Return a multi-line, human-readable report of all services."""
            lines = [service_status.prettify() for service_status in self.service_statuses]
            return 'Fortnite services status:\n' + '\n'.join(lines)

    class ServiceStatus:
        """State of a single service: its name and whether it is operational."""

        def __init__(self, service_name, status):
            self.service_name = service_name
            # True when the page reports the service as 'Operational'.
            self.status = status

        def __lt__(self, other):
            """Order by name, then by status, so ordering agrees with __eq__."""
            if not isinstance(other, FortniteStatus.ServiceStatus):
                return NotImplemented
            # Comparing on the name alone left entries that share a name in
            # input order after sorting, which made Status.__eq__ depend on
            # the order of such entries.
            return (self.service_name, self.status) < (other.service_name, other.status)

        def __eq__(self, other):
            """Return True when both the name and the status match."""
            if not isinstance(other, FortniteStatus.ServiceStatus):
                return NotImplemented
            return self.service_name == other.service_name and self.status == other.status

        # See Status.__hash__: value-compared and mutable, hence unhashable.
        __hash__ = None

        def __repr__(self):
            return f'ServiceStatus({self.service_name!r}, {self.status!r})'

        def prettify(self):
            """Return '<service name>, <status>'."""
            return f'{self.service_name}, {self.status}'

    def __find_fortnite_status_html_component(self, html):
        """
        Return the 'component-container' div whose name label mentions
        'Fortnite', or None when the page has no such component.
        """
        for component in html.find_all('div', {'class': 'component-container'}):
            for inner_container in component.find_all('div', {'class': 'component-inner-container'}):
                for names in inner_container.find_all('span', {'class': 'name'}):
                    for name in names.find_all('span'):
                        # Only spans without a class attribute are considered labels.
                        if 'class' not in name.attrs and 'Fortnite' in name.text:
                            return component
        return None

    def __parse_fortnite_status(self, html):
        """
        Build a Status from the parsed status page.

        Raises:
            ValueError: the expected markup (Fortnite component, its child
                container, or a service's name/status span) is missing.
        """
        component = self.__find_fortnite_status_html_component(html)
        if component is None:
            raise ValueError('Fortnite component not found on the status page')

        child_container = component.find('div', {'class': 'child-components-container'})
        if child_container is None:
            raise ValueError('Fortnite component has no child services container')

        service_statuses = []
        for inner_container in child_container.find_all('div', {'class': 'component-inner-container'}):
            name_tag = inner_container.find('span', {'class': 'name'})
            status_tag = inner_container.find('span', {'class': 'component-status'})
            if name_tag is None or status_tag is None:
                # Fail loudly: silently skipping the entry would look like a
                # status change to anyone comparing snapshots.
                raise ValueError('Fortnite service entry is missing its name or status')
            is_operational = status_tag.text.strip() == 'Operational'
            service_statuses.append(self.ServiceStatus(name_tag.text.strip(), is_operational))
        return self.Status(service_statuses)

    def get_status(self, timeout=10):
        """
        Download the status page and return the current FortniteStatus.Status.

        Args:
            timeout: seconds to wait for the server before giving up; passed
                straight to requests.

        Raises:
            requests.RequestException: network failure, timeout, or an HTTP
                error status.
            ValueError: the page does not have the expected structure.
        """
        # Without a timeout requests can block forever on a stalled connection.
        web_content = req.get(self.STATUS_URL, timeout=timeout)
        # Do not try to parse an error page as if it were the status page.
        web_content.raise_for_status()
        parsed_html = BeautifulSoup(web_content.text, 'html5lib')

        return self.__parse_fortnite_status(parsed_html)

    def print_status(self):
        """
        Print the current Fortnite services status to stdout.

        The output is the text returned by Status.prettify(): a
        'Fortnite services status:' header followed by one
        '<service name>, <True|False>' line per service.
        """
        print(self.get_status().prettify())

class FortniteStatusNotifier:
    """
    Poll the Fortnite status periodically and tell an observer when it changes.

    run() loops forever: it fetches the status, compares it with the previous
    snapshot and awaits observer.update(new_status) whenever the two differ.
    """

    # Quoted annotations are not evaluated when the class is created.
    __fortnite_status_observer: 'FortniteStatusObserver'
    __fortnite_status: 'FortniteStatus'
    # Last snapshot seen; first assigned by run().
    __last_fortnite_status: 'FortniteStatus.Status'

    def __init__(self, fortnite_status_observer: 'FortniteStatusObserver'):
        """
        Args:
            fortnite_status_observer: object whose async update() is awaited
                with the new status each time a change is detected.
        """
        self.__fortnite_status_observer = fortnite_status_observer
        self.__fortnite_status = FortniteStatus()

    async def run(self):
        """
        Poll forever, sleeping __polling_interval__ seconds between checks.

        Never returns normally; exceptions raised while fetching the status
        or by the observer propagate to the caller.
        """
        # Initialize status
        self.__last_fortnite_status = await self.__fetch_status()
        while True:
            await self.__read_status()
            await asyncio.sleep(__polling_interval__)

    async def __fetch_status(self):
        """Fetch the current status without blocking the event loop."""
        # get_status() does blocking network I/O and HTML parsing; calling it
        # directly from a coroutine would stall every other task on the loop,
        # so it runs in the loop's default executor instead.
        # Inside a coroutine get_event_loop() returns the running loop.
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self.__fortnite_status.get_status)

    async def __read_status(self):
        """Fetch the status once and notify the observer if it changed."""
        service_status_tmp = await self.__fetch_status()
        if service_status_tmp != self.__last_fortnite_status:
            await self.__notify(service_status_tmp)
        self.__last_fortnite_status = service_status_tmp

    async def __notify(self, fortnite_status):
        """Forward the new status to the observer."""
        await self.__fortnite_status_observer.update(fortnite_status)