import concurrent.futures import logging import os import threading import requests from cleo.application import Application as BaseApplication from cleo.commands.command import Command from import Style from click import launch from streamrip import __version__ from .config import Config from .core import RipCore logging.basicConfig(level="WARNING") logger = logging.getLogger("streamrip") outdated = False class DownloadCommand(Command): """ Download items using urls. url {--f|file=None : Path to a text file containing urls} {--c|codec=None : Convert the downloaded files to ALAC, FLAC, MP3, AAC, or OGG} {--m|max-quality=None : The maximum quality to download. Can be 0, 1, 2, 3 or 4} {--i|ignore-db : Download items even if they have been logged in the database.} {urls?* : One or more Qobuz, Tidal, Deezer, or SoundCloud urls} """ help = ( "\nDownload Dreams by Fleetwood Mac:\n" "$ rip url\n\n" "Batch download urls from a text file named urls.txt:\n" "$ rip url --file urls.txt\n\n" "For more information on Quality IDs, see\n" "\n" ) def handle(self): global outdated # Use a thread so that it doesn't slow down startup update_check = threading.Thread(target=is_outdated, daemon=True) config = Config() path, codec, quality, no_db = clean_options( self.option("file"), self.option("codec"), self.option("max-quality"), self.option("ignore-db"), ) if no_db: config.session["database"]["enabled"] = False if quality is not None: for source in ("qobuz", "tidal", "deezer"): config.session[source]["quality"] = quality core = RipCore(config) urls = self.argument("urls") if path is not None: if os.path.isfile(path): core.handle_txt(path) else: self.line( f"File {path} does not exist." ) return 1 if urls: core.handle_urls(";".join(urls)) if len(core) > 0: elif not urls and path is None: self.line("Must pass arguments. See rip url -h.") try: update_check.join() if outdated: self.line( "A new version of streamrip is available! Run " "pip3 install streamrip --upgrade to update" ) except RuntimeError as e: logger.debug("Update check error: %s", e) pass return 0 class SearchCommand(Command): """ Search for and download items in interactive mode. search {query : The name to search for} {--s|source=qobuz : Qobuz, Tidal, Soundcloud, Deezer, or Deezloader} {--t|type=album : Album, Playlist, Track, or Artist} """ help = ( "\nSearch for Rumours by Fleetwood Mac\n" "$ rip search 'rumours fleetwood mac'\n\n" "Search for 444 by Jay-Z on TIDAL\n" "$ rip search --source tidal '444'\n\n" "Search for Bob Dylan on Deezer\n" "$ rip search --type artist --source deezer 'bob dylan'\n" ) def handle(self): query = self.argument("query") source, type = clean_options(self.option("source"), self.option("type")) config = Config() core = RipCore(config) if core.interactive_search(query, source, type): else: self.line("No items chosen, exiting.") class DiscoverCommand(Command): """ Browse and download items in interactive mode (Qobuz only). discover {--s|scrape : Download all of the items in the list} {--m|max-items=50 : The number of items to fetch} {list=ideal-discography : The list to fetch} """ help = ( "\nBrowse the Qobuz ideal-discography list\n" "$ rip discover\n\n" "Browse the best-sellers list\n" "$ rip discover best-sellers\n\n" "Available options for list:\n\n" " • most-streamed\n" " • recent-releases\n" " • best-sellers\n" " • press-awards\n" " • ideal-discography\n" " • editor-picks\n" " • most-featured\n" " • qobuzissims\n" " • new-releases\n" " • new-releases-full\n" " • harmonia-mundi\n" " • universal-classic\n" " • universal-jazz\n" " • universal-jeunesse\n" " • universal-chanson\n" ) def handle(self): from streamrip.constants import QOBUZ_FEATURED_KEYS chosen_list = self.argument("list") scrape = self.option("scrape") max_items = self.option("max-items") if chosen_list not in QOBUZ_FEATURED_KEYS: self.line(f'Error: list "{chosen_list}" not available') self.line( return 1 config = Config() core = RipCore(config) if scrape: core.scrape(chosen_list, max_items) return 0 if core.interactive_search( chosen_list, "qobuz", "featured", limit=int(max_items) ): else: self.line("No items chosen, exiting.") class LastfmCommand(Command): """ Search for tracks from a playlist and download them. lastfm {--s|source=qobuz : The source to search for items on} {urls* : playlist urls} """ help = ( "You can use this command to download Spotify, Apple Music, and YouTube " "playlists.\nTo get started, create an account at " " Once you have\nreached the home page, " "go to Profile IconView profile ⟶ " "PlaylistsIMPORT\nand paste your url.\n\n" "Download the young & free Apple Music playlist (already imported)\n" "$ rip lastfm\n" ) def handle(self): source = self.option("source") urls = self.argument("urls") config = Config() core = RipCore(config) config.session["lastfm"]["source"] = source core.handle_lastfm_urls(";".join(urls)) class ConfigCommand(Command): """ Manage the configuration file. config {--o|open : Open the config file in the default application} {--O|open-vim : Open the config file in (neo)vim} {--d|directory : Open the directory that the config file is located in} {--p|path : Show the config file's path} {--qobuz : Set the credentials for Qobuz} {--tidal : Log into Tidal} {--deezer : Set the Deezer ARL} {--reset : Reset the config file} {--update : Reset the config file, keeping the credentials} """ def handle(self): import shutil from .constants import CONFIG_DIR, CONFIG_PATH config = Config() if self.option("path"): self.line(f"{CONFIG_PATH}") if self.option("open"): self.line(f"Opening {CONFIG_PATH} in default application") launch(CONFIG_PATH) if self.option("reset"): config.reset() if self.option("update"): config.update() if self.option("open-vim"): if shutil.which("nvim") is not None: os.system(f"nvim '{CONFIG_PATH}'") else: os.system(f"vim '{CONFIG_PATH}'") if self.option("directory"): self.line(f"Opening {CONFIG_DIR}") launch(CONFIG_DIR) if self.option("tidal"): from streamrip.clients import TidalClient client = TidalClient() client.login() config.file["tidal"].update(client.get_tokens()) self.line("Credentials saved to config.") if self.option("deezer"): self.line( "Follow the instructions at" "/nathom/streamrip/wiki/Finding-your-Deezer-ARL-Cookie" ) config.file["deezer"]["arl"] = self.ask("Paste your ARL here: ") class ConvertCommand(Command): """ A standalone tool that converts audio files to other codecs en masse. convert {--s|sampling-rate=192000 : Downsample the tracks to this rate, in Hz.} {--b|bit-depth=24 : Downsample the tracks to this bit depth.} {--k|keep-source : Keep the original file after conversion.} {codec : FLAC, ALAC, OPUS, MP3, or AAC.} {path : The path to the audio file or a directory that contains audio files.} """ help = ( "\nConvert all of the audio files in /my/music to MP3s\n" "$ rip convert MP3 /my/music\n\n" "Downsample the audio to 48kHz after converting them to ALAC\n" "$ rip convert --sampling-rate 48000 ALAC /my/music\n" ) def handle(self): from streamrip import converter CODEC_MAP = { "FLAC": converter.FLAC, "ALAC": converter.ALAC, "OPUS": converter.OPUS, "MP3": converter.LAME, "AAC": converter.AAC, } codec = self.argument("codec") path = self.argument("path") ConverterCls = CODEC_MAP.get(codec.upper()) if ConverterCls is None: self.line( f'Invalid codec "{codec}". See rip convert' " -h." ) return 1 sampling_rate, bit_depth, keep_source = clean_options( self.option("sampling-rate"), self.option("bit-depth"), self.option("keep-source"), ) converter_args = { "sampling_rate": sampling_rate, "bit_depth": bit_depth, "remove_source": not keep_source, } if os.path.isdir(path): import itertools from pathlib import Path from tqdm import tqdm dirname = path audio_extensions = ("flac", "m4a", "aac", "opus", "mp3", "ogg") path_obj = Path(dirname) audio_files = ( path.as_posix() for path in itertools.chain.from_iterable( (path_obj.rglob(f"*.{ext}") for ext in audio_extensions) ) ) with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for file in audio_files: futures.append( executor.submit( ConverterCls( filename=os.path.join(dirname, file), **converter_args ).convert ) ) from streamrip.utils import TQDM_BAR_FORMAT for future in tqdm( concurrent.futures.as_completed(futures), total=len(futures), desc="Converting", bar_format=TQDM_BAR_FORMAT, ): # Only show loading bar future.result() elif os.path.isfile(path): ConverterCls(filename=path, **converter_args).convert() else: self.line( f'Path "{path}" does not exist.', fg="red" ) class RepairCommand(Command): """ Retry failed downloads. repair {--m|max-items=None : The maximum number of tracks to download} """ help = "\nRetry up to 20 failed downloads\n$ rip repair --max-items 20\n" def handle(self): max_items = clean_options(self.option("repair")) config = Config() RipCore(config).repair(max_items=max_items) STRING_TO_PRIMITIVE = { "None": None, "True": True, "False": False, } class Application(BaseApplication): def __init__(self): super().__init__("rip", __version__) def _run(self, io): if io.is_debug(): logger.setLevel(logging.DEBUG) super()._run(io) def create_io(self, input=None, output=None, error_output=None): io = super().create_io(input, output, error_output) # Set our own CLI styles formatter = io.output.formatter formatter.set_style("url", Style("blue", options=["underline"])) formatter.set_style("path", Style("green", options=["bold"])) formatter.set_style("cmd", Style("magenta")) formatter.set_style("title", Style("yellow", options=["bold"])) io.output.set_formatter(formatter) io.error_output.set_formatter(formatter) self._io = io return io @property def _default_definition(self): default_globals = super()._default_definition # as of 1.0.0a3, the descriptions don't wrap properly # so I'm truncating the description for help as a hack default_globals._options["help"]._description = ( default_globals._options["help"]._description.split(".")[0] + "." ) return default_globals def render_error(self, error, io): super().render_error(error, io) io.write_line( "\nIf this was unexpected, please open a Bug Report at " "" ) def clean_options(*opts): for opt in opts: if isinstance(opt, str): if opt.startswith("="): opt = opt[1:] opt = opt.strip() if opt.isdigit(): opt = int(opt) else: opt = STRING_TO_PRIMITIVE.get(opt, opt) yield opt def is_outdated(): global outdated r = requests.get("").json() outdated = r["info"]["version"] != __version__ def main(): application = Application() application.add(DownloadCommand()) application.add(SearchCommand()) application.add(DiscoverCommand()) application.add(LastfmCommand()) application.add(ConfigCommand()) application.add(ConvertCommand()) application.add(RepairCommand()) if __name__ == "__main__": main()