from src.mpv_control import set_http_header from src.settings import get_config import requests import json import time import urllib.parse import os from loguru import logger auth_file = '.auth.json' if os.path.exists(auth_file): with open('.auth.json', 'rt') as f: auth = json.loads(f.read()) else: # The default umask is 0o22 which turns off write permission of group and others os.umask(0) descriptor = os.open( path=auth_file, flags=( os.O_WRONLY # access mode: write only | os.O_CREAT # create if not exists | os.O_TRUNC # truncate the file to zero ), mode=0o600) with open(descriptor, 'wt') as f: f.write('{}') auth = {} class current_instance: s = requests.Session() instance = get_config('instance') token = auth.get(instance) @logger.catch def select_instance(new_instance=None): current_instance.instance = new_instance with open(auth_file, 'rt') as f: auth = json.loads(f.read()) new_token = auth.get(current_instance.instance) current_instance.s.headers.update({"Authorization": None, "Accept-encoding": 'gzip'}) set_http_header() if new_token: s.get(f'https://{current_instance.instance}') s.headers.update({ "Authorization": "Bearer " + new_token, "Accept-encoding": 'gzip' }) set_http_header(['Authorization: ' + 'Bearer ' + new_token]) if current_instance.token: current_instance.s.headers.update({ "Authorization": "Bearer " + current_instance.token, "Accept-encoding": 'gzip' }) set_http_header(['Authorization: ' + 'Bearer ' + current_instance.token]) else: current_instance.s.headers.update({"Accept-encoding": 'gzip'}) # Get cookies from unauthorized instance for working some functionality (radios) current_instance.s.get(f'https://{current_instance.instance}/') set_http_header() @logger.catch def get_instance_settings(): r = current_instance.s.get( f'https://{current_instance.instance}/api/v1/instance/settings') return r.json() @logger.catch def get_audio_file(track_uuid, listen_url=False, download=False, transcoding=get_config('enable_server_transcoding'), to='ogg'): if not transcoding: to = None params = { "download": download, "to": to } if listen_url: url = f'https://{current_instance.instance}{track_uuid}?' else: url = f'https://{current_instance.instance}/api/v1/listen/{track_uuid}?' return url + urllib.parse.urlencode(params) @logger.catch def get_tracks(page=None, q=None, artist=None, album=None, tag=None, favourites=None, include_channels=None, pg=None): '''This function get tracks by params''' params = { 'page': page, 'q': q, 'artist': artist, 'album': album, 'tag': tag, 'favourites': favourites, 'include_channels': include_channels } if pg: r = current_instance.s.get(pg) else: r = current_instance.s.get( f'https://{current_instance.instance}/api/v1/tracks', params=params) return r.json() @logger.catch def get_favorires_tracks(page=None, q=None, scope=None, include_channels=None, pg=None): '''This function get favorites tracks (not only for user)''' params = { 'page': page, 'q': q, 'scope': scope, 'include_channels': include_channels } if pg: r = current_instance.s.get(pg) else: r = current_instance.s.get( f'https://{current_instance.instance}/api/v1/favorites/tracks/', params=params) return r.json() @logger.catch def get_artists(page=None, q=None, artist=None, album=None, favourites=None, refresh=False, pg=None): '''This function get artists by params''' params = { 'page': page, 'q': q, 'artist': artist, 'album': album, 'favourites': favourites, 'refresh': refresh } if pg: r = current_instance.s.get(pg) else: r = current_instance.s.get( f'https://{current_instance.instance}/api/v1/artists', params=params) return r.json() @logger.catch def get_albums(page=None, q=None, artist=None, include_channels=None, refresh=False, pg=None): '''This function get artists by params''' params = { 'page': page, 'q': q, 'artist': artist, 'include_channels': include_channels, 'refresh': refresh } if pg: r = current_instance.s.get(pg) else: r = current_instance.s.get( f'https://{current_instance.instance}/api/v1/albums', params=params) return r.json() @logger.catch def get_channels(page=None, q=None, tag=None, pg=None): params = { 'page': page, 'q': q, 'tag': tag } if pg: r = current_instance.s.get(pg) else: r = current_instance.s.get( f'https://{current_instance.instance}/api/v1/channels', params=params) return r.json() @logger.catch def get_playlists(page=None, page_size=None, q=None, ordering='-modification_date', pg=None): '''List playlists''' params = { 'page': page, 'page_size': page_size, 'q': q, 'ordering': ordering } if pg: r = current_instance.s.get(pg) else: r = current_instance.s.get( f'https://{current_instance.instance}/api/v1/playlists', params=params) r.raise_for_status() return r.json() @logger.catch def get_playlist_tracks(playlist_id, pg=None): '''Retrieve all tracks in the playlist''' if pg: r = current_instance.s.get(pg) else: r = current_instance.s.get( f'https://{current_instance.instance}/api/v1/playlists/{playlist_id}/tracks') return r.json() @logger.catch def list_libraries(page=None, page_size=None, q=None, scope='all', pg=None): params = { 'page': page, 'page_size': page_size, 'q': q, 'scope': scope, } if pg: r = current_instance.s.get(pg) else: r = current_instance.s.get( f'https://{current_instance.instance}/api/v1/libraries', params=params) return r.json() @logger.catch def get_tags(q=None, pg=None): params = { 'q': q, } if pg: r = current_instance.s.get(pg) else: r = current_instance.s.get( f'https://{current_instance.instance}/api/v1/tags', params=params) return r.json() @logger.catch def federate_search_by_url(object): params = { 'object': object } r = current_instance.s.post( f'https://{current_instance.instance}/api/v1/federation/fetches', json=params) return r.json() @logger.catch def record_track_in_history(track_id): params = { 'track': int(track_id) } r = current_instance.s.post( f'https://{current_instance.instance}/api/v1/history/listenings', json=params) r.raise_for_status() return r.json @logger.catch def favorite_track(track_id): r = current_instance.s.post( f'https://{current_instance.instance}/api/v1/favorites/tracks', json={'track': int(track_id)}) r.raise_for_status() return r.json @logger.catch def unfavorite_track(track_id): r = current_instance.s.post( f'https://{current_instance.instance}/api/v1/favorites/tracks/delete', json={'track': int(track_id)}) r.raise_for_status() return r.json @logger.catch def hide_content(content): '''This function hide content (write permission)''' r = current_instance.s.post( f'https://{current_instance.instance}/api/v1/moderation/content-filters/', json=content) r.raise_for_status() return r.json # [FunkWhale radios] def get_radios(): r = current_instance.s.get( f'https://{current_instance.instance}/api/v1/radios/radios/') return r.json() def post_radio_session(requested_radio): r = current_instance.s.post( f'https://{current_instance.instance}/api/v1/radios/sessions/', json=requested_radio) return r.json() @logger.catch def get_track_radio(radio_session): r = current_instance.s.post( f'https://{current_instance.instance}/api/v1/radios/tracks/', json=radio_session) return r.json()