From f8dc9d206f015a7eae5c0e669d0ce4d81cb867c9 Mon Sep 17 00:00:00 2001 From: nathom Date: Fri, 9 Apr 2021 17:37:14 -0700 Subject: [PATCH] Support for Tidal qualities <= 1 --- streamrip/downloader.py | 71 ++++++++++++++++++++++++++++------------- streamrip/metadata.py | 48 +++++++++++++++++----------- streamrip/utils.py | 10 ++++++ 3 files changed, 88 insertions(+), 41 deletions(-) diff --git a/streamrip/downloader.py b/streamrip/downloader.py index e69eda4..f7c2964 100644 --- a/streamrip/downloader.py +++ b/streamrip/downloader.py @@ -11,13 +11,13 @@ from typing import Any, Callable, Optional, Tuple, Union import click from mutagen.flac import FLAC, Picture from mutagen.id3 import APIC, ID3, ID3NoHeaderError +from mutagen.mp4 import MP4, MP4Cover from pathvalidate import sanitize_filename, sanitize_filepath from . import converter from .clients import ClientInterface from .constants import ( ALBUM_KEYS, - EXT, FLAC_MAX_BLOCKSIZE, FOLDER_FORMAT, TRACK_FORMAT, @@ -37,6 +37,7 @@ from .utils import ( safe_get, tidal_cover_url, tqdm_download, + ext, ) logger = logging.getLogger(__name__) @@ -315,7 +316,7 @@ class Track: filename = clean_format(self.file_format, formatter) self.final_path = ( os.path.join(self.folder, filename)[:250].strip() - + EXT[self.quality] # file extension dict + + ext(self.quality, self.client.source) ) logger.debug("Formatted path: %s", self.final_path) @@ -366,7 +367,7 @@ class Track: def tag( self, album_meta: dict = None, - cover: Union[Picture, APIC] = None, + cover: Union[Picture, APIC, MP4Cover] = None, embed_cover: bool = True, ): """Tag the track using the stored metadata. @@ -403,22 +404,28 @@ class Track: logger.debug("Tagging file with %s container", self.container) audio = FLAC(self.final_path) elif self.quality <= 1: - self.container = "MP3" + if self.client.source == 'tidal': + self.container = 'AAC' + audio = MP4(self.final_path) + else: + self.container = 'MP3' + try: + audio = ID3(self.final_path) + except ID3NoHeaderError: + audio = ID3() + logger.debug("Tagging file with %s container", self.container) - try: - audio = ID3(self.final_path) - except ID3NoHeaderError: - audio = ID3() else: raise InvalidQuality(f'Invalid quality: "{self.quality}"') # automatically generate key, value pairs based on container - for k, v in self.meta.tags(self.container): + tags = self.meta.tags(self.container) + for k, v in tags: audio[k] = v if embed_cover and cover is None: assert hasattr(self, "cover_path") - cover = Tracklist.get_cover_obj(self.cover_path, self.quality) + cover = Tracklist.get_cover_obj(self.cover_path, self.quality, self.client.source) if isinstance(audio, FLAC): if embed_cover: @@ -428,6 +435,9 @@ class Track: if embed_cover: audio.add(cover) audio.save(self.final_path, "v2_version=3") + elif isinstance(audio, MP4): + audio['covr'] = [cover] + audio.save() else: raise ValueError(f"Unknown container type: {audio}") @@ -606,7 +616,7 @@ class Tracklist(list): return cls(client=client, **info) @staticmethod - def get_cover_obj(cover_path: str, quality: int) -> Union[Picture, APIC]: + def get_cover_obj(cover_path: str, quality: int, source: str) -> Union[Picture, APIC]: """Given the path to an image and a quality id, return an initialized cover object that can be used for every track in the album. @@ -616,25 +626,38 @@ class Tracklist(list): :type quality: int :rtype: Union[Picture, APIC] """ - cover_type = {0: APIC, 1: APIC, 2: Picture, 3: Picture, 4: Picture} + def flac_mp3_cover_obj(cover): + cover_obj = cover() + cover_obj.type = 3 + cover_obj.mime = "image/jpeg" + with open(cover_path, "rb") as img: + cover_obj.data = img.read() + + return cover_obj + + if quality > 1: + cover = Picture + elif source == 'tidal': + cover = MP4Cover + else: + cover = APIC - cover = cover_type.get(quality) if cover is Picture: size_ = os.path.getsize(cover_path) if size_ > FLAC_MAX_BLOCKSIZE: raise TooLargeCoverArt( f"Not suitable for Picture embed: {size_ / 10 ** 6} MB" ) - elif cover is None: - raise InvalidQuality(f"Quality {quality} not allowed") + return flac_mp3_cover_obj(cover) - cover_obj = cover() - cover_obj.type = 3 - cover_obj.mime = "image/jpeg" - with open(cover_path, "rb") as img: - cover_obj.data = img.read() + elif cover is APIC: + return flac_mp3_cover_obj(cover) - return cover_obj + elif cover is MP4Cover: + with open(cover_path, 'rb') as img: + return cover(img.read(), imageformat=MP4Cover.FORMAT_JPEG) + + raise InvalidQuality(f"Quality {quality} not allowed") def download_message(self): click.secho( @@ -913,7 +936,7 @@ class Album(Tracklist): if ( self.cover_urls.get(download_cover_size, embed_cover_size) != embed_cover_size - or os.path.size(cover_path) > FLAC_MAX_BLOCKSIZE + or os.path.getsize(cover_path) > FLAC_MAX_BLOCKSIZE ): # download cover at another resolution but don't use for embed embed_cover_path = cover_path.replace(".jpg", "_embed.jpg") @@ -921,10 +944,12 @@ class Album(Tracklist): tqdm_download(self.cover_urls[download_cover_size], cover_path) else: embed_cover_path = cover_path + else: + embed_cover_path = cover_path embed_cover = kwargs.get("embed_cover", True) # embed by default if self.client.source != "deezer" and embed_cover: - cover = self.get_cover_obj(embed_cover_path, quality) + cover = self.get_cover_obj(embed_cover_path, quality, self.client.source) download_args = { "quality": quality, diff --git a/streamrip/metadata.py b/streamrip/metadata.py index c4d2fe8..8502785 100644 --- a/streamrip/metadata.py +++ b/streamrip/metadata.py @@ -93,9 +93,12 @@ class TrackMetadata: """ if self.__source == "qobuz": self.album = resp.get("title") - self.tracktotal = str(resp.get("tracks_count", 1)) + self.tracktotal = resp.get("tracks_count", 1) self.genre = resp.get("genres_list", []) self.date = resp.get("release_date_original") or resp.get("release_date") + if self.date: + self.year = self.date[:4] + self.copyright = resp.get("copyright") self.albumartist = safe_get(resp, "artist", "name") self.label = resp.get("label") @@ -117,8 +120,11 @@ class TrackMetadata: self.tracktotal = resp.get("numberOfTracks") # genre not returned by API self.date = resp.get("releaseDate") + if self.date: + self.year = self.date[:4] + self.copyright = resp.get("copyright") - self.albumartist = resp.get("artist", {}).get("name") + self.albumartist = safe_get(resp, 'artist', 'name') self.disctotal = resp.get("numberOfVolumes") self.isrc = resp.get("isrc") self.explicit = resp.get("explicit", False) @@ -127,9 +133,9 @@ class TrackMetadata: elif self.__source == "deezer": self.album = resp.get("title") self.tracktotal = resp.get("track_total") - self.genre = resp.get("genres", {}).get("data") + self.genre = safe_get(resp, 'genres', 'data') self.date = resp.get("release_date") - self.albumartist = resp.get("artist", {}).get("name") + self.albumartist = safe_get(resp, 'artist', 'name') self.label = resp.get("label") # either 0 or 1 self.explicit = bool(resp.get("parental_warning")) @@ -140,8 +146,8 @@ class TrackMetadata: raise ValueError(self.__source) def add_track_meta(self, track: dict): - """Parse the metadata from a track dict returned by the - Qobuz API. + """Parse the metadata from a track dict returned by an + API. :param track: """ @@ -150,25 +156,23 @@ class TrackMetadata: self._mod_title(track.get("version"), track.get("work")) self.composer = track.get("composer", {}).get("name") - self.tracknumber = f"{int(track.get('track_number', 1)):02}" - self.discnumber = str(track.get("media_number", 1)) - try: - self.artist = track["performer"]["name"] - except KeyError: - if hasattr(self, "albumartist"): - self.artist = self.albumartist + self.tracknumber = track.get('track_number', 1) + self.discnumber = track.get("media_number", 1) + self.artist = safe_get(track, 'performer', 'name') + if self.artist is None: + self.artist = self.get('albumartist') elif self.__source == "tidal": self.title = track.get("title").strip() self._mod_title(track.get("version"), None) - self.tracknumber = f"{int(track.get('trackNumber', 1)):02}" - self.discnumber = str(track.get("volumeNumber")) + self.tracknumber = track.get('trackNumber', 1) + self.discnumber = track.get("volumeNumber") self.artist = track.get("artist", {}).get("name") elif self.__source == "deezer": self.title = track.get("title").strip() self._mod_title(track.get("version"), None) - self.tracknumber = f"{int(track.get('track_position', 1)):02}" + self.tracknumber = track.get('track_position', 1) self.discnumber = track.get("disk_number") self.artist = track.get("artist", {}).get("name") @@ -364,14 +368,22 @@ class TrackMetadata: if text is not None and v is not None: yield (v.__name__, v(encoding=3, text=text)) - def __mp4_tags(self) -> Tuple[str, str]: + def __gen_mp4_tags(self) -> Tuple[str, Union[str, int, tuple]]: """Generate key, value pairs to tag ALAC or AAC files in an MP4 container. :rtype: Tuple[str, str] """ for k, v in MP4_KEY.items(): - return (v, getattr(self, k)) + if k == "tracknumber": + text = [(self.tracknumber, self.tracktotal)] + elif k == 'discnumber': + text = [(self.discnumber, self.get('disctotal', 1))] + else: + text = getattr(self, k) + + if v is not None and text is not None: + yield (v, text) def __setitem__(self, key, val): """Dict-like access for tags. diff --git a/streamrip/utils.py b/streamrip/utils.py index 1c84816..ddb89df 100644 --- a/streamrip/utils.py +++ b/streamrip/utils.py @@ -206,3 +206,13 @@ def decrypt_mqa_file(in_path, out_path, encryption_key): dec_bytes = decryptor.decrypt(enc_file.read()) with open(out_path, "wb") as dec_file: dec_file.write(dec_bytes) + + +def ext(quality: int, source: str): + if quality <= 1: + if source == 'tidal': + return '.m4a' + else: + return '.mp3' + else: + return '.flac'