KaraokeVideoDownloader/karaoke_downloader/download_planner.py

205 lines
7.4 KiB
Python

"""
Download plan building utilities.
Handles pre-scanning channels and building download plans.
"""
import json
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
from karaoke_downloader.cache_manager import (
delete_plan_cache,
get_download_plan_cache_file,
load_cached_plan,
save_plan_cache,
)
from karaoke_downloader.fuzzy_matcher import (
create_song_key,
extract_artist_title,
get_similarity_function,
is_exact_match,
is_fuzzy_match,
normalize_title,
)
from karaoke_downloader.youtube_utils import get_channel_info
# Module-level defaults.
# NOTE(review): the three length limits below are not referenced in the visible
# part of this module; presumably they cap generated filename components (in
# characters) -- confirm against their callers.
DEFAULT_FILENAME_LENGTH_LIMIT = 100
DEFAULT_ARTIST_LENGTH_LIMIT = 30
DEFAULT_TITLE_LENGTH_LIMIT = 60
# Minimum similarity score a fuzzy candidate must reach to count as a match;
# used as the default for build_download_plan(fuzzy_threshold=...).
DEFAULT_FUZZY_THRESHOLD = 85
def generate_unmatched_report(unmatched: List[Dict[str, Any]], report_path: str = None) -> str:
    """
    Write a JSON report describing the songs that could not be matched.

    Args:
        unmatched: Song dicts left over from build_download_plan. Each must
            provide "artist" and "title"; "position" is optional.
        report_path: Destination file. When omitted, a timestamped file named
            data/unmatched_songs_report_<YYYYmmdd_HHMMSS>.json is used.

    Returns:
        Path to the written report file, as a string.
    """

    def describe(song):
        # One report row per song; a missing "position" is reported as 0.
        return {
            "artist": song["artist"],
            "title": song["title"],
            "position": song.get("position", 0),
            "search_key": create_song_key(song["artist"], song["title"]),
        }

    def reading_order(row):
        # Case-insensitive ordering by artist, then title, for easier reading.
        return (row["artist"].lower(), row["title"].lower())

    if report_path is None:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_path = f"data/unmatched_songs_report_{stamp}.json"

    payload = {
        "generated_at": datetime.now().isoformat(),
        "total_unmatched": len(unmatched),
        "unmatched_songs": sorted(
            [describe(song) for song in unmatched], key=reading_order
        ),
    }

    # Create the target directory on demand so a fresh checkout still works.
    destination = Path(report_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    with destination.open("w", encoding="utf-8") as handle:
        json.dump(payload, handle, indent=2, ensure_ascii=False)

    return str(destination)
def _make_plan_entry(song, channel_name, channel_url, video, score):
    """Build one download-plan record pairing a songlist song with a channel video."""
    return {
        "artist": song["artist"],
        "title": song["title"],
        "channel_name": channel_name,
        "channel_url": channel_url,
        "video_id": video["id"],
        "video_title": video["title"],
        "match_score": score,
    }


def _best_fuzzy_match(video_key, candidate_keys, similarity, threshold):
    """Return (song_key, score) for the best candidate at or above threshold, else (None, 0)."""
    best_key = None
    best_score = 0
    for song_key in candidate_keys:
        score = similarity(song_key, video_key)
        # Strict ">" keeps the first candidate on ties, so the outcome follows
        # the iteration order of candidate_keys.
        if score >= threshold and score > best_score:
            best_key = song_key
            best_score = score
    return best_key, best_score


def build_download_plan(
    channel_urls,
    undownloaded,
    tracker,
    yt_dlp_path,
    fuzzy_match=False,
    fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
):
    """
    For each song in undownloaded, scan all channels for a match.

    Channels are scanned in the order given. Each song is matched at most
    once: as soon as a video matches it, the song is removed from further
    consideration, so earlier channels take precedence. Songs that produce
    the same key via create_song_key are treated as one song (the last one
    in undownloaded is kept).

    Args:
        channel_urls: Sequence of channel URLs to scan.
        undownloaded: Song dicts, each with "artist" and "title".
        tracker: Object providing get_channel_video_list(channel_url,
            yt_dlp_path=..., force_refresh=...), returning video dicts with
            "id" and "title".
        yt_dlp_path: Path to yt-dlp; passed to the tracker as a string.
        fuzzy_match: When True, compare keys with the function returned by
            get_similarity_function() instead of requiring equal keys.
        fuzzy_threshold: Minimum similarity score for a fuzzy match.
            NOTE(review): presumably a 0-100 scale, since exact matches are
            recorded with score 100 -- confirm against fuzzy_matcher.

    Returns:
        (download_plan, unmatched_songs):
        - download_plan: list of dicts {artist, title, channel_name,
          channel_url, video_id, video_title, match_score}
        - unmatched_songs: list of songs not found in any channel

    Side effects:
        Prints progress to stdout and, when songs remain unmatched, writes a
        report via generate_unmatched_report (failures there are reported but
        do not abort the plan).
    """
    plan = []
    channel_match_counts = {}

    # key -> song. A dict preserves songlist order, which makes fuzzy
    # tie-breaking deterministic, and gives O(1) exact lookups.
    song_lookup = {
        create_song_key(song["artist"], song["title"]): song
        for song in undownloaded
    }

    # Resolve the similarity function once instead of per (video, song) pair.
    similarity = get_similarity_function() if fuzzy_match else None

    for i, channel_url in enumerate(channel_urls, 1):
        print(f"\n🚦 Starting channel {i}/{len(channel_urls)}: {channel_url}")
        print(" 🔍 Getting channel info...")
        channel_name, channel_id = get_channel_info(channel_url)
        print(f" ✅ Channel info: {channel_name} (ID: {channel_id})")
        print(" 🔍 Fetching video list from channel...")
        # Treat a missing video list as empty rather than crashing on len().
        available_videos = tracker.get_channel_video_list(
            channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False
        ) or []
        print(
            f" 📊 Channel has {len(available_videos)} videos to scan against {len(undownloaded)} songlist songs"
        )

        matches_this_channel = 0
        for video in available_videos:
            v_artist, v_title = extract_artist_title(video["title"])
            video_key = create_song_key(v_artist, v_title)

            if fuzzy_match:
                matched_key, score = _best_fuzzy_match(
                    video_key, song_lookup, similarity, fuzzy_threshold
                )
            elif video_key in song_lookup:
                matched_key, score = video_key, 100
            else:
                matched_key, score = None, 0

            if matched_key is None:
                continue

            # Remove the matched song so later videos/channels cannot claim it.
            song = song_lookup.pop(matched_key)
            plan.append(
                _make_plan_entry(song, channel_name, channel_url, video, score)
            )
            matches_this_channel += 1

        # Accumulate so two URLs resolving to the same channel name do not
        # overwrite each other's count.
        channel_match_counts[channel_name] = (
            channel_match_counts.get(channel_name, 0) + matches_this_channel
        )
        print(f" → Found {matches_this_channel} songlist matches in this channel.")

    # Whatever is still in the lookup was not found in any channel.
    unmatched = list(song_lookup.values())

    print("\n📊 Channel match summary:")
    for channel, count in channel_match_counts.items():
        print(f" {channel}: {count} matches")
    print(
        f" TOTAL: {sum(channel_match_counts.values())} matches across {len(channel_match_counts)} channels."
    )

    # The report is best-effort: a failure here must not discard the plan.
    if unmatched:
        try:
            report_file = generate_unmatched_report(unmatched)
            print(f"\n📋 Unmatched songs report saved to: {report_file}")
            print(f"📋 Total unmatched songs: {len(unmatched)}")
        except Exception as e:
            print(f"⚠️ Could not generate unmatched songs report: {e}")

    return plan, unmatched