Standardize quality ids

Update README
This commit is contained in:
nathom 2021-03-29 12:12:50 -07:00
parent 1f5b0e7217
commit 24b858fad7
7 changed files with 138 additions and 167 deletions

View file

@ -28,6 +28,20 @@ Download the album and convert it to `mp3`
rip --convert mp3 -u https://open.qobuz.com/album/0060253780968
```
To set the quality, use the `--quality` option to `0, 1, 2, 3, 4`:
| Quality ID | Audio Quality | Available Sources |
| ---------- | ------------------- | -------------------- |
| 0 | 128 kbps MP3 or AAC | Deezer, Tidal |
| 1 | 320 kbps MP3 or AAC | Deezer, Tidal, Qobuz |
| 2 | 16 bit / 44.1 kHz | Deezer, Tidal, Qobuz |
| 3 | 24 bit / ≤ 96 kHz | Tidal (MQA), Qobuz |
| 4 | 24 bit / ≤ 192 kHz | Qobuz |
```bash
rip --quality 3 https://tidal.com/browse/album/147569387
```
Search for *Fleetwood Mac - Rumours* on Qobuz
```bash

View file

@ -4,7 +4,7 @@ import hashlib
import json
import logging
import os
# import sys
import sys
import time
from abc import ABC, abstractmethod
from pprint import pformat # , pprint
@ -13,17 +13,15 @@ from typing import Generator, Sequence, Tuple, Union
import click
import requests
from requests.packages import urllib3
import tidalapi
from dogpile.cache import make_region
from .constants import (
AGENT,
CACHE_DIR,
DEEZER_MAX_Q,
DEEZER_Q_IDS,
QOBUZ_FEATURED_KEYS,
TIDAL_MAX_Q,
TIDAL_Q_IDS,
AVAILABLE_QUALITY_IDS,
)
from .exceptions import (
AuthenticationError,
@ -32,6 +30,7 @@ from .exceptions import (
InvalidAppSecretError,
InvalidQuality,
)
from .utils import get_quality
from .spoofbuz import Spoofer
urllib3.disable_warnings()
@ -102,7 +101,7 @@ class ClientInterface(ABC):
pass
@abstractmethod
def get_file_url(self, track_id, quality=6) -> Union[dict]:
def get_file_url(self, track_id, quality=3) -> Union[dict]:
"""Get the direct download url dict for a file.
:param track_id: id of the track
@ -144,6 +143,7 @@ class QobuzClient(ClientInterface):
return
if (kwargs.get("app_id") or kwargs.get("secrets")) in (None, [], ""):
click.secho("Fetching tokens, this may take a few seconds.")
logger.info("Fetching tokens from Qobuz")
spoofer = Spoofer()
kwargs["app_id"] = spoofer.get_app_id()
@ -209,7 +209,7 @@ class QobuzClient(ClientInterface):
def get(self, item_id: Union[str, int], media_type: str = "album") -> dict:
return self._api_get(media_type, item_id=item_id)
def get_file_url(self, item_id, quality=6) -> dict:
def get_file_url(self, item_id, quality=3) -> dict:
return self._api_get_file_url(item_id, quality=quality)
# ---------- Private Methods ---------------
@ -319,12 +319,12 @@ class QobuzClient(ClientInterface):
self.label = resp["user"]["credential"]["parameters"]["short_label"]
def _api_get_file_url(
self, track_id: Union[str, int], quality: int = 6, sec: str = None
self, track_id: Union[str, int], quality: int = 3, sec: str = None
) -> dict:
unix_ts = time.time()
if int(quality) not in (5, 6, 7, 27): # Needed?
raise InvalidQuality(f"Invalid quality id {quality}. Choose 5, 6, 7 or 27")
if int(quality) not in AVAILABLE_QUALITY_IDS: # Needed?
raise InvalidQuality(f"Invalid quality id {quality}. Choose from {AVAILABLE_QUALITY_IDS}")
if sec is not None:
secret = sec
@ -333,6 +333,7 @@ class QobuzClient(ClientInterface):
else:
raise InvalidAppSecretError("Cannot find app secret")
quality = get_quality(quality, self.source)
r_sig = f"trackgetFileUrlformat_id{quality}intentstreamtrack_id{track_id}{unix_ts}{secret}"
logger.debug("Raw request signature: %s", r_sig)
r_sig_hashed = hashlib.md5(r_sig.encode("utf-8")).hexdigest()
@ -362,7 +363,7 @@ class QobuzClient(ClientInterface):
def _test_secret(self, secret: str) -> bool:
try:
self._api_get_file_url("19512574", sec=secret)
r = self._api_get_file_url("19512574", sec=secret)
return True
except InvalidAppSecretError as error:
logger.debug("Test for %s secret didn't work: %s", secret, error)
@ -426,98 +427,11 @@ class DeezerClient(ClientInterface):
@staticmethod
def get_file_url(meta_id: Union[str, int], quality: int = 6):
quality = min(DEEZER_MAX_Q, quality)
url = f"{DEEZER_DL}/{DEEZER_Q_IDS[quality]}/{DEEZER_BASE}/track/{meta_id}"
url = f"{DEEZER_DL}/{get_quality(quality, 'deezer')}/{DEEZER_BASE}/track/{meta_id}"
logger.debug(f"Download url {url}")
return url
'''
class TidalClient(ClientInterface):
source = "tidal"
def __init__(self):
self.logged_in = False
def login(self, email: str, pwd: str):
click.secho(f"Logging into {self.source}", fg="green")
if self.logged_in:
return
config = tidalapi.Config()
self.session = tidalapi.Session(config=config)
self.session.login(email, pwd)
logger.info("Logged into Tidal")
self.logged_in = True
@region.cache_on_arguments(expiration_time=RELEASE_CACHE_TIME)
def search(self, query: str, media_type: str = "album", limit: int = 50):
"""
:param query:
:type query: str
:param media_type: artist, album, playlist, or track
:type media_type: str
:param limit:
:type limit: int
:raises ValueError: if field value is invalid
"""
return self._search(query, media_type, limit=limit)
@region.cache_on_arguments(expiration_time=RELEASE_CACHE_TIME)
def get(self, meta_id: Union[str, int], media_type: str = "album"):
"""Get metadata.
:param meta_id:
:type meta_id: Union[str, int]
:param media_type:
:type media_type: str
"""
return self._get(meta_id, media_type)
def get_file_url(self, meta_id: Union[str, int], quality: int = 6):
"""
:param meta_id:
:type meta_id: Union[str, int]
:param quality:
:type quality: int
"""
logger.debug(f"Fetching file url with quality {quality}")
return self._get_file_url(meta_id, quality=min(TIDAL_MAX_Q, quality))
def _search(self, query, media_type="album", **kwargs):
params = {
"query": query,
"limit": kwargs.get("limit", 50),
}
return self.session.request("GET", f"search/{media_type}s", params).json()
def _get(self, media_id, media_type="album"):
if media_type == "album":
info = self.session.request("GET", f"albums/{media_id}")
tracklist = self.session.request("GET", f"albums/{media_id}/tracks")
album = info.json()
album["tracks"] = tracklist.json()
return album
elif media_type == "track":
return self.session.request("GET", f"tracks/{media_id}").json()
elif media_type == "playlist":
return self.session.request("GET", f"playlists/{media_id}/tracks").json()
elif media_type == "artist":
return self.session.request("GET", f"artists/{media_id}/albums").json()
else:
raise ValueError
def _get_file_url(self, track_id, quality=6):
params = {"soundQuality": TIDAL_Q_IDS[quality]}
resp = self.session.request("GET", f"tracks/{track_id}/streamUrl", params)
resp.raise_for_status()
return resp.json()
'''
class TidalClient(ClientInterface):
source = "tidal"
@ -546,11 +460,15 @@ class TidalClient(ClientInterface):
if access_token is not None:
self.token_expiry = token_expiry
self.refresh_token = refresh_token
if self.token_expiry - time.time() < 86400: # 1 day
logger.debug("Refreshing access token")
self._refresh_access_token()
else:
logger.debug("Logging in with access token")
self._login_by_access_token(access_token, user_id)
else:
logger.debug("Logging in as a new user")
self._login_new_user()
self.logged_in = True
@ -564,22 +482,35 @@ class TidalClient(ClientInterface):
"query": query,
"limit": limit,
}
return self._api_get(f"search/{media_type}s", params=params)
return self._api_request(f"search/{media_type}s", params=params)
def get_file_url(self, track_id, quality: int = 7):
def get_file_url(self, track_id, quality: int = 3):
params = {
"audioquality": TIDAL_Q_IDS[quality],
"audioquality": get_quality(min(quality, TIDAL_MAX_Q), self.source),
"playbackmode": "STREAM",
"assetpresentation": "FULL",
}
resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params)
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
logger.debug(f"{pformat(manifest)=}")
return {
"url": manifest["urls"][0],
"enc_key": manifest.get("keyId"),
"codec": manifest["codecs"],
}
def get_tokens(self):
return {
k: getattr(self, k)
for k in (
"user_id",
"country_code",
"access_token",
"refresh_token",
"token_expiry",
)
}
def _login_new_user(self, launch=True):
login_link = f"https://{self._get_device_code()}"
@ -696,18 +627,6 @@ class TidalClient(ClientInterface):
self.country_code = resp["countryCode"]
self.access_token = token
def get_tokens(self):
return {
k: getattr(self, k)
for k in (
"user_id",
"country_code",
"access_token",
"refresh_token",
"token_expiry",
)
}
def _api_get(self, item_id: str, media_type: str) -> dict:
item = self._api_request(f"{media_type}s/{item_id}")
if media_type in ("playlist", "album"):

View file

@ -32,18 +32,23 @@ class Config:
defaults = {
"qobuz": {
"quality": 2,
"email": None,
"password": None,
"app_id": "", # Avoid NoneType error
"secrets": [],
},
"tidal": {
"quality": 3,
"user_id": None,
"country_code": None,
"access_token": None,
"refresh_token": None,
"token_expiry": 0,
},
"deezer": {
"quality": 2,
},
"database": {"enabled": True, "path": None},
"conversion": {
"enabled": False,
@ -59,7 +64,7 @@ class Config:
"non_studio_albums": False,
"non_remaster": False,
},
"downloads": {"folder": DOWNLOADS_DIR, "quality": 7},
"downloads": {"folder": DOWNLOADS_DIR},
"metadata": {
"embed_cover": True,
"large_cover": False,
@ -124,7 +129,10 @@ class Config:
@property
def tidal_creds(self):
return self.file["tidal"]
creds = dict(self.file['tidal'])
logger.debug(creds)
del creds['quality'] # should not be included in creds
return creds
@property
def qobuz_creds(self):

View file

@ -12,25 +12,25 @@ CONFIG_PATH = os.path.join(CONFIG_DIR, "config.yaml")
LOG_DIR = click.get_app_dir(APPNAME)
DB_PATH = os.path.join(LOG_DIR, "downloads.db")
DOWNLOADS_DIR = os.path.join(Path.home(), "Music Downloads")
DOWNLOADS_DIR = os.path.join(Path.home(), "StreamripDownloads")
AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firefox/83.0"
TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg"
EXT = {
5: ".mp3",
6: ".flac",
7: ".flac",
27: ".flac",
1: ".mp3",
2: ".flac",
3: ".flac",
4: ".flac",
}
QUALITY_DESC = {
4: "128kbps",
5: "320kbps",
6: "16bit/44.1kHz",
7: "24bit/96kHz",
27: "24bit/192kHz",
0: "128kbps",
1: "320kbps",
2: "16bit/44.1kHz",
3: "24bit/96kHz",
4: "24bit/192kHz",
}
@ -133,17 +133,10 @@ TRACK_FORMAT = "{tracknumber}. {artist} - {title}"
URL_REGEX = (
r"https:\/\/(?:www|open|play|listen)?\.?(\w+)\.com(?:(?:\/(track|playlist|album|"
r"artist|label))|(?:\/[-\w]+?))+\/(\w+)"
r"artist|label))|(?:\/[-\w]+?))+\/([-\w]+)"
)
TIDAL_Q_IDS = {
4: "LOW", # AAC
5: "HIGH", # AAC
6: "LOSSLESS", # Lossless, but it also could be MQA
7: "HI_RES", # not available for download
}
TIDAL_MAX_Q = 7
DEEZER_Q_IDS = {4: 128, 5: 320, 6: 1411}
DEEZER_MAX_Q = 6
AVAILABLE_QUALITY_IDS = (0, 1, 2, 3, 4)

View file

@ -124,12 +124,12 @@ class MusicDL(list):
arguments = {
"database": self.db,
"parent_folder": self.config.session["downloads"]["folder"],
"quality": self.config.session["downloads"]["quality"],
# TODO: fully implement this
# "embed_cover": self.config.session["metadata"]["embed_cover"],
}
logger.debug("Arguments from config: %s", arguments)
for item in self:
arguments['quality'] = self.config.session[item.client.source]['quality']
if isinstance(item, Artist):
filters_ = tuple(
k for k, v in self.config.session["filters"].items() if v
@ -189,7 +189,7 @@ class MusicDL(list):
) = client.get_tokens()
self.config.save()
elif client.source == 'tidal':
self.config.file['tidal'] = client.get_tokens()
self.config.file['tidal'].update(client.get_tokens())
self.config.save()
def parse_urls(self, url: str) -> Tuple[str, str]:

View file

@ -2,9 +2,10 @@ import logging
import os
import re
import shutil
import sys
# import sys
from abc import ABC, abstractmethod
from pprint import pformat, pprint
from pprint import pformat
# from pprint import pprint
from tempfile import gettempdir
from typing import Any, Callable, Optional, Tuple, Union
@ -34,7 +35,7 @@ from .metadata import TrackMetadata
from .utils import (
clean_format,
decrypt_mqa_file,
quality_id,
get_quality_id,
safe_get,
tidal_cover_url,
tqdm_download,
@ -43,10 +44,10 @@ from .utils import (
logger = logging.getLogger(__name__)
TIDAL_Q_MAP = {
"LOW": 4,
"HIGH": 5,
"LOSSLESS": 6,
"HI_RES": 7,
"LOW": 0,
"HIGH": 1,
"LOSSLESS": 2,
"HI_RES": 3,
}
# used to homogenize cover size keys
@ -228,7 +229,7 @@ class Track:
else:
raise InvalidSourceError(self.client.source)
if dl_info.get("enc_key"):
if isinstance(dl_info, dict) and dl_info.get("enc_key"):
decrypt_mqa_file(temp_file, self.final_path, dl_info["enc_key"])
else:
shutil.move(temp_file, self.final_path)
@ -293,9 +294,7 @@ class Track:
:raises IndexError
"""
logger.debug(pos)
tracklist = cls._get_tracklist(album, client.source)
logger.debug(len(tracklist))
track = tracklist[pos]
meta = TrackMetadata(album=album, track=track, source=client.source)
return cls(client=client, meta=meta, id=track["id"])
@ -356,18 +355,18 @@ class Track:
if album_meta is not None:
self.meta.add_album_meta(album_meta) # extend meta with album info
if self.quality in (6, 7, 27):
if self.quality in (2, 3, 4):
self.container = "FLAC"
logger.debug("Tagging file with %s container", self.container)
audio = FLAC(self.final_path)
elif self.quality == 5:
elif self.quality == 1:
self.container = "MP3"
logger.debug("Tagging file with %s container", self.container)
try:
audio = ID3(self.final_path)
except ID3NoHeaderError:
audio = ID3()
elif self.quality == 4: # tidal and deezer
elif self.quality == 0: # tidal and deezer
# TODO: add compatibility with MP4 container
raise NotImplementedError("Qualities < 320kbps not implemented")
else:
@ -579,7 +578,7 @@ class Tracklist(list, ABC):
:type quality: int
:rtype: Union[Picture, APIC]
"""
cover_type = {5: APIC, 6: Picture, 7: Picture, 27: Picture}
cover_type = {1: APIC, 2: Picture, 3: Picture, 4: Picture}
cover = cover_type.get(quality)
if cover is Picture:
@ -623,7 +622,7 @@ class Tracklist(list, ABC):
class Album(Tracklist):
"""Represents a downloadable Qobuz album.
"""Represents a downloadable album.
Usage:
@ -694,7 +693,7 @@ class Album(Tracklist):
"release_type": resp.get("release_type", "album"),
"cover_urls": resp.get("image"),
"streamable": resp.get("streamable"),
"quality": quality_id(
"quality": get_quality_id(
resp.get("maximum_bit_depth"), resp.get("maximum_sampling_rate")
),
"bit_depth": resp.get("maximum_bit_depth"),
@ -715,8 +714,8 @@ class Album(Tracklist):
},
"streamable": resp.get("allowStreaming"),
"quality": TIDAL_Q_MAP[resp.get("audioQuality")],
"bit_depth": 16,
"sampling_rate": 44100,
"bit_depth": 24 if resp.get("audioQuality") == 'HI_RES' else 16,
"sampling_rate": 44100, # always 44.1 kHz
"tracktotal": resp.get("numberOfTracks"),
}
elif client.source == "deezer":
@ -726,7 +725,7 @@ class Album(Tracklist):
"title": resp.get("title"),
"_artist": safe_get(resp, "artist", "name"),
"albumartist": safe_get(resp, "artist", "name"),
"year": str(resp.get("year"))[:4] or "Unknown",
"year": str(resp.get("year"))[:4],
# version not given by API
"cover_urls": {
sk: resp.get(rk) # size key, resp key
@ -736,7 +735,7 @@ class Album(Tracklist):
},
"url": resp.get("link"),
"streamable": True, # api only returns streamables
"quality": 6, # all tracks are 16/44.1 streamable
"quality": 2, # all tracks are 16/44.1 streamable
"bit_depth": 16,
"sampling_rate": 44100,
"tracktotal": resp.get("track_total") or resp.get("nb_tracks"),
@ -891,7 +890,7 @@ class Album(Tracklist):
class Playlist(Tracklist):
"""Represents a downloadable Qobuz playlist.
"""Represents a downloadable playlist.
Usage:
>>> resp = client.get('hip hop', 'playlist')
@ -938,7 +937,7 @@ class Playlist(Tracklist):
:param kwargs:
"""
self.meta = self.client.get(self.id, "playlist")
self.name = self.meta.get("name")
self.name = self.meta.get("title")
self._load_tracks(**kwargs)
def _load_tracks(self, new_tracknumbers: bool = True):
@ -957,7 +956,7 @@ class Playlist(Tracklist):
return {"track": track, "album": track["album"]}
elif self.client.source == "tidal":
tracklist = self.meta["items"]
tracklist = self.meta["tracks"]
def gen_cover(track):
cover_url = tidal_cover_url(track["album"]["cover"], 320)
@ -1018,6 +1017,7 @@ class Playlist(Tracklist):
"""
folder = sanitize_filename(self.name)
folder = os.path.join(parent_folder, folder)
logger.debug(f"Parent folder {folder}")
for track in self:
track.download(parent_folder=folder, quality=quality, database=database)
@ -1352,7 +1352,6 @@ class Label(Artist):
resp = self.client.get(self.id, "label")
self.name = resp["name"]
for album in resp["albums"]["items"]:
pprint(album)
self.append(Album.from_api(album, client=self.client))
def __repr__(self):

View file

@ -3,7 +3,7 @@ import logging
import logging.handlers as handlers
import os
from string import Formatter
from typing import Optional
from typing import Optional, Union
import requests
from Crypto.Cipher import AES
@ -12,7 +12,7 @@ from pathvalidate import sanitize_filename
from tqdm import tqdm
from .constants import LOG_DIR, TIDAL_COVER_URL
from .exceptions import NonStreamable
from .exceptions import NonStreamable, InvalidSourceError
logger = logging.getLogger(__name__)
@ -36,7 +36,45 @@ def safe_get(d: dict, *keys, default=None):
return res
def quality_id(bit_depth: Optional[int], sampling_rate: Optional[int]):
def get_quality(quality_id: int, source: str) -> Union[str, int]:
"""Given the quality id in (0, 1, 2, 3, 4), return the streaming quality
value to send to the api for a given source.
:param quality_id: the quality id
:type quality_id: int
:param source: qobuz, tidal, or deezer
:type source: str
:rtype: Union[str, int]
"""
if source == 'qobuz':
q_map = {
1: 5,
2: 6,
3: 7,
4: 27,
}
elif source == 'tidal':
q_map = {
0: "LOW", # AAC
1: "HIGH", # AAC
2: "LOSSLESS", # CD Quality
3: "HI_RES", # MQA
}
elif source == 'deezer':
q_map = {
0: 128,
1: 320,
2: 1411,
}
else:
raise InvalidSourceError(source)
possible_keys = set(q_map.keys())
assert quality_id in possible_keys, f"{quality_id} must be in {possible_keys}"
return q_map[quality_id]
def get_quality_id(bit_depth: Optional[int], sampling_rate: Optional[int]):
"""Return a quality id in (5, 6, 7, 27) from bit depth and
sampling rate. If None is provided, mp3/lossy is assumed.
@ -46,16 +84,16 @@ def quality_id(bit_depth: Optional[int], sampling_rate: Optional[int]):
:type sampling_rate: Optional[int]
"""
if not (bit_depth or sampling_rate): # is lossy
return 5
return 1
if bit_depth == 16:
return 6
return 2
if bit_depth == 24:
if sampling_rate <= 96:
return 7
return 3
return 27
return 4
def tqdm_download(url: str, filepath: str):