streamrip/streamrip/core.py

446 lines
15 KiB
Python
Raw Normal View History

2021-03-22 12:21:27 -04:00
import logging
2021-04-09 17:23:55 -04:00
import time
2021-03-22 12:21:27 -04:00
import os
import re
import sys
2021-03-22 12:21:27 -04:00
from getpass import getpass
2021-04-01 19:56:47 -04:00
from hashlib import md5
2021-03-23 01:27:33 -04:00
from string import Formatter
2021-03-22 12:21:27 -04:00
from typing import Generator, Optional, Tuple, Union
import click
2021-04-09 17:23:55 -04:00
import requests
from bs4 import BeautifulSoup
2021-03-22 12:21:27 -04:00
from .clients import DeezerClient, QobuzClient, SoundCloudClient, TidalClient
2021-03-22 12:21:27 -04:00
from .config import Config
2021-04-05 21:24:26 -04:00
from .constants import (
CONFIG_PATH,
DB_PATH,
MEDIA_TYPES,
SOUNDCLOUD_URL_REGEX,
2021-04-09 17:23:55 -04:00
LASTFM_URL_REGEX,
2021-04-05 21:24:26 -04:00
URL_REGEX,
)
2021-03-22 16:06:06 -04:00
from .db import MusicDB
2021-04-09 17:23:55 -04:00
from .downloader import Album, Artist, Label, Playlist, Track, Tracklist
2021-03-22 12:21:27 -04:00
from .exceptions import AuthenticationError, ParsingError
from .utils import capitalize
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maps a media-type string to the downloader class used for that type.
MEDIA_CLASS = {
    "album": Album,
    "playlist": Playlist,
    "artist": Artist,
    "track": Track,
    "label": Label,
}
# Type alias covering the media objects referred to in this module.
Media = Union[Album, Playlist, Artist, Track]
2021-03-22 12:21:27 -04:00
2021-03-22 21:00:04 -04:00
class MusicDL(list):
2021-03-22 12:21:27 -04:00
def __init__(
    self,
    config: Optional[Config] = None,
):
    """Set up the URL parsers, configuration, service clients and database.

    :param config: configuration to use; when ``None`` a ``Config`` is
        created from ``CONFIG_PATH``
    :type config: Optional[Config]
    """
    self.url_parse = re.compile(URL_REGEX)
    self.soundcloud_url_parse = re.compile(SOUNDCLOUD_URL_REGEX)
    self.lastfm_url_parse = re.compile(LASTFM_URL_REGEX)

    self.config = config
    if self.config is None:
        self.config = Config(CONFIG_PATH)

    self.clients = {
        "qobuz": QobuzClient(),
        "tidal": TidalClient(),
        "deezer": DeezerClient(),
        "soundcloud": SoundCloudClient(),
    }

    # Always go through self.config here: the ``config`` argument is None
    # when the default configuration was loaded above.
    db_settings = self.config.session["database"]
    if db_settings["enabled"]:
        if db_settings["path"] is not None:
            self.db = MusicDB(db_settings["path"])
        else:
            # No path configured yet: use the default one and persist it.
            self.db = MusicDB(DB_PATH)
            self.config.file["database"]["path"] = DB_PATH
            self.config.save()
    else:
        # An empty list stands in for a disabled database; membership
        # tests against it are always false.
        self.db = []
2021-03-22 12:21:27 -04:00
def prompt_creds(self, source: str):
    """Ask for credentials on the terminal and store them in the config file.

    Only ``"qobuz"`` is handled; any other source raises.

    :param source: name of the streaming service
    :type source: str
    :raises Exception: if ``source`` is not ``"qobuz"``
    """
    if source != "qobuz":
        raise Exception

    section = self.config.file[source]
    service = capitalize(source)

    click.secho(f"Enter {service} email:", fg="green")
    section["email"] = input()

    click.secho(
        f"Enter {service} password (will not show on screen):",
        fg="green",
    )
    # Only the hex MD5 digest of the typed password is stored.
    typed = getpass(prompt="")
    section["password"] = md5(typed.encode("utf-8")).hexdigest()

    self.config.save()
    click.secho(f'Credentials saved to config file at "{self.config._path}"')
2021-03-22 12:21:27 -04:00
def assert_creds(self, source: str):
    """Make sure credentials are present for ``source``, prompting if not.

    Deezer and SoundCloud need no login, so nothing is checked for them.
    For Qobuz the user is prompted when the stored email or password is
    ``None``.

    :param source: name of the streaming service
    :type source: str
    :raises AssertionError: if ``source`` is not a known service
    """
    known_sources = ("qobuz", "tidal", "deezer", "soundcloud")
    assert source in known_sources, f"Invalid source {source}"

    if source in ("deezer", "soundcloud"):
        # no login for these services
        return

    if source == "qobuz":
        stored = self.config.file[source]
        if stored["email"] is None or stored["password"] is None:
            self.prompt_creds(source)
def handle_urls(self, url: str):
    """Parse ``url`` and queue every item in it that is not in the database.

    Items whose ID is already in ``self.db`` are reported and skipped.

    :param url: text containing the URL(s) to handle
    :type url: str
    :raises ParsingError: propagated from ``parse_urls``
    """
    found = self.parse_urls(url)
    if found is None:
        return

    for source, url_type, item_id in found:
        if item_id not in self.db:
            self.handle_item(source, url_type, item_id)
            continue

        # Already downloaded: tell the user (log + terminal) and move on.
        notice = f"ID {item_id} already downloaded, use --no-db to override."
        logger.info(notice)
        click.secho(notice, fg="magenta")
2021-03-22 12:21:27 -04:00
def handle_item(self, source: str, media_type: str, item_id: str):
    """Create the media object for one item and append it to this queue.

    :param source: name of the streaming service
    :type source: str
    :param media_type: a key of ``MEDIA_TYPES``, or a string containing
        ``"playlist"``
    :type media_type: str
    :param item_id: identifier passed to the media class as ``id``
    :type item_id: str
    :raises AssertionError: if the media type is not in ``MEDIA_TYPES``
    """
    self.assert_creds(source)
    client = self.get_client(source)

    # for SoundCloud: any unknown type containing "playlist" is a playlist
    if media_type not in MEDIA_TYPES and "playlist" in media_type:
        media_type = "playlist"
    assert media_type in MEDIA_TYPES, media_type

    media_cls = MEDIA_CLASS[media_type]
    self.append(media_cls(client=client, id=item_id))
2021-03-22 12:21:27 -04:00
def download(self):
    """Download, and optionally convert, every item queued in this list.

    The download arguments are built from ``self.config.session``. Each item
    gets its own copy of them, so per-item values (source subdirectory,
    quality, artist filters) never carry over to the next item.
    """
    session = self.config.session
    artwork = session["artwork"]
    base_arguments = {
        "database": self.db,
        "parent_folder": session["downloads"]["folder"],
        "folder_format": session["path_format"]["folder"],
        "track_format": session["path_format"]["track"],
        "keep_downloaded_cover": artwork["keep_downloaded_cover"],
        "keep_embedded_cover": artwork["keep_embedded_cover"],
        "embed_cover": artwork["embed"],
        "embed_cover_size": artwork["embed_size"],
        "download_cover_size": artwork["download_size"],
    }
    logger.debug("Arguments from config: %s", base_arguments)

    for item in self:
        arguments = dict(base_arguments)
        source = item.client.source

        if session["downloads"]["source_subdirectories"]:
            # Join onto the configured base folder, not the previous item's
            # folder, so the source name is appended exactly once.
            arguments["parent_folder"] = os.path.join(
                base_arguments["parent_folder"], capitalize(source)
            )

        arguments["quality"] = session[source]["quality"]

        if isinstance(item, Artist):
            filters_ = tuple(k for k, v in session["filters"].items() if v)
            arguments["filters"] = filters_
            logger.debug("Added filter argument for artist/label: %s", filters_)

        # Tracklists that are already marked as loaded skip the metadata step.
        if not (isinstance(item, Tracklist) and item.loaded):
            item.load_meta()

        if isinstance(item, Track):
            # track.download doesn't automatically tag
            item.download(**arguments, tag=True)
        else:
            item.download(**arguments)

        # self.db is an empty list when the database is disabled
        if self.db != [] and hasattr(item, "id"):
            self.db.add(item.id)

        if session["conversion"]["enabled"]:
            item.convert(**session["conversion"])
2021-03-22 12:21:27 -04:00
2021-03-23 01:27:33 -04:00
def get_client(self, source: str):
    """Return the client for ``source``, logging it in first when needed.

    :param source: key into ``self.clients``
    :type source: str
    """
    client = self.clients[source]
    if client.logged_in:
        return client

    self.assert_creds(source)
    self.login(client)
    return client
2021-03-22 21:00:04 -04:00
def login(self, client):
    """Log ``client`` in with the configured credentials and save its tokens.

    Does nothing when the client is already logged in. When the client
    rejects the credentials, the user is prompted for new ones and the
    login is retried with the freshly stored values.

    :param client: one of the service clients; must expose ``source``,
        ``logged_in``, ``login`` and ``get_tokens``
    """
    creds = self.config.creds(client.source)
    if client.logged_in:
        return

    while True:
        try:
            client.login(**creds)
            break
        except AuthenticationError:
            click.secho("Invalid credentials, try again.")
            self.prompt_creds(client.source)
            # Re-read the credentials: without this the loop would retry
            # with the same rejected values forever.
            creds = self.config.creds(client.source)

    if (
        client.source == "qobuz"
        and not creds.get("secrets")
        and not creds.get("app_id")
    ):
        # No app id / secrets stored yet: persist the ones the client got.
        (
            self.config.file["qobuz"]["app_id"],
            self.config.file["qobuz"]["secrets"],
        ) = client.get_tokens()
        self.config.save()
    elif client.source == "tidal":
        self.config.file["tidal"].update(client.get_tokens())
        self.config.save()
2021-03-22 21:00:04 -04:00
2021-03-24 13:39:37 -04:00
def parse_urls(self, url: str) -> Optional[list]:
    """Extract ``(source, media type, id)`` triples from text containing URLs.

    Compatible with urls of the form:
        https://www.qobuz.com/us-en/{type}/{name}/{id}
        https://open.qobuz.com/{type}/{id}
        https://play.qobuz.com/{type}/{id}
        /us-en/{type}/-/{id}
        https://www.deezer.com/us/{type}/{id}
        https://tidal.com/browse/{type}/{id}

    SoundCloud URLs are looked up through the SoundCloud client and added
    with the URL itself in the id position. last.fm URLs are not returned;
    they are passed straight to ``handle_lastfm_urls``.

    :param url: text containing one or more URLs
    :type url: str
    :returns: the list of parsed triples, or ``None`` when the text held
        only last.fm URLs
    :raises exceptions.ParsingError: if nothing at all could be parsed
    """
    parsed = self.url_parse.findall(url)  # Qobuz, Tidal, Deezer

    soundcloud_urls = self.soundcloud_url_parse.findall(url)
    soundcloud_items = [self.clients["soundcloud"].get(u) for u in soundcloud_urls]

    lastfm_urls = self.lastfm_url_parse.findall(url)
    if lastfm_urls:
        self.handle_lastfm_urls(lastfm_urls)

    parsed.extend(
        ("soundcloud", item["kind"], sc_url)
        for item, sc_url in zip(soundcloud_items, soundcloud_urls)
    )
    logger.debug("Parsed urls: %s", parsed)

    if parsed:
        return parsed

    if not lastfm_urls:
        raise ParsingError(f"Error parsing URL: `{url}`")

    # Only last.fm URLs were present and they have already been handled.
    return None
def handle_lastfm_urls(self, lastfm_urls):
    """Turn last.fm playlist URLs into playlists of searched tracks.

    For each URL the playlist page is fetched, each of its tracks is
    searched on the source configured under ``lastfm.source``, and the
    first hit is added to a new ``Playlist`` that is appended to this
    queue. Playlists that cannot be fetched, and tracks with no search
    results, are skipped.

    :param lastfm_urls: iterable of last.fm playlist URLs
    """
    lastfm_source = self.config.session['lastfm']['source']
    for purl in lastfm_urls:
        playlist_info = self.get_lastfm_playlist(purl)
        if playlist_info is None:
            # get_lastfm_playlist has already reported why it failed
            continue
        title, queries = playlist_info

        pl = Playlist(client=self.clients[lastfm_source], name=title)
        for query in queries:
            click.secho(f'Searching for "{query}"', fg='cyan')
            # A default keeps an empty search from raising StopIteration.
            track = next(
                self.search(lastfm_source, query, media_type='track'), None
            )
            if track is None:
                click.secho(f'No results found for "{query}"', fg='red')
            else:
                pl.append(track)
            time.sleep(0.2)  # max 5 requests/s

        # The tracks were added by hand, so mark the playlist as loaded.
        pl.loaded = True
        self.append(pl)
2021-03-22 12:21:27 -04:00
2021-03-26 15:26:50 -04:00
def handle_txt(self, filepath: Union[str, os.PathLike]):
    """
    Handle a text file containing URLs. Lines starting with `#` are ignored.

    :param filepath: path of the text file to read
    :type filepath: Union[str, os.PathLike]
    :raises OSError
    :raises exceptions.ParsingError
    """
    with open(filepath) as txt:
        # Drop commented-out lines before URL matching; otherwise the URL
        # patterns would still pick up URLs inside them.
        lines = [line for line in txt if not line.lstrip().startswith("#")]

    self.handle_urls("".join(lines))
2021-03-22 12:21:27 -04:00
def search(
    self, source: str, query: str, media_type: str = "album", limit: int = 200
) -> Generator:
    """Search ``source`` and lazily yield at most ``limit`` media objects.

    ``client.search`` may return a generator of result pages (QobuzClient)
    or a single mapping holding the results under ``"data"``, ``"items"``
    or ``"collection"``. A mapping with none of those keys yields nothing.

    :param source: name of the streaming service
    :type source: str
    :param query: search text
    :type query: str
    :param media_type: key of ``MEDIA_CLASS``; for paged results
        ``"featured"`` is also accepted and yields albums
    :type media_type: str
    :param limit: maximum number of results to yield
    :type limit: int
    """
    if limit <= 0:
        return

    client = self.get_client(source)
    results = client.search(query, media_type)

    if isinstance(results, Generator):  # QobuzClient
        featured = media_type == "featured"
        page_key = "albums" if featured else f"{media_type}s"
        class_key = "album" if featured else media_type
        items = (item for page in results for item in page[page_key]["items"])
    else:
        class_key = media_type
        items = (
            results.get("data")
            or results.get("items")
            or results.get("collection")
            or []
        )

    for count, item in enumerate(items, start=1):
        yield MEDIA_CLASS[class_key].from_api(item, client)
        # Stop right after the limit-th item so no extra result (or page)
        # is fetched.
        if count >= limit:
            return
def preview_media(self, media):
    """Build a short text summary of ``media``.

    The template depends on the first matching class among Album, Artist,
    Track and Playlist. Each placeholder is filled with ``media.get``,
    using ``"Unknown"`` as the default.

    :param media: the media object to describe
    :raises NotImplementedError: if ``media`` is none of the classes above
    """
    templates = (
        (
            Album,
            "{albumartist} - {title}\n"
            "Released on {year}\n{tracktotal} tracks\n"
            "{bit_depth} bit / {sampling_rate} Hz\n"
            "Version: {version}",
        ),
        (Artist, "{name}"),
        (Track, "{artist} - {title}\nReleased on {year}"),
        (
            Playlist,
            "{title}\n"
            "{tracktotal} tracks\n"
            "{popularity}\n"
            "Description: {description}",
        ),
    )

    # The order matters: the first class that matches decides the template.
    for media_cls, template in templates:
        if isinstance(media, media_cls):
            fmt = template
            break
    else:
        raise NotImplementedError

    values = {}
    for _, field_name, _, _ in Formatter().parse(fmt):
        if field_name:
            values[field_name] = media.get(field_name, default="Unknown")
    return fmt.format(**values)
def interactive_search(
    self, query: str, source: str = "qobuz", media_type: str = "album"
):
    """Show up to the first search results in a terminal menu and queue picks.

    On Windows (``os.name == "nt"``) the ``pick`` package drives the menu;
    elsewhere ``simple_term_menu`` is used, with a preview pane. If the
    required package cannot be imported, a hint is printed and the program
    exits.

    :param query: search text
    :type query: str
    :param source: name of the streaming service
    :type source: str
    :param media_type: kind of media to search for
    :type media_type: str
    :returns: ``False`` when the non-Windows menu returned no choice,
        ``True`` otherwise
    """
    results = tuple(self.search(source, query, media_type, limit=50))

    def title(res):
        # res is an (index, result) pair; menu entries are numbered from 1
        position, entry = res
        return f"{position + 1}. {entry.title}"

    def from_title(s):
        # The entry number is everything before the first "." of the label.
        number = s.partition(".")[0]
        return self.preview_media(results[int(number) - 1])

    if os.name == "nt":
        try:
            from pick import pick
        except ImportError:  # also covers ModuleNotFoundError, its subclass
            click.secho(
                "Run `pip3 install windows-curses` to use interactive mode.",
                fg="red",
            )
            sys.exit()

        heading = (
            f"{capitalize(source)} {media_type} search.\n"
            "Press SPACE to select, RETURN to download, ctrl-C to exit."
        )
        choice = pick(
            tuple(enumerate(results)),
            title=heading,
            options_map_func=title,
            multiselect=True,
        )
        # Each selection is indexed as [0][1] to reach the result object.
        if isinstance(choice, list):
            for picked in choice:
                self.append(picked[0][1])
        elif isinstance(choice, tuple):
            self.append(choice[0][1])
        return True

    try:
        from simple_term_menu import TerminalMenu
    except ImportError:  # also covers ModuleNotFoundError, its subclass
        click.secho(
            "Run `pip3 install simple-term-menu` to use interactive mode.",
            fg="red",
        )
        sys.exit()

    heading = (
        f"{capitalize(source)} {media_type} search.\n"
        "SPACE - multiselection, ENTER - download, ESC - exit"
    )
    menu = TerminalMenu(
        map(title, enumerate(results)),
        preview_command=from_title,
        preview_size=0.5,
        title=heading,
        cycle_cursor=True,
        clear_screen=True,
        multi_select=True,
    )
    choice = menu.show()
    if choice is None:
        return False

    if isinstance(choice, int):
        self.append(results[choice])
    elif isinstance(choice, tuple):
        for index in choice:
            self.append(results[index])
    return True
2021-04-09 17:23:55 -04:00
def get_lastfm_playlist(self, url: str) -> Optional[Tuple[str, list]]:
    """Fetch a last.fm playlist page and build one search query per track.

    :param url: URL of the playlist page
    :type url: str
    :returns: ``(title, queries)`` where each query is ``"{artist} {title}"``,
        or ``None`` when the page could not be fetched or listed no tracks
    """
    # code from qobuz-dl
    try:
        r = requests.get(url, timeout=10)
    except requests.exceptions.RequestException:
        click.secho("Unable to fetch playlist", fg="red")
        return None

    soup = BeautifulSoup(r.content, "html.parser")
    artists = (artist.text for artist in soup.select("td.chartlist-artist > a"))
    titles = (title.text for title in soup.select("td.chartlist-name > a"))
    queries = [f"{artist} {title}" for artist, title in zip(artists, titles)]

    if not queries:
        click.secho("No tracks found", fg="red")
        return None

    # select_one returns None when the page has no <h1>; fall back to a
    # placeholder instead of failing on ``.text``.
    heading = soup.select_one("h1")
    title = heading.text if heading is not None else "Unknown"
    return title, queries