# -*- coding: utf-8 -*-
# Copyright (C) 2023- The Tidalapi Developers
# Copyright (C) 2019-2022 morguldir
# Copyright (C) 2014 Thomas Amland
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""A module containing things related to TIDAL playlists."""
from __future__ import annotations
import copy
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional, Sequence, Union, cast
from tidalapi.exceptions import ObjectNotFound, TooManyRequests
from tidalapi.types import JsonObj
from tidalapi.user import LoggedInUser
if TYPE_CHECKING:
from tidalapi.artist import Artist
from tidalapi.media import Track, Video
from tidalapi.session import Session
from tidalapi.user import User
import dateutil.parser
def list_validate(lst):
    """Normalise *lst* so callers can always treat it as a non-empty collection.

    A bare string counts as one item and is wrapped in a new list; any other
    value is handed back unchanged.

    :param lst: A single string, or a sized collection of items.
    :return: ``[lst]`` when a string was given, otherwise ``lst`` itself.
    :raises ValueError: If the resulting collection has length zero.
    """
    items = [lst] if isinstance(lst, str) else lst
    if len(items) == 0:
        raise ValueError("An empty list was provided.")
    return items
class Playlist:
    """A TIDAL playlist together with the methods used to read its contents.

    The attributes below hold class-level defaults until :meth:`parse` fills
    them in from a JSON response.
    """

    id: Optional[str] = None
    trn: Optional[str] = None
    name: Optional[str] = None
    num_tracks: int = -1
    num_videos: int = -1
    creator: Optional[Union["Artist", "User"]] = None
    description: Optional[str] = None
    duration: int = -1
    last_updated: Optional[datetime] = None
    created: Optional[datetime] = None
    type: Optional[str] = None
    public: Optional[bool] = False
    popularity: Optional[int] = None
    promoted_artists: Optional[List["Artist"]] = None
    last_item_added_at: Optional[datetime] = None
    picture: Optional[str] = None
    square_picture: Optional[str] = None
    user_date_added: Optional[datetime] = None
    # ETag header of the most recent playlist response
    _etag: Optional[str] = None

    # Direct URL to https://listen.tidal.com/playlist/<playlist_id>
    listen_url: str = ""
    # Direct URL to https://tidal.com/browse/playlist/<playlist_id>
    share_url: str = ""

    def __init__(self, session: "Session", playlist_id: Optional[str]):
        """Create a playlist object, loading it from TIDAL when an ID is given.

        :param session: The session used to issue requests and parse related objects.
        :param playlist_id: UUID of the playlist to load. A falsy value creates an
            empty object that can be filled in later with :meth:`parse`.
        :raises ObjectNotFound: If the request for the playlist raises it.
        :raises TooManyRequests: If the request for the playlist raises it.
        """
        self.id = playlist_id
        self.session = session
        self.request = session.request
        self._base_url = "playlists/%s"

        if playlist_id:
            try:
                request = self.request.request("GET", self._base_url % self.id)
            except ObjectNotFound:
                raise ObjectNotFound("Playlist not found")
            except TooManyRequests:
                raise TooManyRequests("Playlist unavailable")
            else:
                self._etag = request.headers["etag"]
                self.parse(request.json())

    def parse(self, json_obj: "JsonObj") -> "Playlist":
        """Parses a playlist from tidal, replaces the current playlist object.

        :param json_obj: Json data returned from api.tidal.com containing a playlist
        :return: Returns a shallow copy of the updated :class:`Playlist` object
        """
        self.id = json_obj["uuid"]
        self.trn = f"trn:playlist:{self.id}"
        self.name = json_obj["title"]
        self.num_tracks = int(json_obj["numberOfTracks"])
        self.num_videos = int(json_obj["numberOfVideos"])
        self.description = json_obj["description"]
        self.duration = int(json_obj["duration"])

        # These can be missing from the /pages endpoints
        last_updated = json_obj.get("lastUpdated")
        self.last_updated = (
            dateutil.parser.isoparse(last_updated) if last_updated else None
        )
        created = json_obj.get("created")
        self.created = dateutil.parser.isoparse(created) if created else None
        public = json_obj.get("publicPlaylist")
        self.public = None if public is None else bool(public)
        popularity = json_obj.get("popularity")
        # Compare against None (not truthiness) so a popularity of 0 stays 0.
        self.popularity = None if popularity is None else int(popularity)

        self.type = json_obj["type"]
        self.picture = json_obj["image"]
        self.square_picture = json_obj["squareImage"]

        promoted_artists = json_obj["promotedArtists"]
        self.promoted_artists = (
            self.session.parse_artists(promoted_artists) if promoted_artists else None
        )

        last_item_added_at = json_obj.get("lastItemAddedAt")
        self.last_item_added_at = (
            dateutil.parser.isoparse(last_item_added_at) if last_item_added_at else None
        )

        user_date_added = json_obj.get("dateAdded")
        self.user_date_added = (
            dateutil.parser.isoparse(user_date_added) if user_date_added else None
        )

        # Artist playlists carry an artist as creator; everything else is
        # parsed as a user.
        creator = json_obj.get("creator")
        if self.type == "ARTIST" and creator and creator.get("id"):
            self.creator = self.session.parse_artist(creator)
        else:
            self.creator = self.session.parse_user(creator) if creator else None

        self.listen_url = f"{self.session.config.listen_base_url}/playlist/{self.id}"
        self.share_url = f"{self.session.config.share_base_url}/playlist/{self.id}"

        return copy.copy(self)

    def factory(self) -> Union["Playlist", "UserPlaylist"]:
        """Return the most specific playlist object for this playlist.

        :return: A newly constructed :class:`UserPlaylist` when the session user is
            a logged-in user whose id equals the creator's id, otherwise ``self``.
        """
        if (
            self.id
            and self.creator
            and isinstance(self.session.user, LoggedInUser)
            and self.creator.id == self.session.user.id
        ):
            return UserPlaylist(self.session, self.id)

        return self

    def parse_factory(self, json_obj: "JsonObj") -> "Playlist":
        """Parse ``json_obj`` into this object and return a copy of :meth:`factory`.

        :param json_obj: Json data returned from api.tidal.com containing a playlist
        :return: A shallow copy of the object produced by :meth:`factory`
        """
        self.parse(json_obj)
        return copy.copy(self.factory())

    def tracks(self, limit: Optional[int] = None, offset: int = 0) -> List["Track"]:
        """Gets the playlists' tracks from TIDAL.

        :param limit: The amount of items you want returned.
        :param offset: The index of the first item you want included.
        :return: A list of :class:`Tracks <.Track>`
        """
        params = {"limit": limit, "offset": offset}
        request = self.request.request(
            "GET", self._base_url % self.id + "/tracks", params=params
        )
        # Remember the latest ETag of the playlist resource.
        self._etag = request.headers["etag"]
        return list(
            self.request.map_json(
                json_obj=request.json(), parse=self.session.parse_track
            )
        )

    def items(self, limit: int = 100, offset: int = 0) -> List[Union["Track", "Video"]]:
        """Fetches up to the first 100 items, including tracks and videos.

        :param limit: The amount of items you want, up to 100.
        :param offset: The index of the first item you want returned
        :return: A list of :class:`Tracks<.Track>` and :class:`Videos<.Video>`
        """
        params = {"limit": limit, "offset": offset}
        request = self.request.request(
            "GET", self._base_url % self.id + "/items", params=params
        )
        # Remember the latest ETag of the playlist resource.
        self._etag = request.headers["etag"]
        return list(
            self.request.map_json(request.json(), parse=self.session.parse_media)
        )

    def image(self, dimensions: int = 480, wide_fallback: bool = True) -> str:
        """A URL to a playlist picture.

        Original sizes: 160x160, 320x320, 480x480, 640x640, 750x750, 1080x1080

        :param dimensions: The width and height that you want from the image
        :type dimensions: int
        :param wide_fallback: Use wide image as fallback if no square cover picture
            exists. The fallback uses the default size of :meth:`wide_image`, not
            ``dimensions``.
        :type wide_fallback: bool
        :return: A url to the image
        :raises ValueError: If ``dimensions`` is not one of the original sizes.
        :raises AttributeError: If no usable picture is available.
        """
        if dimensions not in [160, 320, 480, 640, 750, 1080]:
            raise ValueError("Invalid resolution {0} x {0}".format(dimensions))

        if self.square_picture:
            return self.session.config.image_url % (
                self.square_picture.replace("-", "/"),
                dimensions,
                dimensions,
            )
        elif self.picture and wide_fallback:
            return self.wide_image()
        else:
            raise AttributeError("No picture available")

    def wide_image(self, width: int = 1080, height: int = 720) -> str:
        """Create a url to a wider playlist image.

        Valid sizes: 160x107, 480x320, 750x500, 1080x720

        :param width: The width of the image
        :param height: The height of the image
        :return: Returns a url to the image with the specified resolution
        :raises ValueError: If ``(width, height)`` is not one of the valid sizes.
        :raises AttributeError: If the playlist has no picture.
        """
        if (width, height) not in [(160, 107), (480, 320), (750, 500), (1080, 720)]:
            raise ValueError("Invalid resolution {} x {}".format(width, height))
        if self.picture is None:
            raise AttributeError("No picture available")
        return self.session.config.image_url % (
            self.picture.replace("-", "/"),
            width,
            height,
        )
class Folder:
    """A playlist folder from the user's collection and the operations on it.

    The attributes below hold class-level defaults until :meth:`parse` fills
    them in from a JSON response.
    """

    trn: Optional[str] = None
    id: Optional[str] = None
    parent_folder_id: Optional[str] = None
    name: Optional[str] = None
    parent: Optional[str] = None  # TODO Determine the correct type of the parent
    added: Optional[datetime] = None
    created: Optional[datetime] = None
    last_modified: Optional[datetime] = None
    total_number_of_items: int = 0

    # Direct URL to https://listen.tidal.com/folder/<folder_id>
    listen_url: str = ""

    def __init__(
        self,
        session: "Session",
        folder_id: Optional[str],
        parent_folder_id: str = "root",
    ):
        """Create a folder object, loading it from TIDAL when an ID is given.

        :param session: The session used to issue requests and parse playlists.
        :param folder_id: ID of the folder to load. A falsy value creates an empty
            object that can be filled in later with :meth:`parse`.
        :param parent_folder_id: ID of the folder that is searched for ``folder_id``.
        :raises ObjectNotFound: If the folder is not among the listed sub-folders of
            the parent, or the request raises it.
        :raises TooManyRequests: If the request raises it.
        """
        self.id = folder_id
        self.parent_folder_id = parent_folder_id
        self.session = session
        self.request = session.request
        self.playlist = session.playlist()
        self._endpoint = "my-collection/playlists/folders"

        if folder_id:
            # Go through all available folders and see if the requested folder exists
            try:
                found = self._load_from_parent(folder_id)
            except ObjectNotFound:
                raise ObjectNotFound("Folder not found")
            except TooManyRequests:
                raise TooManyRequests("Folder unavailable")
            if not found:
                raise ObjectNotFound("Folder not found")

    def _load_from_parent(self, folder_id: Optional[str]) -> bool:
        """Find ``folder_id`` among the parent's sub-folders and parse it.

        Only one page of 50 sub-folders, ordered by name, is requested.

        :param folder_id: ID of the folder to look for.
        :return: True if a matching folder was found and parsed, otherwise False.
        """
        params = {
            "folderId": self.parent_folder_id,
            "offset": 0,
            "limit": 50,
            "order": "NAME",
            "includeOnly": "FOLDER",
        }
        request = self.request.request(
            "GET",
            self._endpoint,
            base_url=self.session.config.api_v2_location,
            params=params,
        )
        # A response without "items" is treated the same as an empty listing.
        for item in request.json().get("items") or []:
            if item["data"].get("id") == folder_id:
                self.parse(item)
                return True
        return False

    def _reparse(self) -> None:
        """Refresh this folder from TIDAL; does nothing if it is no longer listed."""
        self._load_from_parent(self.id)

    def parse(self, json_obj: "JsonObj") -> "Folder":
        """Parses a folder from tidal, replaces the current folder object.

        :param json_obj: Json data returned from api.tidal.com containing a folder
        :return: Returns a shallow copy of the updated :class:`Folder` object
        """
        data = json_obj["data"]
        self.trn = json_obj.get("trn")
        self.id = data.get("id")
        self.name = json_obj.get("name")
        self.parent = json_obj.get("parent")

        added = json_obj.get("addedAt")
        created = data.get("createdAt")
        last_modified = data.get("lastModifiedAt")
        # Each timestamp is optional on its own, so each is checked on its own.
        self.added = dateutil.parser.isoparse(added) if added else None
        self.created = dateutil.parser.isoparse(created) if created else None
        self.last_modified = (
            dateutil.parser.isoparse(last_modified) if last_modified else None
        )

        self.total_number_of_items = data.get("totalNumberOfItems")
        self.listen_url = f"{self.session.config.listen_base_url}/folder/{self.id}"

        return copy.copy(self)

    def rename(self, name: str) -> bool:
        """Rename the selected folder.

        :param name: The name to be used for the folder
        :return: True, if operation was successful.
        """
        params = {"trn": self.trn, "name": name}
        endpoint = "my-collection/playlists/folders/rename"
        res = self.request.request(
            "PUT",
            endpoint,
            base_url=self.session.config.api_v2_location,
            params=params,
        )
        self._reparse()
        return res.ok

    def remove(self) -> bool:
        """Remove the selected folder.

        :return: True, if operation was successful.
        """
        params = {"trns": self.trn}
        endpoint = "my-collection/playlists/folders/remove"
        return self.request.request(
            "PUT",
            endpoint,
            base_url=self.session.config.api_v2_location,
            params=params,
        ).ok

    def items(
        self, offset: int = 0, limit: int = 50
    ) -> List[Union["Playlist", "UserPlaylist"]]:
        """Return the items in the folder.

        :param offset: Optional; The index of the first item to be returned. Default: 0
        :param limit: Optional; The amount of items you want returned. Default: 50
        :return: Returns a list of :class:`Playlist` or :class:`UserPlaylist` objects
        """
        params = {
            "folderId": self.id,
            "offset": offset,
            "limit": limit,
            "order": "NAME",
            "includeOnly": "PLAYLIST",
        }
        json_obj = self.request.request(
            "GET",
            self._endpoint,
            base_url=self.session.config.api_v2_location,
            params=params,
        ).json()

        folder_items = json_obj.get("items")
        if not folder_items:
            return []

        # Re-wrap the playlist payloads in the {"items": [...]} shape that is
        # handed to map_json.
        playlists = {"items": [item["data"] for item in folder_items]}
        return cast(
            List[Union["Playlist", "UserPlaylist"]],
            self.request.map_json(playlists, parse=self.playlist.parse_factory),
        )

    def add_items(self, trns: List[str]) -> bool:
        """Convenience method to add items to the current folder.

        :param trns: List of playlist trns to be added to the current folder
        :return: True, if operation was successful.
        """
        return self.move_items_to_folder(trns, self.id)

    def move_items_to_root(self, trns: List[str]) -> bool:
        """Convenience method to move items from the current folder to the root folder.

        :param trns: List of playlist trns to be moved from the current folder
        :return: True, if operation was successful.
        """
        return self.move_items_to_folder(trns, folder="root")

    def move_items_to_folder(
        self, trns: List[str], folder: Optional[str] = None
    ) -> bool:
        """Move item(s) in one folder to another folder.

        :param trns: List of playlist trns to be moved. A single string is accepted
            as well. Entries without a ``trn:`` marker get ``trn:playlist:``
            prepended.
        :param folder: Destination folder. Default: Use the current folder
        :return: True, if operation was successful.
        :raises ValueError: If ``trns`` is empty.
        """
        trns = list_validate(trns)
        # Make sure all trns has the correct type prepended to it
        trns_full = [trn if "trn:" in trn else f"trn:playlist:{trn}" for trn in trns]
        if not folder:
            folder = self.id
        params = {"folderId": folder, "trns": ",".join(trns_full)}
        endpoint = "my-collection/playlists/folders/move"
        res = self.request.request(
            "PUT",
            endpoint,
            base_url=self.session.config.api_v2_location,
            params=params,
        )
        self._reparse()
        return res.ok
class UserPlaylist(Playlist):
    """A :class:`Playlist` extended with methods that modify it on TIDAL.

    :meth:`Playlist.factory` returns this class when the playlist's creator is
    the logged-in session user.
    """

    def _reparse(self) -> None:
        """Re-Read Playlist to get ETag."""
        request = self.request.request("GET", self._base_url % self.id)
        self._etag = request.headers["etag"]
        self.request.map_json(request.json(), parse=self.parse)

    def edit(
        self, title: Optional[str] = None, description: Optional[str] = None
    ) -> bool:
        """Edit UserPlaylist title & description.

        A falsy value keeps the current title or description.

        :param title: Playlist title
        :param description: Playlist description.
        :return: True, if successful.
        """
        if not title:
            title = self.name
        if not description:
            description = self.description

        data = {"title": title, "description": description}
        return self.request.request("POST", self._base_url % self.id, data=data).ok

    def delete_by_id(self, media_ids: List[str]) -> bool:
        """Delete one or more items from the UserPlaylist.

        :param media_ids: Lists of Media IDs to remove.
        :return: True, if successful. False if none of the IDs is among the
            tracks returned by :meth:`tracks`.
        :raises ValueError: If ``media_ids`` is empty.
        """
        media_ids = list_validate(media_ids)
        # Track IDs are compared as strings, so normalise the requested IDs too;
        # otherwise integer IDs would never match.
        wanted = {str(media_id) for media_id in media_ids}
        # Generate list of track indices of tracks found in the list of media_ids.
        matching_indices = [
            index
            for index, track in enumerate(self.tracks())
            if str(track.id) in wanted
        ]
        if not matching_indices:
            # Nothing to remove; avoid a DELETE request with an empty index list.
            return False
        return self.remove_by_indices(matching_indices)

    def add(
        self,
        media_ids: List[str],
        allow_duplicates: bool = False,
        position: int = -1,
        limit: int = 100,
    ) -> List[int]:
        """Add one or more items to the UserPlaylist.

        :param media_ids: List of Media IDs to add.
        :param allow_duplicates: Allow adding duplicate items
        :param position: Insert items at a specific position.
            Default: insert at the end of the playlist
        :param limit: Maximum number of items to add
        :return: List of media IDs that has been added
        :raises ValueError: If ``media_ids`` is empty.
        """
        media_ids = list_validate(media_ids)
        # Insert items at a specific index; out-of-range positions mean "append"
        if position < 0 or position > self.num_tracks:
            position = self.num_tracks
        data = {
            "onArtifactNotFound": "SKIP",
            "trackIds": ",".join(map(str, media_ids)),
            "toIndex": position,
            "onDupes": "ADD" if allow_duplicates else "SKIP",
        }
        params = {"limit": limit}
        headers = {"If-None-Match": self._etag} if self._etag else None
        res = self.request.request(
            "POST",
            self._base_url % self.id + "/items",
            params=params,
            data=data,
            headers=headers,
        )
        self._reparse()
        # Respond with the added item IDs:
        return res.json().get("addedItemIds") or []

    def merge(
        self, playlist: str, allow_duplicates: bool = False, allow_missing: bool = True
    ) -> List[int]:
        """Add (merge) items from a playlist with the current playlist.

        :param playlist: Playlist UUID to be merged in the current playlist
        :param allow_duplicates: If true, duplicate tracks are allowed. Otherwise,
            tracks will be skipped.
        :param allow_missing: If true, missing tracks are allowed. Otherwise, exception
            will be thrown
        :return: List of items that has been added from the playlist
        """
        data = {
            "fromPlaylistUuid": str(playlist),
            "onArtifactNotFound": "SKIP" if allow_missing else "FAIL",
            "onDupes": "ADD" if allow_duplicates else "SKIP",
        }
        headers = {"If-None-Match": self._etag} if self._etag else None
        res = self.request.request(
            "POST",
            self._base_url % self.id + "/items",
            data=data,
            headers=headers,
        )
        self._reparse()
        # Respond with the added item IDs:
        return res.json().get("addedItemIds") or []

    def add_by_isrc(
        self,
        isrc: str,
        allow_duplicates: bool = False,
        position: int = -1,
    ) -> bool:
        """Add an item to a playlist, using the track ISRC.

        :param isrc: The ISRC of the track to be added
        :param allow_duplicates: Allow adding duplicate items
        :param position: Insert items at a specific position.
            Default: insert at the end of the playlist
        :return: True, if successful.
        """
        isrc = str(isrc)
        try:
            tracks = self.session.get_tracks_by_isrc(isrc)
            if not tracks:
                return False
            # Add the first track in the list
            track_id = tracks[0].id
            added = self.add(
                [str(track_id)],
                allow_duplicates=allow_duplicates,
                position=position,
            )
            return track_id in added
        except ObjectNotFound:
            return False

    def move_by_id(self, media_id: str, position: int) -> bool:
        """Move an item to a new position, by media ID.

        :param media_id: The media ID of the item to be moved
        :param position: The new position of the item
        :return: True, if successful. False if the item is not in the playlist.
        """
        media_id = str(media_id)
        track_ids = [str(track.id) for track in self.tracks()]
        try:
            index = track_ids.index(media_id)
            if index < self.num_tracks:
                return self.move_by_indices([index], position)
        except ValueError:
            return False
        # Index found but outside the known track count: report failure
        # explicitly instead of falling through and returning None.
        return False

    def move_by_index(self, index: int, position: int) -> bool:
        """Move a single item to a new position.

        :param index: The index of the item to be moved
        :param position: The new position/offset of the item
        :return: True, if successful.
        :raises ValueError: If ``index`` is not an integer.
        """
        if not isinstance(index, int):
            raise ValueError("index must be an integer")
        return self.move_by_indices([index], position)

    def move_by_indices(self, indices: Sequence[int], position: int) -> bool:
        """Move one or more items to a new position.

        :param indices: List containing indices to move.
        :param position: The new position/offset of the item(s)
        :return: True, if successful.
        """
        # Move item to a new position; out-of-range positions mean "to the end"
        if position < 0 or position >= self.num_tracks:
            position = self.num_tracks
        data = {
            "toIndex": position,
        }
        headers = {"If-None-Match": self._etag} if self._etag else None
        track_index_string = ",".join([str(x) for x in indices])
        res = self.request.request(
            "POST",
            (self._base_url + "/items/%s") % (self.id, track_index_string),
            data=data,
            headers=headers,
        )
        self._reparse()
        return res.ok

    def remove_by_id(self, media_id: str) -> bool:
        """Remove a single item from the playlist, using the media ID.

        :param media_id: Media ID to remove.
        :return: True, if successful. False if the item is not in the playlist.
        """
        media_id = str(media_id)
        track_ids = [str(track.id) for track in self.tracks()]
        try:
            index = track_ids.index(media_id)
            if index < self.num_tracks:
                return self.remove_by_index(index)
        except ValueError:
            return False
        # Index found but outside the known track count: report failure
        # explicitly instead of falling through and returning None.
        return False

    def remove_by_index(self, index: int) -> bool:
        """Remove a single item from the UserPlaylist, using item index.

        :param index: Media index to remove
        :return: True, if successful.
        """
        return self.remove_by_indices([index])

    def remove_by_indices(self, indices: Sequence[int]) -> bool:
        """Remove one or more items from the UserPlaylist, using list of indices.

        :param indices: List containing indices to remove.
        :return: True, if successful.
        """
        headers = {"If-None-Match": self._etag} if self._etag else None
        track_index_string = ",".join([str(x) for x in indices])
        res = self.request.request(
            "DELETE",
            (self._base_url + "/items/%s") % (self.id, track_index_string),
            headers=headers,
        )
        self._reparse()
        return res.ok

    def clear(self, chunk_size: int = 50) -> bool:
        """Clear UserPlaylist.

        :param chunk_size: Number of items to remove per request
        :return: True, if successful.
        """
        # num_tracks is refreshed by the re-parse inside remove_by_indices, so
        # the loop always removes from the front of what is left.
        while self.num_tracks:
            indices = range(min(self.num_tracks, chunk_size))
            if not self.remove_by_indices(indices):
                return False
        return True

    def set_playlist_public(self) -> bool:
        """Set UserPlaylist as Public.

        :return: True, if successful.
        """
        res = self.request.request(
            "PUT",
            base_url=self.session.config.api_v2_location,
            path=(self._base_url + "/set-public") % self.id,
        )
        self.public = True
        self._reparse()
        return res.ok

    def set_playlist_private(self) -> bool:
        """Set UserPlaylist as Private.

        :return: True, if successful.
        """
        res = self.request.request(
            "PUT",
            base_url=self.session.config.api_v2_location,
            path=(self._base_url + "/set-private") % self.id,
        )
        self.public = False
        self._reparse()
        return res.ok

    def delete(self) -> bool:
        """Delete UserPlaylist.

        :return: True, if successful.
        """
        return self.request.request("DELETE", path=self._base_url % self.id).ok