Signed-off-by: mbrucedogs <mbrucedogs@gmail.com>

This commit is contained in:
mbrucedogs 2025-07-27 20:33:26 -05:00
parent ea07188739
commit f914d54067
6 changed files with 564 additions and 236696 deletions

122
build_cache_from_raw.py Normal file
View File

@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
Script to build channel cache from raw yt-dlp output file.
This uses the fixed parsing logic to handle titles with | characters.
"""
import json
import re
from datetime import datetime
from pathlib import Path
def parse_raw_output_file(raw_file_path):
"""Parse the raw output file and extract valid videos."""
videos = []
invalid_count = 0
print(f"🔍 Parsing raw output file: {raw_file_path}")
with open(raw_file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
# Skip header lines (lines starting with #)
data_lines = [line for line in lines if not line.strip().startswith('#') and line.strip()]
print(f"📄 Found {len(data_lines)} data lines to process")
for i, line in enumerate(data_lines):
if i % 1000 == 0 and i > 0: # Progress indicator every 1000 lines
print(f"📊 Processing line {i}/{len(data_lines)}... ({i/len(data_lines)*100:.1f}%)")
# Remove line number prefix (e.g., " 1234: ")
line = re.sub(r'^\s*\d+:\s*', '', line.strip())
# More robust parsing that handles titles with | characters
# Extract video ID directly from the URL that yt-dlp provides
# Find the URL and extract video ID from it
url_match = re.search(r'https://www\.youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})', line)
if not url_match:
invalid_count += 1
if invalid_count <= 5:
print(f"⚠️ Skipping line with no URL: '{line[:100]}...'")
elif invalid_count == 6:
print(f"⚠️ ... and {len(data_lines) - i - 1} more invalid lines")
continue
# Extract video ID directly from the URL
video_id = url_match.group(1)
# Extract title (everything before the video ID in the line)
title = line[:line.find(video_id)].rstrip('|').strip()
# Validate video ID
if video_id and (
len(video_id) == 11 and
video_id.replace('-', '').replace('_', '').isalnum() and
" " not in video_id and
"Lyrics" not in video_id and
"KARAOKE" not in video_id.upper() and
"Vocal" not in video_id and
"Guide" not in video_id
):
videos.append({"title": title, "id": video_id})
else:
invalid_count += 1
if invalid_count <= 5: # Only show first 5 invalid IDs
print(f"⚠️ Skipping invalid video ID: '{video_id}' for title: '{title[:50]}...'")
elif invalid_count == 6:
print(f"⚠️ ... and {len(data_lines) - i - 1} more invalid IDs")
print(f"✅ Parsed {len(videos)} valid videos from raw output")
print(f"⚠️ Skipped {invalid_count} invalid video IDs")
return videos
def save_cache_file(channel_id, videos, cache_dir="data/channel_cache"):
    """Persist parsed channel videos to a JSON cache file.

    Args:
        channel_id: Raw channel identifier; stored verbatim in the payload
            and used (after filename sanitizing) to name the cache file.
        videos: List of video dicts to store.
        cache_dir: Directory that receives the cache file (created if missing).

    Returns:
        Path of the cache file that was written.
    """
    target_dir = Path(cache_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    # Replace characters that are illegal in filenames on common platforms.
    safe_name = re.sub(r'[<>:"/\\|?*]', '_', channel_id)
    cache_file = target_dir / f"{safe_name}.json"

    payload = {
        'channel_id': channel_id,
        'videos': videos,
        'last_updated': datetime.now().isoformat(),
        'video_count': len(videos),
    }
    with open(cache_file, 'w', encoding='utf-8') as handle:
        json.dump(payload, handle, indent=2, ensure_ascii=False)

    print(f"💾 Saved cache to: {cache_file.name}")
    return cache_file
def main():
    """Build the channel cache for @VocalStarKaraoke from its raw output file."""
    raw_path = Path("data/channel_cache/@VocalStarKaraoke_raw_output.txt")
    if not raw_path.exists():
        print(f"❌ Raw output file not found: {raw_path}")
        return

    # Parse the raw output file into {title, id} records.
    parsed = parse_raw_output_file(raw_path)
    if not parsed:
        print("❌ No valid videos found")
        return

    # Persist the parsed list to the channel cache.
    handle = "@VocalStarKaraoke"
    cache_path = save_cache_file(handle, parsed)
    print("🎉 Cache build complete!")
    print(f"📊 Total videos in cache: {len(parsed)}")
    print(f"📁 Cache file: {cache_path}")
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

View File

@ -110,6 +110,17 @@ Examples:
action="store_true",
help="Show page-by-page progress when downloading channel video lists (slower but more detailed)",
)
parser.add_argument(
"--parallel-channels",
action="store_true",
help="Enable parallel channel scanning for faster channel processing (scans multiple channels simultaneously)",
)
parser.add_argument(
"--channel-workers",
type=int,
default=3,
help="Number of parallel channel scanning workers (default: 3, max: 10)",
)
parser.add_argument(
"--songlist-only",
action="store_true",
@ -183,7 +194,7 @@ Examples:
parser.add_argument(
"--parallel",
action="store_true",
help="Enable parallel downloads for improved speed",
help="Enable parallel downloads for improved speed (3-5x faster for large batches)",
)
parser.add_argument(
"--workers",
@ -209,6 +220,11 @@ Examples:
print("❌ Error: --workers must be between 1 and 10")
sys.exit(1)
# Validate channel workers argument
if args.channel_workers < 1 or args.channel_workers > 10:
print("❌ Error: --channel-workers must be between 1 and 10")
sys.exit(1)
yt_dlp_path = Path("downloader/yt-dlp.exe")
if not yt_dlp_path.exists():
print("❌ Error: yt-dlp.exe not found in downloader/ directory")
@ -391,6 +407,8 @@ Examples:
fuzzy_threshold=args.fuzzy_threshold,
force_download=args.force,
show_pagination=args.show_pagination,
parallel_channels=args.parallel_channels,
max_channel_workers=args.channel_workers,
)
elif args.latest_per_channel:
# Use provided file or default to data/channels.txt

View File

@ -3,10 +3,13 @@ Download plan building utilities.
Handles pre-scanning channels and building download plans.
"""
import concurrent.futures
import hashlib
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
from typing import Any, Dict, List, Optional, Tuple
from karaoke_downloader.cache_manager import (
delete_plan_cache,
@ -16,12 +19,13 @@ from karaoke_downloader.cache_manager import (
)
from karaoke_downloader.fuzzy_matcher import (
create_song_key,
extract_artist_title,
create_video_key,
get_similarity_function,
is_exact_match,
is_fuzzy_match,
normalize_title,
)
from karaoke_downloader.id3_utils import extract_artist_title
from karaoke_downloader.youtube_utils import get_channel_info
# Constants
@ -29,6 +33,7 @@ DEFAULT_FILENAME_LENGTH_LIMIT = 100
DEFAULT_ARTIST_LENGTH_LIMIT = 30
DEFAULT_TITLE_LENGTH_LIMIT = 60
DEFAULT_FUZZY_THRESHOLD = 85
DEFAULT_DISPLAY_LIMIT = 10
def generate_unmatched_report(unmatched: List[Dict[str, Any]], report_path: str = None) -> str:
@ -74,6 +79,111 @@ def generate_unmatched_report(unmatched: List[Dict[str, Any]], report_path: str
return str(report_file)
def _scan_channel_for_matches(
    channel_url,
    channel_name,
    channel_id,
    song_keys,
    song_lookup,
    fuzzy_match,
    fuzzy_threshold,
    show_pagination,
    yt_dlp_path,
    tracker,
):
    """
    Scan a single channel for matches (used in parallel processing).

    NOTE(review): matched songs are deleted from ``song_lookup`` and removed
    from ``song_keys`` in place, so the caller's collections shrink as matches
    are found — confirm callers expect this side effect (it is what prevents
    a song from matching in more than one channel).

    Args:
        channel_url: URL of the channel to scan
        channel_name: Name of the channel (looked up if falsy)
        channel_id: ID of the channel (looked up if falsy)
        song_keys: Set of song keys to match against (mutated: matched keys removed)
        song_lookup: Dictionary mapping song keys to song data (mutated: matched entries removed)
        fuzzy_match: Whether to use fuzzy matching
        fuzzy_threshold: Threshold for fuzzy matching
        show_pagination: Whether to show pagination progress
        yt_dlp_path: Path to yt-dlp executable
        tracker: Tracking manager instance
    Returns:
        List of video matches found in this channel
    """
    print(f"\n🚦 Scanning channel: {channel_name} ({channel_url})")

    # Get channel info if not provided
    if not channel_name or not channel_id:
        channel_name, channel_id = get_channel_info(channel_url)

    # Fetch video list from channel
    available_videos = tracker.get_channel_video_list(
        channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination
    )
    print(f" 📊 Channel has {len(available_videos)} videos to scan")

    video_matches = []

    # Pre-process video titles for efficient matching
    if fuzzy_match:
        # For fuzzy matching, create normalized video keys
        for video in available_videos:
            v_artist, v_title = extract_artist_title(video["title"])
            video_key = create_song_key(v_artist, v_title)

            # Find best match among remaining songs.
            # NOTE(review): get_similarity_function() is re-fetched for every
            # (song, video) pair; hoisting it out of the loops looks safe but
            # verify it has no per-call state before changing.
            best_match = None
            best_score = 0

            for song_key in song_keys:
                if song_key in song_lookup:  # Only check unmatched songs
                    score = get_similarity_function()(song_key, video_key)
                    if score >= fuzzy_threshold and score > best_score:
                        best_score = score
                        best_match = song_key

            if best_match:
                song = song_lookup[best_match]
                video_matches.append(
                    {
                        "artist": song["artist"],
                        "title": song["title"],
                        "channel_name": channel_name,
                        "channel_url": channel_url,
                        "video_id": video["id"],
                        "video_title": video["title"],
                        "match_score": best_score,
                    }
                )
                # Remove matched song from future consideration
                del song_lookup[best_match]
                song_keys.remove(best_match)
    else:
        # For exact matching, use direct key comparison
        for video in available_videos:
            v_artist, v_title = extract_artist_title(video["title"])
            video_key = create_song_key(v_artist, v_title)

            if video_key in song_keys:
                song = song_lookup[video_key]
                video_matches.append(
                    {
                        "artist": song["artist"],
                        "title": song["title"],
                        "channel_name": channel_name,
                        "channel_url": channel_url,
                        "video_id": video["id"],
                        "video_title": video["title"],
                        # Exact key match is reported as a perfect score.
                        "match_score": 100,
                    }
                )
                # Remove matched song from future consideration
                del song_lookup[video_key]
                song_keys.remove(video_key)

    print(f" ✅ Found {len(video_matches)} matches in {channel_name}")
    return video_matches
def build_download_plan(
channel_urls,
undownloaded,
@ -82,6 +192,8 @@ def build_download_plan(
fuzzy_match=False,
fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
show_pagination=False,
parallel_channels=False,
max_channel_workers=3,
):
"""
For each song in undownloaded, scan all channels for a match.
@ -102,6 +214,119 @@ def build_download_plan(
song_keys.add(key)
song_lookup[key] = song
if parallel_channels:
print(f"🚀 Running parallel channel scanning with {max_channel_workers} workers.")
# Create a thread-safe copy of song data for parallel processing
import threading
song_keys_lock = threading.Lock()
song_lookup_lock = threading.Lock()
def scan_channel_safe(channel_url):
"""Thread-safe channel scanning function."""
print(f"\n🚦 Scanning channel: {channel_url}")
# Get channel info
channel_name, channel_id = get_channel_info(channel_url)
print(f" ✅ Channel info: {channel_name} (ID: {channel_id})")
# Fetch video list from channel
available_videos = tracker.get_channel_video_list(
channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination
)
print(f" 📊 Channel has {len(available_videos)} videos to scan")
video_matches = []
# Pre-process video titles for efficient matching
if fuzzy_match:
# For fuzzy matching, create normalized video keys
for video in available_videos:
v_artist, v_title = extract_artist_title(video["title"])
video_key = create_song_key(v_artist, v_title)
# Find best match among remaining songs (thread-safe)
best_match = None
best_score = 0
with song_keys_lock:
available_song_keys = list(song_keys) # Copy for iteration
for song_key in available_song_keys:
with song_lookup_lock:
if song_key in song_lookup: # Only check unmatched songs
score = get_similarity_function()(song_key, video_key)
if score >= fuzzy_threshold and score > best_score:
best_score = score
best_match = song_key
if best_match:
with song_lookup_lock:
if best_match in song_lookup: # Double-check it's still available
song = song_lookup[best_match]
video_matches.append(
{
"artist": song["artist"],
"title": song["title"],
"channel_name": channel_name,
"channel_url": channel_url,
"video_id": video["id"],
"video_title": video["title"],
"match_score": best_score,
}
)
# Remove matched song from future consideration
del song_lookup[best_match]
with song_keys_lock:
song_keys.discard(best_match)
else:
# For exact matching, use direct key comparison
for video in available_videos:
v_artist, v_title = extract_artist_title(video["title"])
video_key = create_song_key(v_artist, v_title)
with song_lookup_lock:
if video_key in song_keys and video_key in song_lookup:
song = song_lookup[video_key]
video_matches.append(
{
"artist": song["artist"],
"title": song["title"],
"channel_name": channel_name,
"channel_url": channel_url,
"video_id": video["id"],
"video_title": video["title"],
"match_score": 100,
}
)
# Remove matched song from future consideration
del song_lookup[video_key]
with song_keys_lock:
song_keys.discard(video_key)
print(f" ✅ Found {len(video_matches)} matches in {channel_name}")
return video_matches
# Execute parallel channel scanning
with concurrent.futures.ThreadPoolExecutor(max_workers=max_channel_workers) as executor:
# Submit all channel scanning tasks
future_to_channel = {
executor.submit(scan_channel_safe, channel_url): channel_url
for channel_url in channel_urls
}
# Process results as they complete
for future in concurrent.futures.as_completed(future_to_channel):
channel_url = future_to_channel[future]
try:
video_matches = future.result()
plan.extend(video_matches)
channel_name, _ = get_channel_info(channel_url)
channel_match_counts[channel_name] = len(video_matches)
except Exception as e:
print(f"⚠️ Error processing channel {channel_url}: {e}")
channel_name, _ = get_channel_info(channel_url)
channel_match_counts[channel_name] = 0
else:
for i, channel_url in enumerate(channel_urls, 1):
print(f"\n🚦 Starting channel {i}/{len(channel_urls)}: {channel_url}")
print(f" 🔍 Getting channel info...")

View File

@ -317,6 +317,8 @@ class KaraokeDownloader:
fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
force_download=False,
show_pagination=False,
parallel_channels=False,
max_channel_workers=3,
):
"""
Download songs from the songlist across multiple channels.
@ -510,6 +512,8 @@ class KaraokeDownloader:
fuzzy_match=fuzzy_match,
fuzzy_threshold=fuzzy_threshold,
show_pagination=show_pagination,
parallel_channels=parallel_channels,
max_channel_workers=max_channel_workers,
)
save_plan_cache(cache_file, download_plan, unmatched)
print(f"💾 Download plan cached to: {cache_file}")

View File

@ -359,9 +359,18 @@ class TrackingManager:
corrupted = False
# Check if any video IDs look like titles instead of proper YouTube IDs
for video in cached_videos[:10]: # Check first 10 videos
for video in cached_videos[:20]: # Check first 20 videos
video_id = video.get("id", "")
if video_id and (len(video_id) > 20 or " " in video_id or "Lyrics" in video_id):
# More comprehensive validation - YouTube IDs should be 11 characters and contain only alphanumeric, hyphens, and underscores
if video_id and (
len(video_id) != 11 or
not video_id.replace('-', '').replace('_', '').isalnum() or
" " in video_id or
"Lyrics" in video_id or
"KARAOKE" in video_id.upper() or
"Vocal" in video_id or
"Guide" in video_id
):
print(f" ⚠️ Detected corrupted video ID in cache: '{video_id}'")
corrupted = True
break
@ -389,7 +398,7 @@ class TrackingManager:
all_videos = []
page = 1
videos_per_page = 50 # YouTube typically shows 50 videos per page
videos_per_page = 200 # YouTube/yt-dlp supports up to 200 videos per page, reducing API calls and errors
while True:
print(f" 📄 Fetching page {page}...")
@ -408,9 +417,26 @@ class TrackingManager:
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=60)
# Increased timeout to 180 seconds for larger pages (200 videos)
result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=180)
lines = result.stdout.strip().splitlines()
# Save raw output for debugging (for each page)
raw_output_file = self._get_channel_cache_file(channel_id).parent / f"{channel_id}_raw_output_page{page}.txt"
try:
with open(raw_output_file, 'w', encoding='utf-8') as f:
f.write(f"# Raw yt-dlp output for {channel_id} - Page {page}\n")
f.write(f"# Channel URL: {channel_url}\n")
f.write(f"# Command: {' '.join(cmd)}\n")
f.write(f"# Timestamp: {datetime.now().isoformat()}\n")
f.write(f"# Total lines: {len(lines)}\n")
f.write("#" * 80 + "\n\n")
for i, line in enumerate(lines, 1):
f.write(f"{i:6d}: {line}\n")
print(f" 💾 Saved raw output to: {raw_output_file.name}")
except Exception as e:
print(f" ⚠️ Could not save raw output: {e}")
if not lines:
print(f" ✅ No more videos found on page {page}")
break
@ -421,17 +447,38 @@ class TrackingManager:
invalid_count = 0
for line in lines:
parts = line.split("|")
if len(parts) >= 2:
title, video_id = parts[0].strip(), parts[1].strip()
if not line.strip():
continue
# Validate that video_id looks like a proper YouTube ID
if video_id and (len(video_id) <= 20 and " " not in video_id and "Lyrics" not in video_id):
# More robust parsing that handles titles with | characters
# Extract video ID directly from the URL that yt-dlp provides
# Find the URL and extract video ID from it
url_match = re.search(r'https://www\.youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})', line)
if not url_match:
continue
# Extract video ID directly from the URL
video_id = url_match.group(1)
# Extract title (everything before the video ID in the line)
title = line[:line.find(video_id)].rstrip('|').strip()
# Validate video ID
if video_id and (
len(video_id) == 11 and
video_id.replace('-', '').replace('_', '').isalnum() and
" " not in video_id and
"Lyrics" not in video_id and
"KARAOKE" not in video_id.upper() and
"Vocal" not in video_id and
"Guide" not in video_id
):
page_videos.append({"title": title, "id": video_id})
else:
invalid_count += 1
if invalid_count <= 3: # Show first 3 invalid IDs per page
print(f" ⚠️ Invalid ID: '{video_id}' for '{title}'")
print(f" ⚠️ Invalid ID: '{video_id}' for '{title[:50]}...'")
if invalid_count > 3:
print(f" ⚠️ ... and {invalid_count - 3} more invalid IDs on this page")
@ -447,8 +494,8 @@ class TrackingManager:
page += 1
# Safety check to prevent infinite loops
if page > 200: # Max 200 pages (10,000 videos)
print(f" ⚠️ Reached maximum page limit (200 pages), stopping")
if page > 50: # Max 50 pages (10,000 videos with 200 per page)
print(f" ⚠️ Reached maximum page limit (50 pages), stopping")
break
except subprocess.TimeoutExpired:
@ -457,6 +504,9 @@ class TrackingManager:
except subprocess.CalledProcessError as e:
print(f" ❌ Error fetching page {page}: {e}")
break
except KeyboardInterrupt:
print(f" ⏹️ User interrupted, stopping at page {page}")
break
if not all_videos:
print(f" ❌ No valid videos found")
@ -520,6 +570,22 @@ class TrackingManager:
result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=300)
lines = result.stdout.strip().splitlines()
# Save raw output for debugging
raw_output_file = self._get_channel_cache_file(channel_id).parent / f"{channel_id}_raw_output.txt"
try:
with open(raw_output_file, 'w', encoding='utf-8') as f:
f.write(f"# Raw yt-dlp output for {channel_id}\n")
f.write(f"# Channel URL: {channel_url}\n")
f.write(f"# Command: {' '.join(cmd)}\n")
f.write(f"# Timestamp: {datetime.now().isoformat()}\n")
f.write(f"# Total lines: {len(lines)}\n")
f.write("#" * 80 + "\n\n")
for i, line in enumerate(lines, 1):
f.write(f"{i:6d}: {line}\n")
print(f" 💾 Saved raw output to: {raw_output_file.name}")
except Exception as e:
print(f" ⚠️ Could not save raw output: {e}")
print(f" 📄 Raw output lines: {len(lines)}")
print(f" 📊 Download completed successfully!")
@ -540,17 +606,40 @@ class TrackingManager:
if i % 1000 == 0 and i > 0: # Progress indicator every 1000 lines
print(f" 📊 Processing line {i}/{len(lines)}... ({i/len(lines)*100:.1f}%)")
parts = line.split("|")
if len(parts) >= 2:
title, video_id = parts[0].strip(), parts[1].strip()
# More robust parsing that handles titles with | characters
# Extract video ID directly from the URL that yt-dlp provides
# Validate that video_id looks like a proper YouTube ID
if video_id and (len(video_id) <= 20 and " " not in video_id and "Lyrics" not in video_id):
# Find the URL and extract video ID from it
url_match = re.search(r'https://www\.youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})', line)
if not url_match:
invalid_count += 1
if invalid_count <= 5:
print(f" ⚠️ Skipping line with no URL: '{line[:100]}...'")
elif invalid_count == 6:
print(f" ⚠️ ... and {len(lines) - i - 1} more invalid lines")
continue
# Extract video ID directly from the URL
video_id = url_match.group(1)
# Extract title (everything before the video ID in the line)
title = line[:line.find(video_id)].rstrip('|').strip()
# Validate video ID
if video_id and (
len(video_id) == 11 and
video_id.replace('-', '').replace('_', '').isalnum() and
" " not in video_id and
"Lyrics" not in video_id and
"KARAOKE" not in video_id.upper() and
"Vocal" not in video_id and
"Guide" not in video_id
):
videos.append({"title": title, "id": video_id})
else:
invalid_count += 1
if invalid_count <= 5: # Only show first 5 invalid IDs
print(f" ⚠️ Skipping invalid video ID: '{video_id}' for title: '{title}'")
print(f" ⚠️ Skipping invalid video ID: '{video_id}' for title: '{title[:50]}...'")
elif invalid_count == 6:
print(f" ⚠️ ... and {len(lines) - i - 1} more invalid IDs")