Add support for last.fm playlists; #41

This commit is contained in:
nathom 2021-04-09 14:23:55 -07:00
parent 107fac4dcd
commit af4aefe7ba
6 changed files with 71 additions and 7 deletions

View file

@ -1,6 +1,7 @@
click
ruamel.yaml
packaging
bs4
pathvalidate
requests
mutagen

View file

@ -82,6 +82,7 @@ class Config:
},
"path_format": {"folder": FOLDER_FORMAT, "track": TRACK_FORMAT},
"check_for_updates": True,
"lastfm": {"source": "qobuz"}
}
def __init__(self, path: str = None):

View file

@ -146,6 +146,7 @@ URL_REGEX = (
)
SOUNDCLOUD_URL_REGEX = r"https://soundcloud.com/[-\w:/]+"
SOUNDCLOUD_CLIENT_ID = "a3e059563d7fd3372b49b37f00a00bcf"
LASTFM_URL_REGEX = r"https://www.last.fm/user/\w+/playlists/\w+"
TIDAL_MAX_Q = 7

View file

@ -1,4 +1,5 @@
import logging
import time
import os
import re
import sys
@ -8,6 +9,8 @@ from string import Formatter
from typing import Generator, Optional, Tuple, Union
import click
import requests
from bs4 import BeautifulSoup
from .clients import DeezerClient, QobuzClient, SoundCloudClient, TidalClient
from .config import Config
@ -16,10 +19,11 @@ from .constants import (
DB_PATH,
MEDIA_TYPES,
SOUNDCLOUD_URL_REGEX,
LASTFM_URL_REGEX,
URL_REGEX,
)
from .db import MusicDB
from .downloader import Album, Artist, Label, Playlist, Track
from .downloader import Album, Artist, Label, Playlist, Track, Tracklist
from .exceptions import AuthenticationError, ParsingError
from .utils import capitalize
@ -44,6 +48,8 @@ class MusicDL(list):
self.url_parse = re.compile(URL_REGEX)
self.soundcloud_url_parse = re.compile(SOUNDCLOUD_URL_REGEX)
self.lastfm_url_parse = re.compile(LASTFM_URL_REGEX)
self.config = config
if self.config is None:
self.config = Config(CONFIG_PATH)
@ -115,7 +121,11 @@ class MusicDL(list):
:raises InvalidSourceError
:raises ParsingError
"""
for source, url_type, item_id in self.parse_urls(url):
parsed_info = self.parse_urls(url)
if parsed_info is None:
return
for source, url_type, item_id in parsed_info:
if item_id in self.db:
logger.info(
f"ID {item_id} already downloaded, use --no-db to override."
@ -173,7 +183,8 @@ class MusicDL(list):
arguments["filters"] = filters_
logger.debug("Added filter argument for artist/label: %s", filters_)
item.load_meta()
if not (isinstance(item, Tracklist) and item.loaded):
item.load_meta()
if isinstance(item, Track):
# track.download doesn't automatically tag
@ -181,7 +192,7 @@ class MusicDL(list):
else:
item.download(**arguments)
if self.db != []:
if self.db != [] and hasattr(item, 'id'):
self.db.add(item.id)
if self.config.session["conversion"]["enabled"]:
@ -235,6 +246,9 @@ class MusicDL(list):
parsed = self.url_parse.findall(url) # Qobuz, Tidal, Dezer
soundcloud_urls = self.soundcloud_url_parse.findall(url)
soundcloud_items = [self.clients["soundcloud"].get(u) for u in soundcloud_urls]
lastfm_urls = self.lastfm_url_parse.findall(url)
if lastfm_urls:
self.handle_lastfm_urls(lastfm_urls)
parsed.extend(
("soundcloud", item["kind"], url)
@ -246,7 +260,23 @@ class MusicDL(list):
if parsed != []:
return parsed
raise ParsingError(f"Error parsing URL: `{url}`")
if not lastfm_urls:
raise ParsingError(f"Error parsing URL: `{url}`")
def handle_lastfm_urls(self, lastfm_urls):
lastfm_source = self.config.session['lastfm']['source']
for purl in lastfm_urls:
title, queries = self.get_lastfm_playlist(purl)
pl = Playlist(client=self.clients[lastfm_source], name=title)
for query in queries:
click.secho(f'Searching for "{query}"', fg='cyan')
track = next(self.search(lastfm_source, query, media_type='track'))
pl.append(track)
pl.loaded = True
time.sleep(0.2) # max 5 requests/s
self.append(pl)
def handle_txt(self, filepath: Union[str, os.PathLike]):
"""
@ -392,3 +422,24 @@ class MusicDL(list):
for i in choice:
self.append(results[i])
return True
def get_lastfm_playlist(self, url: str) -> Tuple[str, list]:
# code from qobuz-dl
try:
r = requests.get(url, timeout=10)
except requests.exceptions.RequestException:
click.secho("Unable to fetch playlist", fg="red")
return
soup = BeautifulSoup(r.content, "html.parser")
artists = (artist.text for artist in soup.select("td.chartlist-artist > a"))
titles = (title.text for title in soup.select("td.chartlist-name > a"))
queries = [f"{artist} {title}" for artist, title in zip(artists, titles)]
if not queries:
click.secho("No tracks found", fg="red")
return
title = soup.select_one("h1").text
return title, queries

View file

@ -709,6 +709,7 @@ class Album(Tracklist):
if kwargs.get("load_on_init"):
self.load_meta()
self.loaded = False
self.downloaded = False
def load_meta(self):
@ -723,6 +724,7 @@ class Album(Tracklist):
raise NonStreamable(f"This album is not streamable ({self.id} ID)")
self._load_tracks()
self.loaded = True
@classmethod
def from_api(cls, resp, client):
@ -1032,6 +1034,8 @@ class Playlist(Tracklist):
if kwargs.get("load_on_init"):
self.load_meta()
self.loaded = False
@classmethod
def from_api(cls, resp: dict, client: ClientInterface):
"""Return a Playlist object initialized with information from
@ -1054,6 +1058,7 @@ class Playlist(Tracklist):
"""
self.meta = self.client.get(id=self.id, media_type="playlist")
self._load_tracks(**kwargs)
self.loaded = True
def _load_tracks(self, new_tracknumbers: bool = True):
"""Parses the tracklist returned by the API.
@ -1246,10 +1251,13 @@ class Artist(Tracklist):
if kwargs.get("load_on_init"):
self.load_meta()
self.loaded = False
def load_meta(self):
"""Send an API call to get album info based on id."""
self.meta = self.client.get(self.id, media_type="artist")
self._load_albums()
self.loaded = True
def _load_albums(self):
"""From the discography returned by client.get(query, 'artist'),
@ -1484,6 +1492,8 @@ class Label(Artist):
for album in resp["albums"]["items"]:
self.append(Album.from_api(album, client=self.client))
self.loaded = True
def __repr__(self):
return f"<Label - {self.name}>"

View file

@ -101,8 +101,8 @@ class TrackMetadata:
self.label = resp.get("label")
self.description = resp.get("description")
self.disctotal = max(
track.get("media_number", 1) for track in resp["tracks"]["items"]
)
track.get("media_number", 1) for track in safe_get(resp, 'tracks', 'items', default=[{}])
) or 1
self.explicit = resp.get("parental_warning", False)
if isinstance(self.label, dict):