This commit is contained in:
nathom 2021-04-05 14:40:14 -07:00
parent 74aca34e6a
commit fa72e82769
8 changed files with 181 additions and 88 deletions

1
.gitignore vendored
View file

@ -9,3 +9,4 @@ test.py
/Downloads
*.mp3
StreamripDownloads
*.wav

View file

@ -16,6 +16,7 @@ from .constants import (
AVAILABLE_QUALITY_IDS,
DEEZER_MAX_Q,
QOBUZ_FEATURED_KEYS,
SOUNDCLOUD_CLIENT_ID,
TIDAL_MAX_Q,
)
from .exceptions import (
@ -52,7 +53,6 @@ DEEZER_DL = "http://dz.loaderapp.info/deezer"
# SoundCloud
SOUNDCLOUD_BASE = "https://api-v2.soundcloud.com"
SOUNDCLOUD_CLIENT_ID = "a3e059563d7fd3372b49b37f00a00bcf"
# ----------- Abstract Classes -----------------
@ -105,12 +105,18 @@ class ClientInterface(ABC):
def source(self):
pass
@property
@abstractmethod
def max_quality(self):
pass
# ------------- Clients -----------------
class QobuzClient(ClientInterface):
source = "qobuz"
max_quality = 4
# ------- Public Methods -------------
def __init__(self):
@ -365,6 +371,7 @@ class QobuzClient(ClientInterface):
class DeezerClient(ClientInterface):
source = "deezer"
max_quality = 2
def __init__(self):
self.session = requests.Session()
@ -425,6 +432,7 @@ class DeezerClient(ClientInterface):
class TidalClient(ClientInterface):
source = "tidal"
max_quality = 3
def __init__(self):
self.logged_in = False
@ -647,6 +655,7 @@ class TidalClient(ClientInterface):
class SoundCloudClient(ClientInterface):
source = "soundcloud"
max_quality = 0
def __init__(self):
self.session = requests.Session()
@ -659,52 +668,45 @@ class SoundCloudClient(ClientInterface):
def login(self):
raise NotImplementedError
def get(self, id=None, url=None, media_type="track"):
def get(self, id, media_type="track"):
assert media_type in ("track", "playlist"), f"{media_type} not supported"
if url is not None:
resp, status = self._get(f"resolve?url={url}")
elif id is not None:
if media_type == "track":
resp, _ = self._get(f"{media_type}s/{id}")
elif "http" in id:
resp, _ = self._get(f"resolve?url={id}")
else:
raise Exception("Must provide id or url")
raise Exception(id)
return resp
def get_file_url(self, track: dict, **kwargs) -> str:
if not track['streamable'] or track['policy'] == 'BLOCK':
def get_file_url(self, track: dict, quality) -> dict:
if not track["streamable"] or track["policy"] == "BLOCK":
raise Exception
if track['downloadable'] and track['has_downloads_left']:
resp, status = self._get("tracks/{id}/download")
return resp['redirectUri']
if track["downloadable"] and track["has_downloads_left"]:
r = self._get(f"tracks/{track['id']}/download", resp_obj=True)
return {"url": r.json()["redirectUri"], "type": "original"}
else:
url = None
for tc in track['media']['transcodings']:
fmt = tc['format']
if fmt['protocol'] == 'hls' and fmt['mime_type'] == 'audio/mpeg':
url = tc['url']
for tc in track["media"]["transcodings"]:
fmt = tc["format"]
if fmt["protocol"] == "hls" and fmt["mime_type"] == "audio/mpeg":
url = tc["url"]
break
assert url is not None
resp, _ = self._get(url, no_base=True)
return resp['url']
return {"url": resp["url"], "type": "mp3"}
pprint(resp)
if status in (401, 404):
raise Exception
return resp["redirectUri"]
def search(self, query: str, media_type='album'):
params = {'q': query}
def search(self, query: str, media_type="album"):
params = {"q": query}
resp, _ = self._get(f"search/{media_type}s", params=params)
return resp
def _get(self, path, params=None, no_base=False):
def _get(self, path, params=None, no_base=False, resp_obj=False):
if params is None:
params = {}
params["client_id"] = SOUNDCLOUD_CLIENT_ID
@ -713,6 +715,9 @@ class SoundCloudClient(ClientInterface):
else:
url = f"{SOUNDCLOUD_BASE}/{path}"
logger.debug(f"Fetching url {url}")
r = self.session.get(url, params=params)
print(r.text)
if resp_obj:
return r
return r.json(), r.status_code

View file

@ -19,6 +19,7 @@ AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:83.0) Gecko/20100101 Firef
TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg"
EXT = {
0: ".mp3",
1: ".mp3",
2: ".flac",
3: ".flac",
@ -137,6 +138,8 @@ URL_REGEX = (
r"https:\/\/(?:www|open|play|listen)?\.?(\w+)\.com(?:(?:\/(track|playlist|album|"
r"artist|label))|(?:\/[-\w]+?))+\/([-\w]+)"
)
SOUNDCLOUD_URL_REGEX = r"https://soundcloud.com/[-\w:]+"
SOUNDCLOUD_CLIENT_ID = "a3e059563d7fd3372b49b37f00a00bcf"
TIDAL_MAX_Q = 7

View file

@ -97,7 +97,7 @@ class Converter:
"-i",
self.filename,
"-loglevel",
"warning",
"panic",
"-c:a",
self.codec_lib,
]

View file

@ -9,9 +9,9 @@ from typing import Generator, Optional, Tuple, Union
import click
from .clients import DeezerClient, QobuzClient, TidalClient
from .clients import DeezerClient, QobuzClient, SoundCloudClient, TidalClient
from .config import Config
from .constants import CONFIG_PATH, DB_PATH, URL_REGEX
from .constants import CONFIG_PATH, DB_PATH, SOUNDCLOUD_URL_REGEX, URL_REGEX
from .db import MusicDB
from .downloader import Album, Artist, Label, Playlist, Track
from .exceptions import AuthenticationError, ParsingError
@ -27,7 +27,6 @@ MEDIA_CLASS = {
"track": Track,
"label": Label,
}
CLIENTS = {"qobuz": QobuzClient, "tidal": TidalClient, "deezer": DeezerClient}
Media = Union[Album, Playlist, Artist, Track]
@ -38,6 +37,7 @@ class MusicDL(list):
):
self.url_parse = re.compile(URL_REGEX)
self.soundcloud_url_parse = re.compile(SOUNDCLOUD_URL_REGEX)
self.config = config
if self.config is None:
self.config = Config(CONFIG_PATH)
@ -46,6 +46,7 @@ class MusicDL(list):
"qobuz": QobuzClient(),
"tidal": TidalClient(),
"deezer": DeezerClient(),
"soundcloud": SoundCloudClient(),
}
if config.session["database"]["enabled"]:
@ -81,11 +82,19 @@ class MusicDL(list):
raise Exception
def assert_creds(self, source: str):
assert source in ("qobuz", "tidal", "deezer"), f"Invalid source {source}"
assert source in (
"qobuz",
"tidal",
"deezer",
"soundcloud",
), f"Invalid source {source}"
if source == "deezer":
# no login for deezer
return
if source == "soundcloud":
return
if source == "qobuz" and (
self.config.file[source]["email"] is None
or self.config.file[source]["password"] is None
@ -201,6 +210,12 @@ class MusicDL(list):
:raises exceptions.ParsingError
"""
parsed = self.url_parse.findall(url)
soundcloud_urls = self.soundcloud_url_parse.findall(url)
if len(soundcloud_urls) > 0:
parsed.extend(
self.clients["soundcloud"].resolve(u) for u in soundcloud_urls
)
logger.debug(f"Parsed urls: {parsed}")
if parsed != []:

View file

@ -1,14 +1,15 @@
import logging
from pprint import pprint
import sys
import os
import re
import shutil
from pprint import pformat
import subprocess
import sys
from pprint import pformat, pprint
from tempfile import gettempdir
from typing import Any, Callable, Optional, Tuple, Union
import click
import requests
from mutagen.flac import FLAC, Picture
from mutagen.id3 import APIC, ID3, ID3NoHeaderError
from pathvalidate import sanitize_filename, sanitize_filepath
@ -20,6 +21,7 @@ from .constants import (
EXT,
FLAC_MAX_BLOCKSIZE,
FOLDER_FORMAT,
SOUNDCLOUD_CLIENT_ID,
TRACK_FORMAT,
)
from .db import MusicDB
@ -118,17 +120,20 @@ class Track:
assert hasattr(self, "id"), "id must be set before loading metadata"
track_meta = self.client.get(self.id, media_type="track")
self.resp = self.client.get(self.id, media_type="track")
pprint(self.resp)
self.meta = TrackMetadata(
track=track_meta, source=self.client.source
track=self.resp, source=self.client.source
) # meta dict -> TrackMetadata object
try:
if self.client.source == "qobuz":
self.cover_url = track_meta["album"]["image"]["small"]
self.cover_url = self.resp["album"]["image"]["small"]
elif self.client.source == "tidal":
self.cover_url = tidal_cover_url(track_meta["album"]["cover"], 320)
self.cover_url = tidal_cover_url(self.resp["album"]["cover"], 320)
elif self.client.source == "deezer":
self.cover_url = track_meta["album"]["cover_medium"]
self.cover_url = self.resp["album"]["cover_medium"]
elif self.client.source == "soundcloud":
self.cover_url = self.resp["artwork_url"].replace("large", "t500x500")
else:
raise InvalidSourceError(self.client.source)
except KeyError:
@ -146,7 +151,7 @@ class Track:
def download(
self,
quality: int = 7,
quality: int = 3,
parent_folder: str = "StreamripDownloads",
progress_bar: bool = True,
database: MusicDB = None,
@ -164,10 +169,8 @@ class Track:
:type progress_bar: bool
"""
# args override attributes
self.quality, self.folder = (
quality or self.quality,
parent_folder or self.folder,
)
self.quality = min((quality or self.quality), self.client.max_quality)
self.folder = parent_folder or self.folder
self.file_format = kwargs.get("track_format", TRACK_FORMAT)
self.folder = sanitize_filepath(self.folder, platform="auto")
@ -193,7 +196,12 @@ class Track:
if hasattr(self, "cover_url"): # only for playlists and singles
self.download_cover()
dl_info = self.client.get_file_url(self.id, quality)
if self.client.source == "soundcloud":
url_id = self.resp
else:
url_id = self.id
dl_info = self.client.get_file_url(url_id, self.quality)
temp_file = os.path.join(gettempdir(), f"~{self.id}_{quality}.tmp")
logger.debug("Temporary file path: %s", temp_file)
@ -214,7 +222,8 @@ class Track:
if self.client.source in ("qobuz", "tidal"):
logger.debug("Downloadable URL found: %s", dl_info.get("url"))
tqdm_download(dl_info["url"], temp_file) # downloads file
elif isinstance(dl_info, str): # Deezer
elif self.client.source == "deezer": # Deezer
logger.debug("Downloadable URL found: %s", dl_info)
try:
tqdm_download(dl_info, temp_file) # downloads file
@ -222,6 +231,34 @@ class Track:
logger.debug(f"Track is not downloadable {dl_info}")
click.secho("Track is not available for download", fg="red")
return False
elif self.client.source == "soundcloud":
if dl_info["type"] == "mp3":
temp_file += ".mp3"
# convert hls stream to mp3
subprocess.call(
[
"ffmpeg",
"-i",
dl_info,
"-c",
"copy",
"-y",
temp_file,
"-loglevel",
"fatal",
]
)
elif dl_info["type"] == "original":
tqdm_download(dl_info["url"], temp_file)
# if a wav is returned, convert to flac
engine = converter.FLAC(temp_file)
temp_file = f"{temp_file}.flac"
engine.convert(custom_fn=temp_file)
self.final_path = self.final_path.replace(".mp3", ".flac")
self.quality = 2
else:
raise InvalidSourceError(self.client.source)
@ -260,9 +297,6 @@ class Track:
else:
logger.debug("Cover already exists, skipping download")
self.cover = Tracklist.get_cover_obj(self.cover_path, self.quality)
logger.debug(f"Cover obj: {self.cover}")
def format_final_path(self) -> str:
"""Return the final filepath of the downloaded file.
@ -361,16 +395,13 @@ class Track:
self.container = "FLAC"
logger.debug("Tagging file with %s container", self.container)
audio = FLAC(self.final_path)
elif self.quality == 1:
elif self.quality <= 1:
self.container = "MP3"
logger.debug("Tagging file with %s container", self.container)
try:
audio = ID3(self.final_path)
except ID3NoHeaderError:
audio = ID3()
elif self.quality == 0: # tidal and deezer
# TODO: add compatibility with MP4 container
raise NotImplementedError("Qualities < 320kbps not implemented")
else:
raise InvalidQuality(f'Invalid quality: "{self.quality}"')
@ -379,9 +410,9 @@ class Track:
audio[k] = v
if embed_cover and cover is None:
assert hasattr(self, "cover")
cover = self.cover
assert hasattr(self, "cover_path")
cover = Tracklist.get_cover_obj(self.cover_path, self.quality)
if isinstance(audio, FLAC):
if embed_cover:
audio.add_picture(cover)
@ -882,7 +913,7 @@ class Album(Tracklist):
else:
fmt[key] = None
if fmt.get('sampling_rate', False):
if fmt.get("sampling_rate", False):
fmt["sampling_rate"] /= 1000
# 48.0kHz -> 48kHz, 44.1kHz -> 44.1kHz
if fmt["sampling_rate"] % 1 == 0.0:
@ -1015,39 +1046,44 @@ class Playlist(Tracklist):
def gen_cover(track):
return track["album"]["cover_medium"]
def meta_args(track):
return {"track": track, "source": self.client.source}
elif self.client.source == "soundcloud":
pprint(self.meta)
self.name = self.meta["title"]
tracklist = self.meta["tracks"]
elif self.client.source == 'soundcloud':
self.name = self.meta['title']
tracklist = self.meta['tracks']
def gen_cover(track):
return track["artwork_url"].replace("large", "t500x500")
else:
raise NotImplementedError
for i, track in enumerate(tracklist):
# TODO: This should be managed with .m3u files and alike. Arbitrary
# tracknumber tags might cause conflicts if the playlist files are
# inside of a library folder
meta = TrackMetadata(**meta_args(track))
if new_tracknumbers:
meta["tracknumber"] = str(i + 1)
if self.client.source == "soundcloud":
# No meta is included in soundcloud playlist
# response, so it is loaded at download time
for track in tracklist:
self.append(Track(self.client, id=track["id"]))
else:
for track in tracklist:
# TODO: This should be managed with .m3u files and alike. Arbitrary
# tracknumber tags might cause conflicts if the playlist files are
# inside of a library folder
meta = TrackMetadata(track=track, source=self.client.source)
self.append(
Track(
self.client,
id=track.get("id"),
meta=meta,
cover_url=gen_cover(track),
self.append(
Track(
self.client,
id=track.get("id"),
meta=meta,
cover_url=gen_cover(track),
)
)
)
logger.debug(f"Loaded {len(self)} tracks from playlist {self.name}")
def download(
self,
parent_folder: str = "Downloads",
quality: int = 6,
parent_folder: str = "StreamripDownloads",
quality: int = 3,
filters: Callable = None,
database: MusicDB = None,
**kwargs,
@ -1066,9 +1102,18 @@ class Playlist(Tracklist):
logger.debug(f"Parent folder {folder}")
self.download_message()
for track in self:
track.download(parent_folder=folder, quality=quality, database=database)
if self.client.source != "deezer":
for i, track in enumerate(self):
if self.client.source == "soundcloud":
track.load_meta()
if kwargs.get("new_tracknumbers", True):
track.meta["tracknumber"] = str(i + 1)
if (
track.download(parent_folder=folder, quality=quality, database=database)
and self.client.source != "deezer"
):
track.tag(embed_cover=kwargs.get("embed_cover", True))
@staticmethod

View file

@ -2,6 +2,7 @@ import json
import logging
import re
import sys
from pprint import pprint
from typing import Generator, Optional, Tuple, Union
from .constants import (
@ -113,9 +114,10 @@ class TrackMetadata:
self.date = resp.get("release_date")
self.albumartist = resp.get("artist", {}).get("name")
self.label = resp.get("label")
elif self.__source == "soundcloud":
raise Exception
else:
raise ValueError
raise ValueError(self.__source)
def add_track_meta(self, track: dict):
"""Parse the metadata from a track dict returned by the
@ -150,8 +152,27 @@ class TrackMetadata:
self.discnumber = track.get("disk_number")
self.artist = track.get("artist", {}).get("name")
elif self.__source == "soundcloud":
self.title = track["title"].strip()
print(f"{self.title=}")
self.genre = track["genre"]
print(f"{self.genre=}")
self.artist = track["user"]["username"]
self.albumartist = self.artist
print(f"{self.artist=}")
self.year = track["created_at"][:4]
print(f"{self.year=}")
self.label = track["label_name"]
print(f"{self.label=}")
self.comment = track["description"]
print(f"{self.comment=}")
self.tracknumber = 0
print(f"{self.tracknumber=}")
self.tracktotal = 0
print(f"{self.tracktotal=}")
else:
raise ValueError
raise ValueError(self.__source)
if track.get("album"):
self.add_album_meta(track["album"])

View file

@ -96,7 +96,7 @@ def get_quality_id(bit_depth: Optional[int], sampling_rate: Optional[int]):
return 4
def tqdm_download(url: str, filepath: str):
def tqdm_download(url: str, filepath: str, params: dict = None):
"""Downloads a file with a progress bar.
:param url: url to direct download
@ -104,11 +104,14 @@ def tqdm_download(url: str, filepath: str):
:type url: str
:type filepath: str
"""
logger.debug(f"Downloading {url} to {filepath}")
r = requests.get(url, allow_redirects=True, stream=True)
logger.debug(f"Downloading {url} to {filepath} with params {params}")
if params is None:
params = {}
r = requests.get(url, allow_redirects=True, stream=True, params=params)
total = int(r.headers.get("content-length", 0))
logger.debug(f"File size = {total}")
if total < 1000 and not url.endswith('jpg'):
if total < 1000 and not url.endswith("jpg"):
raise NonStreamable
try: