KaraokeVideoDownloader/karaoke_downloader/download_pipeline.py

315 lines
11 KiB
Python

"""
Download pipeline that abstracts the complete download → verify → tag → track process.
"""
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from karaoke_downloader.config_manager import AppConfig
from karaoke_downloader.error_utils import (
handle_file_validation_error,
handle_yt_dlp_error,
log_error,
)
from karaoke_downloader.id3_utils import add_id3_tags
from karaoke_downloader.songlist_manager import mark_songlist_song_downloaded
from karaoke_downloader.video_downloader import is_valid_mp4, sanitize_filename
from karaoke_downloader.youtube_utils import (
build_yt_dlp_command,
execute_yt_dlp_command,
show_available_formats,
)
from karaoke_downloader.file_utils import (
cleanup_temp_files,
get_unique_filename,
is_valid_mp4_file,
sanitize_filename,
)
class DownloadPipeline:
    """
    Handles the complete download pipeline: download → verify → tag → track

    Each stage is a private method returning ``True`` on success. Failures
    are reported on stdout and, when a tracker was supplied, recorded through
    ``tracker.mark_song_failed``.
    """

    def __init__(
        self,
        yt_dlp_path: str,
        config: "Union[AppConfig, Dict[str, Any]]",
        downloads_dir: Path,
        songlist_tracking: Optional[Dict] = None,
        tracker=None,
    ):
        """
        Store the collaborators used by the pipeline stages.

        Args:
            yt_dlp_path: Path to the yt-dlp executable
            config: Application configuration (object or plain dict)
            downloads_dir: Base directory for downloaded files
            songlist_tracking: Songlist tracking dict; a new empty dict is
                used when omitted
            tracker: Optional tracking object exposing ``mark_song_failed``
                and ``mark_song_downloaded``
        """
        self.yt_dlp_path = yt_dlp_path
        self.config = config
        self.downloads_dir = downloads_dir
        # Only replace a missing dict. `songlist_tracking or {}` would also
        # replace a caller-owned *empty* dict, so updates made while tracking
        # would never be visible to the caller.
        self.songlist_tracking = {} if songlist_tracking is None else songlist_tracking
        self.tracker = tracker

    def _get_config_value(self, *path: str, default: Any = None) -> Any:
        """
        Read a (possibly nested) setting from ``self.config``.

        Works for both supported config shapes: dict levels are read by key,
        everything else by attribute. Returns ``default`` as soon as a level
        is missing.
        """
        node: Any = self.config
        for key in path:
            if isinstance(node, dict):
                if key not in node:
                    return default
                node = node[key]
            elif hasattr(node, key):
                node = getattr(node, key)
            else:
                return default
        return node

    def _mark_failed(
        self, artist: str, title: str, video_id: str, channel_name: str, message: str
    ) -> None:
        """Record a failed song in the tracker, if one was supplied."""
        if self.tracker is not None:
            self.tracker.mark_song_failed(
                artist, title, video_id, channel_name, message
            )

    def execute_pipeline(
        self,
        video_id: str,
        artist: str,
        title: str,
        channel_name: str,
        video_title: Optional[str] = None,
    ) -> bool:
        """
        Execute the complete download pipeline for a single video.

        Args:
            video_id: YouTube video ID
            artist: Artist name
            title: Song title
            channel_name: Channel name
            video_title: Original video title (optional, currently unused)

        Returns:
            True if successful, False otherwise
        """
        try:
            # Step 1: Prepare file path and check for existing files
            output_path, file_exists = get_unique_filename(
                self.downloads_dir, channel_name, artist, title
            )
            if file_exists:
                print(f"⏭️ Skipping download - file already exists: {output_path.name}")
                # Still add tags and track the existing file
                if self._add_tags(output_path, artist, title, channel_name):
                    self._track_download(
                        output_path, artist, title, video_id, channel_name
                    )
                return True

            # Step 2: Download video
            if not self._download_video(
                video_id, output_path, artist, title, channel_name
            ):
                return False

            # Step 3: Verify download. The verified file may live under a
            # different name than requested, so continue with the path that
            # was actually validated.
            verified_path = self._resolve_verified_path(
                output_path, artist, title, video_id, channel_name
            )
            if verified_path is None:
                return False

            # Step 4: Add ID3 tags
            if not self._add_tags(verified_path, artist, title, channel_name):
                return False

            # Step 5: Track download
            if not self._track_download(
                verified_path, artist, title, video_id, channel_name
            ):
                return False

            print(f"✅ Pipeline completed successfully: {artist} - {title}")
            return True
        except Exception as e:
            print(f"❌ Pipeline failed for {artist} - {title}: {e}")
            # Mark song as failed in tracking
            self._mark_failed(
                artist, title, video_id, channel_name, f"Pipeline failed: {e}"
            )
            return False

    def _download_video(
        self, video_id: str, output_path: Path, artist: str, title: str, channel_name: str
    ) -> bool:
        """
        Step 2: Download the video using yt-dlp.

        Returns:
            True when yt-dlp finished without ``CalledProcessError``; False
            after logging the error and marking the song as failed.
        """
        output_path.parent.mkdir(parents=True, exist_ok=True)
        print(f"⬇️ Downloading: {artist} - {title} -> {output_path}")
        video_url = f"https://www.youtube.com/watch?v={video_id}"

        # Build command using centralized utility
        cmd = build_yt_dlp_command(
            self.yt_dlp_path, video_url, output_path, self.config
        )
        # str() each part so a non-string argument cannot break the debug print.
        print(f"🔧 Running command: {' '.join(str(part) for part in cmd)}")
        # Read settings through the helper: the config may be a plain dict,
        # where attribute access would raise and fail the whole pipeline.
        resolution = self._get_config_value("download_settings", "preferred_resolution")
        format_string = self._get_config_value("download_settings", "format")
        print(f"📺 Resolution settings: {resolution}")
        print(f"🎬 Format string: {format_string}")

        # Debug: Show available formats (optional)
        if self._get_config_value("debug_show_formats", default=False):
            show_available_formats(video_url, self.yt_dlp_path)

        try:
            result = execute_yt_dlp_command(cmd)
        except subprocess.CalledProcessError as e:
            error = handle_yt_dlp_error(e, artist, title, video_id)
            log_error(error)
            # Mark song as failed in tracking
            self._mark_failed(artist, title, video_id, channel_name, error.message)
            return False

        print("✅ yt-dlp completed successfully")
        print(f"📄 yt-dlp stdout: {result.stdout}")
        return True

    @staticmethod
    def _find_matching_file(
        candidates: List[Path], artist: str, title: str
    ) -> Optional[Path]:
        """
        Return the first candidate whose stem matches the song, else None.

        A candidate matches when its lower-cased stem contains the lower-cased
        artist and at least one whitespace-separated word of the title.
        """
        artist_part = artist.lower()
        title_words = title.lower().split()
        for candidate in candidates:
            stem = candidate.stem.lower()
            if artist_part in stem and any(word in stem for word in title_words):
                return candidate
        return None

    def _resolve_verified_path(
        self,
        output_path: Path,
        artist: str,
        title: str,
        video_id: str,
        channel_name: str,
    ) -> Optional[Path]:
        """
        Locate and validate the downloaded file.

        If ``output_path`` does not exist, the ``*.mp4`` files in its
        directory are searched for one matching the artist and title.

        Returns:
            The path of the validated file (which may differ from
            ``output_path``), or None after reporting the failure.
        """
        actual_path = output_path
        if not output_path.exists():
            print(f"❌ Download failed: file does not exist: {output_path}")
            # Check if yt-dlp saved it somewhere else
            possible_files = list(output_path.parent.glob("*.mp4"))
            if not possible_files:
                self._mark_failed(
                    artist,
                    title,
                    video_id,
                    channel_name,
                    "Download failed: file does not exist",
                )
                return None

            print(
                f"🔍 Found these files in the directory: {[f.name for f in possible_files]}"
            )
            # Look for a file that matches our pattern (artist - title)
            match = self._find_matching_file(possible_files, artist, title)
            if match is None:
                print(f"❌ No matching file found for: {artist} - {title}")
                self._mark_failed(
                    artist,
                    title,
                    video_id,
                    channel_name,
                    "Download failed: file does not exist and no matching file found",
                )
                return None
            print(f"🎯 Found matching file: {match.name}")
            actual_path = match

        # Validate file
        if not is_valid_mp4(actual_path):
            error = handle_file_validation_error(
                "File is not a valid MP4",
                actual_path,
                artist,
                title,
                video_id,
                channel_name,
            )
            log_error(error)
            # Mark song as failed in tracking
            self._mark_failed(artist, title, video_id, channel_name, error.message)
            return None

        print(f"✅ Download verified: {actual_path}")
        return actual_path

    def _verify_download(
        self,
        output_path: Path,
        artist: str,
        title: str,
        video_id: str,
        channel_name: str,
    ) -> bool:
        """
        Step 3: Verify that the download was successful.

        Returns:
            True when a valid MP4 was found for the song, False otherwise.
        """
        verified_path = self._resolve_verified_path(
            output_path, artist, title, video_id, channel_name
        )
        return verified_path is not None

    def _add_tags(
        self, output_path: Path, artist: str, title: str, channel_name: str
    ) -> bool:
        """
        Step 4: Add ID3 tags to the downloaded file.

        Returns:
            Always True: tagging problems are reported but never fail the
            pipeline.
        """
        try:
            # Use the same artist/title as the filename for consistency
            # Don't add "(Karaoke Version)" to the ID3 tag title
            add_id3_tags(output_path, f"{artist} - {title}", channel_name)
            print(f"🏷️ Added ID3 tags: {artist} - {title}")
        except Exception as e:
            # Don't fail the pipeline for tag issues
            print(f"⚠️ Failed to add ID3 tags: {e}")
        return True

    def _track_download(
        self,
        output_path: Path,
        artist: str,
        title: str,
        video_id: str,
        channel_name: str,
    ) -> bool:
        """
        Step 5: Track the download in the tracking system.

        Returns:
            Always True: tracking problems are reported but never fail the
            pipeline.
        """
        try:
            # Track in songlist if available
            if self.songlist_tracking is not None:
                mark_songlist_song_downloaded(
                    self.songlist_tracking, artist, title, channel_name, output_path
                )
            # Track in main tracking system if available
            if self.tracker is not None:
                file_size = output_path.stat().st_size if output_path.exists() else None
                self.tracker.mark_song_downloaded(
                    artist, title, video_id, channel_name, output_path, file_size
                )
            print(f"📊 Tracked download: {artist} - {title}")
        except Exception as e:
            # Don't fail the pipeline for tracking issues
            print(f"⚠️ Failed to track download: {e}")
        return True

    def batch_execute(
        self,
        videos: List[Dict[str, Any]],
        channel_name: str,
        limit: Optional[int] = None,
    ) -> Tuple[int, int]:
        """
        Execute the pipeline for multiple videos.

        Args:
            videos: List of video dictionaries with 'id', 'title', etc.
            channel_name: Channel name
            limit: Optional limit on number of videos to process; ``None``
                or ``0`` means no limit

        Returns:
            Tuple of (successful_downloads, total_attempted)
        """
        if limit:
            videos = videos[:limit]
        total = len(videos)
        if total == 0:
            return 0, 0

        # Imported at call time, as in the original code, and created once
        # for the whole batch instead of once per video.
        from karaoke_downloader.channel_parser import ChannelParser

        channel_parser = ChannelParser()
        successful = 0
        for i, video in enumerate(videos, 1):
            video_title = video.get("title", "")
            video_id = video.get("id")
            if not video_id:
                # A malformed entry counts as a failed attempt rather than
                # aborting the remaining videos with a KeyError.
                print(f" ❌ Failed to process: entry without video id ({video_title!r})")
                continue

            # Extract artist and title from video title
            artist, title = channel_parser.extract_artist_title(video_title, channel_name)
            print(f" ({i}/{total}) Processing: {artist} - {title}")
            if self.execute_pipeline(
                video_id, artist, title, channel_name, video_title
            ):
                successful += 1
            else:
                print(f" ❌ Failed to process: {artist} - {title}")
        return successful, total