# karaoke_downloader/download_planner.py

"""
Download plan building utilities.
Handles pre-scanning channels and building download plans.
"""
import concurrent.futures
import hashlib
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from karaoke_downloader.cache_manager import (
delete_plan_cache,
get_download_plan_cache_file,
load_cached_plan,
save_plan_cache,
)
from karaoke_downloader.fuzzy_matcher import (
create_song_key,
create_video_key,
extract_artist_title,
get_similarity_function,
is_exact_match,
is_fuzzy_match,
normalize_title,
)
from karaoke_downloader.youtube_utils import get_channel_info
# Constants
DEFAULT_FILENAME_LENGTH_LIMIT = 100
DEFAULT_ARTIST_LENGTH_LIMIT = 30
DEFAULT_TITLE_LENGTH_LIMIT = 60
DEFAULT_FUZZY_THRESHOLD = 85
DEFAULT_DISPLAY_LIMIT = 10
def generate_unmatched_report(unmatched: List[Dict[str, Any]], report_path: Optional[str] = None) -> str:
    """
    Generate a detailed report of unmatched songs and save it to a JSON file.

    Args:
        unmatched: List of unmatched song dicts from build_download_plan;
            each must have "artist" and "title" ("position" is optional,
            defaulting to 0).
        report_path: Optional path to save the report. Defaults to a
            timestamped file: data/unmatched_songs_report_<timestamp>.json.

    Returns:
        Path (as a string) to the saved report file.
    """
    if report_path is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_path = f"data/unmatched_songs_report_{timestamp}.json"
    # One entry per song; search_key mirrors the key used during matching so
    # the report can be compared against channel scan results.
    songs = [
        {
            "artist": song["artist"],
            "title": song["title"],
            "position": song.get("position", 0),
            "search_key": create_song_key(song["artist"], song["title"]),
        }
        for song in unmatched
    ]
    # Sort by artist, then by title for easier reading
    songs.sort(key=lambda x: (x["artist"].lower(), x["title"].lower()))
    report_data = {
        "generated_at": datetime.now().isoformat(),
        "total_unmatched": len(unmatched),
        "unmatched_songs": songs,
    }
    # Ensure the data directory exists before writing
    report_file = Path(report_path)
    report_file.parent.mkdir(parents=True, exist_ok=True)
    # Save the report (ensure_ascii=False keeps non-ASCII artist names readable)
    with open(report_file, 'w', encoding='utf-8') as f:
        json.dump(report_data, f, indent=2, ensure_ascii=False)
    return str(report_file)
def _scan_channel_for_matches(
    channel_url,
    channel_name,
    channel_id,
    song_keys,
    song_lookup,
    fuzzy_match,
    fuzzy_threshold,
    show_pagination,
    yt_dlp_path,
    tracker,
):
    """
    Scan a single channel's video list for songs remaining in song_lookup.

    Matched songs are removed from ``song_keys`` and ``song_lookup`` in place,
    so each song is claimed by at most one video/channel.

    NOTE(review): despite the original "used in parallel processing" note,
    this mutation is not lock-protected — do not run this concurrently on
    shared song state without external locking (see scan_channel_safe in
    build_download_plan for the locked variant).

    Args:
        channel_url: URL of the channel to scan.
        channel_name: Name of the channel (resolved via get_channel_info if falsy).
        channel_id: ID of the channel (resolved via get_channel_info if falsy).
        song_keys: Set of still-unmatched song keys; mutated in place.
        song_lookup: Mapping song key -> song dict; mutated in place.
        fuzzy_match: Whether to use fuzzy matching instead of exact key equality.
        fuzzy_threshold: Minimum similarity score for a fuzzy match.
        show_pagination: Whether to show pagination progress while fetching.
        yt_dlp_path: Path to yt-dlp executable.
        tracker: Tracking manager used to fetch the channel's video list.

    Returns:
        List of match dicts: {artist, title, channel_name, channel_url,
        video_id, video_title, match_score}.
    """
    print(f"\n🚦 Scanning channel: {channel_name} ({channel_url})")
    # Resolve channel metadata only when the caller did not provide it
    if not channel_name or not channel_id:
        channel_name, channel_id = get_channel_info(channel_url)
    # Fetch video list from channel (cached by the tracker unless refreshed)
    available_videos = tracker.get_channel_video_list(
        channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination
    )
    print(f" 📊 Channel has {len(available_videos)} videos to scan")
    video_matches = []

    def _claim(song_key, video, score):
        # Record the match and drop the song so later videos/channels
        # cannot claim it again.
        song = song_lookup.pop(song_key)
        song_keys.remove(song_key)
        video_matches.append(
            {
                "artist": song["artist"],
                "title": song["title"],
                "channel_name": channel_name,
                "channel_url": channel_url,
                "video_id": video["id"],
                "video_title": video["title"],
                "match_score": score,
            }
        )

    if fuzzy_match:
        # Hoist the similarity-function lookup out of the O(videos * songs)
        # inner loop; the original re-resolved it for every comparison.
        similarity = get_similarity_function()
        for video in available_videos:
            v_artist, v_title = extract_artist_title(video["title"])
            video_key = create_song_key(v_artist, v_title)
            # Find the best-scoring unmatched song for this video
            best_match = None
            best_score = 0
            for song_key in song_keys:
                score = similarity(song_key, video_key)
                if score >= fuzzy_threshold and score > best_score:
                    best_score = score
                    best_match = song_key
            if best_match:
                _claim(best_match, video, best_score)
    else:
        # Exact matching: O(1) set lookup on the normalized key
        for video in available_videos:
            v_artist, v_title = extract_artist_title(video["title"])
            video_key = create_song_key(v_artist, v_title)
            if video_key in song_keys:
                _claim(video_key, video, 100)
    print(f" ✅ Found {len(video_matches)} matches in {channel_name}")
    return video_matches
def build_download_plan(
    channel_urls,
    undownloaded,
    tracker,
    yt_dlp_path,
    fuzzy_match=False,
    fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
    show_pagination=False,
    parallel_channels=False,
    max_channel_workers=3,
):
    """
    For each song in undownloaded, scan all channels for a match.
    Use fuzzy matching if enabled.

    Args:
        channel_urls: Iterable of channel URLs to scan.
        undownloaded: List of song dicts ({artist, title, ...}) still needed.
        tracker: Tracking manager used to fetch channel video lists.
        yt_dlp_path: Path to the yt-dlp executable.
        fuzzy_match: Use fuzzy title matching instead of exact key equality.
        fuzzy_threshold: Minimum similarity score for a fuzzy match.
        show_pagination: Show pagination progress while fetching video lists.
        parallel_channels: Scan channels concurrently with a thread pool.
        max_channel_workers: Thread-pool size when parallel_channels is set.

    Return (download_plan, unmatched_songs):
        - download_plan: list of dicts {artist, title, channel_name,
          channel_url, video_id, video_title, match_score}
        - unmatched_songs: list of songs not found in any channel
    """
    plan = []
    unmatched = []
    channel_match_counts = {}
    # Hoist the similarity-function lookup: the original resolved it once per
    # (video, song) comparison inside the hot loop.
    similarity = get_similarity_function()
    # Pre-process songlist for O(1) lookups; song_keys and song_lookup are
    # kept in sync throughout (a matched song is removed from both).
    song_keys = set()
    song_lookup = {}
    for song in undownloaded:
        key = create_song_key(song["artist"], song["title"])
        song_keys.add(key)
        song_lookup[key] = song
    if parallel_channels:
        print(f"🚀 Running parallel channel scanning with {max_channel_workers} workers.")
        import threading

        # Two locks guard the shared unmatched-song state across workers.
        song_keys_lock = threading.Lock()
        song_lookup_lock = threading.Lock()

        def scan_channel_safe(channel_url):
            """Scan one channel; returns (channel_name, video_matches)."""
            print(f"\n🚦 Scanning channel: {channel_url}")
            # Get channel info
            channel_name, channel_id = get_channel_info(channel_url)
            print(f" ✅ Channel info: {channel_name} (ID: {channel_id})")
            # Fetch video list from channel
            available_videos = tracker.get_channel_video_list(
                channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination
            )
            print(f" 📊 Channel has {len(available_videos)} videos to scan")
            video_matches = []

            def _match_entry(song, video, score):
                return {
                    "artist": song["artist"],
                    "title": song["title"],
                    "channel_name": channel_name,
                    "channel_url": channel_url,
                    "video_id": video["id"],
                    "video_title": video["title"],
                    "match_score": score,
                }

            if fuzzy_match:
                for video in available_videos:
                    v_artist, v_title = extract_artist_title(video["title"])
                    video_key = create_song_key(v_artist, v_title)
                    # Score against a snapshot of the unmatched keys so the
                    # (slow) similarity calls run WITHOUT holding any lock —
                    # the original held song_lookup_lock per comparison,
                    # serializing all workers.
                    with song_keys_lock:
                        available_song_keys = list(song_keys)
                    best_match = None
                    best_score = 0
                    for song_key in available_song_keys:
                        score = similarity(song_key, video_key)
                        if score >= fuzzy_threshold and score > best_score:
                            best_score = score
                            best_match = song_key
                    if best_match:
                        # Claim under the lock: another worker may have taken
                        # this song since the snapshot; if so, skip silently
                        # (same race outcome as the original double-check).
                        with song_lookup_lock:
                            song = song_lookup.pop(best_match, None)
                        if song is not None:
                            video_matches.append(_match_entry(song, video, best_score))
                            with song_keys_lock:
                                song_keys.discard(best_match)
            else:
                # Exact matching: claim atomically via pop-with-default.
                for video in available_videos:
                    v_artist, v_title = extract_artist_title(video["title"])
                    video_key = create_song_key(v_artist, v_title)
                    with song_lookup_lock:
                        song = song_lookup.pop(video_key, None)
                    if song is not None:
                        video_matches.append(_match_entry(song, video, 100))
                        with song_keys_lock:
                            song_keys.discard(video_key)
            print(f" ✅ Found {len(video_matches)} matches in {channel_name}")
            return channel_name, video_matches

        # Execute parallel channel scanning
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_channel_workers) as executor:
            # Submit all channel scanning tasks
            future_to_channel = {
                executor.submit(scan_channel_safe, channel_url): channel_url
                for channel_url in channel_urls
            }
            # Process results as they complete
            for future in concurrent.futures.as_completed(future_to_channel):
                channel_url = future_to_channel[future]
                try:
                    # Worker returns the resolved name, so we no longer
                    # re-call get_channel_info per completed future.
                    channel_name, video_matches = future.result()
                    plan.extend(video_matches)
                    channel_match_counts[channel_name] = len(video_matches)
                except Exception as e:
                    print(f"⚠️ Error processing channel {channel_url}: {e}")
                    # Label the failed channel by URL: re-calling
                    # get_channel_info here (as the original did) could itself
                    # raise and abort the whole plan build.
                    channel_match_counts[channel_url] = 0
    else:
        for i, channel_url in enumerate(channel_urls, 1):
            print(f"\n🚦 Starting channel {i}/{len(channel_urls)}: {channel_url}")
            print(f" 🔍 Getting channel info...")
            channel_name, channel_id = get_channel_info(channel_url)
            print(f" ✅ Channel info: {channel_name} (ID: {channel_id})")
            print(f" 🔍 Fetching video list from channel...")
            available_videos = tracker.get_channel_video_list(
                channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination
            )
            print(
                f" 📊 Channel has {len(available_videos)} videos to scan against {len(undownloaded)} songlist songs"
            )
            video_matches = []
            if fuzzy_match:
                for video in available_videos:
                    v_artist, v_title = extract_artist_title(video["title"])
                    video_key = create_song_key(v_artist, v_title)
                    # Find the best-scoring song still unmatched
                    best_match = None
                    best_score = 0
                    for song_key in song_keys:
                        score = similarity(song_key, video_key)
                        if score >= fuzzy_threshold and score > best_score:
                            best_score = score
                            best_match = song_key
                    if best_match:
                        song = song_lookup.pop(best_match)
                        video_matches.append(
                            {
                                "artist": song["artist"],
                                "title": song["title"],
                                "channel_name": channel_name,
                                "channel_url": channel_url,
                                "video_id": video["id"],
                                "video_title": video["title"],
                                "match_score": best_score,
                            }
                        )
                        # Remove matched song from future consideration
                        song_keys.remove(best_match)
            else:
                # For exact matching, use direct key comparison
                for video in available_videos:
                    v_artist, v_title = extract_artist_title(video["title"])
                    video_key = create_song_key(v_artist, v_title)
                    if video_key in song_keys:
                        song = song_lookup.pop(video_key)
                        video_matches.append(
                            {
                                "artist": song["artist"],
                                "title": song["title"],
                                "channel_name": channel_name,
                                "channel_url": channel_url,
                                "video_id": video["id"],
                                "video_title": video["title"],
                                "match_score": 100,
                            }
                        )
                        # Remove matched song from future consideration
                        song_keys.remove(video_key)
            matches_this_channel = len(video_matches)
            # Add matches to plan
            plan.extend(video_matches)
            # Print match count once per channel
            channel_match_counts[channel_name] = matches_this_channel
            print(f" → Found {matches_this_channel} songlist matches in this channel.")
    # Remaining unmatched songs
    unmatched = list(song_lookup.values())
    # Print summary table
    print("\n📊 Channel match summary:")
    for channel, count in channel_match_counts.items():
        print(f" {channel}: {count} matches")
    print(
        f" TOTAL: {sum(channel_match_counts.values())} matches across {len(channel_match_counts)} channels."
    )
    # Generate unmatched songs report if there are any
    if unmatched:
        try:
            report_file = generate_unmatched_report(unmatched)
            print(f"\n📋 Unmatched songs report saved to: {report_file}")
            print(f"📋 Total unmatched songs: {len(unmatched)}")
        except Exception as e:
            # Best-effort: a failed report must not abort the plan itself
            print(f"⚠️ Could not generate unmatched songs report: {e}")
    return plan, unmatched