import os import sys import subprocess import json import re from pathlib import Path from datetime import datetime, timedelta from karaoke_downloader.tracking_manager import TrackingManager, SongStatus, FormatType from karaoke_downloader.id3_utils import add_id3_tags, extract_artist_title from karaoke_downloader.songlist_manager import ( load_songlist, load_songlist_tracking, save_songlist_tracking, is_songlist_song_downloaded, mark_songlist_song_downloaded, normalize_title, load_server_songs, is_song_on_server ) from karaoke_downloader.youtube_utils import get_channel_info, get_playlist_info import logging import hashlib DATA_DIR = Path("data") class KaraokeDownloader: def __init__(self): self.yt_dlp_path = Path("downloader/yt-dlp.exe") self.downloads_dir = Path("downloads") self.logs_dir = Path("logs") self.downloads_dir.mkdir(exist_ok=True) self.logs_dir.mkdir(exist_ok=True) self.tracker = TrackingManager(tracking_file=DATA_DIR / "karaoke_tracking.json", cache_file=DATA_DIR / "channel_cache.json") self.config = self._load_config() self.songlist_tracking_file = DATA_DIR / "songlist_tracking.json" self.songlist_tracking = load_songlist_tracking(str(self.songlist_tracking_file)) # Load server songs for availability checking self.server_songs = load_server_songs() def _load_config(self): config_file = DATA_DIR / "config.json" if config_file.exists(): try: with open(config_file, 'r', encoding='utf-8') as f: return json.load(f) except (json.JSONDecodeError, FileNotFoundError) as e: print(f"Warning: Could not load config.json: {e}") return { "download_settings": { "format": "best[height<=720][ext=mp4]/best[height<=720]/best[ext=mp4]/best", "preferred_resolution": "720p", "audio_format": "mp3", "audio_quality": "0", "subtitle_language": "en", "subtitle_format": "srt", "write_metadata": False, "write_thumbnail": False, "write_description": False, "write_annotations": False, "write_comments": False, "write_subtitles": False, "embed_metadata": False, "add_metadata": False, "continue_downloads": True, "no_overwrites": True, "ignore_errors": True, "no_warnings": False }, "folder_structure": { "downloads_dir": "downloads", "logs_dir": "logs", "tracking_file": str(DATA_DIR / "karaoke_tracking.json") }, "logging": { "level": "INFO", "format": "%(asctime)s - %(levelname)s - %(message)s", "include_console": True, "include_file": True }, "yt_dlp_path": "downloader/yt-dlp.exe" } def reset_channel_downloads(self, channel_name, reset_songlist=False, delete_files=False): """ Reset all tracking and optionally files for a channel. If reset_songlist is False, songlist songs are preserved (tracking and files). If reset_songlist is True, songlist songs for this channel are also reset/deleted. """ print(f"\n๐Ÿ”„ Resetting channel: {channel_name} (reset_songlist={reset_songlist}, delete_files={delete_files})") # Find channel_id from channel_name channel_id = None for pid, playlist in self.tracker.data.get('playlists', {}).items(): if playlist['name'] == channel_name or pid == channel_name: channel_id = pid break if not channel_id: print(f"โŒ Channel '{channel_name}' not found in tracking.") return # Get all songs for this channel songs_to_reset = [] for song_id, song in self.tracker.data.get('songs', {}).items(): if song['playlist_id'] == channel_id: # Check if this is a songlist song artist, title = song.get('artist', ''), song.get('title', song.get('name', '')) key = f"{artist.lower()}_{normalize_title(title)}" is_songlist = key in self.songlist_tracking if is_songlist and not reset_songlist: continue # skip songlist songs if not resetting them songs_to_reset.append((song_id, song, is_songlist)) # Reset tracking and optionally delete files files_preserved = 0 files_deleted = 0 for song_id, song, is_songlist in songs_to_reset: # Remove from main tracking self.tracker.data['songs'][song_id]['status'] = 'NOT_DOWNLOADED' self.tracker.data['songs'][song_id]['formats'] = {} self.tracker.data['songs'][song_id]['last_error'] = '' self.tracker.data['songs'][song_id]['download_attempts'] = 0 self.tracker.data['songs'][song_id]['last_updated'] = None # Remove from songlist tracking if needed if is_songlist and reset_songlist: artist, title = song.get('artist', ''), song.get('title', song.get('name', '')) key = f"{artist.lower()}_{normalize_title(title)}" if key in self.songlist_tracking: del self.songlist_tracking[key] # Delete file if requested if delete_files: file_path = song.get('file_path') if file_path: try: p = Path(file_path) if p.exists(): p.unlink() files_deleted += 1 else: files_preserved += 1 except Exception as e: print(f"โš ๏ธ Could not delete file {file_path}: {e}") # --- FIX: Remove all songlist_tracking entries for this channel if reset_songlist is True --- if reset_songlist: keys_to_remove = [k for k, v in self.songlist_tracking.items() if v.get('channel') == channel_name] for k in keys_to_remove: del self.songlist_tracking[k] # Save changes self.tracker.force_save() save_songlist_tracking(self.songlist_tracking, str(self.songlist_tracking_file)) print(f"โœ… Reset {len(songs_to_reset)} songs for channel '{channel_name}'.") if delete_files: print(f" Files deleted: {files_deleted}, files preserved: {files_preserved}") if not reset_songlist: print(f" Songlist songs were preserved.") def download_from_file(self, file_path, force_refresh=False): file = Path(file_path) if not file.exists(): print(f"โŒ File not found: {file_path}") return False with open(file, "r", encoding="utf-8") as f: urls = [line.strip() for line in f if line.strip() and not line.strip().startswith("#")] if not urls: print(f"โŒ No URLs found in {file_path}") return False all_success = True for url in urls: print(f"\nโžก๏ธ Processing: {url}") success = self.download_channel_videos(url, force_refresh=force_refresh) if not success: all_success = False return all_success def download_channel_videos(self, url, force_refresh=False, fuzzy_match=False, fuzzy_threshold=85): """Download videos from a channel or playlist URL, respecting songlist-only and limit flags. Supports fuzzy matching.""" channel_name, channel_id = get_channel_info(url) print(f"\n๐ŸŽฌ Downloading from channel: {channel_name} ({url})") songlist = load_songlist() if not songlist: print("โš ๏ธ No songlist loaded. Skipping.") return False limit = self.config.get('limit', 1) cmd = [ str(self.yt_dlp_path), '--flat-playlist', '--print', '%(title)s|%(id)s|%(url)s', url ] try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) lines = result.stdout.strip().splitlines() except subprocess.CalledProcessError as e: print(f"โŒ yt-dlp failed to fetch playlist: {e}") return False available_videos = [] for line in lines: parts = line.split('|') if len(parts) >= 2: title, video_id = parts[0].strip(), parts[1].strip() available_videos.append({'title': title, 'id': video_id}) # Normalize songlist for matching try: from rapidfuzz import fuzz def similarity(a, b): return fuzz.ratio(a, b) except ImportError: import difflib def similarity(a, b): return int(difflib.SequenceMatcher(None, a, b).ratio() * 100) normalized_songlist = { f"{s['artist'].lower()}_{normalize_title(s['title'])}": s for s in songlist } matches = [] for video in available_videos: artist, title = extract_artist_title(video['title']) key = f"{artist.lower()}_{normalize_title(title)}" if fuzzy_match: # Fuzzy match against all songlist keys best_score = 0 best_song = None for song_key, song in normalized_songlist.items(): score = similarity(key, song_key) if score > best_score: best_score = score best_song = song if best_score >= fuzzy_threshold and best_song: if not is_songlist_song_downloaded(self.songlist_tracking, best_song['artist'], best_song['title']): matches.append((video, best_song)) print(f" โ†’ Fuzzy match: {artist} - {title} <-> {best_song['artist']} - {best_song['title']} (score: {best_score})") if len(matches) >= limit: break else: if key in normalized_songlist: if not is_songlist_song_downloaded(self.songlist_tracking, artist, title): matches.append((video, normalized_songlist[key])) if len(matches) >= limit: break if not matches: print("๐ŸŽต No new songlist matches found for this channel.") return True # Download only the first N matches for video, song in matches: artist, title = song['artist'], song['title'] output_path = self.downloads_dir / channel_name / f"{artist} - {title} (Karaoke Version).mp4" output_path.parent.mkdir(parents=True, exist_ok=True) print(f"โฌ‡๏ธ Downloading: {artist} - {title} -> {output_path}") video_url = f"https://www.youtube.com/watch?v={video['id']}" cmd = [ str(self.yt_dlp_path), "-o", str(output_path), "-f", self.config["download_settings"]["format"], video_url ] try: subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: print(f"โŒ yt-dlp failed: {e}") continue if not output_path.exists() or output_path.stat().st_size == 0: print(f"โŒ Download failed or file is empty: {output_path}") continue if not self._is_valid_mp4(output_path): print(f"โŒ File is not a valid MP4: {output_path}") continue add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name) mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path) print(f"โœ… Downloaded and tracked: {artist} - {title}") print(f"๐ŸŽ‰ All post-processing complete for: {output_path}") return True def build_download_plan(self, channel_urls, undownloaded, fuzzy_match=False, fuzzy_threshold=85): """ For each song in undownloaded, scan all channels for a match. Use fuzzy matching if enabled. Return (download_plan, unmatched_songs): - download_plan: list of dicts {artist, title, channel_name, channel_url, video_id, video_title} - unmatched_songs: list of songs not found in any channel """ try: from rapidfuzz import fuzz def similarity(a, b): return fuzz.ratio(a, b) except ImportError: import difflib def similarity(a, b): return int(difflib.SequenceMatcher(None, a, b).ratio() * 100) plan = [] unmatched = [] channel_match_counts = {} for channel_url in channel_urls: channel_name, channel_id = get_channel_info(channel_url) print(f"\n๐Ÿšฆ Starting channel: {channel_name} ({channel_url})") available_videos = self.tracker.get_channel_video_list( channel_url, yt_dlp_path=str(self.yt_dlp_path), force_refresh=False ) matches_this_channel = 0 channel_fuzzy_matches = [] # For optional top-N reporting for song in undownloaded: artist, title = song['artist'], song['title'] found = False song_key = f"{artist.lower()}_{normalize_title(title)}" for video in available_videos: v_artist, v_title = extract_artist_title(video['title']) video_key = f"{v_artist.lower()}_{normalize_title(v_title)}" if fuzzy_match: score = similarity(song_key, video_key) if score >= fuzzy_threshold: if not any(p['artist'] == artist and p['title'] == title for p in plan): plan.append({ 'artist': artist, 'title': title, 'channel_name': channel_name, 'channel_url': channel_url, 'video_id': video['id'], 'video_title': video['title'], 'match_score': score }) # channel_fuzzy_matches.append((artist, title, video['title'], score)) matches_this_channel += 1 found = True break else: if (normalize_title(v_artist) == normalize_title(artist) and normalize_title(v_title) == normalize_title(title)) or \ (normalize_title(video['title']) == normalize_title(f"{artist} - {title}")): if not any(p['artist'] == artist and p['title'] == title for p in plan): plan.append({ 'artist': artist, 'title': title, 'channel_name': channel_name, 'channel_url': channel_url, 'video_id': video['id'], 'video_title': video['title'], 'match_score': 100 }) matches_this_channel += 1 found = True break # Don't break here; keep looking for all matches in this channel channel_match_counts[channel_name] = matches_this_channel print(f" โ†’ Found {matches_this_channel} songlist matches in this channel.") # Optionally, print top 3 fuzzy matches for review # if fuzzy_match and channel_fuzzy_matches: # top_matches = sorted(channel_fuzzy_matches, key=lambda x: -x[3])[:3] # for a, t, vt, s in top_matches: # print(f" Top match: {a} - {t} <-> {vt} (score: {s})") # Now find unmatched songs for song in undownloaded: if not any(p['artist'] == song['artist'] and p['title'] == song['title'] for p in plan): unmatched.append(song) # Print summary table print("\n๐Ÿ“Š Channel match summary:") for channel, count in channel_match_counts.items(): print(f" {channel}: {count} matches") print(f" TOTAL: {sum(channel_match_counts.values())} matches across {len(channel_match_counts)} channels.") return plan, unmatched def get_download_plan_cache_file(self, mode, **kwargs): """Generate a unique cache filename based on mode and key parameters.""" parts = [f"plan_{mode}"] for k, v in sorted(kwargs.items()): parts.append(f"{k}{v}") base = "_".join(parts) # Hash for safety if string is long if len(base) > 60: base = base[:40] + "_" + hashlib.md5(base.encode()).hexdigest() return Path(f"data/{base}.json") def download_songlist_across_channels(self, channel_urls, limit=None, force_refresh_download_plan=False, fuzzy_match=False, fuzzy_threshold=85): """ For each song in the songlist, try each channel in order and download from the first channel where it is found. Download up to 'limit' songs, skipping any that cannot be found, until the limit is reached or all possible matches are exhausted. """ songlist = load_songlist() if not songlist: print("โš ๏ธ No songlist loaded. Skipping.") return False # Filter for songs not yet downloaded undownloaded = [s for s in songlist if not is_songlist_song_downloaded(self.songlist_tracking, s['artist'], s['title'])] print(f"๐ŸŽฏ {len(songlist)} total unique songs in songlist.") print(f"๐ŸŽฏ {len(undownloaded)} unique songlist songs to download.") # Further filter out songs already on server not_on_server = [s for s in undownloaded if not is_song_on_server(self.server_songs, s['artist'], s['title'])] server_available = len(undownloaded) - len(not_on_server) if server_available > 0: print(f"๐ŸŽต {server_available} songs already available on server, skipping.") undownloaded = not_on_server print(f"๐ŸŽฏ {len(undownloaded)} songs need to be downloaded.") if not undownloaded: print("๐ŸŽต All songlist songs already downloaded.") return True # --- FAST MODE: Early exit and deduplication if limit is set --- if limit is not None: print("\nโšก Fast mode enabled: will stop as soon as limit is reached with successful downloads.") try: from rapidfuzz import fuzz def similarity(a, b): return fuzz.ratio(a, b) except ImportError: import difflib def similarity(a, b): return int(difflib.SequenceMatcher(None, a, b).ratio() * 100) downloaded_count = 0 unique_keys = set() total_attempted = 0 for channel_url in channel_urls: channel_name, channel_id = get_channel_info(channel_url) print(f"\n๐Ÿšฆ Starting channel: {channel_name} ({channel_url})") available_videos = self.tracker.get_channel_video_list( channel_url, yt_dlp_path=str(self.yt_dlp_path), force_refresh=False ) for song in undownloaded: artist, title = song['artist'], song['title'] key = f"{artist.lower()}_{normalize_title(title)}" if key in unique_keys: continue # Already downloaded or queued found = False for video in available_videos: v_artist, v_title = extract_artist_title(video['title']) video_key = f"{v_artist.lower()}_{normalize_title(v_title)}" if fuzzy_match: score = similarity(key, video_key) if score >= fuzzy_threshold: found = True else: if (normalize_title(v_artist) == normalize_title(artist) and normalize_title(v_title) == normalize_title(title)) or \ (normalize_title(video['title']) == normalize_title(f"{artist} - {title}")): found = True if found: print(f"\nโฌ‡๏ธ Downloading {downloaded_count+1} of {limit}: {artist} - {title} (from {channel_name})") # --- Download logic (reuse from below) --- safe_title = title.replace("(From ", "").replace(")", "").replace(" - ", " ").replace(":", "").replace("'", "").replace('"', "") safe_artist = artist.replace("'", "").replace('"', "") invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\'] for char in invalid_chars: safe_title = safe_title.replace(char, "") safe_artist = safe_artist.replace(char, "") safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip() safe_artist = safe_artist.strip() filename = f"{safe_artist} - {safe_title}.mp4" # Call the actual download function (simulate the same as in the plan loop) success = self._download_video_and_track( channel_name, channel_url, video['id'], video['title'], artist, title, filename ) total_attempted += 1 if success: downloaded_count += 1 unique_keys.add(key) print(f"โœ… Downloaded and tracked: {artist} - {title}") else: print(f"โŒ Download failed: {artist} - {title}") if downloaded_count >= limit: print(f"๐ŸŽ‰ Reached download limit ({limit}). Stopping early.") return True break # Don't try to match this song to other videos in this channel print(f"๐ŸŽ‰ Downloaded {downloaded_count} unique songlist songs (limit was {limit}).") if downloaded_count < limit: print(f"โš ๏ธ Only {downloaded_count} songs were downloaded. Some may not have been found or downloads failed.") return True # --- ORIGINAL FULL PLAN MODE (no limit) --- # Removed per-song printout for cleaner output # print("๐Ÿ” Songs to search for:") # for song in undownloaded: # print(f" - {song['artist']} - {song['title']}") # --- Download plan cache logic --- plan_mode = "songlist" plan_kwargs = {"limit": limit or "all", "channels": len(channel_urls)} cache_file = self.get_download_plan_cache_file(plan_mode, **plan_kwargs) use_cache = False if not force_refresh_download_plan and cache_file.exists(): try: with open(cache_file, 'r', encoding='utf-8') as f: cache_data = json.load(f) cache_time = datetime.fromisoformat(cache_data.get('timestamp')) if datetime.now() - cache_time < timedelta(days=1): print(f"๐Ÿ—‚๏ธ Using cached download plan from {cache_time} ({cache_file.name}).") download_plan = cache_data['download_plan'] unmatched = cache_data['unmatched'] use_cache = True except Exception as e: print(f"โš ๏ธ Could not load download plan cache: {e}") if not use_cache: print("\n๐Ÿ”Ž Pre-scanning channels for matches...") download_plan, unmatched = self.build_download_plan(channel_urls, undownloaded, fuzzy_match=fuzzy_match, fuzzy_threshold=fuzzy_threshold) if download_plan: cache_data = { 'timestamp': datetime.now().isoformat(), 'download_plan': download_plan, 'unmatched': unmatched } with open(cache_file, 'w', encoding='utf-8') as f: json.dump(cache_data, f, indent=2, ensure_ascii=False) print(f"๐Ÿ—‚๏ธ Saved new download plan cache: {cache_file.name}") else: if cache_file.exists(): cache_file.unlink() print(f"๐Ÿ—‚๏ธ No matches found, not saving download plan cache.") print(f"\n๐Ÿ“Š Download plan ready: {len(download_plan)} songs will be downloaded.") print(f"โŒ {len(unmatched)} songs could not be found in any channel.") if unmatched: print("Unmatched songs:") for song in unmatched[:10]: print(f" - {song['artist']} - {song['title']}") if len(unmatched) > 10: print(f" ...and {len(unmatched)-10} more.") # --- Download phase --- downloaded_count = 0 total_to_download = limit if limit is not None else len(download_plan) for idx, item in enumerate(download_plan): if limit is not None and downloaded_count >= limit: break artist = item['artist'] title = item['title'] channel_name = item['channel_name'] channel_url = item['channel_url'] video_id = item['video_id'] video_title = item['video_title'] print(f"\nโฌ‡๏ธ Downloading {idx+1} of {total_to_download}: {artist} - {title} (from {channel_name})") # --- Existing download logic here, using channel_name, video_id, etc. --- # (Copy the download logic from the previous loop, using these variables) # Create a shorter, safer filename - do this ONCE and use consistently safe_title = title.replace("(From ", "").replace(")", "").replace(" - ", " ").replace(":", "").replace("'", "").replace('"', "") safe_artist = artist.replace("'", "").replace('"', "") # Remove all Windows-invalid characters invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\'] for char in invalid_chars: safe_title = safe_title.replace(char, "") safe_artist = safe_artist.replace(char, "") # Also remove any other potentially problematic characters safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip() safe_artist = safe_artist.strip() filename = f"{safe_artist} - {safe_title}.mp4" # Limit filename length to avoid Windows path issues if len(filename) > 100: filename = f"{safe_artist[:30]} - {safe_title[:60]}.mp4" output_path = self.downloads_dir / channel_name / filename output_path.parent.mkdir(parents=True, exist_ok=True) print(f"โฌ‡๏ธ Downloading: {artist} - {title} -> {output_path}") video_url = f"https://www.youtube.com/watch?v={video_id}" dlp_cmd = [ str(self.yt_dlp_path), "--no-check-certificates", "--ignore-errors", "--no-warnings", "-o", str(output_path), "-f", self.config["download_settings"]["format"], video_url ] print(f"๐Ÿ”ง Running command: {' '.join(dlp_cmd)}") print(f"๐Ÿ“บ Resolution settings: {self.config.get('download_settings', {}).get('preferred_resolution', 'Unknown')}") print(f"๐ŸŽฌ Format string: {self.config.get('download_settings', {}).get('format', 'Unknown')}") # Debug: Show available formats (optional) if self.config.get('debug_show_formats', False): print(f"๐Ÿ” Checking available formats for: {video_url}") format_cmd = [ str(self.yt_dlp_path), "--list-formats", video_url ] try: format_result = subprocess.run(format_cmd, capture_output=True, text=True, timeout=30) print(f"๐Ÿ“‹ Available formats:\n{format_result.stdout}") except Exception as e: print(f"โš ๏ธ Could not check formats: {e}") try: result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True) print(f"โœ… yt-dlp completed successfully") print(f"๐Ÿ“„ yt-dlp stdout: {result.stdout}") except subprocess.CalledProcessError as e: print(f"โŒ yt-dlp failed with exit code {e.returncode}") print(f"โŒ yt-dlp stderr: {e.stderr}") continue if not output_path.exists(): print(f"โŒ Download failed: file does not exist: {output_path}") # Check if yt-dlp saved it somewhere else possible_files = list(output_path.parent.glob("*.mp4")) if possible_files: print(f"๐Ÿ” Found these files in the directory: {[f.name for f in possible_files]}") # Look for a file that matches our pattern (artist - title) artist_part = safe_artist.lower() title_part = safe_title.lower() for file in possible_files: file_lower = file.stem.lower() if artist_part in file_lower and any(word in file_lower for word in title_part.split()): print(f"๐ŸŽฏ Found matching file: {file.name}") output_path = file break else: print(f"โŒ No matching file found for: {artist} - {title}") continue else: continue if output_path.stat().st_size == 0: print(f"โŒ Download failed: file is empty (0 bytes): {output_path}") continue # TEMP: Skipping MP4 validation for debugging # if not self._is_valid_mp4(output_path): # print(f"โŒ File is not a valid MP4: {output_path}") # continue add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name) mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path) print(f"โœ… Downloaded and tracked: {artist} - {title}") print(f"๐ŸŽ‰ All post-processing complete for: {output_path}") downloaded_count += 1 # After each download, if this was the last song, delete the cache if idx + 1 == total_to_download: if cache_file.exists(): try: cache_file.unlink() print(f"๐Ÿ—‘๏ธ Deleted download plan cache after last song downloaded: {cache_file.name}") except Exception as e: print(f"โš ๏ธ Could not delete download plan cache: {e}") print(f"๐ŸŽ‰ Downloaded {downloaded_count} songlist songs.") print(f"๐Ÿ“Š Summary: Processed {len(channel_urls)} channels, found {downloaded_count} songs, {len(unmatched)} songs not found.") # Delete the download plan cache if all planned downloads are done if cache_file.exists(): try: cache_file.unlink() print(f"๐Ÿ—‘๏ธ Deleted download plan cache after completion: {cache_file.name}") except Exception as e: print(f"โš ๏ธ Could not delete download plan cache: {e}") return True def download_latest_per_channel(self, channel_urls, limit=5, force_refresh_download_plan=False): """ Download the latest N videos from each channel in channel_urls. - Pre-scan all channels for their latest N videos. - Build a per-channel download plan and cache it. - Resume robustly if interrupted (removes each channel from the plan as it completes). - Deletes the plan cache when all channels are done. """ plan_mode = "latest_per_channel" plan_kwargs = {"limit": limit, "channels": len(channel_urls)} cache_file = self.get_download_plan_cache_file(plan_mode, **plan_kwargs) use_cache = False if not force_refresh_download_plan and cache_file.exists(): try: with open(cache_file, 'r', encoding='utf-8') as f: plan_data = json.load(f) cache_time = datetime.fromisoformat(plan_data.get('timestamp')) if datetime.now() - cache_time < timedelta(days=1): print(f"๐Ÿ—‚๏ธ Using cached latest-per-channel plan from {cache_time} ({cache_file.name}).") channel_plans = plan_data['channel_plans'] use_cache = True except Exception as e: print(f"โš ๏ธ Could not load latest-per-channel plan cache: {e}") if not use_cache: print("\n๐Ÿ”Ž Pre-scanning all channels for latest videos...") channel_plans = [] for channel_url in channel_urls: channel_name, channel_id = get_channel_info(channel_url) print(f"\n๐Ÿšฆ Starting channel: {channel_name} ({channel_url})") available_videos = self.tracker.get_channel_video_list( channel_url, yt_dlp_path=str(self.yt_dlp_path), force_refresh=False ) # Sort by upload order (assume yt-dlp returns in order, or sort by id if available) latest_videos = available_videos[:limit] print(f" โ†’ Found {len(latest_videos)} latest videos for this channel.") channel_plans.append({ 'channel_name': channel_name, 'channel_url': channel_url, 'videos': latest_videos }) plan_data = { 'timestamp': datetime.now().isoformat(), 'channel_plans': channel_plans } with open(cache_file, 'w', encoding='utf-8') as f: json.dump(plan_data, f, indent=2, ensure_ascii=False) print(f"๐Ÿ—‚๏ธ Saved new latest-per-channel plan cache: {cache_file.name}") # --- Download phase --- total_channels = len(channel_plans) for idx, channel_plan in enumerate(channel_plans): channel_name = channel_plan['channel_name'] channel_url = channel_plan['channel_url'] videos = channel_plan['videos'] print(f"\nโฌ‡๏ธ Downloading {len(videos)} videos from channel {idx+1} of {total_channels}: {channel_name}") for v_idx, video in enumerate(videos): title = video['title'] video_id = video['id'] # Sanitize filename safe_title = title invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\'] for char in invalid_chars: safe_title = safe_title.replace(char, "") safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip() filename = f"{channel_name} - {safe_title}.mp4" if len(filename) > 100: filename = f"{channel_name[:30]} - {safe_title[:60]}.mp4" output_path = self.downloads_dir / channel_name / filename output_path.parent.mkdir(parents=True, exist_ok=True) print(f" ({v_idx+1}/{len(videos)}) Downloading: {title} -> {output_path}") video_url = f"https://www.youtube.com/watch?v={video_id}" dlp_cmd = [ str(self.yt_dlp_path), "--no-check-certificates", "--ignore-errors", "--no-warnings", "-o", str(output_path), "-f", self.config["download_settings"]["format"], video_url ] try: result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True) print(f" โœ… yt-dlp completed successfully") except subprocess.CalledProcessError as e: print(f" โŒ yt-dlp failed with exit code {e.returncode}") print(f" โŒ yt-dlp stderr: {e.stderr}") continue if not output_path.exists() or output_path.stat().st_size == 0: print(f" โŒ Download failed or file is empty: {output_path}") continue add_id3_tags(output_path, title, channel_name) print(f" โœ… Downloaded and tagged: {title}") # After channel is done, remove it from the plan and update cache channel_plans[idx]['videos'] = [] with open(cache_file, 'w', encoding='utf-8') as f: json.dump({'timestamp': datetime.now().isoformat(), 'channel_plans': channel_plans}, f, indent=2, ensure_ascii=False) print(f" ๐Ÿ—‘๏ธ Channel {channel_name} completed and removed from plan cache.") # After all channels are done, delete the cache if cache_file.exists(): try: cache_file.unlink() print(f"๐Ÿ—‘๏ธ Deleted latest-per-channel plan cache after completion: {cache_file.name}") except Exception as e: print(f"โš ๏ธ Could not delete latest-per-channel plan cache: {e}") print(f"๐ŸŽ‰ All latest videos downloaded for all channels!") return True def _is_valid_mp4(self, file_path): """Check if the file is a valid MP4 using ffprobe, if available.""" try: cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "default=noprint_wrappers=1:nokey=1", str(file_path)] result = subprocess.run(cmd, capture_output=True, text=True, check=True) return "mp4" in result.stdout or "h264" in result.stdout or "hevc" in result.stdout except Exception: # If ffprobe is not available, skip the check return True def _download_video_and_track(self, channel_name, channel_url, video_id, video_title, artist, title, filename): """ Helper to download a single video and track its status. Returns True if successful, False otherwise. """ output_path = self.downloads_dir / channel_name / filename output_path.parent.mkdir(parents=True, exist_ok=True) print(f"โฌ‡๏ธ Downloading: {artist} - {title} -> {output_path}") video_url = f"https://www.youtube.com/watch?v={video_id}" dlp_cmd = [ str(self.yt_dlp_path), "--no-check-certificates", "--ignore-errors", "--no-warnings", "-o", str(output_path), "-f", self.config["download_settings"]["format"], video_url ] try: result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True) print(f"โœ… yt-dlp completed successfully") print(f"๐Ÿ“„ yt-dlp stdout: {result.stdout}") except subprocess.CalledProcessError as e: print(f"โŒ yt-dlp failed with exit code {e.returncode}") print(f"โŒ yt-dlp stderr: {e.stderr}") return False if not output_path.exists(): print(f"โŒ Download failed: file does not exist: {output_path}") return False if output_path.stat().st_size == 0: print(f"โŒ Download failed: file is empty (0 bytes): {output_path}") return False # TEMP: Skipping MP4 validation for debugging # if not self._is_valid_mp4(output_path): # print(f"โŒ File is not a valid MP4: {output_path}") # return False add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name) mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path) print(f"โœ… Downloaded and tracked: {artist} - {title}") print(f"๐ŸŽ‰ All post-processing complete for: {output_path}") return True def reset_songlist_all(): """Delete all files tracked in songlist_tracking.json, clear songlist_tracking.json, and remove songlist songs from karaoke_tracking.json.""" import json from pathlib import Path # Load songlist tracking songlist_tracking_file = Path('data/songlist_tracking.json') karaoke_tracking_file = Path('data/karaoke_tracking.json') if songlist_tracking_file.exists(): with open(songlist_tracking_file, 'r', encoding='utf-8') as f: tracking = json.load(f) else: tracking = {} # Delete all files tracked for entry in tracking.values(): file_path = entry.get('file_path') if file_path: p = Path(file_path) try: if p.exists(): p.unlink() print(f"๐Ÿ—‘๏ธ Deleted: {p}") except Exception as e: print(f"โš ๏ธ Could not delete {p}: {e}") # Clear songlist_tracking.json songlist_tracking_file.write_text("{}", encoding="utf-8") print("๐Ÿงน Cleared songlist_tracking.json") # Remove songlist songs from karaoke_tracking.json if karaoke_tracking_file.exists(): with open(karaoke_tracking_file, 'r', encoding='utf-8') as f: karaoke_data = json.load(f) song_keys_to_remove = [] for song_id, song in karaoke_data.get('songs', {}).items(): artist = song.get('artist', '') title = song.get('title', song.get('name', '')) key = f"{artist.lower()}_{normalize_title(title)}" if key in tracking: song_keys_to_remove.append(song_id) for song_id in song_keys_to_remove: del karaoke_data['songs'][song_id] with open(karaoke_tracking_file, 'w', encoding='utf-8') as f: json.dump(karaoke_data, f, indent=2, ensure_ascii=False) print(f"๐Ÿงน Removed {len(song_keys_to_remove)} songlist songs from karaoke_tracking.json") print("โœ… Global songlist reset complete.") # For brevity, the rest of the class methods should be copied here from the original download_karaoke.py, # updating all references to use the new karaoke_downloader.* imports as needed.