#!/usr/bin/env python3
"""
Karaoke Video Downloader CLI
Command-line interface for the karaoke video downloader.
"""

import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import List, Optional

from karaoke_downloader.channel_parser import ChannelParser
from karaoke_downloader.config_manager import AppConfig
from karaoke_downloader.data_path_manager import get_data_path_manager
from karaoke_downloader.downloader import KaraokeDownloader

# Constants
DEFAULT_LATEST_PER_CHANNEL_LIMIT = 10
DEFAULT_FUZZY_THRESHOLD = 85


def load_channels_from_json(channels_file: Optional[str] = None) -> List[str]:
    """
    Load channel URLs from the new JSON format.

    Args:
        channels_file: Path to the channels.json file (if None, uses default from config)

    Returns:
        List of channel URLs; an empty list if the file cannot be loaded
        (the error is printed rather than raised).
    """
    if channels_file is None:
        channels_file = str(get_data_path_manager().get_channels_json_path())

    try:
        parser = ChannelParser(channels_file)
        channels = parser.channels_config.get("channels", [])
        return [channel["url"] for channel in channels]
    except Exception as e:
        # Deliberate best-effort: a broken channel file must not crash the CLI.
        print(f"❌ Error loading channels from {channels_file}: {e}")
        return []


def load_channels_from_text(channels_file: Optional[str] = None) -> List[str]:
    """
    Load channel URLs from the old text format (for backward compatibility).

    Blank lines and lines starting with ``#`` are skipped.

    Args:
        channels_file: Path to the channels.txt file (if None, uses default from config)

    Returns:
        List of channel URLs; an empty list if the file cannot be read
        (the error is printed rather than raised).
    """
    if channels_file is None:
        channels_file = str(get_data_path_manager().get_channels_txt_path())

    try:
        with open(channels_file, "r", encoding="utf-8") as f:
            return [
                line.strip()
                for line in f
                if line.strip() and not line.strip().startswith("#")
            ]
    except Exception as e:
        # Deliberate best-effort: a missing/unreadable file yields "no channels".
        print(f"❌ Error loading channels from {channels_file}: {e}")
        return []


def load_channels(channel_file: Optional[str] = None) -> List[str]:
    """
    Load channel URLs from file.

    Args:
        channel_file: Explicit channel file. A path ending in ``.json`` is
            parsed as JSON, anything else as the legacy text format. When
            None, the default ``channels.json`` is used if the data path
            manager reports that it exists.

    Returns:
        List of channel URLs (empty when nothing could be loaded).
    """
    if channel_file is None:
        # Use JSON configuration
        if get_data_path_manager().file_exists("channels.json"):
            return load_channels_from_json()
        return []

    if channel_file.endswith(".json"):
        return load_channels_from_json(channel_file)
    return load_channels_from_text(channel_file)


def _extract_channel_handle(url: str) -> Optional[str]:
    """Return the handle that follows ``/@`` in a channel URL, or None if absent."""
    if "/@" not in url:
        return None
    return url.split("/@")[1].split("/")[0]


def get_channel_url_by_name(channel_name: str) -> Optional[str]:
    """
    Look up a channel URL by its name from the channels configuration.

    The comparison is case-insensitive and ignores a leading ``@`` as well as
    the word "karaoke" in either the requested name or the URL handle.

    Args:
        channel_name: Channel name as typed by the user (e.g. 'SingKingKaraoke').

    Returns:
        The first matching channel URL, or None when no configured channel matches.
    """
    # Normalize the channel name for comparison
    normalized_name = (
        channel_name.lower().replace("@", "").replace("karaoke", "").strip()
    )

    for url in load_channels():
        handle = _extract_channel_handle(url)
        if handle is None:
            continue
        url_channel_name = handle.lower()
        if normalized_name in (
            url_channel_name,
            url_channel_name.replace("karaoke", "").strip(),
        ):
            return url

    return None


def _build_parser() -> argparse.ArgumentParser:
    """Create the argument parser with every supported CLI option."""
    parser = argparse.ArgumentParser(
        description="Karaoke Video Downloader - Download YouTube playlists and channel videos for karaoke (default: downloads latest videos from all channels)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # The last example previously used an undefined --delete-files flag;
        # --reset-channel always deletes files, so no extra flag is needed.
        epilog="""
Examples:
  python download_karaoke.py --limit 10 # Download latest 10 videos from all channels
  python download_karaoke.py --songlist-only --limit 10 # Download only songlist songs across channels
  python download_karaoke.py --channel-focus SingKingKaraoke --limit 5 # Download from specific channel
  python download_karaoke.py --channel-focus SingKingKaraoke --all-videos # Download ALL videos from channel
  python download_karaoke.py https://www.youtube.com/@SingKingKaraoke/videos # Download from specific channel URL
  python download_karaoke.py --file data/channels.txt # Download from custom channel list
  python download_karaoke.py --reset-channel SingKingKaraoke # Reset tracking and files for a channel
        """,
    )
    parser.add_argument(
        "url", nargs="?", help="YouTube playlist or channel URL to download"
    )
    parser.add_argument(
        "--file",
        "-f",
        help="Text file containing playlist or channel URLs (one per line)",
    )
    parser.add_argument(
        "--status",
        "-s",
        action="store_true",
        help="Show download status and statistics",
    )
    parser.add_argument(
        "--report",
        "-r",
        metavar="PLAYLIST_ID",
        help="Generate a detailed report for a specific playlist",
    )
    parser.add_argument(
        "--sync",
        "--sync-only",
        action="store_true",
        help="Only sync playlist without downloading (update tracking)",
    )
    parser.add_argument(
        "--cleanup", action="store_true", help="Clean up orphaned tracking entries"
    )
    parser.add_argument(
        "--resolution",
        "--res",
        choices=["480p", "720p", "1080p", "1440p", "2160p"],
        default="720p",
        help="Preferred video resolution (default: 720p)",
    )
    parser.add_argument(
        "--limit",
        "-l",
        type=int,
        help="Limit the number of videos to download (e.g., --limit 10)",
    )
    parser.add_argument(
        "--refresh",
        "--force-refresh",
        action="store_true",
        help="Force refresh channel cache (ignore cached data)",
    )
    parser.add_argument(
        "--cache-info", action="store_true", help="Show channel cache information"
    )
    parser.add_argument(
        "--clear-cache",
        metavar="CHANNEL_ID",
        nargs="?",
        const="all",
        help="Clear cache for specific channel or all channels (use --clear-cache all)",
    )
    parser.add_argument(
        "--cache-duration",
        type=int,
        metavar="HOURS",
        help="Set cache duration in hours (default: 24)",
    )
    parser.add_argument(
        "--songlist-priority",
        action="store_true",
        help="Prioritize downloads based on songList.json in the data directory (default: enabled)",
    )
    parser.add_argument(
        "--no-songlist-priority",
        action="store_true",
        help="Disable songlist prioritization",
    )
    parser.add_argument(
        "--generate-unmatched-report",
        action="store_true",
        help="Generate a report of songs that couldn't be found in any channel (runs after downloads)",
    )
    parser.add_argument(
        "--show-pagination",
        action="store_true",
        help="Show page-by-page progress when downloading channel video lists (slower but more detailed)",
    )
    parser.add_argument(
        "--parallel-channels",
        action="store_true",
        help="Enable parallel channel scanning for faster channel processing (scans multiple channels simultaneously)",
    )
    parser.add_argument(
        "--channel-workers",
        type=int,
        default=3,
        help="Number of parallel channel scanning workers (default: 3, max: 10)",
    )
    parser.add_argument(
        "--songlist-only",
        action="store_true",
        help="Only download songs that are in the songlist (skip all others)",
    )
    parser.add_argument(
        "--songlist-focus",
        nargs="+",
        metavar="PLAYLIST_TITLE",
        help='Focus on specific playlists by title (e.g., --songlist-focus "2025 - Apple Top 50" "2024 - Billboard Hot 100")',
    )
    parser.add_argument(
        "--songlist-file",
        metavar="FILE_PATH",
        help="Custom songlist file path to use with --songlist-focus (default: songList.json in the data directory)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force download from channels regardless of whether songs are already downloaded, on server, or marked as duplicates",
    )
    parser.add_argument(
        "--songlist-status",
        action="store_true",
        help="Show songlist download status and statistics",
    )
    parser.add_argument(
        "--reset-channel",
        metavar="CHANNEL_NAME",
        help="Reset all tracking and files for a channel",
    )
    parser.add_argument(
        "--reset-songlist",
        action="store_true",
        help="When used with --reset-channel, also reset songlist songs for this channel",
    )
    parser.add_argument(
        "--reset-songlist-all",
        action="store_true",
        help="Reset all songlist tracking and delete all songlist-downloaded files (global)",
    )
    parser.add_argument(
        "--clear-server-duplicates",
        action="store_true",
        help="Clear server duplicates tracking (allows re-checking songs against server)",
    )
    parser.add_argument(
        "--version", "-v", action="version", version="Karaoke Playlist Downloader v1.0"
    )
    parser.add_argument(
        "--force-download-plan",
        action="store_true",
        help="Force refresh the download plan cache (re-scan all channels for matches)",
    )
    parser.add_argument(
        "--latest-per-channel",
        action="store_true",
        help="Download the latest N videos from each channel (use with --limit) [DEPRECATED: This is now the default behavior]",
    )
    parser.add_argument(
        "--fuzzy-match",
        action="store_true",
        help="Enable fuzzy matching for songlist-to-video matching (uses rapidfuzz if available)",
    )
    parser.add_argument(
        "--fuzzy-threshold",
        type=int,
        default=DEFAULT_FUZZY_THRESHOLD,
        help=f"Fuzzy match threshold (0-100, default {DEFAULT_FUZZY_THRESHOLD})",
    )
    parser.add_argument(
        "--parallel",
        action="store_true",
        help="Enable parallel downloads for improved speed (3-5x faster for large batches, defaults to 3 workers)",
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=3,
        help="Number of parallel download workers (default: 3, max: 10, only used with --parallel)",
    )
    parser.add_argument(
        "--generate-songlist",
        nargs="+",
        metavar="DIRECTORY",
        help="Generate song list from MP4 files with ID3 tags in specified directories",
    )
    parser.add_argument(
        "--no-append-songlist",
        action="store_true",
        help="Create a new song list instead of appending when using --generate-songlist",
    )
    parser.add_argument(
        "--manual",
        action="store_true",
        help="Download from manual videos collection (manual_videos.json in the data directory)",
    )
    parser.add_argument(
        "--channel-focus",
        type=str,
        help="Download from a specific channel by name (e.g., 'SingKingKaraoke')",
    )
    parser.add_argument(
        "--all-videos",
        action="store_true",
        help="Download all videos from channel (not just songlist matches), skipping existing files",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Build download plan and show what would be downloaded without actually downloading anything",
    )
    return parser


def _validate_worker_counts(args: argparse.Namespace) -> None:
    """Exit with status 1 unless both worker counts are within 1..10."""
    if args.workers < 1 or args.workers > 10:
        print("❌ Error: --workers must be between 1 and 10")
        sys.exit(1)

    if args.channel_workers < 1 or args.channel_workers > 10:
        print("❌ Error: --channel-workers must be between 1 and 10")
        sys.exit(1)


def _platform_display_name() -> str:
    """Return a human-readable name for the current platform (for messages only)."""
    if sys.platform == "darwin":
        return "macOS"
    if sys.platform == "win32":
        return "Windows"
    if sys.platform.startswith("linux"):
        return "Linux"
    # Unknown platform: show the raw identifier instead of guessing.
    return sys.platform


def _verify_yt_dlp(yt_dlp_path: str) -> None:
    """
    Ensure the configured yt-dlp is usable; exit with status 1 otherwise.

    Args:
        yt_dlp_path: Either a command string starting with "python"
            (like "python3 -m yt_dlp"), which is probed with ``--version``,
            or a file path, which must exist.
    """
    if yt_dlp_path.startswith(("python", "python3")):
        # It's a command string, test if it works
        cmd = yt_dlp_path.split() + ["--version"]
        error = None
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
            if result.returncode != 0:
                error = f"Command failed: {result.stderr}"
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            # Missing interpreter, malformed command, or timeout.
            error = e

        if error is not None:
            print(f"❌ Error: yt-dlp command failed: {yt_dlp_path}")
            print(
                f"Please ensure yt-dlp is properly installed for {_platform_display_name()}"
            )
            print(f"Error: {error}")
            sys.exit(1)
        return

    # It's a file path, check if it exists
    yt_dlp_file = Path(yt_dlp_path)
    if not yt_dlp_file.exists():
        binary_name = yt_dlp_file.name
        print(f"❌ Error: {binary_name} not found in downloader/ directory")
        print(
            f"Please ensure {binary_name} is present in the downloader/ folder for {_platform_display_name()}"
        )
        print(f"Expected path: {yt_dlp_file}")
        sys.exit(1)


def _configure_downloader(
    downloader: KaraokeDownloader, args: argparse.Namespace
) -> None:
    """
    Copy CLI options onto the downloader and announce each active mode.

    Side effect on ``args``: ``--songlist-focus`` forces ``args.songlist_only``
    to True so later mode checks treat focus mode as songlist-only mode.
    """
    # Set parallel download options
    if args.parallel:
        downloader.enable_parallel_downloads = True
        downloader.parallel_workers = args.workers
        print(f"⚡ Parallel downloads enabled with {args.workers} workers")
    else:
        downloader.enable_parallel_downloads = False
        print("🐌 Sequential downloads enabled")

    if args.limit:
        downloader.download_limit = args.limit
        print(f"🎯 Download limit set to: {args.limit} videos")

    if args.no_songlist_priority:
        downloader.use_songlist_priority = False
        print("🎯 Songlist prioritization disabled")
    else:
        downloader.use_songlist_priority = True
        print("🎯 Songlist prioritization enabled")

    if args.songlist_only:
        downloader.songlist_only = True
        print("🎯 Songlist-only mode enabled (will only download songlist songs)")

    if args.songlist_focus:
        downloader.songlist_focus_titles = args.songlist_focus
        downloader.songlist_only = True  # Enable songlist-only mode when focusing
        args.songlist_only = True  # Also set the args flag to ensure CLI logic works
        print(
            f"🎯 Songlist focus mode enabled for playlists: {', '.join(args.songlist_focus)}"
        )

    if args.songlist_file:
        downloader.songlist_file_path = args.songlist_file
        print(f"📁 Using custom songlist file: {args.songlist_file}")

    if args.force:
        downloader.force_download = True
        print(
            "💪 Force mode enabled - will download regardless of existing files or server duplicates"
        )

    if args.dry_run:
        downloader.dry_run = True
        print("🔍 Dry run mode enabled - will show download plan without downloading")

    if args.resolution != "720p":
        downloader.config_manager.update_resolution(args.resolution)


def _run_maintenance_commands(
    downloader: KaraokeDownloader, args: argparse.Namespace
) -> None:
    """
    Handle reset/clear/generate commands; each handled command exits the process.

    Also verifies, for songlist modes without a URL or --file, that at least
    one channel is configured (exits with status 1 if not). The checks run in
    the same order as the options are listed below.
    """
    if args.reset_channel:
        downloader.reset_channel_downloads(
            args.reset_channel,
            reset_songlist=args.reset_songlist,
            delete_files=True,  # Always delete files if they exist
        )
        sys.exit(0)

    # If no URL or file is provided, but songlist mode is set, use all channels
    if (args.songlist_only or args.songlist_focus) and not args.url and not args.file:
        if load_channels():
            print(
                "📋 No URL or --file provided, defaulting to all configured channels for songlist mode."
            )
        else:
            print(
                "❌ No URL, --file, or channel configuration found. Please provide a channel URL or create channels.json in the data directory."
            )
            sys.exit(1)

    if args.reset_songlist_all:
        from karaoke_downloader.downloader import reset_songlist_all

        reset_songlist_all()
        print("✅ All songlist tracking and files have been reset.")
        sys.exit(0)

    if args.clear_server_duplicates:
        from karaoke_downloader.server_manager import save_server_duplicates_tracking

        save_server_duplicates_tracking({})
        print("✅ Server duplicates tracking has been cleared.")
        print("ℹ️ Songs will be re-checked against the server on next run.")
        sys.exit(0)

    if args.generate_songlist:
        from karaoke_downloader.songlist_generator import SongListGenerator

        print("🎵 Generating song list from MP4 files with ID3 tags...")
        generator = SongListGenerator()
        try:
            generator.generate_songlist_from_multiple_directories(
                args.generate_songlist, append=not args.no_append_songlist
            )
            print("✅ Song list generation completed successfully!")
        except Exception as e:
            print(f"❌ Error generating song list: {e}")
            sys.exit(1)
        sys.exit(0)


def _run_info_commands(
    downloader: KaraokeDownloader, args: argparse.Namespace
) -> None:
    """
    Handle status/cache/report style commands.

    At most one command runs (first match in the order below); when one runs
    the process exits with status 0. Returns normally if none was requested.
    """
    if args.status:
        stats = downloader.tracker.get_statistics()
        print("🎤 Karaoke Downloader Status")
        print("=" * 40)
        print(f"Total Songs: {stats['total_songs']}")
        print(f"Total Playlists: {stats['total_playlists']}")
        print(f"Downloaded Songs: {stats['downloaded_songs']}")
        print(f"Failed Songs: {stats['failed_songs']}")
        print(f"Partial Downloads: {stats['partial_songs']}")
        print(f"Total Size: {stats['total_size_mb']} MB")
        print(f"Last Updated: {stats['last_updated']}")
    elif args.cache_info:
        cache_info = downloader.tracker.get_cache_info()
        print("💾 Channel Cache Information")
        print("=" * 40)
        print(f"Total Channels: {cache_info['total_channels']}")
        print(f"Total Cached Videos: {cache_info['total_videos']}")
        print("\n📋 Channel Details:")
        for channel in cache_info["channels"]:
            print(
                f" • {channel['channel']}: {channel['videos']} videos (updated: {channel['last_updated']})"
            )
    elif args.clear_cache:
        if args.clear_cache == "all":
            downloader.tracker.clear_channel_cache()
            print("🧹 Cleared all channel caches")
        else:
            downloader.tracker.clear_channel_cache(args.clear_cache)
            print(f"🧹 Cleared cache for channel: {args.clear_cache}")
    elif args.cache_duration:
        downloader.tracker.set_cache_duration(args.cache_duration)
        print(f"⏰ Cache duration set to {args.cache_duration} hours")
    elif args.report:
        print(downloader.tracker.export_playlist_report(args.report))
    elif args.cleanup:
        orphaned = downloader.tracker.cleanup_orphaned_files(downloader.downloads_dir)
        print(f"🧹 Cleaned up {len(orphaned)} orphaned tracking entries")
    elif args.songlist_status:
        songlist = downloader._load_songlist()
        tracking = downloader.songlist_tracking
        # Guard against an empty songlist, which would otherwise divide by zero.
        progress = (len(tracking) / len(songlist) * 100) if songlist else 0.0
        print("📋 Songlist Download Status")
        print("=" * 40)
        print(f"Total songs in list: {len(songlist)}")
        print(f"Songs downloaded: {len(tracking)}")
        print(f"Songs remaining: {len(songlist) - len(tracking)}")
        print(f"Progress: {progress:.1f}%")
        if tracking:
            print("\n📁 Downloaded songs:")
            for info in list(tracking.values())[:10]:
                print(
                    f" • {info['artist']} - {info['title']} (from {info['channel']})"
                )
            if len(tracking) > 10:
                print(f" ... and {len(tracking) - 10} more")
    else:
        return

    sys.exit(0)


def _download_latest_per_channel(
    downloader: KaraokeDownloader,
    args: argparse.Namespace,
    *,
    show_setup_hint: bool = False,
):
    """
    Download the latest videos from every configured channel.

    Args:
        downloader: Configured downloader instance.
        args: Parsed CLI arguments.
        show_setup_hint: Print an extra hint on how to configure channels
            when none are found.

    Returns:
        The result of ``downloader.download_latest_per_channel``. Exits with
        status 1 when no channels are configured.
    """
    channel_urls = load_channels(args.file)
    if not channel_urls:
        print("❌ No channels found in configuration")
        if show_setup_hint:
            print(
                "Please provide a channel URL or create channels.json in the data directory"
            )
        sys.exit(1)

    limit = args.limit if args.limit else DEFAULT_LATEST_PER_CHANNEL_LIMIT
    return downloader.download_latest_per_channel(
        channel_urls,
        limit=limit,
        force_refresh_download_plan=args.force_download_plan,
        fuzzy_match=args.fuzzy_match,
        fuzzy_threshold=args.fuzzy_threshold,
        force_download=args.force,
        dry_run=args.dry_run,
    )


def _download_channel_focus(downloader: KaraokeDownloader, args: argparse.Namespace):
    """Download from the single channel named by --channel-focus; exit 1 if unknown."""
    print(f"🎤 Looking up channel: {args.channel_focus}")
    channel_url = get_channel_url_by_name(args.channel_focus)

    if not channel_url:
        print(f"❌ Channel '{args.channel_focus}' not found in configuration")
        print("Available channels:")
        for url in load_channels():
            handle = _extract_channel_handle(url)
            if handle is not None:
                print(f" • {handle}")
        sys.exit(1)

    if args.all_videos:
        # Download ALL videos from the channel (not just songlist matches)
        print(
            f"🎤 Downloading ALL videos from channel: {args.channel_focus} ({channel_url})"
        )
        return downloader.download_all_channel_videos(
            channel_url,
            force_refresh=args.refresh,
            force_download=args.force,
            limit=args.limit,
            dry_run=args.dry_run,
        )

    # Download only songlist matches from the channel
    print(f"🎤 Downloading from channel: {args.channel_focus} ({channel_url})")
    return downloader.download_channel_videos(
        channel_url,
        force_refresh=args.refresh,
        fuzzy_match=args.fuzzy_match,
        fuzzy_threshold=args.fuzzy_threshold,
        force_download=args.force,
        dry_run=args.dry_run,
    )


def _run_download_mode(downloader: KaraokeDownloader, args: argparse.Namespace):
    """
    Run the download mode selected on the command line.

    Precedence: --manual, --channel-focus, songlist modes,
    --latest-per-channel, positional URL, then the default (latest per channel).

    Returns:
        Whatever the selected downloader method returns (used as the success flag).
    """
    if args.manual:
        # Download from manual videos collection
        print("🎤 Downloading from manual videos collection...")
        return downloader.download_channel_videos(
            "manual://static",
            force_refresh=args.refresh,
            fuzzy_match=args.fuzzy_match,
            fuzzy_threshold=args.fuzzy_threshold,
            force_download=args.force,
        )

    if args.channel_focus:
        return _download_channel_focus(downloader, args)

    if args.songlist_only or args.songlist_focus:
        # Use provided file or default to channels configuration
        channel_urls = load_channels(args.file)
        if not channel_urls:
            print("❌ No channels found in configuration")
            sys.exit(1)
        return downloader.download_songlist_across_channels(
            channel_urls,
            limit=args.limit,
            force_refresh_download_plan=args.force_download_plan,
            fuzzy_match=args.fuzzy_match,
            fuzzy_threshold=args.fuzzy_threshold,
            force_download=args.force,
            show_pagination=args.show_pagination,
            parallel_channels=args.parallel_channels,
            max_channel_workers=args.channel_workers,
            dry_run=args.dry_run,
        )

    if args.latest_per_channel:
        return _download_latest_per_channel(downloader, args)

    if args.url:
        return downloader.download_channel_videos(
            args.url, force_refresh=args.refresh, dry_run=args.dry_run
        )

    # Default behavior: download from channels (equivalent to --latest-per-channel)
    print("🎯 No specific mode specified, defaulting to download from channels")
    return _download_latest_per_channel(downloader, args, show_setup_hint=True)


def _load_focused_songs(songlist_file_path: str, focus_titles: List[str]) -> list:
    """
    Load the de-duplicated songs of the playlists whose title is in ``focus_titles``.

    Songs are de-duplicated on lower-cased artist + title; entries lacking an
    "artist" or "title" key are skipped.

    Returns:
        A list of ``{"artist", "title", "position"}`` dicts, or an empty list
        when the file is missing, unreadable, or no playlist matches (a
        warning is printed in each of those cases).
    """
    songlist_file = Path(songlist_file_path)
    if not songlist_file.exists():
        print(f"⚠️ Songlist file not found: {songlist_file_path}")
        return []

    try:
        with open(songlist_file, "r", encoding="utf-8") as f:
            raw_data = json.load(f)
    except (json.JSONDecodeError, OSError) as e:
        print(f"⚠️ Could not load songlist for report: {e}")
        return []

    # Filter playlists by title
    focused_playlists = [
        playlist for playlist in raw_data if playlist.get("title", "") in focus_titles
    ]
    if not focused_playlists:
        print(f"⚠️ No playlists found matching: {', '.join(focus_titles)}")
        return []

    # Flatten the focused playlists into songs
    focused_songs = []
    seen = set()
    for playlist in focused_playlists:
        for song in playlist.get("songs", []):
            if "artist" not in song or "title" not in song:
                continue
            artist = song["artist"].strip()
            title = song["title"].strip()
            key = f"{artist.lower()}_{title.lower()}"
            if key in seen:
                continue
            seen.add(key)
            focused_songs.append(
                {
                    "artist": artist,
                    "title": title,
                    "position": song.get("position", 0),
                }
            )
    return focused_songs


def _run_unmatched_report(
    downloader: KaraokeDownloader, args: argparse.Namespace
) -> None:
    """
    Build a download plan and report songlist songs not found in any channel.

    Failures are printed and never raised, so the report cannot change the
    outcome of the downloads that ran before it.
    """
    from karaoke_downloader.download_planner import (
        build_download_plan,
        generate_unmatched_report,
    )
    from karaoke_downloader.songlist_manager import load_songlist

    print("\n🔍 Generating unmatched songs report...")

    songlist_path = (
        args.songlist_file
        if args.songlist_file
        else str(get_data_path_manager().get_songlist_path())
    )
    if args.songlist_focus:
        # Load focused playlists
        songlist = _load_focused_songs(songlist_path, args.songlist_focus)
    else:
        # Load all songs from songlist
        songlist = load_songlist(songlist_path)

    if not songlist:
        print("❌ No songlist available for report generation")
        return

    # Load channel URLs
    channel_file = (
        args.file
        if args.file
        else str(get_data_path_manager().get_channels_txt_path())
    )
    if not os.path.exists(channel_file):
        print(f"❌ Channel file not found: {channel_file}")
        return

    # load_channels() handles both the text and the JSON channel formats.
    channel_urls = load_channels(channel_file)
    print(f"📋 Analyzing {len(songlist)} songs against {len(channel_urls)} channels...")

    # Build download plan to get unmatched songs
    try:
        download_plan, unmatched = build_download_plan(
            channel_urls,
            songlist,
            downloader.tracker,
            downloader.yt_dlp_path,
            fuzzy_match=args.fuzzy_match,
            fuzzy_threshold=args.fuzzy_threshold,
        )

        if unmatched:
            report_file = generate_unmatched_report(unmatched)
            print("\n📋 Unmatched songs report generated successfully!")
            print(f"📁 Report saved to: {report_file}")
            print(
                f"📊 Summary: {len(download_plan)} songs found, {len(unmatched)} songs not found"
            )
            print("\n🔍 First 10 unmatched songs:")
            for i, song in enumerate(unmatched[:10], 1):
                print(f" {i:2d}. {song['artist']} - {song['title']}")
            if len(unmatched) > 10:
                print(f" ... and {len(unmatched) - 10} more songs")
        else:
            print(f"\n✅ All {len(songlist)} songs were found in the channels!")
    except Exception as e:
        print(f"❌ Error generating report: {e}")


def main():
    """
    Entry point of the karaoke downloader CLI.

    Parses arguments, verifies yt-dlp, configures the downloader, runs any
    maintenance or informational command (each of which exits), then runs the
    selected download mode and the optional unmatched-songs report.

    Exits with status 0 when the download mode reports success and 1 otherwise.
    """
    args = _build_parser().parse_args()
    _validate_worker_counts(args)

    # Load configuration to get platform-aware yt-dlp path
    from karaoke_downloader.config_manager import load_config

    config = load_config()
    _verify_yt_dlp(config.yt_dlp_path)

    downloader = KaraokeDownloader()
    _configure_downloader(downloader, args)

    _run_maintenance_commands(downloader, args)
    _run_info_commands(downloader, args)

    # The download result alone decides the exit status; it must not be
    # reset afterwards (previously `success = False` ran after the downloads).
    success = _run_download_mode(downloader, args)

    # Generate unmatched report if requested (additive feature)
    if args.generate_unmatched_report:
        _run_unmatched_report(downloader, args)

    downloader.tracker.force_save()
    if success:
        print("\n🎤 All downloads completed successfully!")
        sys.exit(0)
    else:
        print("\n⚠️ Some downloads may have failed. Check the logs for details.")
        sys.exit(1)