KaraokeVideoDownloader/karaoke_downloader/tracking_manager.py

136 lines
5.1 KiB
Python

import json
from pathlib import Path
from enum import Enum
from datetime import datetime
import threading
class SongStatus(str, Enum):
NOT_DOWNLOADED = "NOT_DOWNLOADED"
DOWNLOADING = "DOWNLOADING"
DOWNLOADED = "DOWNLOADED"
PARTIAL = "PARTIAL"
FAILED = "FAILED"
CONVERTING = "CONVERTING"
CONVERTED = "CONVERTED"
class FormatType(str, Enum):
MP4 = "MP4"
MP3 = "MP3"
OTHER = "OTHER"
class TrackingManager:
def __init__(self, tracking_file="data/karaoke_tracking.json", cache_file="data/channel_cache.json"):
self.tracking_file = Path(tracking_file)
self.cache_file = Path(cache_file)
self.data = {
"playlists": {},
"songs": {}
}
self.cache = {}
self._lock = threading.Lock()
self._load()
self._load_cache()
def _load(self):
if self.tracking_file.exists():
try:
with open(self.tracking_file, 'r', encoding='utf-8') as f:
self.data = json.load(f)
except Exception:
self.data = {"playlists": {}, "songs": {}}
def _save(self):
with self._lock:
with open(self.tracking_file, 'w', encoding='utf-8') as f:
json.dump(self.data, f, indent=2, ensure_ascii=False)
def force_save(self):
self._save()
def _load_cache(self):
if self.cache_file.exists():
try:
with open(self.cache_file, 'r', encoding='utf-8') as f:
self.cache = json.load(f)
except Exception:
self.cache = {}
def save_cache(self):
with open(self.cache_file, 'w', encoding='utf-8') as f:
json.dump(self.cache, f, indent=2, ensure_ascii=False)
def get_statistics(self):
total_songs = len(self.data['songs'])
downloaded_songs = sum(1 for s in self.data['songs'].values() if s['status'] in [SongStatus.DOWNLOADED, SongStatus.CONVERTED])
failed_songs = sum(1 for s in self.data['songs'].values() if s['status'] == SongStatus.FAILED)
partial_songs = sum(1 for s in self.data['songs'].values() if s['status'] == SongStatus.PARTIAL)
total_playlists = len(self.data['playlists'])
total_size_mb = sum(s.get('file_size', 0) for s in self.data['songs'].values() if s.get('file_size')) / (1024*1024)
last_updated = max((s.get('last_updated') for s in self.data['songs'].values() if s.get('last_updated')), default=None)
return {
"total_songs": total_songs,
"downloaded_songs": downloaded_songs,
"failed_songs": failed_songs,
"partial_songs": partial_songs,
"total_playlists": total_playlists,
"total_size_mb": round(total_size_mb, 2),
"last_updated": last_updated
}
def get_playlist_songs(self, playlist_id):
return [s for s in self.data['songs'].values() if s['playlist_id'] == playlist_id]
def get_failed_songs(self, playlist_id=None):
if playlist_id:
return [s for s in self.data['songs'].values() if s['playlist_id'] == playlist_id and s['status'] == SongStatus.FAILED]
return [s for s in self.data['songs'].values() if s['status'] == SongStatus.FAILED]
def get_partial_downloads(self, playlist_id=None):
if playlist_id:
return [s for s in self.data['songs'].values() if s['playlist_id'] == playlist_id and s['status'] == SongStatus.PARTIAL]
return [s for s in self.data['songs'].values() if s['status'] == SongStatus.PARTIAL]
def cleanup_orphaned_files(self, downloads_dir):
# Remove tracking entries for files that no longer exist
orphaned = []
for song_id, song in list(self.data['songs'].items()):
file_path = song.get('file_path')
if file_path and not Path(file_path).exists():
orphaned.append(song_id)
del self.data['songs'][song_id]
self.force_save()
return orphaned
def get_cache_info(self):
total_channels = len(self.cache)
total_cached_videos = sum(len(v) for v in self.cache.values())
cache_duration_hours = 24 # default
last_updated = None
return {
"total_channels": total_channels,
"total_cached_videos": total_cached_videos,
"cache_duration_hours": cache_duration_hours,
"last_updated": last_updated
}
def clear_channel_cache(self, channel_id=None):
if channel_id is None or channel_id == 'all':
self.cache = {}
else:
self.cache.pop(channel_id, None)
self.save_cache()
def set_cache_duration(self, hours):
# Placeholder for cache duration logic
pass
def export_playlist_report(self, playlist_id):
playlist = self.data['playlists'].get(playlist_id)
if not playlist:
return f"Playlist '{playlist_id}' not found."
songs = self.get_playlist_songs(playlist_id)
report = {
"playlist": playlist,
"songs": songs
}
return json.dumps(report, indent=2, ensure_ascii=False)