KaraokeVideoDownloader/karaoke_downloader/video_downloader.py

312 lines
13 KiB
Python

"""
Core video download logic and file validation.
Handles the actual downloading and post-processing of videos.
"""
import subprocess
from pathlib import Path
from karaoke_downloader.id3_utils import add_id3_tags
from karaoke_downloader.songlist_manager import mark_songlist_song_downloaded
from karaoke_downloader.download_planner import save_plan_cache
from karaoke_downloader.youtube_utils import build_yt_dlp_command, execute_yt_dlp_command, show_available_formats
from karaoke_downloader.error_utils import handle_yt_dlp_error, handle_file_validation_error, log_error
# Constants
# Maximum length of a generated "<artist> - <title>.mp4" filename before the
# per-field limits below are applied.
DEFAULT_FILENAME_LENGTH_LIMIT = 100
# Per-field character caps used only when the full filename is too long.
DEFAULT_ARTIST_LENGTH_LIMIT = 30
DEFAULT_TITLE_LENGTH_LIMIT = 60
# Not referenced in this part of the file; presumably seconds - TODO confirm.
DEFAULT_FORMAT_CHECK_TIMEOUT = 30


def sanitize_filename(artist, title):
    """
    Build a filesystem-safe "<artist> - <title>.mp4" filename.

    The title loses "(From " prefixes, closing parentheses, " - " separators
    (collapsed to a single space), quotes, Windows-invalid characters and
    every dot. The artist loses quotes and Windows-invalid characters only.
    Both parts are stripped of surrounding whitespace.

    Args:
        artist: Artist name as a string.
        title: Song title as a string.

    Returns:
        str: The filename. If it would exceed DEFAULT_FILENAME_LENGTH_LIMIT
        characters, the artist and title are cut to
        DEFAULT_ARTIST_LENGTH_LIMIT and DEFAULT_TITLE_LENGTH_LIMIT.
    """
    # Characters that are simply deleted. Deleting single characters is
    # order-independent, so one translate() pass covers all of them.
    title_deletions = str.maketrans("", "", ":'\"?*<>|/\\.")
    artist_deletions = str.maketrans("", "", "'\"?:*<>|/\\")

    # These replacements are order-sensitive and must run before the
    # single-character deletions above.
    cleaned_title = title
    for old, new in (("(From ", ""), (")", ""), (" - ", " ")):
        cleaned_title = cleaned_title.replace(old, new)
    cleaned_title = cleaned_title.translate(title_deletions).strip()

    cleaned_artist = artist.translate(artist_deletions).strip()

    candidate = f"{cleaned_artist} - {cleaned_title}.mp4"
    if len(candidate) <= DEFAULT_FILENAME_LENGTH_LIMIT:
        return candidate

    # Too long for comfortable Windows paths: cap each part separately.
    short_artist = cleaned_artist[:DEFAULT_ARTIST_LENGTH_LIMIT]
    short_title = cleaned_title[:DEFAULT_TITLE_LENGTH_LIMIT]
    return f"{short_artist} - {short_title}.mp4"
def is_valid_mp4(file_path):
    """
    Check whether a file looks like a valid MP4.

    Runs ffprobe on the file when it is installed. If ffprobe cannot be
    found, falls back to checking only the ".mp4" extension.

    Args:
        file_path: pathlib.Path of the file to check.

    Returns:
        bool: False if the file is missing, empty, or rejected by ffprobe.
        True if ffprobe accepts it, or if ffprobe is not installed and the
        file has a ".mp4" extension.
    """
    if not file_path.exists():
        return False
    # An empty file can never be a playable video.
    if file_path.stat().st_size == 0:
        return False

    try:
        subprocess.run(
            ['ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', str(file_path)],
            capture_output=True,
            text=True,
            check=True
        )
    except FileNotFoundError:
        # ffprobe is not installed: the extension is the best signal left
        # (the non-empty size was already checked above).
        return file_path.suffix.lower() == '.mp4'
    except subprocess.CalledProcessError:
        # ffprobe ran and rejected the file, so it is not valid media.
        # Falling back to the extension here would accept corrupt files.
        return False
    return True
def download_video_and_track(yt_dlp_path, config, downloads_dir, songlist_tracking,
                             channel_name, channel_url, video_id, video_title,
                             artist, title, filename):
    """
    Download one video into its channel folder and track the result.

    Resolves the target path as downloads_dir / channel_name / filename and
    delegates to download_single_video. channel_url and video_title are
    accepted but not used here.

    Returns:
        bool: True if the download succeeded, False otherwise.
    """
    target_path = downloads_dir / channel_name / filename
    return download_single_video(
        output_path=target_path,
        video_id=video_id,
        config=config,
        yt_dlp_path=yt_dlp_path,
        artist=artist,
        title=title,
        channel_name=channel_name,
        songlist_tracking=songlist_tracking,
    )
def download_single_video(output_path, video_id, config, yt_dlp_path,
                          artist, title, channel_name, songlist_tracking):
    """
    Download one YouTube video with yt-dlp, then verify, tag and track it.

    Args:
        output_path: pathlib.Path where the video should be written.
        video_id: YouTube video id used to build the watch URL.
        config: Configuration dictionary (reads 'download_settings' and
            'debug_show_formats').
        yt_dlp_path: Path to the yt-dlp executable.
        artist: Artist name used for logging, tagging and tracking.
        title: Song title used for logging, tagging and tracking.
        channel_name: Channel name used for tagging and tracking.
        songlist_tracking: Songlist tracking data passed to
            mark_songlist_song_downloaded.

    Returns:
        bool: True when the download, verification and post-processing all
        completed; False when yt-dlp raised CalledProcessError or
        verify_download rejected the result.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    print(f"⬇️ Downloading: {artist} - {title} -> {output_path}")

    video_url = f"https://www.youtube.com/watch?v={video_id}"
    # The command line is assembled by the shared yt-dlp utility.
    cmd = build_yt_dlp_command(yt_dlp_path, video_url, output_path, config)
    print(f"🔧 Running command: {' '.join(cmd)}")

    settings = config.get('download_settings', {})
    print(f"📺 Resolution settings: {settings.get('preferred_resolution', 'Unknown')}")
    print(f"🎬 Format string: {settings.get('format', 'Unknown')}")

    # Optional debugging aid: list the formats yt-dlp can see.
    if config.get('debug_show_formats', False):
        show_available_formats(video_url, yt_dlp_path)

    try:
        result = execute_yt_dlp_command(cmd)
    except subprocess.CalledProcessError as exc:
        failure = handle_yt_dlp_error(exc, artist, title, video_id, channel_name)
        log_error(failure)
        # Record the failure in tracking.
        _mark_song_failed_standalone(artist, title, video_id, channel_name, failure.message)
        return False
    else:
        print("✅ yt-dlp completed successfully")
        print(f"📄 yt-dlp stdout: {result.stdout}")

    # NOTE(review): verify_download only returns a bool. If it accepts a
    # differently named file, this function still uses output_path below.
    download_ok = verify_download(output_path, artist, title, video_id, channel_name)
    if not download_ok:
        return False

    # Post-processing: tag the file and record it in the songlist tracking.
    tag_title = f"{artist} - {title} (Karaoke Version)"
    add_id3_tags(output_path, tag_title, channel_name)
    mark_songlist_song_downloaded(songlist_tracking, artist, title, channel_name, output_path)
    print(f"✅ Downloaded and tracked: {artist} - {title}")
    print(f"🎉 All post-processing complete for: {output_path}")
    return True
def _mark_song_failed_standalone(artist, title, video_id, channel_name, error_message):
    """
    Record a failed download through a freshly created TrackingManager.

    Args:
        artist: Artist name of the failed song.
        title: Title of the failed song.
        video_id: Id of the video that failed.
        channel_name: Channel the video belongs to.
        error_message: Failure description stored with the entry.
    """
    # Imported at call time rather than at module import time
    # (presumably to avoid a circular import - TODO confirm).
    from karaoke_downloader.tracking_manager import TrackingManager

    TrackingManager().mark_song_failed(artist, title, video_id, channel_name, error_message)
    print(f"🏷️ Marked song as failed: {artist} - {title}")
# Note: show_available_formats is now imported from youtube_utils
def verify_download(output_path, artist, title, video_id=None, channel_name=None):
    """
    Verify that a download produced a non-empty file.

    If output_path does not exist, the sibling ".mp4" files in the same
    directory are searched for one whose stem contains the artist and at
    least one word of the title (case-insensitive). When nothing suitable is
    found and both video_id and channel_name are given, the song is marked
    as failed.

    NOTE(review): a match found this way only rebinds the local variable.
    The caller is told "True" but never learns the substitute path.

    Args:
        output_path: pathlib.Path where the file is expected.
        artist: Artist name used for fuzzy matching and messages.
        title: Song title used for fuzzy matching and messages.
        video_id: Optional video id, needed to mark a failure.
        channel_name: Optional channel name, needed to mark a failure.

    Returns:
        bool: True if a non-empty file was found, False otherwise.
    """
    if not output_path.exists():
        print(f"❌ Download failed: file does not exist: {output_path}")

        # yt-dlp may have saved the file under a different name.
        candidates = list(output_path.parent.glob("*.mp4"))
        if not candidates:
            if video_id and channel_name:
                _mark_song_failed_standalone(
                    artist, title, video_id, channel_name,
                    "Download failed: file does not exist",
                )
            return False

        print(f"🔍 Found these files in the directory: {[c.name for c in candidates]}")

        wanted_artist = artist.lower()
        wanted_words = title.lower().split()

        def _looks_like_target(candidate):
            stem = candidate.stem.lower()
            return wanted_artist in stem and any(word in stem for word in wanted_words)

        match = next((c for c in candidates if _looks_like_target(c)), None)
        if match is None:
            print(f"❌ No matching file found for: {artist} - {title}")
            if video_id and channel_name:
                _mark_song_failed_standalone(
                    artist, title, video_id, channel_name,
                    "Download failed: file does not exist and no matching file found",
                )
            return False

        print(f"🎯 Found matching file: {match.name}")
        output_path = match

    if output_path.stat().st_size == 0:
        print(f"❌ Download failed: file is empty (0 bytes): {output_path}")
        return False

    # Optional MP4 validation (currently disabled)
    # if not is_valid_mp4(output_path):
    #     print(f"❌ File is not a valid MP4: {output_path}")
    #     return False
    return True
def execute_download_plan(download_plan, unmatched, cache_file, config, yt_dlp_path,
                          downloads_dir, songlist_tracking, limit=None):
    """
    Execute a download plan with progress tracking and cache management.

    Items are processed from the end of the plan towards the start so that
    completed items can be removed without shifting the indices still to be
    visited. download_plan is mutated in place.

    Args:
        download_plan: List of download items to process
        unmatched: List of unmatched songs
        cache_file: Path to cache file for progress tracking
        config: Configuration dictionary
        yt_dlp_path: Path to yt-dlp executable
        downloads_dir: Directory for downloads
        songlist_tracking: Songlist tracking data
        limit: Optional limit on number of downloads

    Returns:
        tuple: (downloaded_count, success)
    """
    downloaded_count = 0
    # Capture the size up front: the plan shrinks as items complete, so
    # len(download_plan) cannot be used for progress numbering inside the loop.
    initial_count = len(download_plan)
    # Never announce more downloads than the plan actually contains.
    total_to_download = initial_count if limit is None else min(limit, initial_count)

    # Use reverse iteration to avoid index shifting issues when removing items
    for idx in range(initial_count - 1, -1, -1):
        if limit is not None and downloaded_count >= limit:
            break

        item = download_plan[idx]
        artist = item['artist']
        title = item['title']
        channel_name = item['channel_name']
        video_id = item['video_id']
        video_title = item['video_title']

        # 1-based position of this attempt within the original plan.
        position = initial_count - idx
        print(f"\n⬇️ Downloading {position} of {total_to_download}:")
        print(f" 📋 Songlist: {artist} - {title}")
        print(f" 🎬 Video: {video_title} ({channel_name})")
        if 'match_score' in item:
            print(f" 🎯 Match Score: {item['match_score']:.1f}%")

        # Create filename
        filename = sanitize_filename(artist, title)
        output_path = downloads_dir / channel_name / filename

        # Download the file
        success = download_single_video(
            output_path, video_id, config, yt_dlp_path,
            artist, title, channel_name, songlist_tracking
        )
        if not success:
            continue

        downloaded_count += 1
        # Remove completed item from plan and update cache
        download_plan.pop(idx)
        save_plan_cache(cache_file, download_plan, unmatched)
        print(f"🗑️ Removed completed item from download plan. {len(download_plan)} items remaining.")
        # Delete cache if all items are complete
        if not download_plan:
            cleanup_cache(cache_file)

    print(f"🎉 Downloaded {downloaded_count} songlist songs.")
    print(f"📊 Summary: Found {downloaded_count} songs, {len(unmatched)} songs not found.")

    # Final cleanup
    # NOTE(review): this removes the cache even when items remain in the plan
    # (failed downloads or limit reached) - confirm that is intended.
    cleanup_cache(cache_file)
    return downloaded_count, True
def cleanup_cache(cache_file):
    """
    Delete the download plan cache file if it exists.

    Any error while deleting is reported on stdout instead of being raised.

    Args:
        cache_file: pathlib.Path of the cache file.
    """
    if not cache_file.exists():
        return
    try:
        cache_file.unlink()
        print(f"🗑️ Deleted download plan cache: {cache_file.name}")
    except Exception as exc:
        # Best effort: a leftover cache file should not abort the run.
        print(f"⚠️ Could not delete download plan cache: {exc}")
def should_skip_song_standalone(artist, title, channel_name, video_id, video_title, downloads_dir, tracker=None, server_songs=None, server_duplicates_tracking=None):
    """
    Standalone function to check if a song should be skipped.

    Performs four checks in order:
    1. Already downloaded (tracking) - if tracker provided
    2. File exists on filesystem
    3. Already on server - if server data provided
    4. Previously failed download (bad file) - if tracker provided

    Args:
        artist: Artist name of the song.
        title: Title of the song.
        channel_name: Channel name; also the download sub-directory.
        video_id: Video id passed to the tracker checks.
        video_title: Video title passed to the server duplicate check.
        downloads_dir: pathlib.Path of the downloads root directory.
        tracker: Optional tracker providing is_song_downloaded and
            is_song_failed.
        server_songs: Optional server song data for the duplicate check.
        server_duplicates_tracking: Optional tracking data for the server
            duplicate check.

    Returns:
        tuple: (should_skip, reason, total_filtered)
    """
    total_filtered = 0

    # Check 1: Already downloaded by this system (if tracker provided)
    if tracker and tracker.is_song_downloaded(artist, title, channel_name, video_id):
        return True, "already downloaded", total_filtered

    # Check 2: File already exists on filesystem
    # Strip Windows-invalid characters and dots from the title.
    safe_title = title.translate(str.maketrans("", "", '?:*"<>|/\\.')).strip()
    # Artist form used only when a candidate has to be shortened.
    safe_artist = artist.replace("'", "").replace('"', "").strip()

    # Try different filename patterns that might exist
    possible_filenames = [
        f"{artist} - {safe_title}.mp4",  # Songlist mode
        f"{channel_name} - {safe_title}.mp4",  # Latest-per-channel mode
        f"{artist} - {safe_title} (Karaoke Version).mp4"  # Channel videos mode
    ]
    # execute_download_plan names files with sanitize_filename, which strips
    # more than the patterns above (apostrophes, "(From ", ")", " - ", and
    # invalid characters in the artist). Without this candidate those files
    # are never found and get downloaded again.
    canonical_name = sanitize_filename(artist, title)
    if canonical_name not in possible_filenames:
        possible_filenames.append(canonical_name)

    channel_dir = downloads_dir / channel_name
    for filename in possible_filenames:
        if len(filename) > DEFAULT_FILENAME_LENGTH_LIMIT:
            # Apply length limits if needed
            filename = f"{safe_artist[:DEFAULT_ARTIST_LENGTH_LIMIT]} - {safe_title[:DEFAULT_TITLE_LENGTH_LIMIT]}.mp4"
        output_path = channel_dir / filename
        if output_path.exists() and output_path.stat().st_size > 0:
            return True, "file exists", total_filtered

    # Check 3: Already on server (if server data provided)
    if server_songs is not None and server_duplicates_tracking is not None:
        from karaoke_downloader.server_manager import check_and_mark_server_duplicate
        if check_and_mark_server_duplicate(server_songs, server_duplicates_tracking, artist, title, video_title, channel_name):
            total_filtered += 1
            return True, "on server", total_filtered

    # Check 4: Previously failed download (bad file) - if tracker provided
    if tracker and tracker.is_song_failed(artist, title, channel_name, video_id):
        return True, "previously failed", total_filtered

    return False, None, total_filtered