435 lines
18 KiB
Python
435 lines
18 KiB
Python
"""
|
|
Download plan building utilities.
|
|
Handles pre-scanning channels and building download plans.
|
|
"""
|
|
|
|
import concurrent.futures
|
|
import hashlib
|
|
import json
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from karaoke_downloader.cache_manager import (
|
|
delete_plan_cache,
|
|
get_download_plan_cache_file,
|
|
load_cached_plan,
|
|
save_plan_cache,
|
|
)
|
|
# Import all fuzzy matching functions
|
|
from karaoke_downloader.fuzzy_matcher import (
|
|
create_song_key,
|
|
create_video_key,
|
|
get_similarity_function,
|
|
is_exact_match,
|
|
is_fuzzy_match,
|
|
normalize_title,
|
|
)
|
|
from karaoke_downloader.channel_parser import ChannelParser
|
|
from karaoke_downloader.youtube_utils import get_channel_info
|
|
|
|
# Constants
# Length caps used when composing output filenames — presumably measured in
# characters; confirm against the filename-building code that consumes them.
DEFAULT_FILENAME_LENGTH_LIMIT = 100
DEFAULT_ARTIST_LENGTH_LIMIT = 30
DEFAULT_TITLE_LENGTH_LIMIT = 60
# Minimum similarity score (0-100) for a fuzzy title match; used as the
# default `fuzzy_threshold` in build_download_plan().
DEFAULT_FUZZY_THRESHOLD = 85
# Default number of items shown in console listings (not used in this module).
DEFAULT_DISPLAY_LIMIT = 10
|
|
|
|
|
|
def generate_unmatched_report(unmatched: List[Dict[str, Any]], report_path: Optional[str] = None) -> str:
    """
    Generate a detailed report of unmatched songs and save it to a file.

    Args:
        unmatched: List of unmatched songs from build_download_plan
        report_path: Optional path to save the report
            (default: data/unmatched_songs_report_<timestamp>.json)

    Returns:
        Path to the saved report file
    """
    if report_path is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_path = f"data/unmatched_songs_report_{timestamp}.json"

    # One entry per unmatched song; include the normalized search key so the
    # report can be compared directly against channel video keys.
    entries = [
        {
            "artist": song["artist"],
            "title": song["title"],
            "position": song.get("position", 0),
            "search_key": create_song_key(song["artist"], song["title"]),
        }
        for song in unmatched
    ]

    # Sort by artist, then by title for easier reading
    entries.sort(key=lambda x: (x["artist"].lower(), x["title"].lower()))

    report_data = {
        "generated_at": datetime.now().isoformat(),
        "total_unmatched": len(unmatched),
        "unmatched_songs": entries,
    }

    # Ensure the data directory exists
    report_file = Path(report_path)
    report_file.parent.mkdir(parents=True, exist_ok=True)

    # Save the report (ensure_ascii=False keeps non-ASCII titles readable)
    with open(report_file, 'w', encoding='utf-8') as f:
        json.dump(report_data, f, indent=2, ensure_ascii=False)

    return str(report_file)
|
|
|
|
|
|
def _scan_channel_for_matches(
    channel_url,
    channel_name,
    channel_id,
    song_keys,
    song_lookup,
    fuzzy_match,
    fuzzy_threshold,
    show_pagination,
    yt_dlp_path,
    tracker,
):
    """
    Scan a single channel for matches (used in parallel processing).

    Mutates song_keys / song_lookup: each matched song is removed so it
    cannot be claimed again by a later video or channel.

    Args:
        channel_url: URL of the channel to scan
        channel_name: Name of the channel (resolved via get_channel_info if falsy)
        channel_id: ID of the channel (resolved via get_channel_info if falsy)
        song_keys: Set of song keys to match against
        song_lookup: Dictionary mapping song keys to song data
        fuzzy_match: Whether to use fuzzy matching
        fuzzy_threshold: Minimum similarity score (0-100) for a fuzzy match
        show_pagination: Whether to show pagination progress
        yt_dlp_path: Path to yt-dlp executable
        tracker: Tracking manager instance

    Returns:
        List of video matches found in this channel
    """
    print(f"\n🚦 Scanning channel: {channel_name} ({channel_url})")

    # Get channel info if not provided
    if not channel_name or not channel_id:
        channel_name, channel_id = get_channel_info(channel_url)

    # Fetch video list from channel (cached by the tracker unless refreshed)
    available_videos = tracker.get_channel_video_list(
        channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination
    )

    print(f" 📊 Channel has {len(available_videos)} videos to scan")

    video_matches = []

    # Pre-process video titles for efficient matching
    channel_parser = ChannelParser()
    if fuzzy_match:
        # Hoist the similarity-function lookup out of the O(videos * songs)
        # scoring loop — it is loop-invariant.
        similarity = get_similarity_function()
        for video in available_videos:
            v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name)
            video_key = create_song_key(v_artist, v_title)

            # Find best match among remaining songs
            best_match = None
            best_score = 0
            for song_key in song_keys:
                if song_key in song_lookup:  # Only check unmatched songs
                    score = similarity(song_key, video_key)
                    if score >= fuzzy_threshold and score > best_score:
                        best_score = score
                        best_match = song_key

            if best_match:
                song = song_lookup[best_match]
                video_matches.append(
                    {
                        "artist": song["artist"],
                        "title": song["title"],
                        "channel_name": channel_name,
                        "channel_url": channel_url,
                        "video_id": video["id"],
                        "video_title": video["title"],
                        "match_score": best_score,
                    }
                )
                # Remove matched song from future consideration
                del song_lookup[best_match]
                song_keys.remove(best_match)
    else:
        # For exact matching, use direct key comparison
        for video in available_videos:
            v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name)
            video_key = create_song_key(v_artist, v_title)

            if video_key in song_keys:
                song = song_lookup[video_key]
                video_matches.append(
                    {
                        "artist": song["artist"],
                        "title": song["title"],
                        "channel_name": channel_name,
                        "channel_url": channel_url,
                        "video_id": video["id"],
                        "video_title": video["title"],
                        "match_score": 100,
                    }
                )
                # Remove matched song from future consideration
                del song_lookup[video_key]
                song_keys.remove(video_key)

    print(f" ✅ Found {len(video_matches)} matches in {channel_name}")
    return video_matches
|
|
|
|
|
|
def build_download_plan(
    channel_urls,
    undownloaded,
    tracker,
    yt_dlp_path,
    fuzzy_match=False,
    fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
    show_pagination=False,
    parallel_channels=False,
    max_channel_workers=3,
):
    """
    For each song in undownloaded, scan all channels for a match.
    Use fuzzy matching if enabled.

    Each song is claimed by at most one video: once matched it is removed
    from song_keys / song_lookup so later videos and channels skip it.

    Args:
        channel_urls: Iterable of channel URLs to scan.
        undownloaded: Song dicts with at least "artist" and "title".
        tracker: Tracking manager used to fetch (cached) channel video lists.
        yt_dlp_path: Path to the yt-dlp executable.
        fuzzy_match: Use fuzzy similarity instead of exact key comparison.
        fuzzy_threshold: Minimum similarity score (0-100) for a fuzzy match.
        show_pagination: Show pagination progress while listing videos.
        parallel_channels: Scan channels concurrently with a thread pool.
        max_channel_workers: Worker count for the parallel scan.

    Return (download_plan, unmatched_songs):
    - download_plan: list of dicts {artist, title, channel_name, channel_url, video_id, video_title, match_score}
    - unmatched_songs: list of songs not found in any channel
    """
    plan = []
    unmatched = []
    channel_match_counts = {}

    # Pre-process songlist for O(1) lookups
    song_keys = set()
    song_lookup = {}
    for song in undownloaded:
        key = create_song_key(song["artist"], song["title"])
        song_keys.add(key)
        song_lookup[key] = song

    if parallel_channels:
        print(f"🚀 Running parallel channel scanning with {max_channel_workers} workers.")

        # Locks guard the shared song_keys / song_lookup structures: a song
        # matched by one worker must not be claimed again by another.
        import threading
        song_keys_lock = threading.Lock()
        song_lookup_lock = threading.Lock()

        def scan_channel_safe(channel_url):
            """Thread-safe channel scan; returns (channel_name, video_matches)."""
            print(f"\n🚦 Scanning channel: {channel_url}")

            # Get channel info
            channel_name, channel_id = get_channel_info(channel_url)
            print(f" ✅ Channel info: {channel_name} (ID: {channel_id})")

            # Fetch video list from channel
            available_videos = tracker.get_channel_video_list(
                channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination
            )
            print(f" 📊 Channel has {len(available_videos)} videos to scan")

            video_matches = []

            # Pre-process video titles for efficient matching
            channel_parser = ChannelParser()
            if fuzzy_match:
                # Loop-invariant: resolve the similarity function once.
                similarity = get_similarity_function()
                for video in available_videos:
                    v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name)
                    video_key = create_song_key(v_artist, v_title)

                    # Find best match among remaining songs (thread-safe)
                    best_match = None
                    best_score = 0
                    with song_keys_lock:
                        available_song_keys = list(song_keys)  # Copy for iteration

                    for song_key in available_song_keys:
                        with song_lookup_lock:
                            if song_key in song_lookup:  # Only check unmatched songs
                                score = similarity(song_key, video_key)
                                if score >= fuzzy_threshold and score > best_score:
                                    best_score = score
                                    best_match = song_key

                    if best_match:
                        with song_lookup_lock:
                            if best_match in song_lookup:  # Double-check it's still available
                                song = song_lookup[best_match]
                                video_matches.append(
                                    {
                                        "artist": song["artist"],
                                        "title": song["title"],
                                        "channel_name": channel_name,
                                        "channel_url": channel_url,
                                        "video_id": video["id"],
                                        "video_title": video["title"],
                                        "match_score": best_score,
                                    }
                                )
                                # Remove matched song from future consideration
                                del song_lookup[best_match]
                                with song_keys_lock:
                                    song_keys.discard(best_match)
            else:
                # For exact matching, use direct key comparison
                for video in available_videos:
                    v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name)
                    video_key = create_song_key(v_artist, v_title)

                    with song_lookup_lock:
                        if video_key in song_keys and video_key in song_lookup:
                            song = song_lookup[video_key]
                            video_matches.append(
                                {
                                    "artist": song["artist"],
                                    "title": song["title"],
                                    "channel_name": channel_name,
                                    "channel_url": channel_url,
                                    "video_id": video["id"],
                                    "video_title": video["title"],
                                    "match_score": 100,
                                }
                            )
                            # Remove matched song from future consideration
                            del song_lookup[video_key]
                            with song_keys_lock:
                                song_keys.discard(video_key)

            print(f" ✅ Found {len(video_matches)} matches in {channel_name}")
            return channel_name, video_matches

        # Execute parallel channel scanning
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_channel_workers) as executor:
            # Submit all channel scanning tasks
            future_to_channel = {
                executor.submit(scan_channel_safe, channel_url): channel_url
                for channel_url in channel_urls
            }

            # Process results as they complete
            for future in concurrent.futures.as_completed(future_to_channel):
                channel_url = future_to_channel[future]
                try:
                    # The worker already resolved the channel name; reusing it
                    # avoids a redundant get_channel_info() network call here.
                    channel_name, video_matches = future.result()
                    plan.extend(video_matches)
                    channel_match_counts[channel_name] = len(video_matches)
                except Exception as e:
                    print(f"⚠️ Error processing channel {channel_url}: {e}")
                    # Key the summary by URL: get_channel_info() may be the very
                    # call that failed, and raising inside this handler would
                    # abort the whole plan build.
                    channel_match_counts[channel_url] = 0
    else:
        for i, channel_url in enumerate(channel_urls, 1):
            print(f"\n🚦 Starting channel {i}/{len(channel_urls)}: {channel_url}")
            print(f" 🔍 Getting channel info...")
            channel_name, channel_id = get_channel_info(channel_url)
            print(f" ✅ Channel info: {channel_name} (ID: {channel_id})")
            print(f" 🔍 Fetching video list from channel...")
            available_videos = tracker.get_channel_video_list(
                channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination
            )
            print(
                f" 📊 Channel has {len(available_videos)} videos to scan against {len(undownloaded)} songlist songs"
            )
            matches_this_channel = 0
            video_matches = []  # Initialize video_matches for this channel

            # Pre-process video titles for efficient matching
            channel_parser = ChannelParser()
            if fuzzy_match:
                # Loop-invariant: resolve the similarity function once.
                similarity = get_similarity_function()
                for video in available_videos:
                    v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name)
                    video_key = create_song_key(v_artist, v_title)

                    # Find best match among remaining songs
                    best_match = None
                    best_score = 0
                    for song_key in song_keys:
                        if song_key in song_lookup:  # Only check unmatched songs
                            score = similarity(song_key, video_key)
                            if score >= fuzzy_threshold and score > best_score:
                                best_score = score
                                best_match = song_key

                    if best_match:
                        song = song_lookup[best_match]
                        video_matches.append(
                            {
                                "artist": song["artist"],
                                "title": song["title"],
                                "channel_name": channel_name,
                                "channel_url": channel_url,
                                "video_id": video["id"],
                                "video_title": video["title"],
                                "match_score": best_score,
                            }
                        )
                        # Remove matched song from future consideration
                        del song_lookup[best_match]
                        song_keys.remove(best_match)
                        matches_this_channel += 1
            else:
                # For exact matching, use direct key comparison
                for video in available_videos:
                    v_artist, v_title = channel_parser.extract_artist_title(video["title"], channel_name)
                    video_key = create_song_key(v_artist, v_title)

                    if video_key in song_keys:
                        song = song_lookup[video_key]
                        video_matches.append(
                            {
                                "artist": song["artist"],
                                "title": song["title"],
                                "channel_name": channel_name,
                                "channel_url": channel_url,
                                "video_id": video["id"],
                                "video_title": video["title"],
                                "match_score": 100,
                            }
                        )
                        # Remove matched song from future consideration
                        del song_lookup[video_key]
                        song_keys.remove(video_key)
                        matches_this_channel += 1

            # Add matches to plan
            plan.extend(video_matches)

            # Print match count once per channel
            channel_match_counts[channel_name] = matches_this_channel
            print(f" → Found {matches_this_channel} songlist matches in this channel.")

    # Remaining unmatched songs
    unmatched = list(song_lookup.values())

    # Print summary table
    print("\n📊 Channel match summary:")
    for channel, count in channel_match_counts.items():
        print(f" {channel}: {count} matches")
    print(
        f" TOTAL: {sum(channel_match_counts.values())} matches across {len(channel_match_counts)} channels."
    )

    # Generate unmatched songs report if there are any
    if unmatched:
        try:
            report_file = generate_unmatched_report(unmatched)
            print(f"\n📋 Unmatched songs report saved to: {report_file}")
            print(f"📋 Total unmatched songs: {len(unmatched)}")
        except Exception as e:
            print(f"⚠️ Could not generate unmatched songs report: {e}")

    return plan, unmatched
|