Initial commit for SoundCloud

This commit is contained in:
nathom 2021-04-03 13:07:45 -07:00
parent 7f413c8290
commit c480462edf
7 changed files with 124 additions and 23 deletions

2
.gitignore vendored
View file

@ -7,3 +7,5 @@ test.py
/urls.txt /urls.txt
*.flac *.flac
/Downloads /Downloads
*.mp3
StreamripDownloads

View file

@ -206,7 +206,7 @@ def config(ctx, **kwargs):
config.reset() config.reset()
if kwargs["open"]: if kwargs["open"]:
click.secho(f"Opening {CONFIG_PATH}", fg='green') click.secho(f"Opening {CONFIG_PATH}", fg="green")
click.launch(CONFIG_PATH) click.launch(CONFIG_PATH)
if kwargs["qobuz"]: if kwargs["qobuz"]:

View file

@ -4,7 +4,7 @@ import json
import logging import logging
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from pprint import pformat # , pprint from pprint import pformat, pprint
from typing import Generator, Sequence, Tuple, Union from typing import Generator, Sequence, Tuple, Union
import click import click
@ -50,6 +50,10 @@ QOBUZ_BASE = "https://www.qobuz.com/api.json/0.2"
DEEZER_BASE = "https://api.deezer.com" DEEZER_BASE = "https://api.deezer.com"
DEEZER_DL = "http://dz.loaderapp.info/deezer" DEEZER_DL = "http://dz.loaderapp.info/deezer"
# SoundCloud
SOUNDCLOUD_BASE = "https://api-v2.soundcloud.com"
SOUNDCLOUD_CLIENT_ID = "a3e059563d7fd3372b49b37f00a00bcf"
# ----------- Abstract Classes ----------------- # ----------- Abstract Classes -----------------
@ -639,3 +643,78 @@ class TidalClient(ClientInterface):
def _api_post(self, url, data, auth=None): def _api_post(self, url, data, auth=None):
r = requests.post(url, data=data, auth=auth, verify=False).json() r = requests.post(url, data=data, auth=auth, verify=False).json()
return r return r
class SoundCloudClient(ClientInterface):
source = "soundcloud"
def __init__(self):
self.session = requests.Session()
self.session.headers.update(
{
"User-Agent": AGENT,
}
)
def login(self):
raise NotImplementedError
def get(self, id=None, url=None, media_type="track"):
assert media_type in ("track", "playlist", "album"), f"{media_type} not supported"
if media_type == 'album':
media_type = 'playlist'
if url is not None:
resp, status = self._get(f"resolve?url={url}")
elif id is not None:
resp, _ = self._get(f"tracks/{id}")
else:
raise Exception("Must provide id or url")
return resp
def get_file_url(self, track: dict, **kwargs) -> str:
if not track['streamable'] or track['policy'] == 'BLOCK':
raise Exception
if track['downloadable'] and track['has_downloads_left']:
resp, status = self._get("tracks/{id}/download")
return resp['redirectUri']
else:
url = None
for tc in track['media']['transcodings']:
fmt = tc['format']
if fmt['protocol'] == 'hls' and fmt['mime_type'] == 'audio/mpeg':
url = tc['url']
break
assert url is not None
resp, _ = self._get(url, no_base=True)
return resp['url']
pprint(resp)
if status in (401, 404):
raise Exception
return resp["redirectUri"]
def search(self, query: str, media_type='album'):
params = {'q': query}
resp, _ = self._get(f"search/{media_type}s", params=params)
return resp
def _get(self, path, params=None, no_base=False):
if params is None:
params = {}
params["client_id"] = SOUNDCLOUD_CLIENT_ID
if no_base:
url = path
else:
url = f"{SOUNDCLOUD_BASE}/{path}"
r = self.session.get(url, params=params)
print(r.text)
return r.json(), r.status_code

View file

@ -71,9 +71,9 @@ class MusicDL(list):
f"Enter {capitalize(source)} password (will not show on screen):", f"Enter {capitalize(source)} password (will not show on screen):",
fg="green", fg="green",
) )
self.config.file[source]["password"] = md5(getpass( self.config.file[source]["password"] = md5(
prompt="" getpass(prompt="").encode("utf-8")
).encode('utf-8')).hexdigest() ).hexdigest()
self.config.save() self.config.save()
click.secho(f'Credentials saved to config file at "{self.config._path}"') click.secho(f'Credentials saved to config file at "{self.config._path}"')

View file

@ -61,5 +61,5 @@ class MusicDB:
) )
conn.commit() conn.commit()
except sqlite3.Error as e: except sqlite3.Error as e:
if 'UNIQUE' not in str(e): if "UNIQUE" not in str(e):
raise raise

View file

@ -1,4 +1,5 @@
import logging import logging
import sys
import os import os
import re import re
import shutil import shutil
@ -251,7 +252,7 @@ class Track:
self.cover_path = os.path.join(self.folder, f"cover{hash(self.meta.album)}.jpg") self.cover_path = os.path.join(self.folder, f"cover{hash(self.meta.album)}.jpg")
logger.debug(f"Downloading cover from {self.cover_url}") logger.debug(f"Downloading cover from {self.cover_url}")
click.secho(f"\nDownloading cover art for {self!s}", fg='blue') click.secho(f"\nDownloading cover art for {self!s}", fg="blue")
if not os.path.exists(self.cover_path): if not os.path.exists(self.cover_path):
tqdm_download(self.cover_url, self.cover_path) tqdm_download(self.cover_url, self.cover_path)
@ -573,7 +574,7 @@ class Tracklist(list):
:type quality: int :type quality: int
:rtype: Union[Picture, APIC] :rtype: Union[Picture, APIC]
""" """
cover_type = {1: APIC, 2: Picture, 3: Picture, 4: Picture} cover_type = {0: APIC, 1: APIC, 2: Picture, 3: Picture, 4: Picture}
cover = cover_type.get(quality) cover = cover_type.get(quality)
if cover is Picture: if cover is Picture:
@ -731,7 +732,6 @@ class Album(Tracklist):
"tracktotal": resp.get("numberOfTracks"), "tracktotal": resp.get("numberOfTracks"),
} }
elif client.source == "deezer": elif client.source == "deezer":
logger.debug(pformat(resp))
return { return {
"id": resp.get("id"), "id": resp.get("id"),
"title": resp.get("title"), "title": resp.get("title"),
@ -752,6 +752,25 @@ class Album(Tracklist):
"sampling_rate": 44100, "sampling_rate": 44100,
"tracktotal": resp.get("track_total") or resp.get("nb_tracks"), "tracktotal": resp.get("track_total") or resp.get("nb_tracks"),
} }
elif client.source == 'soundcloud':
print(resp.keys())
return {
"id": resp['id'],
"title": resp['title'],
"_artist": resp['user']['username'],
"albumartist": resp['user']['username'],
"year": resp['created_at'][:4],
"cover_urls": {
"small": resp['artwork_url'],
"large": resp['artwork_url'].replace('large', 't500x500') if resp['artwork_url'] is not None else None
},
"url": resp['uri'],
"streamable": True, # assume to be true for convenience
"quality": 0, # always 128 kbps mp3
# no bit depth
# no sampling rate
"tracktotal": resp['track_count'],
}
raise InvalidSourceError(client.source) raise InvalidSourceError(client.source)
@ -794,7 +813,7 @@ class Album(Tracklist):
def download( def download(
self, self,
quality: int = 7, quality: int = 3,
parent_folder: Union[str, os.PathLike] = "StreamripDownloads", parent_folder: Union[str, os.PathLike] = "StreamripDownloads",
database: MusicDB = None, database: MusicDB = None,
**kwargs, **kwargs,
@ -829,7 +848,7 @@ class Album(Tracklist):
logger.debug("Cover already downloaded: %s. Skipping", cover_path) logger.debug("Cover already downloaded: %s. Skipping", cover_path)
else: else:
click.secho("Downloading cover art", fg="magenta") click.secho("Downloading cover art", fg="magenta")
if kwargs.get("large_cover", False): if kwargs.get("large_cover", True):
cover_url = self.cover_urls.get("large") cover_url = self.cover_urls.get("large")
if self.client.source == "qobuz": if self.client.source == "qobuz":
tqdm_download(cover_url.replace("600", "org"), cover_path) tqdm_download(cover_url.replace("600", "org"), cover_path)
@ -847,7 +866,7 @@ class Album(Tracklist):
else: else:
tqdm_download(self.cover_urls["small"], cover_path) tqdm_download(self.cover_urls["small"], cover_path)
embed_cover = kwargs.get('embed_cover', True) # embed by default embed_cover = kwargs.get("embed_cover", True) # embed by default
if self.client.source != "deezer" and embed_cover: if self.client.source != "deezer" and embed_cover:
cover = self.get_cover_obj(cover_path, quality) cover = self.get_cover_obj(cover_path, quality)
@ -881,6 +900,7 @@ class Album(Tracklist):
else: else:
fmt[key] = None fmt[key] = None
if fmt.get('sampling_rate', False):
fmt["sampling_rate"] /= 1000 fmt["sampling_rate"] /= 1000
# 48.0kHz -> 48kHz, 44.1kHz -> 44.1kHz # 48.0kHz -> 48kHz, 44.1kHz -> 44.1kHz
if fmt["sampling_rate"] % 1 == 0.0: if fmt["sampling_rate"] % 1 == 0.0:
@ -891,7 +911,7 @@ class Album(Tracklist):
def _get_formatted_folder(self, parent_folder: str) -> str: def _get_formatted_folder(self, parent_folder: str) -> str:
if self.bit_depth is not None and self.sampling_rate is not None: if self.bit_depth is not None and self.sampling_rate is not None:
self.container = "FLAC" self.container = "FLAC"
elif self.client.source in ("qobuz", "deezer"): elif self.client.source in ("qobuz", "deezer", "soundcloud"):
self.container = "MP3" self.container = "MP3"
elif self.client.source == "tidal": elif self.client.source == "tidal":
self.container = "AAC" self.container = "AAC"
@ -983,7 +1003,7 @@ class Playlist(Tracklist):
:type new_tracknumbers: bool :type new_tracknumbers: bool
""" """
if self.client.source == "qobuz": if self.client.source == "qobuz":
self.name = self.meta['name'] self.name = self.meta["name"]
tracklist = self.meta["tracks"]["items"] tracklist = self.meta["tracks"]["items"]
def gen_cover(track): # ? def gen_cover(track): # ?
@ -993,7 +1013,7 @@ class Playlist(Tracklist):
return {"track": track, "album": track["album"]} return {"track": track, "album": track["album"]}
elif self.client.source == "tidal": elif self.client.source == "tidal":
self.name = self.meta['title'] self.name = self.meta["title"]
tracklist = self.meta["tracks"] tracklist = self.meta["tracks"]
def gen_cover(track): def gen_cover(track):
@ -1007,7 +1027,7 @@ class Playlist(Tracklist):
} }
elif self.client.source == "deezer": elif self.client.source == "deezer":
self.name = self.meta['title'] self.name = self.meta["title"]
tracklist = self.meta["tracks"] tracklist = self.meta["tracks"]
def gen_cover(track): def gen_cover(track):
@ -1063,7 +1083,7 @@ class Playlist(Tracklist):
for track in self: for track in self:
track.download(parent_folder=folder, quality=quality, database=database) track.download(parent_folder=folder, quality=quality, database=database)
if self.client.source != "deezer": if self.client.source != "deezer":
track.tag(embed_cover=kwargs.get('embed_cover', True)) track.tag(embed_cover=kwargs.get("embed_cover", True))
@staticmethod @staticmethod
def _parse_get_resp(item: dict, client: ClientInterface): def _parse_get_resp(item: dict, client: ClientInterface):
@ -1079,7 +1099,7 @@ class Playlist(Tracklist):
if client.source == "qobuz": if client.source == "qobuz":
return { return {
"name": item["name"], "name": item["name"],
"id": item['id'], "id": item["id"],
} }
elif client.source == "tidal": elif client.source == "tidal":
return { return {

View file

@ -108,7 +108,7 @@ def tqdm_download(url: str, filepath: str):
r = requests.get(url, allow_redirects=True, stream=True) r = requests.get(url, allow_redirects=True, stream=True)
total = int(r.headers.get("content-length", 0)) total = int(r.headers.get("content-length", 0))
logger.debug(f"File size = {total}") logger.debug(f"File size = {total}")
if total < 1000: if total < 1000 and not url.endswith('jpg'):
raise NonStreamable raise NonStreamable
try: try: