Compare commits

..

No commits in common. "92a52dd2b8b0847faa851e3c59bb289330d42cd3" and "50e7402c92a46ddaa8c2065532b701b0edebbb93" have entirely different histories.

26 changed files with 3318 additions and 39 deletions

View File

@ -1,6 +1,36 @@
# funkwlplay
# funkwlmpv
### Deprecated
Just for fun. A simple TUI media player for FunkWhale instances. Writen on python
basic functional for create playlist tracks. Be simple as possible. No implement player and tui. Just bare cli solution.
Features:
* Simple interface
* Listening to tracks
* Listening to radios (including users/libraries)
* Selecting and listening to albums
* Selecting and listening to artists
* Search by albums, artists
* Switching instance from the public list[1] and the official instances list network.funkwhale.audio (if avalaible)
* All other features are working 50/50
Dependencies:
* python3.9+
* [mpv](https://mpv.io)
* [fzf](https://github.com/junegunn/fzf)
Python dependencies:
```pip install -r requirements.txt```
Optional: brotli
For virtual env just type (ofc after install mpv, fzf, python3):
`./run`
### About cache folder
funkwlmpv has to cache tracks before playing (default disabled "enable_persistent_cache"). Cache is persistent and you should manage it manually because the (script/program) can't clean it automatically.
Cache structure: cache/domain.tld/[track uuid]
You can play tracks offline, example: mpv --shuffle cache/*/*
cache_helper.sh - it might be useful for compression cache (lossy: vorbis 128 kbps, no thumbnail)
Also, tnx Inex for his FunkWhale instance (set by default instance)
[1]**Warning:** may content _unofficial instances_
**Warning 2:** This software not userfriendly. Maybe developerfriendly.

32
cache_helper.sh Executable file
View File

@ -0,0 +1,32 @@
#!/bin/bash
if [ ! -n "$1" ]; then
echo 'Usage: cache_helper.sh path/to/cache'
exit 1
fi
total_before=0
total_after=0
for i in "$1"/*/*; do
if [ $(ffprobe -hide_banner -print_format json -select_streams a:0 -show_streams "$i" | jq -r '.streams[0].bit_rate') -gt 128000 ]; then
size_before=$(stat --format=%s "$i")
total_before=$(( $total_before + $size_before ))
ffmpeg -hide_banner -loglevel error -i "$i" -vn "$i".ogg
if [ $? -eq 0 ]; then
size_after=$(stat --format=%s "$i".ogg)
total_after=$(( $total_after + $size_after ))
size_reduced=$(( $size_before - $size_after ))
echo "Reduced: $(echo $size_reduced | numfmt --to=iec)"
mv "$i".ogg "$i"
else
echo "$i convert failed"
fi
else
echo "$i already OK"
fi
done
echo "Total before: $(echo $total_before | numfmt --to=iec)"
echo "Total after: $(echo $total_after | numfmt --to=iec)"
echo "Note: only included processed tracks"

144
funkwlmpv Executable file
View File

@ -0,0 +1,144 @@
#!/usr/bin/env python3
from src.fw_api import current_instance, get_instance_settings, get_node_info
from src.fw_radios import list_radios
from src.fw_artists import list_artists
from src.fw_albums import list_albums
from src.fw_tracks import list_tracks
from src.fw_channels import list_channels
from src.fw_playlists import list_playlists
from src.fw_recents import list_fav_or_history
from src.fw_instances import instances_menu
import src.mpv_control
import json
import os
from shlex import quote
from shutil import get_terminal_size
from pyfzf.pyfzf import FzfPrompt
os.environ['FZF_DEFAULT_OPTS'] = "--margin 2,0,0,0 --preview-window down:2:hidden:wrap --bind ?:toggle-preview --preview 'echo {}'"
fzf = FzfPrompt()
os.system('clear')
if get_terminal_size().columns > 32:
print('\n\n')
os.system('cat .icon.txt')
def main():
while True:
support_message = ''
instance_title = ''
menu = ['Radios',
'Artists',
'Albums',
'Tracks',
'Channels',
'Playlists',
'Favorites',
'Recently listened',
'About instance',
'Switch instance']
try:
ins_nodeinfo = get_node_info()
support_message = ins_nodeinfo['metadata']['instanceSupportMessage']
instance_title = ins_nodeinfo['metadata']['nodeName']
instance_stats = []
for k, v in ins_nodeinfo['metadata']['library'].items():
if k == 'anonymousCanListen' and v == False and not current_instance.s.headers.get('Authorization'):
instance_stats.append(f'!!! {k}: {v} !!!')
menu = ['Switch instance', 'About instance']
continue
instance_stats.append(f'{k}: {v}')
instance_stats.append(ins_nodeinfo['software']['version'])
instance_stats = '\n'.join(instance_stats)
main_menu_header = quote(f'''{instance_title}\n{instance_stats}'''.strip())
except Exception as E:
splitted = ':\n'.join(str(E).split(':'))
main_menu_header = quote(f'''Connection failed:\n{splitted}'''.strip())
menu = ['Switch instance']
if not current_instance.s.headers.get('Authorization'):
menu.append('Sign in')
if support_message != '':
menu.append('Donate')
if src.mpv_control.player.playlist_playing_pos != -1:
menu.insert(0, 'Player')
selected = fzf.prompt(menu, f"--header={main_menu_header}")
if selected == []:
print('bye-bye :3')
break
else:
selected = selected[0]
if selected == 'Radios':
list_radios()
if selected == 'Artists':
list_artists()
if selected == 'Albums':
list_albums()
if selected == 'Tracks':
list_tracks()
if selected == 'Channels':
list_channels()
if selected == 'Playlists':
list_playlists()
if selected == 'Favorites':
list_fav_or_history()
if selected == 'Recently listened':
list_fav_or_history(is_history_view=True)
if selected == 'Switch instance':
instances_menu()
if selected == 'Sign in':
print(f'''
If You want sign in, please visit:
https://{current_instance.instance}/settings/applications/new
And fill Name funkwlmpv
Scopes:
Read | Write (optional):
write:libraries
write:favorites
write:listenings
write:follows
write:filters
Insert token from "Access token" here''')
register_token = input()
with open('.auth.json', 'rt') as f:
tkns = json.load(f)
with open('.auth.json', 'wt') as f:
tkns[current_instance.instance] = register_token
f.write(json.dumps(tkns, indent=4))
del tkns
del register_token
del f
os.system('clear')
current_instance.select_instance(current_instance.instance)
if selected == 'Donate':
os.system(f'less <<EOF\nSupport instance message:\n{support_message}\nEOF')
if selected == 'About instance':
ins_settings = get_instance_settings()
about_instance_info = []
for i in ins_settings:
k, v = i.get('verbose_name'), i.get('value')
about_instance_info.append(f'{k}: {v}')
about_instance_info.append('|||||Some stats:')
if ins_nodeinfo['metadata'].get('usage'):
for k, v in ins_nodeinfo['metadata']['usage'].items():
about_instance_info.append(f'{k}: {v}')
for k, v in ins_nodeinfo['metadata']['library'].items():
about_instance_info.append(f'{k}: {v}')
for k, v in ins_nodeinfo['usage'].items():
about_instance_info.append(f'{k}: {v}')
about_instance_info = '\n'.join(about_instance_info)
os.system(f'less <<EOF\n{about_instance_info}\nEOF')
del about_instance_info
if selected == 'Player':
src.mpv_control.player_menu(
storage=src.mpv_control.player_fw_storage.storage)
if __name__ == '__main__':
main()

17
funkwlplay Executable file
View File

@ -0,0 +1,17 @@
#!/bin/bash
fw_instance="${wf_instance:=open.audio}"
get(){
curl $1
}
case $1 in
"tag") search="tag=$2" ;;
"search") read input ; search="q=$input" ;;
esac
echo '#EXTM3U' > playlist.m3u8
for i in $(cat instances); do
get "https://$i/api/v1/tracks?playable=true&hidden=false&include_channels=false&order=random&$search" | jq -r ".results.[] | \"#EXTINF:-1, \\(.title)\nhttps://$i\\(.listen_url)\"" >> playlist.m3u8 &
done

45
funkwlplay.py Executable file → Normal file
View File

@ -1,4 +1,3 @@
#!/bin/env python3
import requests
import concurrent.futures
import argparse
@ -10,48 +9,26 @@ with open('instances') as instances:
parser = argparse.ArgumentParser(
prog='funkwlplay',
prog='funkwhale playlist',
description='Create playlist from query or just random playlist tracks from funkwhale instances')
parser.add_argument('-s', '--search', help='This global search on funkwhale instances, it matches artists, albums, tracks, etc...')
parser.add_argument('-t', '--tag', help='This tag search, use this as genre search')
parser.add_argument('-i', '--instance', help='Specify instance, by default search on all instances in instances file')
parser.add_argument('-r', '--recursion', type=int, default=0, help='Use recursion if instance contain more than 50 tracks')
parser.add_argument('-d', '--depth', type=int, default=5, help='Depth of recursion, default is 5 pages, 250 tracks')
parser.add_argument('-s', '--search')
parser.add_argument('-t', '--tag')
parser.add_argument('-i', '--instance')
parser.add_argument('-r', '--recursion', type=int, default=0)
parser.add_argument('-d', '--depth', type=int, default=5)
args = parser.parse_args()
if args.instance:
instances = [args.instance]
def create_playlist_file(track_list):
filename = 'playlist.m3u8'
with open(filename, 'w') as file:
with open('playlist.m3u8', 'w') as file:
file.write('#EXTM3U\n')
for i in track_list:
file.write('\n' + i)
print(f'Playlist saved as {filename}')
def filter_tracks(tracks):
def remove_unreach_tracks(track):
try:
r = requests.head(track['listen_url'], timeout=1)
r.raise_for_status()
return 1
except:
return 0
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
before = len(tracks)
res = [executor.submit(remove_unreach_tracks, track) for track in tracks]
concurrent.futures.wait(res)
avalaible = []
for idx, track in enumerate(tracks):
is_avalaible = res[idx].result()
if is_avalaible == 1:
avalaible.append(track)
tracks = avalaible
after = before - len(tracks)
print(f'-{after} unreach tracks')
Path('filter_tags').touch()
Path('filter_artists').touch()
Path('filter_raw_urls').touch()
@ -74,14 +51,10 @@ def filter_tracks(tracks):
tracks_stor.append(i)
def search_tracks_on_instance(instance, tag='', query='', recursion=args.recursion):
r = requests.get(f'https://{instance}/api/v1/tracks', params={'tag': tag, 'q': query,
tracks = requests.get(f'https://{instance}/api/v1/tracks', params={'tag': tag, 'q': query,
'local': True, 'playable': True,
'ordering': 'random'}, timeout=10)
r.raise_for_status()
tracks = r.json()
'ordering': 'random'}, timeout=10).json()
count = tracks['count']
print(f'found {count} tracks on {instance}')
if recursion == 1:

View File

@ -55,4 +55,3 @@ tanukitunes.com
tunez.awadwatt.com
watts.refchat.net
zik.goe.land
funkwhale.agapimou.top

View File

@ -1 +1,4 @@
loguru
requests
pyfzf
mpv

12
run Executable file
View File

@ -0,0 +1,12 @@
#!/bin/bash
if [ ! -d env_fw ]; then
python3 -m venv env_fw
. env_fw/bin/activate
pip3 install -r requirements.txt || pip install -r requirements.txt || exit 1
./funkwlmpv
else
. env_fw/bin/activate
./funkwlmpv
fi

42
src/android_termux_api.py Normal file
View File

@ -0,0 +1,42 @@
import subprocess
import os
import json
import threading
import time
from src.mpv_control import player
from loguru import logger
'''Warning! This module can be very battery drain'''
@logger.catch
def handle_vol_lvl_as_switch_track():
volume_diff = []
while True:
for a in range(2): # ~2 secs
volume = subprocess.Popen("termux-volume", stdout=subprocess.PIPE).stdout
json_volume = json.loads(volume.read())
time.sleep(0.300)
for i in json_volume:
if i['stream'] == 'music':
volume_diff.append(i['volume'])
if len(volume_diff) == 2:
before, after = volume_diff
difference = after - before
if difference == 2:
try:
player.playlist_next()
except:
pass
os.system(f'termux-volume music {before}')
elif difference == -2:
try:
player.playlist_prev()
except:
pass
os.system(f'termux-volume music {before}')
volume_diff = []
handle_vol_lvl = threading.Thread(
target=handle_vol_lvl_as_switch_track, daemon=True)
handle_vol_lvl.start()

80
src/fw_albums.py Normal file
View File

@ -0,0 +1,80 @@
import src.fw_artists
from src.fw_api import get_artists, get_tracks, get_albums, get_audio_file
from src.fw_libraries import libraries
from src.settings import get_config
from src.mpv_control import player, player_menu, track_url_to_uuid
from pyfzf.pyfzf import FzfPrompt
from loguru import logger
fzf = FzfPrompt()
@logger.catch
def list_albums(albums=None, pg=None, search=None, artist=None, library=None, include_channels=None, refresh=False):
show_artist_name_in_albums = get_config('show_artist_name_in_albums')
albums_next = None
albums_prev = None
play_artist_albums = False
if not albums:
albums = get_albums(q=search, artist=artist, library=library,
include_channels=include_channels, refresh=refresh, pg=pg)
albums_next = albums.get('next')
albums_prev = albums.get('previous')
albums_results = albums.get('results')
if artist:
play_artist_albums = True
else:
play_artist_albums = True
albums_results = albums
view = ['Search', 'Library']
if play_artist_albums:
view.append('Play all')
if albums_next:
view.append('Next page')
if albums_prev:
view.append('Prev page')
for i in albums_results:
index = albums_results.index(i)
album_name = i.get('title')
album_tracks_count = i.get('tracks_count')
option_str = f'{index}.{album_name} | {album_tracks_count}'
artist_name = i.get('artist')
if show_artist_name_in_albums and isinstance(artist_name, dict):
artist_name = artist_name.get('name')
option_str += f' | {artist_name}'
view.append(f'{option_str}')
select = fzf.prompt(view, '--header=\'map: album | tracks count | opt. artist\'')
if select == []:
return
else:
select = select[0].split('.', 1)[0]
if select == 'Next page':
list_albums(pg=albums_next)
elif select == 'Prev page':
list_albums(pg=albums_prev)
elif select == 'Search':
print('Search by albums: ')
list_albums(search=input())
elif select == 'Library':
select_lib = libraries()
list_albums(library=select_lib)
elif select == 'Play all':
if artist:
src.fw_artists.play_artist(artist)
else:
src.fw_artists.play_artist(albums_results[0].get('artist'))
else:
play_album(album_id=albums_results[int(select)].get('id'))
def play_album(album_id):
tracks = get_tracks(album=album_id, ordering='disc_number,position', include_channels=True)
tracks_results = tracks.get('results')
storage = {}
for i in tracks_results:
listen_url = get_audio_file(i['listen_url'], True)
storage[track_url_to_uuid(listen_url)] = i
player.loadfile(listen_url, 'append-play')
player_menu("Album playing...", storage)

431
src/fw_api.py Normal file
View File

@ -0,0 +1,431 @@
from src.settings import get_config
import requests
import json
import time
import urllib.parse
import os
from loguru import logger
attempts_for_federate = 3
auth_file = '.auth.json'
if os.path.exists(auth_file):
with open('.auth.json', 'rt') as f:
auth = json.loads(f.read())
else:
# The default umask is 0o22 which turns off write permission of group and others
os.umask(0)
descriptor = os.open(
path=auth_file,
flags=(
os.O_WRONLY # access mode: write only
| os.O_CREAT # create if not exists
| os.O_TRUNC # truncate the file to zero
),
mode=0o600)
with open(descriptor, 'wt') as f:
f.write('{}')
auth = {}
class current_instance:
s = requests.Session()
s.headers.update({
"Accept-encoding": 'gzip, br, deflate',
"User-Agent": "funkwlmpv/latest-commit; +https://git.phreedom.club/localhost_frssoft/funkwlmpv"
})
instance = get_config('instance')
token = auth.get(instance)
listen_token = None
@logger.catch
def select_instance(new_instance=None):
current_instance.instance = new_instance
with open(auth_file, 'rt') as f:
auth = json.load(f)
new_token = auth.get(current_instance.instance)
current_instance.s.headers.update({"Authorization": None})
current_instance.token = new_token
current_instance.listen_token = None
if new_token:
current_instance.s.get(f'https://{current_instance.instance}', timeout=30)
current_instance.s.headers.update({"Authorization": "Bearer " + new_token})
if current_instance.token:
current_instance.s.headers.update({"Authorization": "Bearer " + current_instance.token})
else:
# Get cookies from unauthorized instance for working some functionality (radios)
current_instance.s.get(f'https://{current_instance.instance}/', timeout=30)
@logger.catch
def get_me():
'''Return current user if token avalaible'''
if not current_instance.token:
return
r = current_instance.s.get(f'https://{current_instance.instance}/api/v1/users/me')
r.raise_for_status()
resp = r.json()
current_instance.listen_token = resp['tokens']['listen']
return resp
def get_instance_settings():
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/instance/settings')
return r.json()
def get_node_info():
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/instance/nodeinfo/2.0/',
timeout=30)
r.raise_for_status()
return r.json()
@logger.catch
def get_audio_file(track_uuid, listen_url=False, download=False,
external_transcoding=get_config('external_transcoder_http_proxy_path'),
transcoding=get_config('enable_server_transcoding'), to='ogg', max_bitrate=128):
params = {
"download": download,
"to": to,
"max_bitrate": max_bitrate
}
if current_instance.token and not current_instance.listen_token:
get_me()
if current_instance.listen_token:
params['token'] = current_instance.listen_token
if not transcoding:
del params['to']
del params['max_bitrate']
if listen_url:
url = f'https://{current_instance.instance}{track_uuid}?'
else:
url = f'https://{current_instance.instance}/api/v1/listen/{track_uuid}?'
if external_transcoding != "":
url = external_transcoding + url
return url + urllib.parse.urlencode(params, doseq=True)
@logger.catch
def get_tracks(page=None, ordering=None, q=None, page_size=None,
artist=None, album=None, library=None,
tag=None, favourites=None, include_channels=None, pg=None):
'''This function get tracks by params'''
params = {
'page': page,
'page_size': page_size,
'ordering': ordering,
'q': q,
'artist': artist,
'album': album,
'tag': tag,
'favourites': favourites,
'library': library,
'include_channels': include_channels
}
if pg:
r = current_instance.s.get(pg)
else:
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/tracks', params=params)
return r.json()
@logger.catch
def get_favorires_tracks(page=None, q=None, scope=None, include_channels=None, pg=None):
'''Get favorites tracks (not only for user)'''
params = {
'page': page,
'q': q,
'scope': scope,
'include_channels': include_channels
}
if pg:
r = current_instance.s.get(pg)
else:
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/favorites/tracks/', params=params)
return r.json()
@logger.catch
def get_recently_listened(page=None, q=None, scope=None, include_channels=None, pg=None):
'''Get recently listened tracks (not only for user)'''
params = {
'page': page,
'q': q,
'scope': scope,
'include_channels': include_channels
}
if pg:
r = current_instance.s.get(pg)
else:
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/history/listenings', params=params)
return r.json()
@logger.catch
def get_artists(page=None, q=None, artist=None, album=None, tag=None,
library=None, scope=None, favourites=None, refresh=False, pg=None):
'''This function get artists by params'''
params = {
'page': page,
'q': q,
'artist': artist,
'album': album,
'tag': tag,
'library': library,
'scope': scope,
'favourites': favourites,
'refresh': refresh
}
if pg:
r = current_instance.s.get(pg)
else:
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/artists', params=params)
return r.json()
@logger.catch
def get_albums(page=None, q=None, ordering=None,
artist=None, library=None, include_channels=None, refresh=False, pg=None):
'''This function get artists by params'''
params = {
'page': page,
'ordering': ordering,
'q': q,
'artist': artist,
'library': library,
'include_channels': include_channels,
'refresh': refresh
}
if pg:
r = current_instance.s.get(pg)
else:
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/albums', params=params)
return r.json()
@logger.catch
def get_channels(page=None, q=None, tag=None, pg=None):
params = {
'page': page,
'q': q,
'tag': tag
}
if pg:
r = current_instance.s.get(pg)
else:
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/channels', params=params)
return r.json()
@logger.catch
def get_playlists(page=None, page_size=None, q=None, ordering='-modification_date', pg=None):
'''List playlists'''
params = {
'page': page,
'page_size': page_size,
'q': q,
'ordering': ordering
}
if pg:
r = current_instance.s.get(pg)
else:
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/playlists', params=params)
r.raise_for_status()
return r.json()
@logger.catch
def get_playlist_tracks(playlist_id, pg=None):
'''Retrieve all tracks in the playlist'''
if pg:
r = current_instance.s.get(pg)
else:
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/playlists/{playlist_id}/tracks')
return r.json()
@logger.catch
def list_libraries(page=None, page_size=None, q=None, scope='all', pg=None):
params = {
'page': page,
'page_size': page_size,
'q': q,
'scope': scope,
}
if pg:
r = current_instance.s.get(pg)
else:
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/libraries', params=params)
return r.json()
@logger.catch
def assigned_libraries_on_track(track_id, page=None, page_size=None, pg=None):
params = {
'page': page,
'page_size': page_size,
}
if pg:
r = current_instance.s.get(pg)
else:
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/tracks/{track_id}/libraries', params=params)
return r.json()
@logger.catch
def get_tags(q=None, ordering='-creation_date', pg=None):
params = {
'q': q,
'ordering': ordering
}
if pg:
r = current_instance.s.get(pg)
else:
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/tags', params=params)
return r.json()
@logger.catch
def federate_search_by_url(object):
params = {
'object': object
}
for i in range(attempts_for_federate):
try:
r = current_instance.s.post(
f'https://{current_instance.instance}/api/v1/federation/fetches', json=params)
r.raise_for_status()
break
except Exception as Err:
logger.error(f'Attempt {i}: {Err}')
time.sleep(3)
return r.json()
@logger.catch
def federate_remote_library(fid):
'''Fetch remote library for follow and scan'''
params = {
'fid': fid
}
for i in range(attempts_for_federate):
try:
r = current_instance.s.post(
f'https://{current_instance.instance}/api/v1/federation/libraries/fetch', json=params)
r.raise_for_status()
break
except Exception as Err:
logger.error(f'Attempt {i}: {Err}')
time.sleep(3)
return r.json()
@logger.catch
def scan_remote_library(uuid):
for i in range(attempts_for_federate):
try:
r = current_instance.s.post(
f'https://{current_instance.instance}/api/v1/federation/libraries/{uuid}/scan')
r.raise_for_status()
break
except Exception as Err:
logger.error(f'Attempt {i}: {Err}')
time.sleep(3)
return r.json()
@logger.catch
def follow_on_remote_library(uuid):
params = {'target': uuid}
for i in range(attempts_for_federate):
try:
r = current_instance.s.post(
f'https://{current_instance.instance}/api/v1/federation/follows/library/',
json=params)
r.raise_for_status()
break
except Exception as Err:
logger.error(f'Attempt follow {i}: {Err}')
time.sleep(3)
return r.json()
@logger.catch
def unfollow_remote_library(uuid):
r = current_instance.s.delete(
f'https://{current_instance.instance}/api/v1/federation/follows/library/{uuid}/')
r.raise_for_status()
return r.json()
@logger.catch
def record_track_in_history(track_id):
params = {
'track': int(track_id)
}
r = current_instance.s.post(
f'https://{current_instance.instance}/api/v1/history/listenings', json=params)
r.raise_for_status()
return r.json
@logger.catch
def favorite_track(track_id):
r = current_instance.s.post(
f'https://{current_instance.instance}/api/v1/favorites/tracks', json={'track': int(track_id)})
r.raise_for_status()
return r.json
@logger.catch
def unfavorite_track(track_id):
r = current_instance.s.post(
f'https://{current_instance.instance}/api/v1/favorites/tracks/delete', json={'track': int(track_id)})
r.raise_for_status()
return r.json
@logger.catch
def hide_content(content):
'''This function hide content (write permission)'''
r = current_instance.s.post(
f'https://{current_instance.instance}/api/v1/moderation/content-filters/', json=content)
r.raise_for_status()
return r.json
# [FunkWhale radios]
def get_radios():
r = current_instance.s.get(
f'https://{current_instance.instance}/api/v1/radios/radios/')
return r.json()
def post_radio_session(requested_radio):
r = current_instance.s.post(
f'https://{current_instance.instance}/api/v1/radios/sessions/', json=requested_radio)
return r.json()
@logger.catch
def get_track_radio(radio_session):
r = current_instance.s.post(
f'https://{current_instance.instance}/api/v1/radios/tracks/', json=radio_session)
return r.json()

10
src/fw_api_hints.py Normal file
View File

@ -0,0 +1,10 @@
def hint_scope():
print(
'''Limit the results to a given user or pod:
Use all (or do not specify the property to disable scope filtering)
Use me to retrieve content relative to the current user
Use subscribed to retrieve content in libraries you follow
Use actor:alice@example.com to retrieve content relative to the account `alice@example.com
Use domain:example.com to retrieve content relative to the domain `example.com
You can specify multiple coma separated scopes, e.g me,subscribed to retrieve content matching either scopes
''')

84
src/fw_artists.py Normal file
View File

@ -0,0 +1,84 @@
from src.fw_api import get_artists, get_tracks, get_audio_file
from src.fw_api_hints import hint_scope
from src.fw_albums import list_albums
from src.fw_libraries import libraries
from src.fw_tags import list_tags
from src.mpv_control import player, player_menu, track_url_to_uuid
from pyfzf.pyfzf import FzfPrompt
from loguru import logger
fzf = FzfPrompt()
@logger.catch
def list_artists(pg=None, search=None, library=None, scope=None, tag=None):
artists = get_artists(q=search, library=library, pg=pg, scope=scope, tag=tag)
artists_next = artists.get('next')
artists_prev = artists.get('previous')
artists_results = artists.get('results')
view = ['Search', 'Tag', 'Library', 'Limit by scope']
if artists_next:
view.append('Next page')
if artists_prev:
view.append('Prev page')
for i in artists_results:
index = artists_results.index(i)
artist_name = i.get('name')
artist_tracks_count = i.get('tracks_count')
view.append(f'{index}.{artist_name} | {artist_tracks_count}')
select = fzf.prompt(view, '--header=\'map: artist | tracks count\'')
if select == []:
return
else:
select = select[0].split('.', 1)[0]
if select == 'Next page':
list_artists(pg=artists_next)
elif select == 'Prev page':
list_artists(pg=artists_prev)
elif select == 'Search':
print('Search by artist:')
list_artists(search=input())
elif select == 'Tag':
list_artists(tag=list_tags())
elif select == 'Library':
select_lib = libraries()
list_artists(library=select_lib)
elif select == 'Limit by scope':
hint_scope()
scope = input()
list_artists(scope=scope)
else:
albums = artists_results[int(select)].get('albums')
if albums:
list_albums(albums=albums)
else: # Fallback on tracks of selected artist
play_artist(artists_results[int(select)]['id'])
def play_artist(artist_id):
tracks = get_tracks(artist=artist_id, ordering='disc_number,position',
include_channels=True, pg=None)
tracks_next = tracks.get('next')
tracks_count = tracks.get('count')
storage = {}
if tracks_count > 50:
print(f'Loading {tracks_count} tracks...')
elif tracks_count == 0:
logger.warning('Empty tracks. Nothing to do')
return
while True:
tracks_results = tracks.get('results')
tracks_next = tracks.get('next')
for i in tracks_results:
listen_url = get_audio_file(i['listen_url'], True)
storage[track_url_to_uuid(listen_url)] = i
player.loadfile(listen_url, 'append-play')
if tracks_next:
tracks = get_tracks(
artist=artist_id, include_channels=True, pg=tracks_next)
else:
break
artist_name = tracks.get('results')[0]['artist']['name']
player_menu(f"Artist {artist_name} playing...", storage)

43
src/fw_channels.py Normal file
View File

@ -0,0 +1,43 @@
from src.fw_api import get_channels
from src.fw_albums import list_albums
from loguru import logger
from pyfzf.pyfzf import FzfPrompt
fzf = FzfPrompt()
@logger.catch
def list_channels(pg=None, search=None):
channels = get_channels(q=search, pg=pg)
channels_next = channels.get('next')
channels_prev = channels.get('previous')
channels_results = channels.get('results')
view = ['Search']
if channels_next:
view.append('Next page')
if channels_prev:
view.append('Prev page')
for i in channels_results:
index = channels_results.index(i)
channel_name = i.get('artist').get('name')
view.append(f'{index}.{channel_name}')
select = fzf.prompt(view)[0].split('.', 1)
if select == []:
return
else:
select = select[0]
if select == 'Next page':
list_channels(pg=channels_next)
elif select == 'Prev page':
list_channels(pg=channels_prev)
elif select == 'Search':
print('Search by channel:')
list_channels(search=input())
else:
refresh = False
if channels_results[int(select)].get('artist').get('is_local') == False:
refresh = True
list_albums(artist=channels_results[int(select)].get(
'artist').get('id'), include_channels=True, refresh=refresh)

151
src/fw_instances.py Normal file
View File

@ -0,0 +1,151 @@
from src.fw_api import current_instance
import src.settings as settings
from pyfzf.pyfzf import FzfPrompt
from shlex import quote
from loguru import logger
import json
import time
import concurrent
import requests
fzf = FzfPrompt()
@logger.catch
def get_new_funkwhale_servers():
# Uses official API network.funkwhale.audio for getting new instances
public_server_api = 'https://network.funkwhale.audio/dashboards/api/tsdb/query'
now = int(time.time())
timeback = now - 86400
request_public_servers = {
'from': f"{timeback}",
'to': f"{now}",
'queries': [
{
'refId': "A",
'intervalMs': 60000,
'maxDataPoints': 1174,
'datasourceId': 1,
'rawSql': "SELECT * FROM (\n SELECT\n DISTINCT on (c.domain) c.domain as \"Name\",\n c.up as \"Is up\",\n coalesce(c.open_registrations, false) as \"Open registrations\",\n coalesce(anonymous_can_listen, false) as \"Anonymous can listen\",\n coalesce(c.usage_users_total, 0) as \"Total users\",\n coalesce(c.usage_users_active_month, 0) as \"Active users (this month)\",\n coalesce(c.software_version_major, 0)::text || '.' || coalesce(c.software_version_minor, 0)::text || '.' || coalesce(c.software_version_patch, 0)::text as \"Version\",\n c.time as \"Last checked\",\n d.first_seen as \"First seen\"\n FROM checks as c\n INNER JOIN domains AS d ON d.name = c.domain\n WHERE d.blocked = false AND c.up = true AND c.time > now() - INTERVAL '7 days'\n AND c.anonymous_can_listen IN ('true')\n AND c.open_registrations IN ('true','false')\n\n ORDER BY c.domain, c.time DESC\n) as t ORDER BY \"Active users (this month)\" DESC",
'format': "table"
}
]
}
try:
r = requests.post(public_server_api, json=request_public_servers)
results = r.json()
new_instances = {}
if results:
new_instances_list = results['results']['A']['tables'][0]['rows']
for i in new_instances_list:
anonymousCanListen = i[1]
if anonymousCanListen:
new_instances[i[0]] = f'{anonymousCanListen} | ?'
for i in get_new_funkwhale_servers_fediverse_observer():
new_instances[i] = "?"
return new_instances
except: # If any errors then return empty list
return {}
def get_new_funkwhale_servers_fediverse_observer():
try:
graphQL_request = {
'query':
'{\n nodes(softwarename: \"funkwhale\") {\n domain\n metanodeinfo\n }\n}'
}
r = requests.post('https://api.fediverse.observer/',
headers={'Accept-Encoding': 'gzip, deflate'},
json=graphQL_request)
new_instances = []
for i in r.json()['data']['nodes']:
if i.get('metanodeinfo'):
auth_no_required = json.loads(i['metanodeinfo'])['library']['anonymousCanListen']
if auth_no_required and i['domain']:
new_instances.append(i['domain'])
return new_instances
except:
return []
def fetch_instances_nodeinfo_and_avalaibility(instances):
extended_instances_info = {}
def request_nodeinfo(instance):
return requests.get('https://' + instance + '/api/v1/instance/nodeinfo/2.0/',
headers={
'Accept-Encoding': 'gzip, brotli, deflate',
'User-Agent': 'funkwlmpv/latest-commit; +https://git.phreedom.club/localhost_frssoft/funkwlmpv'},
timeout=10).json()
with concurrent.futures.ThreadPoolExecutor() as executor: # optimally defined number of threads
res = [executor.submit(request_nodeinfo, instance) for instance in instances]
concurrent.futures.wait(res)
for idx, v in enumerate(instances):
try:
data_for_instance = res[idx].result()
anon = data_for_instance['metadata']['library']['anonymousCanListen']
tracks = data_for_instance['metadata']['library']['tracks']['total']
extended_instances_info[v] = f'{anon} | {tracks}'
except:
extended_instances_info[v] = 'fail'
return extended_instances_info
def instances_menu(fetch_manually=False, fetch_node_info=False):
with open('config.json', 'rt') as f:
conf = json.loads(f.read())
if conf.get('automatic_fetch_new_instances') or fetch_manually:
public_server_list_instances = get_new_funkwhale_servers()
new_ins_count = len(public_server_list_instances)
else:
public_server_list_instances = {}
new_ins_count = 'Disabled'
list_instances = conf.get('public_list_instances_extended')
if public_server_list_instances != {}:
list_instances_merge = {**list_instances, **public_server_list_instances}
settings.set_config('public_list_instances_extended', list_instances_merge)
list_instances = list_instances_merge
map_in_extend_mode = ''
if fetch_node_info:
list_instances = fetch_instances_nodeinfo_and_avalaibility([instance.split('|')[0].strip() for instance in list_instances.keys()])
settings.set_config('public_list_instances_extended', list_instances)
map_in_extend_mode = '\nmap: instance | anonymousCanListen | tracks'
instance_menu_selector = ['Add new instance',
'Fetch new instances',
'Fetch nodeinfo and avalaibility',
'Remove unreachible instances',
'Shuffle']
instance = fzf.prompt(
instance_menu_selector +
[f'{instance} | {info}' for instance, info in list_instances.items()],
'--header='+quote(f'Select instance\nNew instances: {new_ins_count}{map_in_extend_mode}'))
if instance == []:
return
else:
instance = instance[0].split('|')[0].strip()
if instance == 'Add new instance':
new = input('example.com\n').strip()
list_instances[new] = 'added by user'
settings.set_config('public_list_instances_extended', list_instances)
instance = new
if instance == 'Fetch new instances':
return instances_menu(fetch_manually=True)
if instance == 'Fetch nodeinfo and avalaibility':
return instances_menu(fetch_node_info=True)
if instance == 'Shuffle':
import random
instance = random.choice(list(list_instances.keys()))
if instance == 'Remove unreachible instances':
clean_unreach = {}
for ins, info in list_instances.items():
if 'fail' not in info.split():
clean_unreach[ins] = info
settings.set_config('public_list_instances_extended', clean_unreach)
return instances_menu()
current_instance.select_instance(instance)

78
src/fw_libraries.py Normal file
View File

@ -0,0 +1,78 @@
from src.fw_api import current_instance, list_libraries, federate_remote_library, scan_remote_library, follow_on_remote_library
from pyfzf.pyfzf import FzfPrompt
from loguru import logger
import time
fzf = FzfPrompt()
@logger.catch
def libraries(pg=None, radio=False, search=None):
libs_res = list_libraries(pg=pg, q=search)
libs_count = libs_res.get('count')
libs_next = libs_res.get('next')
libs_prev = libs_res.get('previous')
libs = libs_res.get('results')
libraries_listing = ['Search']
if libs_next:
libraries_listing.append('Next')
if libs_prev:
libraries_listing.append('Prev')
if current_instance.s.headers.get('Authorization'):
libraries_listing.append('Add remote library')
for lib_i in libs:
index = libs.index(lib_i)
lib_name = lib_i.get('name')
lib_tracks_count = lib_i.get('uploads_count')
lib_access = lib_i.get('privacy_level')
lib_by = lib_i.get('actor').get('full_username')
libraries_listing.append(f'{index}.{lib_name} | {lib_by} | {lib_tracks_count} | {lib_access}')
lib_select = fzf.prompt(
libraries_listing,
f'--header=\'found {libs_count} libraries\nmap: library name | owner | tracks count\'')
if lib_select == []:
return
else:
lib_select = lib_select[0].split('.', 1)
if lib_select[0] == 'Next':
return libraries(pg=libs_next, search=search)
elif lib_select[0] == 'Prev':
return libraries(pg=libs_prev, search=search)
elif lib_select[0] == 'Search':
q = input('Name of library:\n')
return libraries(search=q)
elif lib_select[0] == 'Add remote library':
print('Search a remote library (url\\fid):')
new_library = federate_remote_library(input().strip())
if new_library.get('detail'):
logger.error(new_library['detail'])
return
if new_library.get('count') > 0:
print('Library found')
one_lib = new_library['results'][0]
if one_lib['privacy_level'] == 'private':
logger.warning('This library is private, you should wait until your request is approved')
follow_on_remote_library(one_lib['uuid'])
scan = scan_remote_library(one_lib['uuid'])
if scan.get('detail'):
logger.error(scan['detail'])
return
status = scan['status']
if status == 'scheduled':
print(f'Scanning {status}. Please wait few minutes for scan and open libraries menu again')
else:
print(f'Scan is {status}')
time.sleep(3)
return
else:
lib_addr = lib_select[0]
lib_name = lib_select[1]
lib_uuid = libs[int(lib_addr)]['uuid']
lib_fid = libs[int(lib_addr)]['fid']
if radio:
return None, 'library', f'{lib_name}\n{lib_fid}', lib_uuid
else:
return lib_uuid

64
src/fw_playlists.py Normal file
View File

@ -0,0 +1,64 @@
from src.fw_api import get_playlists, get_playlist_tracks, get_audio_file
from src.mpv_control import player, player_menu, track_url_to_uuid
from pyfzf.pyfzf import FzfPrompt
from loguru import logger
fzf = FzfPrompt()
@logger.catch
def list_playlists(pg=None, search=None):
playlists = get_playlists(q=search, pg=pg)
playlists_next = playlists.get('next')
playlists_prev = playlists.get('previous')
playlists_results = playlists.get('results')
view = ['Search']
if playlists_next:
view.append('Next page')
if playlists_prev:
view.append('Prev page')
for i in playlists_results:
index = playlists_results.index(i)
playlist_name = i.get('name')
view.append(f'{index}.{playlist_name}')
select = fzf.prompt(view)
if select == []:
return
else:
select = select[0].split('.', 1)[0]
if select == 'Next page':
list_playlists(pg=playlists_next)
elif select == 'Prev page':
list_playlists(pg=playlists_prev)
elif select == 'Search':
print('Search by playlist:')
list_playlists(search=input())
else:
play_playlist(playlist_id=playlists_results[int(select)].get('id'))
def play_playlist(playlist_id):
tracks = get_playlist_tracks(playlist_id, pg=None)
tracks_next = tracks.get('next')
tracks_count = tracks.get('count')
storage = {}
if tracks_count > 50:
print(f'Loading {tracks_count} tracks...')
elif tracks_count == 0:
logger.warning('Empty tracks. Nothing to do')
return
while True:
tracks_results = tracks.get('results')
tracks_next = tracks.get('next')
for i in tracks_results:
track = i.get('track')
listen_url = get_audio_file(track['listen_url'], True)
storage[track_url_to_uuid(listen_url)] = track
player.loadfile(listen_url, 'append-play')
if tracks_next:
tracks = get_playlist_tracks(playlist=playlist_id, pg=tracks_next)
else:
break
player_menu(f"Playlist playing...", storage)

207
src/fw_radios.py Normal file
View File

@ -0,0 +1,207 @@
from src.fw_api import current_instance, get_radios, post_radio_session, get_track_radio, list_libraries, favorite_track, get_audio_file, hide_content
from src.fw_libraries import libraries
from src.fw_tags import list_tags
from src.utils import download_track, track_info_output
from src.mpv_control import player, track_url_to_uuid, player_fw_storage, soft_volume_reduce, send_listen_activity
from src.settings import get_config
from pyfzf.pyfzf import FzfPrompt
from loguru import logger
from shlex import quote
import threading
import time
fzf = FzfPrompt()
@logger.catch
def list_radios(error_given=None):
radios = get_radios()
count = radios.get('count')
results = radios.get('results')
view = []
for i in results:
index = results.index(i)
id_radio = i.get('id')
name = i.get('name')
descr = i.get('description')
radio_option = name
if descr and descr != "":
radio_option += f' | {descr}'
view.append(f'{index}.{radio_option}')
# Radios avalaible only for auth user
if current_instance.s.headers.get('Authorization'):
view.append('Favourites')
view.append('Less listened')
view.extend(['Tag', 'Random', 'Libraries', 'Users', 'Recently Added'])
header = f'Found {count} radios'
if error_given:
header += f'\n{error_given}'
header = quote(header)
selected = fzf.prompt(
view, f'--header {header} --read0', delimiter="\0")
if selected == []:
return
else:
selected = selected[0].split('.', 1)
if 'Favourites' in selected:
radio_load(id_radio, 'favorites', name='your favorites tracks')
elif 'Tag' in selected:
tag = list_tags()
radio_load(type_radio='tag', name=f'by tag: {tag}', related_object=tag)
elif 'Random' in selected:
radio_load(id_radio, 'random', name='totally random')
elif 'Libraries' in selected:
id_radio, type_radio, name_radio, related_obj = libraries(radio=True)
radio_load(id_radio, type_radio, name_radio, related_obj)
elif 'Users' in selected:
libs = list_libraries()['results']
libraries_listing = []
for lib_i in libs:
lib_name = lib_i.get('actor').get('full_username')
if lib_name not in libraries_listing:
libraries_listing.append(lib_name)
libraries_listing.append('Custom')
lib_addr = fzf.prompt(libraries_listing)[0]
if lib_addr == 'Custom':
print('Input remote user library (ex. nick@funkwhale.domain.example: ')
lib_addr = input()
radio_load(None, 'actor-content', lib_addr, lib_addr)
elif 'Recently Added' in selected:
radio_load(id_radio, 'recently-added',
name='Newest content on the network')
elif 'Less listened' in selected:
radio_load(id_radio, 'less-listened', name="Less listened tracks")
else:
id_selected = selected[0]
id_radio = results[int(id_selected)].get('id')
name_radio = results[int(id_selected)].get('name')
radio_load(id_radio, name=name_radio)
def radio_generator(radio_session_id):
count_t = 0
while radio_session_id != '':
time.sleep(1)
if not radio_event_gen.wait(0):
break
count_t += 1
if count_t >= 60:
count_t = 0
playlist_remaining = len(player.playlist) - \
player.playlist_current_pos
if playlist_remaining <= 2:
radio_get_track(radio_session_id)
print('\rRadio generator stopped', flush=True)
radio_event_gen = threading.Event()
@logger.catch
def radio_load(id_radio=None, type_radio='custom', name=None, related_object=None):
show_like_button = get_config('show_like_button')
player.volume = get_config('mpv_volume')
requested_radio = {
'custom_radio': id_radio,
'radio_type': type_radio,
'related_object_id': related_object
}
radio_session_id = post_radio_session(requested_radio).get('id')
for i in range(0, 2):
try:
radio_get_track(radio_session_id, first_run=True)
except Exception as E:
return list_radios(error_given=f'Error: {E}')
radio_event_gen.set()
radio_task = threading.Thread(
target=radio_generator, args=(radio_session_id,), daemon=True)
radio_task.start()
player_items_menu = ['Next', 'Prev', 'Pause', 'Download', 'Info', 'Share']
if show_like_button:
player_items_menu.append('Like')
player_items_menu.extend(['Hide artist', 'Exit'])
while True:
try:
if player.pause:
player_items_menu[2] = 'Play'
else:
player_items_menu[2] = 'Pause'
try:
select = fzf.prompt(player_items_menu,
quote(f"--header=\'Radio {name} playing...\'"))[0]
except:
select = 'Exit'
if select == 'Next':
playlist_remaining = player.playlist_count - player.playlist_current_pos
if playlist_remaining <= 2:
threading.Thread(target=radio_get_track, args=(
radio_session_id,), daemon=True).start()
if playlist_remaining > 1:
player.playlist_next()
else:
print('No more tracks, please wait for new...')
time.sleep(3)
elif select == 'Prev':
player.playlist_prev()
elif select in ('Pause', 'Play'):
if player.pause:
player.pause = False
else:
player.pause = True
elif select == 'Hide artist':
track = player_fw_storage.storage.get(track_url_to_uuid())
hide_content(
{'target': {'id': track.get('artist').get('id'), 'type': 'artist'}})
elif select == 'Download':
name_downloaded = download_track(player.stream_open_filename)
elif select == 'Info':
track = player_fw_storage.storage.get(track_url_to_uuid())
track['direct_url'] = player.stream_open_filename
track_info_output(track)
elif select == 'Share':
send_listen_activity()
elif select == 'Like':
favorite_track(player_fw_storage.storage.get(
track_url_to_uuid())['id'])
elif select == 'Exit':
radio_event_gen.clear()
soft_volume_reduce()
player.playlist_clear()
player.stop()
player_fw_storage.storage = {}
break
except Exception as E:
radio_event_gen.clear()
player.playlist_clear()
player.stop()
player_fw_storage.storage = {}
logger.exception(f'Radio force stopped: {E}')
break
def radio_get_track(radio_session_id, first_run=False):
radio_context = get_track_radio({'session': radio_session_id})
if not radio_context:
return
if isinstance(radio_context, str):
logger.error(radio_context)
if radio_context == "Radio doesn't have more candidates":
radio_event_gen.clear()
if first_run:
radio_context = 'This radio may be private or haven\'t tracks'
raise IOError(radio_context)
return
if radio_context.get('error'):
logger.error(radio_context.get('error'))
return
else:
track = radio_context.get('track')
listen_url = track['listen_url']
player_fw_storage.storage[track_url_to_uuid(listen_url)] = track
player.loadfile(get_audio_file(
listen_url, listen_url=True), 'append-play')

58
src/fw_recents.py Normal file
View File

@ -0,0 +1,58 @@
from src.fw_api import get_favorires_tracks, get_recently_listened, get_audio_file
from src.fw_api_hints import hint_scope
from src.mpv_control import player, player_menu, track_url_to_uuid, player_fw_storage, play_track
from pyfzf.pyfzf import FzfPrompt
from loguru import logger
fzf = FzfPrompt()
@logger.catch
def list_fav_or_history(pg=None, search=None, scope=None, is_history_view=False):
if is_history_view:
action = 'listened'
tracks = get_recently_listened(q=search, scope=scope, pg=pg)
else:
action = 'liked'
tracks = get_favorires_tracks(q=search, scope=scope, pg=pg)
tracks_next = tracks.get('next')
tracks_prev = tracks.get('previous')
tracks_results = tracks.get('results')
view = ['Search', 'Limit by scope', 'Play this page']
if tracks_next:
view.append('Next page')
if tracks_prev:
view.append('Prev page')
for i in tracks_results:
index = tracks_results.index(i)
track_name = i['track'].get('title')
who_user = i['user'].get('username')
view.append(f'{index}.{track_name} | {who_user}')
select = fzf.prompt(view, f'--multi --header=\'map: track title | who {action}\'')
if select == []:
return
if 'Next page' in select:
list_fav_or_history(pg=tracks_next, is_history_view=is_history_view)
elif 'Prev page' in select:
list_fav_or_history(pg=tracks_prev, is_history_view=is_history_view)
elif 'Search' in select:
print('Search by track:')
list_fav_or_history(search=input(), is_history_view=is_history_view)
elif 'Limit by scope' in select:
hint_scope()
scope = input()
list_fav_or_history(scope=scope, search=search, is_history_view=is_history_view)
elif 'Play this page' in select:
for i in tracks_results:
play_track(track=i['track'], multi=True)
player_fw_storage.menu_ctx = list_fav_or_history
player_fw_storage.menu_ctx_args = [pg, search, scope, is_history_view]
elif len(select) > 1:
for i in select:
play_track(track=tracks_results[int(
i.split('.', 1)[0])]['track'], multi=True)
else:
play_track(track=tracks_results[int(
select[0].split('.', 1)[0])]['track'])

36
src/fw_tags.py Normal file
View File

@ -0,0 +1,36 @@
from src.fw_api import get_tags, get_tracks
from pyfzf.pyfzf import FzfPrompt
from loguru import logger
fzf = FzfPrompt()
@logger.catch
def list_tags(pg=None, search=None, error_given=''):
tags = get_tags(q=search, pg=pg)
tags_next = tags.get('next')
tags_prev = tags.get('previous')
tags_results = tags.get('results')
view = ['Search']
if tags_next:
view.append('Next page')
if tags_prev:
view.append('Prev page')
for i in tags_results:
index = tags_results.index(i)
tag_name = i.get('name')
view.append(f'{index}.{tag_name}')
select = fzf.prompt(view, f"--header=\'{error_given}\'")[0].split('.', 1)[0]
if select == 'Next page':
return list_tags(pg=tags_next)
elif select == 'Prev page':
return list_tags(pg=tags_prev)
elif select == 'Search':
print('Search by tag:')
return list_tags(search=input())
else:
selected_tag = tags_results[int(select)].get('name')
if get_tracks(tag=selected_tag, page_size=1)['count'] == 0:
return list_tags(pg=pg, search=search, error_given='This tag no contain tracks, select another')
return selected_tag

71
src/fw_tracks.py Normal file
View File

@ -0,0 +1,71 @@
from src.fw_api import get_tracks
from src.fw_tags import list_tags
from src.fw_libraries import libraries
from src.mpv_control import play_track, player_fw_storage
from pyfzf.pyfzf import FzfPrompt
from loguru import logger
import time
fzf = FzfPrompt()
@logger.catch
def list_tracks(pg=None, search=None, tag=None, library=None):
tracks = get_tracks(q=search, pg=pg, tag=tag, library=library)
tracks_count = tracks.get('count')
tracks_next = tracks.get('next')
tracks_prev = tracks.get('previous')
tracks_results = tracks.get('results')
view = ['Search', 'Tags', 'Library', 'Play this page']
if tracks_next:
view.append('Play all pages')
view.append('Next page')
if tracks_prev:
view.append('Prev page')
for i in tracks_results:
index = tracks_results.index(i)
track_name = i.get('title')
view.append(f'{index}.{track_name}')
select = fzf.prompt(view, f'--header=\'Found {tracks_count} tracks\'')
if select == []:
return
else:
select = select[0].split('.', 1)[0]
if select == 'Next page':
list_tracks(pg=tracks_next)
elif select == 'Prev page':
list_tracks(pg=tracks_prev)
elif select == 'Search':
print('Search by track:')
list_tracks(search=input())
elif select == 'Tags':
select_tag = list_tags()
list_tracks(tag=select_tag)
elif select == 'Library':
select_lib = libraries()
list_tracks(library=select_lib)
elif select == 'Play this page':
for i in tracks_results:
play_track(track=i, multi=True)
player_fw_storage.menu_ctx = list_tracks
player_fw_storage.menu_ctx_args = [pg, search, tag, library]
elif select == 'Play all pages':
if tracks_count > 500:
yn = input('WARNING: you really want add more than 500 tracks? (y/[n] or number of tracks)\n').lower()
try:
tracks_count = int(yn)
except:
if yn != 'y':
return
count_loaded = 0
while tracks_count > count_loaded:
for i in tracks_results:
play_track(track=i, multi=True)
count_loaded += 1
time.sleep(0.2)
tracks = get_tracks(pg=tracks_next)
tracks_next = tracks.get('next')
tracks_results = tracks.get('results')
else:
play_track(track=tracks_results[int(select)])

240
src/mpv_control.py Normal file
View File

@ -0,0 +1,240 @@
import src.fw_api
from src.utils import download_track, print_there, track_info_output, indices
from src.settings import get_config
from loguru import logger
from pyfzf.pyfzf import FzfPrompt
from shutil import get_terminal_size
from shlex import quote
from contextlib import suppress
import mpv
import time
import re
import requests
fzf = FzfPrompt()
if get_config('enable_persistent_cache'):
player = mpv.MPV(cache=True,
scripts='src/mpv_scripts/mpv_cache.lua:src/mpv_scripts/streamsave.lua',
script_opts='streamsave-save_directory=cache,streamsave-dump_mode=continuous,streansave-force_extension=.mkv,streamsave-autostart=no,output_label=overwrite')
player.command('script-message', 'streamsave-path', 'cache')
else:
player = mpv.MPV(cache=True, demuxer_max_bytes=25*1024*1024)
player.ytdl = False # Prevent attempts load track with yt-dlp
player.volume = get_config('mpv_volume')
player.prefetch_playlist = get_config('prefetch_playlist')
show_like_button = get_config('show_like_button')
share_to_fediverse_token = get_config('share_to_fediverse_token')
share_to_fediverse_instance = get_config('share_to_fediverse_instance')
shuffle = False
if get_config('termux_handle_track_switch_by_volume'):
import src.android_termux_api
class player_fw_storage:
storage = {}
menu_ctx = None
menu_ctx_args = None
@logger.catch
def track_url_to_uuid(listen_url=None):
'''Attempt get uuid from track listen url or current playing url'''
hex = '[0-9a-fA-F]+'
find_uuid = f'{hex}-{hex}-{hex}-{hex}-{hex}'
if listen_url:
uuid = re.findall(find_uuid, listen_url)
else:
uuid = re.findall(find_uuid, player.stream_open_filename)
return uuid[0]
def send_listen_activity():
try:
track = player_fw_storage.storage.get(track_url_to_uuid())
except:
return
if src.fw_api.current_instance.token is not None:
track_id = track.get('id')
if track_id:
src.fw_api.record_track_in_history(track_id)
else:
logger.error("Can't write track to history: No track id")
if share_to_fediverse_token != '':
fid = track.get('fid')
artist = track['artist'].get('name')
album = track['album'].get('title')
title = track.get('title')
tags = track.get('tags')
if tags:
tags = [f'#{tag}' for tag in tags]
tags = ' '.join(tags)
if tags == []:
tags = ''
status_obj = {'spoiler_text': 'funkwlmpv music share',
'visibility': 'unlisted',
'status': f'🎧 {artist} - {album} - {title}\n{fid}\n#NowPlaying {tags}'}
requests.post(f'https://{share_to_fediverse_instance}/api/v1/statuses',
json=status_obj,
headers={'Authorization': f'Bearer {share_to_fediverse_token}'})
def osd_observer(value):
'''Sumulate osd playing message in console'''
if value:
osd_message = []
for i in value.items():
if i[0] in ('Artist', 'Album', 'Title'):
osd_message.append(i[1])
osd_string = ' - '.join(osd_message)
term_len = get_terminal_size().columns
print_there(0, 0, '\r'+' '*term_len)
print_there(0, 0, '\r'+osd_string[:term_len])
else:
print_there(0, 0, '\rNo metadata...')
@player.event_callback('start-file')
@logger.catch
def starting_file_handler(value):
'''just show loading state'''
print_there(0, 0, '\rLoading track...')
@player.property_observer('percent-pos')
@logger.catch
def universal_observer(_name, value):
if value:
percent = int(value)
if player.audio_bitrate:
kbps = int(player.audio_bitrate/1024)
else:
kbps = '?'
if player.file_size:
track_size = round(player.file_size/1024/1024, 1)
else:
track_size = '?'
if player.cache_speed:
speed_load = player.cache_speed
if speed_load >= 3*1024*1024:
cache_speed = '| <<<'
elif speed_load >= 1*1024*1024:
cache_speed = '| <<*'
else:
cache_speed = '| <=>'
else:
cache_speed = ''
if player.playlist_count > -1:
player_pos = f'{player.playlist_pos_1}/{player.playlist_count}'
else:
player_pos = '-/-'
osd_observer(player.filtered_metadata)
print_there(2, 2, f'\r'+' '*get_terminal_size().columns)
print_there(2, 2, f'\r{player_pos} | {kbps} kbps | {percent}% | {track_size}MB {cache_speed}')
time.sleep(1)
def soft_volume_reduce():
while player.volume > 10:
player.volume = player.volume - 1
time.sleep(0.050)
@logger.catch
def player_menu(header='', storage={}):
for i in player.playlist_filenames:
count_same_tracks = indices(player.playlist_filenames, i)
while len(count_same_tracks) > 1:
with suppress(SystemError):
player.playlist_remove(count_same_tracks[-1])
count_same_tracks = indices(player.playlist_filenames, i)
player_fw_storage.storage.update(storage)
player.volume = get_config("mpv_volume")
global shuffle
while True:
try:
player_items_menu = ['Next', 'Prev', 'Pause',
'Shuffle', 'Download', 'Info', 'Share', 'Jump to']
if player.pause:
player_items_menu[2] = 'Play'
else:
player_items_menu[2] = 'Pause'
if shuffle:
player_items_menu[3] = 'Unshuffle'
else:
player_items_menu[3] = 'Shuffle'
if show_like_button:
player_items_menu.append('Like')
if player_fw_storage.menu_ctx:
player_items_menu.append('Add more tracks')
player_items_menu.extend(['Hide artist', 'Exit'])
select = fzf.prompt(player_items_menu, quote(f"--header=\'{header}\'"))
if select == []:
break
else:
select = select[0]
if select == 'Next':
try:
player.playlist_next()
except:
print('No more next tracks')
elif select == 'Prev':
player.playlist_prev()
elif select in ('Pause', 'Play'):
player.cycle('pause')
elif select in ('Shuffle', 'Unshuffle'):
if shuffle:
shuffle = False
player.playlist_unshuffle()
else:
shuffle = True
player.playlist_shuffle()
player.playlist_play_index(0)
elif select == 'Download':
name_downloaded = download_track(player.stream_open_filename)
elif select == 'Info':
track = player_fw_storage.storage.get(track_url_to_uuid())
track['direct_url'] = player.stream_open_filename
track_info_output(track)
elif select == 'Share':
send_listen_activity()
elif select == 'Jump to':
jump_to_idx = int(fzf.prompt(range(1, len(player.playlist_filenames)+1))[0])
jump_to_idx -= 1
player.playlist_play_index(jump_to_idx)
elif select == 'Like':
src.fw_api.favorite_track(
player_fw_storage.storage.get(track_url_to_uuid())['id'])
elif select == 'Add more tracks':
player_fw_storage.menu_ctx(*player_fw_storage.menu_ctx_args)
elif select == 'Hide artist':
track = player_fw_storage.storage.get(track_url_to_uuid())
player.playlist_remove('current')
src.fw_api.hide_content(
{'target': {'id': track.get('artist').get('id'), 'type': 'artist'}})
elif select == 'Exit':
shuffle = False
soft_volume_reduce()
player.playlist_clear()
player.stop()
player_fw_storage.storage = {}
break
except KeyboardInterrupt:
break
def play_track(track, multi=False):
listen_url = src.fw_api.get_audio_file(track['listen_url'], True)
player_fw_storage.storage[track_url_to_uuid(listen_url)] = track
if multi:
player.loadfile(listen_url, 'append-play')
else:
player.loadfile(listen_url, 'append-play')
track_name = track.get('title')
player_menu(f"{track_name} playing...", player_fw_storage.storage)

View File

@ -0,0 +1,66 @@
local utils = require 'mp.utils'
local msg = require 'mp.msg'
local options = require 'mp.options'
function sleep(n)
os.execute("sleep " .. tonumber(n))
end
function createDir(dirname)
os.execute("mkdir -p -m 711 " .. dirname)
end
function file_exists(name)
local f = io.open(name, "r")
return f ~= nil and io.close(f)
end
function get_url_host(s)
return (s.."/"):match("://(.-)/")
end
function make_cache_track(url)
mp.command('script-message streamsave-autostart no')
find_uuid = "%x+-%x+-%x+-%x+-%x+"
uuid = string.sub(url, string.find(url, find_uuid))
host = get_url_host(url)
cache_path_file = 'cache/' .. host .. '/' .. uuid .. '.mkv'
cache_path_named_file = 'cache/' .. host .. '/' .. uuid .. '.mkv'
if false == file_exists(cache_path_file) then
createDir('cache/' .. host .. '/')
msg.verbose('Caching ' .. cache_path_file .. '')
mp.command('script-message streamsave-title ' .. uuid .. '')
mp.command('script-message streamsave-force_title ' .. uuid .. '')
mp.command('script-message streamsave-label overwrite')
mp.set_property('script-opts/media-uuid', uuid)
mp.command('script-message streamsave-extension .mkv')
mp.command('script-message streamsave-path cache/' .. host .. '')
mp.command('script-message streamsave-autostart yes')
else
msg.verbose('Already cached ' .. cache_path_file .. '')
os.execute('touch ' .. cache_path_file .. '')
mp.set_property("stream-open-filename", cache_path_file)
end
end
mp.add_hook("on_load", 11, function()
msg.verbose('reusable cache hook activated')
local url = mp.get_property("stream-open-filename", "")
if true == (url:find("https?://") == 1) then
make_cache_track(url)
end
end)
mp.register_event("file-loaded", function()
msg.verbose('reusable cache post-hook activated')
local url = mp.get_property("stream-open-filename", "")
if true == (url:find("https?://") == 1) then
make_cache_track(url)
end
end)

File diff suppressed because it is too large Load Diff

104
src/settings.py Normal file
View File

@ -0,0 +1,104 @@
import json
import os
from os.path import exists
from loguru import logger
from pyfzf.pyfzf import FzfPrompt
fzf = FzfPrompt()
conf_file = 'config.json'
default_conf = {
'instance': 'fw.ponychord.rocks',
'public_list_instances_extended':
{
"open.audio": None,
"audio.liberta.vip": None,
"dance.with.tewi.and.reisen": None,
"tanukitunes.com": None,
"funkwhale.juniorjpdj.pl": None,
"audio.securetown.in.ua": None,
"funkwhale.thurk.org": None,
"buzzworkers.com": None,
"soundship.de": None,
"funkwhale.kameha.click": None,
"music.chosto.me": None,
"zik.goe.land": None,
"music.humanoids.be": None,
"music.hempton.us": None,
"mizik.o-k-i.net": None,
"klh.radiolivre.org": None,
"hudba.feildel.fr": None,
"funk.deko.cloud": None,
"audio.graz.social": None,
"funkwhale.desmu.fr": None,
"listen.knsm.cc": None,
"funkwhale.gegeweb.eu": None,
},
'automatic_fetch_new_instances': False,
'enable_server_transcoding': False,
'external_transcoder_http_proxy_path': "",
'share_to_fediverse_token': "",
'share_to_fediverse_instance': "",
'prefetch_playlist': True,
'enable_persistent_cache': False,
'mpv_volume': 100,
'show_like_button': True,
'show_artist_name_in_albums': False,
'termux_handle_track_switch_by_volume': False
}
def set_defaults(corrected_config=None):
conf_rewrite = default_conf
if corrected_config:
conf_rewrite = corrected_config
descriptor = os.open(
path=conf_file,
flags=(
os.O_WRONLY # access mode: write only
| os.O_CREAT # create if not exists
| os.O_TRUNC # truncate the file to zero
),
mode=0o600)
with open(descriptor, 'wt') as f:
f.write(json.dumps(conf_rewrite, indent=4))
def check_config():
'''Check config and remove or add keys if not found in default config'''
with open(conf_file, 'rt') as f:
loaded_config = json.loads(f.read())
correct_conf = {}
for k, v in loaded_config.items():
if k in default_conf.keys():
correct_conf[k] = v
else:
logger.warning(f'{k} from config will be removed. Value: {v}')
for k, v in default_conf.items():
if k not in correct_conf.keys():
correct_conf[k] = v
logger.warning(f'{k} added in config. Value: {v}')
set_defaults(correct_conf)
return correct_conf
if not exists(conf_file):
set_defaults()
else:
check_config()
def get_config(key):
'''return value from config by key'''
with open(conf_file, 'rt') as f:
loaded_config = json.loads(f.read())
return loaded_config.get(key)
def set_config(key, value):
'''set value new value config by key'''
read_conf = check_config()
with open(conf_file, 'wt') as f:
read_conf[key] = value
f.write(json.dumps(read_conf, indent=4))

85
src/utils.py Normal file
View File

@ -0,0 +1,85 @@
import src.fw_api
import os
import sys
import shutil
from urllib.parse import unquote
def get_remote_file_name(url):
'''This function return filename by content-disposition header'''
r = src.fw_api.current_instance.s.head(url)
content_dispos = r.headers.get('content-disposition')
if content_dispos.startswith('attachment; filename*=UTF-8\'\''):
return unquote(content_dispos.split('attachment; filename*=UTF-8\'\'')[-1])
def download_track(url, name=None):
if not url.startswith('http'):
copy_from_cache(url)
return
url = url.split('?')[0] # Stripe all params from url
r = src.fw_api.current_instance.s.get(url, stream=True)
if not name:
name = get_remote_file_name(url)
if not name:
name = url.split(r'/')[-1]
with open(name.replace('/', '_'), 'wb') as f:
print(f"Downloading {name}")
total_length = r.headers.get('content-length')
if total_length is None: # no content length header
f.write(r.content)
else:
dl = 0
total_length = int(total_length)
for data in r.iter_content(chunk_size=4096):
dl += len(data)
f.write(data)
done = int(50 * dl / total_length)
# base progress bar
sys.stdout.write("\r[%s%s]" % ('=' * done, ' ' * (50-done)))
sys.stdout.flush()
return name
def copy_from_cache(url_file):
uuid = url_file.split(r'/')[-1]
original_name = get_remote_file_name(f'https://{src.fw_api.current_instance.instance}/api/v1/listen/{uuid}')
shutil.copyfile(url_file, original_name)
def print_there(x, y, text):
'''Print at position x, y caption in terminal (Linux only)'''
sys.stdout.write("\x1b7\x1b[%d;%df%s\x1b8" % (x, y, text))
sys.stdout.flush()
def track_info_output(track):
output = []
for k, v in track.items():
if k not in ('cover', 'uploads', 'listen_url', 'mbid', 'id', 'is_playable') and v is not None and v != []:
if isinstance(v, dict):
for i in ('title', 'name', 'fid'):
if v.get(i):
val_override = v.get(i)
output.append(f'{k}: {val_override}')
else:
output.append(f'{k}: {v}')
output.append('Related Libraries:')
try:
assigned_libs = src.fw_api.assigned_libraries_on_track(track['id'])['results']
for i in assigned_libs:
for prop in ('fid', 'name', 'description', 'creation_date'):
output.append(i.get(prop))
except:
output.append('Failed get related')
output = '\n'.join(output)
os.system(f'less <<EOF\n{output}\nEOF')
def indices(lst, item):
'''https://ao.phreedom.club/questions/5419204/index-of-duplicates-items-in-a-python-list'''
return [i for i, x in enumerate(lst) if x == item]