292 lines
12 KiB
Python
292 lines
12 KiB
Python
import json
|
|
from pathlib import Path
|
|
from enum import Enum
|
|
from datetime import datetime
|
|
import threading
|
|
|
|
class SongStatus(str, Enum):
|
|
NOT_DOWNLOADED = "NOT_DOWNLOADED"
|
|
DOWNLOADING = "DOWNLOADING"
|
|
DOWNLOADED = "DOWNLOADED"
|
|
PARTIAL = "PARTIAL"
|
|
FAILED = "FAILED"
|
|
CONVERTING = "CONVERTING"
|
|
CONVERTED = "CONVERTED"
|
|
|
|
class FormatType(str, Enum):
|
|
MP4 = "MP4"
|
|
MP3 = "MP3"
|
|
OTHER = "OTHER"
|
|
|
|
class TrackingManager:
|
|
def __init__(self, tracking_file="data/karaoke_tracking.json", cache_file="data/channel_cache.json"):
|
|
self.tracking_file = Path(tracking_file)
|
|
self.cache_file = Path(cache_file)
|
|
self.data = {
|
|
"playlists": {},
|
|
"songs": {}
|
|
}
|
|
self.cache = {}
|
|
self._lock = threading.Lock()
|
|
self._load()
|
|
self._load_cache()
|
|
|
|
def _load(self):
|
|
if self.tracking_file.exists():
|
|
try:
|
|
with open(self.tracking_file, 'r', encoding='utf-8') as f:
|
|
self.data = json.load(f)
|
|
except Exception:
|
|
self.data = {"playlists": {}, "songs": {}}
|
|
|
|
def _save(self):
|
|
with self._lock:
|
|
with open(self.tracking_file, 'w', encoding='utf-8') as f:
|
|
json.dump(self.data, f, indent=2, ensure_ascii=False)
|
|
|
|
def force_save(self):
|
|
self._save()
|
|
|
|
def _load_cache(self):
|
|
if self.cache_file.exists():
|
|
try:
|
|
with open(self.cache_file, 'r', encoding='utf-8') as f:
|
|
self.cache = json.load(f)
|
|
except Exception:
|
|
self.cache = {}
|
|
|
|
def save_cache(self):
|
|
with open(self.cache_file, 'w', encoding='utf-8') as f:
|
|
json.dump(self.cache, f, indent=2, ensure_ascii=False)
|
|
|
|
def get_statistics(self):
|
|
total_songs = len(self.data['songs'])
|
|
downloaded_songs = sum(1 for s in self.data['songs'].values() if s['status'] in [SongStatus.DOWNLOADED, SongStatus.CONVERTED])
|
|
failed_songs = sum(1 for s in self.data['songs'].values() if s['status'] == SongStatus.FAILED)
|
|
partial_songs = sum(1 for s in self.data['songs'].values() if s['status'] == SongStatus.PARTIAL)
|
|
total_playlists = len(self.data['playlists'])
|
|
total_size_mb = sum(s.get('file_size', 0) for s in self.data['songs'].values() if s.get('file_size')) / (1024*1024)
|
|
last_updated = max((s.get('last_updated') for s in self.data['songs'].values() if s.get('last_updated')), default=None)
|
|
return {
|
|
"total_songs": total_songs,
|
|
"downloaded_songs": downloaded_songs,
|
|
"failed_songs": failed_songs,
|
|
"partial_songs": partial_songs,
|
|
"total_playlists": total_playlists,
|
|
"total_size_mb": round(total_size_mb, 2),
|
|
"last_updated": last_updated
|
|
}
|
|
|
|
def get_playlist_songs(self, playlist_id):
|
|
return [s for s in self.data['songs'].values() if s['playlist_id'] == playlist_id]
|
|
|
|
def get_failed_songs(self, playlist_id=None):
|
|
if playlist_id:
|
|
return [s for s in self.data['songs'].values() if s['playlist_id'] == playlist_id and s['status'] == SongStatus.FAILED]
|
|
return [s for s in self.data['songs'].values() if s['status'] == SongStatus.FAILED]
|
|
|
|
def get_partial_downloads(self, playlist_id=None):
|
|
if playlist_id:
|
|
return [s for s in self.data['songs'].values() if s['playlist_id'] == playlist_id and s['status'] == SongStatus.PARTIAL]
|
|
return [s for s in self.data['songs'].values() if s['status'] == SongStatus.PARTIAL]
|
|
|
|
def cleanup_orphaned_files(self, downloads_dir):
|
|
# Remove tracking entries for files that no longer exist
|
|
orphaned = []
|
|
for song_id, song in list(self.data['songs'].items()):
|
|
file_path = song.get('file_path')
|
|
if file_path and not Path(file_path).exists():
|
|
orphaned.append(song_id)
|
|
del self.data['songs'][song_id]
|
|
self.force_save()
|
|
return orphaned
|
|
|
|
def get_cache_info(self):
|
|
total_channels = len(self.cache)
|
|
total_cached_videos = sum(len(v) for v in self.cache.values())
|
|
cache_duration_hours = 24 # default
|
|
last_updated = None
|
|
return {
|
|
"total_channels": total_channels,
|
|
"total_cached_videos": total_cached_videos,
|
|
"cache_duration_hours": cache_duration_hours,
|
|
"last_updated": last_updated
|
|
}
|
|
|
|
def clear_channel_cache(self, channel_id=None):
|
|
if channel_id is None or channel_id == 'all':
|
|
self.cache = {}
|
|
else:
|
|
self.cache.pop(channel_id, None)
|
|
self.save_cache()
|
|
|
|
def set_cache_duration(self, hours):
|
|
# Placeholder for cache duration logic
|
|
pass
|
|
|
|
def export_playlist_report(self, playlist_id):
|
|
playlist = self.data['playlists'].get(playlist_id)
|
|
if not playlist:
|
|
return f"Playlist '{playlist_id}' not found."
|
|
songs = self.get_playlist_songs(playlist_id)
|
|
report = {
|
|
"playlist": playlist,
|
|
"songs": songs
|
|
}
|
|
return json.dumps(report, indent=2, ensure_ascii=False)
|
|
|
|
def is_song_downloaded(self, artist, title, channel_name=None, video_id=None):
|
|
"""
|
|
Check if a song has already been downloaded by this system.
|
|
Returns True if the song exists in tracking with DOWNLOADED or CONVERTED status.
|
|
"""
|
|
# If we have video_id and channel_name, try direct key lookup first (most efficient)
|
|
if video_id and channel_name:
|
|
song_key = f"{video_id}@{channel_name}"
|
|
if song_key in self.data['songs']:
|
|
song_data = self.data['songs'][song_key]
|
|
if song_data.get('status') in [SongStatus.DOWNLOADED, SongStatus.CONVERTED]:
|
|
return True
|
|
|
|
# Fallback to content search (for cases where we don't have video_id)
|
|
for song_id, song_data in self.data['songs'].items():
|
|
# Check if this song matches the artist and title
|
|
if song_data.get('artist') == artist and song_data.get('title') == title:
|
|
# Check if it's marked as downloaded
|
|
if song_data.get('status') in [SongStatus.DOWNLOADED, SongStatus.CONVERTED]:
|
|
return True
|
|
# Also check the video title field which might contain the song info
|
|
video_title = song_data.get('video_title', '')
|
|
if video_title and artist in video_title and title in video_title:
|
|
if song_data.get('status') in [SongStatus.DOWNLOADED, SongStatus.CONVERTED]:
|
|
return True
|
|
return False
|
|
|
|
def is_file_exists(self, file_path):
|
|
"""
|
|
Check if a file already exists on the filesystem.
|
|
"""
|
|
return Path(file_path).exists()
|
|
|
|
def is_song_failed(self, artist, title, channel_name=None, video_id=None):
|
|
"""
|
|
Check if a song has previously failed to download.
|
|
Returns True if the song exists in tracking with FAILED status.
|
|
"""
|
|
# If we have video_id and channel_name, try direct key lookup first (most efficient)
|
|
if video_id and channel_name:
|
|
song_key = f"{video_id}@{channel_name}"
|
|
if song_key in self.data['songs']:
|
|
song_data = self.data['songs'][song_key]
|
|
if song_data.get('status') == SongStatus.FAILED:
|
|
return True
|
|
|
|
# Fallback to content search (for cases where we don't have video_id)
|
|
for song_id, song_data in self.data['songs'].items():
|
|
# Check if this song matches the artist and title
|
|
if song_data.get('artist') == artist and song_data.get('title') == title:
|
|
# Check if it's marked as failed
|
|
if song_data.get('status') == SongStatus.FAILED:
|
|
return True
|
|
# Also check the video title field which might contain the song info
|
|
video_title = song_data.get('video_title', '')
|
|
if video_title and artist in video_title and title in video_title:
|
|
if song_data.get('status') == SongStatus.FAILED:
|
|
return True
|
|
return False
|
|
|
|
def mark_song_downloaded(self, artist, title, video_id, channel_name, file_path, file_size=None):
|
|
"""
|
|
Mark a song as downloaded in the tracking system.
|
|
"""
|
|
# Use the existing tracking structure: video_id@channel_name
|
|
song_key = f"{video_id}@{channel_name}"
|
|
|
|
self.data['songs'][song_key] = {
|
|
'artist': artist,
|
|
'title': title,
|
|
'video_id': video_id,
|
|
'channel_name': channel_name,
|
|
'video_title': f"{artist} - {title}",
|
|
'file_path': str(file_path),
|
|
'file_size': file_size,
|
|
'status': SongStatus.DOWNLOADED,
|
|
'last_updated': datetime.now().isoformat()
|
|
}
|
|
self._save()
|
|
|
|
def mark_song_failed(self, artist, title, video_id, channel_name, error_message=None):
|
|
"""
|
|
Mark a song as failed in the tracking system.
|
|
"""
|
|
# Use the existing tracking structure: video_id@channel_name
|
|
song_key = f"{video_id}@{channel_name}"
|
|
|
|
self.data['songs'][song_key] = {
|
|
'artist': artist,
|
|
'title': title,
|
|
'video_id': video_id,
|
|
'channel_name': channel_name,
|
|
'video_title': f"{artist} - {title}",
|
|
'status': SongStatus.FAILED,
|
|
'error_message': error_message,
|
|
'last_updated': datetime.now().isoformat()
|
|
}
|
|
self._save()
|
|
|
|
def get_channel_video_list(self, channel_url, yt_dlp_path="downloader/yt-dlp.exe", force_refresh=False):
|
|
"""
|
|
Return a list of videos (dicts with 'title' and 'id') for the channel, using cache if available unless force_refresh is True.
|
|
"""
|
|
channel_name, channel_id = None, None
|
|
from karaoke_downloader.youtube_utils import get_channel_info
|
|
channel_name, channel_id = get_channel_info(channel_url)
|
|
|
|
# Try multiple possible cache keys
|
|
possible_keys = [
|
|
channel_id, # The extracted channel ID
|
|
channel_url, # The full URL
|
|
channel_name # The extracted channel name
|
|
]
|
|
|
|
cache_key = None
|
|
for key in possible_keys:
|
|
if key and key in self.cache:
|
|
cache_key = key
|
|
break
|
|
|
|
if not cache_key:
|
|
cache_key = channel_id or channel_url # Use as fallback for new entries
|
|
|
|
print(f" 🔍 Trying cache keys: {possible_keys}")
|
|
print(f" 🔍 Selected cache key: '{cache_key}'")
|
|
|
|
if not force_refresh and cache_key in self.cache:
|
|
print(f" 📋 Using cached video list ({len(self.cache[cache_key])} videos)")
|
|
return self.cache[cache_key]
|
|
else:
|
|
print(f" ❌ Cache miss for all keys")
|
|
# Fetch with yt-dlp
|
|
print(f" 🌐 Fetching video list from YouTube (this may take a while)...")
|
|
import subprocess
|
|
cmd = [
|
|
yt_dlp_path,
|
|
'--flat-playlist',
|
|
'--print', '%(title)s|%(id)s|%(url)s',
|
|
channel_url
|
|
]
|
|
try:
|
|
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
|
lines = result.stdout.strip().splitlines()
|
|
videos = []
|
|
for line in lines:
|
|
parts = line.split('|')
|
|
if len(parts) >= 2:
|
|
title, video_id = parts[0].strip(), parts[1].strip()
|
|
videos.append({'title': title, 'id': video_id})
|
|
self.cache[cache_key] = videos
|
|
self.save_cache()
|
|
return videos
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"❌ yt-dlp failed to fetch playlist for cache: {e}")
|
|
return [] |