Misc updates for Deezer

This commit is contained in:
nathom 2021-07-28 14:26:14 -07:00
parent 64bb0ace79
commit f3c680ace7
3 changed files with 87 additions and 77 deletions

View file

@ -141,7 +141,9 @@ class Config:
return self.qobuz_creds
if source == "tidal":
return self.tidal_creds
if source == "deezer" or source == "soundcloud":
if source == "deezer":
return {"arl": self.file["deezer"]["arl"]}
if source == "soundcloud":
return {}
raise InvalidSourceError(source)

View file

@ -345,33 +345,32 @@ class MusicDL(list):
:param client:
"""
creds = self.config.creds(client.source)
if not client.logged_in:
while True:
try:
client.login(**creds)
break
except AuthenticationError:
click.secho("Invalid credentials, try again.")
self.prompt_creds(client.source)
creds = self.config.creds(client.source)
except MissingCredentials:
logger.debug("Credentials are missing. Prompting..")
self.prompt_creds(client.source)
creds = self.config.creds(client.source)
while True:
try:
client.login(**creds)
break
except AuthenticationError:
click.secho("Invalid credentials, try again.")
self.prompt_creds(client.source)
creds = self.config.creds(client.source)
except MissingCredentials:
logger.debug("Credentials are missing. Prompting..")
self.prompt_creds(client.source)
creds = self.config.creds(client.source)
if (
client.source == "qobuz"
and not creds.get("secrets")
and not creds.get("app_id")
):
(
self.config.file["qobuz"]["app_id"],
self.config.file["qobuz"]["secrets"],
) = client.get_tokens()
self.config.save()
elif client.source == "tidal":
self.config.file["tidal"].update(client.get_tokens())
self.config.save()
if (
client.source == "qobuz"
and not creds.get("secrets")
and not creds.get("app_id")
):
(
self.config.file["qobuz"]["app_id"],
self.config.file["qobuz"]["secrets"],
) = client.get_tokens()
self.config.save()
elif client.source == "tidal":
self.config.file["tidal"].update(client.get_tokens())
self.config.save() # only for the expiry stamp
def parse_urls(self, url: str) -> List[Tuple[str, str, str]]:
"""Return the type of the url and the id.
@ -791,16 +790,24 @@ class MusicDL(list):
:type source: str
"""
if source == "qobuz":
click.secho(f"Enter {source.capitalize()} email:", fg="green")
click.secho("Enter Qobuz email:", fg="green")
self.config.file[source]["email"] = input()
click.secho(
f"Enter {source.capitalize()} password (will not show on screen):",
"Enter Qobuz password (will not show on screen):",
fg="green",
)
self.config.file[source]["password"] = md5(
getpass(prompt="").encode("utf-8")
).hexdigest()
self.config.save()
click.secho(
f'Credentials saved to config file at "{self.config._path}"',
fg="green",
)
elif source == "deezer":
click.secho("Enter Deezer ARL: ", fg="green")
self.config.file["deezer"]["arl"] = input()
self.config.save()
click.secho(
f'Credentials saved to config file at "{self.config._path}"',
@ -821,9 +828,6 @@ class MusicDL(list):
"deezer",
"soundcloud",
), f"Invalid source {source}"
if source == "deezer":
# no login for deezer
return
if source == "soundcloud":
return

View file

@ -13,6 +13,7 @@ import re
import shutil
import subprocess
from tempfile import gettempdir
from tqdm import tqdm
from typing import Any, Optional, Union, Iterable, Generator, Dict, Tuple, List
import click
@ -35,15 +36,16 @@ from .exceptions import (
from .metadata import TrackMetadata
from .utils import (
clean_format,
tqdm_stream,
downsize_image,
get_cover_urls,
decrypt_mqa_file,
get_container,
DownloadStream,
ext,
get_stats_from_quality,
safe_get,
tidal_cover_url,
tqdm_download,
)
logger = logging.getLogger("streamrip")
@ -55,8 +57,6 @@ TYPE_REGEXES = {
class Media(abc.ABC):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def download(self, **kwargs):
pass
@ -86,15 +86,6 @@ class Media(abc.ABC):
def type(self):
pass
# @property
# @abc.abstractmethod
# def id(self):
# pass
# @id.setter
# def id(self, other):
# pass
@property
@abc.abstractmethod
def downloaded_ids(self):
@ -171,23 +162,25 @@ class Track(Media):
"""
assert self.id is not None, "id must be set before loading metadata"
source = self.client.source
self.resp = self.client.get(self.id, media_type="track")
self.meta = TrackMetadata(
track=self.resp, source=self.client.source
track=self.resp, source=source
) # meta dict -> TrackMetadata object
try:
if self.client.source == "qobuz":
if source == "qobuz":
self.cover_url = self.resp["album"]["image"]["large"]
elif self.client.source == "tidal":
elif source == "tidal":
self.cover_url = tidal_cover_url(self.resp["album"]["cover"], 320)
elif self.client.source == "deezer":
elif source == "deezer":
self.cover_url = self.resp["album"]["cover_medium"]
elif self.client.source == "soundcloud":
elif source == "soundcloud":
self.cover_url = (
self.resp["artwork_url"] or self.resp["user"].get("avatar_url")
).replace("large", "t500x500")
else:
raise InvalidSourceError(self.client.source)
raise InvalidSourceError(source)
except KeyError:
logger.debug("No cover found")
self.cover_url = None
@ -219,16 +212,10 @@ class Track(Media):
self.file_format = kwargs.get("track_format", TRACK_FORMAT)
self.folder = sanitize_filepath(self.folder, platform="auto")
self.format_final_path()
self.format_final_path() # raises: ItemExists
os.makedirs(self.folder, exist_ok=True)
if os.path.isfile(self.final_path): # track already exists
self.downloaded = True
self.tagged = True
self.path = self.final_path
raise ItemExists(self.final_path)
if hasattr(self, "cover_url"):
try:
self.download_cover(
@ -256,6 +243,9 @@ class Track(Media):
:param progress_bar: turn on/off progress bar
:type progress_bar: bool
"""
if not self.part_of_tracklist:
click.secho(f"Downloading {self!s}\n", bold=True)
self._prepare_download(
quality=quality,
parent_folder=parent_folder,
@ -414,7 +404,7 @@ class Track(Media):
]
)
elif dl_info["type"] == "original":
tqdm_download(dl_info["url"], self.path, desc=self._progress_desc)
_quick_download(dl_info["url"], self.path, desc=self._progress_desc)
# if a wav is returned, convert to flac
engine = converter.FLAC(self.path)
@ -443,11 +433,7 @@ class Track(Media):
# click.secho(f"\nDownloading cover art for {self!s}", fg="blue")
if not os.path.exists(self.cover_path):
tqdm_download(
self.cover_url,
self.cover_path,
desc=click.style("Cover", fg="cyan"),
)
_cover_download(self.cover_url, self.cover_path)
downsize_image(self.cover_path, width, height)
else:
logger.debug("Cover already exists, skipping download")
@ -469,6 +455,12 @@ class Track(Media):
logger.debug("Formatted path: %s", self.final_path)
if os.path.isfile(self.final_path): # track already exists
self.downloaded = True
self.tagged = True
self.path = self.final_path
raise ItemExists(self.final_path)
return self.final_path
@classmethod
@ -594,6 +586,7 @@ class Track(Media):
# automatically generate key, value pairs based on container
tags = self.meta.tags(self.container)
for k, v in tags:
logger.debug("Setting %s tag to %s", k, v)
audio[k] = v
if embed_cover and cover is None:
@ -992,7 +985,7 @@ class Booklet:
filepath = os.path.join(
parent_folder, f"{sanitize_filename(self.description)}.pdf"
)
tqdm_download(self.url, filepath)
_quick_download(self.url, filepath, "Booklet")
def type(self) -> str:
return "booklet"
@ -1060,7 +1053,7 @@ class Tracklist(list):
if self.client.source != "soundcloud":
# soundcloud only gets metadata after `target` is called
# message will be printed in `target`
click.secho(f'\nDownloading "{item!s}"', fg="blue")
click.secho(f'\nDownloading "{item!s}"', bold=True, fg="green")
try:
target(item, **kwargs)
except ItemExists:
@ -1361,7 +1354,7 @@ class Album(Tracklist, Media):
self.download_message()
# choose optimal cover size and download it
click.secho("Downloading cover art", fg="magenta")
click.secho("Downloading cover art", bold=True)
cover_path = os.path.join(gettempdir(), f"cover_{hash(self)}.jpg")
embed_cover_size = kwargs.get("embed_cover_size", "large")
@ -1371,15 +1364,17 @@ class Album(Tracklist, Media):
embed_cover_url = self.cover_urls[embed_cover_size]
if not os.path.exists(cover_path):
if embed_cover_url is not None:
tqdm_download(embed_cover_url, cover_path)
else: # sometimes happens with Deezer
cover_url = [u for u in self.cover_urls.values() if u][0]
tqdm_download(cover_url, cover_path)
cover_url = (
embed_cover_url
if embed_cover_url is None
else tuple(filter(None, self.cover_urls.values()))[0]
)
_cover_download(cover_url, cover_path)
hires_cov_path = os.path.join(self.folder, "cover.jpg")
if kwargs.get("keep_hires_cover", True) and not os.path.exists(hires_cov_path):
tqdm_download(self.cover_urls["original"], hires_cov_path)
_cover_download(self.cover_urls["original"], hires_cov_path)
cover_size = os.path.getsize(cover_path)
if cover_size > FLAC_MAX_BLOCKSIZE: # 16.77 MB
@ -1388,7 +1383,7 @@ class Album(Tracklist, Media):
fg="bright_yellow",
)
# large is about 600x600px which is guaranteed < 16.7 MB
tqdm_download(self.cover_urls["large"], cover_path)
_cover_download(self.cover_urls["large"], cover_path)
downsize_image(
cover_path,
@ -1396,8 +1391,7 @@ class Album(Tracklist, Media):
kwargs.get("max_artwork_height", 999999),
)
embed_cover = kwargs.get("embed_cover", True) # embed by default
if self.client.source != "deezer" and embed_cover:
if kwargs.get("embed_cover", True): # embed by default
# container generated when formatting folder name
self.cover_obj = self.get_cover_obj(
cover_path, self.container, self.client.source
@ -1411,7 +1405,7 @@ class Album(Tracklist, Media):
and kwargs.get("download_booklets", True)
and not any(f.endswith(".pdf") for f in os.listdir(self.folder))
):
click.secho("\nDownloading booklets", fg="blue")
click.secho("\nDownloading booklets", bold=True)
for item in self.booklets:
Booklet(item).download(parent_folder=self.folder)
@ -1438,7 +1432,7 @@ class Album(Tracklist, Media):
logger.debug("tagging tracks")
# deezer tracks come tagged
if kwargs.get("tag_tracks", True) and self.client.source != "deezer":
if kwargs.get("tag_tracks", True):
item.tag(
cover=self.cover_obj,
embed_cover=kwargs.get("embed_cover", True),
@ -2139,3 +2133,13 @@ def _get_tracklist(resp: dict, source: str) -> list:
return resp["tracks"]
raise NotImplementedError(source)
def _quick_download(url: str, path: str, desc: str = None):
with open(path, "wb") as file:
for chunk in tqdm_stream(DownloadStream(url), desc=desc):
file.write(chunk)
def _cover_download(url: str, path: str):
_quick_download(url, path, click.style("Cover", fg="blue"))