""" Download plan building utilities. Handles pre-scanning channels and building download plans. """ import concurrent.futures import hashlib import json import sys from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from karaoke_downloader.cache_manager import ( delete_plan_cache, get_download_plan_cache_file, load_cached_plan, save_plan_cache, ) # Import all fuzzy matching functions from karaoke_downloader.fuzzy_matcher import ( create_song_key, create_video_key, get_similarity_function, is_exact_match, is_fuzzy_match, normalize_title, ) from karaoke_downloader.channel_parser import ChannelParser from karaoke_downloader.youtube_utils import get_channel_info # Constants DEFAULT_FILENAME_LENGTH_LIMIT = 100 DEFAULT_ARTIST_LENGTH_LIMIT = 30 DEFAULT_TITLE_LENGTH_LIMIT = 60 DEFAULT_FUZZY_THRESHOLD = 85 DEFAULT_DISPLAY_LIMIT = 10 def generate_unmatched_report(unmatched: List[Dict[str, Any]], report_path: str = None) -> str: """ Generate a detailed report of unmatched songs and save it to a file. Args: unmatched: List of unmatched songs from build_download_plan report_path: Optional path to save the report (default: data/unmatched_songs_report.json) Returns: Path to the saved report file """ if report_path is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") report_path = f"data/unmatched_songs_report_{timestamp}.json" report_data = { "generated_at": datetime.now().isoformat(), "total_unmatched": len(unmatched), "unmatched_songs": [] } for song in unmatched: report_data["unmatched_songs"].append({ "artist": song["artist"], "title": song["title"], "position": song.get("position", 0), "search_key": create_song_key(song["artist"], song["title"]) }) # Sort by artist, then by title for easier reading report_data["unmatched_songs"].sort(key=lambda x: (x["artist"].lower(), x["title"].lower())) # Ensure the data directory exists report_file = Path(report_path) report_file.parent.mkdir(parents=True, exist_ok=True) # Save the report with open(report_file, 'w', encoding='utf-8') as f: json.dump(report_data, f, indent=2, ensure_ascii=False) return str(report_file) def _scan_channel_for_matches( channel_url, channel_name, channel_id, song_keys, song_lookup, fuzzy_match, fuzzy_threshold, show_pagination, yt_dlp_path, tracker, ): """ Scan a single channel for matches (used in parallel processing). Args: channel_url: URL of the channel to scan channel_name: Name of the channel channel_id: ID of the channel song_keys: Set of song keys to match against song_lookup: Dictionary mapping song keys to song data fuzzy_match: Whether to use fuzzy matching fuzzy_threshold: Threshold for fuzzy matching show_pagination: Whether to show pagination progress yt_dlp_path: Path to yt-dlp executable tracker: Tracking manager instance Returns: List of video matches found in this channel """ print(f"\n🚦 Scanning channel: {channel_name} ({channel_url})") # Get channel info if not provided if not channel_name or not channel_id: channel_name, channel_id = get_channel_info(channel_url) # Fetch video list from channel available_videos = tracker.get_channel_video_list( channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination ) print(f" šŸ“Š Channel has {len(available_videos)} videos to scan") video_matches = [] # Pre-process video titles for efficient matching channel_parser = ChannelParser() if fuzzy_match: # For fuzzy matching, create normalized video keys for video in available_videos: v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name) video_key = create_song_key(v_artist, v_title) # Find best match among remaining songs best_match = None best_score = 0 for song_key in song_keys: if song_key in song_lookup: # Only check unmatched songs score = get_similarity_function()(song_key, video_key) if score >= fuzzy_threshold and score > best_score: best_score = score best_match = song_key if best_match: song = song_lookup[best_match] video_matches.append( { "artist": song["artist"], "title": song["title"], "channel_name": channel_name, "channel_url": channel_url, "video_id": video["id"], "video_title": video["title"], "match_score": best_score, } ) # Remove matched song from future consideration del song_lookup[best_match] song_keys.remove(best_match) else: # For exact matching, use direct key comparison for video in available_videos: v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name) video_key = create_song_key(v_artist, v_title) if video_key in song_keys: song = song_lookup[video_key] video_matches.append( { "artist": song["artist"], "title": song["title"], "channel_name": channel_name, "channel_url": channel_url, "video_id": video["id"], "video_title": video["title"], "match_score": 100, } ) # Remove matched song from future consideration del song_lookup[video_key] song_keys.remove(video_key) print(f" āœ… Found {len(video_matches)} matches in {channel_name}") return video_matches def build_download_plan( channel_urls, undownloaded, tracker, yt_dlp_path, fuzzy_match=False, fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD, show_pagination=False, parallel_channels=False, max_channel_workers=3, ): """ For each song in undownloaded, scan all channels for a match. Use fuzzy matching if enabled. Return (download_plan, unmatched_songs): - download_plan: list of dicts {artist, title, channel_name, channel_url, video_id, video_title, match_score} - unmatched_songs: list of songs not found in any channel """ plan = [] unmatched = [] channel_match_counts = {} # Pre-process songlist for O(1) lookups song_keys = set() song_lookup = {} for song in undownloaded: key = create_song_key(song["artist"], song["title"]) song_keys.add(key) song_lookup[key] = song if parallel_channels: print(f"šŸš€ Running parallel channel scanning with {max_channel_workers} workers.") # Create a thread-safe copy of song data for parallel processing import threading song_keys_lock = threading.Lock() song_lookup_lock = threading.Lock() def scan_channel_safe(channel_url): """Thread-safe channel scanning function.""" print(f"\n🚦 Scanning channel: {channel_url}") # Get channel info channel_name, channel_id = get_channel_info(channel_url) print(f" āœ… Channel info: {channel_name} (ID: {channel_id})") # Fetch video list from channel available_videos = tracker.get_channel_video_list( channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination ) print(f" šŸ“Š Channel has {len(available_videos)} videos to scan") video_matches = [] # Pre-process video titles for efficient matching channel_parser = ChannelParser() if fuzzy_match: # For fuzzy matching, create normalized video keys for video in available_videos: v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name) video_key = create_song_key(v_artist, v_title) # Find best match among remaining songs (thread-safe) best_match = None best_score = 0 with song_keys_lock: available_song_keys = list(song_keys) # Copy for iteration for song_key in available_song_keys: with song_lookup_lock: if song_key in song_lookup: # Only check unmatched songs score = get_similarity_function()(song_key, video_key) if score >= fuzzy_threshold and score > best_score: best_score = score best_match = song_key if best_match: with song_lookup_lock: if best_match in song_lookup: # Double-check it's still available song = song_lookup[best_match] video_matches.append( { "artist": song["artist"], "title": song["title"], "channel_name": channel_name, "channel_url": channel_url, "video_id": video["id"], "video_title": video["title"], "match_score": best_score, } ) # Remove matched song from future consideration del song_lookup[best_match] with song_keys_lock: song_keys.discard(best_match) else: # For exact matching, use direct key comparison for video in available_videos: v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name) video_key = create_song_key(v_artist, v_title) with song_lookup_lock: if video_key in song_keys and video_key in song_lookup: song = song_lookup[video_key] video_matches.append( { "artist": song["artist"], "title": song["title"], "channel_name": channel_name, "channel_url": channel_url, "video_id": video["id"], "video_title": video["title"], "match_score": 100, } ) # Remove matched song from future consideration del song_lookup[video_key] with song_keys_lock: song_keys.discard(video_key) print(f" āœ… Found {len(video_matches)} matches in {channel_name}") return video_matches # Execute parallel channel scanning with concurrent.futures.ThreadPoolExecutor(max_workers=max_channel_workers) as executor: # Submit all channel scanning tasks future_to_channel = { executor.submit(scan_channel_safe, channel_url): channel_url for channel_url in channel_urls } # Process results as they complete for future in concurrent.futures.as_completed(future_to_channel): channel_url = future_to_channel[future] try: video_matches = future.result() plan.extend(video_matches) channel_name, _ = get_channel_info(channel_url) channel_match_counts[channel_name] = len(video_matches) except Exception as e: print(f"āš ļø Error processing channel {channel_url}: {e}") channel_name, _ = get_channel_info(channel_url) channel_match_counts[channel_name] = 0 else: for i, channel_url in enumerate(channel_urls, 1): print(f"\n🚦 Starting channel {i}/{len(channel_urls)}: {channel_url}") print(f" šŸ” Getting channel info...") channel_name, channel_id = get_channel_info(channel_url) print(f" āœ… Channel info: {channel_name} (ID: {channel_id})") print(f" šŸ” Fetching video list from channel...") available_videos = tracker.get_channel_video_list( channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination ) print( f" šŸ“Š Channel has {len(available_videos)} videos to scan against {len(undownloaded)} songlist songs" ) matches_this_channel = 0 video_matches = [] # Initialize video_matches for this channel # Pre-process video titles for efficient matching channel_parser = ChannelParser() if fuzzy_match: # For fuzzy matching, create normalized video keys for video in available_videos: v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name) video_key = create_song_key(v_artist, v_title) # Find best match among remaining songs best_match = None best_score = 0 for song_key in song_keys: if song_key in song_lookup: # Only check unmatched songs score = get_similarity_function()(song_key, video_key) if score >= fuzzy_threshold and score > best_score: best_score = score best_match = song_key if best_match: song = song_lookup[best_match] video_matches.append( { "artist": song["artist"], "title": song["title"], "channel_name": channel_name, "channel_url": channel_url, "video_id": video["id"], "video_title": video["title"], "match_score": best_score, } ) # Remove matched song from future consideration del song_lookup[best_match] song_keys.remove(best_match) matches_this_channel += 1 else: # For exact matching, use direct key comparison for video in available_videos: v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name) video_key = create_song_key(v_artist, v_title) if video_key in song_keys: song = song_lookup[video_key] video_matches.append( { "artist": song["artist"], "title": song["title"], "channel_name": channel_name, "channel_url": channel_url, "video_id": video["id"], "video_title": video["title"], "match_score": 100, } ) # Remove matched song from future consideration del song_lookup[video_key] song_keys.remove(video_key) matches_this_channel += 1 # Add matches to plan plan.extend(video_matches) # Print match count once per channel channel_match_counts[channel_name] = matches_this_channel print(f" → Found {matches_this_channel} songlist matches in this channel.") # Remaining unmatched songs unmatched = list(song_lookup.values()) # Print summary table print("\nšŸ“Š Channel match summary:") for channel, count in channel_match_counts.items(): print(f" {channel}: {count} matches") print( f" TOTAL: {sum(channel_match_counts.values())} matches across {len(channel_match_counts)} channels." ) # Generate unmatched songs report if there are any if unmatched: try: report_file = generate_unmatched_report(unmatched) print(f"\nšŸ“‹ Unmatched songs report saved to: {report_file}") print(f"šŸ“‹ Total unmatched songs: {len(unmatched)}") except Exception as e: print(f"āš ļø Could not generate unmatched songs report: {e}") return plan, unmatched