summary refs log tree commit diff
path: root/main.py
blob: 8d618e648627439c8e208390a01932132c7501be (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import argparse
import logging
import sys
import time

import spotipy
from requests.exceptions import ReadTimeout
from spotipy.oauth2 import SpotifyOAuth
from tqdm import tqdm
from yandex_music import Client, Artist

# Redirect URI passed to the Spotify OAuth manager.
REDIRECT_URI = 'https://open.spotify.com'
# Page size used for paginated Spotify requests.
DEFAULT_SPOTIFY_LIMIT = 50

# Configure logging once at import time: INFO and above, with a timestamp
# and the level name in front of every message.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

class Synchronizer:
    """Copy liked tracks and playlists from a Yandex Music account to Spotify.

    Yandex items are matched to Spotify by a text search on
    ``"<artists> - <title>"``; the first search hit is taken as the match.
    Spotify calls go through ``_handle_spotify_exception`` so that HTTP 429
    responses and read timeouts are retried instead of aborting the run.
    """

    # Number of read-timeout retries allowed for a single wrapped call.
    # NOTE(review): the original code referenced an undefined
    # MAX_REQUEST_RETRIES; 5 is a chosen value - adjust if needed.
    MAX_REQUEST_RETRIES = 5
    # Seconds to wait after an HTTP 429 that carries no usable
    # ``retry-after`` header (one extra second is always added on top).
    RATE_LIMIT_FALLBACK_DELAY = 1

    def __init__(self, arguments):
        """Create the Spotify and Yandex Music clients.

        Args:
            arguments: object with the attributes ``id``, ``secret``,
                ``spotify_user`` and ``yandex_token`` (the parsed CLI
                arguments).
        """
        spotify_auth_manager = spotipy.oauth2.SpotifyOAuth(
            client_id=arguments.id,
            client_secret=arguments.secret,
            redirect_uri=REDIRECT_URI,
            scope='playlist-modify-public, user-library-modify, user-library-read',
            username=arguments.spotify_user,
        )
        self.spotify_client = spotipy.Spotify(
            auth_manager=spotify_auth_manager
        )
        # The id reported by Spotify for the authenticated user is what the
        # playlist endpoints below are called with.
        self.spotify_username = self._handle_spotify_exception(self.spotify_client.me)()['id']
        self.yandex_client = Client(arguments.yandex_token)
        self.yandex_client.init()

    def start(self):
        """Synchronize liked tracks, then playlists, then exit the process.

        Raises:
            SystemExit: always, once both synchronization steps finished.
        """
        self._sync_liked_tracks()
        self._sync_playlists()
        sys.exit()

    def _sync_liked_tracks(self):
        """Save every liked Yandex track that can be found on Spotify."""
        logger.info('Start synchronization of liked tracks...')
        likes_tracks = self.yandex_client.users_likes_tracks().tracks
        # Entries without an album id are skipped: the lookup key needs both.
        track_ids = [f'{track.id}:{track.album_id}' for track in likes_tracks if track.album_id]
        # Do not issue a lookup for an empty id list (mirrors the guard used
        # for collective playlists in _sync_playlists).
        tracks = self.yandex_client.tracks(track_ids) if track_ids else []
        save_tracks = self._handle_spotify_exception(self.spotify_client.current_user_saved_tracks_add)
        for track in tqdm(tracks):
            time.sleep(0.25)  # throttle between Spotify requests
            spotify_item = self._find_spotify_item(track)
            if spotify_item is not None:
                save_tracks([spotify_item])

    def _sync_playlists(self):
        """Mirror every Yandex playlist into a Spotify playlist of the same title.

        Tracks already present in the Spotify playlist are not added again.
        """
        logger.info('Start syncronization of playlists...')
        playlists = self.yandex_client.users_playlists_list()
        add_tracks = self._handle_spotify_exception(self.spotify_client.user_playlist_add_tracks)
        for playlist in playlists:
            logger.info(f'Syncronizing playlist {playlist.title}...')

            spotify_playlist = self._find_or_create_spotify_playlist(playlist.title)
            spotify_playlist_id = spotify_playlist['id']
            # A set gives O(1) membership checks inside the track loop.
            existing_tracks = set(self._find_all_playlist_tracks(spotify_playlist_id))

            playlist_tracks = playlist.fetch_tracks()
            if not playlist.collective:
                tracks = [track.track for track in playlist_tracks]
            elif playlist_tracks:
                # Collective playlists: full track objects are fetched by id.
                # NOTE(review): presumably their entries carry no usable
                # ``track`` object - confirm against yandex_music.
                tracks = self.yandex_client.tracks([track.track_id for track in playlist_tracks])
            else:
                tracks = []

            for track in tqdm(tracks):
                time.sleep(0.25)  # throttle between Spotify requests
                spotify_item = self._find_spotify_item(track)
                if spotify_item is None:
                    continue
                if spotify_item in existing_tracks:
                    logger.debug(f'Skipping existing track {spotify_item}')
                    continue
                add_tracks(
                    self.spotify_username,
                    spotify_playlist_id,
                    [spotify_item]
                )
                # Remember the addition so a second source track resolving to
                # the same Spotify id is not added twice in this run.
                existing_tracks.add(spotify_item)

    def _find_or_create_spotify_playlist(self, title):
        """Return the user's Spotify playlist named *title*, creating it if absent.

        All pages of the user's playlists are scanned, so an existing
        playlist beyond the first page is found instead of duplicated.

        Returns:
            The playlist object (a dict with at least ``'id'``) as returned
            by the Spotify client.
        """
        fetch_page = self._handle_spotify_exception(self.spotify_client.user_playlists)
        offset = 0
        while True:
            page = fetch_page(
                user=self.spotify_username,
                limit=DEFAULT_SPOTIFY_LIMIT,
                offset=offset)
            for existing_spotify_playlist in page['items']:
                if existing_spotify_playlist['name'] == title:
                    logger.debug(f'Found existing playlist with title {title}')
                    return existing_spotify_playlist
            if page.get('next') is None:
                break
            offset += DEFAULT_SPOTIFY_LIMIT

        logger.debug(f'Creating new playlist {title}...')
        return self._handle_spotify_exception(self.spotify_client.user_playlist_create)(self.spotify_username, title)

    def _find_all_playlist_tracks(self, playlist_id):
        """Return the Spotify track ids of every entry in playlist *playlist_id*.

        Pages of ``DEFAULT_SPOTIFY_LIMIT`` entries are requested until the
        response reports no ``next`` page.

        Returns:
            list of track ids, in playlist order.
        """
        fetch_page = self._handle_spotify_exception(self.spotify_client.user_playlist_tracks)
        result = []
        offset = 0
        while True:
            page = fetch_page(
                user=self.spotify_username,
                playlist_id=playlist_id,
                limit=DEFAULT_SPOTIFY_LIMIT,
                offset=offset)
            result.extend(self._extract_track_ids(page))
            if page['next'] is None:
                break
            offset += DEFAULT_SPOTIFY_LIMIT
        return result

    @staticmethod
    def _extract_track_ids(page):
        """Return the track ids found in one page of playlist entries.

        Entries whose ``'track'`` is missing or ``None``, and tracks without
        an id, are skipped instead of raising.
        """
        ids = []
        for entry in page['items']:
            track = entry.get('track')
            if track is None:
                continue
            track_id = track.get('id')
            if track_id is not None:
                ids.append(track_id)
        return ids

    def _find_spotify_item(self, item):
        """Search Spotify for *item* and return the id of the first hit.

        The search type is the lower-cased class name of *item*, and the
        query is ``"<artist names joined by ', '> - <title>"``.

        Returns:
            The Spotify id of the first search result, or ``None`` when the
            search returned nothing.
        """
        type_ = item.__class__.__name__.casefold()
        query = f'{", ".join([artist.name for artist in item.artists])} - {item.title}'
        found_items = self._handle_spotify_exception(self.spotify_client.search)(query, type=type_)[f'{type_}s']['items']

        if not found_items:
            logger.info(f'Item {query} not found')
            return None

        return found_items[0]['id']

    @classmethod
    def _rate_limit_delay(cls, headers):
        """Return how many seconds to wait after an HTTP 429 response.

        Uses the ``retry-after`` header plus one second; when *headers* is
        ``None`` or the header is missing or not an integer,
        ``RATE_LIMIT_FALLBACK_DELAY`` is used in its place.
        """
        try:
            seconds = int((headers or {})['retry-after'])
        except (KeyError, TypeError, ValueError):
            seconds = cls.RATE_LIMIT_FALLBACK_DELAY
        # Guard against a negative header value; time.sleep rejects those.
        return max(seconds, 0) + 1

    def _handle_spotify_exception(self, func):
        """Wrap *func* so rate limits and read timeouts are retried.

        * ``SpotifyException`` with HTTP status 429: wait for the delay
          given by ``_rate_limit_delay`` and call again (no retry limit).
        * ``ReadTimeout``: call again, up to ``MAX_REQUEST_RETRIES``
          retries, then re-raise.
        * Any other exception propagates unchanged.

        Returns:
            A callable with the same arguments and return value as *func*.
        """
        def wrapper(*args, **kwargs):
            retry = 1
            while True:
                try:
                    return func(*args, **kwargs)
                except spotipy.exceptions.SpotifyException as exception:
                    if exception.http_status != 429:
                        raise

                    # Always wait before retrying, even without a header,
                    # so a rate-limited call never spins in a tight loop.
                    time.sleep(self._rate_limit_delay(exception.headers))
                except ReadTimeout:
                    logger.info(f'Read timed out. Retrying #{retry}...')

                    if retry > self.MAX_REQUEST_RETRIES:
                        logger.info('Max retries reached.')
                        raise

                    logger.info('Trying again...')
                    retry += 1

        return wrapper


if __name__ == '__main__':
    # Command-line entry point: parse the Spotify/Yandex credentials, run the
    # synchronization and report failures through the exit status.
    parser = argparse.ArgumentParser(description='Synchronizes playlists data between Yandex Music and Spotify')
    parser.add_argument('--spotify_user', required=True, help='Username at spotify.com')

    spotify_oauth = parser.add_argument_group('spotify_oauth')
    spotify_oauth.add_argument('--id', required=True, help='Client ID of Spotify app')
    spotify_oauth.add_argument('--secret', required=True, help='Client Secret of Spotify app')

    parser.add_argument('--yandex_token', required=True, help='Token from music.yandex.com account')
    arguments = parser.parse_args()

    try:
        instance = Synchronizer(arguments)
        instance.start()
    except Exception as e:
        # logger.exception keeps the same message but also records the
        # traceback; the non-zero status lets callers detect the failure
        # (previously the script exited with status 0 after an error).
        logger.exception(f'An unexpected error occurred: {str(e)}')
        sys.exit(1)