# KaraokeVideoDownloader/karaoke_downloader/tracking_manager.py
#
# 348 lines
# 12 KiB
# Python

import threading
from enum import Enum
import json
from datetime import datetime
from pathlib import Path
class SongStatus(str, Enum):
    """Lifecycle state recorded for a tracked song.

    Members are also ``str`` instances whose value equals their name, so they
    compare equal to the plain strings read back from the tracking JSON.
    """

    # Download phase.
    NOT_DOWNLOADED = "NOT_DOWNLOADED"
    DOWNLOADING = "DOWNLOADING"
    DOWNLOADED = "DOWNLOADED"
    PARTIAL = "PARTIAL"
    FAILED = "FAILED"

    # Conversion phase.
    CONVERTING = "CONVERTING"
    CONVERTED = "CONVERTED"
class FormatType(str, Enum):
    """Media container/format label for a tracked file.

    Members are also ``str`` instances whose value equals their name.
    """

    MP4 = "MP4"
    MP3 = "MP3"
    # Any format that is neither MP4 nor MP3.
    OTHER = "OTHER"
class TrackingManager:
    """Track per-song download state and cache channel video listings.

    State lives in two JSON files:

    * ``tracking_file`` holds ``{"playlists": {...}, "songs": {...}}``. Entries
      written by this class are keyed ``"<video_id>@<channel_name>"``.
    * ``cache_file`` maps a channel key (ID, URL or name) to the list of
      ``{"title": ..., "id": ...}`` dicts fetched for that channel.

    Both files are read once at construction. A missing, unreadable or
    malformed file yields empty state instead of raising.
    """

    def __init__(
        self,
        tracking_file="data/karaoke_tracking.json",
        cache_file="data/channel_cache.json",
    ):
        """Load tracking data and the channel cache from the given paths.

        Args:
            tracking_file: Path of the JSON tracking file.
            cache_file: Path of the JSON channel-cache file.
        """
        self.tracking_file = Path(tracking_file)
        self.cache_file = Path(cache_file)
        self.data = {"playlists": {}, "songs": {}}
        self.cache = {}
        # Serialises in-memory mutations and file writes. It is a plain
        # (non-reentrant) Lock, so it must never be held while calling
        # _save() or save_cache().
        self._lock = threading.Lock()
        self._load()
        self._load_cache()

    # ------------------------------------------------------------------
    # Persistence helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _read_json(path):
        """Return the decoded JSON document at *path*, or None on failure.

        Read and decode errors are reported on stdout and swallowed so that a
        damaged file degrades to empty state instead of aborting start-up.
        """
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError) as exc:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError.
            print(f"⚠️ Could not read {path}: {exc}")
            return None

    @staticmethod
    def _write_json(path, payload):
        """Serialise *payload* to *path* without truncating the old file first.

        The document is written to a sibling ``.tmp`` file and then renamed
        over the target, so a failed or interrupted dump leaves the previous
        contents intact. The parent directory is created when missing.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = path.with_name(path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)
        tmp_path.replace(path)

    def _load(self):
        """Populate ``self.data`` from the tracking file, if it exists.

        The loaded document is normalised so that ``"playlists"`` and
        ``"songs"`` are always present and are dicts.
        """
        if not self.tracking_file.exists():
            return
        loaded = self._read_json(self.tracking_file)
        if not isinstance(loaded, dict):
            loaded = {}
        for section in ("playlists", "songs"):
            if not isinstance(loaded.get(section), dict):
                loaded[section] = {}
        self.data = loaded

    def _save(self):
        """Write ``self.data`` to the tracking file under the instance lock."""
        with self._lock:
            self._write_json(self.tracking_file, self.data)

    def force_save(self):
        """Public entry point that writes the tracking data to disk now."""
        self._save()

    def _load_cache(self):
        """Populate ``self.cache`` from the cache file, if it exists."""
        if not self.cache_file.exists():
            return
        loaded = self._read_json(self.cache_file)
        self.cache = loaded if isinstance(loaded, dict) else {}

    def save_cache(self):
        """Write ``self.cache`` to the cache file under the instance lock."""
        with self._lock:
            self._write_json(self.cache_file, self.cache)

    # ------------------------------------------------------------------
    # Queries over tracked songs
    # ------------------------------------------------------------------

    def get_statistics(self):
        """Return aggregate counts over all tracked songs.

        Returns:
            dict with ``total_songs``, ``downloaded_songs`` (status DOWNLOADED
            or CONVERTED), ``failed_songs``, ``partial_songs``,
            ``total_playlists``, ``total_size_mb`` (sum of ``file_size``
            divided by 1024*1024, rounded to 2 decimals) and ``last_updated``
            (the greatest ``last_updated`` value found, or None).
        """
        songs = list(self.data["songs"].values())
        # .get(): entries without a status simply count towards no bucket.
        statuses = [s.get("status") for s in songs]
        done = (SongStatus.DOWNLOADED, SongStatus.CONVERTED)
        total_bytes = sum(s.get("file_size") or 0 for s in songs)
        timestamps = [s["last_updated"] for s in songs if s.get("last_updated")]
        return {
            "total_songs": len(songs),
            "downloaded_songs": sum(1 for st in statuses if st in done),
            "failed_songs": sum(1 for st in statuses if st == SongStatus.FAILED),
            "partial_songs": sum(1 for st in statuses if st == SongStatus.PARTIAL),
            "total_playlists": len(self.data["playlists"]),
            "total_size_mb": round(total_bytes / (1024 * 1024), 2),
            "last_updated": max(timestamps, default=None),
        }

    def get_playlist_songs(self, playlist_id):
        """Return every tracked song whose ``playlist_id`` equals *playlist_id*.

        Entries that carry no ``playlist_id`` (such as those written by
        ``mark_song_downloaded``) are compared as ``None`` instead of raising.
        """
        return [
            s
            for s in self.data["songs"].values()
            if s.get("playlist_id") == playlist_id
        ]

    def _songs_with_status(self, status, playlist_id=None):
        """Return songs in *status*, limited to *playlist_id* when it is truthy."""
        matches = [
            s for s in self.data["songs"].values() if s.get("status") == status
        ]
        if playlist_id:
            matches = [s for s in matches if s.get("playlist_id") == playlist_id]
        return matches

    def get_failed_songs(self, playlist_id=None):
        """Return songs with FAILED status, optionally for one playlist."""
        return self._songs_with_status(SongStatus.FAILED, playlist_id)

    def get_partial_downloads(self, playlist_id=None):
        """Return songs with PARTIAL status, optionally for one playlist."""
        return self._songs_with_status(SongStatus.PARTIAL, playlist_id)

    def cleanup_orphaned_files(self, downloads_dir):
        """Drop tracking entries whose recorded file no longer exists.

        Args:
            downloads_dir: Currently unused; each entry's stored ``file_path``
                is checked exactly as recorded.

        Returns:
            List of the song keys that were removed. The tracking file is
            saved afterwards even when nothing was removed.
        """
        orphaned = [
            song_id
            for song_id, song in list(self.data["songs"].items())
            if song.get("file_path") and not Path(song["file_path"]).exists()
        ]
        with self._lock:
            for song_id in orphaned:
                self.data["songs"].pop(song_id, None)
        self.force_save()
        return orphaned

    # ------------------------------------------------------------------
    # Channel cache management
    # ------------------------------------------------------------------

    def get_cache_info(self):
        """Return summary information about the channel cache.

        ``cache_duration_hours`` is always 24 and ``last_updated`` is always
        None: no expiry or timestamp is recorded by this class.
        """
        total_channels = len(self.cache)
        total_cached_videos = sum(len(v) for v in self.cache.values())
        cache_duration_hours = 24  # default
        last_updated = None
        return {
            "total_channels": total_channels,
            "total_cached_videos": total_cached_videos,
            "cache_duration_hours": cache_duration_hours,
            "last_updated": last_updated,
        }

    def clear_channel_cache(self, channel_id=None):
        """Remove one channel (or everything) from the cache and save it.

        Args:
            channel_id: Cache key to remove. ``None`` or ``"all"`` clears the
                whole cache; an unknown key is ignored.
        """
        if channel_id is None or channel_id == "all":
            self.cache = {}
        else:
            self.cache.pop(channel_id, None)
        self.save_cache()

    def set_cache_duration(self, hours):
        """Placeholder for cache duration logic; currently does nothing."""
        pass

    def export_playlist_report(self, playlist_id):
        """Return a JSON report of a playlist and its songs.

        Returns:
            A JSON string ``{"playlist": ..., "songs": [...]}``, or the plain
            message ``"Playlist '<id>' not found."`` when the playlist entry is
            missing or empty.
        """
        playlist = self.data["playlists"].get(playlist_id)
        if not playlist:
            return f"Playlist '{playlist_id}' not found."
        songs = self.get_playlist_songs(playlist_id)
        report = {"playlist": playlist, "songs": songs}
        return json.dumps(report, indent=2, ensure_ascii=False)

    # ------------------------------------------------------------------
    # Per-song status checks and updates
    # ------------------------------------------------------------------

    def _has_song_with_status(self, artist, title, channel_name, video_id, statuses):
        """Return True if a matching song is tracked with one of *statuses*.

        A song matches when any of the following holds:

        * its key is ``"<video_id>@<channel_name>"`` (only tried when both
          values are truthy),
        * its ``artist`` and ``title`` fields equal the arguments, or
        * its ``video_title`` contains both *artist* and *title* as
          substrings (skipped unless both arguments are strings).
        """
        songs = self.data["songs"]

        # Direct key lookup first: cheapest when the caller knows the video.
        if video_id and channel_name:
            entry = songs.get(f"{video_id}@{channel_name}")
            if entry is not None and entry.get("status") in statuses:
                return True

        # `None in "some title"` raises TypeError, so only substring-match
        # real strings.
        can_substring_match = isinstance(artist, str) and isinstance(title, str)

        # Fallback content search for callers without a video_id.
        for entry in songs.values():
            if entry.get("status") not in statuses:
                continue
            if entry.get("artist") == artist and entry.get("title") == title:
                return True
            video_title = entry.get("video_title") or ""
            if (
                can_substring_match
                and video_title
                and artist in video_title
                and title in video_title
            ):
                return True
        return False

    def is_song_downloaded(self, artist, title, channel_name=None, video_id=None):
        """
        Check if a song has already been downloaded by this system.
        Returns True if the song exists in tracking with DOWNLOADED or CONVERTED status.
        """
        return self._has_song_with_status(
            artist,
            title,
            channel_name,
            video_id,
            (SongStatus.DOWNLOADED, SongStatus.CONVERTED),
        )

    def is_file_exists(self, file_path):
        """
        Check if a file already exists on the filesystem.
        """
        return Path(file_path).exists()

    def is_song_failed(self, artist, title, channel_name=None, video_id=None):
        """
        Check if a song has previously failed to download.
        Returns True if the song exists in tracking with FAILED status.
        """
        return self._has_song_with_status(
            artist, title, channel_name, video_id, (SongStatus.FAILED,)
        )

    def mark_song_downloaded(
        self, artist, title, video_id, channel_name, file_path, file_size=None
    ):
        """
        Mark a song as downloaded in the tracking system.

        Replaces any existing entry stored under ``video_id@channel_name`` and
        saves the tracking file.
        """
        # Use the existing tracking structure: video_id@channel_name
        song_key = f"{video_id}@{channel_name}"
        entry = {
            "artist": artist,
            "title": title,
            "video_id": video_id,
            "channel_name": channel_name,
            "video_title": f"{artist} - {title}",
            "file_path": str(file_path),
            "file_size": file_size,
            "status": SongStatus.DOWNLOADED,
            "last_updated": datetime.now().isoformat(),
        }
        # Mutate under the lock so a concurrent _save() never serialises a
        # dict that is changing size; release it before saving.
        with self._lock:
            self.data["songs"][song_key] = entry
        self._save()

    def mark_song_failed(
        self, artist, title, video_id, channel_name, error_message=None
    ):
        """
        Mark a song as failed in the tracking system.

        Replaces any existing entry stored under ``video_id@channel_name`` and
        saves the tracking file.
        """
        # Use the existing tracking structure: video_id@channel_name
        song_key = f"{video_id}@{channel_name}"
        entry = {
            "artist": artist,
            "title": title,
            "video_id": video_id,
            "channel_name": channel_name,
            "video_title": f"{artist} - {title}",
            "status": SongStatus.FAILED,
            "error_message": error_message,
            "last_updated": datetime.now().isoformat(),
        }
        with self._lock:
            self.data["songs"][song_key] = entry
        self._save()

    # ------------------------------------------------------------------
    # Channel video listing
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_video_lines(lines):
        """Turn ``title|id|url`` lines into ``{"title", "id"}`` dicts.

        Splitting happens from the right so a ``|`` inside the title stays
        part of the title. Lines with fewer than two fields are skipped.
        """
        videos = []
        for line in lines:
            fields = line.rsplit("|", 2)
            if len(fields) < 2:
                continue
            videos.append({"title": fields[0].strip(), "id": fields[1].strip()})
        return videos

    def get_channel_video_list(
        self, channel_url, yt_dlp_path="downloader/yt-dlp.exe", force_refresh=False
    ):
        """
        Return a list of videos (dicts with 'title' and 'id') for the channel, using cache if available unless force_refresh is True.

        The cache is looked up under the channel ID, the URL and the channel
        name, in that order. On a miss (or forced refresh) the list is fetched
        with yt-dlp, stored in the cache and saved. If yt-dlp exits with a
        non-zero status an empty list is returned and nothing is cached;
        failures to launch the executable are not caught here.
        """
        from karaoke_downloader.youtube_utils import get_channel_info

        channel_name, channel_id = get_channel_info(channel_url)

        # Try multiple possible cache keys
        possible_keys = [
            channel_id,  # The extracted channel ID
            channel_url,  # The full URL
            channel_name,  # The extracted channel name
        ]
        cache_key = next(
            (key for key in possible_keys if key and key in self.cache), None
        )
        if not cache_key:
            cache_key = channel_id or channel_url  # Use as fallback for new entries
        print(f" 🔍 Trying cache keys: {possible_keys}")
        print(f" 🔍 Selected cache key: '{cache_key}'")

        cached = cache_key in self.cache
        if cached and not force_refresh:
            print(
                f" 📋 Using cached video list ({len(self.cache[cache_key])} videos)"
            )
            return self.cache[cache_key]
        if not cached:
            # Only report a miss when there really is none; a forced refresh
            # of an existing entry is not a miss.
            print(" ❌ Cache miss for all keys")

        # Fetch with yt-dlp
        print(" 🌐 Fetching video list from YouTube (this may take a while)...")
        import subprocess

        cmd = [
            yt_dlp_path,
            "--flat-playlist",
            "--print",
            "%(title)s|%(id)s|%(url)s",
            channel_url,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            print(f"❌ yt-dlp failed to fetch playlist for cache: {e}")
            return []

        videos = self._parse_video_lines(result.stdout.strip().splitlines())
        self.cache[cache_key] = videos
        self.save_cache()
        return videos