Files
tidal-dl-ng-webui/app/services/tidal_wrapper.py
2025-12-03 09:30:32 +00:00

239 lines
9.4 KiB
Python

import logging
import threading
import time
from typing import Optional, Callable, Dict, Any
from tidal_dl_ng.config import Tidal, Settings
from tidalapi import Session, Track, Album, Artist, Playlist
logger = logging.getLogger(__name__)
class TidalWrapper:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(TidalWrapper, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.settings = Settings()
self.tidal = Tidal(self.settings)
self.session = self.tidal.session
self._initialized = True
self.auth_future = None
self.auth_status = {"status": "idle", "message": "", "link": "", "code": ""}
def is_authenticated(self) -> bool:
max_retries = 5
retry_delay = 5
for attempt in range(max_retries):
try:
return self.session.check_login()
except Exception as e:
error_str = str(e)
# Check for connection-related errors
if "Connection" in error_str or "RemoteDisconnected" in error_str or "Network" in error_str:
logger.warning(f"Connection error checking authentication (Attempt {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
logger.info(f"Waiting {retry_delay} seconds for network/gluetun to recover...")
time.sleep(retry_delay)
continue
# If it's not a connection error, or we've run out of retries
logger.error(f"Error checking authentication status: {e}")
return False
return False
def start_device_login(self) -> Dict[str, str]:
"""
Starts the device login process.
Returns the verification info (link and code) immediately if possible,
or starts a thread to handle the blocking login_oauth_simple if needed.
Since tidalapi's login_oauth_simple is blocking and takes a print callback,
we might need to run it in a thread and capture the output.
However, a better approach is to use the underlying tidalapi methods if available.
For now, let's try to use the session's internal methods if we can figure them out,
or wrap the blocking call.
"""
if self.is_authenticated():
return {"status": "success", "message": "Already logged in"}
# Reset status
self.auth_status = {"status": "pending", "message": "Initializing login...", "link": "", "code": ""}
# Run login in a separate thread to avoid blocking
thread = threading.Thread(target=self._login_thread)
thread.daemon = True
thread.start()
return self.auth_status
def _login_thread(self):
def print_callback(msg: str):
logger.info(f"Tidal Login Callback: {msg}")
# Parse the message to extract link and code if possible
# Typical msg: "Visit https://link.tidal.com/AAAAA and enter code AAAAA"
self.auth_status["message"] = msg
if "http" in msg:
parts = msg.split()
for part in parts:
if part.startswith("http"):
self.auth_status["link"] = part
# This is a bit hacky, but tidalapi 0.7+ might behave differently.
# If we can't parse it easily, the user just sees the message.
try:
# This will block until user logs in or times out
self.tidal.login(print_callback)
if self.is_authenticated():
self.auth_status["status"] = "success"
self.auth_status["message"] = "Login successful!"
else:
self.auth_status["status"] = "failed"
self.auth_status["message"] = "Login failed."
except Exception as e:
self.auth_status["status"] = "error"
self.auth_status["message"] = str(e)
def get_auth_status(self):
return self.auth_status
def search(self, query: str, type: str = "track", limit: int = 10):
if not self.is_authenticated():
raise Exception("Not authenticated")
# Map type string to tidalapi models if needed, or just pass string
# tidalapi.Session.search(query, models=None, limit=10, offset=0)
# models can be [Track, Album, Artist, Playlist]
models = []
if type == "track":
models = [Track]
elif type == "album":
models = [Album]
elif type == "artist":
models = [Artist]
elif type == "playlist":
models = [Playlist]
results = self.session.search(query, models=models, limit=limit)
# Parse results into a JSON-friendly format
output = []
if type == "track":
for track in results["tracks"]:
output.append({
"id": track.id,
"title": track.name,
"artist": track.artist.name,
"album": track.album.name,
"duration": track.duration,
"cover": self._get_image_url(track.album.cover),
"type": "track"
})
elif type == "album":
for album in results["albums"]:
output.append({
"id": album.id,
"title": album.name,
"artist": album.artist.name,
"tracks": album.num_tracks,
"release_date": str(album.release_date),
"cover": self._get_image_url(album.cover),
"type": "album"
})
elif type == "artist":
for artist in results["artists"]:
output.append({
"id": artist.id,
"title": artist.name,
"type": "artist",
"cover": self._get_image_url(artist.picture)
})
elif type == "playlist":
for playlist in results["playlists"]:
output.append({
"id": playlist.id,
"title": playlist.name,
"type": "playlist",
"cover": self._get_image_url(playlist.cover)
})
return output
def _get_image_url(self, uuid: str | None, width: int = 320, height: int = 320) -> str | None:
if not uuid:
return None
return f"https://resources.tidal.com/images/{uuid.replace('-', '/')}/{width}x{height}.jpg"
def get_artist_albums(self, artist_id: str, limit: int = 50):
"""Fetch all albums for a given artist"""
if not self.is_authenticated():
raise Exception("Not authenticated")
artist = self.session.artist(artist_id)
albums = artist.get_albums(limit=limit)
# Deduplicate albums by title, keeping highest quality version
albums_dict = {}
for album in albums:
title = album.name
# If we haven't seen this album title before, add it
if title not in albums_dict:
albums_dict[title] = album
else:
# If we've seen it, keep the one with higher quality
existing = albums_dict[title]
# Prioritize by audio quality (if available)
# TIDAL albums with better quality often have higher audio_quality values
# Also prefer explicit versions over clean versions (explicit flag)
# And prefer newer releases (release_date)
# Simple heuristic: prefer albums with explicit tag and newer release date
keep_new = False
if hasattr(album, 'explicit') and hasattr(existing, 'explicit'):
if album.explicit and not existing.explicit:
keep_new = True
# If explicit status is same, prefer newer release
if not keep_new and hasattr(album, 'release_date') and hasattr(existing, 'release_date'):
if album.release_date and existing.release_date:
if album.release_date > existing.release_date:
keep_new = True
if keep_new:
albums_dict[title] = album
# Convert deduplicated dict to output format
output = []
for album in albums_dict.values():
output.append({
"id": album.id,
"title": album.name,
"artist": album.artist.name,
"tracks": album.num_tracks,
"release_date": str(album.release_date) if album.release_date else "Unknown",
"cover": self._get_image_url(album.cover),
"type": "album"
})
# Sort by release date (newest first)
output.sort(key=lambda x: x['release_date'], reverse=True)
return output
def get_track(self, track_id: str):
return self.session.track(track_id)
def get_album(self, album_id: str):
return self.session.album(album_id)