666 lines
28 KiB
Python
666 lines
28 KiB
Python
import json
|
|
import os
|
|
import re
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
class SongStatus(str, Enum):
    """Lifecycle states of a tracked song.

    Mixes in ``str`` so members compare equal to their plain-string
    values, which is what the JSON tracking file round-trips them as.
    """

    NOT_DOWNLOADED = "NOT_DOWNLOADED"
    DOWNLOADING = "DOWNLOADING"
    DOWNLOADED = "DOWNLOADED"
    PARTIAL = "PARTIAL"
    FAILED = "FAILED"
    CONVERTING = "CONVERTING"
    CONVERTED = "CONVERTED"
class FormatType(str, Enum):
    """Media container/format of a downloaded file.

    ``str`` mixin keeps members interchangeable with their string values
    after a JSON round-trip.
    """

    MP4 = "MP4"
    MP3 = "MP3"
    OTHER = "OTHER"
class TrackingManager:
    """Persists download/tracking state for karaoke songs.

    State lives in one JSON file (``tracking_file``) holding songs and
    playlists, plus one cache file per YouTube channel under ``cache_dir``.
    """

    def __init__(
        self,
        tracking_file="data/karaoke_tracking.json",
        cache_dir="data/channel_cache",
    ):
        """Load existing tracking data (or start fresh) and ensure the
        channel-cache directory exists.

        Args:
            tracking_file: Path of the JSON file holding song/playlist state.
            cache_dir: Directory for per-channel video-list cache files.
        """
        self.tracking_file = Path(tracking_file)
        self.cache_dir = Path(cache_dir)

        # Ensure cache directory exists before any cache file is written.
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        self.data = self._load()
        print(f"📊 Tracking manager initialized with {len(self.data.get('songs', {}))} tracked songs")

    def _load(self):
        """Load tracking data from the JSON file.

        Returns:
            The parsed tracking dict, or a fresh empty structure when the
            file is missing, unreadable, or contains invalid JSON.
        """
        if self.tracking_file.exists():
            try:
                with open(self.tracking_file, "r", encoding="utf-8") as f:
                    return json.load(f)
            # OSError added: a transient read failure now degrades to a
            # fresh structure instead of crashing startup.  (Also dropped
            # the pointless f-prefix on a placeholder-free string.)
            except (json.JSONDecodeError, OSError):
                print("⚠️ Corrupted tracking file, creating new one")

        return {"songs": {}, "playlists": {}, "last_updated": datetime.now().isoformat()}
def _save(self):
|
|
"""Save tracking data to JSON file."""
|
|
self.data["last_updated"] = datetime.now().isoformat()
|
|
self.tracking_file.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(self.tracking_file, "w", encoding="utf-8") as f:
|
|
json.dump(self.data, f, indent=2, ensure_ascii=False)
|
|
|
|
def force_save(self):
|
|
"""Force save the tracking data."""
|
|
self._save()
|
|
|
|
def _get_channel_cache_file(self, channel_id: str) -> Path:
|
|
"""Get the cache file path for a specific channel."""
|
|
# Sanitize channel ID for filename
|
|
safe_channel_id = re.sub(r'[<>:"/\\|?*]', '_', channel_id)
|
|
return self.cache_dir / f"{safe_channel_id}.json"
|
|
|
|
def _load_channel_cache(self, channel_id: str) -> List[Dict[str, str]]:
|
|
"""Load cache for a specific channel."""
|
|
cache_file = self._get_channel_cache_file(channel_id)
|
|
if cache_file.exists():
|
|
try:
|
|
with open(cache_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
return data.get('videos', [])
|
|
except (json.JSONDecodeError, KeyError):
|
|
print(f" ⚠️ Corrupted cache file for {channel_id}, will recreate")
|
|
return []
|
|
return []
|
|
|
|
def _save_channel_cache(self, channel_id: str, videos: List[Dict[str, str]]):
|
|
"""Save cache for a specific channel."""
|
|
cache_file = self._get_channel_cache_file(channel_id)
|
|
data = {
|
|
'channel_id': channel_id,
|
|
'videos': videos,
|
|
'last_updated': datetime.now().isoformat(),
|
|
'video_count': len(videos)
|
|
}
|
|
with open(cache_file, 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, indent=2, ensure_ascii=False)
|
|
|
|
def _clear_channel_cache(self, channel_id: str):
|
|
"""Clear cache for a specific channel."""
|
|
cache_file = self._get_channel_cache_file(channel_id)
|
|
if cache_file.exists():
|
|
cache_file.unlink()
|
|
print(f" 🗑️ Cleared cache file: {cache_file.name}")
|
|
|
|
def get_cache_info(self):
|
|
"""Get information about all channel cache files."""
|
|
cache_files = list(self.cache_dir.glob("*.json"))
|
|
total_videos = 0
|
|
cache_info = []
|
|
|
|
for cache_file in cache_files:
|
|
try:
|
|
with open(cache_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
video_count = len(data.get('videos', []))
|
|
total_videos += video_count
|
|
last_updated = data.get('last_updated', 'Unknown')
|
|
cache_info.append({
|
|
'channel': data.get('channel_id', cache_file.stem),
|
|
'videos': video_count,
|
|
'last_updated': last_updated,
|
|
'file': cache_file.name
|
|
})
|
|
except Exception as e:
|
|
print(f"⚠️ Error reading cache file {cache_file.name}: {e}")
|
|
|
|
return {
|
|
'total_channels': len(cache_files),
|
|
'total_videos': total_videos,
|
|
'channels': cache_info
|
|
}
|
|
|
|
def clear_channel_cache(self, channel_id=None):
|
|
"""Clear cache for a specific channel or all channels."""
|
|
if channel_id:
|
|
self._clear_channel_cache(channel_id)
|
|
print(f"🗑️ Cleared cache for channel: {channel_id}")
|
|
else:
|
|
# Clear all cache files
|
|
cache_files = list(self.cache_dir.glob("*.json"))
|
|
for cache_file in cache_files:
|
|
cache_file.unlink()
|
|
print(f"🗑️ Cleared all {len(cache_files)} channel cache files")
|
|
|
|
def set_cache_duration(self, hours):
    """Placeholder for cache-expiry logic; currently a deliberate no-op
    kept only for interface compatibility.

    Args:
        hours: Intended cache lifetime in hours (ignored for now).
    """
    return None
def export_playlist_report(self, playlist_id):
    """Placeholder for a per-playlist report export; currently a
    deliberate no-op kept only for interface compatibility.

    Args:
        playlist_id: Playlist to report on (ignored for now).
    """
    return None
def get_statistics(self):
|
|
"""Get statistics about tracked songs."""
|
|
total_songs = len(self.data["songs"])
|
|
downloaded_songs = sum(
|
|
1
|
|
for s in self.data["songs"].values()
|
|
if s["status"] in [SongStatus.DOWNLOADED, SongStatus.CONVERTED]
|
|
)
|
|
failed_songs = sum(
|
|
1 for s in self.data["songs"].values() if s["status"] == SongStatus.FAILED
|
|
)
|
|
partial_songs = sum(
|
|
1 for s in self.data["songs"].values() if s["status"] == SongStatus.PARTIAL
|
|
)
|
|
total_playlists = len(self.data["playlists"])
|
|
total_size_mb = sum(
|
|
s.get("file_size", 0)
|
|
for s in self.data["songs"].values()
|
|
if s.get("file_size")
|
|
) / (1024 * 1024)
|
|
last_updated = max(
|
|
(
|
|
s.get("last_updated")
|
|
for s in self.data["songs"].values()
|
|
if s.get("last_updated")
|
|
),
|
|
default=None,
|
|
)
|
|
return {
|
|
"total_songs": total_songs,
|
|
"downloaded_songs": downloaded_songs,
|
|
"failed_songs": failed_songs,
|
|
"partial_songs": partial_songs,
|
|
"total_playlists": total_playlists,
|
|
"total_size_mb": round(total_size_mb, 2),
|
|
"last_updated": last_updated,
|
|
}
|
|
|
|
def get_playlist_songs(self, playlist_id):
|
|
"""Get songs for a specific playlist."""
|
|
return [
|
|
s for s in self.data["songs"].values() if s["playlist_id"] == playlist_id
|
|
]
|
|
|
|
def get_failed_songs(self, playlist_id=None):
|
|
"""Get failed songs, optionally filtered by playlist."""
|
|
if playlist_id:
|
|
return [
|
|
s
|
|
for s in self.data["songs"].values()
|
|
if s["playlist_id"] == playlist_id and s["status"] == SongStatus.FAILED
|
|
]
|
|
return [
|
|
s for s in self.data["songs"].values() if s["status"] == SongStatus.FAILED
|
|
]
|
|
|
|
def get_partial_downloads(self, playlist_id=None):
|
|
"""Get partial downloads, optionally filtered by playlist."""
|
|
if playlist_id:
|
|
return [
|
|
s
|
|
for s in self.data["songs"].values()
|
|
if s["playlist_id"] == playlist_id and s["status"] == SongStatus.PARTIAL
|
|
]
|
|
return [
|
|
s for s in self.data["songs"].values() if s["status"] == SongStatus.PARTIAL
|
|
]
|
|
|
|
def cleanup_orphaned_files(self, downloads_dir):
|
|
"""Remove tracking entries for files that no longer exist."""
|
|
orphaned = []
|
|
for song_id, song in list(self.data["songs"].items()):
|
|
file_path = song.get("file_path")
|
|
if file_path and not Path(file_path).exists():
|
|
orphaned.append(song_id)
|
|
del self.data["songs"][song_id]
|
|
self.force_save()
|
|
return orphaned
|
|
|
|
def is_song_downloaded(self, artist, title, channel_name=None, video_id=None):
|
|
"""
|
|
Check if a song has already been downloaded.
|
|
Returns True if the song exists in tracking with DOWNLOADED status.
|
|
"""
|
|
# If we have video_id and channel_name, try direct key lookup first (most efficient)
|
|
if video_id and channel_name:
|
|
song_key = f"{video_id}@{channel_name}"
|
|
if song_key in self.data["songs"]:
|
|
song_data = self.data["songs"][song_key]
|
|
if song_data.get("status") == SongStatus.DOWNLOADED:
|
|
return True
|
|
|
|
# Fallback to content search (for cases where we don't have video_id)
|
|
for song_id, song_data in self.data["songs"].items():
|
|
# Check if this song matches the artist and title
|
|
if song_data.get("artist") == artist and song_data.get("title") == title:
|
|
# Check if it's marked as downloaded
|
|
if song_data.get("status") == SongStatus.DOWNLOADED:
|
|
return True
|
|
# Also check the video title field which might contain the song info
|
|
video_title = song_data.get("video_title", "")
|
|
if video_title and artist in video_title and title in video_title:
|
|
if song_data.get("status") == SongStatus.DOWNLOADED:
|
|
return True
|
|
|
|
return False
|
|
|
|
def is_file_exists(self, file_path):
    """Return True when *file_path* points at an existing filesystem entry."""
    target = Path(file_path)
    return target.exists()
def is_song_failed(self, artist, title, channel_name=None, video_id=None):
|
|
"""
|
|
Check if a song has previously failed to download.
|
|
Returns True if the song exists in tracking with FAILED status.
|
|
"""
|
|
# If we have video_id and channel_name, try direct key lookup first (most efficient)
|
|
if video_id and channel_name:
|
|
song_key = f"{video_id}@{channel_name}"
|
|
if song_key in self.data["songs"]:
|
|
song_data = self.data["songs"][song_key]
|
|
if song_data.get("status") == SongStatus.FAILED:
|
|
return True
|
|
|
|
# Fallback to content search (for cases where we don't have video_id)
|
|
for song_id, song_data in self.data["songs"].items():
|
|
# Check if this song matches the artist and title
|
|
if song_data.get("artist") == artist and song_data.get("title") == title:
|
|
# Check if it's marked as failed
|
|
if song_data.get("status") == SongStatus.FAILED:
|
|
return True
|
|
# Also check the video title field which might contain the song info
|
|
video_title = song_data.get("video_title", "")
|
|
if video_title and artist in video_title and title in video_title:
|
|
if song_data.get("status") == SongStatus.FAILED:
|
|
return True
|
|
return False
|
|
|
|
def mark_song_downloaded(
|
|
self, artist, title, video_id, channel_name, file_path, file_size=None
|
|
):
|
|
"""
|
|
Mark a song as downloaded in the tracking system.
|
|
"""
|
|
# Use the existing tracking structure: video_id@channel_name
|
|
song_key = f"{video_id}@{channel_name}"
|
|
|
|
self.data["songs"][song_key] = {
|
|
"artist": artist,
|
|
"title": title,
|
|
"video_id": video_id,
|
|
"channel_name": channel_name,
|
|
"video_title": f"{artist} - {title}",
|
|
"file_path": str(file_path),
|
|
"file_size": file_size,
|
|
"status": SongStatus.DOWNLOADED,
|
|
"last_updated": datetime.now().isoformat(),
|
|
}
|
|
self._save()
|
|
|
|
def mark_song_failed(
|
|
self, artist, title, video_id, channel_name, error_message=None
|
|
):
|
|
"""
|
|
Mark a song as failed in the tracking system.
|
|
"""
|
|
# Use the existing tracking structure: video_id@channel_name
|
|
song_key = f"{video_id}@{channel_name}"
|
|
|
|
self.data["songs"][song_key] = {
|
|
"artist": artist,
|
|
"title": title,
|
|
"video_id": video_id,
|
|
"channel_name": channel_name,
|
|
"video_title": f"{artist} - {title}",
|
|
"status": SongStatus.FAILED,
|
|
"error_message": error_message,
|
|
"last_updated": datetime.now().isoformat(),
|
|
}
|
|
self._save()
|
|
|
|
def get_channel_video_list(
    self, channel_url, yt_dlp_path="downloader/yt-dlp.exe", force_refresh=False, show_pagination=False
):
    """
    Return a list of videos (dicts with 'title' and 'id') for the channel, using cache if available unless force_refresh is True.

    Args:
        channel_url: YouTube channel URL
        yt_dlp_path: Path to yt-dlp executable
        force_refresh: Force refresh cache even if available
        show_pagination: Show page-by-page progress (slower but more detailed)
    """
    # Imported lazily; presumably avoids a circular import at module load
    # time — TODO confirm against karaoke_downloader package layout.
    from karaoke_downloader.youtube_utils import get_channel_info

    # Fix: dropped the dead `channel_name, channel_id = None, None`
    # pre-initialization that was immediately overwritten here.
    channel_name, channel_id = get_channel_info(channel_url)

    if not channel_id:
        print(f" ❌ Could not extract channel ID from URL: {channel_url}")
        return []

    print(f" 🔍 Channel: {channel_name} (ID: {channel_id})")

    # Check if we have cached data for this channel
    if not force_refresh:
        cached_videos = self._load_channel_cache(channel_id)
        if cached_videos:
            # Validate that the cached data has proper video IDs
            corrupted = False

            # Check if any video IDs look like titles instead of proper YouTube IDs
            for video in cached_videos[:20]:  # Check first 20 videos
                video_id = video.get("id", "")
                # YouTube IDs are exactly 11 characters of [A-Za-z0-9_-];
                # anything else means a title leaked into the id field.
                if video_id and (
                    len(video_id) != 11 or
                    not video_id.replace('-', '').replace('_', '').isalnum() or
                    " " in video_id or
                    "Lyrics" in video_id or
                    "KARAOKE" in video_id.upper() or
                    "Vocal" in video_id or
                    "Guide" in video_id
                ):
                    print(f" ⚠️ Detected corrupted video ID in cache: '{video_id}'")
                    corrupted = True
                    break

            if corrupted:
                print(f" 🧹 Clearing corrupted cache for {channel_id}")
                self._clear_channel_cache(channel_id)
                force_refresh = True
            else:
                print(f" 📋 Using cached video list ({len(cached_videos)} videos)")
                return cached_videos

    # Choose fetch method based on show_pagination flag
    if show_pagination:
        return self._fetch_videos_with_pagination(channel_url, channel_id, yt_dlp_path)
    return self._fetch_videos_flat_playlist(channel_url, channel_id, yt_dlp_path)
def _fetch_videos_with_pagination(self, channel_url, channel_id, yt_dlp_path):
    """Fetch videos showing page-by-page progress.

    Pages through the channel 200 entries at a time using yt-dlp's
    --playlist-start/--playlist-end, validating every video id, then
    caches and returns the combined list of {'title', 'id'} dicts
    ([] when nothing valid was fetched).

    Fix: the inline line-parsing/validation and raw-output dumping were
    duplicated blocks of code; they are extracted into _parse_video_entry
    and _dump_raw_yt_dlp_output.
    """
    print(f" 🌐 Fetching video list from YouTube (page-by-page mode)...")
    print(f" 📡 Channel URL: {channel_url}")

    import subprocess

    all_videos = []
    page = 1
    videos_per_page = 200  # YouTube/yt-dlp supports up to 200 videos per page, reducing API calls and errors

    while True:
        print(f" 📄 Fetching page {page}...")

        # Fetch one page at a time
        cmd = [
            yt_dlp_path,
            "--flat-playlist",
            "--print",
            "%(title)s|%(id)s|%(url)s",
            "--playlist-start",
            str((page - 1) * videos_per_page + 1),
            "--playlist-end",
            str(page * videos_per_page),
            channel_url,
        ]

        try:
            # Increased timeout to 180 seconds for larger pages (200 videos)
            result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=180)
            lines = result.stdout.strip().splitlines()

            # Save raw output for debugging (one file per page)
            raw_output_file = self._get_channel_cache_file(channel_id).parent / f"{channel_id}_raw_output_page{page}.txt"
            self._dump_raw_yt_dlp_output(raw_output_file, channel_id, channel_url, cmd, lines, page=page)

            if not lines:
                print(f" ✅ No more videos found on page {page}")
                break

            print(f" 📊 Page {page}: Found {len(lines)} videos")

            page_videos = []
            invalid_count = 0

            for line in lines:
                if not line.strip():
                    continue

                parsed = self._parse_video_entry(line)
                if parsed is None:
                    # No recognizable watch URL on this line; skip silently.
                    continue
                title, video_id, is_valid = parsed

                if is_valid:
                    page_videos.append({"title": title, "id": video_id})
                else:
                    invalid_count += 1
                    if invalid_count <= 3:  # Show first 3 invalid IDs per page
                        print(f" ⚠️ Invalid ID: '{video_id}' for '{title[:50]}...'")

            if invalid_count > 3:
                print(f" ⚠️ ... and {invalid_count - 3} more invalid IDs on this page")

            all_videos.extend(page_videos)
            print(f" ✅ Page {page}: Added {len(page_videos)} valid videos (total: {len(all_videos)})")

            # If we got fewer videos than expected, we're probably at the end
            if len(lines) < videos_per_page:
                print(f" 🏁 Reached end of channel (last page had {len(lines)} videos)")
                break

            page += 1

            # Safety check to prevent infinite loops
            if page > 50:  # Max 50 pages (10,000 videos with 200 per page)
                print(f" ⚠️ Reached maximum page limit (50 pages), stopping")
                break

        except subprocess.TimeoutExpired:
            print(f" ⚠️ Page {page} timed out, stopping")
            break
        except subprocess.CalledProcessError as e:
            print(f" ❌ Error fetching page {page}: {e}")
            break
        except KeyboardInterrupt:
            print(f" ⏹️ User interrupted, stopping at page {page}")
            break

    if not all_videos:
        print(f" ❌ No valid videos found")
        return []

    print(f" 🎉 Channel download complete!")
    print(f" 📊 Total videos fetched: {len(all_videos)}")

    # Save to individual channel cache file
    self._save_channel_cache(channel_id, all_videos)
    print(f" 💾 Saved cache to: {self._get_channel_cache_file(channel_id).name}")

    return all_videos

def _dump_raw_yt_dlp_output(self, raw_output_file, channel_id, channel_url, cmd, lines, page=None):
    """Write yt-dlp's raw stdout lines to a debug file (best effort).

    Any failure to write is reported and swallowed; debugging output must
    never abort a fetch.
    """
    try:
        with open(raw_output_file, 'w', encoding='utf-8') as f:
            if page is None:
                f.write(f"# Raw yt-dlp output for {channel_id}\n")
            else:
                f.write(f"# Raw yt-dlp output for {channel_id} - Page {page}\n")
            f.write(f"# Channel URL: {channel_url}\n")
            f.write(f"# Command: {' '.join(cmd)}\n")
            f.write(f"# Timestamp: {datetime.now().isoformat()}\n")
            f.write(f"# Total lines: {len(lines)}\n")
            f.write("#" * 80 + "\n\n")
            for i, line in enumerate(lines, 1):
                f.write(f"{i:6d}: {line}\n")
        print(f" 💾 Saved raw output to: {raw_output_file.name}")
    except Exception as e:
        print(f" ⚠️ Could not save raw output: {e}")

def _parse_video_entry(self, line):
    """Parse one 'title|id|url' line printed by yt-dlp.

    The id is pulled from the watch URL because titles may themselves
    contain '|' separators, making a naive split unreliable.

    Returns:
        (title, video_id, is_valid) — is_valid is False when the id looks
        like a title leaked into the id field — or None when the line
        carries no recognizable watch URL.
    """
    url_match = re.search(r'https://www\.youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})', line)
    if not url_match:
        return None
    video_id = url_match.group(1)
    # Title is everything before the id, minus the trailing separator.
    title = line[:line.find(video_id)].rstrip('|').strip()
    is_valid = bool(video_id) and (
        len(video_id) == 11 and
        video_id.replace('-', '').replace('_', '').isalnum() and
        " " not in video_id and
        "Lyrics" not in video_id and
        "KARAOKE" not in video_id.upper() and
        "Vocal" not in video_id and
        "Guide" not in video_id
    )
    return title, video_id, is_valid
def _fetch_videos_flat_playlist(self, channel_url, channel_id, yt_dlp_path):
    """Fetch all videos using flat playlist (faster but less detailed progress).

    Probes the channel with a one-video request, then runs yt-dlp once over
    the whole channel, validates every parsed id, caches the result and
    returns a list of {'title', 'id'} dicts ([] when nothing valid parsed).
    """
    print(f" 🌐 Fetching video list from YouTube (this may take a while)...")
    print(f" 📡 Channel URL: {channel_url}")

    import subprocess

    # Probe the channel with a single-video request before the full fetch.
    probe_cmd = [
        yt_dlp_path,
        "--flat-playlist",
        "--print",
        "%(title)s",
        "--playlist-end",
        "1",  # Just get first video to test
        channel_url,
    ]

    try:
        print(f" 🔍 Testing channel access...")
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=30)
        if probe.returncode == 0:
            print(f" ✅ Channel is accessible")
        else:
            print(f" ⚠️ Channel test failed: {probe.stderr}")
    except subprocess.TimeoutExpired:
        print(f" ⚠️ Channel test timed out")
    except Exception as e:
        print(f" ⚠️ Channel test error: {e}")

    # Full fetch: one "title|id|url" line per video.
    fetch_cmd = [
        yt_dlp_path,
        "--flat-playlist",
        "--print",
        "%(title)s|%(id)s|%(url)s",
        "--verbose",  # Add verbose output to see what's happening
        channel_url,
    ]

    try:
        print(f" 🔧 Running yt-dlp command: {' '.join(fetch_cmd)}")
        print(f" 📥 Starting video list download...")

        # Hard 5-minute cap so a huge channel cannot hang the run.
        proc = subprocess.run(fetch_cmd, capture_output=True, text=True, check=True, timeout=300)
        output_lines = proc.stdout.strip().splitlines()

        # Keep a copy of the raw output next to the cache for debugging.
        raw_output_file = self._get_channel_cache_file(channel_id).parent / f"{channel_id}_raw_output.txt"
        try:
            with open(raw_output_file, 'w', encoding='utf-8') as fh:
                fh.write(f"# Raw yt-dlp output for {channel_id}\n")
                fh.write(f"# Channel URL: {channel_url}\n")
                fh.write(f"# Command: {' '.join(fetch_cmd)}\n")
                fh.write(f"# Timestamp: {datetime.now().isoformat()}\n")
                fh.write(f"# Total lines: {len(output_lines)}\n")
                fh.write("#" * 80 + "\n\n")
                for num, raw_line in enumerate(output_lines, 1):
                    fh.write(f"{num:6d}: {raw_line}\n")
            print(f" 💾 Saved raw output to: {raw_output_file.name}")
        except Exception as e:
            print(f" ⚠️ Could not save raw output: {e}")

        print(f" 📄 Raw output lines: {len(output_lines)}")
        print(f" 📊 Download completed successfully!")

        # Show some sample lines to understand the format
        if output_lines:
            print(f" 📋 Sample output format:")
            for idx, sample in enumerate(output_lines[:3]):
                print(f" Line {idx+1}: {sample[:100]}...")
            if len(output_lines) > 3:
                print(f" ... and {len(output_lines) - 3} more lines")

        parsed_videos = []
        bad_entries = 0

        print(f" 🔍 Processing {len(output_lines)} video entries...")

        for idx, raw_line in enumerate(output_lines):
            if idx % 1000 == 0 and idx > 0:  # Progress indicator every 1000 lines
                print(f" 📊 Processing line {idx}/{len(output_lines)}... ({idx/len(output_lines)*100:.1f}%)")

            # Parse by locating the watch URL: titles may contain '|', so a
            # naive split on the separator is not reliable.
            url_match = re.search(r'https://www\.youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})', raw_line)
            if not url_match:
                bad_entries += 1
                if bad_entries <= 5:
                    print(f" ⚠️ Skipping line with no URL: '{raw_line[:100]}...'")
                elif bad_entries == 6:
                    print(f" ⚠️ ... and {len(output_lines) - idx - 1} more invalid lines")
                continue

            video_id = url_match.group(1)
            # Title is everything before the id, minus the trailing separator.
            title = raw_line[:raw_line.find(video_id)].rstrip('|').strip()

            # Reject ids that look like titles leaked into the id field.
            looks_valid = video_id and (
                len(video_id) == 11 and
                video_id.replace('-', '').replace('_', '').isalnum() and
                " " not in video_id and
                "Lyrics" not in video_id and
                "KARAOKE" not in video_id.upper() and
                "Vocal" not in video_id and
                "Guide" not in video_id
            )
            if looks_valid:
                parsed_videos.append({"title": title, "id": video_id})
            else:
                bad_entries += 1
                if bad_entries <= 5:  # Only show first 5 invalid IDs
                    print(f" ⚠️ Skipping invalid video ID: '{video_id}' for title: '{title[:50]}...'")
                elif bad_entries == 6:
                    print(f" ⚠️ ... and {len(output_lines) - idx - 1} more invalid IDs")

        if not parsed_videos:
            print(f" ❌ No valid videos found after parsing")
            return []

        print(f" ✅ Parsed {len(parsed_videos)} valid videos from YouTube")
        print(f" ⚠️ Skipped {bad_entries} invalid video IDs")

        # Save to individual channel cache file
        self._save_channel_cache(channel_id, parsed_videos)
        print(f" 💾 Saved cache to: {self._get_channel_cache_file(channel_id).name}")

        return parsed_videos

    except subprocess.TimeoutExpired:
        print(f"❌ yt-dlp timed out after 5 minutes - channel may be too large")
        return []
    except subprocess.CalledProcessError as e:
        print(f"❌ yt-dlp failed to fetch playlist for cache: {e}")
        print(f" 📄 stderr: {e.stderr}")
        return []