#!/usr/bin/env python3
"""
Tracking Manager for Karaoke Playlist Downloader

Handles song tracking, multiple formats, partial downloads, and playlist synchronization.
"""
import hashlib
import json
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Set


class SongStatus:
    """String constants for the lifecycle states of a tracked song.

    Plain strings (not an Enum) so values serialize directly to JSON.
    """

    NOT_DOWNLOADED = "not_downloaded"  # never attempted
    DOWNLOADING = "downloading"        # transfer currently in progress
    DOWNLOADED = "downloaded"          # media file present on disk
    PARTIAL = "partial"                # incomplete transfer left behind
    FAILED = "failed"                  # last attempt errored out
    CONVERTING = "converting"          # post-download conversion running
    CONVERTED = "converted"            # conversion finished successfully


class FormatType:
    """String constants naming the media formats the downloader handles.

    Plain strings (not an Enum) so values serialize directly to JSON.
    """

    MP4 = "mp4"
    MP3 = "mp3"
    M4A = "m4a"
    WEBM = "webm"


class TrackingManager:
    """Persists per-song download state and a channel-listing cache as JSON files."""

    def __init__(self, tracking_file: str = "karaoke_tracking.json", cache_file: str = "channel_cache.json"):
        """Load (or initialize) the tracking store and the channel cache.

        Args:
            tracking_file: path of the JSON file holding song/playlist state.
            cache_file: path of the JSON file holding cached channel listings.
        """
        self.tracking_file = Path(tracking_file)
        self.cache_file = Path(cache_file)

        # Must be set before _load_cache_data(), which reads it for defaults.
        self.cache_duration_hours = 24  # cache channel listings for 24h by default

        self.data = self._load_tracking_data()
        self.cache = self._load_cache_data()
        self.logger = logging.getLogger(__name__)

        # Batch-save bookkeeping: most writes are deferred until force_save().
        self._batch_save_pending = False
        self._save_timer = None
def _load_tracking_data(self) -> Dict:
|
|
"""Load tracking data from JSON file."""
|
|
if self.tracking_file.exists():
|
|
try:
|
|
with open(self.tracking_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
# Ensure all required top-level keys exist
|
|
data.setdefault('version', '1.0')
|
|
data.setdefault('last_updated', datetime.now().isoformat())
|
|
data.setdefault('songs', {})
|
|
data.setdefault('playlists', {})
|
|
data.setdefault('formats', {})
|
|
data.setdefault('statistics', {})
|
|
return data
|
|
except (json.JSONDecodeError, FileNotFoundError) as e:
|
|
print(f"Warning: Could not load tracking file: {e}")
|
|
# If file is corrupted, backup and create new one
|
|
if self.tracking_file.exists():
|
|
backup_file = self.tracking_file.with_suffix('.json.backup')
|
|
try:
|
|
self.tracking_file.rename(backup_file)
|
|
print(f"Backed up corrupted file to: {backup_file}")
|
|
except Exception as backup_error:
|
|
print(f"Could not backup corrupted file: {backup_error}")
|
|
|
|
# Return default structure
|
|
return {
|
|
'version': '1.0',
|
|
'last_updated': datetime.now().isoformat(),
|
|
'songs': {},
|
|
'playlists': {},
|
|
'formats': {},
|
|
'statistics': {}
|
|
}
|
|
|
|
def _save_tracking_data(self, force_save: bool = False):
|
|
"""Save tracking data to JSON file with batch saving support."""
|
|
if not force_save:
|
|
# Schedule a delayed save instead of immediate save
|
|
self._batch_save_pending = True
|
|
return
|
|
|
|
self.data['last_updated'] = datetime.now().isoformat()
|
|
try:
|
|
with open(self.tracking_file, 'w', encoding='utf-8') as f:
|
|
json.dump(self.data, f, indent=2, ensure_ascii=False)
|
|
self._batch_save_pending = False
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to save tracking data: {e}")
|
|
|
|
def force_save(self):
|
|
"""Force save the tracking data immediately."""
|
|
self._save_tracking_data(force_save=True)
|
|
|
|
def _load_cache_data(self) -> Dict:
|
|
"""Load channel cache data from JSON file."""
|
|
if self.cache_file.exists():
|
|
try:
|
|
with open(self.cache_file, 'r', encoding='utf-8') as f:
|
|
cache = json.load(f)
|
|
# Ensure required structure
|
|
cache.setdefault('channels', {})
|
|
cache.setdefault('settings', {
|
|
'cache_duration_hours': self.cache_duration_hours,
|
|
'last_updated': datetime.now().isoformat()
|
|
})
|
|
return cache
|
|
except (json.JSONDecodeError, FileNotFoundError) as e:
|
|
print(f"Warning: Could not load cache file: {e}")
|
|
|
|
# Return default cache structure
|
|
return {
|
|
'channels': {},
|
|
'settings': {
|
|
'cache_duration_hours': self.cache_duration_hours,
|
|
'last_updated': datetime.now().isoformat()
|
|
}
|
|
}
|
|
|
|
def _save_cache_data(self):
|
|
"""Save channel cache data to JSON file."""
|
|
try:
|
|
self.cache['settings']['last_updated'] = datetime.now().isoformat()
|
|
with open(self.cache_file, 'w', encoding='utf-8') as f:
|
|
json.dump(self.cache, f, indent=2, ensure_ascii=False)
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to save cache data: {e}")
|
|
|
|
def get_cached_channel_videos(self, channel_id: str) -> Optional[List[Dict]]:
|
|
"""Get cached videos for a channel if cache is still valid."""
|
|
if channel_id not in self.cache['channels']:
|
|
return None
|
|
|
|
channel_cache = self.cache['channels'][channel_id]
|
|
cached_time = datetime.fromisoformat(channel_cache['cached_at'])
|
|
cache_duration = timedelta(hours=self.cache_duration_hours)
|
|
|
|
# Check if cache is still valid
|
|
if datetime.now() - cached_time < cache_duration:
|
|
self.logger.info(f"Using cached channel data for {channel_id} (cached {cached_time})")
|
|
return channel_cache['videos']
|
|
|
|
self.logger.info(f"Channel cache expired for {channel_id}, will refresh")
|
|
return None
|
|
|
|
def cache_channel_videos(self, channel_id: str, videos: List[Dict]):
|
|
"""Cache videos for a channel."""
|
|
self.cache['channels'][channel_id] = {
|
|
'videos': videos,
|
|
'cached_at': datetime.now().isoformat(),
|
|
'video_count': len(videos)
|
|
}
|
|
self._save_cache_data()
|
|
self.logger.info(f"Cached {len(videos)} videos for channel {channel_id}")
|
|
|
|
def clear_channel_cache(self, channel_id: Optional[str] = None):
|
|
"""Clear cache for a specific channel or all channels."""
|
|
if channel_id:
|
|
if channel_id in self.cache['channels']:
|
|
del self.cache['channels'][channel_id]
|
|
self._save_cache_data()
|
|
self.logger.info(f"Cleared cache for channel {channel_id}")
|
|
else:
|
|
self.cache['channels'] = {}
|
|
self._save_cache_data()
|
|
self.logger.info("Cleared all channel caches")
|
|
|
|
def set_cache_duration(self, hours: int):
|
|
"""Set cache duration in hours."""
|
|
self.cache_duration_hours = hours
|
|
self.cache['settings']['cache_duration_hours'] = hours
|
|
self._save_cache_data()
|
|
self.logger.info(f"Cache duration set to {hours} hours")
|
|
|
|
def get_cache_info(self) -> Dict:
|
|
"""Get cache information."""
|
|
total_channels = len(self.cache['channels'])
|
|
total_cached_videos = sum(channel['video_count'] for channel in self.cache['channels'].values())
|
|
|
|
return {
|
|
'total_channels': total_channels,
|
|
'total_cached_videos': total_cached_videos,
|
|
'cache_duration_hours': self.cache_duration_hours,
|
|
'last_updated': self.cache['settings']['last_updated']
|
|
}
|
|
|
|
def _generate_song_id(self, video_id: str, playlist_id: str) -> str:
|
|
"""Generate a unique song ID."""
|
|
return f"{video_id}_{playlist_id}"
|
|
|
|
def _generate_file_hash(self, file_path: Path) -> str:
|
|
"""Generate MD5 hash of file for integrity checking."""
|
|
try:
|
|
if file_path.exists():
|
|
hash_md5 = hashlib.md5()
|
|
with open(file_path, "rb") as f:
|
|
for chunk in iter(lambda: f.read(4096), b""):
|
|
hash_md5.update(chunk)
|
|
return hash_md5.hexdigest()
|
|
except Exception as e:
|
|
self.logger.warning(f"Could not generate hash for {file_path}: {e}")
|
|
return ""
|
|
|
|
def add_playlist(self, playlist_id: str, playlist_name: str, playlist_url: str) -> Dict:
|
|
"""Add or update a playlist in tracking."""
|
|
playlist_data = {
|
|
'name': playlist_name,
|
|
'url': playlist_url,
|
|
'added_date': datetime.now().isoformat(),
|
|
'last_synced': None,
|
|
'total_songs': 0,
|
|
'downloaded_songs': 0,
|
|
'failed_songs': 0,
|
|
'song_ids': []
|
|
}
|
|
|
|
self.data['playlists'][playlist_id] = playlist_data
|
|
self._save_tracking_data()
|
|
return playlist_data
|
|
|
|
def add_song(self, video_id: str, playlist_id: str, title: str, duration: Optional[int] = None) -> Dict:
|
|
"""Add a song to tracking."""
|
|
song_id = self._generate_song_id(video_id, playlist_id)
|
|
|
|
song_data = {
|
|
'video_id': video_id,
|
|
'playlist_id': playlist_id,
|
|
'title': title,
|
|
'duration': duration,
|
|
'added_date': datetime.now().isoformat(),
|
|
'last_updated': datetime.now().isoformat(),
|
|
'status': SongStatus.NOT_DOWNLOADED,
|
|
'formats': {},
|
|
'file_paths': {},
|
|
'file_hashes': {},
|
|
'file_sizes': {},
|
|
'download_attempts': 0,
|
|
'last_error': None,
|
|
'metadata': {}
|
|
}
|
|
|
|
self.data['songs'][song_id] = song_data
|
|
|
|
# Add to playlist's song list
|
|
if playlist_id not in self.data['playlists']:
|
|
self.add_playlist(playlist_id, f"playlist_{playlist_id}", "")
|
|
|
|
if song_id not in self.data['playlists'][playlist_id]['song_ids']:
|
|
self.data['playlists'][playlist_id]['song_ids'].append(song_id)
|
|
|
|
self._update_playlist_statistics(playlist_id)
|
|
self._save_tracking_data()
|
|
return song_data
|
|
|
|
def update_song_status(self, video_id: str, playlist_id: str, status: str,
|
|
format_type: Optional[str] = None, file_path: Optional[Path] = None,
|
|
error_message: Optional[str] = None) -> bool:
|
|
"""Update song status and optionally add format information."""
|
|
song_id = self._generate_song_id(video_id, playlist_id)
|
|
|
|
if song_id not in self.data['songs']:
|
|
self.logger.warning(f"Song {song_id} not found in tracking")
|
|
return False
|
|
|
|
song = self.data['songs'][song_id]
|
|
song['status'] = status
|
|
song['last_updated'] = datetime.now().isoformat()
|
|
|
|
if error_message:
|
|
song['last_error'] = error_message
|
|
|
|
if status == SongStatus.DOWNLOADING:
|
|
song['download_attempts'] += 1
|
|
|
|
# Handle format-specific information
|
|
if format_type and file_path:
|
|
if format_type not in song['formats']:
|
|
song['formats'][format_type] = {
|
|
'status': status,
|
|
'file_path': str(file_path),
|
|
'file_size': file_path.stat().st_size if file_path.exists() else 0,
|
|
'file_hash': self._generate_file_hash(file_path),
|
|
'added_date': datetime.now().isoformat()
|
|
}
|
|
else:
|
|
song['formats'][format_type].update({
|
|
'status': status,
|
|
'file_path': str(file_path),
|
|
'file_size': file_path.stat().st_size if file_path.exists() else 0,
|
|
'file_hash': self._generate_file_hash(file_path)
|
|
})
|
|
|
|
song['file_paths'][format_type] = str(file_path)
|
|
song['file_hashes'][format_type] = song['formats'][format_type]['file_hash']
|
|
song['file_sizes'][format_type] = song['formats'][format_type]['file_size']
|
|
|
|
self._update_playlist_statistics(playlist_id)
|
|
self._save_tracking_data()
|
|
return True
|
|
|
|
def get_song_status(self, video_id: str, playlist_id: str) -> Optional[Dict]:
|
|
"""Get current status of a song."""
|
|
song_id = self._generate_song_id(video_id, playlist_id)
|
|
return self.data['songs'].get(song_id)
|
|
|
|
def is_song_downloaded(self, video_id: str, playlist_id: str, format_type: Optional[str] = None) -> bool:
|
|
"""Check if a song is downloaded in the specified format."""
|
|
song = self.get_song_status(video_id, playlist_id)
|
|
if not song:
|
|
return False
|
|
|
|
if format_type:
|
|
return (format_type in song['formats'] and
|
|
song['formats'][format_type]['status'] in [SongStatus.DOWNLOADED, SongStatus.CONVERTED])
|
|
|
|
# Check if any format is downloaded
|
|
return any(fmt['status'] in [SongStatus.DOWNLOADED, SongStatus.CONVERTED]
|
|
for fmt in song['formats'].values())
|
|
|
|
def get_playlist_songs(self, playlist_id: str) -> List[Dict]:
|
|
"""Get all songs for a playlist."""
|
|
if playlist_id not in self.data['playlists']:
|
|
return []
|
|
|
|
song_ids = self.data['playlists'][playlist_id]['song_ids']
|
|
return [self.data['songs'].get(song_id) for song_id in song_ids
|
|
if song_id in self.data['songs']]
|
|
|
|
def get_downloaded_songs(self, playlist_id: str, format_type: Optional[str] = None) -> List[Dict]:
|
|
"""Get all downloaded songs for a playlist."""
|
|
songs = self.get_playlist_songs(playlist_id)
|
|
if format_type:
|
|
return [song for song in songs if self.is_song_downloaded(song['video_id'], playlist_id, format_type)]
|
|
return [song for song in songs if self.is_song_downloaded(song['video_id'], playlist_id)]
|
|
|
|
def get_failed_songs(self, playlist_id: str) -> List[Dict]:
|
|
"""Get all failed songs for a playlist."""
|
|
songs = self.get_playlist_songs(playlist_id)
|
|
return [song for song in songs if song['status'] == SongStatus.FAILED]
|
|
|
|
def get_partial_downloads(self, playlist_id: str) -> List[Dict]:
|
|
"""Get songs with partial downloads."""
|
|
songs = self.get_playlist_songs(playlist_id)
|
|
return [song for song in songs if song['status'] == SongStatus.PARTIAL]
|
|
|
|
def sync_playlist(self, playlist_id: str, current_songs: List[Dict]) -> Dict:
|
|
"""Sync playlist with current songs from YouTube."""
|
|
if playlist_id not in self.data['playlists']:
|
|
return {'added': len(current_songs), 'removed': 0, 'total': len(current_songs)}
|
|
|
|
existing_song_ids = set(self.data['playlists'][playlist_id]['song_ids'])
|
|
current_song_ids = set()
|
|
|
|
added_songs = 0
|
|
for song_info in current_songs:
|
|
song_id = self._generate_song_id(song_info['video_id'], playlist_id)
|
|
current_song_ids.add(song_id)
|
|
|
|
if song_id not in existing_song_ids:
|
|
self.add_song(
|
|
song_info['video_id'],
|
|
playlist_id,
|
|
song_info['title'],
|
|
song_info.get('duration')
|
|
)
|
|
added_songs += 1
|
|
|
|
# Remove songs that no longer exist in playlist
|
|
removed_songs = len(existing_song_ids - current_song_ids)
|
|
for song_id in existing_song_ids - current_song_ids:
|
|
if song_id in self.data['songs']:
|
|
del self.data['songs'][song_id]
|
|
|
|
# Update playlist sync time
|
|
self.data['playlists'][playlist_id]['last_synced'] = datetime.now().isoformat()
|
|
self._update_playlist_statistics(playlist_id)
|
|
|
|
# Force save after batch operations
|
|
self.force_save()
|
|
|
|
return {
|
|
'added': added_songs,
|
|
'removed': removed_songs,
|
|
'total': len(current_songs)
|
|
}
|
|
|
|
def _update_playlist_statistics(self, playlist_id: str):
|
|
"""Update playlist statistics."""
|
|
if playlist_id not in self.data['playlists']:
|
|
return
|
|
|
|
songs = self.get_playlist_songs(playlist_id)
|
|
playlist = self.data['playlists'][playlist_id]
|
|
|
|
playlist['total_songs'] = len(songs)
|
|
playlist['downloaded_songs'] = len([s for s in songs if self.is_song_downloaded(s['video_id'], playlist_id)])
|
|
playlist['failed_songs'] = len([s for s in songs if s['status'] == SongStatus.FAILED])
|
|
|
|
def get_statistics(self) -> Dict:
|
|
"""Get overall statistics."""
|
|
total_songs = len(self.data['songs'])
|
|
total_playlists = len(self.data['playlists'])
|
|
|
|
downloaded_songs = sum(1 for song in self.data['songs'].values()
|
|
if self.is_song_downloaded(song['video_id'], song['playlist_id']))
|
|
|
|
failed_songs = sum(1 for song in self.data['songs'].values()
|
|
if song['status'] == SongStatus.FAILED)
|
|
|
|
partial_songs = sum(1 for song in self.data['songs'].values()
|
|
if song['status'] == SongStatus.PARTIAL)
|
|
|
|
total_size = 0
|
|
for song in self.data['songs'].values():
|
|
for size in song['file_sizes'].values():
|
|
total_size += size
|
|
|
|
return {
|
|
'total_songs': total_songs,
|
|
'total_playlists': total_playlists,
|
|
'downloaded_songs': downloaded_songs,
|
|
'failed_songs': failed_songs,
|
|
'partial_songs': partial_songs,
|
|
'total_size_bytes': total_size,
|
|
'total_size_mb': round(total_size / (1024 * 1024), 2),
|
|
'last_updated': self.data['last_updated']
|
|
}
|
|
|
|
def cleanup_orphaned_files(self, downloads_dir: Path) -> List[str]:
|
|
"""Remove tracking entries for files that no longer exist."""
|
|
orphaned_songs = []
|
|
|
|
for song_id, song in self.data['songs'].items():
|
|
for format_type, file_path in song['file_paths'].items():
|
|
if not Path(file_path).exists():
|
|
# Remove the format entry
|
|
if format_type in song['formats']:
|
|
del song['formats'][format_type]
|
|
if format_type in song['file_paths']:
|
|
del song['file_paths'][format_type]
|
|
if format_type in song['file_hashes']:
|
|
del song['file_hashes'][format_type]
|
|
if format_type in song['file_sizes']:
|
|
del song['file_sizes'][format_type]
|
|
|
|
# Update status if no formats remain
|
|
if not song['formats']:
|
|
song['status'] = SongStatus.NOT_DOWNLOADED
|
|
|
|
orphaned_songs.append(song_id)
|
|
|
|
self._save_tracking_data()
|
|
return orphaned_songs
|
|
|
|
def export_playlist_report(self, playlist_id: str, output_file: str = None) -> str:
|
|
"""Export a detailed report for a playlist."""
|
|
if playlist_id not in self.data['playlists']:
|
|
return "Playlist not found"
|
|
|
|
playlist = self.data['playlists'][playlist_id]
|
|
songs = self.get_playlist_songs(playlist_id)
|
|
|
|
report = f"""
|
|
Karaoke Playlist Report
|
|
======================
|
|
Playlist: {playlist['name']}
|
|
URL: {playlist['url']}
|
|
Total Songs: {playlist['total_songs']}
|
|
Downloaded: {playlist['downloaded_songs']}
|
|
Failed: {playlist['failed_songs']}
|
|
Last Synced: {playlist['last_synced'] or 'Never'}
|
|
|
|
Song Details:
|
|
"""
|
|
|
|
for song in songs:
|
|
status_icon = {
|
|
SongStatus.DOWNLOADED: "✅",
|
|
SongStatus.CONVERTED: "✅",
|
|
SongStatus.DOWNLOADING: "⏳",
|
|
SongStatus.PARTIAL: "⚠️",
|
|
SongStatus.FAILED: "❌",
|
|
SongStatus.NOT_DOWNLOADED: "⏸️"
|
|
}.get(song['status'], "❓")
|
|
|
|
formats = ", ".join(song['formats'].keys()) if song['formats'] else "None"
|
|
report += f"{status_icon} {song['title']} ({formats})\n"
|
|
|
|
if output_file:
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
f.write(report)
|
|
|
|
return report |