KaraokeVideoDownloader/karaoke_downloader/tracking_manager.py

666 lines
28 KiB
Python

import json
import os
import re
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
class SongStatus(str, Enum):
    """Lifecycle states for a tracked song.

    Subclasses ``str`` so members compare equal to their plain string
    values (e.g. ``SongStatus.FAILED == "FAILED"``), which lets status
    strings loaded back from the JSON tracking file match enum members
    directly.
    """
    NOT_DOWNLOADED = "NOT_DOWNLOADED"
    DOWNLOADING = "DOWNLOADING"
    DOWNLOADED = "DOWNLOADED"
    PARTIAL = "PARTIAL"
    FAILED = "FAILED"
    CONVERTING = "CONVERTING"
    CONVERTED = "CONVERTED"
class FormatType(str, Enum):
    """Coarse media-format categories.

    Str-valued like SongStatus so members serialize cleanly to JSON and
    compare equal to their plain string values.
    """
    MP4 = "MP4"
    MP3 = "MP3"
    OTHER = "OTHER"
class TrackingManager:
def __init__(
self,
tracking_file="data/karaoke_tracking.json",
cache_dir="data/channel_cache",
):
self.tracking_file = Path(tracking_file)
self.cache_dir = Path(cache_dir)
# Ensure cache directory exists
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.data = self._load()
print(f"📊 Tracking manager initialized with {len(self.data.get('songs', {}))} tracked songs")
def _load(self):
"""Load tracking data from JSON file."""
if self.tracking_file.exists():
try:
with open(self.tracking_file, "r", encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError:
print(f"⚠️ Corrupted tracking file, creating new one")
return {"songs": {}, "playlists": {}, "last_updated": datetime.now().isoformat()}
def _save(self):
"""Save tracking data to JSON file."""
self.data["last_updated"] = datetime.now().isoformat()
self.tracking_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.tracking_file, "w", encoding="utf-8") as f:
json.dump(self.data, f, indent=2, ensure_ascii=False)
def force_save(self):
"""Force save the tracking data."""
self._save()
def _get_channel_cache_file(self, channel_id: str) -> Path:
"""Get the cache file path for a specific channel."""
# Sanitize channel ID for filename
safe_channel_id = re.sub(r'[<>:"/\\|?*]', '_', channel_id)
return self.cache_dir / f"{safe_channel_id}.json"
def _load_channel_cache(self, channel_id: str) -> List[Dict[str, str]]:
"""Load cache for a specific channel."""
cache_file = self._get_channel_cache_file(channel_id)
if cache_file.exists():
try:
with open(cache_file, 'r', encoding='utf-8') as f:
data = json.load(f)
return data.get('videos', [])
except (json.JSONDecodeError, KeyError):
print(f" ⚠️ Corrupted cache file for {channel_id}, will recreate")
return []
return []
def _save_channel_cache(self, channel_id: str, videos: List[Dict[str, str]]):
"""Save cache for a specific channel."""
cache_file = self._get_channel_cache_file(channel_id)
data = {
'channel_id': channel_id,
'videos': videos,
'last_updated': datetime.now().isoformat(),
'video_count': len(videos)
}
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def _clear_channel_cache(self, channel_id: str):
"""Clear cache for a specific channel."""
cache_file = self._get_channel_cache_file(channel_id)
if cache_file.exists():
cache_file.unlink()
print(f" 🗑️ Cleared cache file: {cache_file.name}")
def get_cache_info(self):
"""Get information about all channel cache files."""
cache_files = list(self.cache_dir.glob("*.json"))
total_videos = 0
cache_info = []
for cache_file in cache_files:
try:
with open(cache_file, 'r', encoding='utf-8') as f:
data = json.load(f)
video_count = len(data.get('videos', []))
total_videos += video_count
last_updated = data.get('last_updated', 'Unknown')
cache_info.append({
'channel': data.get('channel_id', cache_file.stem),
'videos': video_count,
'last_updated': last_updated,
'file': cache_file.name
})
except Exception as e:
print(f"⚠️ Error reading cache file {cache_file.name}: {e}")
return {
'total_channels': len(cache_files),
'total_videos': total_videos,
'channels': cache_info
}
def clear_channel_cache(self, channel_id=None):
"""Clear cache for a specific channel or all channels."""
if channel_id:
self._clear_channel_cache(channel_id)
print(f"🗑️ Cleared cache for channel: {channel_id}")
else:
# Clear all cache files
cache_files = list(self.cache_dir.glob("*.json"))
for cache_file in cache_files:
cache_file.unlink()
print(f"🗑️ Cleared all {len(cache_files)} channel cache files")
def set_cache_duration(self, hours):
    """Placeholder for cache duration logic.

    Currently a no-op: the channel cache has no expiry and ``hours`` is
    ignored. TODO: implement time-based cache invalidation using ``hours``
    (the unused ``timedelta`` import suggests this was planned).
    """
    pass
def export_playlist_report(self, playlist_id):
    """Export a report for a specific playlist.

    Not implemented yet: always returns None and ignores ``playlist_id``.
    """
    pass
def get_statistics(self):
    """Aggregate counts, total size, and the newest update timestamp.

    Returns a dict with song counts by outcome, playlist count, total
    downloaded size in MiB (rounded to 2 decimals), and the most recent
    per-song ``last_updated`` ISO timestamp (None when no song has one).
    """
    done_states = (SongStatus.DOWNLOADED, SongStatus.CONVERTED)
    downloaded = failed = partial = 0
    size_bytes = 0
    newest = None
    # Single pass over all tracked songs instead of one scan per metric.
    for entry in self.data["songs"].values():
        status = entry["status"]
        if status in done_states:
            downloaded += 1
        elif status == SongStatus.FAILED:
            failed += 1
        elif status == SongStatus.PARTIAL:
            partial += 1
        if entry.get("file_size"):
            size_bytes += entry["file_size"]
        stamp = entry.get("last_updated")
        # ISO-8601 strings order lexicographically, so > picks the newest.
        if stamp and (newest is None or stamp > newest):
            newest = stamp
    return {
        "total_songs": len(self.data["songs"]),
        "downloaded_songs": downloaded,
        "failed_songs": failed,
        "partial_songs": partial,
        "total_playlists": len(self.data["playlists"]),
        "total_size_mb": round(size_bytes / (1024 * 1024), 2),
        "last_updated": newest,
    }
def get_playlist_songs(self, playlist_id):
"""Get songs for a specific playlist."""
return [
s for s in self.data["songs"].values() if s["playlist_id"] == playlist_id
]
def get_failed_songs(self, playlist_id=None):
    """Return FAILED songs, optionally restricted to one playlist.

    Uses ``.get("playlist_id")`` rather than indexing: entries written by
    ``mark_song_failed`` carry no "playlist_id" key, so the previous
    ``s["playlist_id"]`` raised KeyError on them when filtering.
    """
    failed = [
        s for s in self.data["songs"].values() if s["status"] == SongStatus.FAILED
    ]
    if playlist_id:
        return [s for s in failed if s.get("playlist_id") == playlist_id]
    return failed
def get_partial_downloads(self, playlist_id=None):
    """Return PARTIAL downloads, optionally restricted to one playlist.

    Uses ``.get("playlist_id")`` rather than indexing: tracking entries
    written without a "playlist_id" key previously raised KeyError here.
    """
    partial = [
        s for s in self.data["songs"].values() if s["status"] == SongStatus.PARTIAL
    ]
    if playlist_id:
        return [s for s in partial if s.get("playlist_id") == playlist_id]
    return partial
def cleanup_orphaned_files(self, downloads_dir):
"""Remove tracking entries for files that no longer exist."""
orphaned = []
for song_id, song in list(self.data["songs"].items()):
file_path = song.get("file_path")
if file_path and not Path(file_path).exists():
orphaned.append(song_id)
del self.data["songs"][song_id]
self.force_save()
return orphaned
def is_song_downloaded(self, artist, title, channel_name=None, video_id=None):
    """Return True if this song is tracked with DOWNLOADED status.

    Tries an O(1) lookup by the ``video_id@channel_name`` key first, then
    falls back to scanning all entries: a song matches when artist and
    title equal the stored fields, or when both appear as substrings of
    the stored ``video_title``.
    """
    # Fast path: direct key lookup when both key components are known.
    if video_id and channel_name:
        entry = self.data["songs"].get(f"{video_id}@{channel_name}")
        if entry and entry.get("status") == SongStatus.DOWNLOADED:
            return True
    # Slow path: content scan for callers without a video_id.
    for entry in self.data["songs"].values():
        if entry.get("status") != SongStatus.DOWNLOADED:
            continue
        if entry.get("artist") == artist and entry.get("title") == title:
            return True
        stored_title = entry.get("video_title", "")
        if stored_title and artist in stored_title and title in stored_title:
            return True
    return False
def is_file_exists(self, file_path):
    """Return True when *file_path* points at an existing filesystem entry."""
    candidate = Path(file_path)
    return candidate.exists()
def is_song_failed(self, artist, title, channel_name=None, video_id=None):
    """Return True if this song is tracked with FAILED status.

    Mirrors ``is_song_downloaded``: O(1) key lookup by
    ``video_id@channel_name`` first, then a scan over entries matching
    artist/title equality or the ``video_title`` substring test.
    """
    # Fast path: direct key lookup when both key components are known.
    if video_id and channel_name:
        entry = self.data["songs"].get(f"{video_id}@{channel_name}")
        if entry and entry.get("status") == SongStatus.FAILED:
            return True
    # Slow path: content scan for callers without a video_id.
    for entry in self.data["songs"].values():
        if entry.get("status") != SongStatus.FAILED:
            continue
        if entry.get("artist") == artist and entry.get("title") == title:
            return True
        stored_title = entry.get("video_title", "")
        if stored_title and artist in stored_title and title in stored_title:
            return True
    return False
def mark_song_downloaded(
    self, artist, title, video_id, channel_name, file_path, file_size=None
):
    """Record a successful download and persist the tracking file.

    The entry is keyed by ``video_id@channel_name`` (the same scheme the
    lookup helpers use), overwriting any previous entry for that key.
    """
    entry = {
        "artist": artist,
        "title": title,
        "video_id": video_id,
        "channel_name": channel_name,
        "video_title": f"{artist} - {title}",
        "file_path": str(file_path),
        "file_size": file_size,
        "status": SongStatus.DOWNLOADED,
        "last_updated": datetime.now().isoformat(),
    }
    self.data["songs"][f"{video_id}@{channel_name}"] = entry
    self._save()
def mark_song_failed(
    self, artist, title, video_id, channel_name, error_message=None
):
    """Record a failed download attempt and persist the tracking file.

    The entry is keyed by ``video_id@channel_name``, overwriting any
    previous entry for that key; ``error_message`` is stored verbatim.
    """
    entry = {
        "artist": artist,
        "title": title,
        "video_id": video_id,
        "channel_name": channel_name,
        "video_title": f"{artist} - {title}",
        "status": SongStatus.FAILED,
        "error_message": error_message,
        "last_updated": datetime.now().isoformat(),
    }
    self.data["songs"][f"{video_id}@{channel_name}"] = entry
    self._save()
def get_channel_video_list(
    self, channel_url, yt_dlp_path="downloader/yt-dlp.exe", force_refresh=False, show_pagination=False
):
    """Return the channel's videos as dicts with 'title' and 'id'.

    Serves from the per-channel cache when it exists and looks sane;
    otherwise fetches from YouTube, either page-by-page
    (``show_pagination=True``) or in one flat-playlist pass.

    Args:
        channel_url: YouTube channel URL.
        yt_dlp_path: Path to the yt-dlp executable.
        force_refresh: Ignore any cached list and re-fetch.
        show_pagination: Show page-by-page progress (slower, more detail).
    """
    from karaoke_downloader.youtube_utils import get_channel_info

    channel_name, channel_id = get_channel_info(channel_url)
    if not channel_id:
        print(f" ❌ Could not extract channel ID from URL: {channel_url}")
        return []
    print(f" 🔍 Channel: {channel_name} (ID: {channel_id})")

    def looks_corrupted(vid):
        # A real YouTube id is exactly 11 chars of [A-Za-z0-9_-]; spaces or
        # title fragments mean the cached entry was parsed badly.
        return bool(vid) and (
            len(vid) != 11
            or not vid.replace('-', '').replace('_', '').isalnum()
            or " " in vid
            or "Lyrics" in vid
            or "KARAOKE" in vid.upper()
            or "Vocal" in vid
            or "Guide" in vid
        )

    if not force_refresh:
        cached = self._load_channel_cache(channel_id)
        if cached:
            # Spot-check the first 20 entries for mangled ids.
            bad_id = next(
                (v.get("id", "") for v in cached[:20] if looks_corrupted(v.get("id", ""))),
                None,
            )
            if bad_id is None:
                print(f" 📋 Using cached video list ({len(cached)} videos)")
                return cached
            print(f" ⚠️ Detected corrupted video ID in cache: '{bad_id}'")
            print(f" 🧹 Clearing corrupted cache for {channel_id}")
            self._clear_channel_cache(channel_id)
    # Cache miss (or corrupted/forced): fetch fresh from YouTube.
    if show_pagination:
        return self._fetch_videos_with_pagination(channel_url, channel_id, yt_dlp_path)
    return self._fetch_videos_flat_playlist(channel_url, channel_id, yt_dlp_path)
def _fetch_videos_with_pagination(self, channel_url, channel_id, yt_dlp_path):
    """Fetch videos showing page-by-page progress.

    Repeatedly invokes yt-dlp with --playlist-start/--playlist-end windows
    of 200 entries, parsing "title|id|url" lines from stdout. Each page's
    raw output is dumped beside the cache file for debugging. Stops on a
    short/empty page, a 50-page safety cap, timeout, yt-dlp failure, or
    Ctrl-C. The accumulated list is written to the channel cache and
    returned; returns [] when nothing valid was fetched.
    """
    print(f" 🌐 Fetching video list from YouTube (page-by-page mode)...")
    print(f" 📡 Channel URL: {channel_url}")
    import subprocess
    all_videos = []
    page = 1
    videos_per_page = 200  # YouTube/yt-dlp supports up to 200 videos per page, reducing API calls and errors
    while True:
        print(f" 📄 Fetching page {page}...")
        # Fetch one page at a time
        cmd = [
            yt_dlp_path,
            "--flat-playlist",
            "--print",
            "%(title)s|%(id)s|%(url)s",
            "--playlist-start",
            str((page - 1) * videos_per_page + 1),
            "--playlist-end",
            str(page * videos_per_page),
            channel_url,
        ]
        try:
            # Increased timeout to 180 seconds for larger pages (200 videos)
            result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=180)
            lines = result.stdout.strip().splitlines()
            # Save raw output for debugging (for each page)
            raw_output_file = self._get_channel_cache_file(channel_id).parent / f"{channel_id}_raw_output_page{page}.txt"
            try:
                with open(raw_output_file, 'w', encoding='utf-8') as f:
                    f.write(f"# Raw yt-dlp output for {channel_id} - Page {page}\n")
                    f.write(f"# Channel URL: {channel_url}\n")
                    f.write(f"# Command: {' '.join(cmd)}\n")
                    f.write(f"# Timestamp: {datetime.now().isoformat()}\n")
                    f.write(f"# Total lines: {len(lines)}\n")
                    f.write("#" * 80 + "\n\n")
                    for i, line in enumerate(lines, 1):
                        f.write(f"{i:6d}: {line}\n")
                print(f" 💾 Saved raw output to: {raw_output_file.name}")
            except Exception as e:
                # Debug dump is best-effort; never abort the fetch over it.
                print(f" ⚠️ Could not save raw output: {e}")
            if not lines:
                print(f" ✅ No more videos found on page {page}")
                break
            print(f" 📊 Page {page}: Found {len(lines)} videos")
            page_videos = []
            invalid_count = 0
            for line in lines:
                if not line.strip():
                    continue
                # More robust parsing that handles titles with | characters
                # Extract video ID directly from the URL that yt-dlp provides
                # Find the URL and extract video ID from it
                url_match = re.search(r'https://www\.youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})', line)
                if not url_match:
                    continue
                # Extract video ID directly from the URL
                video_id = url_match.group(1)
                # Extract title (everything before the video ID in the line)
                title = line[:line.find(video_id)].rstrip('|').strip()
                # Validate video ID
                if video_id and (
                    len(video_id) == 11 and
                    video_id.replace('-', '').replace('_', '').isalnum() and
                    " " not in video_id and
                    "Lyrics" not in video_id and
                    "KARAOKE" not in video_id.upper() and
                    "Vocal" not in video_id and
                    "Guide" not in video_id
                ):
                    page_videos.append({"title": title, "id": video_id})
                else:
                    invalid_count += 1
                    if invalid_count <= 3:  # Show first 3 invalid IDs per page
                        print(f" ⚠️ Invalid ID: '{video_id}' for '{title[:50]}...'")
            if invalid_count > 3:
                print(f" ⚠️ ... and {invalid_count - 3} more invalid IDs on this page")
            all_videos.extend(page_videos)
            print(f" ✅ Page {page}: Added {len(page_videos)} valid videos (total: {len(all_videos)})")
            # If we got fewer videos than expected, we're probably at the end
            if len(lines) < videos_per_page:
                print(f" 🏁 Reached end of channel (last page had {len(lines)} videos)")
                break
            page += 1
            # Safety check to prevent infinite loops
            if page > 50:  # Max 50 pages (10,000 videos with 200 per page)
                print(f" ⚠️ Reached maximum page limit (50 pages), stopping")
                break
        except subprocess.TimeoutExpired:
            # Keep whatever pages were fetched so far rather than failing hard.
            print(f" ⚠️ Page {page} timed out, stopping")
            break
        except subprocess.CalledProcessError as e:
            print(f" ❌ Error fetching page {page}: {e}")
            break
        except KeyboardInterrupt:
            print(f" ⏹️ User interrupted, stopping at page {page}")
            break
    if not all_videos:
        print(f" ❌ No valid videos found")
        return []
    print(f" 🎉 Channel download complete!")
    print(f" 📊 Total videos fetched: {len(all_videos)}")
    # Save to individual channel cache file
    self._save_channel_cache(channel_id, all_videos)
    print(f" 💾 Saved cache to: {self._get_channel_cache_file(channel_id).name}")
    return all_videos
def _fetch_videos_flat_playlist(self, channel_url, channel_id, yt_dlp_path):
    """Fetch all videos using flat playlist (faster but less detailed progress).

    Probes the channel with a single-video request first (diagnostic
    only), then runs one yt-dlp --flat-playlist call with a 5-minute
    timeout and parses "title|id|url" lines. Raw output is dumped beside
    the cache file for debugging; valid entries are cached and returned.
    Returns [] on timeout, yt-dlp failure, or when no line yields a valid
    video id.
    """
    # Fetch with yt-dlp
    print(f" 🌐 Fetching video list from YouTube (this may take a while)...")
    print(f" 📡 Channel URL: {channel_url}")
    import subprocess
    # First, let's get the total count to show progress
    count_cmd = [
        yt_dlp_path,
        "--flat-playlist",
        "--print",
        "%(title)s",
        "--playlist-end",
        "1",  # Just get first video to test
        channel_url,
    ]
    try:
        # Accessibility probe: failures here are reported but non-fatal.
        print(f" 🔍 Testing channel access...")
        test_result = subprocess.run(count_cmd, capture_output=True, text=True, timeout=30)
        if test_result.returncode == 0:
            print(f" ✅ Channel is accessible")
        else:
            print(f" ⚠️ Channel test failed: {test_result.stderr}")
    except subprocess.TimeoutExpired:
        print(f" ⚠️ Channel test timed out")
    except Exception as e:
        print(f" ⚠️ Channel test error: {e}")
    # Now fetch all videos with progress indicators
    cmd = [
        yt_dlp_path,
        "--flat-playlist",
        "--print",
        "%(title)s|%(id)s|%(url)s",
        "--verbose",  # Add verbose output to see what's happening
        channel_url,
    ]
    try:
        print(f" 🔧 Running yt-dlp command: {' '.join(cmd)}")
        print(f" 📥 Starting video list download...")
        # Use a timeout and show progress
        result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=300)
        lines = result.stdout.strip().splitlines()
        # Save raw output for debugging
        raw_output_file = self._get_channel_cache_file(channel_id).parent / f"{channel_id}_raw_output.txt"
        try:
            with open(raw_output_file, 'w', encoding='utf-8') as f:
                f.write(f"# Raw yt-dlp output for {channel_id}\n")
                f.write(f"# Channel URL: {channel_url}\n")
                f.write(f"# Command: {' '.join(cmd)}\n")
                f.write(f"# Timestamp: {datetime.now().isoformat()}\n")
                f.write(f"# Total lines: {len(lines)}\n")
                f.write("#" * 80 + "\n\n")
                for i, line in enumerate(lines, 1):
                    f.write(f"{i:6d}: {line}\n")
            print(f" 💾 Saved raw output to: {raw_output_file.name}")
        except Exception as e:
            # Debug dump is best-effort; never abort the fetch over it.
            print(f" ⚠️ Could not save raw output: {e}")
        print(f" 📄 Raw output lines: {len(lines)}")
        print(f" 📊 Download completed successfully!")
        # Show some sample lines to understand the format
        if lines:
            print(f" 📋 Sample output format:")
            for i, line in enumerate(lines[:3]):
                print(f" Line {i+1}: {line[:100]}...")
            if len(lines) > 3:
                print(f" ... and {len(lines) - 3} more lines")
        videos = []
        invalid_count = 0
        print(f" 🔍 Processing {len(lines)} video entries...")
        for i, line in enumerate(lines):
            if i % 1000 == 0 and i > 0:  # Progress indicator every 1000 lines
                print(f" 📊 Processing line {i}/{len(lines)}... ({i/len(lines)*100:.1f}%)")
            # More robust parsing that handles titles with | characters
            # Extract video ID directly from the URL that yt-dlp provides
            # Find the URL and extract video ID from it
            url_match = re.search(r'https://www\.youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})', line)
            if not url_match:
                invalid_count += 1
                if invalid_count <= 5:
                    print(f" ⚠️ Skipping line with no URL: '{line[:100]}...'")
                elif invalid_count == 6:
                    print(f" ⚠️ ... and {len(lines) - i - 1} more invalid lines")
                continue
            # Extract video ID directly from the URL
            video_id = url_match.group(1)
            # Extract title (everything before the video ID in the line)
            title = line[:line.find(video_id)].rstrip('|').strip()
            # Validate video ID
            if video_id and (
                len(video_id) == 11 and
                video_id.replace('-', '').replace('_', '').isalnum() and
                " " not in video_id and
                "Lyrics" not in video_id and
                "KARAOKE" not in video_id.upper() and
                "Vocal" not in video_id and
                "Guide" not in video_id
            ):
                videos.append({"title": title, "id": video_id})
            else:
                invalid_count += 1
                if invalid_count <= 5:  # Only show first 5 invalid IDs
                    print(f" ⚠️ Skipping invalid video ID: '{video_id}' for title: '{title[:50]}...'")
                elif invalid_count == 6:
                    print(f" ⚠️ ... and {len(lines) - i - 1} more invalid IDs")
        if not videos:
            print(f" ❌ No valid videos found after parsing")
            return []
        print(f" ✅ Parsed {len(videos)} valid videos from YouTube")
        print(f" ⚠️ Skipped {invalid_count} invalid video IDs")
        # Save to individual channel cache file
        self._save_channel_cache(channel_id, videos)
        print(f" 💾 Saved cache to: {self._get_channel_cache_file(channel_id).name}")
        return videos
    except subprocess.TimeoutExpired:
        print(f"❌ yt-dlp timed out after 5 minutes - channel may be too large")
        return []
    except subprocess.CalledProcessError as e:
        print(f"❌ yt-dlp failed to fetch playlist for cache: {e}")
        print(f" 📄 stderr: {e.stderr}")
        return []