from src.fw_api import s, get_radios, post_radio_session, get_track_radio, list_libraries, concatinate_endpoint from src.utils import download_track from src.mpv_control import player from pyfzf.pyfzf import FzfPrompt from loguru import logger import threading import time fzf = FzfPrompt() audio_info = {} def list_radios(): radios = get_radios() count = radios.get('count') results = radios.get('results') view = [] for i in results: index = results.index(i) id_radio = i.get('id') name = i.get('name') view.append(f'{index}.{name}') if s.headers.get('Authorization'): # Radios avalaible only for auth user view.append('Favourites') view.append('Less listened') view.append('Random') view.append('Library') view.append('Recently Added') selected = fzf.prompt(view, f'--header \'Found {count} radios\'')[0].split('.', 1) if 'Favourites' in selected: radio_load(id_radio, 'favorites') elif 'Random' in selected: radio_load(id_radio, 'random') elif 'Library' in selected: libs = list_libraries()['results'] libraries_listing = [] for lib_i in libs: lib_name = lib_i.get('actor').get('full_username') if lib_name not in libraries_listing: libraries_listing.append(lib_name) libraries_listing.append('Custom') lib_addr = fzf.prompt(libraries_listing)[0] if lib_addr == 'Custom': print('Input remote user library (ex. nick@funkwhale.domain.example: ') lib_addr = input() radio_load(None, 'actor-content', lib_addr, lib_addr) elif 'Recently Added' in selected: radio_load(id_radio, 'recently-added') elif 'Less listened' in selected: radio_load(id_radio, 'less-listened') else: id_selected = selected[0] id_radio = results[int(id_selected)].get('id') name_radio = results[int(id_selected)].get('name') radio_load(id_radio, name=name_radio) def radio_generator(radio_session_id): count_t = 0 while radio_session_id != '': time.sleep(1) if not radio_event_gen.wait(0): break count_t += 1 if count_t >= 60: count_t = 0 radio_get_track(radio_session_id) logger.info('Radio generator stopped') radio_event_gen = threading.Event() def radio_load(id_radio=None, type_radio='custom', name=None, related_object=None): requested_radio = { 'custom_radio': id_radio, 'radio_type': type_radio, 'related_object_id': related_object } radio_session_id = post_radio_session(requested_radio).get('id') for i in range(0, 2): radio_get_track(radio_session_id) radio_event_gen.set() radio_task = threading.Thread(target=radio_generator, args=(radio_session_id,), daemon=True) radio_task.start() while True: try: select = fzf.prompt(('Next', 'Prev', 'Pause', 'Download', 'Info', 'Exit'), f"--header=\'Radio {name} playing...\'")[0] if select == 'Next': radio_get_track(radio_session_id) player.playlist_next() elif select == 'Prev': player.playlist_prev() elif select == 'Pause': if player.pause: player.pause = False else: player.pause = True elif select == 'Download': print('Downloading...') name_downloaded = download_track(player.filename) print(f'Downloaded: {name_downloaded}') elif select == 'Info': track = audio_info.get(player.filename) for i in ('title', 'fid', 'license', 'album', 'artist'): if i in ('album', 'artist'): name_aa = track.get(i).get('name') if not name_aa: name_aa = track.get(i).get('title') print(i + ': '+ name_aa) key = track.get(i) if key and isinstance(key, str): print(i + ': ' + key) input() elif select == 'Exit': try: radio_event_gen.clear() except: logger.exception('Error stopping Thread radio generator') pass player.playlist_clear() player.stop() break except KeyboardInterrupt: break def radio_get_track(radio_session_id): radio_context = get_track_radio({'session': radio_session_id}) if not radio_context: return if radio_context == "Radio doesn't have more candidates": return else: track = radio_context.get('track') listen_url = track.get('listen_url') audio_info[concatinate_endpoint(listen_url)] = track player.loadfile(concatinate_endpoint(listen_url), 'append-play')