Add repair command #98

Signed-off-by: nathom <nathanthomas707@gmail.com>
This commit is contained in:
nathom 2021-07-06 14:02:22 -07:00
parent ec5afef1b3
commit 715ac496f1
11 changed files with 417 additions and 267 deletions

View file

@ -1,6 +1,7 @@
"""The streamrip command line interface."""
import click
import logging
from streamrip import __version__
logging.basicConfig(level="WARNING")
logger = logging.getLogger("streamrip")
@ -21,10 +22,10 @@ logger = logging.getLogger("streamrip")
metavar="INT",
help="0: < 320kbps, 1: 320 kbps, 2: 16 bit/44.1 kHz, 3: 24 bit/<=96 kHz, 4: 24 bit/<=192 kHz",
)
@click.option("-t", "--text", metavar="PATH")
@click.option("-nd", "--no-db", is_flag=True)
@click.option("--debug", is_flag=True)
@click.version_option(prog_name="streamrip")
@click.option("-t", "--text", metavar="PATH", help="Download urls from a text file.")
@click.option("-nd", "--no-db", is_flag=True, help="Ignore the database.")
@click.option("--debug", is_flag=True, help="Show debugging logs.")
@click.version_option(prog_name="rip", version=__version__)
@click.pass_context
def cli(ctx, **kwargs):
"""Streamrip: The all-in-one Qobuz, Tidal, SoundCloud, and Deezer music downloader.
@ -42,9 +43,8 @@ def cli(ctx, **kwargs):
import requests
from streamrip import __version__
from .config import Config
from streamrip.constants import CONFIG_DIR
from .constants import CONFIG_DIR
from .core import MusicDL
logging.basicConfig(level="WARNING")
@ -60,7 +60,14 @@ def cli(ctx, **kwargs):
logger.setLevel("DEBUG")
logger.debug("Starting debug log")
if ctx.invoked_subcommand not in {None, "lastfm", "search", "discover", "config"}:
if ctx.invoked_subcommand not in {
None,
"lastfm",
"search",
"discover",
"config",
"repair",
}:
return
config = Config()
@ -284,7 +291,7 @@ def lastfm(ctx, source, url):
def config(ctx, **kwargs):
"""Manage the streamrip configuration file."""
from streamrip.clients import TidalClient
from streamrip.constants import CONFIG_PATH
from .constants import CONFIG_PATH
from hashlib import md5
from getpass import getpass
import shutil
@ -412,6 +419,15 @@ def convert(ctx, **kwargs):
click.secho(f"File {kwargs['path']} does not exist.", fg="red")
@cli.command()
@click.option(
"-n", "--num-items", help="The number of items to atttempt downloads for."
)
@click.pass_context
def repair(ctx, **kwargs):
core.repair()
def none_chosen():
"""Print message if nothing was chosen."""
click.secho("No items chosen, exiting.", fg="bright_red")

View file

@ -10,7 +10,7 @@ from typing import Any, Dict
import click
import tomlkit
from streamrip.constants import CONFIG_DIR, CONFIG_PATH, DOWNLOADS_DIR
from .constants import CONFIG_DIR, CONFIG_PATH, DOWNLOADS_DIR
from streamrip.exceptions import InvalidSourceError
logger = logging.getLogger("streamrip")

View file

@ -56,7 +56,13 @@ download_videos = false
video_downloads_folder = ""
# This stores a list of item IDs so that repeats are not downloaded.
[database]
[database.downloads]
enabled = true
path = ""
# If a download fails, the item ID is stored here. Then, `rip repair` can be
# called to retry the downloads
[database.failed_downloads]
enabled = true
path = ""

View file

@ -20,7 +20,6 @@ URL_REGEX = re.compile(
r"(album|artist|track|playlist|video|label))|(?:\/[-\w]+?))+\/([-\w]+)"
)
SOUNDCLOUD_URL_REGEX = re.compile(r"https://soundcloud.com/[-\w:/]+")
SOUNDCLOUD_CLIENT_ID = re.compile("a3e059563d7fd3372b49b37f00a00bcf")
LASTFM_URL_REGEX = re.compile(r"https://www.last.fm/user/\w+/playlists/\w+")
QOBUZ_INTERPRETER_URL_REGEX = re.compile(
r"https?://www\.qobuz\.com/\w\w-\w\w/interpreter/[-\w]+/[-\w]+"

View file

@ -48,6 +48,7 @@ from .constants import (
from . import db
from streamrip.exceptions import (
AuthenticationError,
PartialFailure,
MissingCredentials,
NonStreamable,
NoResultsFound,
@ -74,6 +75,8 @@ MEDIA_CLASS: Dict[str, Media] = {
"label": Label,
"video": Video,
}
DB_PATH_MAP = {"downloads": DB_PATH, "failed_downloads": FAILED_DB_PATH}
# ---------------------------------------------- #
@ -102,18 +105,28 @@ class MusicDL(list):
"soundcloud": SoundCloudClient(),
}
self.db: db.Database
db_settings = self.config.session["database"]
if db_settings["enabled"]:
path = db_settings["path"]
if path:
self.db = db.Downloads(path)
else:
self.db = db.Downloads(DB_PATH)
self.config.file["database"]["path"] = DB_PATH
self.config.save()
else:
self.db = db.Downloads(None, empty=True)
def get_db(db_type: str) -> db.Database:
db_settings = self.config.session["database"]
db_class = db.CLASS_MAP[db_type]
database = db_class(None, dummy=True)
default_db_path = DB_PATH_MAP[db_type]
if db_settings[db_type]["enabled"]:
path = db_settings[db_type]["path"]
if path:
database = db_class(path)
else:
database = db_class(default_db_path)
assert config is not None
config.file["database"][db_type]["path"] = default_db_path
config.save()
return database
self.db = get_db("downloads")
self.failed_db = get_db("failed_downloads")
def handle_urls(self, urls):
"""Download a url.
@ -217,6 +230,23 @@ class MusicDL(list):
"max_artwork_height": int(artwork["max_height"]),
}
def repair(self, max_items=float("inf")):
print(list(self.failed_db))
if self.failed_db.is_dummy:
click.secho(
"Failed downloads database must be enabled to repair!", fg="red"
)
exit(1)
for counter, (source, media_type, item_id) in enumerate(self.failed_db):
# print(f"handling item {source = } {media_type = } {item_id = }")
if counter >= max_items:
break
self.handle_item(source, media_type, item_id)
self.download()
def download(self):
"""Download all the items in self."""
try:
@ -256,10 +286,24 @@ class MusicDL(list):
try:
item.load_meta(**arguments)
except NonStreamable:
self.failed_db.add((item.client.source, item.type, item.id))
click.secho(f"{item!s} is not available, skipping.", fg="red")
continue
if item.download(**arguments) and hasattr(item, "id"):
try:
item.download(**arguments)
except NonStreamable as e:
print("caught in core")
e.print(item)
self.failed_db.add((item.client.source, item.type, item.id))
continue
except PartialFailure as e:
for failed_item in e.failed_items:
print(f"adding {failed_item} to database")
self.failed_db.add(failed_item)
continue
if hasattr(item, "id"):
self.db.add([item.id])
if isinstance(item, Track):
@ -355,7 +399,7 @@ class MusicDL(list):
)
parsed.extend(URL_REGEX.findall(url)) # Qobuz, Tidal, Dezer
soundcloud_urls = URL_REGEX.findall(url)
soundcloud_urls = SOUNDCLOUD_URL_REGEX.findall(url)
soundcloud_items = [self.clients["soundcloud"].get(u) for u in soundcloud_urls]
parsed.extend(
@ -558,7 +602,7 @@ class MusicDL(list):
ret = fmt.format(**{k: media.get(k, default="Unknown") for k in fields})
return ret
def interactive_search( # noqa
def interactive_search(
self, query: str, source: str = "qobuz", media_type: str = "album"
):
"""Show an interactive menu that contains search results.

108
rip/db.py
View file

@ -3,96 +3,118 @@
import logging
import os
import sqlite3
from typing import Union, List
import abc
from typing import List
logger = logging.getLogger("streamrip")
class Database:
# list of table column names
structure: list
structure: dict
# name of table
name: str
def __init__(self, path, empty=False):
def __init__(self, path, dummy=False):
assert self.structure != []
assert self.name
if empty:
if dummy or path is None:
self.path = None
self.is_dummy = True
return
self.is_dummy = False
self.path = path
if not os.path.exists(self.path):
self.create()
def create(self):
if self.path is None:
if self.is_dummy:
return
with sqlite3.connect(self.path) as conn:
try:
params = ", ".join(
f"{key} TEXT UNIQUE NOT NULL" for key in self.structure
)
command = f"CREATE TABLE {self.name} ({params});"
logger.debug(f"executing {command}")
conn.execute(command)
except sqlite3.OperationalError:
pass
def keys(self):
return self.structure
def contains(self, **items):
allowed_keys = set(self.structure)
assert all(
key in allowed_keys for key in items.keys()
), f"Invalid key. Valid keys: {self.structure}"
items = {k: str(v) for k, v in items.items()}
if self.path is None:
return False
with sqlite3.connect(self.path) as conn:
conditions = " AND ".join(f"{key}=?" for key in items.keys())
command = f"SELECT {self.structure[0]} FROM {self.name} WHERE {conditions}"
params = ", ".join(
f"{key} {' '.join(map(str.upper, props))}"
for key, props in self.structure.items()
)
command = f"CREATE TABLE {self.name} ({params})"
logger.debug(f"executing {command}")
return conn.execute(command, tuple(items.values())).fetchone() is not None
conn.execute(command)
def keys(self):
return self.structure.keys()
def contains(self, **items):
if self.is_dummy:
return False
allowed_keys = set(self.structure.keys())
assert all(
key in allowed_keys for key in items.keys()
), f"Invalid key. Valid keys: {allowed_keys}"
items = {k: str(v) for k, v in items.items()}
with sqlite3.connect(self.path) as conn:
conditions = " AND ".join(f"{key}=?" for key in items.keys())
command = f"SELECT EXISTS(SELECT 1 FROM {self.name} WHERE {conditions})"
logger.debug(f"executing {command}")
result = conn.execute(command, tuple(items.values()))
return result
def __contains__(self, keys: dict) -> bool:
return self.contains(**keys)
def add(self, items: List[str]):
assert len(items) == len(self.structure)
if self.path is None:
if self.is_dummy:
return
params = ", ".join(self.structure)
assert len(items) == len(self.structure)
params = ", ".join(self.structure.keys())
question_marks = ", ".join("?" for _ in items)
command = f"INSERT INTO {self.name} ({params}) VALUES ({question_marks})"
logger.debug(f"executing {command}")
with sqlite3.connect(self.path) as conn:
conn.execute(command, tuple(items))
try:
conn.execute(command, tuple(items))
except sqlite3.IntegrityError as e:
# tried to insert an item that was already there
logger.debug(e)
def __iter__(self):
if self.is_dummy:
return ()
with sqlite3.connect(self.path) as conn:
return conn.execute(f"SELECT * FROM {self.name}")
def reset(self):
try:
os.remove(self.path)
except FileNotFoundError:
pass
class Downloads(Database):
structure = ["id"]
name = "downloads"
structure = {
"id": ["unique", "text"],
}
class FailedDownloads(Database):
structure = ["source", "type", "id"]
name = "failed_downloads"
structure = {
"source": ["text"],
"media_type": ["text"],
"id": ["text", "unique"],
}
CLASS_MAP = {db.name: db for db in (Downloads, FailedDownloads)}

View file

@ -1,10 +1,12 @@
"""Constants that are kept in one place."""
import mutagen.id3 as id3
import re
AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/83.0"
TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg"
SOUNDCLOUD_CLIENT_ID = re.compile("a3e059563d7fd3372b49b37f00a00bcf")
QUALITY_DESC = {
@ -132,20 +134,6 @@ FOLDER_FORMAT = (
TRACK_FORMAT = "{tracknumber}. {artist} - {title}"
# ------------------ Regexes ------------------- #
URL_REGEX = (
r"https?://(?:www|open|play|listen)?\.?(qobuz|tidal|deezer)\.com(?:(?:/"
r"(album|artist|track|playlist|video|label))|(?:\/[-\w]+?))+\/([-\w]+)"
)
SOUNDCLOUD_URL_REGEX = r"https://soundcloud.com/[-\w:/]+"
SOUNDCLOUD_CLIENT_ID = "a3e059563d7fd3372b49b37f00a00bcf"
LASTFM_URL_REGEX = r"https://www.last.fm/user/\w+/playlists/\w+"
QOBUZ_INTERPRETER_URL_REGEX = (
r"https?://www\.qobuz\.com/\w\w-\w\w/interpreter/[-\w]+/[-\w]+"
)
DEEZER_DYNAMIC_LINK_REGEX = r"https://deezer\.page\.link/\w+"
YOUTUBE_URL_REGEX = r"https://www\.youtube\.com/watch\?v=[-\w]+"
TIDAL_MAX_Q = 7
TIDAL_Q_MAP = {

View file

@ -1,3 +1,7 @@
from typing import List
import click
class AuthenticationError(Exception):
pass
@ -23,7 +27,16 @@ class InvalidQuality(Exception):
class NonStreamable(Exception):
pass
def __init__(self, message=None):
self.message = message
super().__init__(self.message)
def print(self, item):
if self.message:
click.secho(f"Unable to stream {item!s}. Message: ", nl=False, fg="yellow")
click.secho(self.message, fg="red")
else:
click.secho(f"Unable to stream {item!s}.", fg="yellow")
class InvalidContainerError(Exception):
@ -52,3 +65,13 @@ class ConversionError(Exception):
class NoResultsFound(Exception):
pass
class ItemExists(Exception):
pass
class PartialFailure(Exception):
def __init__(self, failed_items: List):
self.failed_items = failed_items
super().__init__()

View file

@ -8,11 +8,12 @@ as a single track.
import concurrent.futures
import logging
import os
import abc
import re
import shutil
import subprocess
from tempfile import gettempdir
from typing import Any, Optional, Union, Iterable, Generator, Dict
from typing import Any, Optional, Union, Iterable, Generator, Dict, Tuple, List
import click
import tqdm
@ -26,6 +27,8 @@ from .clients import Client
from .constants import FLAC_MAX_BLOCKSIZE, FOLDER_FORMAT, TRACK_FORMAT, ALBUM_KEYS
from .exceptions import (
InvalidQuality,
PartialFailure,
ItemExists,
InvalidSourceError,
NonStreamable,
TooLargeCoverArt,
@ -35,7 +38,6 @@ from .utils import (
clean_format,
downsize_image,
get_cover_urls,
decho,
decrypt_mqa_file,
get_container,
ext,
@ -53,7 +55,38 @@ TYPE_REGEXES = {
}
class Track:
class Media(abc.ABC):
@abc.abstractmethod
def download(self, **kwargs):
pass
@abc.abstractmethod
def load_meta(self, **kwargs):
pass
@abc.abstractmethod
def tag(self, **kwargs):
pass
@property
@abc.abstractmethod
def type(self):
pass
@abc.abstractmethod
def convert(self, **kwargs):
pass
@abc.abstractmethod
def __repr__(self):
pass
@abc.abstractmethod
def __str__(self):
pass
class Track(Media):
"""Represents a downloadable track.
Loading metadata as a single track:
@ -171,15 +204,15 @@ class Track:
self.downloaded = True
self.tagged = True
self.path = self.final_path
decho(f"Track already exists: {self.final_path}", fg="magenta")
return False
raise ItemExists(self.final_path)
if hasattr(self, "cover_url"):
self.download_cover(
width=kwargs.get("max_artwork_width", 999999),
height=kwargs.get("max_artwork_height", 999999),
) # only downloads for playlists and singles
self.download_cover(
width=kwargs.get("max_artwork_width", 999999),
height=kwargs.get("max_artwork_height", 999999),
) # only downloads for playlists and singles
self.path = os.path.join(gettempdir(), f"{hash(self.id)}_{self.quality}.tmp")
return True
def download(
self,
@ -187,7 +220,7 @@ class Track:
parent_folder: str = "StreamripDownloads",
progress_bar: bool = True,
**kwargs,
) -> bool:
):
"""Download the track.
:param quality: (0, 1, 2, 3, 4)
@ -197,13 +230,12 @@ class Track:
:param progress_bar: turn on/off progress bar
:type progress_bar: bool
"""
if not self._prepare_download(
self._prepare_download(
quality=quality,
parent_folder=parent_folder,
progress_bar=progress_bar,
**kwargs,
):
return False
)
if self.client.source == "soundcloud":
# soundcloud client needs whole dict to get file url
@ -214,14 +246,14 @@ class Track:
try:
dl_info = self.client.get_file_url(url_id, self.quality)
except Exception as e:
click.secho(f"Unable to download track. {e}", fg="red")
return False
# click.secho(f"Unable to download track. {e}", fg="red")
raise NonStreamable(e)
if self.client.source == "qobuz":
assert isinstance(dl_info, dict) # for typing
if not self.__validate_qobuz_dl_info(dl_info):
click.secho("Track is not available for download", fg="red")
return False
# click.secho("Track is not available for download", fg="red")
raise NonStreamable("Track is not available for download")
self.sampling_rate = dl_info.get("sampling_rate")
self.bit_depth = dl_info.get("bit_depth")
@ -230,19 +262,12 @@ class Track:
if self.client.source in ("qobuz", "tidal", "deezer"):
assert isinstance(dl_info, dict)
logger.debug("Downloadable URL found: %s", dl_info.get("url"))
try:
tqdm_download(
dl_info["url"], self.path, desc=self._progress_desc
) # downloads file
except NonStreamable:
click.secho(
f"Track {self!s} is not available for download, skipping.",
fg="red",
)
return False
tqdm_download(
dl_info["url"], self.path, desc=self._progress_desc
) # downloads file
elif self.client.source == "soundcloud":
assert isinstance(dl_info, dict)
assert isinstance(dl_info, dict) # for typing
self._soundcloud_download(dl_info)
else:
@ -254,6 +279,7 @@ class Track:
and dl_info.get("enc_key", False)
):
out_path = f"{self.path}_dec"
logger.debug("Decrypting MQA file")
decrypt_mqa_file(self.path, out_path, dl_info["enc_key"])
self.path = out_path
@ -267,8 +293,6 @@ class Track:
if not kwargs.get("keep_cover", True) and hasattr(self, "cover_path"):
os.remove(self.cover_path)
return True
def __validate_qobuz_dl_info(self, info: dict) -> bool:
"""Check if the download info dict returned by Qobuz is downloadable.
@ -335,6 +359,10 @@ class Track:
self.final_path = self.final_path.replace(".mp3", ".flac")
self.quality = 2
@property
def type(self) -> str:
return "track"
@property
def _progress_desc(self) -> str:
"""Get the description that is used on the progress bar.
@ -345,9 +373,6 @@ class Track:
def download_cover(self, width=999999, height=999999):
"""Download the cover art, if cover_url is given."""
if not hasattr(self, "cover_url"):
return False
self.cover_path = os.path.join(gettempdir(), f"cover{hash(self.cover_url)}.jpg")
logger.debug(f"Downloading cover from {self.cover_url}")
# click.secho(f"\nDownloading cover art for {self!s}", fg="blue")
@ -361,6 +386,7 @@ class Track:
downsize_image(self.cover_path, width, height)
else:
logger.debug("Cover already exists, skipping download")
raise ItemExists(self.cover_path)
def format_final_path(self) -> str:
"""Return the final filepath of the downloaded file.
@ -430,11 +456,12 @@ class Track:
cover_url=cover_url,
)
def tag( # noqa
def tag(
self,
album_meta: dict = None,
cover: Union[Picture, APIC, MP4Cover] = None,
embed_cover: bool = True,
**kwargs,
):
"""Tag the track using the stored metadata.
@ -659,7 +686,7 @@ class Track:
return True
class Video:
class Video(Media):
"""Only for Tidal."""
def __init__(self, client: Client, id: str, **kwargs):
@ -709,8 +736,6 @@ class Video:
p = subprocess.Popen(command)
p.wait() # remove this?
return False # so that it is not tagged
def tag(self, *args, **kwargs):
"""Return False.
@ -738,6 +763,9 @@ class Video:
tracknumber=track["trackNumber"],
)
def convert(self, *args, **kwargs):
pass
@property
def path(self) -> str:
"""Get path to download the mp4 file.
@ -753,6 +781,10 @@ class Video:
return os.path.join(self.parent_folder, f"{fname}.mp4")
@property
def type(self) -> str:
return "video"
def __str__(self) -> str:
"""Return the title.
@ -771,6 +803,101 @@ class Video:
return True
class YoutubeVideo(Media):
"""Dummy class implemented for consistency with the Media API."""
class DummyClient:
"""Used because YouTube downloads use youtube-dl, not a client."""
source = "youtube"
def __init__(self, url: str):
"""Create a YoutubeVideo object.
:param url: URL to the youtube video.
:type url: str
"""
self.url = url
self.client = self.DummyClient()
def download(
self,
parent_folder: str = "StreamripDownloads",
download_youtube_videos: bool = False,
youtube_video_downloads_folder: str = "StreamripDownloads",
**kwargs,
):
"""Download the video using 'youtube-dl'.
:param parent_folder:
:type parent_folder: str
:param download_youtube_videos: True if the video should be downloaded.
:type download_youtube_videos: bool
:param youtube_video_downloads_folder: Folder to put videos if
downloaded.
:type youtube_video_downloads_folder: str
:param kwargs:
"""
click.secho(f"Downloading url {self.url}", fg="blue")
filename_formatter = "%(track_number)s.%(track)s.%(container)s"
filename = os.path.join(parent_folder, filename_formatter)
p = subprocess.Popen(
[
"youtube-dl",
"-x", # audio only
"-q", # quiet mode
"--add-metadata",
"--audio-format",
"mp3",
"--embed-thumbnail",
"-o",
filename,
self.url,
]
)
if download_youtube_videos:
click.secho("Downloading video stream", fg="blue")
pv = subprocess.Popen(
[
"youtube-dl",
"-q",
"-o",
os.path.join(
youtube_video_downloads_folder,
"%(title)s.%(container)s",
),
self.url,
]
)
pv.wait()
p.wait()
def load_meta(self, *args, **kwargs):
"""Return None.
Dummy method.
:param args:
:param kwargs:
"""
pass
def tag(self, *args, **kwargs):
"""Return None.
Dummy method.
:param args:
:param kwargs:
"""
pass
def __bool__(self):
return True
class Booklet:
"""Only for Qobuz."""
@ -800,6 +927,9 @@ class Booklet:
filepath = os.path.join(parent_folder, f"{self.description}.pdf")
tqdm_download(self.url, filepath)
def type(self) -> str:
return "booklet"
def __bool__(self):
return True
@ -833,12 +963,26 @@ class Tracklist(list):
else:
target = self._download_item
# TODO: make this function return the items that have not been downloaded
failed_downloads: List[Tuple[str, str, str]] = []
if kwargs.get("concurrent_downloads", True):
# Tidal errors out with unlimited concurrency
with concurrent.futures.ThreadPoolExecutor(15) as executor:
futures = [executor.submit(target, item, **kwargs) for item in self]
future_map = {
executor.submit(target, item, **kwargs): item for item in self
}
# futures = [executor.submit(target, item, **kwargs) for item in self]
try:
concurrent.futures.wait(futures)
concurrent.futures.wait(future_map.keys())
for future in future_map.keys():
try:
future.result()
except NonStreamable:
print("caught in media conc")
item = future_map[future]
failed_downloads.append(
(item.client.source, item.type, item.id)
)
except (KeyboardInterrupt, SystemExit):
executor.shutdown()
tqdm.write("Aborted! May take some time to shutdown.")
@ -850,20 +994,29 @@ class Tracklist(list):
# soundcloud only gets metadata after `target` is called
# message will be printed in `target`
click.secho(f'\nDownloading "{item!s}"', fg="blue")
target(item, **kwargs)
try:
target(item, **kwargs)
except ItemExists:
click.secho(f"{item!s} exists. Skipping.", fg="yellow")
except NonStreamable as e:
e.print(item)
failed_downloads.append((item.client.source, item.type, item.id))
self.downloaded = True
def _download_and_convert_item(self, item, **kwargs):
if failed_downloads:
raise PartialFailure(failed_downloads)
def _download_and_convert_item(self, item: Media, **kwargs):
"""Download and convert an item.
:param item:
:param kwargs: should contain a `conversion` dict.
"""
if self._download_item(item, **kwargs):
item.convert(**kwargs["conversion"])
self._download_item(item, **kwargs)
item.convert(**kwargs["conversion"])
def _download_item(item, *args: Any, **kwargs: Any) -> bool:
def _download_item(self, item: Media, **kwargs: Any):
"""Abstract method.
:param item:
@ -1017,6 +1170,10 @@ class Tracklist(list):
return album
@property
def type(self) -> str:
return self.__class__.__name__.lower()
def __getitem__(self, key):
"""Get an item if key is int, otherwise get an attr.
@ -1044,101 +1201,6 @@ class Tracklist(list):
return True
class YoutubeVideo:
"""Dummy class implemented for consistency with the Media API."""
class DummyClient:
"""Used because YouTube downloads use youtube-dl, not a client."""
source = "youtube"
def __init__(self, url: str):
"""Create a YoutubeVideo object.
:param url: URL to the youtube video.
:type url: str
"""
self.url = url
self.client = self.DummyClient()
def download(
self,
parent_folder: str = "StreamripDownloads",
download_youtube_videos: bool = False,
youtube_video_downloads_folder: str = "StreamripDownloads",
**kwargs,
):
"""Download the video using 'youtube-dl'.
:param parent_folder:
:type parent_folder: str
:param download_youtube_videos: True if the video should be downloaded.
:type download_youtube_videos: bool
:param youtube_video_downloads_folder: Folder to put videos if
downloaded.
:type youtube_video_downloads_folder: str
:param kwargs:
"""
click.secho(f"Downloading url {self.url}", fg="blue")
filename_formatter = "%(track_number)s.%(track)s.%(container)s"
filename = os.path.join(parent_folder, filename_formatter)
p = subprocess.Popen(
[
"youtube-dl",
"-x", # audio only
"-q", # quiet mode
"--add-metadata",
"--audio-format",
"mp3",
"--embed-thumbnail",
"-o",
filename,
self.url,
]
)
if download_youtube_videos:
click.secho("Downloading video stream", fg="blue")
pv = subprocess.Popen(
[
"youtube-dl",
"-q",
"-o",
os.path.join(
youtube_video_downloads_folder,
"%(title)s.%(container)s",
),
self.url,
]
)
pv.wait()
p.wait()
def load_meta(self, *args, **kwargs):
"""Return None.
Dummy method.
:param args:
:param kwargs:
"""
pass
def tag(self, *args, **kwargs):
"""Return None.
Dummy method.
:param args:
:param kwargs:
"""
pass
def __bool__(self):
return True
class Album(Tracklist):
"""Represents a downloadable album.
@ -1278,12 +1340,7 @@ class Album(Tracklist):
for item in self.booklets:
Booklet(item).download(parent_folder=self.folder)
def _download_item( # type: ignore
self,
track: Union[Track, Video],
quality: int = 3,
**kwargs,
) -> bool:
def _download_item(self, item: Media, **kwargs: Any):
"""Download an item.
:param track: The item.
@ -1294,25 +1351,24 @@ class Album(Tracklist):
:rtype: bool
"""
logger.debug("Downloading track to %s", self.folder)
if self.disctotal > 1 and isinstance(track, Track):
disc_folder = os.path.join(self.folder, f"Disc {track.meta.discnumber}")
if self.disctotal > 1 and isinstance(item, Track):
disc_folder = os.path.join(self.folder, f"Disc {item.meta.discnumber}")
kwargs["parent_folder"] = disc_folder
else:
kwargs["parent_folder"] = self.folder
if not track.download(quality=min(self.quality, quality), **kwargs):
return False
quality = kwargs.get("quality", 3)
kwargs.pop("quality")
item.download(quality=min(self.quality, quality), **kwargs)
logger.debug("tagging tracks")
# deezer tracks come tagged
if kwargs.get("tag_tracks", True) and self.client.source != "deezer":
track.tag(
item.tag(
cover=self.cover_obj,
embed_cover=kwargs.get("embed_cover", True),
)
return True
@staticmethod
def _parse_get_resp(resp: dict, client: Client) -> TrackMetadata:
"""Parse information from a client.get(query, 'album') call.
@ -1573,26 +1629,28 @@ class Playlist(Tracklist):
self.__indices = iter(range(1, len(self) + 1))
self.download_message()
def _download_item(self, item: Track, **kwargs) -> bool: # type: ignore
def _download_item(self, item: Media, **kwargs):
assert isinstance(item, Track)
kwargs["parent_folder"] = self.folder
if self.client.source == "soundcloud":
item.load_meta()
click.secho(f"Downloading {item!s}", fg="blue")
if playlist_to_album := kwargs.get("set_playlist_to_album", False):
item["album"] = self.name
item["albumartist"] = self.creator
item.meta.album = self.name
item.meta.albumartist = self.creator
if kwargs.get("new_tracknumbers", True):
item["tracknumber"] = next(self.__indices)
item["discnumber"] = 1
item.meta.tracknumber = next(self.__indices)
item.meta.discnumber = 1
self.downloaded = item.download(**kwargs)
item.download(**kwargs)
if self.downloaded and self.client.source != "deezer":
if self.client.source != "deezer":
item.tag(embed_cover=kwargs.get("embed_cover", True))
if self.downloaded and playlist_to_album and self.client.source == "deezer":
if playlist_to_album and self.client.source == "deezer":
# Because Deezer tracks come pre-tagged, the `set_playlist_to_album`
# option is never set. Here, we manually do this
from mutagen.flac import FLAC
@ -1603,8 +1661,6 @@ class Playlist(Tracklist):
audio["TRACKNUMBER"] = f"{item['tracknumber']:02}"
audio.save()
return self.downloaded
@staticmethod
def _parse_get_resp(item: dict, client: Client) -> dict:
"""Parse information from a search result returned by a client.search call.
@ -1769,13 +1825,7 @@ class Artist(Tracklist):
self.download_message()
return final
def _download_item( # type: ignore
self,
item,
parent_folder: str = "StreamripDownloads",
quality: int = 3,
**kwargs,
) -> bool:
def _download_item(self, item: Media, **kwargs):
"""Download an item.
:param item:
@ -1786,19 +1836,14 @@ class Artist(Tracklist):
:param kwargs:
:rtype: bool
"""
try:
item.load_meta()
except NonStreamable:
logger.info("Skipping album, not available to stream.")
return False
item.load_meta()
kwargs.pop("parent_folder")
# always an Album
status = item.download(
item.download(
parent_folder=self.folder,
quality=quality,
**kwargs,
)
return status
@property
def title(self) -> str:

View file

@ -148,7 +148,7 @@ def tqdm_download(url: str, filepath: str, params: dict = None, desc: str = None
total = int(r.headers.get("content-length", 0))
logger.debug(f"File size = {total}")
if total < 1000 and not url.endswith("jpg") and not url.endswith("png"):
raise NonStreamable(url)
raise NonStreamable("Resource not found.")
try:
with open(filepath, "wb") as file, tqdm(
@ -322,9 +322,6 @@ def decho(message, fg=None):
logger.debug(message)
interpreter_artist_regex = re.compile(r"getSimilarArtist\(\s*'(\w+)'")
def get_container(quality: int, source: str) -> str:
"""Get the file container given the quality.

10
test.toml Normal file
View file

@ -0,0 +1,10 @@
[database]
bruh = "something"
[database.downloads]
enabled = true
path = "asdf"
[database.failed]
enabled = false
path = "asrdfg"