"""
Core video download logic and file validation.
Handles the actual downloading and post-processing of videos.
"""

import subprocess
from pathlib import Path

from karaoke_downloader.id3_utils import add_id3_tags
from karaoke_downloader.songlist_manager import mark_songlist_song_downloaded
from karaoke_downloader.download_planner import save_plan_cache

# Constants
DEFAULT_FILENAME_LENGTH_LIMIT = 100
DEFAULT_ARTIST_LENGTH_LIMIT = 30
DEFAULT_TITLE_LENGTH_LIMIT = 60
DEFAULT_FORMAT_CHECK_TIMEOUT = 30

# Characters that are not allowed in Windows filenames.
_WINDOWS_INVALID_CHARS = '?:*"<>|/\\'
# Translation tables that delete the characters above in a single pass.
_DELETE_INVALID = str.maketrans("", "", _WINDOWS_INVALID_CHARS)
_DELETE_INVALID_AND_DOTS = str.maketrans("", "", _WINDOWS_INVALID_CHARS + ".")

# NOTE(review): the emoji prefixes in the log strings below look mis-encoded
# (mojibake). They are kept byte-for-byte; confirm the file's encoding.


def sanitize_filename(artist, title):
    """
    Create a safe filename from artist and title.

    Removes quotes, Windows-invalid characters and (for the title) dots,
    "(From " / ")" fragments and " - " separators, then limits the length.

    Args:
        artist: Artist name.
        title: Song title.

    Returns:
        str: "<artist> - <title>.mp4". If that exceeds
        DEFAULT_FILENAME_LENGTH_LIMIT characters, the artist and title parts
        are truncated to DEFAULT_ARTIST_LENGTH_LIMIT and
        DEFAULT_TITLE_LENGTH_LIMIT characters respectively.
    """
    # Multi-character rewrites first, then single-character deletions.
    safe_title = title.replace("(From ", "").replace(")", "").replace(" - ", " ")
    safe_title = safe_title.replace("'", "").translate(_DELETE_INVALID_AND_DOTS).strip()
    safe_artist = artist.replace("'", "").translate(_DELETE_INVALID).strip()

    filename = f"{safe_artist} - {safe_title}.mp4"

    # Limit filename length to avoid Windows path issues
    if len(filename) > DEFAULT_FILENAME_LENGTH_LIMIT:
        filename = (
            f"{safe_artist[:DEFAULT_ARTIST_LENGTH_LIMIT]} - "
            f"{safe_title[:DEFAULT_TITLE_LENGTH_LIMIT]}.mp4"
        )
    return filename


def is_valid_mp4(file_path):
    """
    Check if a file is a valid MP4 file.

    Uses ffprobe if available, otherwise checks file extension and size.

    Args:
        file_path: pathlib.Path of the file to check.

    Returns:
        bool: False if the file is missing, empty, or rejected by ffprobe;
        True if ffprobe accepts it. When ffprobe is not installed, True only
        for a non-empty file with a ".mp4" suffix.
    """
    if not file_path.exists():
        return False

    # Check file size
    if file_path.stat().st_size == 0:
        return False

    # Try to use ffprobe for validation
    try:
        subprocess.run(
            ['ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', str(file_path)],
            capture_output=True,
            text=True,
            check=True,
        )
    except FileNotFoundError:
        # ffprobe is not available: fall back to the extension check
        # (the size was already verified to be non-zero above).
        return file_path.suffix.lower() == '.mp4'
    except subprocess.CalledProcessError:
        # ffprobe ran and exited non-zero, i.e. it rejected the file.
        return False
    return True


def download_video_and_track(yt_dlp_path, config, downloads_dir, songlist_tracking,
                             channel_name, channel_url, video_id, video_title,
                             artist, title, filename):
    """
    Download a single video and track its status.

    Builds the output path as downloads_dir / channel_name / filename and
    delegates to download_single_video. channel_url and video_title are
    accepted but not used here.

    Returns:
        bool: True if successful, False otherwise.
    """
    output_path = downloads_dir / channel_name / filename
    return download_single_video(
        output_path, video_id, config, yt_dlp_path,
        artist, title, channel_name, songlist_tracking
    )


def download_single_video(output_path, video_id, config, yt_dlp_path,
                          artist, title, channel_name, songlist_tracking):
    """
    Download a single video and handle post-processing.

    Runs yt-dlp for the given video id, verifies the result with
    verify_download, then adds ID3 tags and marks the song as downloaded
    in the songlist tracking data.

    Args:
        output_path: pathlib.Path where the video should be written.
        video_id: YouTube video id.
        config: Configuration dictionary; config["download_settings"]["format"]
            must be present.
        yt_dlp_path: Path to the yt-dlp executable.
        artist: Artist name.
        title: Song title.
        channel_name: Channel name, used for tagging and tracking.
        songlist_tracking: Songlist tracking data passed to
            mark_songlist_song_downloaded.

    Returns:
        bool: True if the download and verification succeeded, False otherwise.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    print(f"ā¬‡ļø Downloading: {artist} - {title} -> {output_path}")

    video_url = f"https://www.youtube.com/watch?v={video_id}"
    dlp_cmd = [
        str(yt_dlp_path),
        "--no-check-certificates",
        "--ignore-errors",
        "--no-warnings",
        "-o", str(output_path),
        "-f", config["download_settings"]["format"],
        video_url
    ]

    print(f"šŸ”§ Running command: {' '.join(dlp_cmd)}")
    print(f"šŸ“ŗ Resolution settings: {config.get('download_settings', {}).get('preferred_resolution', 'Unknown')}")
    print(f"šŸŽ¬ Format string: {config.get('download_settings', {}).get('format', 'Unknown')}")

    # Debug: Show available formats (optional)
    if config.get('debug_show_formats', False):
        show_available_formats(yt_dlp_path, video_url)

    try:
        result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True)
        print("āœ… yt-dlp completed successfully")
        print(f"šŸ“„ yt-dlp stdout: {result.stdout}")
    except subprocess.CalledProcessError as e:
        print(f"āŒ yt-dlp failed with exit code {e.returncode}")
        print(f"āŒ yt-dlp stderr: {e.stderr}")
        # Mark song as failed in tracking
        error_msg = f"yt-dlp failed with exit code {e.returncode}: {e.stderr}"
        _mark_song_failed_standalone(artist, title, video_id, channel_name, error_msg)
        return False

    # Verify download
    if not verify_download(output_path, artist, title, video_id, channel_name):
        return False

    # Post-processing
    # NOTE(review): verify_download may accept a differently named file found
    # in the same directory, but tagging and tracking below still use the
    # originally requested output_path - confirm this is intended.
    add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name)
    mark_songlist_song_downloaded(songlist_tracking, artist, title, channel_name, output_path)

    print(f"āœ… Downloaded and tracked: {artist} - {title}")
    print(f"šŸŽ‰ All post-processing complete for: {output_path}")
    return True


def _mark_song_failed_standalone(artist, title, video_id, channel_name, error_message):
    """Standalone helper to mark a song as failed in tracking."""
    # Imported at call time rather than at module import time.
    from karaoke_downloader.tracking_manager import TrackingManager

    tracker = TrackingManager()
    tracker.mark_song_failed(artist, title, video_id, channel_name, error_message)
    print(f"šŸ·ļø Marked song as failed: {artist} - {title}")


def show_available_formats(yt_dlp_path, video_url):
    """
    Show available formats for debugging.

    Runs "yt-dlp --list-formats" with a DEFAULT_FORMAT_CHECK_TIMEOUT second
    timeout and prints its stdout. Any error is reported and swallowed,
    since this is a best-effort debugging aid.
    """
    print(f"šŸ” Checking available formats for: {video_url}")
    format_cmd = [
        str(yt_dlp_path),
        "--list-formats",
        video_url
    ]
    try:
        format_result = subprocess.run(
            format_cmd, capture_output=True, text=True,
            timeout=DEFAULT_FORMAT_CHECK_TIMEOUT
        )
        print(f"šŸ“‹ Available formats:\n{format_result.stdout}")
    except Exception as e:
        print(f"āš ļø Could not check formats: {e}")


def verify_download(output_path, artist, title, video_id=None, channel_name=None):
    """
    Verify that the download was successful.

    If output_path does not exist, looks for another .mp4 in the same
    directory whose name contains the artist and at least one word of the
    title, and verifies that file instead. When verification fails and both
    video_id and channel_name are given, the song is marked as failed in
    tracking.

    Args:
        output_path: pathlib.Path where the download was expected.
        artist: Artist name, used for the fallback file match.
        title: Song title, used for the fallback file match.
        video_id: Optional video id, required to record a failure.
        channel_name: Optional channel name, required to record a failure.

    Returns:
        bool: True if a non-empty file was found, False otherwise.
    """
    can_track_failure = bool(video_id and channel_name)

    if not output_path.exists():
        print(f"āŒ Download failed: file does not exist: {output_path}")
        # Check if yt-dlp saved it somewhere else
        possible_files = list(output_path.parent.glob("*.mp4"))
        if not possible_files:
            # Mark song as failed if we have the required info
            if can_track_failure:
                error_msg = "Download failed: file does not exist"
                _mark_song_failed_standalone(artist, title, video_id, channel_name, error_msg)
            return False

        print(f"šŸ” Found these files in the directory: {[f.name for f in possible_files]}")
        # Look for a file that matches our pattern (artist - title)
        artist_part = artist.lower()
        title_words = title.lower().split()
        for file in possible_files:
            file_lower = file.stem.lower()
            if artist_part in file_lower and any(word in file_lower for word in title_words):
                print(f"šŸŽÆ Found matching file: {file.name}")
                output_path = file
                break
        else:
            print(f"āŒ No matching file found for: {artist} - {title}")
            # Mark song as failed if we have the required info
            if can_track_failure:
                error_msg = "Download failed: file does not exist and no matching file found"
                _mark_song_failed_standalone(artist, title, video_id, channel_name, error_msg)
            return False

    if output_path.stat().st_size == 0:
        print(f"āŒ Download failed: file is empty (0 bytes): {output_path}")
        # Record the failure like the other failure branches do.
        if can_track_failure:
            error_msg = "Download failed: file is empty (0 bytes)"
            _mark_song_failed_standalone(artist, title, video_id, channel_name, error_msg)
        return False

    # Optional MP4 validation
    # if not is_valid_mp4(output_path):
    #     print(f"āŒ File is not a valid MP4: {output_path}")
    #     return False

    return True


def execute_download_plan(download_plan, unmatched, cache_file, config, yt_dlp_path,
                          downloads_dir, songlist_tracking, limit=None):
    """
    Execute a download plan with progress tracking and cache management.

    Each successfully downloaded item is removed from download_plan (the
    list is mutated in place) and the plan cache is re-saved.

    Args:
        download_plan: List of download items to process
        unmatched: List of unmatched songs
        cache_file: Path to cache file for progress tracking
        config: Configuration dictionary
        yt_dlp_path: Path to yt-dlp executable
        downloads_dir: Directory for downloads
        songlist_tracking: Songlist tracking data
        limit: Optional limit on number of downloads

    Returns:
        tuple: (downloaded_count, success); success is always True.
    """
    downloaded_count = 0
    total_to_download = limit if limit is not None else len(download_plan)

    # Iterate over a snapshot so the live plan can shrink during the loop.
    for idx, item in enumerate(list(download_plan)):
        if limit is not None and downloaded_count >= limit:
            break

        artist = item['artist']
        title = item['title']
        channel_name = item['channel_name']
        video_id = item['video_id']
        video_title = item['video_title']

        print(f"\nā¬‡ļø Downloading {idx+1} of {total_to_download}:")
        print(f" šŸ“‹ Songlist: {artist} - {title}")
        print(f" šŸŽ¬ Video: {video_title} ({channel_name})")
        if 'match_score' in item:
            print(f" šŸŽÆ Match Score: {item['match_score']:.1f}%")

        # Create filename
        filename = sanitize_filename(artist, title)
        output_path = downloads_dir / channel_name / filename

        # Download the file
        success = download_single_video(
            output_path, video_id, config, yt_dlp_path,
            artist, title, channel_name, songlist_tracking
        )

        if success:
            downloaded_count += 1
            # Remove the completed item from the plan and update the cache.
            # The snapshot index no longer matches the live list once any
            # earlier item has been removed, so locate the item by identity.
            for pos, candidate in enumerate(download_plan):
                if candidate is item:
                    del download_plan[pos]
                    break
            save_plan_cache(cache_file, download_plan, unmatched)
            print(f"šŸ—‘ļø Removed completed item from download plan. {len(download_plan)} items remaining.")

    # Delete cache if all items are complete
    if not download_plan:
        cleanup_cache(cache_file)

    print(f"šŸŽ‰ Downloaded {downloaded_count} songlist songs.")
    print(f"šŸ“Š Summary: Found {downloaded_count} songs, {len(unmatched)} songs not found.")

    # Final cleanup
    # NOTE(review): this deletes the cache even when items remain in the
    # plan (limit reached or failed downloads) - confirm this is intended.
    cleanup_cache(cache_file)

    return downloaded_count, True


def cleanup_cache(cache_file):
    """
    Clean up the cache file.

    Deletes cache_file if it exists. Errors during deletion are reported
    and swallowed so that cleanup never aborts the caller.
    """
    if not cache_file.exists():
        return
    try:
        cache_file.unlink()
        print(f"šŸ—‘ļø Deleted download plan cache: {cache_file.name}")
    except Exception as e:
        print(f"āš ļø Could not delete download plan cache: {e}")


def should_skip_song_standalone(artist, title, channel_name, video_id, video_title,
                                downloads_dir, tracker=None, server_songs=None,
                                server_duplicates_tracking=None):
    """
    Standalone function to check if a song should be skipped.

    Performs four checks in order:
    1. Already downloaded (tracking) - if tracker provided
    2. File exists on filesystem
    3. Already on server - if server data provided
    4. Previously failed download (bad file) - if tracker provided

    Returns:
        tuple: (should_skip, reason, total_filtered)
    """
    total_filtered = 0

    # Check 1: Already downloaded by this system (if tracker provided)
    if tracker and tracker.is_song_downloaded(artist, title, channel_name, video_id):
        return True, "already downloaded", total_filtered

    # Check 2: File already exists on filesystem
    safe_title = title.translate(_DELETE_INVALID_AND_DOTS).strip()

    # Try different filename patterns that might exist
    possible_filenames = [
        # Exact name produced by sanitize_filename for new downloads
        sanitize_filename(artist, title),
        f"{artist} - {safe_title}.mp4",  # Songlist mode
        f"{channel_name} - {safe_title}.mp4",  # Latest-per-channel mode
        f"{artist} - {safe_title} (Karaoke Version).mp4"  # Channel videos mode
    ]

    for filename in possible_filenames:
        if len(filename) > DEFAULT_FILENAME_LENGTH_LIMIT:
            # Apply length limits if needed
            safe_artist = artist.replace("'", "").replace('"', "").strip()
            filename = (
                f"{safe_artist[:DEFAULT_ARTIST_LENGTH_LIMIT]} - "
                f"{safe_title[:DEFAULT_TITLE_LENGTH_LIMIT]}.mp4"
            )

        output_path = downloads_dir / channel_name / filename
        if output_path.exists() and output_path.stat().st_size > 0:
            return True, "file exists", total_filtered

    # Check 3: Already on server (if server data provided)
    if server_songs is not None and server_duplicates_tracking is not None:
        from karaoke_downloader.server_manager import check_and_mark_server_duplicate

        if check_and_mark_server_duplicate(server_songs, server_duplicates_tracking,
                                           artist, title, video_title, channel_name):
            total_filtered += 1
            return True, "on server", total_filtered

    # Check 4: Previously failed download (bad file) - if tracker provided
    if tracker and tracker.is_song_failed(artist, title, channel_name, video_id):
        return True, "previously failed", total_filtered

    return False, None, total_filtered