Merge pull request #504 from nathom/503

Add option to output search results to file
This commit is contained in:
Nathan Thomas 2023-12-24 11:48:14 -08:00 committed by GitHub
commit 36f2769d5e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 178 additions and 80 deletions

View file

@ -48,7 +48,7 @@ class Downloadable(ABC):
await self._download(path, callback)
async def size(self) -> int:
if self._size is not None:
if hasattr(self, "_size") and self._size is not None:
return self._size
async with self.session.head(self.url) as response:
@ -293,6 +293,7 @@ class SoundcloudDownloadable(Downloadable):
async def _download_original(self, path: str, callback):
downloader = BasicDownloadable(self.session, self.url, "flac")
await downloader.download(path, callback)
self.size = downloader.size
engine = converter.FLAC(path)
await engine.convert(path)

View file

@ -74,6 +74,85 @@ class SoundcloudClient(Client):
else:
raise Exception(f"{media_type} not supported")
async def search(
self,
media_type: str,
query: str,
limit: int = 50,
offset: int = 0,
) -> list[dict]:
# TODO: implement pagination
assert media_type in ("track", "playlist"), f"Cannot search for {media_type}"
params = {
"q": query,
"facet": "genre",
"user_id": USER_ID,
"limit": limit,
"offset": offset,
"linked_partitioning": "1",
}
resp, status = await self._api_request(f"search/{media_type}s", params=params)
assert status == 200
if media_type == "track":
for item in resp["collection"]:
item["id"] = self._get_custom_id(item)
return [resp]
async def get_downloadable(self, item_info: str, _) -> SoundcloudDownloadable:
# We have `get_metadata` overwrite the "id" field so that it contains
# some extra information we need to download soundcloud tracks
# item_id is the soundcloud ID of the track
# download_url is either the url that points to an mp3 download or ""
# if download_url == '_non_streamable' then we raise an exception
infos: list[str] = item_info.split("|")
logger.debug(f"{infos=}")
assert len(infos) == 2, infos
item_id, download_info = infos
assert re.match(r"\d+", item_id) is not None
if download_info == self.NON_STREAMABLE:
raise NonStreamableError(item_info)
if download_info == self.ORIGINAL_DOWNLOAD:
resp_json, status = await self._api_request(f"tracks/{item_id}/download")
assert status == 200
return SoundcloudDownloadable(
self.session,
{"url": resp_json["redirectUri"], "type": "original"},
)
if download_info == self.NOT_RESOLVED:
raise NotImplementedError(item_info)
# download_info contains mp3 stream url
resp_json, status = await self._request(download_info)
return SoundcloudDownloadable(
self.session,
{"url": resp_json["url"], "type": "mp3"},
)
async def resolve_url(self, url: str) -> dict:
"""Get metadata of the item pointed to by a soundcloud url.
This is necessary only for soundcloud because they don't store
the item IDs in their url. See SoundcloudURL.into_pending for example
usage.
Args:
url (str): Url to resolve.
Returns:
API response for item.
"""
resp, status = await self._api_request("resolve", params={"url": url})
assert status == 200
if resp["kind"] == "track":
resp["id"] = self._get_custom_id(resp)
return resp
async def _get_track(self, item_id: str):
resp, status = await self._api_request(f"tracks/{item_id}")
assert status == 200
@ -143,62 +222,6 @@ class SoundcloudClient(Client):
assert url is not None
return f"{item_id}|{url}"
async def get_downloadable(self, item_info: str, _) -> SoundcloudDownloadable:
# We have `get_metadata` overwrite the "id" field so that it contains
# some extra information we need to download soundcloud tracks
# item_id is the soundcloud ID of the track
# download_url is either the url that points to an mp3 download or ""
# if download_url == '_non_streamable' then we raise an exception
infos: list[str] = item_info.split("|")
logger.debug(f"{infos=}")
assert len(infos) == 2, infos
item_id, download_info = infos
assert re.match(r"\d+", item_id) is not None
if download_info == self.NON_STREAMABLE:
raise NonStreamableError(item_info)
if download_info == self.ORIGINAL_DOWNLOAD:
resp_json, status = await self._api_request(f"tracks/{item_id}/download")
assert status == 200
return SoundcloudDownloadable(
self.session,
{"url": resp_json["redirectUri"], "type": "original"},
)
if download_info == self.NOT_RESOLVED:
raise NotImplementedError(item_info)
# download_info contains mp3 stream url
resp_json, status = await self._request(download_info)
return SoundcloudDownloadable(
self.session,
{"url": resp_json["url"], "type": "mp3"},
)
async def search(
self,
media_type: str,
query: str,
limit: int = 50,
offset: int = 0,
) -> list[dict]:
# TODO: implement pagination
assert media_type in ("track", "playlist"), f"Cannot search for {media_type}"
params = {
"q": query,
"facet": "genre",
"user_id": USER_ID,
"limit": limit,
"offset": offset,
"linked_partitioning": "1",
}
resp, status = await self._api_request(f"search/{media_type}s", params=params)
assert status == 200
return [resp]
async def _api_request(self, path, params=None, headers=None):
url = f"{BASE}/{path}"
return await self._request(url, params=params, headers=headers)
@ -230,14 +253,6 @@ class SoundcloudClient(Client):
async with self.session.get(url, params=_params, headers=headers) as resp:
return await resp.content.read(), resp.status
async def resolve_url(self, url: str) -> dict:
resp, status = await self._api_request("resolve", params={"url": url})
assert status == 200
if resp["kind"] == "track":
resp["id"] = self._get_custom_id(resp)
return resp
async def _announce_success(self):
url = f"{BASE}/announcements"
_, status = await self._request_body(url)

View file

@ -18,7 +18,10 @@ logger = logging.getLogger("streamrip")
def remove_artwork_tempdirs():
logger.debug("Removing dirs %s", _artwork_tempdirs)
for path in _artwork_tempdirs:
shutil.rmtree(path)
try:
shutil.rmtree(path)
except FileNotFoundError:
pass
async def download_artwork(

View file

@ -224,7 +224,8 @@ class AlbumMetadata:
safe_get(track, "publisher_metadata", "explicit", default=False),
bool,
)
genre = typed(track["genre"], str)
genre = typed(track["genre"], str | None)
genres = [genre] if genre is not None else []
artist = typed(safe_get(track, "publisher_metadata", "artist"), str | None)
artist = artist or typed(track["user"]["username"], str)
albumartist = artist
@ -259,7 +260,7 @@ class AlbumMetadata:
album_title,
albumartist,
year,
genre=[genre],
genre=genres,
covers=covers,
albumcomposer=None,
comment=None,

View file

@ -46,7 +46,7 @@ class ArtistSummary(Summary):
@classmethod
def from_item(cls, item: dict):
id = item["id"]
id = str(item["id"])
name = (
item.get("name")
or item.get("performer", {}).get("name")
@ -81,7 +81,7 @@ class TrackSummary(Summary):
@classmethod
def from_item(cls, item: dict):
id = item["id"]
id = str(item["id"])
name = item.get("title") or item.get("name") or "Unknown"
artist = (
item.get("performer", {}).get("name")
@ -127,7 +127,7 @@ class AlbumSummary(Summary):
@classmethod
def from_item(cls, item: dict):
id = item["id"]
id = str(item["id"])
name = item.get("title") or "Unknown Title"
artist = (
item.get("performer", {}).get("name")
@ -175,7 +175,7 @@ class LabelSummary(Summary):
@classmethod
def from_item(cls, item: dict):
id = item["id"]
id = str(item["id"])
name = item["name"]
return cls(id, name)
@ -279,6 +279,17 @@ class SearchResults:
i = int(ind.group(0))
return self.results[i - 1].preview()
def as_list(self, source: str) -> list[dict[str, str]]:
return [
{
"source": source,
"media_type": i.media_type(),
"id": i.id,
"desc": i.summarize(),
}
for i in self.results
]
def clean(s: str, trunc=True) -> str:
s = s.replace("|", "").replace("\n", "")

View file

@ -1,9 +1,11 @@
import asyncio
import json
import logging
import os
import shutil
import subprocess
from functools import wraps
from typing import Any
import aiofiles
import click
@ -158,7 +160,9 @@ async def url(ctx, urls):
@rip.command()
@click.argument(
"path", required=True, type=click.Path(file_okay=True, dir_okay=False, exists=True)
"path",
required=True,
type=click.Path(exists=True, readable=True, file_okay=True, dir_okay=False),
)
@click.pass_context
@coro
@ -171,8 +175,26 @@ async def file(ctx, path):
"""
with ctx.obj["config"] as cfg:
async with Main(cfg) as main:
async with aiofiles.open(path) as f:
await main.add_all([line async for line in f])
async with aiofiles.open(path, "r") as f:
try:
items: Any = json.loads(await f.read())
loaded = True
except json.JSONDecodeError:
items: Any = [line async for line in f]
loaded = False
if loaded:
console.print(
f"Detected json file. Loading [yellow]{len(items)}[/yellow] items"
)
await main.add_all_by_id(
[(i["source"], i["media_type"], i["id"]) for i in items]
)
else:
console.print(
f"Detected list of urls. Loading [yellow]{len(items)}[/yellow] items"
)
await main.add_all(items)
await main.resolve()
await main.rip()
@ -278,22 +300,42 @@ def database_browse(ctx, table):
help="Automatically download the first search result without showing the menu.",
is_flag=True,
)
@click.option(
"-o",
"--output-file",
help="Write search results to a file instead of showing interactive menu.",
type=click.Path(writable=True),
)
@click.option(
"-n",
"--num-results",
help="Maximum number of search results to show",
default=100,
type=click.IntRange(min=1),
)
@click.argument("source", required=True)
@click.argument("media-type", required=True)
@click.argument("query", required=True)
@click.pass_context
@coro
async def search(ctx, first, source, media_type, query):
async def search(ctx, first, output_file, num_results, source, media_type, query):
"""Search for content using a specific source.
Example:
rip search qobuz album 'rumours'
"""
if first and output_file:
console.print("Cannot choose --first and --output-file!")
return
with ctx.obj["config"] as cfg:
async with Main(cfg) as main:
if first:
await main.search_take_first(source, media_type, query)
elif output_file:
await main.search_output_file(
source, media_type, query, output_file, num_results
)
else:
await main.search_interactive(source, media_type, query)
await main.resolve()

View file

@ -1,6 +1,9 @@
import asyncio
import json
import logging
import os
import platform
import aiofiles
from .. import db
from ..client import Client, DeezerClient, QobuzClient, SoundcloudClient, TidalClient
@ -171,7 +174,7 @@ class Main:
return
search_results = SearchResults.from_pages(source, media_type, pages)
if os.name == "nt":
if platform.system() == "Windows": # simple term menu not supported for windows
from pick import pick
choices = pick(
@ -215,7 +218,9 @@ class Main:
async def search_take_first(self, source: str, media_type: str, query: str):
client = await self.get_logged_in_client(source)
pages = await client.search(media_type, query, limit=1)
with console.status(f"[bold]Searching {source}", spinner="dots"):
pages = await client.search(media_type, query, limit=1)
if len(pages) == 0:
console.print(f"[red]No search results found for query {query}")
return
@ -223,7 +228,27 @@ class Main:
search_results = SearchResults.from_pages(source, media_type, pages)
assert len(search_results.results) > 0
first = search_results.results[0]
await self.add(f"http://{source}.com/{first.media_type()}/{first.id}")
await self.add_by_id(source, first.media_type(), first.id)
async def search_output_file(
self, source: str, media_type: str, query: str, filepath: str, limit: int
):
client = await self.get_logged_in_client(source)
with console.status(f"[bold]Searching {source}", spinner="dots"):
pages = await client.search(media_type, query, limit=limit)
if len(pages) == 0:
console.print(f"[red]No search results found for query {query}")
return
search_results = SearchResults.from_pages(source, media_type, pages)
file_contents = json.dumps(search_results.as_list(source), indent=4)
async with aiofiles.open(filepath, "w") as f:
await f.write(file_contents)
console.print(
f"Wrote [purple]{len(search_results.results)}[/purple] results to [cyan]{filepath} as JSON!"
)
async def resolve_lastfm(self, playlist_url: str):
"""Resolve a last.fm playlist."""