diff --git a/PRD.md b/PRD.md index aa26ada..e8cce7e 100644 --- a/PRD.md +++ b/PRD.md @@ -101,6 +101,7 @@ python download_karaoke.py --clear-cache SingKingKaraoke - ✅ Songlist integration: prioritize and track custom songlists - ✅ Songlist-only mode: download only songs from the songlist - ✅ Songlist focus mode: download only songs from specific playlists by title +- ✅ Force download mode: bypass all existing file checks and re-download songs regardless of server duplicates or existing files - ✅ Global songlist tracking to avoid duplicates across channels - ✅ ID3 tagging for artist/title in MP4 files (mutagen) - ✅ Real-time progress and detailed logging @@ -176,6 +177,7 @@ KaroakeVideoDownloader/ - `--songlist-priority`: Prioritize songlist songs in download queue - `--songlist-only`: Download only songs from the songlist - `--songlist-focus ...`: Focus on specific playlists by title (e.g., `--songlist-focus "2025 - Apple Top 50" "2024 - Billboard Hot 100"`) +- `--force`: **Force download from channels, bypassing all existing file checks and re-downloading if necessary** - `--songlist-status`: Show songlist download progress - `--limit `: Limit number of downloads (enables fast mode with early exit) - `--resolution <720p|1080p|...>`: Override resolution diff --git a/README.md b/README.md index a299ed4..ee35aea 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ A Python-based Windows CLI tool to download karaoke videos from YouTube channels - ⚡ **Optimized Scanning**: High-performance channel scanning with O(n×m) complexity, pre-processed lookups, and early termination for faster matching - 🏷️ **Server Duplicates Tracking**: Automatically checks against local songs.json file and marks duplicates for future skipping, preventing re-downloads of songs already on the server - ⚡ **Parallel Downloads**: Enable concurrent downloads with `--parallel --workers N` for significantly faster batch downloads (3-5x speedup) +- 📊 **Unmatched Songs Reports**: Generate detailed 
reports of songs that couldn't be found in any channel with `--generate-unmatched-report` ## 🏗️ Architecture The codebase has been comprehensively refactored into a modular architecture with centralized utilities for improved maintainability, error handling, and code reuse: @@ -111,6 +112,11 @@ python download_karaoke.py --parallel --workers 5 --songlist-only --limit 10 python download_karaoke.py --songlist-focus "2025 - Apple Top 50" "2024 - Billboard Hot 100" ``` +### Force Download from Channels (Bypass All Existing File Checks) +```bash +python download_karaoke.py --songlist-focus "2025 - Apple Top 50" --force +``` + ### Download with Fuzzy Matching ```bash python download_karaoke.py --songlist-only --limit 10 --fuzzy-match --fuzzy-threshold 85 @@ -262,6 +268,9 @@ KaroakeVideoDownloader/ - `--fuzzy-threshold `: Fuzzy match threshold (0-100, default 85) - `--parallel`: Enable parallel downloads for improved speed - `--workers `: Number of parallel download workers (1-10, default: 3) +- `--generate-songlist ...`: **Generate song list from MP4 files with ID3 tags in specified directories** +- `--no-append-songlist`: **Create a new song list instead of appending when using --generate-songlist** +- `--force`: **Force download from channels, bypassing all existing file checks and re-downloading if necessary** ## 📝 Example Usage @@ -280,16 +289,38 @@ python download_karaoke.py --parallel --workers 3 --latest-per-channel --limit 5 # Traditional full scan (no limit) python download_karaoke.py --songlist-only +# Focused fuzzy matching (target specific playlists with flexible matching) +python download_karaoke.py --songlist-focus "2025 - Apple Top 50" --fuzzy-match --fuzzy-threshold 80 --limit 10 + +# Force download with fuzzy matching (bypass all existing file checks) +python download_karaoke.py --songlist-focus "2025 - Apple Top 50" --force --fuzzy-match --fuzzy-threshold 80 --limit 10 + # Channel-specific operations python download_karaoke.py --reset-channel 
SingKingKaraoke python download_karaoke.py --reset-channel SingKingKaraoke --reset-songlist python download_karaoke.py --clear-cache all python download_karaoke.py --clear-server-duplicates + +# Song list generation from MP4 files +python download_karaoke.py --generate-songlist /path/to/mp4/directory +python download_karaoke.py --generate-songlist /path/to/dir1 /path/to/dir2 --no-append-songlist + +# Generate report of songs that couldn't be found +python download_karaoke.py --generate-unmatched-report +python download_karaoke.py --generate-unmatched-report --fuzzy-match --fuzzy-threshold 85 ``` ## 🏷️ ID3 Tagging - Adds artist/title/album/genre to MP4 files using mutagen (if installed) +## 📋 Song List Generation +- **Generate song lists from existing MP4 files**: Use `--generate-songlist` to create song lists from directories containing MP4 files with ID3 tags +- **Automatic ID3 extraction**: Extracts artist and title from MP4 files' ID3 tags +- **Directory-based organization**: Each directory becomes a playlist with the directory name as the title +- **Position tracking**: Songs are numbered starting from 1 based on file order +- **Append or replace**: Choose to append to existing song list or create a new one with `--no-append-songlist` +- **Multiple directories**: Process multiple directories in a single command + ## 🧹 Cleanup - Removes `.info.json` and `.meta` files after download diff --git a/commands.txt b/commands.txt index 7adfa73..c41cf8e 100644 --- a/commands.txt +++ b/commands.txt @@ -19,6 +19,20 @@ python download_karaoke.py --limit 10 https://www.youtube.com/@SingKingKaraoke/v # Enable parallel downloads for faster processing (3-5x speedup) python download_karaoke.py --parallel --workers 5 --limit 10 https://www.youtube.com/@SingKingKaraoke/videos +## 📋 SONG LIST GENERATION + +# Generate song list from MP4 files in a directory (append to existing song list) +python download_karaoke.py --generate-songlist /path/to/mp4/directory + +# Generate song list 
from multiple directories +python download_karaoke.py --generate-songlist /path/to/dir1 /path/to/dir2 /path/to/dir3 + +# Generate song list and create a new song list file (don't append) +python download_karaoke.py --generate-songlist /path/to/mp4/directory --no-append-songlist + +# Generate song list from multiple directories and create new file +python download_karaoke.py --generate-songlist /path/to/dir1 /path/to/dir2 --no-append-songlist + ## 🎵 SONGLIST OPERATIONS # Download only songs from your songlist (uses data/channels.txt by default) @@ -51,6 +65,12 @@ python download_karaoke.py --songlist-focus "2025 - Apple Top 50" --limit 5 # Focus on specific playlists with parallel processing python download_karaoke.py --parallel --workers 3 --songlist-focus "2025 - Apple Top 50" --limit 5 +# Force download from channels regardless of existing files or server duplicates +python download_karaoke.py --songlist-focus "2025 - Apple Top 50" --force + +# Force download with parallel processing +python download_karaoke.py --parallel --workers 5 --songlist-focus "2025 - Apple Top 50" --force --limit 10 + # Prioritize songlist songs in download queue (default behavior) python download_karaoke.py --songlist-priority https://www.youtube.com/@SingKingKaraoke/videos @@ -60,6 +80,20 @@ python download_karaoke.py --no-songlist-priority https://www.youtube.com/@SingK # Show songlist download status and statistics python download_karaoke.py --songlist-status +## 📊 UNMATCHED SONGS REPORTS + +# Generate report of songs that couldn't be found in any channel +python download_karaoke.py --generate-unmatched-report + +# Generate report with fuzzy matching enabled +python download_karaoke.py --generate-unmatched-report --fuzzy-match --fuzzy-threshold 85 + +# Generate report using a specific channel file +python download_karaoke.py --generate-unmatched-report --file data/my_channels.txt + +# Generate report with custom fuzzy threshold +python download_karaoke.py --generate-unmatched-report 
--fuzzy-match --fuzzy-threshold 80 + ## ⚡ PARALLEL DOWNLOADS (v3.4) # Basic parallel downloads (3-5x faster than sequential) @@ -172,6 +206,9 @@ python download_karaoke.py --parallel --workers 5 --songlist-only --limit 10 # 1b. Focus on specific playlists (fast targeted download) python download_karaoke.py --songlist-focus "2025 - Apple Top 50" --limit 5 +# 1c. Force download from specific playlists (bypass all existing file checks) +python download_karaoke.py --songlist-focus "2025 - Apple Top 50" --force --limit 5 + # 2. Latest videos from all channels python download_karaoke.py --latest-per-channel --limit 5 @@ -190,6 +227,9 @@ python download_karaoke.py --parallel --workers 5 --songlist-only --fuzzy-match # 4b. Focused fuzzy matching (target specific playlists with flexible matching) python download_karaoke.py --songlist-focus "2025 - Apple Top 50" --fuzzy-match --fuzzy-threshold 80 --limit 10 +# 4c. Force download with fuzzy matching (bypass all existing file checks) +python download_karaoke.py --songlist-focus "2025 - Apple Top 50" --force --fuzzy-match --fuzzy-threshold 80 --limit 10 + # 5. 
Reset and start fresh python download_karaoke.py --reset-channel SingKingKaraoke --reset-songlist diff --git a/karaoke_downloader/cli.py b/karaoke_downloader/cli.py index fbbeab1..415b2d2 100644 --- a/karaoke_downloader/cli.py +++ b/karaoke_downloader/cli.py @@ -99,6 +99,11 @@ Examples: action="store_true", help="Disable songlist prioritization", ) + parser.add_argument( + "--generate-unmatched-report", + action="store_true", + help="Generate a report of songs that couldn't be found in any channel", + ) parser.add_argument( "--songlist-only", action="store_true", @@ -110,6 +115,11 @@ Examples: metavar="PLAYLIST_TITLE", help='Focus on specific playlists by title (e.g., --songlist-focus "2025 - Apple Top 50" "2024 - Billboard Hot 100")', ) + parser.add_argument( + "--force", + action="store_true", + help="Force download from channels regardless of whether songs are already downloaded, on server, or marked as duplicates", + ) parser.add_argument( "--songlist-status", action="store_true", @@ -170,6 +180,17 @@ Examples: default=3, help="Number of parallel download workers (default: 3, max: 10)", ) + parser.add_argument( + "--generate-songlist", + nargs="+", + metavar="DIRECTORY", + help="Generate song list from MP4 files with ID3 tags in specified directories", + ) + parser.add_argument( + "--no-append-songlist", + action="store_true", + help="Create a new song list instead of appending when using --generate-songlist", + ) args = parser.parse_args() # Validate workers argument @@ -213,6 +234,9 @@ Examples: print( f"🎯 Songlist focus mode enabled for playlists: {', '.join(args.songlist_focus)}" ) + if args.force: + downloader.force_download = True + print("💪 Force mode enabled - will download regardless of existing files or server duplicates") if args.resolution != "720p": downloader.config_manager.update_resolution(args.resolution) @@ -256,6 +280,22 @@ Examples: print("ℹ️ Songs will be re-checked against the server on next run.") sys.exit(0) + if args.generate_songlist: 
+ from karaoke_downloader.songlist_generator import SongListGenerator + + print("🎵 Generating song list from MP4 files with ID3 tags...") + generator = SongListGenerator() + try: + generator.generate_songlist_from_multiple_directories( + args.generate_songlist, + append=not args.no_append_songlist + ) + print("✅ Song list generation completed successfully!") + except Exception as e: + print(f"❌ Error generating song list: {e}") + sys.exit(1) + sys.exit(0) + if args.status: stats = downloader.tracker.get_statistics() print("🎤 Karaoke Downloader Status") @@ -296,6 +336,70 @@ Examples: elif args.cleanup: orphaned = downloader.tracker.cleanup_orphaned_files(downloader.downloads_dir) print(f"🧹 Cleaned up {len(orphaned)} orphaned tracking entries") + sys.exit(0) + elif args.generate_unmatched_report: + from karaoke_downloader.download_planner import generate_unmatched_report + from karaoke_downloader.songlist_manager import load_songlist + + print("🔍 Generating unmatched songs report...") + + # Load songlist + songlist = load_songlist() + if not songlist: + print("❌ No songlist found. 
Please ensure data/songList.json exists.") + sys.exit(1) + + # Load channel URLs + channel_file = args.file if args.file else "data/channels.txt" + if not os.path.exists(channel_file): + print(f"❌ Channel file not found: {channel_file}") + sys.exit(1) + + with open(channel_file, "r", encoding='utf-8') as f: + channel_urls = [ + line.strip() + for line in f + if line.strip() and not line.strip().startswith("#") + ] + + print(f"📋 Analyzing {len(songlist)} songs against {len(channel_urls)} channels...") + + # Build download plan to get unmatched songs + from karaoke_downloader.download_planner import build_download_plan + fuzzy_match = args.fuzzy_match if hasattr(args, "fuzzy_match") else False + fuzzy_threshold = ( + args.fuzzy_threshold + if hasattr(args, "fuzzy_threshold") + else DEFAULT_FUZZY_THRESHOLD + ) + + try: + download_plan, unmatched = build_download_plan( + channel_urls, + songlist, + downloader.tracker, + downloader.yt_dlp_path, + fuzzy_match=fuzzy_match, + fuzzy_threshold=fuzzy_threshold, + ) + + if unmatched: + report_file = generate_unmatched_report(unmatched) + print(f"\n📋 Unmatched songs report generated successfully!") + print(f"📁 Report saved to: {report_file}") + print(f"📊 Summary: {len(download_plan)} songs found, {len(unmatched)} songs not found") + print(f"\n🔍 First 10 unmatched songs:") + for i, song in enumerate(unmatched[:10], 1): + print(f" {i:2d}. {song['artist']} - {song['title']}") + if len(unmatched) > 10: + print(f" ... 
and {len(unmatched) - 10} more songs") + else: + print(f"\n✅ All {len(songlist)} songs were found in the channels!") + + except Exception as e: + print(f"❌ Error generating report: {e}") + sys.exit(1) + sys.exit(0) elif args.songlist_status: songlist = downloader._load_songlist() @@ -343,6 +447,7 @@ Examples: force_refresh_download_plan=force_refresh_download_plan, fuzzy_match=fuzzy_match, fuzzy_threshold=fuzzy_threshold, + force_download=args.force, ) elif args.latest_per_channel: # Use provided file or default to data/channels.txt @@ -372,6 +477,7 @@ Examples: force_refresh_download_plan=force_refresh_download_plan, fuzzy_match=fuzzy_match, fuzzy_threshold=fuzzy_threshold, + force_download=args.force, ) elif args.url: success = downloader.download_channel_videos( diff --git a/karaoke_downloader/download_planner.py b/karaoke_downloader/download_planner.py index af51d39..431b2fa 100644 --- a/karaoke_downloader/download_planner.py +++ b/karaoke_downloader/download_planner.py @@ -3,6 +3,11 @@ Download plan building utilities. Handles pre-scanning channels and building download plans. """ +import json +from datetime import datetime +from pathlib import Path +from typing import List, Dict, Any + from karaoke_downloader.cache_manager import ( delete_plan_cache, get_download_plan_cache_file, @@ -15,6 +20,7 @@ from karaoke_downloader.fuzzy_matcher import ( get_similarity_function, is_exact_match, is_fuzzy_match, + normalize_title, ) from karaoke_downloader.youtube_utils import get_channel_info @@ -25,6 +31,49 @@ DEFAULT_TITLE_LENGTH_LIMIT = 60 DEFAULT_FUZZY_THRESHOLD = 85 +def generate_unmatched_report(unmatched: List[Dict[str, Any]], report_path: str = None) -> str: + """ + Generate a detailed report of unmatched songs and save it to a file. 
+ + Args: + unmatched: List of unmatched songs from build_download_plan + report_path: Optional path to save the report (default: data/unmatched_songs_report_YYYYMMDD_HHMMSS.json, timestamped at call time) + + Returns: + Path to the saved report file + """ + if report_path is None: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + report_path = f"data/unmatched_songs_report_{timestamp}.json" + + report_data = { + "generated_at": datetime.now().isoformat(), + "total_unmatched": len(unmatched), + "unmatched_songs": [] + } + + for song in unmatched: + report_data["unmatched_songs"].append({ + "artist": song["artist"], + "title": song["title"], + "position": song.get("position", 0), + "search_key": create_song_key(song["artist"], song["title"]) + }) + + # Sort by artist, then by title for easier reading + report_data["unmatched_songs"].sort(key=lambda x: (x["artist"].lower(), x["title"].lower())) + + # Ensure the data directory exists + report_file = Path(report_path) + report_file.parent.mkdir(parents=True, exist_ok=True) + + # Save the report + with open(report_file, 'w', encoding='utf-8') as f: + json.dump(report_data, f, indent=2, ensure_ascii=False) + + return str(report_file) + + def build_download_plan( channel_urls, undownloaded, @@ -143,4 +192,13 @@ def build_download_plan( f" TOTAL: {sum(channel_match_counts.values())} matches across {len(channel_match_counts)} channels." 
) + # Generate unmatched songs report if there are any + if unmatched: + try: + report_file = generate_unmatched_report(unmatched) + print(f"\n📋 Unmatched songs report saved to: {report_file}") + print(f"📋 Total unmatched songs: {len(unmatched)}") + except Exception as e: + print(f"⚠️ Could not generate unmatched songs report: {e}") + return plan, unmatched diff --git a/karaoke_downloader/downloader.py b/karaoke_downloader/downloader.py index 80c3755..1ab9db4 100644 --- a/karaoke_downloader/downloader.py +++ b/karaoke_downloader/downloader.py @@ -114,6 +114,7 @@ class KaraokeDownloader: self.songlist_only = False self.use_songlist_priority = True self.download_limit = None + self.force_download = False def _load_config(self): """Load configuration using the config manager.""" @@ -143,6 +144,7 @@ class KaraokeDownloader: video_title, server_songs, server_duplicates_tracking, + self.force_download, ) def _mark_song_failed(self, artist, title, video_id, channel_name, error_message): @@ -229,18 +231,18 @@ class KaraokeDownloader: best_score = score best_song = song if best_score >= fuzzy_threshold and best_song: - # Check if already downloaded or on server - if not is_songlist_song_downloaded( + # Check if already downloaded or on server (unless force mode is enabled) + if self.force_download or not is_songlist_song_downloaded( self.songlist_tracking, best_song["artist"], best_song["title"] ): - # Check if already marked as server duplicate - if not is_song_marked_as_server_duplicate( + # Check if already marked as server duplicate (unless force mode is enabled) + if self.force_download or not is_song_marked_as_server_duplicate( server_duplicates_tracking, best_song["artist"], best_song["title"], ): - # Check if already on server and mark for future skipping - if not check_and_mark_server_duplicate( + # Check if already on server and mark for future skipping (unless force mode is enabled) + if self.force_download or not check_and_mark_server_duplicate( server_songs, 
server_duplicates_tracking, best_song["artist"], @@ -257,16 +259,16 @@ class KaraokeDownloader: else: if key in normalized_songlist: song = normalized_songlist[key] - # Check if already downloaded or on server - if not is_songlist_song_downloaded( + # Check if already downloaded or on server (unless force mode is enabled) + if self.force_download or not is_songlist_song_downloaded( self.songlist_tracking, song["artist"], song["title"] ): - # Check if already marked as server duplicate - if not is_song_marked_as_server_duplicate( + # Check if already marked as server duplicate (unless force mode is enabled) + if self.force_download or not is_song_marked_as_server_duplicate( server_duplicates_tracking, song["artist"], song["title"] ): - # Check if already on server and mark for future skipping - if not check_and_mark_server_duplicate( + # Check if already on server and mark for future skipping (unless force mode is enabled) + if self.force_download or not check_and_mark_server_duplicate( server_songs, server_duplicates_tracking, song["artist"], @@ -312,11 +314,14 @@ class KaraokeDownloader: force_refresh_download_plan=False, fuzzy_match=False, fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD, + force_download=False, ): """ - For each song in the songlist, try each channel in order and download from the first channel where it is found. - Download up to 'limit' songs, skipping any that cannot be found, until the limit is reached or all possible matches are exhausted. + Download songs from the songlist across multiple channels. 
""" + # Set force download flag + self.force_download = force_download + # Apply songlist focus filtering if specified if self.songlist_focus_titles: # Load the raw songlist data to filter by playlist titles @@ -388,70 +393,79 @@ class KaraokeDownloader: return False # Filter for songs not yet downloaded - undownloaded = [ - s - for s in songlist - if not is_songlist_song_downloaded( - self.songlist_tracking, s["artist"], s["title"] - ) - ] - print(f"\n🎯 {len(songlist)} total unique songs in songlist.") - print(f"\n🎯 {len(undownloaded)} unique songlist songs to download.") + if self.force_download: + undownloaded = songlist + print(f"\n🎯 {len(songlist)} total unique songs in songlist.") + print(f"💪 Force mode enabled - will download all songs regardless of existing status") + else: + undownloaded = [ + s + for s in songlist + if not is_songlist_song_downloaded( + self.songlist_tracking, s["artist"], s["title"] + ) + ] + print(f"\n🎯 {len(songlist)} total unique songs in songlist.") + print(f"\n🎯 {len(undownloaded)} unique songlist songs to download.") # Load server songs and duplicates tracking for availability checking server_songs = load_server_songs() server_duplicates_tracking = load_server_duplicates_tracking() # Further filter out songs already on server or marked as duplicates - not_on_server = [] - server_available_mp4 = 0 - server_available_other = 0 - marked_duplicates = 0 + if self.force_download: + not_on_server = undownloaded + print(f"💪 Force mode enabled - will download all songs regardless of server status") + else: + not_on_server = [] + server_available_mp4 = 0 + server_available_other = 0 + marked_duplicates = 0 - for song in undownloaded: - artist, title = song["artist"], song["title"] + for song in undownloaded: + artist, title = song["artist"], song["title"] - # Check if already marked as server duplicate - if is_song_marked_as_server_duplicate( - server_duplicates_tracking, artist, title - ): - marked_duplicates += 1 - continue + # Check if 
already marked as server duplicate + if is_song_marked_as_server_duplicate( + server_duplicates_tracking, artist, title + ): + marked_duplicates += 1 + continue - # Check if already on server and mark for future skipping - if check_and_mark_server_duplicate( - server_songs, - server_duplicates_tracking, - artist, - title, - f"{artist} - {title}", - "songlist", - ): - server_available_mp4 += 1 - continue + # Check if already on server and mark for future skipping + if check_and_mark_server_duplicate( + server_songs, + server_duplicates_tracking, + artist, + title, + f"{artist} - {title}", + "songlist", + ): + server_available_mp4 += 1 + continue - # Check if song is on server but in different format (MP3/CDG) - key = f"{artist.lower()}_{normalize_title(title)}" - if key in server_songs: - song_info = server_songs[key] - if song_info.get("is_mp3", False) or song_info.get("is_cdg", False): - server_available_other += 1 - print(f"🎵 Found {artist} - {title} on server as {song_info.get('is_mp3', False) and 'MP3' or 'CDG'} format, will download video version") + # Check if song is on server but in different format (MP3/CDG) + key = f"{artist.lower()}_{normalize_title(title)}" + if key in server_songs: + song_info = server_songs[key] + if song_info.get("is_mp3", False) or song_info.get("is_cdg", False): + server_available_other += 1 + print(f"🎵 Found {artist} - {title} on server as {song_info.get('is_mp3', False) and 'MP3' or 'CDG'} format, will download video version") - not_on_server.append(song) + not_on_server.append(song) - if server_available_mp4 > 0: - print( - f"\n🎵 {server_available_mp4} songs already available as MP4 on server, skipping." - ) - if server_available_other > 0: - print( - f"\n🎵 {server_available_other} songs found on server as MP3/CDG, will download video versions." - ) - if marked_duplicates > 0: - print( - f"\n🏷️ {marked_duplicates} songs previously marked as server duplicates, skipping." 
- ) + if server_available_mp4 > 0: + print( + f"\n🎵 {server_available_mp4} songs already available as MP4 on server, skipping." + ) + if server_available_other > 0: + print( + f"\n🎵 {server_available_other} songs found on server as MP3/CDG, will download video versions." + ) + if marked_duplicates > 0: + print( + f"\n🏷️ {marked_duplicates} songs previously marked as server duplicates, skipping." + ) undownloaded = not_on_server print(f"\n🎯 {len(undownloaded)} songs need to be downloaded.") @@ -832,6 +846,7 @@ class KaraokeDownloader: force_refresh_download_plan=False, fuzzy_match=False, fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD, + force_download=False, ): """ Download the latest N videos from each channel in channel_urls. @@ -841,6 +856,10 @@ class KaraokeDownloader: - Resume robustly if interrupted (removes each channel from the plan as it completes). - Deletes the plan cache when all channels are done. """ + # Set force download flag + self.force_download = force_download + + print(f"\n🎬 Downloading latest {limit} videos from each channel...") # Load server songs for availability checking server_songs = load_server_songs() server_duplicates_tracking = load_server_duplicates_tracking() diff --git a/karaoke_downloader/song_validator.py b/karaoke_downloader/song_validator.py index 19bf61a..8266d24 100644 --- a/karaoke_downloader/song_validator.py +++ b/karaoke_downloader/song_validator.py @@ -35,6 +35,7 @@ class SongValidator: video_title: Optional[str] = None, server_songs: Optional[Dict[str, Any]] = None, server_duplicates_tracking: Optional[Dict[str, Any]] = None, + force_download: bool = False, ) -> Tuple[bool, Optional[str], int]: """ Check if a song should be skipped based on multiple criteria. 
@@ -53,10 +54,15 @@ class SongValidator: video_title: YouTube video title (optional) server_songs: Server songs data (optional) server_duplicates_tracking: Server duplicates tracking (optional) + force_download: If True, bypass all validation checks and force download Returns: Tuple of (should_skip, reason, total_filtered) """ + # If force download is enabled, skip all validation checks + if force_download: + return False, None, 0 + total_filtered = 0 # Check 1: Already downloaded by this system diff --git a/karaoke_downloader/songlist_generator.py b/karaoke_downloader/songlist_generator.py new file mode 100644 index 0000000..21191ce --- /dev/null +++ b/karaoke_downloader/songlist_generator.py @@ -0,0 +1,261 @@ +import json +import os +from pathlib import Path +from typing import List, Dict, Any, Optional +from mutagen.mp4 import MP4 + + +class SongListGenerator: + """Utility class for generating song lists from MP4 files with ID3 tags.""" + + def __init__(self, songlist_path: str = "data/songList.json"): + self.songlist_path = Path(songlist_path) + self.songlist_path.parent.mkdir(parents=True, exist_ok=True) + + def read_existing_songlist(self) -> List[Dict[str, Any]]: + """Read existing song list from JSON file.""" + if self.songlist_path.exists(): + try: + with open(self.songlist_path, 'r', encoding='utf-8') as f: + return json.load(f) + except (json.JSONDecodeError, IOError) as e: + print(f"⚠️ Warning: Could not read existing songlist: {e}") + return [] + return [] + + def save_songlist(self, songlist: List[Dict[str, Any]]) -> None: + """Save song list to JSON file.""" + try: + with open(self.songlist_path, 'w', encoding='utf-8') as f: + json.dump(songlist, f, indent=2, ensure_ascii=False) + print(f"✅ Song list saved to {self.songlist_path}") + except IOError as e: + print(f"❌ Error saving song list: {e}") + raise + + def extract_id3_tags(self, mp4_path: Path) -> Optional[Dict[str, str]]: + """Extract ID3 tags from MP4 file.""" + try: + mp4 = MP4(str(mp4_path)) 
+ + # Extract artist and title from ID3 tags + artist = mp4.get("\xa9ART", ["Unknown Artist"])[0] if "\xa9ART" in mp4 else "Unknown Artist" + title = mp4.get("\xa9nam", ["Unknown Title"])[0] if "\xa9nam" in mp4 else "Unknown Title" + + return { + "artist": artist, + "title": title + } + except Exception as e: + print(f"⚠️ Warning: Could not extract ID3 tags from {mp4_path.name}: {e}") + return None + + def scan_directory_for_mp4_files(self, directory_path: str) -> List[Path]: + """Scan directory for MP4 files.""" + directory = Path(directory_path) + if not directory.exists(): + raise FileNotFoundError(f"Directory not found: {directory_path}") + + if not directory.is_dir(): + raise ValueError(f"Path is not a directory: {directory_path}") + + mp4_files = list(directory.glob("*.mp4")) + if not mp4_files: + print(f"⚠️ No MP4 files found in {directory_path}") + return [] + + print(f"📁 Found {len(mp4_files)} MP4 files in {directory.name}") + return sorted(mp4_files) + + def generate_songlist_from_directory(self, directory_path: str, append: bool = True) -> Dict[str, Any]: + """Generate a song list from MP4 files in a directory.""" + directory = Path(directory_path) + directory_name = directory.name + + # Scan for MP4 files + mp4_files = self.scan_directory_for_mp4_files(directory_path) + if not mp4_files: + return {} + + # Extract ID3 tags and create songs list + songs = [] + for index, mp4_file in enumerate(mp4_files, start=1): + id3_tags = self.extract_id3_tags(mp4_file) + if id3_tags: + song = { + "position": index, + "title": id3_tags["title"], + "artist": id3_tags["artist"] + } + songs.append(song) + print(f" {index:3d}. 
{id3_tags['artist']} - {id3_tags['title']}") + + if not songs: + print("❌ No valid ID3 tags found in any MP4 files") + return {} + + # Create the song list entry + songlist_entry = { + "title": directory_name, + "songs": songs + } + + # Handle appending to existing song list + if append: + existing_songlist = self.read_existing_songlist() + + # Check if a playlist with this title already exists + existing_index = None + for i, entry in enumerate(existing_songlist): + if entry.get("title") == directory_name: + existing_index = i + break + + if existing_index is not None: + # Replace existing entry + print(f"🔄 Replacing existing playlist: {directory_name}") + existing_songlist[existing_index] = songlist_entry + else: + # Add new entry to the beginning of the list + print(f"➕ Adding new playlist: {directory_name}") + existing_songlist.insert(0, songlist_entry) + + self.save_songlist(existing_songlist) + else: + # Create new song list with just this entry + print(f"📝 Creating new song list with playlist: {directory_name}") + self.save_songlist([songlist_entry]) + + return songlist_entry + + def generate_songlist_from_multiple_directories(self, directory_paths: List[str], append: bool = True) -> List[Dict[str, Any]]: + """Generate song lists from multiple directories.""" + results = [] + errors = [] + + # Read existing song list once at the beginning + existing_songlist = self.read_existing_songlist() if append else [] + + for directory_path in directory_paths: + try: + print(f"\n📂 Processing directory: {directory_path}") + directory = Path(directory_path) + directory_name = directory.name + + # Scan for MP4 files + mp4_files = self.scan_directory_for_mp4_files(directory_path) + if not mp4_files: + continue + + # Extract ID3 tags and create songs list + songs = [] + for index, mp4_file in enumerate(mp4_files, start=1): + id3_tags = self.extract_id3_tags(mp4_file) + if id3_tags: + song = { + "position": index, + "title": id3_tags["title"], + "artist": id3_tags["artist"] 
+ } + songs.append(song) + print(f" {index:3d}. {id3_tags['artist']} - {id3_tags['title']}") + + if not songs: + print("❌ No valid ID3 tags found in any MP4 files") + continue + + # Create the song list entry + songlist_entry = { + "title": directory_name, + "songs": songs + } + + # Check if a playlist with this title already exists + existing_index = None + for i, entry in enumerate(existing_songlist): + if entry.get("title") == directory_name: + existing_index = i + break + + if existing_index is not None: + # Replace existing entry + print(f"🔄 Replacing existing playlist: {directory_name}") + existing_songlist[existing_index] = songlist_entry + else: + # Add new entry to the beginning of the list + print(f"➕ Adding new playlist: {directory_name}") + existing_songlist.insert(0, songlist_entry) + + results.append(songlist_entry) + + except Exception as e: + error_msg = f"Error processing {directory_path}: {e}" + print(f"❌ {error_msg}") + errors.append(error_msg) + + # Save the final song list + if results: + if append: + # Save the updated existing song list + self.save_songlist(existing_songlist) + else: + # Create new song list with just the results + self.save_songlist(results) + + # If there were any errors, raise an exception + if errors: + raise Exception(f"Failed to process {len(errors)} directories: {'; '.join(errors)}") + + return results + + +def main(): + """CLI entry point for song list generation.""" + import argparse + import sys + + parser = argparse.ArgumentParser( + description="Generate song lists from MP4 files with ID3 tags", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + python -m karaoke_downloader.songlist_generator /path/to/mp4/directory + python -m karaoke_downloader.songlist_generator /path/to/dir1 /path/to/dir2 --no-append + python -m karaoke_downloader.songlist_generator /path/to/dir --songlist-path custom_songlist.json + """ + ) + + parser.add_argument( + "directories", + nargs="+", + help="Directory 
paths containing MP4 files with ID3 tags" + ) + + parser.add_argument( + "--no-append", + action="store_true", + help="Create a new song list instead of appending to existing one" + ) + + parser.add_argument( + "--songlist-path", + default="data/songList.json", + help="Path to the song list JSON file (default: data/songList.json)" + ) + + args = parser.parse_args() + + try: + generator = SongListGenerator(args.songlist_path) + generator.generate_songlist_from_multiple_directories( + args.directories, + append=not args.no_append + ) + print("\n✅ Song list generation completed successfully!") + except Exception as e: + print(f"\n❌ Error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file