import json
import os
import re
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple


class SongStatus(str, Enum):
    """Lifecycle states for a tracked song.

    str-valued so instances serialize transparently through ``json.dump`` and
    compare equal to the plain strings loaded back from disk.
    """

    NOT_DOWNLOADED = "NOT_DOWNLOADED"
    DOWNLOADING = "DOWNLOADING"
    DOWNLOADED = "DOWNLOADED"
    PARTIAL = "PARTIAL"
    FAILED = "FAILED"
    CONVERTING = "CONVERTING"
    CONVERTED = "CONVERTED"


class FormatType(str, Enum):
    """Media container formats recognized by the downloader."""

    MP4 = "MP4"
    MP3 = "MP3"
    OTHER = "OTHER"


class TrackingManager:
    """Persist download state for karaoke songs plus per-channel video-list caches.

    State lives in two places:

    * ``tracking_file`` — a single JSON document with ``songs``, ``playlists``
      and ``last_updated`` keys.  Songs are keyed ``"<video_id>@<channel_name>"``.
    * ``cache_dir`` — one JSON file per channel holding the fetched video list,
      so repeated runs avoid re-querying YouTube.
    """

    def __init__(
        self,
        tracking_file="data/karaoke_tracking.json",
        cache_dir="data/channel_cache",
    ):
        self.tracking_file = Path(tracking_file)
        self.cache_dir = Path(cache_dir)
        # Ensure cache directory exists
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.data = self._load()
        print(f"๐Ÿ“Š Tracking manager initialized with {len(self.data.get('songs', {}))} tracked songs")

    @staticmethod
    def _is_valid_video_id(video_id: str) -> bool:
        """Return True if *video_id* looks like a genuine 11-char YouTube ID.

        Rejects strings that are really titles leaked into the ID field
        (spaces, 'Lyrics', 'KARAOKE', 'Vocal', 'Guide' markers).  Shared by the
        cache-corruption check and both yt-dlp output parsers so the three call
        sites cannot drift apart.
        """
        return bool(video_id) and (
            len(video_id) == 11
            and video_id.replace('-', '').replace('_', '').isalnum()
            and " " not in video_id
            and "Lyrics" not in video_id
            and "KARAOKE" not in video_id.upper()
            and "Vocal" not in video_id
            and "Guide" not in video_id
        )

    def _load(self) -> Dict[str, Any]:
        """Load tracking data from the JSON file, falling back to a fresh structure.

        Robustness fix: older or hand-edited files may lack the ``songs`` /
        ``playlists`` keys that get_statistics(), get_playlist_songs() and the
        mark_* methods index directly — normalize them here so those methods
        cannot raise KeyError.
        """
        if self.tracking_file.exists():
            try:
                with open(self.tracking_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                data.setdefault("songs", {})
                data.setdefault("playlists", {})
                return data
            except json.JSONDecodeError:
                print("โš ๏ธ Corrupted tracking file, creating new one")
        return {"songs": {}, "playlists": {}, "last_updated": datetime.now().isoformat()}

    def _save(self):
        """Save tracking data to JSON file (stamps ``last_updated``)."""
        self.data["last_updated"] = datetime.now().isoformat()
        self.tracking_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.tracking_file, "w", encoding="utf-8") as f:
            json.dump(self.data, f, indent=2, ensure_ascii=False)

    def force_save(self):
        """Force save the tracking data."""
        self._save()

    def _get_channel_cache_file(self, channel_id: str) -> Path:
        """Get the cache file path for a specific channel."""
        # Sanitize channel ID for filename
        safe_channel_id = re.sub(r'[<>:"/\\|?*]', '_', channel_id)
        return self.cache_dir / f"{safe_channel_id}.json"

    def _load_channel_cache(self, channel_id: str) -> List[Dict[str, str]]:
        """Load cache for a specific channel; empty list when missing/corrupt."""
        cache_file = self._get_channel_cache_file(channel_id)
        if cache_file.exists():
            try:
                with open(cache_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                    return data.get('videos', [])
            except (json.JSONDecodeError, KeyError):
                print(f" โš ๏ธ Corrupted cache file for {channel_id}, will recreate")
                return []
        return []

    def _save_channel_cache(self, channel_id: str, videos: List[Dict[str, str]]):
        """Save cache for a specific channel."""
        cache_file = self._get_channel_cache_file(channel_id)
        data = {
            'channel_id': channel_id,
            'videos': videos,
            'last_updated': datetime.now().isoformat(),
            'video_count': len(videos)
        }
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    def _clear_channel_cache(self, channel_id: str):
        """Clear cache for a specific channel."""
        cache_file = self._get_channel_cache_file(channel_id)
        if cache_file.exists():
            cache_file.unlink()
            print(f" ๐Ÿ—‘๏ธ Cleared cache file: {cache_file.name}")

    def get_cache_info(self) -> Dict[str, Any]:
        """Get information about all channel cache files.

        Returns a dict with ``total_channels``, ``total_videos`` and a
        ``channels`` list (one summary entry per readable cache file).
        """
        cache_files = list(self.cache_dir.glob("*.json"))
        total_videos = 0
        cache_info = []
        for cache_file in cache_files:
            try:
                with open(cache_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                video_count = len(data.get('videos', []))
                total_videos += video_count
                last_updated = data.get('last_updated', 'Unknown')
                cache_info.append({
                    'channel': data.get('channel_id', cache_file.stem),
                    'videos': video_count,
                    'last_updated': last_updated,
                    'file': cache_file.name
                })
            except Exception as e:
                # Best-effort: skip unreadable cache files but keep reporting the rest.
                print(f"โš ๏ธ Error reading cache file {cache_file.name}: {e}")
        return {
            'total_channels': len(cache_files),
            'total_videos': total_videos,
            'channels': cache_info
        }

    def clear_channel_cache(self, channel_id=None):
        """Clear cache for a specific channel or all channels."""
        if channel_id:
            self._clear_channel_cache(channel_id)
            print(f"๐Ÿ—‘๏ธ Cleared cache for channel: {channel_id}")
        else:
            # Clear all cache files
            cache_files = list(self.cache_dir.glob("*.json"))
            for cache_file in cache_files:
                cache_file.unlink()
            print(f"๐Ÿ—‘๏ธ Cleared all {len(cache_files)} channel cache files")

    def set_cache_duration(self, hours):
        """Placeholder for cache duration logic"""
        pass

    def export_playlist_report(self, playlist_id):
        """Export a report for a specific playlist."""
        pass

    def get_statistics(self) -> Dict[str, Any]:
        """Get statistics about tracked songs.

        Counts songs by status, sums known file sizes (bytes -> MB) and finds
        the most recent ``last_updated`` timestamp across all songs.
        """
        total_songs = len(self.data["songs"])
        downloaded_songs = sum(
            1
            for s in self.data["songs"].values()
            if s["status"] in [SongStatus.DOWNLOADED, SongStatus.CONVERTED]
        )
        failed_songs = sum(
            1 for s in self.data["songs"].values() if s["status"] == SongStatus.FAILED
        )
        partial_songs = sum(
            1 for s in self.data["songs"].values() if s["status"] == SongStatus.PARTIAL
        )
        total_playlists = len(self.data["playlists"])
        total_size_mb = sum(
            s.get("file_size", 0)
            for s in self.data["songs"].values()
            if s.get("file_size")
        ) / (1024 * 1024)
        last_updated = max(
            (
                s.get("last_updated")
                for s in self.data["songs"].values()
                if s.get("last_updated")
            ),
            default=None,
        )
        return {
            "total_songs": total_songs,
            "downloaded_songs": downloaded_songs,
            "failed_songs": failed_songs,
            "partial_songs": partial_songs,
            "total_playlists": total_playlists,
            "total_size_mb": round(total_size_mb, 2),
            "last_updated": last_updated,
        }

    def get_playlist_songs(self, playlist_id) -> List[Dict[str, Any]]:
        """Get songs for a specific playlist."""
        return [
            s for s in self.data["songs"].values() if s["playlist_id"] == playlist_id
        ]

    def get_failed_songs(self, playlist_id=None) -> List[Dict[str, Any]]:
        """Get failed songs, optionally filtered by playlist."""
        if playlist_id:
            return [
                s
                for s in self.data["songs"].values()
                if s["playlist_id"] == playlist_id and s["status"] == SongStatus.FAILED
            ]
        return [
            s for s in self.data["songs"].values() if s["status"] == SongStatus.FAILED
        ]

    def get_partial_downloads(self, playlist_id=None) -> List[Dict[str, Any]]:
        """Get partial downloads, optionally filtered by playlist."""
        if playlist_id:
            return [
                s
                for s in self.data["songs"].values()
                if s["playlist_id"] == playlist_id and s["status"] == SongStatus.PARTIAL
            ]
        return [
            s for s in self.data["songs"].values() if s["status"] == SongStatus.PARTIAL
        ]

    def cleanup_orphaned_files(self, downloads_dir) -> List[str]:
        """Remove tracking entries for files that no longer exist.

        Returns the list of removed song keys.  ``downloads_dir`` is kept for
        interface compatibility; entries are checked via their recorded
        absolute ``file_path``.
        """
        orphaned = []
        for song_id, song in list(self.data["songs"].items()):
            file_path = song.get("file_path")
            if file_path and not Path(file_path).exists():
                orphaned.append(song_id)
                del self.data["songs"][song_id]
        self.force_save()
        return orphaned

    def is_song_downloaded(self, artist, title, channel_name=None, video_id=None) -> bool:
        """
        Check if a song has already been downloaded.
        Returns True if the song exists in tracking with DOWNLOADED status.
        """
        # If we have video_id and channel_name, try direct key lookup first (most efficient)
        if video_id and channel_name:
            song_key = f"{video_id}@{channel_name}"
            if song_key in self.data["songs"]:
                song_data = self.data["songs"][song_key]
                if song_data.get("status") == SongStatus.DOWNLOADED:
                    return True
        # Fallback to content search (for cases where we don't have video_id)
        for song_id, song_data in self.data["songs"].items():
            # Check if this song matches the artist and title
            if song_data.get("artist") == artist and song_data.get("title") == title:
                # Check if it's marked as downloaded
                if song_data.get("status") == SongStatus.DOWNLOADED:
                    return True
            # Also check the video title field which might contain the song info
            video_title = song_data.get("video_title", "")
            if video_title and artist in video_title and title in video_title:
                if song_data.get("status") == SongStatus.DOWNLOADED:
                    return True
        return False

    def is_file_exists(self, file_path) -> bool:
        """
        Check if a file already exists on the filesystem.
        """
        return Path(file_path).exists()

    def is_song_failed(self, artist, title, channel_name=None, video_id=None) -> bool:
        """
        Check if a song has previously failed to download.
        Returns True if the song exists in tracking with FAILED status.
        """
        # If we have video_id and channel_name, try direct key lookup first (most efficient)
        if video_id and channel_name:
            song_key = f"{video_id}@{channel_name}"
            if song_key in self.data["songs"]:
                song_data = self.data["songs"][song_key]
                if song_data.get("status") == SongStatus.FAILED:
                    return True
        # Fallback to content search (for cases where we don't have video_id)
        for song_id, song_data in self.data["songs"].items():
            # Check if this song matches the artist and title
            if song_data.get("artist") == artist and song_data.get("title") == title:
                # Check if it's marked as failed
                if song_data.get("status") == SongStatus.FAILED:
                    return True
            # Also check the video title field which might contain the song info
            video_title = song_data.get("video_title", "")
            if video_title and artist in video_title and title in video_title:
                if song_data.get("status") == SongStatus.FAILED:
                    return True
        return False

    def mark_song_downloaded(
        self, artist, title, video_id, channel_name, file_path, file_size=None
    ):
        """
        Mark a song as downloaded in the tracking system.
        """
        # Use the existing tracking structure: video_id@channel_name
        song_key = f"{video_id}@{channel_name}"
        self.data["songs"][song_key] = {
            "artist": artist,
            "title": title,
            "video_id": video_id,
            "channel_name": channel_name,
            "video_title": f"{artist} - {title}",
            "file_path": str(file_path),
            "file_size": file_size,
            "status": SongStatus.DOWNLOADED,
            "last_updated": datetime.now().isoformat(),
        }
        self._save()

    def mark_song_failed(
        self, artist, title, video_id, channel_name, error_message=None
    ):
        """
        Mark a song as failed in the tracking system.
        """
        # Use the existing tracking structure: video_id@channel_name
        song_key = f"{video_id}@{channel_name}"
        self.data["songs"][song_key] = {
            "artist": artist,
            "title": title,
            "video_id": video_id,
            "channel_name": channel_name,
            "video_title": f"{artist} - {title}",
            "status": SongStatus.FAILED,
            "error_message": error_message,
            "last_updated": datetime.now().isoformat(),
        }
        self._save()

    def get_channel_video_list(
        self, channel_url, yt_dlp_path="downloader/yt-dlp.exe", force_refresh=False, show_pagination=False
    ):
        """
        Return a list of videos (dicts with 'title' and 'id') for the channel,
        using cache if available unless force_refresh is True.

        Args:
            channel_url: YouTube channel URL
            yt_dlp_path: Path to yt-dlp executable
            force_refresh: Force refresh cache even if available
            show_pagination: Show page-by-page progress (slower but more detailed)
        """
        channel_name, channel_id = None, None

        # Check if this is a manual channel (curated list, no YouTube fetch needed)
        from karaoke_downloader.manual_video_manager import is_manual_channel, get_manual_channel_info, get_manual_videos_for_channel
        if is_manual_channel(channel_url):
            channel_name, channel_id = get_manual_channel_info(channel_url)
            if channel_name and channel_id:
                print(f" ๐Ÿ“‹ Loading manual videos for {channel_name}")
                manual_videos = get_manual_videos_for_channel(channel_name)
                # Convert to the expected format
                videos = []
                for video in manual_videos:
                    videos.append({
                        "title": video.get("title", ""),
                        "id": video.get("id", ""),
                        "url": video.get("url", "")
                    })
                print(f" โœ… Loaded {len(videos)} manual videos")
                return videos
            else:
                print(f" โŒ Could not get manual channel info for: {channel_url}")
                return []

        # Regular YouTube channel processing
        from karaoke_downloader.youtube_utils import get_channel_info
        channel_name, channel_id = get_channel_info(channel_url)
        if not channel_id:
            print(f" โŒ Could not extract channel ID from URL: {channel_url}")
            return []
        print(f" ๐Ÿ” Channel: {channel_name} (ID: {channel_id})")

        # Check if we have cached data for this channel
        if not force_refresh:
            cached_videos = self._load_channel_cache(channel_id)
            if cached_videos:
                # Validate that the cached data has proper video IDs
                corrupted = False
                # Check if any video IDs look like titles instead of proper YouTube IDs
                for video in cached_videos[:20]:  # Check first 20 videos
                    video_id = video.get("id", "")
                    if video_id and not self._is_valid_video_id(video_id):
                        print(f" โš ๏ธ Detected corrupted video ID in cache: '{video_id}'")
                        corrupted = True
                        break
                if corrupted:
                    print(f" ๐Ÿงน Clearing corrupted cache for {channel_id}")
                    self._clear_channel_cache(channel_id)
                    force_refresh = True
                else:
                    print(f" ๐Ÿ“‹ Using cached video list ({len(cached_videos)} videos)")
                    return cached_videos

        # Choose fetch method based on show_pagination flag
        if show_pagination:
            return self._fetch_videos_with_pagination(channel_url, channel_id, yt_dlp_path)
        else:
            return self._fetch_videos_flat_playlist(channel_url, channel_id, yt_dlp_path)

    def _fetch_videos_with_pagination(self, channel_url, channel_id, yt_dlp_path):
        """Fetch videos showing page-by-page progress."""
        print(f" ๐ŸŒ Fetching video list from YouTube (page-by-page mode)...")
        print(f" ๐Ÿ“ก Channel URL: {channel_url}")
        import subprocess

        all_videos = []
        page = 1
        videos_per_page = 200  # YouTube/yt-dlp supports up to 200 videos per page, reducing API calls and errors

        while True:
            print(f" ๐Ÿ“„ Fetching page {page}...")
            # Fetch one page at a time
            cmd = [
                yt_dlp_path,
                "--flat-playlist",
                "--print", "%(title)s|%(id)s|%(url)s",
                "--playlist-start", str((page - 1) * videos_per_page + 1),
                "--playlist-end", str(page * videos_per_page),
                channel_url,
            ]
            try:
                # Increased timeout to 180 seconds for larger pages (200 videos)
                result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=180)
                lines = result.stdout.strip().splitlines()

                # Save raw output for debugging (for each page)
                raw_output_file = self._get_channel_cache_file(channel_id).parent / f"{channel_id}_raw_output_page{page}.txt"
                try:
                    with open(raw_output_file, 'w', encoding='utf-8') as f:
                        f.write(f"# Raw yt-dlp output for {channel_id} - Page {page}\n")
                        f.write(f"# Channel URL: {channel_url}\n")
                        f.write(f"# Command: {' '.join(cmd)}\n")
                        f.write(f"# Timestamp: {datetime.now().isoformat()}\n")
                        f.write(f"# Total lines: {len(lines)}\n")
                        f.write("#" * 80 + "\n\n")
                        for i, line in enumerate(lines, 1):
                            f.write(f"{i:6d}: {line}\n")
                    print(f" ๐Ÿ’พ Saved raw output to: {raw_output_file.name}")
                except Exception as e:
                    print(f" โš ๏ธ Could not save raw output: {e}")

                if not lines:
                    print(f" โœ… No more videos found on page {page}")
                    break

                print(f" ๐Ÿ“Š Page {page}: Found {len(lines)} videos")
                page_videos = []
                invalid_count = 0
                for line in lines:
                    if not line.strip():
                        continue
                    # More robust parsing that handles titles with | characters:
                    # extract the video ID directly from the URL that yt-dlp prints.
                    url_match = re.search(r'https://www\.youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})', line)
                    if not url_match:
                        continue
                    video_id = url_match.group(1)
                    # Extract title (everything before the video ID in the line)
                    title = line[:line.find(video_id)].rstrip('|').strip()
                    # Validate video ID
                    if self._is_valid_video_id(video_id):
                        page_videos.append({"title": title, "id": video_id})
                    else:
                        invalid_count += 1
                        if invalid_count <= 3:  # Show first 3 invalid IDs per page
                            print(f" โš ๏ธ Invalid ID: '{video_id}' for '{title[:50]}...'")
                if invalid_count > 3:
                    print(f" โš ๏ธ ... and {invalid_count - 3} more invalid IDs on this page")

                all_videos.extend(page_videos)
                print(f" โœ… Page {page}: Added {len(page_videos)} valid videos (total: {len(all_videos)})")

                # If we got fewer videos than expected, we're probably at the end
                if len(lines) < videos_per_page:
                    print(f" ๐Ÿ Reached end of channel (last page had {len(lines)} videos)")
                    break
                page += 1
                # Safety check to prevent infinite loops
                if page > 50:  # Max 50 pages (10,000 videos with 200 per page)
                    print(f" โš ๏ธ Reached maximum page limit (50 pages), stopping")
                    break
            except subprocess.TimeoutExpired:
                print(f" โš ๏ธ Page {page} timed out, stopping")
                break
            except subprocess.CalledProcessError as e:
                print(f" โŒ Error fetching page {page}: {e}")
                break
            except KeyboardInterrupt:
                print(f" โน๏ธ User interrupted, stopping at page {page}")
                break

        if not all_videos:
            print(f" โŒ No valid videos found")
            return []

        print(f" ๐ŸŽ‰ Channel download complete!")
        print(f" ๐Ÿ“Š Total videos fetched: {len(all_videos)}")
        # Save to individual channel cache file
        self._save_channel_cache(channel_id, all_videos)
        print(f" ๐Ÿ’พ Saved cache to: {self._get_channel_cache_file(channel_id).name}")
        return all_videos

    def _fetch_videos_flat_playlist(self, channel_url, channel_id, yt_dlp_path):
        """Fetch all videos using flat playlist (faster but less detailed progress)."""
        # Fetch with yt-dlp
        print(f" ๐ŸŒ Fetching video list from YouTube (this may take a while)...")
        print(f" ๐Ÿ“ก Channel URL: {channel_url}")
        import subprocess
        from karaoke_downloader.youtube_utils import _parse_yt_dlp_command

        # First, let's get the total count to show progress
        count_cmd = _parse_yt_dlp_command(yt_dlp_path) + [
            "--flat-playlist",
            "--print", "%(title)s",
            "--playlist-end", "1",  # Just get first video to test
            channel_url,
        ]
        try:
            print(f" ๐Ÿ” Testing channel access...")
            test_result = subprocess.run(count_cmd, capture_output=True, text=True, timeout=30)
            if test_result.returncode == 0:
                print(f" โœ… Channel is accessible")
            else:
                print(f" โš ๏ธ Channel test failed: {test_result.stderr}")
        except subprocess.TimeoutExpired:
            print(f" โš ๏ธ Channel test timed out")
        except Exception as e:
            print(f" โš ๏ธ Channel test error: {e}")

        # Now fetch all videos with progress indicators
        cmd = _parse_yt_dlp_command(yt_dlp_path) + [
            "--flat-playlist",
            "--print", "%(title)s|%(id)s|%(url)s",
            "--verbose",  # Add verbose output to see what's happening
            channel_url,
        ]
        try:
            print(f" ๐Ÿ”ง Running yt-dlp command: {' '.join(cmd)}")
            print(f" ๐Ÿ“ฅ Starting video list download...")
            # Use a timeout and show progress
            result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=300)
            lines = result.stdout.strip().splitlines()

            # Save raw output for debugging
            raw_output_file = self._get_channel_cache_file(channel_id).parent / f"{channel_id}_raw_output.txt"
            try:
                with open(raw_output_file, 'w', encoding='utf-8') as f:
                    f.write(f"# Raw yt-dlp output for {channel_id}\n")
                    f.write(f"# Channel URL: {channel_url}\n")
                    f.write(f"# Command: {' '.join(cmd)}\n")
                    f.write(f"# Timestamp: {datetime.now().isoformat()}\n")
                    f.write(f"# Total lines: {len(lines)}\n")
                    f.write("#" * 80 + "\n\n")
                    for i, line in enumerate(lines, 1):
                        f.write(f"{i:6d}: {line}\n")
                print(f" ๐Ÿ’พ Saved raw output to: {raw_output_file.name}")
            except Exception as e:
                print(f" โš ๏ธ Could not save raw output: {e}")

            print(f" ๐Ÿ“„ Raw output lines: {len(lines)}")
            print(f" ๐Ÿ“Š Download completed successfully!")
            # Show some sample lines to understand the format
            if lines:
                print(f" ๐Ÿ“‹ Sample output format:")
                for i, line in enumerate(lines[:3]):
                    print(f" Line {i+1}: {line[:100]}...")
                if len(lines) > 3:
                    print(f" ... and {len(lines) - 3} more lines")

            videos = []
            invalid_count = 0
            print(f" ๐Ÿ” Processing {len(lines)} video entries...")
            for i, line in enumerate(lines):
                if i % 1000 == 0 and i > 0:  # Progress indicator every 1000 lines
                    print(f" ๐Ÿ“Š Processing line {i}/{len(lines)}... ({i/len(lines)*100:.1f}%)")
                # More robust parsing that handles titles with | characters:
                # extract the video ID directly from the URL that yt-dlp prints.
                url_match = re.search(r'https://www\.youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})', line)
                if not url_match:
                    invalid_count += 1
                    if invalid_count <= 5:
                        print(f" โš ๏ธ Skipping line with no URL: '{line[:100]}...'")
                    elif invalid_count == 6:
                        print(f" โš ๏ธ ... and {len(lines) - i - 1} more invalid lines")
                    continue
                video_id = url_match.group(1)
                # Extract title (everything before the video ID in the line)
                title = line[:line.find(video_id)].rstrip('|').strip()
                # Validate video ID
                if self._is_valid_video_id(video_id):
                    videos.append({"title": title, "id": video_id})
                else:
                    invalid_count += 1
                    if invalid_count <= 5:  # Only show first 5 invalid IDs
                        print(f" โš ๏ธ Skipping invalid video ID: '{video_id}' for title: '{title[:50]}...'")
                    elif invalid_count == 6:
                        print(f" โš ๏ธ ... and {len(lines) - i - 1} more invalid IDs")

            if not videos:
                print(f" โŒ No valid videos found after parsing")
                return []

            print(f" โœ… Parsed {len(videos)} valid videos from YouTube")
            print(f" โš ๏ธ Skipped {invalid_count} invalid video IDs")
            # Save to individual channel cache file
            self._save_channel_cache(channel_id, videos)
            print(f" ๐Ÿ’พ Saved cache to: {self._get_channel_cache_file(channel_id).name}")
            return videos
        except subprocess.TimeoutExpired:
            print(f"โŒ yt-dlp timed out after 5 minutes - channel may be too large")
            return []
        except subprocess.CalledProcessError as e:
            print(f"โŒ yt-dlp failed to fetch playlist for cache: {e}")
            print(f" ๐Ÿ“„ stderr: {e.stderr}")
            return []