Files
2024-12-27 22:00:28 +09:00

806 lines
33 KiB
Python

import os
import pathlib
import random
import shutil
import tempfile
import time
from collections.abc import Callable
from concurrent import futures
from uuid import uuid4
import ffmpeg
import m3u8
import requests
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import HTTPError
from rich.progress import Progress, TaskID
from tidalapi import Album, Mix, Playlist, Session, Track, UserPlaylist, Video
from tidalapi.exceptions import TooManyRequests
from tidalapi.media import AudioExtensions, Codec, Quality, Stream, StreamManifest, VideoExtensions
from tidal_dl_ng.config import Settings
from tidal_dl_ng.constants import (
CHUNK_SIZE,
COVER_NAME,
EXTENSION_LYRICS,
PLAYLIST_EXTENSION,
PLAYLIST_PREFIX,
REQUESTS_TIMEOUT_SEC,
MediaType,
QualityVideo,
)
from tidal_dl_ng.helper.decryption import decrypt_file, decrypt_security_token
from tidal_dl_ng.helper.exceptions import MediaMissing
from tidal_dl_ng.helper.path import check_file_exists, format_path_media, path_file_sanitize, url_to_filename
from tidal_dl_ng.helper.tidal import (
instantiate_media,
items_results_all,
name_builder_album_artist,
name_builder_artist,
name_builder_item,
name_builder_title,
)
from tidal_dl_ng.metadata import Metadata
from tidal_dl_ng.model.downloader import DownloadSegmentResult
from tidal_dl_ng.model.gui_data import ProgressBars
# TODO: Set appropriate client string and use it for video download.
# https://github.com/globocom/m3u8#using-different-http-clients
class RequestsClient:
def download(
self, uri: str, timeout: int = REQUESTS_TIMEOUT_SEC, headers: dict | None = None, verify_ssl: bool = True
):
if not headers:
headers = {}
o = requests.get(uri, timeout=timeout, headers=headers)
return o.text, o.url
# TODO: Use pathlib.Path everywhere
class Download:
settings: Settings
session: Session
skip_existing: bool = False
fn_logger: Callable
progress_gui: ProgressBars
progress: Progress
def __init__(
self,
session: Session,
path_base: str,
fn_logger: Callable,
skip_existing: bool = False,
progress_gui: ProgressBars = None,
progress: Progress = None,
):
self.settings = Settings()
self.session = session
self.skip_existing = skip_existing
self.fn_logger = fn_logger
self.progress_gui = progress_gui
self.progress = progress
self.path_base = path_base
if not self.settings.data.path_binary_ffmpeg and (
self.settings.data.video_convert_mp4 or self.settings.data.extract_flac
):
self.settings.data.video_convert_mp4 = False
self.settings.data.extract_flac = False
self.fn_logger.error(
"FFmpeg path is not set. Videos can be downloaded but will not be processed. FLAC cannot be "
"extracted from MP4 containers. Make sure FFmpeg is installed. The path to the FFmpeg binary must "
"be set in (`path_binary_ffmpeg`)."
)
def _download(
self,
media: Track | Video,
path_file: pathlib.Path,
stream_manifest: StreamManifest | None = None,
) -> (bool, pathlib.Path):
media_name: str = name_builder_item(media)
urls: [str]
path_base: pathlib.Path = path_file.parent
result_segments: bool = True
dl_segment_results: [DownloadSegmentResult] = []
result_merge: bool = False
# Get urls for media.
try:
if isinstance(media, Track):
urls = stream_manifest.get_urls()
elif isinstance(media, Video):
m3u8_variant: m3u8.M3U8 = m3u8.load(media.get_url())
# Find the desired video resolution or the next best one.
m3u8_playlist, codecs = self._extract_video_stream(m3u8_variant, int(self.settings.data.quality_video))
# Populate urls.
urls = m3u8_playlist.files
except Exception:
return False, path_file
# Set the correct progress output channel.
if self.progress_gui is None:
progress_to_stdout: bool = True
else:
progress_to_stdout: bool = False
# Send signal to GUI with media name
self.progress_gui.item_name.emit(media_name[:30])
# Compute total iterations for progress
urls_count: int = len(urls)
if urls_count > 1:
progress_total: int = urls_count
block_size: int | None = None
elif urls_count == 1:
# Get file size and compute progress steps
r = requests.head(urls[0], timeout=REQUESTS_TIMEOUT_SEC)
total_size_in_bytes: int = int(r.headers.get("content-length", 0))
block_size: int | None = 1048576
progress_total: float = total_size_in_bytes / block_size
else:
raise ValueError
# Create progress Task
p_task: TaskID = self.progress.add_task(
f"[blue]Item '{media_name[:30]}'",
total=progress_total,
visible=progress_to_stdout,
)
# Download segments until progress is finished.
# TODO: Compute download speed (https://github.com/Textualize/rich/blob/master/examples/downloader.py)
while not self.progress.tasks[p_task].finished:
with futures.ThreadPoolExecutor(
max_workers=self.settings.data.downloads_simultaneous_per_track_max
) as executor:
# Dispatch all download tasks to worker threads
l_futures: [any] = [
executor.submit(self._download_segment, url, path_base, block_size, p_task, progress_to_stdout)
for url in urls
]
# Report results as they become available
for future in futures.as_completed(l_futures):
# Retrieve result
result_dl_segment: DownloadSegmentResult = future.result()
dl_segment_results.append(result_dl_segment)
# check for a link that was skipped
if not result_dl_segment.result and (result_dl_segment.url is not urls[-1]):
# Sometimes it happens, if a track is very short (< 8 seconds or so), that the last URL in `urls` is
# invalid (HTTP Error 500) and not necessary. File won't be corrupt.
# If this is NOT the case, but any other URL has resulted in an error,
# mark the whole thing as corrupt.
result_segments = False
self.fn_logger.error(f"Something went wrong while downloading {media_name}. File is corrupt!")
tmp_path_file_decrypted: pathlib.Path = path_file
# Only if no error happened while downloading.
if result_segments:
# Bring list into right order, so segments can be easily merged.
dl_segment_results.sort(key=lambda x: x.id_segment)
result_merge: bool = self._segments_merge(path_file, dl_segment_results)
if not result_merge:
self.fn_logger.error(f"Something went wrong while writing to {media_name}. File is corrupt!")
elif result_merge and isinstance(media, Track) and stream_manifest.is_encrypted:
key, nonce = decrypt_security_token(stream_manifest.encryption_key)
tmp_path_file_decrypted = path_file.with_suffix(".decrypted")
decrypt_file(path_file, tmp_path_file_decrypted, key, nonce)
return result_merge, tmp_path_file_decrypted
def _segments_merge(self, path_file, dl_segment_results) -> bool:
result: bool
# Copy the content of all segments into one file.
try:
with path_file.open("wb") as f_target:
for dl_segment_result in dl_segment_results:
with dl_segment_result.path_segment.open("rb") as f_segment:
# Read and write junks, which gives better HDD write performance
while segment := f_segment.read(CHUNK_SIZE):
f_target.write(segment)
# Delete segment from HDD
dl_segment_result.path_segment.unlink()
result = True
except Exception:
result = False
return result
def _download_segment(
self, url: str, path_base: pathlib.Path, block_size: int | None, p_task: TaskID, progress_to_stdout: bool
) -> DownloadSegmentResult:
result: bool = False
path_segment: pathlib.Path = path_base / url_to_filename(url)
# Calculate the segment ID based on the file name within the URL.
filename_stem: str = str(path_segment.stem).split("_")[-1]
# CAUTION: This is a workaround, so BTS (LOW quality) track will work. They usually have only ONE link.
id_segment: int = int(filename_stem) if filename_stem.isdecimal() else 0
error: HTTPError | None = None
# Retry download on failed segments, with an exponential delay between retries
s = requests.Session()
retries = Retry(total=5, backoff_factor=1) # , status_forcelist=[ 502, 503, 504 ])
s.mount("https://", HTTPAdapter(max_retries=retries))
try:
# Create the request object with stream=True, so the content won't be loaded into memory at once.
r = s.get(url, stream=True, timeout=REQUESTS_TIMEOUT_SEC)
r.raise_for_status()
# Write the content to disk. If `chunk_size` is set to `None` the whole file will be written at once.
with path_segment.open("wb") as f:
for data in r.iter_content(chunk_size=block_size):
f.write(data)
# Advance progress bar.
self.progress.advance(p_task)
result = True
except Exception:
self.progress.advance(p_task)
# To send the progress to the GUI, we need to emit the percentage.
if not progress_to_stdout:
self.progress_gui.item.emit(self.progress.tasks[p_task].percentage)
return DownloadSegmentResult(
result=result, url=url, path_segment=path_segment, id_segment=id_segment, error=error
)
def extension_guess(self, quality_audio: Quality, is_video: bool) -> AudioExtensions | VideoExtensions:
result: AudioExtensions | VideoExtensions
if is_video:
result = AudioExtensions.MP4 if self.settings.data.video_convert_mp4 else VideoExtensions.TS
else:
result = (
AudioExtensions.FLAC
if self.settings.data.extract_flac and quality_audio in (Quality.hi_res_lossless, Quality.high_lossless)
else AudioExtensions.M4A
)
return result
def item(
self,
file_template: str,
media: Track | Video = None,
media_id: str = None,
media_type: MediaType = None,
video_download: bool = True,
download_delay: bool = False,
quality_audio: Quality | None = None,
quality_video: QualityVideo | None = None,
is_parent_album: bool = False,
) -> (bool, pathlib.Path):
try:
if media_id and media_type:
# If no media instance is provided, we need to create the media instance.
media = instantiate_media(self.session, media_type, media_id)
elif isinstance(media, Track): # Check if media is available not deactivated / removed from TIDAL.
if not media.available:
self.fn_logger.info(
f"This track is not available for listening anymore on TIDAL. Skipping: {name_builder_item(media)}"
)
return False, ""
else:
# Re-create media instance with full album information
media = self.session.track(media.id, with_album=True)
elif not media:
raise MediaMissing
except:
return False, ""
# If video download is not allowed end here
if not video_download and isinstance(media, Video):
self.fn_logger.info(
f"Video downloads are deactivated (see settings). Skipping video: {name_builder_item(media)}"
)
return False, ""
# Create file name and path
file_extension_dummy: str = self.extension_guess(quality_audio, isinstance(media, Video))
file_name_relative: str = format_path_media(file_template, media, self.settings.data.album_track_num_pad_min)
path_media_dst: pathlib.Path = (
pathlib.Path(self.path_base).expanduser() / (file_name_relative + file_extension_dummy)
).absolute()
# Sanitize final path_file to fit into OS boundaries.
path_media_dst = pathlib.Path(path_file_sanitize(str(path_media_dst), adapt=True))
# Compute if and how downloads need to be skipped.
skip_download: bool = False
if self.skip_existing:
skip_file: bool = check_file_exists(path_media_dst, extension_ignore=False)
if self.settings.data.symlink_to_track and not isinstance(media, Video):
# Compute symlink tracks path, sanitize and check if file exists
file_name_track_dir_relative: str = format_path_media(self.settings.data.format_track, media)
path_media_track_dir: pathlib.Path = (
pathlib.Path(self.path_base).expanduser() / (file_name_track_dir_relative + file_extension_dummy)
).absolute()
path_media_track_dir = pathlib.Path(path_file_sanitize(str(path_media_track_dir), adapt=True))
file_exists_track_dir: bool = check_file_exists(path_media_track_dir, extension_ignore=False)
file_exists_playlist_dir: bool = (
not file_exists_track_dir and skip_file and not path_media_dst.is_symlink()
)
skip_download = file_exists_playlist_dir or file_exists_track_dir
# If
if skip_file and file_exists_playlist_dir:
skip_file = False
else:
skip_file: bool = False
if not skip_file:
# If a quality is explicitly set, change it and remember the previously set quality.
quality_audio_old: Quality = self.adjust_quality_audio(quality_audio) if quality_audio else quality_audio
quality_video_old: QualityVideo = (
self.adjust_quality_video(quality_video) if quality_video else quality_video
)
do_flac_extract = False
# Get extension.
file_extension: str
stream_manifest: StreamManifest | None = None
if isinstance(media, Track):
try:
media_stream: Stream = media.get_stream()
stream_manifest = media_stream.get_stream_manifest()
except TooManyRequests:
self.fn_logger.exception(
f"Too many requests against TIDAL backend. Skipping '{name_builder_item(media)}'. "
f"Consider to activate delay between downloads."
)
return False, ""
except Exception:
self.fn_logger.exception(f"Something went wrong. Skipping '{name_builder_item(media)}'.")
return False, ""
file_extension = stream_manifest.file_extension
if self.settings.data.extract_flac and (
stream_manifest.codecs.upper() == Codec.FLAC and file_extension != AudioExtensions.FLAC
):
file_extension = AudioExtensions.FLAC
do_flac_extract = True
elif isinstance(media, Video):
file_extension = AudioExtensions.MP4 if self.settings.data.video_convert_mp4 else VideoExtensions.TS
# Compute file name, sanitize once again and create destination directory
path_media_dst = path_media_dst.with_suffix(file_extension)
path_media_dst = pathlib.Path(path_file_sanitize(str(path_media_dst), adapt=True))
os.makedirs(path_media_dst.parent, exist_ok=True)
if not skip_download:
# Create a temp directory and file.
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmp_path_dir:
tmp_path_file: pathlib.Path = pathlib.Path(tmp_path_dir) / str(uuid4())
# Create empty file
tmp_path_file.touch()
# Download media.
result_download, tmp_path_file = self._download(
media=media, stream_manifest=stream_manifest, path_file=tmp_path_file
)
if result_download:
# Convert video from TS to MP4
if isinstance(media, Video) and self.settings.data.video_convert_mp4:
# Convert `*.ts` file to `*.mp4` using ffmpeg
tmp_path_file = self._video_convert(tmp_path_file)
# Extract FLAC from MP4 container using ffmpeg
if isinstance(media, Track) and self.settings.data.extract_flac and do_flac_extract:
tmp_path_file = self._extract_flac(tmp_path_file)
tmp_path_lyrics: pathlib.Path | None = None
tmp_path_cover: pathlib.Path | None = None
# Write metadata to file.
if not isinstance(media, Video):
result_metadata, tmp_path_lyrics, tmp_path_cover = self.metadata_write(
media, tmp_path_file, is_parent_album, media_stream
)
# Move lyrics file
if self.settings.data.lyrics_file and not isinstance(media, Video) and tmp_path_lyrics:
self._move_lyrics(tmp_path_lyrics, path_media_dst)
# Move cover file
# TODO: Cover is downloaded with every track of the album. Needs refactoring, so cover is only
# downloaded for an album once.
if self.settings.data.cover_album_file and tmp_path_cover:
self._move_cover(tmp_path_cover, path_media_dst)
self.fn_logger.info(f"Downloaded item '{name_builder_item(media)}'.")
# Move final file to the configured destination directory.
shutil.move(tmp_path_file, path_media_dst)
# If files needs to be symlinked, do postprocessing here.
if self.settings.data.symlink_to_track and not isinstance(media, Video):
path_media_track_dir: pathlib.Path = self.media_move_and_symlink(media, path_media_dst, file_extension)
if quality_audio:
# Set quality back to the global user value
self.adjust_quality_audio(quality_audio_old)
if quality_video:
# Set quality back to the global user value
self.adjust_quality_video(quality_video_old)
else:
self.fn_logger.debug(f"Download skipped, since file exists: '{path_media_dst}'")
status_download: bool = not skip_file
# Whether a file was downloaded or skipped and the download delay is enabled, wait until the next download.
# Only use this, if you have a list of several Track items.
if download_delay and not skip_file:
time_sleep: float = round(
random.SystemRandom().uniform(
self.settings.data.download_delay_sec_min, self.settings.data.download_delay_sec_max
),
1,
)
self.fn_logger.debug(f"Next download will start in {time_sleep} seconds.")
time.sleep(time_sleep)
return status_download, path_media_dst
def media_move_and_symlink(
self, media: Track | Video, path_media_src: pathlib.Path, file_extension: str
) -> pathlib.Path:
# Compute tracks path, sanitize and ensure path exists
file_name_relative: str = format_path_media(self.settings.data.format_track, media)
path_media_dst: pathlib.Path = (
pathlib.Path(self.path_base).expanduser() / (file_name_relative + file_extension)
).absolute()
path_media_dst = pathlib.Path(path_file_sanitize(str(path_media_dst), adapt=True))
os.makedirs(path_media_dst.parent, exist_ok=True)
# Move item and symlink it
if path_media_dst != path_media_src:
if self.skip_existing:
skip_file: bool = check_file_exists(path_media_dst, extension_ignore=False)
skip_symlink: bool = path_media_src.is_symlink()
else:
skip_file: bool = False
skip_symlink: bool = False
if not skip_file:
self.fn_logger.debug(f"Move: {path_media_src} -> {path_media_dst}")
shutil.move(path_media_src, path_media_dst)
if not skip_symlink:
self.fn_logger.debug(f"Symlink: {path_media_src} -> {path_media_dst}")
path_media_src.unlink(missing_ok=True)
path_media_src.symlink_to(path_media_dst)
return path_media_dst
def adjust_quality_audio(self, quality) -> Quality:
# Save original quality settings
quality_old: Quality = self.session.audio_quality
self.session.audio_quality = quality
return quality_old
def adjust_quality_video(self, quality) -> QualityVideo:
quality_old: QualityVideo = self.settings.data.quality_video
self.settings.data.quality_video = quality
return quality_old
def _move_file(self, path_file_source: pathlib.Path, path_file_destination: str | pathlib.Path) -> bool:
result: bool
# Check if the file was downloaded
if path_file_source and path_file_source.is_file():
# Move it.
shutil.move(path_file_source, path_file_destination)
result = True
else:
result = False
return result
def _move_lyrics(self, path_lyrics: pathlib.Path, file_media_dst: pathlib.Path) -> bool:
# Build tmp lyrics filename
path_file_lyrics: pathlib.Path = file_media_dst.with_suffix(EXTENSION_LYRICS)
result: bool = self._move_file(path_lyrics, path_file_lyrics)
return result
def _move_cover(self, path_cover: pathlib.Path, file_media_dst: pathlib.Path) -> bool:
# Build tmp lyrics filename
path_file_cover: pathlib.Path = file_media_dst.parent / COVER_NAME
result: bool = self._move_file(path_cover, path_file_cover)
return result
def lyrics_to_file(self, dir_destination: pathlib.Path, lyrics: str) -> str:
return self.write_to_tmp_file(dir_destination, mode="x", content=lyrics)
def cover_to_file(self, dir_destination: pathlib.Path, image: bytes) -> str:
return self.write_to_tmp_file(dir_destination, mode="xb", content=image)
def write_to_tmp_file(self, dir_destination: pathlib.Path, mode: str, content: str | bytes) -> str:
result: str = dir_destination / str(uuid4())
encoding: str | None = "utf-8" if isinstance(content, str) else None
try:
with open(result, mode=mode, encoding=encoding) as f:
f.write(content)
except:
result = ""
return result
@staticmethod
def cover_data(url: str = None, path_file: str = None) -> str | bytes:
result: str | bytes = ""
if url:
try:
result = requests.get(url, timeout=REQUESTS_TIMEOUT_SEC).content
except Exception as e:
# TODO: Implement propper logging.
print(e)
elif path_file:
try:
with open(path_file, "rb") as f:
result = f.read()
except OSError as e:
# TODO: Implement propper logging.
print(e)
return result
def metadata_write(
self, track: Track, path_media: pathlib.Path, is_parent_album: bool, media_stream: Stream
) -> (bool, pathlib.Path | None, pathlib.Path | None):
result: bool = False
path_lyrics: pathlib.Path | None = None
path_cover: pathlib.Path | None = None
release_date: str = (
track.album.available_release_date.strftime("%Y-%m-%d")
if track.album.available_release_date
else track.album.release_date.strftime("%Y-%m-%d") if track.album.release_date else ""
)
copy_right: str = track.copyright if hasattr(track, "copyright") and track.copyright else ""
isrc: str = track.isrc if hasattr(track, "isrc") and track.isrc else ""
lyrics: str = ""
cover_data: bytes = None
if self.settings.data.lyrics_embed or self.settings.data.lyrics_file:
# Try to retrieve lyrics.
try:
lyrics_obj = track.lyrics()
if lyrics_obj.subtitles:
lyrics = lyrics_obj.subtitles
elif lyrics_obj.text:
lyrics = lyrics_obj.text
except:
lyrics = ""
# TODO: Implement proper logging.
print(f"Could not retrieve lyrics for `{name_builder_item(track)}`.")
if lyrics and self.settings.data.lyrics_file:
path_lyrics = self.lyrics_to_file(path_media.parent, lyrics)
if self.settings.data.metadata_cover_embed or (self.settings.data.cover_album_file and is_parent_album):
url_cover = track.album.image(int(self.settings.data.metadata_cover_dimension))
cover_data = self.cover_data(url=url_cover)
if cover_data and self.settings.data.cover_album_file and is_parent_album:
path_cover = self.cover_to_file(path_media.parent, cover_data)
# `None` values are not allowed.
m: Metadata = Metadata(
path_file=path_media,
lyrics=lyrics,
copy_right=copy_right,
title=name_builder_title(track),
artists=name_builder_artist(track),
album=track.album.name if track.album else "",
tracknumber=track.track_num,
date=release_date,
isrc=isrc,
albumartist=name_builder_album_artist(track),
totaltrack=track.album.num_tracks if track.album and track.album.num_tracks else 1,
totaldisc=track.album.num_volumes if track.album and track.album.num_volumes else 1,
discnumber=track.volume_num if track.volume_num else 1,
cover_data=cover_data if self.settings.data.metadata_cover_embed else None,
album_replay_gain=media_stream.album_replay_gain,
album_peak_amplitude=media_stream.album_peak_amplitude,
track_replay_gain=media_stream.track_replay_gain,
track_peak_amplitude=media_stream.track_peak_amplitude,
)
m.save()
result = True
return result, path_lyrics, path_cover
def items(
self,
file_template: str,
media: Album | Playlist | UserPlaylist | Mix = None,
media_id: str = None,
media_type: MediaType = None,
video_download: bool = False,
download_delay: bool = True,
quality_audio: Quality | None = None,
quality_video: QualityVideo | None = None,
):
# If no media instance is provided, we need to create the media instance.
if media_id and media_type:
media = instantiate_media(self.session, media_type, media_id)
elif not media:
raise MediaMissing
# Create file name and path
file_name_relative: str = format_path_media(file_template, media, self.settings.data.album_track_num_pad_min)
# Get the name of the list and check, if videos should be included.
list_media_name: str = name_builder_title(media)
list_media_name_short: str = list_media_name[:30]
# Get all items of the list.
items = items_results_all(media, videos_include=video_download)
# Determine where to redirect the progress information.
if self.progress_gui is None:
progress_stdout: bool = True
else:
progress_stdout: bool = False
self.progress_gui.list_name.emit(list_media_name_short[:30])
# Create the list progress task.
p_task1: TaskID = self.progress.add_task(
f"[green]List '{list_media_name_short}'", total=len(items), visible=progress_stdout
)
is_album: bool = isinstance(media, Album)
result_dirs: [pathlib.Path] = []
# Iterate through list items
while not self.progress.finished:
with futures.ThreadPoolExecutor(max_workers=self.settings.data.downloads_concurrent_max) as executor:
# Dispatch all download tasks to worker threads
l_futures: [any] = [
executor.submit(
self.item,
media=item_media,
file_template=file_name_relative,
quality_audio=quality_audio,
quality_video=quality_video,
download_delay=download_delay,
is_parent_album=is_album,
)
for item_media in items
]
# Report results as they become available
for future in futures.as_completed(l_futures):
# Retrieve result
status, result_path_file = future.result()
if status:
result_dirs.append(result_path_file.parent)
# Advance progress bar.
self.progress.advance(p_task1)
if not progress_stdout:
self.progress_gui.list_item.emit(self.progress.tasks[p_task1].percentage)
# Create playlist file
if self.settings.data.playlist_create:
self.playlist_populate(set(result_dirs), list_media_name, is_album)
self.fn_logger.info(f"Finished list '{list_media_name}'.")
def playlist_populate(self, dirs_scoped: [pathlib.Path], name_list: str, is_album: bool) -> [pathlib.Path]:
result: [pathlib.Path] = []
# For each dir, which contains tracks
for dir_scoped in dirs_scoped:
# Sanitize final playlist name to fit into OS boundaries.
path_playlist = dir_scoped / (PLAYLIST_PREFIX + name_list + PLAYLIST_EXTENSION)
path_playlist = pathlib.Path(path_file_sanitize(path_playlist, adapt=True))
self.fn_logger.debug(f"Playlist: Creating {path_playlist}")
# Get all tracks in the directory
path_tracks: [pathlib.Path] = []
for extension_audio in AudioExtensions:
path_tracks = path_tracks + list(dir_scoped.glob(f"*{extension_audio!s}"))
# If it is not an album sort by modification time
if not is_album:
path_tracks.sort(key=lambda x: os.path.getmtime(x))
# Write data to m3u file
with path_playlist.open(mode="w", encoding="utf-8") as f:
for path_track in path_tracks:
# If it's a symlink write the relative file path to the actual track into the playlist file
if path_track.is_symlink():
media_file_target = path_track.resolve().relative_to(path_track.parent, walk_up=True)
f.write(str(media_file_target) + os.linesep)
result.append(path_playlist)
return result
def _video_convert(self, path_file: pathlib.Path) -> pathlib.Path:
path_file_out: pathlib.Path = path_file.with_suffix(AudioExtensions.MP4)
result, _ = (
ffmpeg.input(path_file)
.output(str(path_file_out), map=0, c="copy", loglevel="quiet")
.run(cmd=self.settings.data.path_binary_ffmpeg)
)
return path_file_out
def _extract_flac(self, path_media_src: pathlib.Path) -> pathlib.Path:
path_media_out = path_media_src.with_suffix(AudioExtensions.FLAC)
result, _ = (
ffmpeg.input(path_media_src)
.output(
str(path_media_out),
map=0,
movflags="use_metadata_tags",
acodec="copy",
map_metadata="0:g",
loglevel="quiet",
)
.run(cmd=self.settings.data.path_binary_ffmpeg)
)
return path_media_out
def _extract_video_stream(self, m3u8_variant: m3u8.M3U8, quality: int) -> (m3u8.M3U8 | bool, str):
m3u8_playlist: m3u8.M3U8 | bool = False
resolution_best: int = 0
mime_type: str = ""
if m3u8_variant.is_variant:
for playlist in m3u8_variant.playlists:
if resolution_best < playlist.stream_info.resolution[1]:
resolution_best = playlist.stream_info.resolution[1]
m3u8_playlist = m3u8.load(playlist.uri)
mime_type = playlist.stream_info.codecs
if quality == playlist.stream_info.resolution[1]:
break
return m3u8_playlist, mime_type