import json import requests import time from os.path import exists from loguru import logger from pyfzf.pyfzf import FzfPrompt fzf = FzfPrompt() conf_file = 'config.json' default_conf = { 'instance': 'fw.ponychord.rocks', 'public_list_instances': [ "open.audio", "funkwhale.co.uk", "am.pirateradio.social", "audio.liberta.vip", "audio.gafamfree.party", "tanukitunes.com", "funkwhale.juniorjpdj.pl", "audio.securetown.in.ua", "tavia.mle.party", "funkwhale.thurk.org", "buzzworkers.com", "soundship.de", "funkwhale.kameha.click", "music.chosto.me", "zik.goe.land", "music.humanoids.be", "music.hempton.us", "mizik.o-k-i.net", "klh.radiolivre.org", "hudba.feildel.fr", "funkwhale.mita.me", "funk.deko.cloud", "audio.graz.social", "funkwhale.desmu.fr", "listen.knsm.cc", "funkwhale.gegeweb.eu", "shitnoise.monster" ], 'automatic_fetch_new_instances': True, 'enable_server_transcoding': False, 'external_transcoder_http_proxy_path': "", 'track_activity_history': False, 'prefetch_playlist': True, 'enable_persistent_cache': False, 'mpv_volume': 100, 'show_like_button': True, 'show_artist_name_in_albums': False } def set_defaults(corrected_config=None): conf_rewrite = default_conf if corrected_config: conf_rewrite = corrected_config with open(conf_file, 'wt') as f: f.write(json.dumps(conf_rewrite, indent=4)) def check_config(): '''Check config and remove or add keys if not found in default config''' with open(conf_file, 'rt') as f: loaded_config = json.loads(f.read()) correct_conf = {} for k, v in loaded_config.items(): if k in default_conf.keys(): correct_conf[k] = v else: logger.warning(f'{k} from config will be removed. Value: {v}') for k, v in default_conf.items(): if k not in correct_conf.keys(): correct_conf[k] = v logger.warning(f'{k} added in config. Value: {v}') set_defaults(correct_conf) return correct_conf if not exists(conf_file): set_defaults() else: check_config() def get_config(key): '''return value from config by key''' with open(conf_file, 'rt') as f: loaded_config = json.loads(f.read()) return loaded_config.get(key) def set_config(key, value): '''set value new value config by key''' read_conf = check_config() with open(conf_file, 'wt') as f: read_conf[key] = value f.write(json.dumps(read_conf, indent=4)) @logger.catch def get_new_funkwhale_servers(): # Uses official API network.funkwhale.audio for getting new instances public_server_api = 'https://network.funkwhale.audio/dashboards/api/tsdb/query' now = int(time.time()) timeback = now - 86400 request_public_servers = { 'from': f"{timeback}", 'to': f"{now}", 'queries': [ { 'refId': "A", 'intervalMs': 60000, 'maxDataPoints': 1174, 'datasourceId': 1, 'rawSql': "SELECT * FROM (\n SELECT\n DISTINCT on (c.domain) c.domain as \"Name\",\n c.up as \"Is up\",\n coalesce(c.open_registrations, false) as \"Open registrations\",\n coalesce(anonymous_can_listen, false) as \"Anonymous can listen\",\n coalesce(c.usage_users_total, 0) as \"Total users\",\n coalesce(c.usage_users_active_month, 0) as \"Active users (this month)\",\n coalesce(c.software_version_major, 0)::text || '.' || coalesce(c.software_version_minor, 0)::text || '.' || coalesce(c.software_version_patch, 0)::text as \"Version\",\n c.time as \"Last checked\",\n d.first_seen as \"First seen\"\n FROM checks as c\n INNER JOIN domains AS d ON d.name = c.domain\n WHERE d.blocked = false AND c.up = true AND c.time > now() - INTERVAL '7 days'\n AND c.anonymous_can_listen IN ('true')\n AND c.open_registrations IN ('true','false')\n\n ORDER BY c.domain, c.time DESC\n) as t ORDER BY \"Active users (this month)\" DESC", 'format': "table" } ] } try: r = requests.post(public_server_api, json=request_public_servers) results = r.json() new_instances = [] if results: new_instances_list = results['results']['A']['tables'][0]['rows'] exists_instances = get_config('public_list_instances') for i in new_instances_list: if i[0] not in exists_instances and i[1]: new_instances.append(i[0]) new_instances.extend(get_new_funkwhale_servers_fediverse_observer()) new_instances = list(dict.fromkeys(new_instances)) return new_instances except: # If any errors then return empty list return [] def get_new_funkwhale_servers_fediverse_observer(): try: graphQL_request = { 'query': '{\n nodes(softwarename: \"funkwhale\") {\n domain\n metanodeinfo\n }\n}' } r = requests.post('https://api.fediverse.observer/', headers={'Accept-Encoding': 'gzip, deflate'}, json=graphQL_request) new_instances = [] exists_instances = get_config('public_list_instances') for i in r.json()['data']['nodes']: if i.get('metanodeinfo'): auth_no_required = json.loads(i['metanodeinfo'])['library']['anonymousCanListen'] if auth_no_required and i['domain'] not in exists_instances: new_instances.append(i['domain']) return new_instances except: return []