"""HTTP client helpers for talking to a Funkwhale instance.

The module keeps one shared ``requests.Session`` on ``current_instance`` and
exposes small functions that each wrap one API endpoint.  Most functions are
decorated with ``logger.catch``: an exception raised inside them is logged by
loguru and the call then yields ``None`` instead of propagating.

Importing the module has side effects: the auth file is read (or created
with mode ``0o600``) and, when no token is stored for the configured
instance, one unauthenticated request is sent to pick up cookies.
"""
import json
import os
import time
import urllib.parse

import requests
from loguru import logger

from src.settings import get_config

# How many times a federation request is attempted before giving up.
attempts_for_federate = 3
# JSON file holding a mapping of instance name -> API token.
auth_file = '.auth.json'


def _load_auth():
    """Read the auth file and return its decoded JSON content."""
    with open(auth_file, 'rt') as f:
        return json.load(f)


if os.path.exists(auth_file):
    auth = _load_auth()
else:
    # Clear the umask only while the file is created and restore it right
    # after, so the rest of the process keeps its normal file permissions.
    _previous_umask = os.umask(0)
    try:
        descriptor = os.open(
            path=auth_file,
            flags=(
                os.O_WRONLY  # access mode: write only
                | os.O_CREAT  # create if not exists
                | os.O_TRUNC  # truncate the file to zero
            ),
            mode=0o600)
    finally:
        os.umask(_previous_umask)
    with open(descriptor, 'wt') as f:
        f.write('{}')
    auth = {}


class current_instance:
    """Namespace holding the shared session and the selected instance.

    The class is used directly (never instantiated); all state lives in
    class attributes.
    """

    # Shared HTTP session used by every function in this module.
    s = requests.Session()
    s.headers.update({"Accept-encoding": 'gzip, br, deflate'})
    # Instance name taken from the configuration (used as the URL host).
    instance = get_config('instance')
    # API token stored for that instance, or None when there is none.
    token = auth.get(instance)
    # Listen token; filled in by get_me() on demand.
    listen_token = None

    @staticmethod
    @logger.catch
    def select_instance(new_instance=None):
        """Switch the shared session to ``new_instance``.

        The auth file is re-read, the previous Authorization header is
        dropped and the cached listen token is reset.  When a token is
        stored for the new instance, the instance is requested once and the
        bearer token is installed on the session.
        """
        session = current_instance.s
        current_instance.instance = new_instance
        new_token = _load_auth().get(new_instance)
        # Never send the previous instance's credentials to the new one.
        session.headers.pop("Authorization", None)
        current_instance.token = new_token
        current_instance.listen_token = None
        if new_token:
            # The session must be reached through the class: names defined
            # in a class body are not visible from inside its functions.
            session.get(f'https://{current_instance.instance}')
            session.headers.update({"Authorization": "Bearer " + new_token})


if current_instance.token:
    current_instance.s.headers.update(
        {"Authorization": "Bearer " + current_instance.token})
else:
    # Get cookies from unauthorized instance for working some functionality (radios)
    current_instance.s.get(f'https://{current_instance.instance}/')


def _api_url(path):
    """Return the absolute https URL of ``path`` on the selected instance."""
    return f'https://{current_instance.instance}{path}'


def _get_page(path, params=None, pg=None):
    """GET ``pg`` verbatim when given, else ``path`` with ``params``.

    ``pg`` is a ready-made URL (e.g. a pagination link); when it is used the
    ``params`` are ignored.  Returns the ``requests`` response object.
    """
    if pg:
        return current_instance.s.get(pg)
    return current_instance.s.get(_api_url(path), params=params)


def _post_with_retries(path, **kwargs):
    """POST to ``path`` up to ``attempts_for_federate`` times.

    An attempt fails when the request raises or the status is an HTTP
    error; failures are logged and followed by a 3 second pause (except
    after the final attempt).  Returns the most recent response received,
    or ``None`` when no attempt produced a response at all.
    """
    r = None
    for i in range(attempts_for_federate):
        try:
            r = current_instance.s.post(_api_url(path), **kwargs)
            r.raise_for_status()
            break
        except Exception as Err:
            logger.error(f'Attempt {i}: {Err}')
            if i < attempts_for_federate - 1:
                time.sleep(3)
    return r


def _json_or_none(r):
    """Return the decoded JSON body of ``r``, or ``None`` for an empty body."""
    return r.json() if r.content else None


@logger.catch
def get_me():
    '''Return the current user if a token is available, otherwise None.

    Also caches the user's listen token on ``current_instance``.
    '''
    if not current_instance.token:
        return
    r = current_instance.s.get(_api_url('/api/v1/users/me'))
    resp = r.json()
    current_instance.listen_token = resp['tokens']['listen']
    return resp


@logger.catch
def get_instance_settings():
    '''Return the decoded JSON of the instance settings endpoint.'''
    r = current_instance.s.get(_api_url('/api/v1/instance/settings'))
    return r.json()


@logger.catch
def get_audio_file(track_uuid, listen_url=False, download=False,
                   external_transcoding=get_config('external_transcoder_http_proxy_path'),
                   transcoding=get_config('enable_server_transcoding'),
                   to='ogg'):
    '''Build (without requesting it) the URL of an audio file.

    track_uuid -- track identifier, or a ready path when ``listen_url`` is set
    listen_url -- when true, ``track_uuid`` is appended to the host as-is
    download -- value of the ``download`` query parameter
    external_transcoding -- prefix put in front of the URL; skipped when empty
    transcoding -- when false, the ``to`` query parameter is left out
    to -- target format requested through the ``to`` query parameter
    '''
    params = {"download": download}
    if transcoding:
        params["to"] = to

    # Fetch the listen token lazily the first time it is needed.
    if current_instance.token and not current_instance.listen_token:
        get_me()
    if current_instance.listen_token:
        params['token'] = current_instance.listen_token

    if listen_url:
        url = f'https://{current_instance.instance}{track_uuid}?'
    else:
        url = f'https://{current_instance.instance}/api/v1/listen/{track_uuid}?'
    # Truthiness check so that an unset (None) proxy path is treated like "".
    if external_transcoding:
        url = external_transcoding + url
    return url + urllib.parse.urlencode(params, doseq=True)


@logger.catch
def get_tracks(page=None, q=None, artist=None, album=None,
               library=None, tag=None, favourites=None,
               include_channels=None, pg=None):
    '''Get tracks matching the given filters (or the page at ``pg``).'''
    params = {
        'page': page,
        'q': q,
        'artist': artist,
        'album': album,
        'tag': tag,
        'favourites': favourites,
        'library': library,
        'include_channels': include_channels
    }
    return _get_page('/api/v1/tracks', params, pg).json()


@logger.catch
def get_favorires_tracks(page=None, q=None, scope=None,
                         include_channels=None, pg=None):
    '''Get favorite tracks (not only for the current user).'''
    params = {
        'page': page,
        'q': q,
        'scope': scope,
        'include_channels': include_channels
    }
    return _get_page('/api/v1/favorites/tracks/', params, pg).json()


@logger.catch
def get_artists(page=None, q=None, artist=None, album=None,
                library=None, favourites=None, refresh=False, pg=None):
    '''Get artists matching the given filters (or the page at ``pg``).'''
    params = {
        'page': page,
        'q': q,
        'artist': artist,
        'album': album,
        'library': library,
        'favourites': favourites,
        'refresh': refresh
    }
    return _get_page('/api/v1/artists', params, pg).json()


@logger.catch
def get_albums(page=None, q=None, artist=None, library=None,
               include_channels=None, refresh=False, pg=None):
    '''Get albums matching the given filters (or the page at ``pg``).'''
    params = {
        'page': page,
        'q': q,
        'artist': artist,
        'library': library,
        'include_channels': include_channels,
        'refresh': refresh
    }
    return _get_page('/api/v1/albums', params, pg).json()


@logger.catch
def get_channels(page=None, q=None, tag=None, pg=None):
    '''Get channels matching the given filters (or the page at ``pg``).'''
    params = {
        'page': page,
        'q': q,
        'tag': tag
    }
    return _get_page('/api/v1/channels', params, pg).json()


@logger.catch
def get_playlists(page=None, page_size=None, q=None,
                  ordering='-modification_date', pg=None):
    '''List playlists; an HTTP error status raises (and is then logged).'''
    params = {
        'page': page,
        'page_size': page_size,
        'q': q,
        'ordering': ordering
    }
    r = _get_page('/api/v1/playlists', params, pg)
    r.raise_for_status()
    return r.json()


@logger.catch
def get_playlist_tracks(playlist_id, pg=None):
    '''Retrieve the tracks of the playlist (or the page at ``pg``).'''
    return _get_page(f'/api/v1/playlists/{playlist_id}/tracks', pg=pg).json()


@logger.catch
def list_libraries(page=None, page_size=None, q=None, scope='all', pg=None):
    '''List libraries matching the given filters (or the page at ``pg``).'''
    params = {
        'page': page,
        'page_size': page_size,
        'q': q,
        'scope': scope,
    }
    return _get_page('/api/v1/libraries', params, pg).json()


@logger.catch
def get_tags(q=None, pg=None):
    '''Get tags matching ``q`` (or the page at ``pg``).'''
    params = {
        'q': q,
    }
    return _get_page('/api/v1/tags', params, pg).json()


@logger.catch
def federate_search_by_url(object):
    '''Ask the instance to fetch a remote object, retrying on failure.

    Returns the decoded JSON of the last response received, or None when no
    attempt got a response.
    '''
    params = {
        'object': object
    }
    r = _post_with_retries('/api/v1/federation/fetches', json=params)
    # Compare with None explicitly: an error response is falsy in requests.
    return r.json() if r is not None else None


@logger.catch
def federate_remote_library(fid):
    '''Fetch remote library for follow and scan, retrying on failure.

    Returns the decoded JSON of the last response received, or None when no
    attempt got a response.
    '''
    params = {
        'fid': fid
    }
    r = _post_with_retries('/api/v1/federation/libraries/fetch', json=params)
    return r.json() if r is not None else None


@logger.catch
def scan_remote_library(uuid):
    '''Scan remote library, retrying on failure.

    Returns the decoded JSON of the last response received, or None when no
    attempt got a response.
    '''
    r = _post_with_retries(f'/api/v1/federation/libraries/{uuid}')
    return r.json() if r is not None else None


@logger.catch
def record_track_in_history(track_id):
    '''Record a listening of the track; returns the decoded response body.'''
    params = {
        'track': int(track_id)
    }
    r = current_instance.s.post(
        _api_url('/api/v1/history/listenings'),
        json=params)
    r.raise_for_status()
    return _json_or_none(r)


@logger.catch
def favorite_track(track_id):
    '''Add the track to favorites; returns the decoded response body.'''
    r = current_instance.s.post(
        _api_url('/api/v1/favorites/tracks'),
        json={'track': int(track_id)})
    r.raise_for_status()
    return _json_or_none(r)


@logger.catch
def unfavorite_track(track_id):
    '''Remove the track from favorites.

    Returns the decoded response body, or None when the body is empty.
    '''
    r = current_instance.s.post(
        _api_url('/api/v1/favorites/tracks/delete'),
        json={'track': int(track_id)})
    r.raise_for_status()
    return _json_or_none(r)


@logger.catch
def hide_content(content):
    '''Create a content filter from ``content`` (write permission).

    Returns the decoded response body, or None when the body is empty.
    '''
    r = current_instance.s.post(
        _api_url('/api/v1/moderation/content-filters/'),
        json=content)
    r.raise_for_status()
    return _json_or_none(r)


# [FunkWhale radios]
def get_radios():
    '''Return the decoded JSON list of radios; exceptions propagate.'''
    r = current_instance.s.get(_api_url('/api/v1/radios/radios/'))
    return r.json()


def post_radio_session(requested_radio):
    '''Create a radio session from ``requested_radio``; exceptions propagate.'''
    r = current_instance.s.post(
        _api_url('/api/v1/radios/sessions/'),
        json=requested_radio)
    return r.json()


@logger.catch
def get_track_radio(radio_session):
    '''Request a track for ``radio_session`` and return the decoded JSON.'''
    r = current_instance.s.post(
        _api_url('/api/v1/radios/tracks/'),
        json=radio_session)
    return r.json()