Merge branch 'tidal_aac'

This commit is contained in:
nathom 2021-04-09 17:37:36 -07:00
commit 9301757c10
3 changed files with 86 additions and 41 deletions

View file

@ -11,13 +11,13 @@ from typing import Any, Callable, Optional, Tuple, Union
import click import click
from mutagen.flac import FLAC, Picture from mutagen.flac import FLAC, Picture
from mutagen.id3 import APIC, ID3, ID3NoHeaderError from mutagen.id3 import APIC, ID3, ID3NoHeaderError
from mutagen.mp4 import MP4, MP4Cover
from pathvalidate import sanitize_filename, sanitize_filepath from pathvalidate import sanitize_filename, sanitize_filepath
from . import converter from . import converter
from .clients import ClientInterface from .clients import ClientInterface
from .constants import ( from .constants import (
ALBUM_KEYS, ALBUM_KEYS,
EXT,
FLAC_MAX_BLOCKSIZE, FLAC_MAX_BLOCKSIZE,
FOLDER_FORMAT, FOLDER_FORMAT,
TRACK_FORMAT, TRACK_FORMAT,
@ -37,6 +37,7 @@ from .utils import (
safe_get, safe_get,
tidal_cover_url, tidal_cover_url,
tqdm_download, tqdm_download,
ext,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -315,7 +316,7 @@ class Track:
filename = clean_format(self.file_format, formatter) filename = clean_format(self.file_format, formatter)
self.final_path = ( self.final_path = (
os.path.join(self.folder, filename)[:250].strip() os.path.join(self.folder, filename)[:250].strip()
+ EXT[self.quality] # file extension dict + ext(self.quality, self.client.source)
) )
logger.debug("Formatted path: %s", self.final_path) logger.debug("Formatted path: %s", self.final_path)
@ -366,7 +367,7 @@ class Track:
def tag( def tag(
self, self,
album_meta: dict = None, album_meta: dict = None,
cover: Union[Picture, APIC] = None, cover: Union[Picture, APIC, MP4Cover] = None,
embed_cover: bool = True, embed_cover: bool = True,
): ):
"""Tag the track using the stored metadata. """Tag the track using the stored metadata.
@ -403,22 +404,28 @@ class Track:
logger.debug("Tagging file with %s container", self.container) logger.debug("Tagging file with %s container", self.container)
audio = FLAC(self.final_path) audio = FLAC(self.final_path)
elif self.quality <= 1: elif self.quality <= 1:
self.container = "MP3" if self.client.source == 'tidal':
logger.debug("Tagging file with %s container", self.container) self.container = 'AAC'
audio = MP4(self.final_path)
else:
self.container = 'MP3'
try: try:
audio = ID3(self.final_path) audio = ID3(self.final_path)
except ID3NoHeaderError: except ID3NoHeaderError:
audio = ID3() audio = ID3()
logger.debug("Tagging file with %s container", self.container)
else: else:
raise InvalidQuality(f'Invalid quality: "{self.quality}"') raise InvalidQuality(f'Invalid quality: "{self.quality}"')
# automatically generate key, value pairs based on container # automatically generate key, value pairs based on container
for k, v in self.meta.tags(self.container): tags = self.meta.tags(self.container)
for k, v in tags:
audio[k] = v audio[k] = v
if embed_cover and cover is None: if embed_cover and cover is None:
assert hasattr(self, "cover_path") assert hasattr(self, "cover_path")
cover = Tracklist.get_cover_obj(self.cover_path, self.quality) cover = Tracklist.get_cover_obj(self.cover_path, self.quality, self.client.source)
if isinstance(audio, FLAC): if isinstance(audio, FLAC):
if embed_cover: if embed_cover:
@ -428,6 +435,9 @@ class Track:
if embed_cover: if embed_cover:
audio.add(cover) audio.add(cover)
audio.save(self.final_path, "v2_version=3") audio.save(self.final_path, "v2_version=3")
elif isinstance(audio, MP4):
audio['covr'] = [cover]
audio.save()
else: else:
raise ValueError(f"Unknown container type: {audio}") raise ValueError(f"Unknown container type: {audio}")
@ -606,7 +616,7 @@ class Tracklist(list):
return cls(client=client, **info) return cls(client=client, **info)
@staticmethod @staticmethod
def get_cover_obj(cover_path: str, quality: int) -> Union[Picture, APIC]: def get_cover_obj(cover_path: str, quality: int, source: str) -> Union[Picture, APIC]:
"""Given the path to an image and a quality id, return an initialized """Given the path to an image and a quality id, return an initialized
cover object that can be used for every track in the album. cover object that can be used for every track in the album.
@ -616,18 +626,7 @@ class Tracklist(list):
:type quality: int :type quality: int
:rtype: Union[Picture, APIC] :rtype: Union[Picture, APIC]
""" """
cover_type = {0: APIC, 1: APIC, 2: Picture, 3: Picture, 4: Picture} def flac_mp3_cover_obj(cover):
cover = cover_type.get(quality)
if cover is Picture:
size_ = os.path.getsize(cover_path)
if size_ > FLAC_MAX_BLOCKSIZE:
raise TooLargeCoverArt(
f"Not suitable for Picture embed: {size_ / 10 ** 6} MB"
)
elif cover is None:
raise InvalidQuality(f"Quality {quality} not allowed")
cover_obj = cover() cover_obj = cover()
cover_obj.type = 3 cover_obj.type = 3
cover_obj.mime = "image/jpeg" cover_obj.mime = "image/jpeg"
@ -636,6 +635,30 @@ class Tracklist(list):
return cover_obj return cover_obj
if quality > 1:
cover = Picture
elif source == 'tidal':
cover = MP4Cover
else:
cover = APIC
if cover is Picture:
size_ = os.path.getsize(cover_path)
if size_ > FLAC_MAX_BLOCKSIZE:
raise TooLargeCoverArt(
f"Not suitable for Picture embed: {size_ / 10 ** 6} MB"
)
return flac_mp3_cover_obj(cover)
elif cover is APIC:
return flac_mp3_cover_obj(cover)
elif cover is MP4Cover:
with open(cover_path, 'rb') as img:
return cover(img.read(), imageformat=MP4Cover.FORMAT_JPEG)
raise InvalidQuality(f"Quality {quality} not allowed")
def download_message(self): def download_message(self):
click.secho( click.secho(
f"\nDownloading {self.title} ({self.__class__.__name__})\n", f"\nDownloading {self.title} ({self.__class__.__name__})\n",
@ -913,7 +936,7 @@ class Album(Tracklist):
if ( if (
self.cover_urls.get(download_cover_size, embed_cover_size) self.cover_urls.get(download_cover_size, embed_cover_size)
!= embed_cover_size != embed_cover_size
or os.path.size(cover_path) > FLAC_MAX_BLOCKSIZE or os.path.getsize(cover_path) > FLAC_MAX_BLOCKSIZE
): ):
# download cover at another resolution but don't use for embed # download cover at another resolution but don't use for embed
embed_cover_path = cover_path.replace(".jpg", "_embed.jpg") embed_cover_path = cover_path.replace(".jpg", "_embed.jpg")
@ -926,7 +949,7 @@ class Album(Tracklist):
embed_cover = kwargs.get("embed_cover", True) # embed by default embed_cover = kwargs.get("embed_cover", True) # embed by default
if self.client.source != "deezer" and embed_cover: if self.client.source != "deezer" and embed_cover:
cover = self.get_cover_obj(embed_cover_path, quality) cover = self.get_cover_obj(embed_cover_path, quality, self.client.source)
download_args = { download_args = {
"quality": quality, "quality": quality,

View file

@ -93,9 +93,12 @@ class TrackMetadata:
""" """
if self.__source == "qobuz": if self.__source == "qobuz":
self.album = resp.get("title") self.album = resp.get("title")
self.tracktotal = str(resp.get("tracks_count", 1)) self.tracktotal = resp.get("tracks_count", 1)
self.genre = resp.get("genres_list", []) self.genre = resp.get("genres_list", [])
self.date = resp.get("release_date_original") or resp.get("release_date") self.date = resp.get("release_date_original") or resp.get("release_date")
if self.date:
self.year = self.date[:4]
self.copyright = resp.get("copyright") self.copyright = resp.get("copyright")
self.albumartist = safe_get(resp, "artist", "name") self.albumartist = safe_get(resp, "artist", "name")
self.label = resp.get("label") self.label = resp.get("label")
@ -117,8 +120,11 @@ class TrackMetadata:
self.tracktotal = resp.get("numberOfTracks") self.tracktotal = resp.get("numberOfTracks")
# genre not returned by API # genre not returned by API
self.date = resp.get("releaseDate") self.date = resp.get("releaseDate")
if self.date:
self.year = self.date[:4]
self.copyright = resp.get("copyright") self.copyright = resp.get("copyright")
self.albumartist = resp.get("artist", {}).get("name") self.albumartist = safe_get(resp, 'artist', 'name')
self.disctotal = resp.get("numberOfVolumes") self.disctotal = resp.get("numberOfVolumes")
self.isrc = resp.get("isrc") self.isrc = resp.get("isrc")
self.explicit = resp.get("explicit", False) self.explicit = resp.get("explicit", False)
@ -127,9 +133,9 @@ class TrackMetadata:
elif self.__source == "deezer": elif self.__source == "deezer":
self.album = resp.get("title") self.album = resp.get("title")
self.tracktotal = resp.get("track_total") self.tracktotal = resp.get("track_total")
self.genre = resp.get("genres", {}).get("data") self.genre = safe_get(resp, 'genres', 'data')
self.date = resp.get("release_date") self.date = resp.get("release_date")
self.albumartist = resp.get("artist", {}).get("name") self.albumartist = safe_get(resp, 'artist', 'name')
self.label = resp.get("label") self.label = resp.get("label")
# either 0 or 1 # either 0 or 1
self.explicit = bool(resp.get("parental_warning")) self.explicit = bool(resp.get("parental_warning"))
@ -140,8 +146,8 @@ class TrackMetadata:
raise ValueError(self.__source) raise ValueError(self.__source)
def add_track_meta(self, track: dict): def add_track_meta(self, track: dict):
"""Parse the metadata from a track dict returned by the """Parse the metadata from a track dict returned by an
Qobuz API. API.
:param track: :param track:
""" """
@ -150,25 +156,23 @@ class TrackMetadata:
self._mod_title(track.get("version"), track.get("work")) self._mod_title(track.get("version"), track.get("work"))
self.composer = track.get("composer", {}).get("name") self.composer = track.get("composer", {}).get("name")
self.tracknumber = f"{int(track.get('track_number', 1)):02}" self.tracknumber = track.get('track_number', 1)
self.discnumber = str(track.get("media_number", 1)) self.discnumber = track.get("media_number", 1)
try: self.artist = safe_get(track, 'performer', 'name')
self.artist = track["performer"]["name"] if self.artist is None:
except KeyError: self.artist = self.get('albumartist')
if hasattr(self, "albumartist"):
self.artist = self.albumartist
elif self.__source == "tidal": elif self.__source == "tidal":
self.title = track.get("title").strip() self.title = track.get("title").strip()
self._mod_title(track.get("version"), None) self._mod_title(track.get("version"), None)
self.tracknumber = f"{int(track.get('trackNumber', 1)):02}" self.tracknumber = track.get('trackNumber', 1)
self.discnumber = str(track.get("volumeNumber")) self.discnumber = track.get("volumeNumber")
self.artist = track.get("artist", {}).get("name") self.artist = track.get("artist", {}).get("name")
elif self.__source == "deezer": elif self.__source == "deezer":
self.title = track.get("title").strip() self.title = track.get("title").strip()
self._mod_title(track.get("version"), None) self._mod_title(track.get("version"), None)
self.tracknumber = f"{int(track.get('track_position', 1)):02}" self.tracknumber = track.get('track_position', 1)
self.discnumber = track.get("disk_number") self.discnumber = track.get("disk_number")
self.artist = track.get("artist", {}).get("name") self.artist = track.get("artist", {}).get("name")
@ -364,14 +368,22 @@ class TrackMetadata:
if text is not None and v is not None: if text is not None and v is not None:
yield (v.__name__, v(encoding=3, text=text)) yield (v.__name__, v(encoding=3, text=text))
def __mp4_tags(self) -> Tuple[str, str]: def __gen_mp4_tags(self) -> Tuple[str, Union[str, int, tuple]]:
"""Generate key, value pairs to tag ALAC or AAC files in """Generate key, value pairs to tag ALAC or AAC files in
an MP4 container. an MP4 container.
:rtype: Tuple[str, str] :rtype: Tuple[str, str]
""" """
for k, v in MP4_KEY.items(): for k, v in MP4_KEY.items():
return (v, getattr(self, k)) if k == "tracknumber":
text = [(self.tracknumber, self.tracktotal)]
elif k == 'discnumber':
text = [(self.discnumber, self.get('disctotal', 1))]
else:
text = getattr(self, k)
if v is not None and text is not None:
yield (v, text)
def __setitem__(self, key, val): def __setitem__(self, key, val):
"""Dict-like access for tags. """Dict-like access for tags.

View file

@ -206,3 +206,13 @@ def decrypt_mqa_file(in_path, out_path, encryption_key):
dec_bytes = decryptor.decrypt(enc_file.read()) dec_bytes = decryptor.decrypt(enc_file.read())
with open(out_path, "wb") as dec_file: with open(out_path, "wb") as dec_file:
dec_file.write(dec_bytes) dec_file.write(dec_bytes)
def ext(quality: int, source: str):
if quality <= 1:
if source == 'tidal':
return '.m4a'
else:
return '.mp3'
else:
return '.flac'