import threading
from enum import Enum
import json
from datetime import datetime
from pathlib import Path


class SongStatus(str, Enum):
    """Lifecycle states of a tracked song.

    Subclassing ``str`` means plain strings loaded back from JSON compare
    equal to these members, so status checks work before and after a reload.
    """

    NOT_DOWNLOADED = "NOT_DOWNLOADED"
    DOWNLOADING = "DOWNLOADING"
    DOWNLOADED = "DOWNLOADED"
    PARTIAL = "PARTIAL"
    FAILED = "FAILED"
    CONVERTING = "CONVERTING"
    CONVERTED = "CONVERTED"


class FormatType(str, Enum):
    """Media format categories recognized by the downloader."""

    MP4 = "MP4"
    MP3 = "MP3"
    OTHER = "OTHER"


class TrackingManager:
    """JSON-backed store of song download state plus a channel video cache.

    Songs live in ``self.data["songs"]`` keyed ``"<video_id>@<channel_name>"``;
    playlists live in ``self.data["playlists"]``. The channel cache supports
    two on-disk layouts: the legacy flat ``{key: [videos]}`` form and the
    current nested ``{"channels": {key: {"videos": [...], ...}}}`` form.
    """

    # Statuses that count as a completed download.
    _DONE = (SongStatus.DOWNLOADED, SongStatus.CONVERTED)

    def __init__(
        self,
        tracking_file="data/karaoke_tracking.json",
        cache_file="data/channel_cache.json",
    ):
        self.tracking_file = Path(tracking_file)
        self.cache_file = Path(cache_file)
        self.data = {"playlists": {}, "songs": {}}
        self.cache = {}
        self._lock = threading.Lock()
        self._load()
        self._load_cache()

    def _load(self):
        """Load tracking data from disk; fall back to empty on any error."""
        if self.tracking_file.exists():
            try:
                with open(self.tracking_file, "r", encoding="utf-8") as f:
                    self.data = json.load(f)
            except Exception:
                # Corrupt/unreadable file: start fresh rather than crash.
                self.data = {"playlists": {}, "songs": {}}

    def _save(self):
        """Write tracking data to disk, serialized under the lock."""
        with self._lock:
            with open(self.tracking_file, "w", encoding="utf-8") as f:
                json.dump(self.data, f, indent=2, ensure_ascii=False)

    def force_save(self):
        """Public alias for an immediate save of tracking data."""
        self._save()

    def _load_cache(self):
        """Load the channel video cache; fall back to empty on any error."""
        if self.cache_file.exists():
            try:
                with open(self.cache_file, "r", encoding="utf-8") as f:
                    self.cache = json.load(f)
            except Exception:
                self.cache = {}

    def save_cache(self):
        """Write the channel cache to disk.

        Serialized under the same lock as ``_save`` so concurrent callers
        cannot interleave file writes (previously this write was unlocked).
        """
        with self._lock:
            with open(self.cache_file, "w", encoding="utf-8") as f:
                json.dump(self.cache, f, indent=2, ensure_ascii=False)

    def get_statistics(self):
        """Aggregate counts, total size (MB) and latest update time.

        Returns a dict with keys: total_songs, downloaded_songs, failed_songs,
        partial_songs, total_playlists, total_size_mb, last_updated.
        """
        songs = self.data["songs"].values()
        total_size_mb = sum(s.get("file_size") or 0 for s in songs) / (1024 * 1024)
        # ISO-8601 strings sort chronologically, so max() of the raw
        # strings yields the most recent timestamp.
        last_updated = max(
            (s.get("last_updated") for s in songs if s.get("last_updated")),
            default=None,
        )
        return {
            "total_songs": len(self.data["songs"]),
            "downloaded_songs": sum(1 for s in songs if s.get("status") in self._DONE),
            "failed_songs": sum(
                1 for s in songs if s.get("status") == SongStatus.FAILED
            ),
            "partial_songs": sum(
                1 for s in songs if s.get("status") == SongStatus.PARTIAL
            ),
            "total_playlists": len(self.data["playlists"]),
            "total_size_mb": round(total_size_mb, 2),
            "last_updated": last_updated,
        }

    def _filter_songs(self, status=None, playlist_id=None):
        """Return songs matching *status* and/or *playlist_id* (None = any).

        Uses ``.get()`` because entries written by ``mark_song_downloaded`` /
        ``mark_song_failed`` have no ``playlist_id`` key; indexing it
        directly (as the previous implementation did) raised KeyError on
        mixed tracking files.
        """
        return [
            s
            for s in self.data["songs"].values()
            if (status is None or s.get("status") == status)
            and (playlist_id is None or s.get("playlist_id") == playlist_id)
        ]

    def get_playlist_songs(self, playlist_id):
        """All songs tracked under *playlist_id*."""
        return self._filter_songs(playlist_id=playlist_id)

    def get_failed_songs(self, playlist_id=None):
        """FAILED songs, optionally restricted to a playlist (falsy = all)."""
        return self._filter_songs(
            status=SongStatus.FAILED, playlist_id=playlist_id or None
        )

    def get_partial_downloads(self, playlist_id=None):
        """PARTIAL songs, optionally restricted to a playlist (falsy = all)."""
        return self._filter_songs(
            status=SongStatus.PARTIAL, playlist_id=playlist_id or None
        )

    def cleanup_orphaned_files(self, downloads_dir):
        """Drop tracking entries whose recorded file no longer exists.

        ``downloads_dir`` is currently unused; it is kept for interface
        compatibility with existing callers. Returns the removed song keys.
        """
        orphaned = [
            song_id
            for song_id, song in self.data["songs"].items()
            if song.get("file_path") and not Path(song["file_path"]).exists()
        ]
        for song_id in orphaned:
            del self.data["songs"][song_id]
        self.force_save()
        return orphaned

    def get_cache_info(self):
        """Summarize the channel cache.

        Understands both the nested ``{"channels": {...}}`` layout written by
        ``get_channel_video_list`` and the legacy flat layout (the previous
        implementation miscounted nested caches as a single channel).
        """
        if "channels" in self.cache:
            entries = self.cache["channels"]
            total_channels = len(entries)
            total_cached_videos = sum(
                len(e.get("videos", [])) for e in entries.values()
            )
            last_updated = max(
                (
                    e.get("last_updated")
                    for e in entries.values()
                    if e.get("last_updated")
                ),
                default=None,
            )
        else:
            total_channels = len(self.cache)
            total_cached_videos = sum(len(v) for v in self.cache.values())
            last_updated = None
        return {
            "total_channels": total_channels,
            "total_cached_videos": total_cached_videos,
            "cache_duration_hours": 24,  # default; see set_cache_duration
            "last_updated": last_updated,
        }

    def clear_channel_cache(self, channel_id=None):
        """Clear one channel's cache entry, or everything for None/"all".

        Removes the key from both the legacy flat layout and the nested
        ``"channels"`` layout (the previous implementation only touched the
        flat layout, so nested entries were never cleared).
        """
        if channel_id is None or channel_id == "all":
            self.cache = {}
        else:
            self.cache.pop(channel_id, None)
            channels = self.cache.get("channels")
            if isinstance(channels, dict):
                channels.pop(channel_id, None)
        self.save_cache()

    def set_cache_duration(self, hours):
        # Placeholder for cache duration logic
        pass

    def export_playlist_report(self, playlist_id):
        """Return a JSON report (playlist metadata + its songs) as a string.

        Returns a human-readable error string if the playlist is unknown.
        """
        playlist = self.data["playlists"].get(playlist_id)
        if not playlist:
            return f"Playlist '{playlist_id}' not found."
        songs = self.get_playlist_songs(playlist_id)
        report = {"playlist": playlist, "songs": songs}
        return json.dumps(report, indent=2, ensure_ascii=False)

    def _song_has_status(
        self, artist, title, statuses, channel_name=None, video_id=None
    ):
        """True if a song matching *artist*/*title* has a status in *statuses*.

        Tries the direct ``video_id@channel_name`` key first (cheapest), then
        falls back to scanning artist/title fields and, as a last resort,
        substring-matching the stored video title.
        """
        if video_id and channel_name:
            song = self.data["songs"].get(f"{video_id}@{channel_name}")
            if song and song.get("status") in statuses:
                return True
        for song in self.data["songs"].values():
            if song.get("status") not in statuses:
                continue
            if song.get("artist") == artist and song.get("title") == title:
                return True
            video_title = song.get("video_title", "")
            if video_title and artist in video_title and title in video_title:
                return True
        return False

    def is_song_downloaded(self, artist, title, channel_name=None, video_id=None):
        """True if the song is tracked with DOWNLOADED or CONVERTED status."""
        return self._song_has_status(
            artist, title, self._DONE, channel_name, video_id
        )

    def is_file_exists(self, file_path):
        """True if *file_path* exists on the filesystem."""
        return Path(file_path).exists()

    def is_song_failed(self, artist, title, channel_name=None, video_id=None):
        """True if the song is tracked with FAILED status."""
        return self._song_has_status(
            artist, title, (SongStatus.FAILED,), channel_name, video_id
        )

    def _record_song(self, artist, title, video_id, channel_name, status, **extra):
        """Insert/overwrite the ``video_id@channel_name`` entry and persist."""
        entry = {
            "artist": artist,
            "title": title,
            "video_id": video_id,
            "channel_name": channel_name,
            "video_title": f"{artist} - {title}",
            "status": status,
            "last_updated": datetime.now().isoformat(),
        }
        entry.update(extra)
        self.data["songs"][f"{video_id}@{channel_name}"] = entry
        self._save()

    def mark_song_downloaded(
        self, artist, title, video_id, channel_name, file_path, file_size=None
    ):
        """Record a successful download (path + optional size in bytes)."""
        self._record_song(
            artist,
            title,
            video_id,
            channel_name,
            SongStatus.DOWNLOADED,
            file_path=str(file_path),
            file_size=file_size,
        )

    def mark_song_failed(
        self, artist, title, video_id, channel_name, error_message=None
    ):
        """Record a failed download attempt with an optional error message."""
        self._record_song(
            artist,
            title,
            video_id,
            channel_name,
            SongStatus.FAILED,
            error_message=error_message,
        )

    def get_channel_video_list(
        self, channel_url, yt_dlp_path=None, force_refresh=False
    ):
        """Return a list of ``{"title", "id"}`` dicts for the channel.

        Serves from cache (nested layout first, legacy flat layout as a
        fallback) unless ``force_refresh`` is True; otherwise shells out to
        yt-dlp and stores the result under the nested layout.
        """
        if yt_dlp_path is None:
            # Platform-aware default pulled from the project config.
            from karaoke_downloader.config_manager import load_config

            config = load_config()
            yt_dlp_path = config.yt_dlp_path

        from karaoke_downloader.youtube_utils import get_channel_info

        channel_name, channel_id = get_channel_info(channel_url)

        # Any of these may have been used as the cache key historically.
        possible_keys = [channel_id, channel_url, channel_name]

        cache_data = None
        cache_key = None

        # New nested layout first.
        channels = self.cache.get("channels", {})
        for key in possible_keys:
            if key and key in channels:
                cache_data = channels[key]["videos"]
                cache_key = key
                break

        # Legacy flat layout as a fallback.
        if cache_data is None:
            for key in possible_keys:
                if key and key in self.cache:
                    cache_data = self.cache[key]
                    cache_key = key
                    break

        if not cache_key:
            cache_key = channel_id or channel_url  # Use as fallback for new entries

        print(f" 🔍 Trying cache keys: {possible_keys}")
        print(f" 🔍 Selected cache key: '{cache_key}'")

        if not force_refresh and cache_data is not None:
            print(f" 📋 Using cached video list ({len(cache_data)} videos)")
            # Normalize old cache entries ({'video_id': ...}) to the
            # current {'title', 'id'} shape.
            converted_videos = []
            for video in cache_data:
                if "video_id" in video and "id" not in video:
                    converted_videos.append(
                        {"title": video["title"], "id": video["video_id"]}
                    )
                else:
                    converted_videos.append(video)
            return converted_videos

        print(f" ❌ Cache miss for all keys")
        print(f" 🌐 Fetching video list from YouTube (this may take a while)...")

        import subprocess
        from karaoke_downloader.youtube_utils import _parse_yt_dlp_command

        cmd = _parse_yt_dlp_command(yt_dlp_path) + [
            "--flat-playlist",
            "--print",
            "%(title)s|%(id)s|%(url)s",
            channel_url,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            print(f"❌ yt-dlp failed to fetch playlist for cache: {e}")
            return []

        videos = []
        for line in result.stdout.strip().splitlines():
            parts = line.split("|")
            if len(parts) >= 2:
                videos.append(
                    {"title": parts[0].strip(), "id": parts[1].strip()}
                )

        # Persist under the nested layout.
        self.cache.setdefault("channels", {})[cache_key] = {
            "videos": videos,
            "last_updated": datetime.now().isoformat(),
            "channel_name": channel_name,
            "channel_id": channel_id,
        }
        self.save_cache()
        return videos