import logging
import threading
import time
from typing import Any, Callable, Dict, Optional

from tidal_dl_ng.config import Settings, Tidal
from tidalapi import Album, Artist, Playlist, Session, Track

logger = logging.getLogger(__name__)


class TidalWrapper:
    """Singleton wrapper around the ``tidal_dl_ng`` ``Tidal`` object.

    The first construction builds ``Settings``, ``Tidal`` and caches the
    session; every later ``TidalWrapper()`` call returns that same instance
    without re-running the initialisation.
    """

    _instance = None

    # Substrings of an exception message that mark it as a transient
    # connectivity problem worth retrying in ``is_authenticated``.
    _CONNECTION_ERROR_MARKERS = ("Connection", "RemoteDisconnected", "Network")

    # search type -> (tidalapi model, key in the search result, formatter name)
    _SEARCH_SPECS = {
        "track": (Track, "tracks", "_format_track"),
        "album": (Album, "albums", "_format_album"),
        "artist": (Artist, "artists", "_format_artist"),
        "playlist": (Playlist, "playlists", "_format_playlist"),
    }

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every TidalWrapper() call; only the first one may
        # build the underlying objects.
        if self._initialized:
            return
        self.settings = Settings()
        self.tidal = Tidal(self.settings)
        self.session = self.tidal.session
        self.auth_future = None
        self.auth_status = {"status": "idle", "message": "", "link": "", "code": ""}
        # Handle of the background login thread, if one was started.
        self._login_worker = None
        # Flip the flag last so a failure above leaves the singleton
        # un-initialised and the next construction retries.
        self._initialized = True

    def is_authenticated(self, max_retries: int = 5, retry_delay: float = 5) -> bool:
        """Return whether the session reports a valid login.

        Errors whose message looks connection-related are retried, sleeping
        ``retry_delay`` seconds between attempts; any other error, or a
        connection error on the last attempt, is logged and yields ``False``.

        Args:
            max_retries: Maximum number of attempts (at least one is made).
            retry_delay: Seconds to sleep between attempts.

        Returns:
            The result of ``session.check_login()``, or ``False`` on error.
        """
        attempts = max(1, max_retries)
        for attempt in range(attempts):
            try:
                return self.session.check_login()
            except Exception as e:
                error_str = str(e)
                is_connection_error = any(
                    marker in error_str for marker in self._CONNECTION_ERROR_MARKERS
                )
                if is_connection_error:
                    logger.warning(
                        "Connection error checking authentication (Attempt %d/%d): %s",
                        attempt + 1,
                        attempts,
                        e,
                    )
                    if attempt < attempts - 1:
                        logger.info(
                            "Waiting %s seconds for network/gluetun to recover...",
                            retry_delay,
                        )
                        time.sleep(retry_delay)
                        continue

                # Not a connection error, or the retries are exhausted.
                logger.error("Error checking authentication status: %s", e)
                return False
        return False

    def start_device_login(self) -> Dict[str, str]:
        """Start the device login flow in a background thread.

        ``self.tidal.login`` blocks until the login finishes, so it runs in a
        daemon thread that reports progress through ``self.auth_status``.

        Returns:
            ``{"status": "success", "message": "Already logged in"}`` when the
            session is already authenticated; otherwise the ``auth_status``
            dict, which the background thread keeps updating. Poll
            ``get_auth_status`` for progress.
        """
        if self.is_authenticated():
            return {"status": "success", "message": "Already logged in"}

        # A login is already running: do not start a second thread that would
        # race with the first one over auth_status.
        worker = getattr(self, "_login_worker", None)
        if worker is not None and worker.is_alive():
            return self.auth_status

        # Reset status
        self.auth_status = {
            "status": "pending",
            "message": "Initializing login...",
            "link": "",
            "code": "",
        }

        # Run login in a separate thread to avoid blocking the caller.
        worker = threading.Thread(target=self._login_thread, daemon=True)
        self._login_worker = worker
        worker.start()

        return self.auth_status

    def _login_thread(self):
        """Run the blocking login and record its outcome in ``auth_status``."""

        def print_callback(msg: str):
            logger.info("Tidal Login Callback: %s", msg)
            # Always surface the raw message; if no link can be parsed out of
            # it the user still sees the text.
            self.auth_status["message"] = msg
            # Keep the last whitespace-separated token that looks like a URL.
            # NOTE: the "code" field is never populated here.
            for part in msg.split():
                if part.startswith("http"):
                    self.auth_status["link"] = part

        try:
            # Blocks until the login call returns.
            self.tidal.login(print_callback)

            if self.is_authenticated():
                self.auth_status["status"] = "success"
                self.auth_status["message"] = "Login successful!"
            else:
                self.auth_status["status"] = "failed"
                self.auth_status["message"] = "Login failed."
        except Exception as e:
            # Keep the traceback in the log; the status dict only carries the
            # message text.
            logger.exception("Tidal login thread failed")
            self.auth_status["status"] = "error"
            self.auth_status["message"] = str(e)

    def get_auth_status(self):
        """Return the current login status dict (status, message, link, code)."""
        return self.auth_status

    def search(self, query: str, type: str = "track", limit: int = 10):
        """Search TIDAL and return the results as JSON-friendly dicts.

        Args:
            query: Search text.
            type: One of ``"track"``, ``"album"``, ``"artist"``, ``"playlist"``.
            limit: Maximum number of results requested from the session.

        Returns:
            A list of dicts, one per result. An unsupported ``type`` yields an
            empty list.

        Raises:
            Exception: If the session is not authenticated.
        """
        if not self.is_authenticated():
            raise Exception("Not authenticated")

        spec = self._SEARCH_SPECS.get(type)
        if spec is None:
            # Nothing could be formatted for an unknown type, so skip the
            # network round trip altogether.
            logger.warning("Unsupported search type: %s", type)
            return []

        model, result_key, formatter_name = spec
        results = self.session.search(query, models=[model], limit=limit)
        formatter = getattr(self, formatter_name)
        return [formatter(item) for item in results[result_key]]

    @staticmethod
    def _display_name(entity) -> Optional[str]:
        """Return ``entity.name``, or ``None`` when the entity is missing."""
        return getattr(entity, "name", None)

    def _format_track(self, track) -> Dict[str, Any]:
        """Convert a track object into its JSON-friendly dict."""
        album = track.album
        return {
            "id": track.id,
            "title": track.name,
            "artist": self._display_name(track.artist),
            "album": self._display_name(album),
            "duration": track.duration,
            "cover": self._get_image_url(getattr(album, "cover", None)),
            "type": "track",
        }

    def _format_album(self, album) -> Dict[str, Any]:
        """Convert an album search result into its JSON-friendly dict."""
        return {
            "id": album.id,
            "title": album.name,
            "artist": self._display_name(album.artist),
            "tracks": album.num_tracks,
            "release_date": str(album.release_date),
            "cover": self._get_image_url(album.cover),
            "type": "album",
        }

    def _format_artist(self, artist) -> Dict[str, Any]:
        """Convert an artist object into its JSON-friendly dict."""
        return {
            "id": artist.id,
            "title": artist.name,
            "type": "artist",
            "cover": self._get_image_url(artist.picture),
        }

    def _format_playlist(self, playlist) -> Dict[str, Any]:
        """Convert a playlist object into its JSON-friendly dict."""
        return {
            "id": playlist.id,
            "title": playlist.name,
            "type": "playlist",
            "cover": self._get_image_url(playlist.cover),
        }

    def _get_image_url(self, uuid: str | None, width: int = 320, height: int = 320) -> str | None:
        """Build the image URL for an image UUID, or ``None`` if it is empty."""
        if not uuid:
            return None
        return f"https://resources.tidal.com/images/{uuid.replace('-', '/')}/{width}x{height}.jpg"

    @staticmethod
    def _prefer_candidate(candidate, incumbent) -> bool:
        """Decide whether ``candidate`` should replace ``incumbent``.

        Both albums share a title. An explicit edition beats a clean one; only
        when the explicit status is the same does the newer release win.
        """
        candidate_explicit = bool(getattr(candidate, "explicit", False))
        incumbent_explicit = bool(getattr(incumbent, "explicit", False))
        if candidate_explicit != incumbent_explicit:
            return candidate_explicit

        candidate_date = getattr(candidate, "release_date", None)
        incumbent_date = getattr(incumbent, "release_date", None)
        return bool(candidate_date and incumbent_date and candidate_date > incumbent_date)

    def get_artist_albums(self, artist_id: str, limit: int = 50):
        """Fetch an artist's albums, de-duplicated by title.

        Args:
            artist_id: Identifier passed to ``session.artist``.
            limit: Maximum number of albums requested.

        Returns:
            A list of album dicts sorted by release date, newest first, with
            albums that have no release date (``"Unknown"``) at the end.

        Raises:
            Exception: If the session is not authenticated.
        """
        if not self.is_authenticated():
            raise Exception("Not authenticated")

        artist = self.session.artist(artist_id)
        albums = artist.get_albums(limit=limit)

        # Keep one album per title, chosen by _prefer_candidate.
        albums_by_title = {}
        for album in albums:
            incumbent = albums_by_title.get(album.name)
            if incumbent is None or self._prefer_candidate(album, incumbent):
                albums_by_title[album.name] = album

        output = [
            {
                "id": album.id,
                "title": album.name,
                "artist": self._display_name(album.artist),
                "tracks": album.num_tracks,
                "release_date": str(album.release_date) if album.release_date else "Unknown",
                "cover": self._get_image_url(album.cover),
                "type": "album",
            }
            for album in albums_by_title.values()
        ]

        # Newest first. The leading boolean keeps "Unknown" entries at the end
        # instead of letting the string sort above every real date.
        output.sort(
            key=lambda item: (item["release_date"] != "Unknown", item["release_date"]),
            reverse=True,
        )

        return output

    def get_track(self, track_id: str):
        """Return the session's track object for ``track_id``."""
        return self.session.track(track_id)

    def get_album(self, album_id: str):
        """Return the session's album object for ``album_id``."""
        return self.session.album(album_id)