Signed-off-by: mbrucedogs <mbrucedogs@gmail.com>

This commit is contained in:
mbrucedogs 2025-07-27 19:47:05 -05:00
parent 2c63bf809b
commit ea07188739
5 changed files with 356 additions and 122 deletions

View File

@ -105,6 +105,11 @@ Examples:
action="store_true", action="store_true",
help="Generate a report of songs that couldn't be found in any channel (runs after downloads)", help="Generate a report of songs that couldn't be found in any channel (runs after downloads)",
) )
parser.add_argument(
"--show-pagination",
action="store_true",
help="Show page-by-page progress when downloading channel video lists (slower but more detailed)",
)
parser.add_argument( parser.add_argument(
"--songlist-only", "--songlist-only",
action="store_true", action="store_true",
@ -237,6 +242,7 @@ Examples:
if args.songlist_focus: if args.songlist_focus:
downloader.songlist_focus_titles = args.songlist_focus downloader.songlist_focus_titles = args.songlist_focus
downloader.songlist_only = True # Enable songlist-only mode when focusing downloader.songlist_only = True # Enable songlist-only mode when focusing
args.songlist_only = True # Also set the args flag to ensure CLI logic works
print( print(
f"🎯 Songlist focus mode enabled for playlists: {', '.join(args.songlist_focus)}" f"🎯 Songlist focus mode enabled for playlists: {', '.join(args.songlist_focus)}"
) )
@ -260,12 +266,12 @@ Examples:
# --- END NEW --- # --- END NEW ---
# --- NEW: If no URL or file is provided, but --songlist-only is set, use all channels in data/channels.txt --- # --- NEW: If no URL or file is provided, but --songlist-only is set, use all channels in data/channels.txt ---
if args.songlist_only and not args.url and not args.file: if (args.songlist_only or args.songlist_focus) and not args.url and not args.file:
channels_file = Path("data/channels.txt") channels_file = Path("data/channels.txt")
if channels_file.exists(): if channels_file.exists():
args.file = str(channels_file) args.file = str(channels_file)
print( print(
"📋 No URL or --file provided, defaulting to all channels in data/channels.txt for songlist-only mode." "📋 No URL or --file provided, defaulting to all channels in data/channels.txt for songlist mode."
) )
else: else:
print( print(
@ -377,22 +383,14 @@ Examples:
if line.strip() and not line.strip().startswith("#") if line.strip() and not line.strip().startswith("#")
] ]
limit = args.limit if args.limit else None limit = args.limit if args.limit else None
force_refresh_download_plan = (
args.force_download_plan if hasattr(args, "force_download_plan") else False
)
fuzzy_match = args.fuzzy_match if hasattr(args, "fuzzy_match") else False
fuzzy_threshold = (
args.fuzzy_threshold
if hasattr(args, "fuzzy_threshold")
else DEFAULT_FUZZY_THRESHOLD
)
success = downloader.download_songlist_across_channels( success = downloader.download_songlist_across_channels(
channel_urls, channel_urls,
limit=limit, limit=args.limit,
force_refresh_download_plan=force_refresh_download_plan, force_refresh_download_plan=args.force_download_plan if hasattr(args, "force_download_plan") else False,
fuzzy_match=fuzzy_match, fuzzy_match=args.fuzzy_match,
fuzzy_threshold=fuzzy_threshold, fuzzy_threshold=args.fuzzy_threshold,
force_download=args.force, force_download=args.force,
show_pagination=args.show_pagination,
) )
elif args.latest_per_channel: elif args.latest_per_channel:
# Use provided file or default to data/channels.txt # Use provided file or default to data/channels.txt

View File

@ -81,6 +81,7 @@ def build_download_plan(
yt_dlp_path, yt_dlp_path,
fuzzy_match=False, fuzzy_match=False,
fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD, fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
show_pagination=False,
): ):
""" """
For each song in undownloaded, scan all channels for a match. For each song in undownloaded, scan all channels for a match.
@ -108,7 +109,7 @@ def build_download_plan(
print(f" ✅ Channel info: {channel_name} (ID: {channel_id})") print(f" ✅ Channel info: {channel_name} (ID: {channel_id})")
print(f" 🔍 Fetching video list from channel...") print(f" 🔍 Fetching video list from channel...")
available_videos = tracker.get_channel_video_list( available_videos = tracker.get_channel_video_list(
channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False channel_url, yt_dlp_path=str(yt_dlp_path), force_refresh=False, show_pagination=show_pagination
) )
print( print(
f" 📊 Channel has {len(available_videos)} videos to scan against {len(undownloaded)} songlist songs" f" 📊 Channel has {len(available_videos)} videos to scan against {len(undownloaded)} songlist songs"

View File

@ -88,9 +88,9 @@ class KaraokeDownloader:
# Initialize tracking # Initialize tracking
tracking_file = DATA_DIR / "karaoke_tracking.json" tracking_file = DATA_DIR / "karaoke_tracking.json"
cache_file = DATA_DIR / "channel_cache.json" cache_dir = DATA_DIR / "channel_cache"
self.tracker = TrackingManager( self.tracker = TrackingManager(
tracking_file=tracking_file, cache_file=cache_file tracking_file=tracking_file, cache_dir=cache_dir
) )
# Initialize song validator # Initialize song validator
@ -316,6 +316,7 @@ class KaraokeDownloader:
fuzzy_match=False, fuzzy_match=False,
fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD, fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
force_download=False, force_download=False,
show_pagination=False,
): ):
""" """
Download songs from the songlist across multiple channels. Download songs from the songlist across multiple channels.
@ -508,6 +509,7 @@ class KaraokeDownloader:
self.yt_dlp_path, self.yt_dlp_path,
fuzzy_match=fuzzy_match, fuzzy_match=fuzzy_match,
fuzzy_threshold=fuzzy_threshold, fuzzy_threshold=fuzzy_threshold,
show_pagination=show_pagination,
) )
save_plan_cache(cache_file, download_plan, unmatched) save_plan_cache(cache_file, download_plan, unmatched)
print(f"💾 Download plan cached to: {cache_file}") print(f"💾 Download plan cached to: {cache_file}")

View File

@ -1,10 +1,10 @@
import threading
from enum import Enum
import json import json
from datetime import datetime import os
import re
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
class SongStatus(str, Enum): class SongStatus(str, Enum):
NOT_DOWNLOADED = "NOT_DOWNLOADED" NOT_DOWNLOADED = "NOT_DOWNLOADED"
@ -26,45 +26,127 @@ class TrackingManager:
def __init__( def __init__(
self, self,
tracking_file="data/karaoke_tracking.json", tracking_file="data/karaoke_tracking.json",
cache_file="data/channel_cache.json", cache_dir="data/channel_cache",
): ):
self.tracking_file = Path(tracking_file) self.tracking_file = Path(tracking_file)
self.cache_file = Path(cache_file) self.cache_dir = Path(cache_dir)
self.data = {"playlists": {}, "songs": {}}
self.cache = {} # Ensure cache directory exists
self._lock = threading.Lock() self.cache_dir.mkdir(parents=True, exist_ok=True)
self._load()
self._load_cache() self.data = self._load()
print(f"📊 Tracking manager initialized with {len(self.data.get('songs', {}))} tracked songs")
def _load(self): def _load(self):
"""Load tracking data from JSON file."""
if self.tracking_file.exists(): if self.tracking_file.exists():
try: try:
with open(self.tracking_file, "r", encoding="utf-8") as f: with open(self.tracking_file, "r", encoding="utf-8") as f:
self.data = json.load(f) return json.load(f)
except Exception: except json.JSONDecodeError:
self.data = {"playlists": {}, "songs": {}} print(f"⚠️ Corrupted tracking file, creating new one")
return {"songs": {}, "playlists": {}, "last_updated": datetime.now().isoformat()}
def _save(self): def _save(self):
with self._lock: """Save tracking data to JSON file."""
self.data["last_updated"] = datetime.now().isoformat()
self.tracking_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.tracking_file, "w", encoding="utf-8") as f: with open(self.tracking_file, "w", encoding="utf-8") as f:
json.dump(self.data, f, indent=2, ensure_ascii=False) json.dump(self.data, f, indent=2, ensure_ascii=False)
def force_save(self): def force_save(self):
"""Force save the tracking data."""
self._save() self._save()
def _load_cache(self): def _get_channel_cache_file(self, channel_id: str) -> Path:
if self.cache_file.exists(): """Get the cache file path for a specific channel."""
try: # Sanitize channel ID for filename
with open(self.cache_file, "r", encoding="utf-8") as f: safe_channel_id = re.sub(r'[<>:"/\\|?*]', '_', channel_id)
self.cache = json.load(f) return self.cache_dir / f"{safe_channel_id}.json"
except Exception:
self.cache = {}
def save_cache(self): def _load_channel_cache(self, channel_id: str) -> List[Dict[str, str]]:
with open(self.cache_file, "w", encoding="utf-8") as f: """Load cache for a specific channel."""
json.dump(self.cache, f, indent=2, ensure_ascii=False) cache_file = self._get_channel_cache_file(channel_id)
if cache_file.exists():
try:
with open(cache_file, 'r', encoding='utf-8') as f:
data = json.load(f)
return data.get('videos', [])
except (json.JSONDecodeError, KeyError):
print(f" ⚠️ Corrupted cache file for {channel_id}, will recreate")
return []
return []
def _save_channel_cache(self, channel_id: str, videos: List[Dict[str, str]]):
    """Save the video list for one channel to its own cache file.

    The payload is first written to a temporary sibling file and then renamed
    over the real cache file, so an interrupted write cannot leave behind a
    truncated JSON document.

    Args:
        channel_id: Channel identifier; mapped to a file name by
            ``self._get_channel_cache_file``.
        videos: Video entries to store under the ``videos`` key.
    """
    cache_file = self._get_channel_cache_file(channel_id)
    data = {
        'channel_id': channel_id,
        'videos': videos,
        'last_updated': datetime.now().isoformat(),
        'video_count': len(videos),
    }
    # Recreate the directory in case it was removed after start-up.
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    # ".json.tmp" does not match the "*.json" glob used when listing caches.
    tmp_file = cache_file.with_name(cache_file.name + ".tmp")
    with open(tmp_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    tmp_file.replace(cache_file)
def _clear_channel_cache(self, channel_id: str):
    """Delete the cache file of one channel, if it exists.

    Args:
        channel_id: Channel identifier; mapped to a file name by
            ``self._get_channel_cache_file``.
    """
    cache_file = self._get_channel_cache_file(channel_id)
    try:
        # EAFP: avoids the window between an exists() check and unlink() in
        # which the file could disappear and raise FileNotFoundError.
        cache_file.unlink()
    except FileNotFoundError:
        return
    print(f" 🗑️ Cleared cache file: {cache_file.name}")
def get_cache_info(self):
    """Summarize the per-channel cache files found in ``self.cache_dir``.

    Every ``*.json`` file in the cache directory is read; files that cannot be
    read or parsed are reported on stdout and left out of ``channels``.

    Returns:
        dict with the keys:
            total_channels: number of cache files found (readable or not).
            total_videos: number of cached videos across readable files.
            channels: one dict per readable file with ``channel``,
                ``videos``, ``last_updated`` and ``file``.
            total_cached_videos: same value as ``total_videos``; key name
                returned by the previous single-file cache implementation.
            cache_duration_hours: legacy placeholder (24), kept for callers
                written against the previous return shape.
            last_updated: most recent ``last_updated`` string found in any
                cache file, or None when no file records one.
    """
    # Sorted so the report order does not depend on directory listing order.
    cache_files = sorted(self.cache_dir.glob("*.json"))
    total_videos = 0
    cache_info = []
    timestamps = []

    for cache_file in cache_files:
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            video_count = len(data.get('videos', []))
            last_updated = data.get('last_updated', 'Unknown')
            channel = data.get('channel_id', cache_file.stem)
        except (OSError, ValueError, AttributeError, TypeError) as e:
            # ValueError covers bad JSON / bad encoding; AttributeError and
            # TypeError cover a top-level value that is not the expected dict.
            print(f"⚠️ Error reading cache file {cache_file.name}: {e}")
            continue

        total_videos += video_count
        if isinstance(last_updated, str) and last_updated != 'Unknown':
            timestamps.append(last_updated)
        cache_info.append({
            'channel': channel,
            'videos': video_count,
            'last_updated': last_updated,
            'file': cache_file.name,
        })

    return {
        'total_channels': len(cache_files),
        'total_videos': total_videos,
        'channels': cache_info,
        # Keys of the previous return shape, kept for backward compatibility.
        'total_cached_videos': total_videos,
        'cache_duration_hours': 24,
        # ISO-8601 strings of the same format compare chronologically.
        'last_updated': max(timestamps) if timestamps else None,
    }
def clear_channel_cache(self, channel_id=None):
    """Clear the cache of one channel, or of every channel.

    Args:
        channel_id: Channel whose cache file should be removed. ``None``
            (or any falsy value) and the literal string ``"all"`` both mean
            "remove every cache file"; ``"all"`` was accepted by the previous
            implementation and is kept so it is not mistaken for a channel id.
    """
    if channel_id and channel_id != "all":
        self._clear_channel_cache(channel_id)
        print(f"🗑️ Cleared cache for channel: {channel_id}")
        return

    # Clear all cache files
    cache_files = list(self.cache_dir.glob("*.json"))
    for cache_file in cache_files:
        cache_file.unlink()
    print(f"🗑️ Cleared all {len(cache_files)} channel cache files")
def set_cache_duration(self, hours):
    """Placeholder for cache duration logic.

    Currently a no-op: ``hours`` is accepted but ignored, nothing is stored,
    and the method implicitly returns None.
    """
    pass
def export_playlist_report(self, playlist_id):
    """Export a report for a specific playlist.

    Args:
        playlist_id: Key of the playlist in ``self.data["playlists"]``.

    Returns:
        A JSON string (indented, non-ASCII preserved) with two keys:
        ``playlist`` (the stored playlist entry) and ``songs`` (the result of
        ``self.get_playlist_songs(playlist_id)``). If the playlist is unknown
        or its entry is empty, the message
        ``"Playlist '<playlist_id>' not found."`` is returned instead.
    """
    playlist = self.data.get("playlists", {}).get(playlist_id)
    if not playlist:
        return f"Playlist '{playlist_id}' not found."
    report = {
        "playlist": playlist,
        "songs": self.get_playlist_songs(playlist_id),
    }
    return json.dumps(report, indent=2, ensure_ascii=False)
def get_statistics(self): def get_statistics(self):
"""Get statistics about tracked songs."""
total_songs = len(self.data["songs"]) total_songs = len(self.data["songs"])
downloaded_songs = sum( downloaded_songs = sum(
1 1
@ -102,11 +184,13 @@ class TrackingManager:
} }
def get_playlist_songs(self, playlist_id):
    """Get the tracked songs that belong to a specific playlist.

    Args:
        playlist_id: Playlist identifier compared against each song's
            ``playlist_id`` field.

    Returns:
        List of song dicts from ``self.data["songs"]`` whose ``playlist_id``
        equals ``playlist_id``. Songs without a ``playlist_id`` field are
        skipped instead of raising KeyError.
    """
    return [
        song
        for song in self.data["songs"].values()
        if "playlist_id" in song and song["playlist_id"] == playlist_id
    ]
def get_failed_songs(self, playlist_id=None): def get_failed_songs(self, playlist_id=None):
"""Get failed songs, optionally filtered by playlist."""
if playlist_id: if playlist_id:
return [ return [
s s
@ -118,6 +202,7 @@ class TrackingManager:
] ]
def get_partial_downloads(self, playlist_id=None): def get_partial_downloads(self, playlist_id=None):
"""Get partial downloads, optionally filtered by playlist."""
if playlist_id: if playlist_id:
return [ return [
s s
@ -129,7 +214,7 @@ class TrackingManager:
] ]
def cleanup_orphaned_files(self, downloads_dir): def cleanup_orphaned_files(self, downloads_dir):
# Remove tracking entries for files that no longer exist """Remove tracking entries for files that no longer exist."""
orphaned = [] orphaned = []
for song_id, song in list(self.data["songs"].items()): for song_id, song in list(self.data["songs"].items()):
file_path = song.get("file_path") file_path = song.get("file_path")
@ -139,51 +224,17 @@ class TrackingManager:
self.force_save() self.force_save()
return orphaned return orphaned
def get_cache_info(self):
total_channels = len(self.cache)
total_cached_videos = sum(len(v) for v in self.cache.values())
cache_duration_hours = 24 # default
last_updated = None
return {
"total_channels": total_channels,
"total_cached_videos": total_cached_videos,
"cache_duration_hours": cache_duration_hours,
"last_updated": last_updated,
}
def clear_channel_cache(self, channel_id=None):
if channel_id is None or channel_id == "all":
self.cache = {}
else:
self.cache.pop(channel_id, None)
self.save_cache()
def set_cache_duration(self, hours):
# Placeholder for cache duration logic
pass
def export_playlist_report(self, playlist_id):
playlist = self.data["playlists"].get(playlist_id)
if not playlist:
return f"Playlist '{playlist_id}' not found."
songs = self.get_playlist_songs(playlist_id)
report = {"playlist": playlist, "songs": songs}
return json.dumps(report, indent=2, ensure_ascii=False)
def is_song_downloaded(self, artist, title, channel_name=None, video_id=None): def is_song_downloaded(self, artist, title, channel_name=None, video_id=None):
""" """
Check if a song has already been downloaded by this system. Check if a song has already been downloaded.
Returns True if the song exists in tracking with DOWNLOADED or CONVERTED status. Returns True if the song exists in tracking with DOWNLOADED status.
""" """
# If we have video_id and channel_name, try direct key lookup first (most efficient) # If we have video_id and channel_name, try direct key lookup first (most efficient)
if video_id and channel_name: if video_id and channel_name:
song_key = f"{video_id}@{channel_name}" song_key = f"{video_id}@{channel_name}"
if song_key in self.data["songs"]: if song_key in self.data["songs"]:
song_data = self.data["songs"][song_key] song_data = self.data["songs"][song_key]
if song_data.get("status") in [ if song_data.get("status") == SongStatus.DOWNLOADED:
SongStatus.DOWNLOADED,
SongStatus.CONVERTED,
]:
return True return True
# Fallback to content search (for cases where we don't have video_id) # Fallback to content search (for cases where we don't have video_id)
@ -191,19 +242,14 @@ class TrackingManager:
# Check if this song matches the artist and title # Check if this song matches the artist and title
if song_data.get("artist") == artist and song_data.get("title") == title: if song_data.get("artist") == artist and song_data.get("title") == title:
# Check if it's marked as downloaded # Check if it's marked as downloaded
if song_data.get("status") in [ if song_data.get("status") == SongStatus.DOWNLOADED:
SongStatus.DOWNLOADED,
SongStatus.CONVERTED,
]:
return True return True
# Also check the video title field which might contain the song info # Also check the video title field which might contain the song info
video_title = song_data.get("video_title", "") video_title = song_data.get("video_title", "")
if video_title and artist in video_title and title in video_title: if video_title and artist in video_title and title in video_title:
if song_data.get("status") in [ if song_data.get("status") == SongStatus.DOWNLOADED:
SongStatus.DOWNLOADED,
SongStatus.CONVERTED,
]:
return True return True
return False return False
def is_file_exists(self, file_path): def is_file_exists(self, file_path):
@ -283,65 +329,248 @@ class TrackingManager:
self._save() self._save()
def get_channel_video_list( def get_channel_video_list(
self, channel_url, yt_dlp_path="downloader/yt-dlp.exe", force_refresh=False self, channel_url, yt_dlp_path="downloader/yt-dlp.exe", force_refresh=False, show_pagination=False
): ):
""" """
Return a list of videos (dicts with 'title' and 'id') for the channel, using cache if available unless force_refresh is True. Return a list of videos (dicts with 'title' and 'id') for the channel, using cache if available unless force_refresh is True.
Args:
channel_url: YouTube channel URL
yt_dlp_path: Path to yt-dlp executable
force_refresh: Force refresh cache even if available
show_pagination: Show page-by-page progress (slower but more detailed)
""" """
channel_name, channel_id = None, None channel_name, channel_id = None, None
from karaoke_downloader.youtube_utils import get_channel_info from karaoke_downloader.youtube_utils import get_channel_info
channel_name, channel_id = get_channel_info(channel_url) channel_name, channel_id = get_channel_info(channel_url)
# Try multiple possible cache keys if not channel_id:
possible_keys = [ print(f" ❌ Could not extract channel ID from URL: {channel_url}")
channel_id, # The extracted channel ID return []
channel_url, # The full URL
channel_name, # The extracted channel name
]
cache_key = None print(f" 🔍 Channel: {channel_name} (ID: {channel_id})")
for key in possible_keys:
if key and key in self.cache: # Check if we have cached data for this channel
cache_key = key if not force_refresh:
cached_videos = self._load_channel_cache(channel_id)
if cached_videos:
# Validate that the cached data has proper video IDs
corrupted = False
# Check if any video IDs look like titles instead of proper YouTube IDs
for video in cached_videos[:10]: # Check first 10 videos
video_id = video.get("id", "")
if video_id and (len(video_id) > 20 or " " in video_id or "Lyrics" in video_id):
print(f" ⚠️ Detected corrupted video ID in cache: '{video_id}'")
corrupted = True
break break
if not cache_key: if corrupted:
cache_key = channel_id or channel_url # Use as fallback for new entries print(f" 🧹 Clearing corrupted cache for {channel_id}")
self._clear_channel_cache(channel_id)
print(f" 🔍 Trying cache keys: {possible_keys}") force_refresh = True
print(f" 🔍 Selected cache key: '{cache_key}'")
if not force_refresh and cache_key in self.cache:
print(
f" 📋 Using cached video list ({len(self.cache[cache_key])} videos)"
)
return self.cache[cache_key]
else: else:
print(f" ❌ Cache miss for all keys") print(f" 📋 Using cached video list ({len(cached_videos)} videos)")
# Fetch with yt-dlp return cached_videos
print(f" 🌐 Fetching video list from YouTube (this may take a while)...")
# Choose fetch method based on show_pagination flag
if show_pagination:
return self._fetch_videos_with_pagination(channel_url, channel_id, yt_dlp_path)
else:
return self._fetch_videos_flat_playlist(channel_url, channel_id, yt_dlp_path)
def _fetch_videos_with_pagination(self, channel_url, channel_id, yt_dlp_path):
    """Fetch a channel's video list page by page, printing progress per page.

    yt-dlp is invoked once per page of 50 entries using
    ``--playlist-start`` / ``--playlist-end`` and prints one
    ``title|id|url`` line per video.

    Args:
        channel_url: Channel URL passed to yt-dlp.
        channel_id: Identifier used for the per-channel cache file.
        yt_dlp_path: Path of the yt-dlp executable.

    Returns:
        List of ``{"title": ..., "id": ...}`` dicts; empty when nothing valid
        was fetched. The list is written to the channel cache only when the
        fetch ran to its end (or hit the page limit). A list cut short by a
        timeout or yt-dlp error is returned but not cached, so a later run
        does not mistake it for the full channel.
    """
    import subprocess  # kept local, as in the original block

    print(" 🌐 Fetching video list from YouTube (page-by-page mode)...")
    print(f" 📡 Channel URL: {channel_url}")

    all_videos = []
    page = 1
    videos_per_page = 50  # YouTube typically shows 50 videos per page
    fetch_complete = True

    while True:
        print(f" 📄 Fetching page {page}...")

        # Fetch one page at a time
        cmd = [
            yt_dlp_path,
            "--flat-playlist",
            "--print",
            "%(title)s|%(id)s|%(url)s",
            "--playlist-start",
            str((page - 1) * videos_per_page + 1),
            "--playlist-end",
            str(page * videos_per_page),
            channel_url,
        ]

        try:
            result = subprocess.run(
                cmd, capture_output=True, text=True, check=True, timeout=60
            )
        except subprocess.TimeoutExpired:
            print(f" ⚠️ Page {page} timed out, stopping")
            fetch_complete = False
            break
        except subprocess.CalledProcessError as e:
            print(f" ❌ Error fetching page {page}: {e}")
            fetch_complete = False
            break

        lines = result.stdout.strip().splitlines()
        if not lines:
            print(f" ✅ No more videos found on page {page}")
            break

        print(f" 📊 Page {page}: Found {len(lines)} videos")

        page_videos = []
        invalid_count = 0
        for line in lines:
            # Split from the right: the title is the only field that may
            # itself contain "|", so the last two fields are id and url.
            parts = line.rsplit("|", 2)
            if len(parts) != 3:
                continue
            title, video_id = parts[0].strip(), parts[1].strip()
            # Validate that video_id looks like a proper YouTube ID
            if video_id and (len(video_id) <= 20 and " " not in video_id and "Lyrics" not in video_id):
                page_videos.append({"title": title, "id": video_id})
            else:
                invalid_count += 1
                if invalid_count <= 3:  # Show first 3 invalid IDs per page
                    print(f" ⚠️ Invalid ID: '{video_id}' for '{title}'")

        if invalid_count > 3:
            print(f" ⚠️ ... and {invalid_count - 3} more invalid IDs on this page")

        all_videos.extend(page_videos)
        print(f" ✅ Page {page}: Added {len(page_videos)} valid videos (total: {len(all_videos)})")

        # If we got fewer videos than expected, we're probably at the end
        if len(lines) < videos_per_page:
            print(f" 🏁 Reached end of channel (last page had {len(lines)} videos)")
            break

        page += 1

        # Safety check to prevent infinite loops
        if page > 200:  # Max 200 pages (10,000 videos)
            print(" ⚠️ Reached maximum page limit (200 pages), stopping")
            break

    if not all_videos:
        print(" ❌ No valid videos found")
        return []

    if not fetch_complete:
        # Do not persist a truncated list as if it were the whole channel.
        print(f" ⚠️ Fetch was interrupted; returning {len(all_videos)} videos without caching")
        return all_videos

    print(" 🎉 Channel download complete!")
    print(f" 📊 Total videos fetched: {len(all_videos)}")

    # Save to individual channel cache file
    self._save_channel_cache(channel_id, all_videos)
    print(f" 💾 Saved cache to: {self._get_channel_cache_file(channel_id).name}")

    return all_videos
def _fetch_videos_flat_playlist(self, channel_url, channel_id, yt_dlp_path):
    """Fetch a channel's whole video list with a single yt-dlp call.

    A one-entry probe is run first purely for diagnostics; its outcome never
    aborts the fetch. The main call prints one ``title|id|url`` line per
    video, which is parsed, validated and written to the channel cache.

    Args:
        channel_url: Channel URL passed to yt-dlp.
        channel_id: Identifier used for the per-channel cache file.
        yt_dlp_path: Path of the yt-dlp executable.

    Returns:
        List of ``{"title": ..., "id": ...}`` dicts, or an empty list when
        yt-dlp times out, exits with an error, or yields no valid entry.
    """
    import subprocess  # kept local, as in the original block

    # Fetch with yt-dlp
    print(" 🌐 Fetching video list from YouTube (this may take a while)...")
    print(f" 📡 Channel URL: {channel_url}")

    # Probe: request only the first entry to check the channel is reachable.
    count_cmd = [
        yt_dlp_path,
        "--flat-playlist",
        "--print",
        "%(title)s",
        "--playlist-end",
        "1",  # Just get first video to test
        channel_url,
    ]

    try:
        print(" 🔍 Testing channel access...")
        test_result = subprocess.run(count_cmd, capture_output=True, text=True, timeout=30)
        if test_result.returncode == 0:
            print(" ✅ Channel is accessible")
        else:
            print(f" ⚠️ Channel test failed: {test_result.stderr}")
    except subprocess.TimeoutExpired:
        print(" ⚠️ Channel test timed out")
    except Exception as e:
        # Deliberately broad: the probe is best-effort and only informational.
        print(f" ⚠️ Channel test error: {e}")

    # Now fetch all videos
    cmd = [
        yt_dlp_path,
        "--flat-playlist",
        "--print",
        "%(title)s|%(id)s|%(url)s",
        "--verbose",  # extra detail lands in stderr, shown below on failure
        channel_url,
    ]

    try:
        print(f" 🔧 Running yt-dlp command: {' '.join(cmd)}")
        print(" 📥 Starting video list download...")
        result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=300)
    except subprocess.TimeoutExpired:
        print("❌ yt-dlp timed out after 5 minutes - channel may be too large")
        return []
    except subprocess.CalledProcessError as e:
        print(f"❌ yt-dlp failed to fetch playlist for cache: {e}")
        print(f" 📄 stderr: {e.stderr}")
        return []

    lines = result.stdout.strip().splitlines()
    print(f" 📄 Raw output lines: {len(lines)}")
    print(" 📊 Download completed successfully!")

    # Show some sample lines to understand the format
    if lines:
        print(" 📋 Sample output format:")
        for i, line in enumerate(lines[:3]):
            print(f" Line {i+1}: {line[:100]}...")
        if len(lines) > 3:
            print(f" ... and {len(lines) - 3} more lines")

    videos = []
    invalid_count = 0

    print(f" 🔍 Processing {len(lines)} video entries...")
    for i, line in enumerate(lines):
        if i % 1000 == 0 and i > 0:  # Progress indicator every 1000 lines
            print(f" 📊 Processing line {i}/{len(lines)}... ({i/len(lines)*100:.1f}%)")

        # Split from the right: the title is the only field that may itself
        # contain "|", so the last two fields are always id and url.
        parts = line.rsplit("|", 2)
        if len(parts) != 3:
            continue
        title, video_id = parts[0].strip(), parts[1].strip()
        # Validate that video_id looks like a proper YouTube ID
        if video_id and (len(video_id) <= 20 and " " not in video_id and "Lyrics" not in video_id):
            videos.append({"title": title, "id": video_id})
        else:
            invalid_count += 1
            if invalid_count <= 5:  # Only show first 5 invalid IDs
                print(f" ⚠️ Skipping invalid video ID: '{video_id}' for title: '{title}'")
            elif invalid_count == 6:
                # The exact number is not known mid-loop; the total is
                # printed once parsing has finished.
                print(" ⚠️ ... further invalid IDs not shown")

    if not videos:
        print(" ❌ No valid videos found after parsing")
        return []

    print(f" ✅ Parsed {len(videos)} valid videos from YouTube")
    print(f" ⚠️ Skipped {invalid_count} invalid video IDs")

    # Save to individual channel cache file
    self._save_channel_cache(channel_id, videos)
    print(f" 💾 Saved cache to: {self._get_channel_cache_file(channel_id).name}")

    return videos

View File

@ -107,6 +107,10 @@ def download_single_video(
video_url = f"https://www.youtube.com/watch?v={video_id}" video_url = f"https://www.youtube.com/watch?v={video_id}"
# Debug: Show the video_id and URL being used
print(f"🔍 DEBUG: video_id = '{video_id}'")
print(f"🔍 DEBUG: video_url = '{video_url}'")
# Build command using centralized utility # Build command using centralized utility
cmd = build_yt_dlp_command(yt_dlp_path, video_url, output_path, config) cmd = build_yt_dlp_command(yt_dlp_path, video_url, output_path, config)
@ -255,7 +259,7 @@ def execute_download_plan(
video_id = item["video_id"] video_id = item["video_id"]
video_title = item["video_title"] video_title = item["video_title"]
print(f"\n⬇️ Downloading {len(download_plan) - idx} of {total_to_download}:") print(f"\n⬇️ Downloading {downloaded_count + 1} of {total_to_download}:")
print(f" 📋 Songlist: {artist} - {title}") print(f" 📋 Songlist: {artist} - {title}")
print(f" 🎬 Video: {video_title} ({channel_name})") print(f" 🎬 Video: {video_title} ({channel_name})")
if "match_score" in item: if "match_score" in item: