MQA album working

This commit is contained in:
nathom 2021-03-28 16:34:10 -07:00
parent afb76e530c
commit 086262e8b7
5 changed files with 107 additions and 28 deletions

1
.gitignore vendored
View file

@ -6,3 +6,4 @@ build
test.py
/urls.txt
*.flac
/Downloads

View file

@ -1,12 +1,13 @@
import base64
import json
import datetime
import hashlib
import json
import logging
import os
import sys
import time
from abc import ABC, abstractmethod
from pprint import pformat
from pprint import pformat, pprint
from typing import Generator, Sequence, Tuple, Union
import click
@ -511,7 +512,9 @@ class TidalClient(ClientInterface):
return resp.json()
class TidalMQAClient:
class TidalMQAClient(ClientInterface):
source = "tidal"
def __init__(self):
self.device_code = None
self.user_code = None
@ -524,9 +527,32 @@ class TidalMQAClient:
self.refresh_token = None
self.expiry = None
def login(self):
self.get_device_code()
print(f"{self.user_code=}")
def login(
self,
user_id=None,
country_code=None,
access_token=None,
token_expiry=None,
refresh_token=None,
):
if access_token is not None:
self.token_expiry = token_expiry
self.refresh_token = refresh_token
self.login_by_access_token(access_token, user_id)
else:
self.login_new_user()
def get(self, item_id, media_type):
return self._api_get(item_id, media_type)
def search(self, query, media_type="album"):
raise NotImplementedError
def login_new_user(self):
login_link = self.get_device_code()
click.secho(
f"Go to {login_link} to log into Tidal within 5 minutes.", fg="blue"
)
start = time.time()
elapsed = 0
while elapsed < 600: # change later
@ -558,9 +584,9 @@ class TidalMQAClient:
logger.debug(pformat(resp))
self.device_code = resp["deviceCode"]
self.user_code = resp["userCode"]
self.verification_url = resp["verificationUri"]
self.user_code_expiry = resp["expiresIn"]
self.auth_interval = resp["interval"]
return resp["verificationUriComplete"]
def check_auth_status(self):
data = {
@ -587,7 +613,7 @@ class TidalMQAClient:
self.country_code = resp["user"]["countryCode"]
self.access_token = resp["access_token"]
self.refresh_token = resp["refresh_token"]
self.access_token_expiry = resp["expires_in"]
self.token_expiry = resp["expires_in"] + time.time()
return 0
def verify_access_token(self, token):
@ -619,24 +645,48 @@ class TidalMQAClient:
self.user_id = resp["user"]["userId"]
self.country_code = resp["user"]["countryCode"]
self.access_token = resp["access_token"]
self.access_token_expiry = resp["expires_in"]
self.token_expiry = resp["expires_in"] + time.time()
def login_by_access_token(self, token, user_id=None):
headers = {"authorization": f"Bearer {token}"}
resp = requests.get("https://api.tidal.com/v1/sessions", headers=headers).json()
if resp.get("status") != 200:
raise Exception("Login failed")
if resp.get("status", 200) != 200:
raise Exception(f"Login failed {resp=}")
if str(resp.get("userId")) != str(user_id):
raise Exception(f"User id mismatch {locals()}")
raise Exception(f"User id mismatch {resp['userId']} v {user_id}")
self.user_id = resp["userId"]
self.country_code = resp["countryCode"]
self.access_token = token
def _api_request(self, path, params):
def get_tokens(self):
return {
k: getattr(self, k)
for k in (
"user_id",
"country_code",
"access_token",
"refresh_token",
"token_expiry",
)
}
def _api_get(self, item_id: str, media_type: str) -> dict:
item = self._api_request(f"{media_type}s/{item_id}")
if media_type in ("playlist", "album"):
resp = self._api_request(f"{media_type}s/{item_id}/items")
item["tracks"] = [item["item"] for item in resp["items"]]
return item
def _api_request(self, path, params=None) -> dict:
if params is None:
params = {}
headers = {"authorization": f"Bearer {self.access_token}"}
params["countryCode"] = self.country_code
params["limit"] = 100
r = requests.get(f"{TIDAL_BASE}/{path}", headers=headers, params=params).json()
return r
@ -652,7 +702,8 @@ class TidalMQAClient:
}
resp = self._api_request(f"tracks/{track_id}/playbackinfopostpaywall", params)
manifest = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
codec = manifest["codecs"]
file_url = manifest["urls"][0]
enc_key = manifest.get("keyId", "")
return manifest
return {
"url": manifest["urls"][0],
"enc_key": manifest.get("keyId", ""),
"codec": manifest["codecs"],
}

View file

@ -42,7 +42,7 @@ class Config:
"country_code": None,
"access_token": None,
"refresh_token": None,
"expires_after": 0,
"token_expiry": 0,
},
"database": {"enabled": True, "path": None},
"conversion": {
@ -124,10 +124,7 @@ class Config:
@property
def tidal_creds(self):
return {
"email": self.file["tidal"]["email"],
"pwd": self.file["tidal"]["password"],
}
return self.file["tidal"]
@property
def qobuz_creds(self):

View file

@ -2,6 +2,7 @@ import logging
import os
import re
import shutil
import sys
from abc import ABC, abstractmethod
from pprint import pformat, pprint
from tempfile import gettempdir
@ -36,6 +37,7 @@ from .utils import (
safe_get,
tidal_cover_url,
tqdm_download,
decrypt_mqa_file,
)
logger = logging.getLogger(__name__)
@ -137,9 +139,9 @@ class Track:
@staticmethod
def _get_tracklist(resp, source):
if source in ("qobuz", "tidal"):
if source == "qobuz":
return resp["tracks"]["items"]
elif source == "deezer":
elif source in ("tidal", "deezer"):
return resp["tracks"]
raise NotImplementedError(source)
@ -226,7 +228,10 @@ class Track:
else:
raise InvalidSourceError(self.client.source)
shutil.move(temp_file, self.final_path)
if dl_info.get("enc_key"):
decrypt_mqa_file(temp_file, self.final_path, dl_info['enc_key'])
else:
shutil.move(temp_file, self.final_path)
if isinstance(database, MusicDB):
database.add(self.id)
@ -288,7 +293,10 @@ class Track:
:raises IndexError
"""
track = cls._get_tracklist(album, client.source)[pos]
logger.debug(pos)
tracklist = cls._get_tracklist(album, client.source)
logger.debug(len(tracklist))
track = tracklist[pos]
meta = TrackMetadata(album=album, track=track, source=client.source)
return cls(client=client, meta=meta, id=track["id"])
@ -743,7 +751,7 @@ class Album(Tracklist):
This uses a classmethod to convert an item into a Track object, which
stores the metadata inside a TrackMetadata object.
"""
logging.debug("Loading tracks to album")
logging.debug(f"Loading {self.tracktotal} tracks to album")
for i in range(self.tracktotal):
# append method inherited from superclass list
self.append(

View file

@ -1,4 +1,5 @@
import logging
import base64
import logging.handlers as handlers
import os
from string import Formatter
@ -159,7 +160,28 @@ def capitalize(s: str) -> str:
return s[0].upper() + s[1:]
def decrypt_mqa_file(in_path, out_path, key, nonce):
def decrypt_mqa_file(in_path, out_path, encryption_key):
# Do not change this
master_key = "UIlTTEMmmLfGowo/UC60x2H45W6MdGgTRfo/umg4754="
# Decode the base64 strings to ascii strings
master_key = base64.b64decode(master_key)
security_token = base64.b64decode(encryption_key)
# Get the IV from the first 16 bytes of the securityToken
iv = security_token[:16]
encrypted_st = security_token[16:]
# Initialize decryptor
decryptor = AES.new(master_key, AES.MODE_CBC, iv)
# Decrypt the security token
decrypted_st = decryptor.decrypt(encrypted_st)
# Get the audio stream decryption key and nonce from the decrypted security token
key = decrypted_st[:16]
nonce = decrypted_st[16:24]
counter = Counter.new(64, prefix=nonce, initial_value=0)
decryptor = AES.new(key, AES.MODE_CTR, counter=counter)