mirror of
https://gitea.phreedom.club/localhost_frssoft/funkwlmpv
synced 2024-11-16 04:03:01 +00:00
177 lines
4.6 KiB
Python
177 lines
4.6 KiB
Python
from src.mpv_control import set_http_header
|
|
import requests, json, time
|
|
from loguru import logger
|
|
|
|
with open('.auth.json', 'rt') as f:
|
|
auth = json.loads(f.read())
|
|
|
|
s = requests.Session()
|
|
instance = 'fw.ponychord.rocks'
|
|
token = auth.get(instance)
|
|
|
|
if token:
|
|
s.headers.update({
|
|
"Authorization": "Bearer " + token,
|
|
"Accept-encoding": 'gzip'
|
|
})
|
|
set_http_header(['Authorization: ' + 'Bearer ' + token])
|
|
else:
|
|
s.headers.update({"Accept-encoding": 'gzip'})
|
|
s.get(f'https://{instance}/') # Get cookies from unauthorized instance for working some functionality (radios)
|
|
set_http_header()
|
|
|
|
|
|
def select_instance(new_instance=None):
|
|
global instance
|
|
instance = new_instance
|
|
with open('.auth.json', 'rt') as f:
|
|
auth = json.loads(f.read())
|
|
new_token = auth.get(instance)
|
|
s.headers.update({"Authorization": None,
|
|
"Accept-encoding": 'gzip'})
|
|
set_http_header()
|
|
if new_token:
|
|
s.get(f'https://{instance}')
|
|
s.headers.update({
|
|
"Authorization": "Bearer " + new_token,
|
|
"Accept-encoding": 'gzip'
|
|
})
|
|
player.http_header_fields = ['Authorization: ' + 'Bearer ' + new_token]
|
|
set_http_header(['Authorization: ' + 'Bearer ' + token])
|
|
|
|
|
|
def concatinate_endpoint(endpoint):
|
|
return 'https://' + instance + endpoint
|
|
|
|
|
|
@logger.catch
|
|
def get_tracks(page=None, q=None, artist=None, album=None, favourites=None, include_channels=None, pg=None):
|
|
'''This function get tracks by params'''
|
|
params = {
|
|
'page': page,
|
|
'q': q,
|
|
'artist': artist,
|
|
'album': album,
|
|
'favourites': favourites,
|
|
'include_channels': include_channels
|
|
}
|
|
if pg:
|
|
r = s.get(pg)
|
|
else:
|
|
r = s.get(f'https://{instance}/api/v1/tracks', params=params)
|
|
return r.json()
|
|
|
|
|
|
@logger.catch
|
|
def get_artists(page=None, q=None, artist=None, album=None, favourites=None, pg=None):
|
|
'''This function get artists by params'''
|
|
params = {
|
|
'page': page,
|
|
'q': q,
|
|
'artist': artist,
|
|
'album': album,
|
|
'favourites': favourites
|
|
}
|
|
if pg:
|
|
r = s.get(pg)
|
|
else:
|
|
r = s.get(f'https://{instance}/api/v1/artists', params=params)
|
|
return r.json()
|
|
|
|
|
|
@logger.catch
|
|
def get_albums(page=None, q=None, artist=None, include_channels=None, pg=None):
|
|
'''This function get artists by params'''
|
|
params = {
|
|
'page': page,
|
|
'q': q,
|
|
'artist': artist,
|
|
'include_channels': include_channels
|
|
}
|
|
if pg:
|
|
r = s.get(pg)
|
|
else:
|
|
r = s.get(f'https://{instance}/api/v1/albums', params=params)
|
|
return r.json()
|
|
|
|
|
|
@logger.catch
|
|
def get_channels(page=None, q=None, tag=None, pg=None):
|
|
params = {
|
|
'page': page,
|
|
'q': q,
|
|
'tag': tag
|
|
}
|
|
if pg:
|
|
r = s.get(pg)
|
|
else:
|
|
r = s.get(f'https://{instance}/api/v1/channels', params=params)
|
|
return r.json()
|
|
|
|
|
|
@logger.catch
|
|
def list_libraries(page=None, page_size=None, q=None, scope='all', pg=None):
|
|
params = {
|
|
'page': page,
|
|
'page_size': page_size,
|
|
'q': q,
|
|
'scope': scope,
|
|
}
|
|
if pg:
|
|
r = s.get(pg)
|
|
else:
|
|
r = s.get(f'https://{instance}/api/v1/libraries', params=params)
|
|
return r.json()
|
|
|
|
|
|
@logger.catch
|
|
def federate_search_by_url(object):
|
|
params = {
|
|
'object': object
|
|
}
|
|
r = s.post(f'https://{instance}/api/v1/federation/fetches', json=params)
|
|
return r.json()
|
|
|
|
|
|
@logger.catch
|
|
def favorite_track(track_id):
|
|
r = s.post(f'https://{instance}/api/v1/favorites/tracks', json={'track': int(track_id)})
|
|
r.raise_for_status()
|
|
return r.json
|
|
|
|
|
|
@logger.catch
|
|
def unfavorite_track(track_id):
|
|
r = s.post(f'https://{instance}/api/v1/favorites/tracks/delete', json={'track': int(track_id)})
|
|
r.raise_for_status()
|
|
return r.json
|
|
|
|
|
|
@logger.catch
|
|
def hide_content(content):
|
|
'''This function hide content (write permission)'''
|
|
r = s.post(f'https://{instance}/api/v1/moderation/content-filters/', json=content)
|
|
r.raise_for_status()
|
|
return r.json
|
|
|
|
|
|
# [FunkWhale radios]
|
|
def get_radios():
|
|
r = s.get(f'https://{instance}/api/v1/radios/radios/')
|
|
return r.json()
|
|
|
|
|
|
def post_radio_session(requested_radio):
|
|
r = s.post(f'https://{instance}/api/v1/radios/sessions/', json=requested_radio)
|
|
return r.json()
|
|
|
|
|
|
def get_track_radio(radio_session):
|
|
r = s.post(f'https://{instance}/api/v1/radios/tracks/',json=radio_session)
|
|
try:
|
|
r.raise_for_status()
|
|
return r.json()
|
|
except:
|
|
logger.exception('Radio: get next track failed')
|
|
|