funkwlmpv/src/mpv_control.py

181 lines
6.3 KiB
Python

import src.fw_api
from src.utils import download_track, print_there
from src.settings import get_config
from loguru import logger
from pyfzf.pyfzf import FzfPrompt
from shutil import get_terminal_size
import mpv
import time
import sys
fzf = FzfPrompt()
player = mpv.MPV(cache=True, demuxer_max_bytes=25*1024*1024)
player.ytdl = False # Prevent attempts load track with yt-dlp
player.volume = get_config('mpv_volume')
player.prefetch_playlist = get_config('prefetch_playlist')
show_like_button = get_config('show_like_button')
track_activity_history = get_config('track_activity_history')
class player_fw_storage:
storage = {}
@logger.catch
def track_url_to_uuid(listen_url=None):
'''Attempt get uuid from track listen url or current playing url'''
if listen_url:
uuid = listen_url.split(r'/')[-2]
else:
uuid = player.stream_open_filename.split(r'/')[-2]
return uuid
if track_activity_history:
@player.property_observer('time-pos')
@logger.catch
def time_observer(_name, value):
# Here, _value is either None if nothing is playing or a float containing
# fractional seconds since the beginning of the file.
if value and player.http_header_fields != [] and player.pause is False:
if value >= 30.0 and value <= 30.1:
# detect 30 secs for reporting listen activity
track = player_fw_storage.storage.get(track_url_to_uuid())
track_id = track.get('id')
if track_id:
src.fw_api.record_track_in_history(track_id)
else:
logger.error("Can't write track to history: No track id")
time.sleep(1)
@player.property_observer('filtered-metadata')
@logger.catch
def osd_observer(_name, value):
'''Sumulate osd playing message in console'''
if value:
osd_message = []
for i in value.items():
if i[0] in ('Artist', 'Album', 'Title'):
osd_message.append(i[1])
print_there(0, 0, '\r ')
osd_string = ''.join(osd_message)
term_len = get_terminal_size().columns
print_there(0, 0, '\r'+osd_string[:term_len])
@player.property_observer('stream-open-filename')
@logger.catch
def waiting_load_observer(_name, value):
'''just show loading state'''
if value and player.core_idle:
print_there(0, 0, '\rLoading track...')
@player.property_observer('percent-pos')
@logger.catch
def universal_observer(_name, value):
if value:
percent = int(value)
if player.audio_bitrate:
kbps = int(player.audio_bitrate/1024)
else:
kbps = '?'
if player.file_size:
track_size = round(player.file_size/1024/1024, 1)
else:
track_size = '?'
if player.cache_speed:
speed_load = player.cache_speed
if speed_load >= 3*1024*1024:
cache_speed = '| <<<'
elif speed_load >= 1*1024*1024:
cache_speed = '| <<*'
else:
cache_speed = '| <=>'
else:
cache_speed = ''
if player.playlist_count > -1:
player_pos = f'{player.playlist_pos_1}/{player.playlist_count}'
else:
player_pos = '-/-'
print_there(2, 2, f'\r'+' '*get_terminal_size().columns)
print_there(2, 2, f'\r{player_pos} | {kbps} kbps | {percent}% | {track_size}MB {cache_speed}')
time.sleep(1)
def set_http_header(headers=[]):
player.http_header_fields = headers
@logger.catch
def player_menu(header='', storage={}):
player_fw_storage.storage.update(storage)
player.volume = get_config("mpv_volume")
while True:
try:
player_items_menu = ['Next', 'Prev', 'Pause',
'Download', 'Info']
if player.pause:
player_items_menu[2] = 'Play'
else:
player_items_menu[2] = 'Pause'
if show_like_button:
player_items_menu.append('Like')
player_items_menu.extend(['Hide artist', 'Exit'])
select = fzf.prompt(player_items_menu, f"--header=\'{header}\'")[0]
if select == 'Next':
try:
player.playlist_next()
except:
print('No more next tracks')
elif select == 'Prev':
player.playlist_prev()
elif select in ('Pause', 'Play'):
if player.pause:
player.pause = False
else:
player.pause = True
elif select == 'Download':
name_downloaded = download_track(player.stream_open_filename)
elif select == 'Info':
track = player_fw_storage.storage.get(track_url_to_uuid())
for i in track.keys():
if i in ('album', 'artist'):
name_aa = track.get(i).get('name')
if not name_aa:
name_aa = track.get(i).get('title')
print(i + ': ' + name_aa)
key = track.get(i)
if key and isinstance(key, str):
print(i + ': ' + key)
print('Direct link: ' + player.stream_open_filename)
input()
elif select == 'Like':
src.fw_api.favorite_track(
player_fw_storage.storage.get(track_url_to_uuid())['id'])
elif select == 'Hide artist':
track = player_fw_storage.storage.get(track_url_to_uuid())
src.fw_api.hide_content(
{'target': {'id': track.get('artist').get('id'), 'type': 'artist'}})
elif select == 'Exit':
player.playlist_clear()
player.stop()
player_fw_storage.storage = {}
break
except KeyboardInterrupt:
break
def play_track(track, multi=False):
listen_url = src.fw_api.get_audio_file(track['listen_url'], True)
player_fw_storage.storage[track_url_to_uuid(listen_url)] = track
if multi:
player.loadfile(listen_url, 'append-play')
else:
player.loadfile(listen_url, 'append-play')
track_name = track.get('title')
player_menu(f"{track_name} playing...", player_fw_storage.storage)