streamrip/rip/cli.py

544 lines
17 KiB
Python
Raw Normal View History

2021-07-30 20:33:26 -04:00
import concurrent.futures
import logging
2021-07-30 20:33:26 -04:00
import os
import threading
2021-07-29 01:29:30 -04:00
2021-07-30 20:33:26 -04:00
import requests
from cleo.application import Application as BaseApplication
from cleo.commands.command import Command
from cleo.formatters.style import Style
from click import launch
2021-07-29 01:29:30 -04:00
from streamrip import __version__
2021-05-13 21:55:03 -04:00
2021-07-30 20:33:26 -04:00
from .config import Config
from .core import RipCore
2021-05-13 20:22:21 -04:00
logging.basicConfig(level="WARNING")
2021-05-12 18:19:51 -04:00
logger = logging.getLogger("streamrip")
2021-03-22 12:21:27 -04:00
2021-07-30 20:33:26 -04:00
outdated = False
newest_version = __version__
2021-07-30 20:33:26 -04:00
2021-03-22 12:21:27 -04:00
2021-07-30 20:33:26 -04:00
class DownloadCommand(Command):
    """
    Download items using urls.

    url
        {--f|file=None : Path to a text file containing urls}
        {--c|codec=None : Convert the downloaded files to <cmd>ALAC</cmd>, <cmd>FLAC</cmd>, <cmd>MP3</cmd>, <cmd>AAC</cmd>, or <cmd>OGG</cmd>}
        {--m|max-quality=None : The maximum quality to download. Can be <cmd>0</cmd>, <cmd>1</cmd>, <cmd>2</cmd>, <cmd>3 </cmd>or <cmd>4</cmd>}
        {--i|ignore-db : Download items even if they have been logged in the database.}
        {urls?* : One or more Qobuz, Tidal, Deezer, or SoundCloud urls}
    """

    # NOTE: the docstring above is written in cleo's signature syntax
    # (description, command name, options, arguments); it is read at runtime,
    # so do not turn it into free-form documentation.

    help = (
        "\nDownload <title>Dreams</title> by <title>Fleetwood Mac</title>:\n"
        "$ <cmd>rip url https://www.deezer.com/en/track/63480987</cmd>\n\n"
        "Batch download urls from a text file named <path>urls.txt</path>:\n"
        "$ <cmd>rip url --file urls.txt</cmd>\n\n"
        "For more information on Quality IDs, see\n"
        "<url>https://github.com/nathom/streamrip/wiki/Quality-IDs</url>\n"
    )

    def handle(self):
        """Queue the requested items, download them, then self-update if needed.

        Returns:
            1 if ``--file`` points to a path that is not a file, 0 otherwise.
        """
        global outdated
        global newest_version

        # Use a thread so that it doesn't slow down startup
        update_check = threading.Thread(target=is_outdated, daemon=True)
        update_check.start()

        config = Config()
        # NOTE(review): ``codec`` is parsed but never applied to the session
        # config in this method, so ``--codec`` currently has no effect here.
        # Confirm the intended config keys before wiring it up.
        path, codec, quality, no_db = clean_options(
            self.option("file"),
            self.option("codec"),
            self.option("max-quality"),
            self.option("ignore-db"),
        )

        if no_db:
            config.session["database"]["enabled"] = False

        if quality is not None:
            for source in ("qobuz", "tidal", "deezer"):
                config.session[source]["quality"] = quality

        core = RipCore(config)

        urls = self.argument("urls")

        if path is not None:
            if os.path.isfile(path):
                core.handle_txt(path)
            else:
                self.line(
                    f"<error>File <comment>{path}</comment> does not exist.</error>"
                )
                return 1

        if urls:
            core.handle_urls(";".join(urls))

        if len(core) > 0:
            core.download()
        elif not urls and path is None:
            self.line("<error>Must pass arguments. See </><cmd>rip url -h</cmd>.")

        # The version check ran in the background while downloading.
        update_check.join()
        if outdated:
            import subprocess

            self.line(
                f"<info>Updating streamrip to <title>v{newest_version}</title>...</info>\n"
            )

            # update in background
            update_p = subprocess.Popen(
                ["pip3", "install", "streamrip", "--upgrade"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            try:
                self._print_release_notes()
            finally:
                # communicate() drains both pipes while waiting; a bare wait()
                # can deadlock once pip fills the OS pipe buffer.
                update_p.communicate()

        return 0

    def _print_release_notes(self):
        """Print the latest GitHub release notes, converted to CLI markup.

        Best-effort: if the notes cannot be fetched or have an unexpected
        shape, nothing is printed and the failure is logged at debug level.
        """
        import re

        try:
            release_notes = requests.get(
                "https://api.github.com/repos/nathom/streamrip/releases/latest",
                timeout=10,
            ).json()["body"]
        except (requests.RequestException, ValueError, KeyError, TypeError) as err:
            logger.debug("Could not fetch release notes: %s", err)
            return

        if not isinstance(release_notes, str):
            # e.g. a release without a description
            return

        # Translate a small subset of Markdown into the CLI's style tags.
        md_header = re.compile(r"#\s+(.+)")
        bullet_point = re.compile(r"-\s+(.+)")
        code = re.compile(r"`([^`]+)`")
        issue_reference = re.compile(r"(#\d+)")

        release_notes = md_header.sub(r"<header>\1</header>", release_notes)
        release_notes = bullet_point.sub(r"<options=bold>•</> \1", release_notes)
        release_notes = code.sub(r"<cmd>\1</cmd>", release_notes)
        release_notes = issue_reference.sub(r"<options=bold>\1</>", release_notes)

        self.line(release_notes)
2021-07-29 15:36:13 -04:00
2021-07-30 20:33:26 -04:00
class SearchCommand(Command):
    """
    Search for and download items in interactive mode.

    search
        {query : The name to search for}
        {--s|source=qobuz : Qobuz, Tidal, Soundcloud, Deezer, or Deezloader}
        {--t|type=album : Album, Playlist, Track, or Artist}
    """

    # NOTE: the docstring above is the cleo command signature, not prose.

    help = (
        "\nSearch for <title>Rumours</title> by <title>Fleetwood Mac</title>\n"
        "$ <cmd>rip search 'rumours fleetwood mac'</cmd>\n\n"
        "Search for <title>444</title> by <title>Jay-Z</title> on TIDAL\n"
        "$ <cmd>rip search --source tidal '444'</cmd>\n\n"
        "Search for <title>Bob Dylan</title> on Deezer\n"
        "$ <cmd>rip search --type artist --source deezer 'bob dylan'</cmd>\n"
    )

    def handle(self):
        """Run an interactive search and download whatever the user picks."""
        search_query = self.argument("query")
        source, media_type = clean_options(
            self.option("source"), self.option("type")
        )

        core = RipCore(Config())

        # interactive_search() is falsy when the user selected nothing.
        if not core.interactive_search(search_query, source, media_type):
            self.line("<error>No items chosen, exiting.</error>")
            return

        core.download()
2021-03-30 21:36:51 -04:00
2021-07-30 20:33:26 -04:00
class DiscoverCommand(Command):
    """
    Browse and download items in interactive mode (Qobuz only).

    discover
        {--s|scrape : Download all of the items in the list}
        {--m|max-items=50 : The number of items to fetch}
        {list=ideal-discography : The list to fetch}
    """

    # NOTE: the docstring above is the cleo command signature, not prose.

    help = (
        "\nBrowse the Qobuz ideal-discography list\n"
        "$ <cmd>rip discover</cmd>\n\n"
        "Browse the best-sellers list\n"
        "$ <cmd>rip discover best-sellers</cmd>\n\n"
        "Available options for <info>list</info>:\n\n"
        " • most-streamed\n"
        " • recent-releases\n"
        " • best-sellers\n"
        " • press-awards\n"
        " • ideal-discography\n"
        " • editor-picks\n"
        " • most-featured\n"
        " • qobuzissims\n"
        " • new-releases\n"
        " • new-releases-full\n"
        " • harmonia-mundi\n"
        " • universal-classic\n"
        " • universal-jazz\n"
        " • universal-jeunesse\n"
        " • universal-chanson\n"
    )

    def handle(self):
        """Browse (or scrape) a Qobuz featured list and download from it.

        Returns:
            1 when the requested list name is unknown, 0 after a scrape;
            the interactive path returns None.
        """
        from streamrip.constants import QOBUZ_FEATURED_KEYS

        list_name = self.argument("list")
        scrape_all = self.option("scrape")
        item_limit = self.option("max-items")

        if list_name not in QOBUZ_FEATURED_KEYS:
            self.line(f'<error>Error: list "{list_name}" not available</error>')
            self.line(self.help)
            return 1

        core = RipCore(Config())

        if scrape_all:
            # NOTE(review): the limit is forwarded exactly as cleo returned it,
            # whereas the interactive path below converts it with int().
            core.scrape(list_name, item_limit)
            core.download()
            return 0

        picked = core.interactive_search(
            list_name, "qobuz", "featured", limit=int(item_limit)
        )
        if picked:
            core.download()
        else:
            self.line("<error>No items chosen, exiting.</error>")
2021-03-26 15:26:50 -04:00
2021-07-30 20:33:26 -04:00
class LastfmCommand(Command):
    """
    Search for tracks from a last.fm playlist and download them.

    lastfm
        {--s|source=qobuz : The source to search for items on}
        {urls* : Last.fm playlist urls}
    """

    # NOTE: the docstring above is the cleo command signature, not prose.

    help = (
        "You can use this command to download Spotify, Apple Music, and YouTube "
        "playlists.\nTo get started, create an account at "
        "<url>https://www.last.fm</url>. Once you have\nreached the home page, "
        "go to <path>Profile Icon</path> ⟶ <path>View profile</path> ⟶ "
        "<path>Playlists</path> ⟶ <path>IMPORT</path>\nand paste your url.\n\n"
        "Download the <info>young & free</info> Apple Music playlist (already imported)\n"
        "$ <cmd>rip lastfm https://www.last.fm/user/nathan3895/playlists/12089888</cmd>\n"
    )

    def handle(self):
        """Resolve the given last.fm playlist urls on ``--source`` and download."""
        # Normalise the option the same way the other commands do, so that a
        # value such as "=tidal" (from ``-s=tidal``) is accepted as well.
        source = next(clean_options(self.option("source")))
        urls = self.argument("urls")

        config = Config()
        core = RipCore(config)
        # Set after RipCore is constructed, matching the original statement order.
        config.session["lastfm"]["source"] = source
        core.handle_lastfm_urls(";".join(urls))
        core.download()
2021-03-24 13:39:37 -04:00
2021-07-30 20:33:26 -04:00
class ConfigCommand(Command):
    """
    Manage the configuration file.

    config
        {--o|open : Open the config file in the default application}
        {--O|open-vim : Open the config file in (neo)vim}
        {--d|directory : Open the directory that the config file is located in}
        {--p|path : Show the config file's path}
        {--qobuz : Set the credentials for Qobuz}
        {--tidal : Log into Tidal}
        {--deezer : Set the Deezer ARL}
        {--reset : Reset the config file}
        {--update : Reset the config file, keeping the credentials}
    """

    # NOTE: the docstring above is the cleo command signature, not prose.

    def handle(self):
        """Carry out every config action whose flag was passed.

        The flags are independent: each one that is set is processed, in the
        order written below.
        """
        import shutil

        from .constants import CONFIG_DIR, CONFIG_PATH

        config = Config()

        if self.option("path"):
            self.line(f"<info>{CONFIG_PATH}</info>")

        if self.option("open"):
            self.line(f"Opening <url>{CONFIG_PATH}</url> in default application")
            launch(CONFIG_PATH)

        if self.option("reset"):
            config.reset()

        if self.option("update"):
            config.update()

        if self.option("open-vim"):
            import subprocess

            # Prefer neovim, fall back to vim. An argv list (no shell) keeps
            # paths containing quotes or spaces intact.
            editor = shutil.which("nvim") or shutil.which("vim")
            if editor is None:
                self.line(
                    "<error>Neither <cmd>nvim</cmd> nor <cmd>vim</cmd> "
                    "was found on your PATH.</error>"
                )
            else:
                subprocess.run([editor, CONFIG_PATH])

        if self.option("directory"):
            self.line(f"Opening <url>{CONFIG_DIR}</url>")
            launch(CONFIG_DIR)

        if self.option("tidal"):
            from streamrip.clients import TidalClient

            client = TidalClient()
            client.login()
            config.file["tidal"].update(client.get_tokens())
            config.save()
            self.line("<info>Credentials saved to config.</info>")

        if self.option("deezer"):
            from streamrip.clients import DeezerClient
            from streamrip.exceptions import AuthenticationError

            self.line(
                "Follow the instructions at <url>https://github.com"
                "/nathom/streamrip/wiki/Finding-your-Deezer-ARL-Cookie</url>"
            )

            given_arl = self.ask("Paste your ARL here: ").strip()
            self.line("<comment>Validating arl...</comment>")

            try:
                # Only persist the ARL once a login with it has succeeded.
                DeezerClient().login(arl=given_arl)
                config.file["deezer"]["arl"] = given_arl
                config.save()
                self.line("<b>Successfully logged in!</b>")

            except AuthenticationError:
                self.line("<error>Could not log in. Double check your ARL</error>")

        if self.option("qobuz"):
            import getpass
            import hashlib

            config.file["qobuz"]["email"] = self.ask("Qobuz email:")
            # The config stores the MD5 hex digest of the password, not the
            # plain text.
            config.file["qobuz"]["password"] = hashlib.md5(
                getpass.getpass("Qobuz password (won't show on screen): ").encode()
            ).hexdigest()
            config.save()
            self.line("<info>Credentials saved to config.</info>")
2021-07-30 20:33:26 -04:00
class ConvertCommand(Command):
    """
    A standalone tool that converts audio files to other codecs en masse.

    convert
        {--s|sampling-rate=192000 : Downsample the tracks to this rate, in Hz.}
        {--b|bit-depth=24 : Downsample the tracks to this bit depth.}
        {--k|keep-source : Keep the original file after conversion.}
        {codec : <cmd>FLAC</cmd>, <cmd>ALAC</cmd>, <cmd>OPUS</cmd>, <cmd>MP3</cmd>, or <cmd>AAC</cmd>.}
        {path : The path to the audio file or a directory that contains audio files.}
    """

    # NOTE: the docstring above is the cleo command signature, not prose.

    help = (
        "\nConvert all of the audio files in <path>/my/music</path> to MP3s\n"
        "$ <cmd>rip convert MP3 /my/music</cmd>\n\n"
        "Downsample the audio to 48kHz after converting them to ALAC\n"
        "$ <cmd>rip convert --sampling-rate 48000 ALAC /my/music</cmd>\n"
    )

    def handle(self):
        """Convert a single audio file, or every audio file under a directory.

        Returns:
            1 if the codec is not recognised or the path does not exist,
            0 otherwise.
        """
        from streamrip import converter

        CODEC_MAP = {
            "FLAC": converter.FLAC,
            "ALAC": converter.ALAC,
            "OPUS": converter.OPUS,
            "MP3": converter.LAME,
            "AAC": converter.AAC,
        }

        codec = self.argument("codec")
        path = self.argument("path")

        # The codec name is matched case-insensitively.
        ConverterCls = CODEC_MAP.get(codec.upper())
        if ConverterCls is None:
            self.line(
                f'<error>Invalid codec "{codec}". See </error><cmd>rip convert'
                " -h</cmd>."
            )
            return 1

        sampling_rate, bit_depth, keep_source = clean_options(
            self.option("sampling-rate"),
            self.option("bit-depth"),
            self.option("keep-source"),
        )

        converter_args = {
            "sampling_rate": sampling_rate,
            "bit_depth": bit_depth,
            "remove_source": not keep_source,
        }

        if os.path.isdir(path):
            from pathlib import Path

            from tqdm import tqdm

            from streamrip.utils import TQDM_BAR_FORMAT

            audio_extensions = ("flac", "m4a", "aac", "opus", "mp3", "ogg")
            root = Path(path)
            # rglob() yields paths that already start with ``root``; joining
            # them onto the directory again would duplicate it whenever a
            # relative directory is given.
            audio_files = [
                str(audio_file)
                for ext in audio_extensions
                for audio_file in root.rglob(f"*.{ext}")
            ]

            with concurrent.futures.ThreadPoolExecutor() as executor:
                futures = [
                    executor.submit(
                        ConverterCls(filename=audio_file, **converter_args).convert
                    )
                    for audio_file in audio_files
                ]

                for future in tqdm(
                    concurrent.futures.as_completed(futures),
                    total=len(futures),
                    desc="Converting",
                    bar_format=TQDM_BAR_FORMAT,
                ):
                    # Only show loading bar; result() also re-raises any
                    # exception raised inside the worker.
                    future.result()

        elif os.path.isfile(path):
            ConverterCls(filename=path, **converter_args).convert()
        else:
            # The <error> tag already styles the message.
            self.line(f'<error>Path <path>"{path}"</path> does not exist.</error>')
            return 1

        return 0
2021-04-09 19:20:03 -04:00
2021-07-30 20:33:26 -04:00
class RepairCommand(Command):
    """
    Retry failed downloads.

    repair
        {--m|max-items=None : The maximum number of tracks to download}
    """

    # NOTE: the docstring above is the cleo command signature, not prose.

    help = "\nRetry up to 20 failed downloads\n$ <cmd>rip repair --max-items 20</cmd>\n"

    def handle(self):
        """Retry previously failed downloads, optionally capped by --max-items."""
        # The option is named "max-items" in the signature above.
        # clean_options() is a generator, so take its single value rather
        # than passing the generator object on.
        max_items = next(clean_options(self.option("max-items")))
        config = Config()
        RipCore(config).repair(max_items=max_items)
2021-03-24 13:39:37 -04:00
2021-07-30 20:33:26 -04:00
# Maps the textual spellings "None", "True" and "False" to the matching
# Python objects; clean_options() looks up non-numeric option strings here.
STRING_TO_PRIMITIVE = {
    "None": None,
    "True": True,
    "False": False,
}
2021-07-30 20:33:26 -04:00
class Application(BaseApplication):
    """The ``rip`` command line application, with streamrip's styles and logging."""

    def __init__(self):
        super().__init__("rip", __version__)

    def _run(self, io):
        """Run the application, enabling file logging first when in debug mode.

        Returns whatever the base implementation returns, so that a
        command's exit status is not discarded.
        """
        if io.is_debug():
            from .constants import CONFIG_DIR

            logger.setLevel(logging.DEBUG)
            # Debug output is also written to streamrip.log in CONFIG_DIR.
            fh = logging.FileHandler(os.path.join(CONFIG_DIR, "streamrip.log"))
            fh.setLevel(logging.DEBUG)
            logger.addHandler(fh)

        return super()._run(io)

    def create_io(self, input=None, output=None, error_output=None):
        """Create the IO object and register the custom style tags on it."""
        io = super().create_io(input, output, error_output)
        # Set our own CLI styles
        formatter = io.output.formatter
        formatter.set_style("url", Style("blue", options=["underline"]))
        formatter.set_style("path", Style("green", options=["bold"]))
        formatter.set_style("cmd", Style("magenta"))
        formatter.set_style("title", Style("yellow", options=["bold"]))
        formatter.set_style("header", Style("yellow", options=["bold", "underline"]))

        # Use the same formatter for regular and error output.
        io.output.set_formatter(formatter)
        io.error_output.set_formatter(formatter)

        self._io = io

        return io

    @property
    def _default_definition(self):
        """Return the default global options with a shortened --help text."""
        default_globals = super()._default_definition
        # as of 1.0.0a3, the descriptions don't wrap properly
        # so I'm truncating the description for help as a hack
        default_globals._options["help"]._description = (
            default_globals._options["help"]._description.split(".")[0] + "."
        )

        return default_globals

    def render_error(self, error, io):
        """Render the error as usual, then point the user to the issue tracker."""
        super().render_error(error, io)
        io.write_line(
            "\n<error>If this was unexpected, please open a <path>Bug Report</path> at </error>"
            "<url>https://github.com/nathom/streamrip/issues/new/choose</url>"
        )
2021-07-30 20:33:26 -04:00
def clean_options(*opts):
    """Lazily yield each option converted to a more useful Python value.

    Non-string values are yielded unchanged. For strings, one leading "="
    is removed, surrounding whitespace is stripped, all-digit text becomes
    an ``int``, and anything else is looked up in ``STRING_TO_PRIMITIVE``
    (falling back to the cleaned string itself).
    """
    for raw in opts:
        if not isinstance(raw, str):
            yield raw
            continue

        # Drop the "=" first, then trim, in that order.
        text = (raw[1:] if raw.startswith("=") else raw).strip()
        yield int(text) if text.isdigit() else STRING_TO_PRIMITIVE.get(text, text)
2021-07-30 20:33:26 -04:00
def is_outdated():
    """Check PyPI for the latest streamrip version and record the result.

    Updates the module globals ``newest_version`` and ``outdated``. The check
    is best-effort: if PyPI cannot be reached in time or the response has an
    unexpected shape, both globals are left unchanged.
    """
    global outdated
    global newest_version

    try:
        # The timeout keeps a stalled connection from blocking a caller that
        # joins the thread this function runs in.
        r = requests.get("https://pypi.org/pypi/streamrip/json", timeout=10).json()
        latest = r["info"]["version"]
    except (requests.RequestException, ValueError, KeyError, TypeError) as err:
        logger.debug("Could not check for a newer version: %s", err)
        return

    newest_version = latest
    outdated = newest_version != __version__
2021-03-22 16:40:29 -04:00
2021-03-22 12:21:27 -04:00
def main():
    """Build the ``rip`` application, register every command, and run it."""
    application = Application()

    # Registered in this order.
    command_classes = (
        DownloadCommand,
        SearchCommand,
        DiscoverCommand,
        LastfmCommand,
        ConfigCommand,
        ConvertCommand,
        RepairCommand,
    )
    for command_cls in command_classes:
        application.add(command_cls())

    application.run()


if __name__ == "__main__":
    main()