diff --git a/PRD.md b/PRD.md index c4f5475..73f0a87 100644 --- a/PRD.md +++ b/PRD.md @@ -65,6 +65,8 @@ python download_karaoke.py --clear-cache SingKingKaraoke - ✅ Automatic cleanup of extra yt-dlp files - ✅ **Reset/clear channel tracking and files via CLI** - ✅ **Clear channel cache via CLI** +- ✅ **Download plan pre-scan and caching**: Before downloading, the tool pre-scans all channels for songlist matches, builds a download plan, and prints stats. The plan is cached for 1 day in a mode-specific plan cache file under data/ for fast resuming and reliability. Use --force-download-plan to force a refresh. +- ✅ **Latest-per-channel download**: Download the latest N videos from each channel in a single batch, with a per-channel download plan, robust resume, and unique plan cache. Use --latest-per-channel and --limit N. --- @@ -114,6 +116,8 @@ KaroakeVideoDownloader/ - `--reset-channel `: **Reset all tracking and files for a channel** - `--reset-songlist`: **When used with --reset-channel, also reset songlist songs for this channel** - `--clear-cache `: **Clear channel video cache for a specific channel or all** +- `--force-download-plan`: **Force refresh the download plan cache (re-scan all channels for matches)** +- `--latest-per-channel`: **Download the latest N videos from each channel (use with --limit)** --- @@ -124,6 +128,8 @@ KaroakeVideoDownloader/ - **ID3 Tagging:** Artist/title extracted from video title and embedded in MP4 files. - **Cleanup:** Extra files from yt-dlp (e.g., `.info.json`) are automatically removed after download. - **Reset/Clear:** Use `--reset-channel` to reset all tracking and files for a channel (optionally including songlist songs with `--reset-songlist`). Use `--clear-cache` to clear cached video lists for a channel or all channels. +- **Download plan pre-scan:** Before downloading, the tool scans all channels for songlist matches, builds a download plan, and prints stats (matches, unmatched, per-channel breakdown). 
The plan is cached for 1 day and reused unless --force-download-plan is set. +- **Latest-per-channel plan:** Download the latest N videos from each channel, with a per-channel plan and robust resume. Each channel is removed from the plan as it completes. Plan cache is deleted when all channels are done. --- diff --git a/README.md b/README.md index 5cbb06e..5c69411 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ A Python-based Windows CLI tool to download karaoke videos from YouTube channels - 🧹 **Automatic Cleanup**: Removes extra yt-dlp files - 📈 **Real-Time Progress**: Detailed console and log output - 🧹 **Reset/Clear Channel**: Reset all tracking and files for a channel, or clear channel cache via CLI +- 🗂️ **Latest-per-channel download**: Download the latest N videos from each channel in a single batch, with a per-channel download plan, robust resume, and unique plan cache. Use --latest-per-channel and --limit N. ## 📋 Requirements - **Windows 10/11** @@ -56,6 +57,11 @@ python download_karaoke.py --limit 5 python download_karaoke.py --resolution 1080p ``` +### Download Latest N Videos Per Channel +```bash +python download_karaoke.py --file data/channels.txt --latest-per-channel --limit 5 +``` + ### **Reset/Start Over for a Channel** ```bash python download_karaoke.py --reset-channel SingKingKaraoke @@ -135,6 +141,7 @@ KaroakeVideoDownloader/ - `--reset-channel `: **Reset all tracking and files for a channel** - `--reset-songlist`: **When used with --reset-channel, also reset songlist songs for this channel** - `--clear-cache `: **Clear channel video cache for a specific channel or all** +- `--latest-per-channel`: **Download the latest N videos from each channel (use with --limit)** ## 📝 Example Usage ```bash diff --git a/karaoke_downloader/cli.py b/karaoke_downloader/cli.py index 1085007..754a41e 100644 --- a/karaoke_downloader/cli.py +++ b/karaoke_downloader/cli.py @@ -35,6 +35,8 @@ Examples: parser.add_argument('--reset-songlist', action='store_true', 
help='When used with --reset-channel, also reset songlist songs for this channel') parser.add_argument('--reset-songlist-all', action='store_true', help='Reset all songlist tracking and delete all songlist-downloaded files (global)') parser.add_argument('--version', '-v', action='version', version='Karaoke Playlist Downloader v1.0') + parser.add_argument('--force-download-plan', action='store_true', help='Force refresh the download plan cache (re-scan all channels for matches)') + parser.add_argument('--latest-per-channel', action='store_true', help='Download the latest N videos from each channel (use with --limit)') args = parser.parse_args() yt_dlp_path = Path("downloader/yt-dlp.exe") @@ -158,9 +160,17 @@ Examples: with open(args.file, "r", encoding="utf-8") as f: channel_urls = [line.strip() for line in f if line.strip() and not line.strip().startswith("#")] limit = args.limit if args.limit else None - success = downloader.download_songlist_across_channels(channel_urls, limit=limit) + force_refresh_download_plan = args.force_download_plan if hasattr(args, 'force_download_plan') else False + success = downloader.download_songlist_across_channels(channel_urls, limit=limit, force_refresh_download_plan=force_refresh_download_plan) elif args.url: success = downloader.download_channel_videos(args.url, force_refresh=args.refresh) + elif args.latest_per_channel and args.file: + # Read all channel URLs from file + with open(args.file, "r", encoding="utf-8") as f: + channel_urls = [line.strip() for line in f if line.strip() and not line.strip().startswith("#")] + limit = args.limit if args.limit else 5 + force_refresh_download_plan = args.force_download_plan if hasattr(args, 'force_download_plan') else False + success = downloader.download_latest_per_channel(channel_urls, limit=limit, force_refresh_download_plan=force_refresh_download_plan) else: parser.print_help() sys.exit(1) diff --git a/karaoke_downloader/downloader.py b/karaoke_downloader/downloader.py index 
330b5e5..35999ab 100644 --- a/karaoke_downloader/downloader.py +++ b/karaoke_downloader/downloader.py @@ -4,7 +4,7 @@ import subprocess import json import re from pathlib import Path -from datetime import datetime +from datetime import datetime, timedelta from karaoke_downloader.tracking_manager import TrackingManager, SongStatus, FormatType from karaoke_downloader.id3_utils import add_id3_tags, extract_artist_title from karaoke_downloader.songlist_manager import ( @@ -14,6 +14,7 @@ from karaoke_downloader.songlist_manager import ( ) from karaoke_downloader.youtube_utils import get_channel_info, get_playlist_info import logging +import hashlib DATA_DIR = Path("data") @@ -249,7 +250,71 @@ class KaraokeDownloader: print(f"🎉 All post-processing complete for: {output_path}") return True - def download_songlist_across_channels(self, channel_urls, limit=None): + def build_download_plan(self, channel_urls, undownloaded): + """ + For each song in undownloaded, scan all channels for a match. + Return (download_plan, unmatched_songs): + - download_plan: list of dicts {artist, title, channel_name, channel_url, video_id, video_title} + - unmatched_songs: list of songs not found in any channel + """ + plan = [] + unmatched = [] + channel_match_counts = {} + for channel_url in channel_urls: + channel_name, channel_id = get_channel_info(channel_url) + print(f"\n🚦 Starting channel: {channel_name} ({channel_url})") + available_videos = self.tracker.get_channel_video_list( + channel_url, + yt_dlp_path=str(self.yt_dlp_path), + force_refresh=False + ) + matches_this_channel = 0 + for song in undownloaded: + artist, title = song['artist'], song['title'] + found = False + for video in available_videos: + v_artist, v_title = extract_artist_title(video['title']) + if (normalize_title(v_artist) == normalize_title(artist) and normalize_title(v_title) == normalize_title(title)) or \ + (normalize_title(video['title']) == normalize_title(f"{artist} - {title}")): + # Only add if not already in 
plan (first channel wins) + if not any(p['artist'] == artist and p['title'] == title for p in plan): + plan.append({ + 'artist': artist, + 'title': title, + 'channel_name': channel_name, + 'channel_url': channel_url, + 'video_id': video['id'], + 'video_title': video['title'] + }) + matches_this_channel += 1 + found = True + break + # Don't break here; keep looking for all matches in this channel + channel_match_counts[channel_name] = matches_this_channel + print(f" → Found {matches_this_channel} songlist matches in this channel.") + # Now find unmatched songs + for song in undownloaded: + if not any(p['artist'] == song['artist'] and p['title'] == song['title'] for p in plan): + unmatched.append(song) + # Print summary table + print("\n📊 Channel match summary:") + for channel, count in channel_match_counts.items(): + print(f" {channel}: {count} matches") + print(f" TOTAL: {sum(channel_match_counts.values())} matches across {len(channel_match_counts)} channels.") + return plan, unmatched + + def get_download_plan_cache_file(self, mode, **kwargs): + """Generate a unique cache filename based on mode and key parameters.""" + parts = [f"plan_{mode}"] + for k, v in sorted(kwargs.items()): + parts.append(f"{k}{v}") + base = "_".join(parts) + # Hash for safety if string is long + if len(base) > 60: + base = base[:40] + "_" + hashlib.md5(base.encode()).hexdigest() + return Path(f"data/{base}.json") + + def download_songlist_across_channels(self, channel_urls, limit=None, force_refresh_download_plan=False): """ For each song in the songlist, try each channel in order and download from the first channel where it is found. Download up to 'limit' songs, skipping any that cannot be found, until the limit is reached or all possible matches are exhausted. 
@@ -274,141 +339,278 @@ class KaraokeDownloader: if not undownloaded: print("🎵 All songlist songs already downloaded.") return True - print("🔍 Songs to search for:") - for song in undownloaded: - print(f" - {song['artist']} - {song['title']}") - downloaded_count = 0 - attempted = set() - total_to_download = limit if limit is not None else len(undownloaded) - print(f"\n🎬 Processing {len(channel_urls)} channels for song matches...") - # Keep looping until limit is reached or no more undownloaded songs - while undownloaded and (limit is None or downloaded_count < limit): - for song in list(undownloaded): - if limit is not None and downloaded_count >= limit: - break - artist, title = song['artist'], song['title'] - if (artist, title) in attempted: - continue - found = False - print(f"\n🔍 Searching for: {artist} - {title}") - for channel_url in channel_urls: - channel_name, channel_id = get_channel_info(channel_url) - available_videos = self.tracker.get_channel_video_list( - channel_url, - yt_dlp_path=str(self.yt_dlp_path), - force_refresh=False - ) - for video in available_videos: - v_artist, v_title = extract_artist_title(video['title']) - if (normalize_title(v_artist) == normalize_title(artist) and normalize_title(v_title) == normalize_title(title)) or \ - (normalize_title(video['title']) == normalize_title(f"{artist} - {title}")): - # Progress print statement - print(f"📥 Downloading {downloaded_count + 1} of {total_to_download} songlist songs...") - print(f"🎯 Found on channel: {channel_name}") - # Download this song from this channel - # Create a shorter, safer filename - do this ONCE and use consistently - safe_title = title.replace("(From ", "").replace(")", "").replace(" - ", " ").replace(":", "").replace("'", "").replace('"', "") - safe_artist = artist.replace("'", "").replace('"', "") - # Remove all Windows-invalid characters - invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\'] - for char in invalid_chars: - safe_title = safe_title.replace(char, "") 
- safe_artist = safe_artist.replace(char, "") - # Also remove any other potentially problematic characters - safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip() - safe_artist = safe_artist.strip() - - filename = f"{safe_artist} - {safe_title}.mp4" - # Limit filename length to avoid Windows path issues - if len(filename) > 100: - filename = f"{safe_artist[:30]} - {safe_title[:60]}.mp4" - - output_path = self.downloads_dir / channel_name / filename - output_path.parent.mkdir(parents=True, exist_ok=True) - print(f"⬇️ Downloading: {artist} - {title} -> {output_path}") - video_url = f"https://www.youtube.com/watch?v={video['id']}" - dlp_cmd = [ - str(self.yt_dlp_path), - "--no-check-certificates", - "--ignore-errors", - "--no-warnings", - "-o", str(output_path), - "-f", self.config["download_settings"]["format"], - video_url - ] - print(f"🔧 Running command: {' '.join(dlp_cmd)}") - print(f"📺 Resolution settings: {self.config.get('download_settings', {}).get('preferred_resolution', 'Unknown')}") - print(f"🎬 Format string: {self.config.get('download_settings', {}).get('format', 'Unknown')}") - - # Debug: Show available formats (optional) - if self.config.get('debug_show_formats', False): - print(f"🔍 Checking available formats for: {video_url}") - format_cmd = [ - str(self.yt_dlp_path), - "--list-formats", - video_url - ] - try: - format_result = subprocess.run(format_cmd, capture_output=True, text=True, timeout=30) - print(f"📋 Available formats:\n{format_result.stdout}") - except Exception as e: - print(f"⚠️ Could not check formats: {e}") - try: - result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True) - print(f"✅ yt-dlp completed successfully") - print(f"📄 yt-dlp stdout: {result.stdout}") - except subprocess.CalledProcessError as e: - print(f"❌ yt-dlp failed with exit code {e.returncode}") - print(f"❌ yt-dlp stderr: {e.stderr}") - continue - if not output_path.exists(): - print(f"❌ Download failed: file does not 
exist: {output_path}") - # Check if yt-dlp saved it somewhere else - possible_files = list(output_path.parent.glob("*.mp4")) - if possible_files: - print(f"🔍 Found these files in the directory: {[f.name for f in possible_files]}") - # Look for a file that matches our pattern (artist - title) - artist_part = safe_artist.lower() - title_part = safe_title.lower() - for file in possible_files: - file_lower = file.stem.lower() - if artist_part in file_lower and any(word in file_lower for word in title_part.split()): - print(f"🎯 Found matching file: {file.name}") - output_path = file - break - else: - print(f"❌ No matching file found for: {artist} - {title}") - continue - else: - continue - if output_path.stat().st_size == 0: - print(f"❌ Download failed: file is empty (0 bytes): {output_path}") - continue - # TEMP: Skipping MP4 validation for debugging - # if not self._is_valid_mp4(output_path): - # print(f"❌ File is not a valid MP4: {output_path}") - # continue - add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name) - mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path) - print(f"✅ Downloaded and tracked: {artist} - {title}") - print(f"🎉 All post-processing complete for: {output_path}") - downloaded_count += 1 - found = True - break # Only download from first channel where found - if found: - break - attempted.add((artist, title)) - if found: - undownloaded.remove(song) - # If no new downloads in this pass, break to avoid infinite loop - if downloaded_count == 0 or (limit is not None and downloaded_count >= limit): - break - if undownloaded: - print(f"⚠️ {len(undownloaded)} songlist songs could not be found in any channel:") - for song in undownloaded: + # Removed per-song printout for cleaner output + # print("🔍 Songs to search for:") + # for song in undownloaded: + # print(f" - {song['artist']} - {song['title']}") + # --- Download plan cache logic --- + plan_mode = "songlist" + plan_kwargs = 
{"limit": limit or "all", "channels": len(channel_urls)} + cache_file = self.get_download_plan_cache_file(plan_mode, **plan_kwargs) + use_cache = False + if not force_refresh_download_plan and cache_file.exists(): + try: + with open(cache_file, 'r', encoding='utf-8') as f: + cache_data = json.load(f) + cache_time = datetime.fromisoformat(cache_data.get('timestamp')) + if datetime.now() - cache_time < timedelta(days=1): + print(f"🗂️ Using cached download plan from {cache_time} ({cache_file.name}).") + download_plan = cache_data['download_plan'] + unmatched = cache_data['unmatched'] + use_cache = True + except Exception as e: + print(f"⚠️ Could not load download plan cache: {e}") + if not use_cache: + print("\n🔎 Pre-scanning channels for matches...") + download_plan, unmatched = self.build_download_plan(channel_urls, undownloaded) + if download_plan: + cache_data = { + 'timestamp': datetime.now().isoformat(), + 'download_plan': download_plan, + 'unmatched': unmatched + } + with open(cache_file, 'w', encoding='utf-8') as f: + json.dump(cache_data, f, indent=2, ensure_ascii=False) + print(f"🗂️ Saved new download plan cache: {cache_file.name}") + else: + if cache_file.exists(): + cache_file.unlink() + print(f"🗂️ No matches found, not saving download plan cache.") + print(f"\n📊 Download plan ready: {len(download_plan)} songs will be downloaded.") + print(f"❌ {len(unmatched)} songs could not be found in any channel.") + if unmatched: + print("Unmatched songs:") + for song in unmatched[:10]: print(f" - {song['artist']} - {song['title']}") + if len(unmatched) > 10: + print(f" ...and {len(unmatched)-10} more.") + # --- Download phase --- + downloaded_count = 0 + total_to_download = limit if limit is not None else len(download_plan) + for idx, item in enumerate(download_plan): + if limit is not None and downloaded_count >= limit: + break + artist = item['artist'] + title = item['title'] + channel_name = item['channel_name'] + channel_url = item['channel_url'] + video_id = 
item['video_id'] + video_title = item['video_title'] + print(f"\n⬇️ Downloading {idx+1} of {total_to_download}: {artist} - {title} (from {channel_name})") + # --- Existing download logic here, using channel_name, video_id, etc. --- + # (Copy the download logic from the previous loop, using these variables) + # Create a shorter, safer filename - do this ONCE and use consistently + safe_title = title.replace("(From ", "").replace(")", "").replace(" - ", " ").replace(":", "").replace("'", "").replace('"', "") + safe_artist = artist.replace("'", "").replace('"', "") + # Remove all Windows-invalid characters + invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\'] + for char in invalid_chars: + safe_title = safe_title.replace(char, "") + safe_artist = safe_artist.replace(char, "") + # Also remove any other potentially problematic characters + safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip() + safe_artist = safe_artist.strip() + + filename = f"{safe_artist} - {safe_title}.mp4" + # Limit filename length to avoid Windows path issues + if len(filename) > 100: + filename = f"{safe_artist[:30]} - {safe_title[:60]}.mp4" + + output_path = self.downloads_dir / channel_name / filename + output_path.parent.mkdir(parents=True, exist_ok=True) + print(f"⬇️ Downloading: {artist} - {title} -> {output_path}") + video_url = f"https://www.youtube.com/watch?v={video_id}" + dlp_cmd = [ + str(self.yt_dlp_path), + "--no-check-certificates", + "--ignore-errors", + "--no-warnings", + "-o", str(output_path), + "-f", self.config["download_settings"]["format"], + video_url + ] + print(f"🔧 Running command: {' '.join(dlp_cmd)}") + print(f"📺 Resolution settings: {self.config.get('download_settings', {}).get('preferred_resolution', 'Unknown')}") + print(f"🎬 Format string: {self.config.get('download_settings', {}).get('format', 'Unknown')}") + + # Debug: Show available formats (optional) + if self.config.get('debug_show_formats', False): + print(f"🔍 Checking 
available formats for: {video_url}") + format_cmd = [ + str(self.yt_dlp_path), + "--list-formats", + video_url + ] + try: + format_result = subprocess.run(format_cmd, capture_output=True, text=True, timeout=30) + print(f"📋 Available formats:\n{format_result.stdout}") + except Exception as e: + print(f"⚠️ Could not check formats: {e}") + try: + result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True) + print(f"✅ yt-dlp completed successfully") + print(f"📄 yt-dlp stdout: {result.stdout}") + except subprocess.CalledProcessError as e: + print(f"❌ yt-dlp failed with exit code {e.returncode}") + print(f"❌ yt-dlp stderr: {e.stderr}") + continue + if not output_path.exists(): + print(f"❌ Download failed: file does not exist: {output_path}") + # Check if yt-dlp saved it somewhere else + possible_files = list(output_path.parent.glob("*.mp4")) + if possible_files: + print(f"🔍 Found these files in the directory: {[f.name for f in possible_files]}") + # Look for a file that matches our pattern (artist - title) + artist_part = safe_artist.lower() + title_part = safe_title.lower() + for file in possible_files: + file_lower = file.stem.lower() + if artist_part in file_lower and any(word in file_lower for word in title_part.split()): + print(f"🎯 Found matching file: {file.name}") + output_path = file + break + else: + print(f"❌ No matching file found for: {artist} - {title}") + continue + else: + continue + if output_path.stat().st_size == 0: + print(f"❌ Download failed: file is empty (0 bytes): {output_path}") + continue + # TEMP: Skipping MP4 validation for debugging + # if not self._is_valid_mp4(output_path): + # print(f"❌ File is not a valid MP4: {output_path}") + # continue + add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name) + mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path) + print(f"✅ Downloaded and tracked: {artist} - {title}") + print(f"🎉 All post-processing complete for: 
{output_path}") + downloaded_count += 1 + # After each download, if this was the last song, delete the cache + if idx + 1 == total_to_download: + if cache_file.exists(): + try: + cache_file.unlink() + print(f"🗑️ Deleted download plan cache after last song downloaded: {cache_file.name}") + except Exception as e: + print(f"⚠️ Could not delete download plan cache: {e}") print(f"🎉 Downloaded {downloaded_count} songlist songs.") - print(f"📊 Summary: Processed {len(channel_urls)} channels, found {downloaded_count} songs, {len(undownloaded)} songs not found.") + print(f"📊 Summary: Processed {len(channel_urls)} channels, found {downloaded_count} songs, {len(unmatched)} songs not found.") + # Delete the download plan cache if all planned downloads are done + if cache_file.exists(): + try: + cache_file.unlink() + print(f"🗑️ Deleted download plan cache after completion: {cache_file.name}") + except Exception as e: + print(f"⚠️ Could not delete download plan cache: {e}") + return True + + def download_latest_per_channel(self, channel_urls, limit=5, force_refresh_download_plan=False): + """ + Download the latest N videos from each channel in channel_urls. + - Pre-scan all channels for their latest N videos. + - Build a per-channel download plan and cache it. + - Resume robustly if interrupted (removes each channel from the plan as it completes). + - Deletes the plan cache when all channels are done. 
+ """ + plan_mode = "latest_per_channel" + plan_kwargs = {"limit": limit, "channels": len(channel_urls)} + cache_file = self.get_download_plan_cache_file(plan_mode, **plan_kwargs) + use_cache = False + if not force_refresh_download_plan and cache_file.exists(): + try: + with open(cache_file, 'r', encoding='utf-8') as f: + plan_data = json.load(f) + cache_time = datetime.fromisoformat(plan_data.get('timestamp')) + if datetime.now() - cache_time < timedelta(days=1): + print(f"🗂️ Using cached latest-per-channel plan from {cache_time} ({cache_file.name}).") + channel_plans = plan_data['channel_plans'] + use_cache = True + except Exception as e: + print(f"⚠️ Could not load latest-per-channel plan cache: {e}") + if not use_cache: + print("\n🔎 Pre-scanning all channels for latest videos...") + channel_plans = [] + for channel_url in channel_urls: + channel_name, channel_id = get_channel_info(channel_url) + print(f"\n🚦 Starting channel: {channel_name} ({channel_url})") + available_videos = self.tracker.get_channel_video_list( + channel_url, + yt_dlp_path=str(self.yt_dlp_path), + force_refresh=False + ) + # Sort by upload order (assume yt-dlp returns in order, or sort by id if available) + latest_videos = available_videos[:limit] + print(f" → Found {len(latest_videos)} latest videos for this channel.") + channel_plans.append({ + 'channel_name': channel_name, + 'channel_url': channel_url, + 'videos': latest_videos + }) + plan_data = { + 'timestamp': datetime.now().isoformat(), + 'channel_plans': channel_plans + } + with open(cache_file, 'w', encoding='utf-8') as f: + json.dump(plan_data, f, indent=2, ensure_ascii=False) + print(f"🗂️ Saved new latest-per-channel plan cache: {cache_file.name}") + # --- Download phase --- + total_channels = len(channel_plans) + for idx, channel_plan in enumerate(channel_plans): + channel_name = channel_plan['channel_name'] + channel_url = channel_plan['channel_url'] + videos = channel_plan['videos'] + print(f"\n⬇️ Downloading {len(videos)} 
videos from channel {idx+1} of {total_channels}: {channel_name}") + for v_idx, video in enumerate(videos): + title = video['title'] + video_id = video['id'] + # Sanitize filename + safe_title = title + invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\'] + for char in invalid_chars: + safe_title = safe_title.replace(char, "") + safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip() + filename = f"{channel_name} - {safe_title}.mp4" + if len(filename) > 100: + filename = f"{channel_name[:30]} - {safe_title[:60]}.mp4" + output_path = self.downloads_dir / channel_name / filename + output_path.parent.mkdir(parents=True, exist_ok=True) + print(f" ({v_idx+1}/{len(videos)}) Downloading: {title} -> {output_path}") + video_url = f"https://www.youtube.com/watch?v={video_id}" + dlp_cmd = [ + str(self.yt_dlp_path), + "--no-check-certificates", + "--ignore-errors", + "--no-warnings", + "-o", str(output_path), + "-f", self.config["download_settings"]["format"], + video_url + ] + try: + result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True) + print(f" ✅ yt-dlp completed successfully") + except subprocess.CalledProcessError as e: + print(f" ❌ yt-dlp failed with exit code {e.returncode}") + print(f" ❌ yt-dlp stderr: {e.stderr}") + continue + if not output_path.exists() or output_path.stat().st_size == 0: + print(f" ❌ Download failed or file is empty: {output_path}") + continue + add_id3_tags(output_path, title, channel_name) + print(f" ✅ Downloaded and tagged: {title}") + # After channel is done, remove it from the plan and update cache + channel_plans[idx]['videos'] = [] + with open(cache_file, 'w', encoding='utf-8') as f: + json.dump({'timestamp': datetime.now().isoformat(), 'channel_plans': channel_plans}, f, indent=2, ensure_ascii=False) + print(f" 🗑️ Channel {channel_name} completed and removed from plan cache.") + # After all channels are done, delete the cache + if cache_file.exists(): + try: + cache_file.unlink() 
+ print(f"🗑️ Deleted latest-per-channel plan cache after completion: {cache_file.name}") + except Exception as e: + print(f"⚠️ Could not delete latest-per-channel plan cache: {e}") + print(f"🎉 All latest videos downloaded for all channels!") return True def _is_valid_mp4(self, file_path):