220 lines
8.5 KiB
Python
220 lines
8.5 KiB
Python
import logging
|
|
import threading
|
|
import time
|
|
from typing import Optional, Callable, Dict, Any
|
|
from tidal_dl_ng.config import Tidal, Settings
|
|
from tidalapi import Session, Track, Album, Artist, Playlist
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class TidalWrapper:
|
|
_instance = None
|
|
|
|
def __new__(cls):
|
|
if cls._instance is None:
|
|
cls._instance = super(TidalWrapper, cls).__new__(cls)
|
|
cls._instance._initialized = False
|
|
return cls._instance
|
|
|
|
def __init__(self):
|
|
if self._initialized:
|
|
return
|
|
self.settings = Settings()
|
|
self.tidal = Tidal(self.settings)
|
|
self.session = self.tidal.session
|
|
self._initialized = True
|
|
self.auth_future = None
|
|
self.auth_status = {"status": "idle", "message": "", "link": "", "code": ""}
|
|
|
|
def is_authenticated(self) -> bool:
|
|
return self.session.check_login()
|
|
|
|
def start_device_login(self) -> Dict[str, str]:
|
|
"""
|
|
Starts the device login process.
|
|
Returns the verification info (link and code) immediately if possible,
|
|
or starts a thread to handle the blocking login_oauth_simple if needed.
|
|
|
|
Since tidalapi's login_oauth_simple is blocking and takes a print callback,
|
|
we might need to run it in a thread and capture the output.
|
|
However, a better approach is to use the underlying tidalapi methods if available.
|
|
|
|
For now, let's try to use the session's internal methods if we can figure them out,
|
|
or wrap the blocking call.
|
|
"""
|
|
if self.is_authenticated():
|
|
return {"status": "success", "message": "Already logged in"}
|
|
|
|
# Reset status
|
|
self.auth_status = {"status": "pending", "message": "Initializing login...", "link": "", "code": ""}
|
|
|
|
# Run login in a separate thread to avoid blocking
|
|
thread = threading.Thread(target=self._login_thread)
|
|
thread.daemon = True
|
|
thread.start()
|
|
|
|
return self.auth_status
|
|
|
|
def _login_thread(self):
|
|
def print_callback(msg: str):
|
|
logger.info(f"Tidal Login Callback: {msg}")
|
|
# Parse the message to extract link and code if possible
|
|
# Typical msg: "Visit https://link.tidal.com/AAAAA and enter code AAAAA"
|
|
self.auth_status["message"] = msg
|
|
if "http" in msg:
|
|
parts = msg.split()
|
|
for part in parts:
|
|
if part.startswith("http"):
|
|
self.auth_status["link"] = part
|
|
# This is a bit hacky, but tidalapi 0.7+ might behave differently.
|
|
# If we can't parse it easily, the user just sees the message.
|
|
|
|
try:
|
|
# This will block until user logs in or times out
|
|
self.tidal.login(print_callback)
|
|
if self.is_authenticated():
|
|
self.auth_status["status"] = "success"
|
|
self.auth_status["message"] = "Login successful!"
|
|
else:
|
|
self.auth_status["status"] = "failed"
|
|
self.auth_status["message"] = "Login failed."
|
|
except Exception as e:
|
|
self.auth_status["status"] = "error"
|
|
self.auth_status["message"] = str(e)
|
|
|
|
def get_auth_status(self):
|
|
return self.auth_status
|
|
|
|
def search(self, query: str, type: str = "track", limit: int = 10):
|
|
if not self.is_authenticated():
|
|
raise Exception("Not authenticated")
|
|
|
|
# Map type string to tidalapi models if needed, or just pass string
|
|
# tidalapi.Session.search(query, models=None, limit=10, offset=0)
|
|
# models can be [Track, Album, Artist, Playlist]
|
|
|
|
models = []
|
|
if type == "track":
|
|
models = [Track]
|
|
elif type == "album":
|
|
models = [Album]
|
|
elif type == "artist":
|
|
models = [Artist]
|
|
elif type == "playlist":
|
|
models = [Playlist]
|
|
|
|
results = self.session.search(query, models=models, limit=limit)
|
|
|
|
# Parse results into a JSON-friendly format
|
|
output = []
|
|
if type == "track":
|
|
for track in results["tracks"]:
|
|
output.append({
|
|
"id": track.id,
|
|
"title": track.name,
|
|
"artist": track.artist.name,
|
|
"album": track.album.name,
|
|
"duration": track.duration,
|
|
"cover": self._get_image_url(track.album.cover),
|
|
"type": "track"
|
|
})
|
|
elif type == "album":
|
|
for album in results["albums"]:
|
|
output.append({
|
|
"id": album.id,
|
|
"title": album.name,
|
|
"artist": album.artist.name,
|
|
"tracks": album.num_tracks,
|
|
"release_date": str(album.release_date),
|
|
"cover": self._get_image_url(album.cover),
|
|
"type": "album"
|
|
})
|
|
elif type == "artist":
|
|
for artist in results["artists"]:
|
|
output.append({
|
|
"id": artist.id,
|
|
"title": artist.name,
|
|
"type": "artist",
|
|
"cover": self._get_image_url(artist.picture)
|
|
})
|
|
elif type == "playlist":
|
|
for playlist in results["playlists"]:
|
|
output.append({
|
|
"id": playlist.id,
|
|
"title": playlist.name,
|
|
"type": "playlist",
|
|
"cover": self._get_image_url(playlist.cover)
|
|
})
|
|
|
|
return output
|
|
|
|
def _get_image_url(self, uuid: str | None, width: int = 320, height: int = 320) -> str | None:
|
|
if not uuid:
|
|
return None
|
|
return f"https://resources.tidal.com/images/{uuid.replace('-', '/')}/{width}x{height}.jpg"
|
|
|
|
def get_artist_albums(self, artist_id: str, limit: int = 50):
|
|
"""Fetch all albums for a given artist"""
|
|
if not self.is_authenticated():
|
|
raise Exception("Not authenticated")
|
|
|
|
artist = self.session.artist(artist_id)
|
|
albums = artist.get_albums(limit=limit)
|
|
|
|
# Deduplicate albums by title, keeping highest quality version
|
|
albums_dict = {}
|
|
for album in albums:
|
|
title = album.name
|
|
|
|
# If we haven't seen this album title before, add it
|
|
if title not in albums_dict:
|
|
albums_dict[title] = album
|
|
else:
|
|
# If we've seen it, keep the one with higher quality
|
|
existing = albums_dict[title]
|
|
|
|
# Prioritize by audio quality (if available)
|
|
# TIDAL albums with better quality often have higher audio_quality values
|
|
# Also prefer explicit versions over clean versions (explicit flag)
|
|
# And prefer newer releases (release_date)
|
|
|
|
# Simple heuristic: prefer albums with explicit tag and newer release date
|
|
keep_new = False
|
|
if hasattr(album, 'explicit') and hasattr(existing, 'explicit'):
|
|
if album.explicit and not existing.explicit:
|
|
keep_new = True
|
|
|
|
# If explicit status is same, prefer newer release
|
|
if not keep_new and hasattr(album, 'release_date') and hasattr(existing, 'release_date'):
|
|
if album.release_date and existing.release_date:
|
|
if album.release_date > existing.release_date:
|
|
keep_new = True
|
|
|
|
if keep_new:
|
|
albums_dict[title] = album
|
|
|
|
# Convert deduplicated dict to output format
|
|
output = []
|
|
for album in albums_dict.values():
|
|
output.append({
|
|
"id": album.id,
|
|
"title": album.name,
|
|
"artist": album.artist.name,
|
|
"tracks": album.num_tracks,
|
|
"release_date": str(album.release_date) if album.release_date else "Unknown",
|
|
"cover": self._get_image_url(album.cover),
|
|
"type": "album"
|
|
})
|
|
|
|
# Sort by release date (newest first)
|
|
output.sort(key=lambda x: x['release_date'], reverse=True)
|
|
|
|
return output
|
|
|
|
def get_track(self, track_id: str):
|
|
return self.session.track(track_id)
|
|
|
|
def get_album(self, album_id: str):
|
|
return self.session.album(album_id)
|
|
|