Signed-off-by: mbrucedogs <mbrucedogs@gmail.com>

mbrucedogs 2025-07-24 19:44:08 -05:00
parent b473dbf329
commit 5580207c6c
4 changed files with 361 additions and 136 deletions

PRD.md

@@ -65,6 +65,8 @@ python download_karaoke.py --clear-cache SingKingKaraoke
- ✅ Automatic cleanup of extra yt-dlp files
- ✅ **Reset/clear channel tracking and files via CLI**
- ✅ **Clear channel cache via CLI**
- ✅ **Download plan pre-scan and caching**: Before downloading, the tool pre-scans all channels for songlist matches, builds a download plan, and prints stats. The plan is cached for 1 day in a per-mode plan cache file under `data/` for fast resuming and reliability. Use `--force-download-plan` to force a refresh.
- ✅ **Latest-per-channel download**: Download the latest N videos from each channel in a single batch, with a per-channel download plan, robust resume, and a unique plan cache. Use `--latest-per-channel` and `--limit N`.
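A minimal sketch of the 1-day plan-cache freshness check described above, assuming the JSON layout this commit writes (an ISO-format `timestamp` field plus the plan payload):

```python
import json
from datetime import datetime, timedelta
from pathlib import Path

def load_plan_cache(path: Path, max_age=timedelta(days=1)):
    """Return the cached plan dict if it exists and is fresh, else None."""
    if not path.exists():
        return None
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
        if datetime.now() - datetime.fromisoformat(data["timestamp"]) < max_age:
            return data
    except (ValueError, KeyError):
        pass  # corrupt or incomplete cache file: treat as a cache miss
    return None
```

A stale or unreadable cache simply falls through to a fresh pre-scan, which is why omitting `--force-download-plan` is always safe.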
---
@@ -114,6 +116,8 @@ KaroakeVideoDownloader/
- `--reset-channel <CHANNEL_NAME>`: **Reset all tracking and files for a channel**
- `--reset-songlist`: **When used with --reset-channel, also reset songlist songs for this channel**
- `--clear-cache <CHANNEL_ID|all>`: **Clear channel video cache for a specific channel or all**
- `--force-download-plan`: **Force refresh the download plan cache (re-scan all channels for matches)**
- `--latest-per-channel`: **Download the latest N videos from each channel (use with --limit)**
---
@@ -124,6 +128,8 @@ KaroakeVideoDownloader/
- **ID3 Tagging:** Artist/title extracted from video title and embedded in MP4 files.
- **Cleanup:** Extra files from yt-dlp (e.g., `.info.json`) are automatically removed after download.
- **Reset/Clear:** Use `--reset-channel` to reset all tracking and files for a channel (optionally including songlist songs with `--reset-songlist`). Use `--clear-cache` to clear cached video lists for a channel or all channels.
- **Download plan pre-scan:** Before downloading, the tool scans all channels for songlist matches, builds a download plan, and prints stats (matches, unmatched, per-channel breakdown). The plan is cached for 1 day and reused unless `--force-download-plan` is set.
- **Latest-per-channel plan:** Download the latest N videos from each channel, with a per-channel plan and robust resume. Each channel is removed from the plan as it completes, and the plan cache is deleted when all channels are done.
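For illustration, the latest-per-channel plan cache is shaped roughly as below. Field names follow this commit's code; the channel URLs and video id are made up. On resume, a channel whose `videos` list has been emptied is treated as complete:

```python
# Hypothetical example of the latest-per-channel plan cache contents.
# Field names follow this commit; the URLs and video id are made up.
plan_data = {
    "timestamp": "2025-07-24T19:44:08",
    "channel_plans": [
        {
            "channel_name": "SingKingKaraoke",
            "channel_url": "https://www.youtube.com/@ExampleChannel",
            "videos": [
                {"id": "abc123xyz00", "title": "Artist - Song (Karaoke Version)"},
            ],
        },
        {
            "channel_name": "DoneChannel",
            "channel_url": "https://www.youtube.com/@DoneChannel",
            "videos": [],  # emptied when the channel completed; skipped on resume
        },
    ],
}

# Resume semantics: only channels with videos still listed need work.
remaining = [c["channel_name"] for c in plan_data["channel_plans"] if c["videos"]]
```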
---


@@ -12,6 +12,7 @@ A Python-based Windows CLI tool to download karaoke videos from YouTube channels
- 🧹 **Automatic Cleanup**: Removes extra yt-dlp files
- 📈 **Real-Time Progress**: Detailed console and log output
- 🧹 **Reset/Clear Channel**: Reset all tracking and files for a channel, or clear channel cache via CLI
- 🗂️ **Latest-per-channel download**: Download the latest N videos from each channel in a single batch, with a per-channel download plan, robust resume, and a unique plan cache. Use `--latest-per-channel` and `--limit N`.
## 📋 Requirements
- **Windows 10/11**
@@ -56,6 +57,11 @@ python download_karaoke.py --limit 5
python download_karaoke.py --resolution 1080p
```
### Download Latest N Videos Per Channel
```bash
python download_karaoke.py --file data/channels.txt --latest-per-channel --limit 5
```
### **Reset/Start Over for a Channel**
```bash
python download_karaoke.py --reset-channel SingKingKaraoke
@@ -135,6 +141,7 @@ KaroakeVideoDownloader/
- `--reset-channel <CHANNEL_NAME>`: **Reset all tracking and files for a channel**
- `--reset-songlist`: **When used with --reset-channel, also reset songlist songs for this channel**
- `--clear-cache <CHANNEL_ID|all>`: **Clear channel video cache for a specific channel or all**
- `--latest-per-channel`: **Download the latest N videos from each channel (use with --limit)**
## 📝 Example Usage
```bash


@@ -35,6 +35,8 @@ Examples:
parser.add_argument('--reset-songlist', action='store_true', help='When used with --reset-channel, also reset songlist songs for this channel')
parser.add_argument('--reset-songlist-all', action='store_true', help='Reset all songlist tracking and delete all songlist-downloaded files (global)')
parser.add_argument('--version', '-v', action='version', version='Karaoke Playlist Downloader v1.0')
parser.add_argument('--force-download-plan', action='store_true', help='Force refresh the download plan cache (re-scan all channels for matches)')
parser.add_argument('--latest-per-channel', action='store_true', help='Download the latest N videos from each channel (use with --limit)')
args = parser.parse_args()
yt_dlp_path = Path("downloader/yt-dlp.exe")
@@ -158,9 +160,17 @@ Examples:
with open(args.file, "r", encoding="utf-8") as f:
channel_urls = [line.strip() for line in f if line.strip() and not line.strip().startswith("#")]
limit = args.limit if args.limit else None
success = downloader.download_songlist_across_channels(channel_urls, limit=limit)
force_refresh_download_plan = getattr(args, 'force_download_plan', False)
success = downloader.download_songlist_across_channels(channel_urls, limit=limit, force_refresh_download_plan=force_refresh_download_plan)
elif args.url:
success = downloader.download_channel_videos(args.url, force_refresh=args.refresh)
elif args.latest_per_channel and args.file:
# Read all channel URLs from file
with open(args.file, "r", encoding="utf-8") as f:
channel_urls = [line.strip() for line in f if line.strip() and not line.strip().startswith("#")]
limit = args.limit if args.limit else 5
force_refresh_download_plan = getattr(args, 'force_download_plan', False)
success = downloader.download_latest_per_channel(channel_urls, limit=limit, force_refresh_download_plan=force_refresh_download_plan)
else:
parser.print_help()
sys.exit(1)


@@ -4,7 +4,7 @@ import subprocess
import json
import re
from pathlib import Path
from datetime import datetime
from datetime import datetime, timedelta
from karaoke_downloader.tracking_manager import TrackingManager, SongStatus, FormatType
from karaoke_downloader.id3_utils import add_id3_tags, extract_artist_title
from karaoke_downloader.songlist_manager import (
@@ -14,6 +14,7 @@ from karaoke_downloader.songlist_manager import (
)
from karaoke_downloader.youtube_utils import get_channel_info, get_playlist_info
import logging
import hashlib
DATA_DIR = Path("data")
@@ -249,7 +250,71 @@ class KaraokeDownloader:
print(f"🎉 All post-processing complete for: {output_path}")
return True
def download_songlist_across_channels(self, channel_urls, limit=None):
def build_download_plan(self, channel_urls, undownloaded):
"""
For each song in undownloaded, scan all channels for a match.
Return (download_plan, unmatched_songs):
- download_plan: list of dicts {artist, title, channel_name, channel_url, video_id, video_title}
- unmatched_songs: list of songs not found in any channel
"""
plan = []
unmatched = []
channel_match_counts = {}
for channel_url in channel_urls:
channel_name, channel_id = get_channel_info(channel_url)
print(f"\n🚦 Starting channel: {channel_name} ({channel_url})")
available_videos = self.tracker.get_channel_video_list(
channel_url,
yt_dlp_path=str(self.yt_dlp_path),
force_refresh=False
)
matches_this_channel = 0
for song in undownloaded:
artist, title = song['artist'], song['title']
found = False
for video in available_videos:
v_artist, v_title = extract_artist_title(video['title'])
if (normalize_title(v_artist) == normalize_title(artist) and normalize_title(v_title) == normalize_title(title)) or \
(normalize_title(video['title']) == normalize_title(f"{artist} - {title}")):
# Only add if not already in plan (first channel wins)
if not any(p['artist'] == artist and p['title'] == title for p in plan):
plan.append({
'artist': artist,
'title': title,
'channel_name': channel_name,
'channel_url': channel_url,
'video_id': video['id'],
'video_title': video['title']
})
matches_this_channel += 1
found = True
break
# No break at the song level: keep scanning the remaining songs so every match in this channel is recorded
channel_match_counts[channel_name] = matches_this_channel
print(f" → Found {matches_this_channel} songlist matches in this channel.")
# Now find unmatched songs
for song in undownloaded:
if not any(p['artist'] == song['artist'] and p['title'] == song['title'] for p in plan):
unmatched.append(song)
# Print summary table
print("\n📊 Channel match summary:")
for channel, count in channel_match_counts.items():
print(f" {channel}: {count} matches")
print(f" TOTAL: {sum(channel_match_counts.values())} matches across {len(channel_match_counts)} channels.")
return plan, unmatched
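A standalone sketch of the matching rule above, with a hypothetical `normalize` standing in for `normalize_title` and a simple `" - "` split standing in for `extract_artist_title` (both real helpers may differ):

```python
import re

def normalize(s):
    # Hypothetical stand-in for normalize_title: lowercase, strip
    # punctuation, collapse whitespace.
    return re.sub(r"\s+", " ", re.sub(r"[^\w\s]", "", s.lower())).strip()

def is_match(video_title, artist, title):
    # Stand-in for extract_artist_title: split on the first " - ".
    v_artist, _, v_title = video_title.partition(" - ")
    return (
        (normalize(v_artist) == normalize(artist)
         and normalize(v_title) == normalize(title))
        or normalize(video_title) == normalize(f"{artist} - {title}")
    )
```

Either the split artist/title pair or the whole `"Artist - Title"` string has to match after normalization, which tolerates case and punctuation differences between the songlist and the video title.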
def get_download_plan_cache_file(self, mode, **kwargs):
"""Generate a unique cache filename based on mode and key parameters."""
parts = [f"plan_{mode}"]
for k, v in sorted(kwargs.items()):
parts.append(f"{k}{v}")
base = "_".join(parts)
# Hash for safety if string is long
if len(base) > 60:
base = base[:40] + "_" + hashlib.md5(base.encode()).hexdigest()
return Path(f"data/{base}.json")
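For reference, the filename scheme above can be exercised standalone; this mirrors `get_download_plan_cache_file`, and the parameter values are hypothetical:

```python
import hashlib
from pathlib import Path

def plan_cache_file(mode, **kwargs):
    # Mode plus sorted key/value pairs, md5-hashed when the name grows long.
    parts = [f"plan_{mode}"] + [f"{k}{v}" for k, v in sorted(kwargs.items())]
    base = "_".join(parts)
    if len(base) > 60:
        base = base[:40] + "_" + hashlib.md5(base.encode()).hexdigest()
    return Path(f"data/{base}.json")

# e.g. plan_cache_file("songlist", limit=5, channels=3)
# → data/plan_songlist_channels3_limit5.json
```

Sorting the kwargs keeps the filename deterministic regardless of call order, so the same run configuration always resumes from the same cache file.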
def download_songlist_across_channels(self, channel_urls, limit=None, force_refresh_download_plan=False):
"""
For each song in the songlist, try each channel in order and download from the first channel where it is found.
Download up to 'limit' songs, skipping any that cannot be found, until the limit is reached or all possible matches are exhausted.
@@ -274,141 +339,278 @@ class KaraokeDownloader:
if not undownloaded:
print("🎵 All songlist songs already downloaded.")
return True
print("🔍 Songs to search for:")
for song in undownloaded:
print(f" - {song['artist']} - {song['title']}")
downloaded_count = 0
attempted = set()
total_to_download = limit if limit is not None else len(undownloaded)
print(f"\n🎬 Processing {len(channel_urls)} channels for song matches...")
# Keep looping until limit is reached or no more undownloaded songs
while undownloaded and (limit is None or downloaded_count < limit):
for song in list(undownloaded):
if limit is not None and downloaded_count >= limit:
break
artist, title = song['artist'], song['title']
if (artist, title) in attempted:
continue
found = False
print(f"\n🔍 Searching for: {artist} - {title}")
for channel_url in channel_urls:
channel_name, channel_id = get_channel_info(channel_url)
available_videos = self.tracker.get_channel_video_list(
channel_url,
yt_dlp_path=str(self.yt_dlp_path),
force_refresh=False
)
for video in available_videos:
v_artist, v_title = extract_artist_title(video['title'])
if (normalize_title(v_artist) == normalize_title(artist) and normalize_title(v_title) == normalize_title(title)) or \
(normalize_title(video['title']) == normalize_title(f"{artist} - {title}")):
# Progress print statement
print(f"📥 Downloading {downloaded_count + 1} of {total_to_download} songlist songs...")
print(f"🎯 Found on channel: {channel_name}")
# Download this song from this channel
# Create a shorter, safer filename - do this ONCE and use consistently
safe_title = title.replace("(From ", "").replace(")", "").replace(" - ", " ").replace(":", "").replace("'", "").replace('"', "")
safe_artist = artist.replace("'", "").replace('"', "")
# Remove all Windows-invalid characters
invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\']
for char in invalid_chars:
safe_title = safe_title.replace(char, "")
safe_artist = safe_artist.replace(char, "")
# Also remove any other potentially problematic characters
safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip()
safe_artist = safe_artist.strip()
filename = f"{safe_artist} - {safe_title}.mp4"
# Limit filename length to avoid Windows path issues
if len(filename) > 100:
filename = f"{safe_artist[:30]} - {safe_title[:60]}.mp4"
output_path = self.downloads_dir / channel_name / filename
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"⬇️ Downloading: {artist} - {title} -> {output_path}")
video_url = f"https://www.youtube.com/watch?v={video['id']}"
dlp_cmd = [
str(self.yt_dlp_path),
"--no-check-certificates",
"--ignore-errors",
"--no-warnings",
"-o", str(output_path),
"-f", self.config["download_settings"]["format"],
video_url
]
print(f"🔧 Running command: {' '.join(dlp_cmd)}")
print(f"📺 Resolution settings: {self.config.get('download_settings', {}).get('preferred_resolution', 'Unknown')}")
print(f"🎬 Format string: {self.config.get('download_settings', {}).get('format', 'Unknown')}")
# Debug: Show available formats (optional)
if self.config.get('debug_show_formats', False):
print(f"🔍 Checking available formats for: {video_url}")
format_cmd = [
str(self.yt_dlp_path),
"--list-formats",
video_url
]
try:
format_result = subprocess.run(format_cmd, capture_output=True, text=True, timeout=30)
print(f"📋 Available formats:\n{format_result.stdout}")
except Exception as e:
print(f"⚠️ Could not check formats: {e}")
try:
result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True)
print(f"✅ yt-dlp completed successfully")
print(f"📄 yt-dlp stdout: {result.stdout}")
except subprocess.CalledProcessError as e:
print(f"❌ yt-dlp failed with exit code {e.returncode}")
print(f"❌ yt-dlp stderr: {e.stderr}")
continue
if not output_path.exists():
print(f"❌ Download failed: file does not exist: {output_path}")
# Check if yt-dlp saved it somewhere else
possible_files = list(output_path.parent.glob("*.mp4"))
if possible_files:
print(f"🔍 Found these files in the directory: {[f.name for f in possible_files]}")
# Look for a file that matches our pattern (artist - title)
artist_part = safe_artist.lower()
title_part = safe_title.lower()
for file in possible_files:
file_lower = file.stem.lower()
if artist_part in file_lower and any(word in file_lower for word in title_part.split()):
print(f"🎯 Found matching file: {file.name}")
output_path = file
break
else:
print(f"❌ No matching file found for: {artist} - {title}")
continue
else:
continue
if output_path.stat().st_size == 0:
print(f"❌ Download failed: file is empty (0 bytes): {output_path}")
continue
# TEMP: Skipping MP4 validation for debugging
# if not self._is_valid_mp4(output_path):
# print(f"❌ File is not a valid MP4: {output_path}")
# continue
add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name)
mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path)
print(f"✅ Downloaded and tracked: {artist} - {title}")
print(f"🎉 All post-processing complete for: {output_path}")
downloaded_count += 1
found = True
break # Only download from first channel where found
if found:
break
attempted.add((artist, title))
if found:
undownloaded.remove(song)
# If no new downloads in this pass, break to avoid infinite loop
if downloaded_count == 0 or (limit is not None and downloaded_count >= limit):
break
if undownloaded:
print(f"⚠️ {len(undownloaded)} songlist songs could not be found in any channel:")
for song in undownloaded:
# Removed per-song printout for cleaner output
# print("🔍 Songs to search for:")
# for song in undownloaded:
# print(f" - {song['artist']} - {song['title']}")
# --- Download plan cache logic ---
plan_mode = "songlist"
plan_kwargs = {"limit": limit or "all", "channels": len(channel_urls)}
cache_file = self.get_download_plan_cache_file(plan_mode, **plan_kwargs)
use_cache = False
if not force_refresh_download_plan and cache_file.exists():
try:
with open(cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
cache_time = datetime.fromisoformat(cache_data.get('timestamp'))
if datetime.now() - cache_time < timedelta(days=1):
print(f"🗂️ Using cached download plan from {cache_time} ({cache_file.name}).")
download_plan = cache_data['download_plan']
unmatched = cache_data['unmatched']
use_cache = True
except Exception as e:
print(f"⚠️ Could not load download plan cache: {e}")
if not use_cache:
print("\n🔎 Pre-scanning channels for matches...")
download_plan, unmatched = self.build_download_plan(channel_urls, undownloaded)
if download_plan:
cache_data = {
'timestamp': datetime.now().isoformat(),
'download_plan': download_plan,
'unmatched': unmatched
}
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(cache_data, f, indent=2, ensure_ascii=False)
print(f"🗂️ Saved new download plan cache: {cache_file.name}")
else:
if cache_file.exists():
cache_file.unlink()
print(f"🗂️ No matches found, not saving download plan cache.")
print(f"\n📊 Download plan ready: {len(download_plan)} songs will be downloaded.")
print(f"{len(unmatched)} songs could not be found in any channel.")
if unmatched:
print("Unmatched songs:")
for song in unmatched[:10]:
print(f" - {song['artist']} - {song['title']}")
if len(unmatched) > 10:
print(f" ...and {len(unmatched)-10} more.")
# --- Download phase ---
downloaded_count = 0
total_to_download = limit if limit is not None else len(download_plan)
for idx, item in enumerate(download_plan):
if limit is not None and downloaded_count >= limit:
break
artist = item['artist']
title = item['title']
channel_name = item['channel_name']
channel_url = item['channel_url']
video_id = item['video_id']
video_title = item['video_title']
print(f"\n⬇️ Downloading {idx+1} of {total_to_download}: {artist} - {title} (from {channel_name})")
# --- Existing download logic here, using channel_name, video_id, etc. ---
# (Copy the download logic from the previous loop, using these variables)
# Create a shorter, safer filename - do this ONCE and use consistently
safe_title = title.replace("(From ", "").replace(")", "").replace(" - ", " ").replace(":", "").replace("'", "").replace('"', "")
safe_artist = artist.replace("'", "").replace('"', "")
# Remove all Windows-invalid characters
invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\']
for char in invalid_chars:
safe_title = safe_title.replace(char, "")
safe_artist = safe_artist.replace(char, "")
# Also remove any other potentially problematic characters
safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip()
safe_artist = safe_artist.strip()
filename = f"{safe_artist} - {safe_title}.mp4"
# Limit filename length to avoid Windows path issues
if len(filename) > 100:
filename = f"{safe_artist[:30]} - {safe_title[:60]}.mp4"
output_path = self.downloads_dir / channel_name / filename
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"⬇️ Downloading: {artist} - {title} -> {output_path}")
video_url = f"https://www.youtube.com/watch?v={video_id}"
dlp_cmd = [
str(self.yt_dlp_path),
"--no-check-certificates",
"--ignore-errors",
"--no-warnings",
"-o", str(output_path),
"-f", self.config["download_settings"]["format"],
video_url
]
print(f"🔧 Running command: {' '.join(dlp_cmd)}")
print(f"📺 Resolution settings: {self.config.get('download_settings', {}).get('preferred_resolution', 'Unknown')}")
print(f"🎬 Format string: {self.config.get('download_settings', {}).get('format', 'Unknown')}")
# Debug: Show available formats (optional)
if self.config.get('debug_show_formats', False):
print(f"🔍 Checking available formats for: {video_url}")
format_cmd = [
str(self.yt_dlp_path),
"--list-formats",
video_url
]
try:
format_result = subprocess.run(format_cmd, capture_output=True, text=True, timeout=30)
print(f"📋 Available formats:\n{format_result.stdout}")
except Exception as e:
print(f"⚠️ Could not check formats: {e}")
try:
result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True)
print(f"✅ yt-dlp completed successfully")
print(f"📄 yt-dlp stdout: {result.stdout}")
except subprocess.CalledProcessError as e:
print(f"❌ yt-dlp failed with exit code {e.returncode}")
print(f"❌ yt-dlp stderr: {e.stderr}")
continue
if not output_path.exists():
print(f"❌ Download failed: file does not exist: {output_path}")
# Check if yt-dlp saved it somewhere else
possible_files = list(output_path.parent.glob("*.mp4"))
if possible_files:
print(f"🔍 Found these files in the directory: {[f.name for f in possible_files]}")
# Look for a file that matches our pattern (artist - title)
artist_part = safe_artist.lower()
title_part = safe_title.lower()
for file in possible_files:
file_lower = file.stem.lower()
if artist_part in file_lower and any(word in file_lower for word in title_part.split()):
print(f"🎯 Found matching file: {file.name}")
output_path = file
break
else:
print(f"❌ No matching file found for: {artist} - {title}")
continue
else:
continue
if output_path.stat().st_size == 0:
print(f"❌ Download failed: file is empty (0 bytes): {output_path}")
continue
# TEMP: Skipping MP4 validation for debugging
# if not self._is_valid_mp4(output_path):
# print(f"❌ File is not a valid MP4: {output_path}")
# continue
add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name)
mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path)
print(f"✅ Downloaded and tracked: {artist} - {title}")
print(f"🎉 All post-processing complete for: {output_path}")
downloaded_count += 1
# After each download, if this was the last song, delete the cache
if idx + 1 == total_to_download:
if cache_file.exists():
try:
cache_file.unlink()
print(f"🗑️ Deleted download plan cache after last song downloaded: {cache_file.name}")
except Exception as e:
print(f"⚠️ Could not delete download plan cache: {e}")
print(f"🎉 Downloaded {downloaded_count} songlist songs.")
print(f"📊 Summary: Processed {len(channel_urls)} channels, found {downloaded_count} songs, {len(undownloaded)} songs not found.")
print(f"📊 Summary: Processed {len(channel_urls)} channels, found {downloaded_count} songs, {len(unmatched)} songs not found.")
# Delete the download plan cache if all planned downloads are done
if cache_file.exists():
try:
cache_file.unlink()
print(f"🗑️ Deleted download plan cache after completion: {cache_file.name}")
except Exception as e:
print(f"⚠️ Could not delete download plan cache: {e}")
return True
def download_latest_per_channel(self, channel_urls, limit=5, force_refresh_download_plan=False):
"""
Download the latest N videos from each channel in channel_urls.
- Pre-scan all channels for their latest N videos.
- Build a per-channel download plan and cache it.
- Resume robustly if interrupted (removes each channel from the plan as it completes).
- Deletes the plan cache when all channels are done.
"""
plan_mode = "latest_per_channel"
plan_kwargs = {"limit": limit, "channels": len(channel_urls)}
cache_file = self.get_download_plan_cache_file(plan_mode, **plan_kwargs)
use_cache = False
if not force_refresh_download_plan and cache_file.exists():
try:
with open(cache_file, 'r', encoding='utf-8') as f:
plan_data = json.load(f)
cache_time = datetime.fromisoformat(plan_data.get('timestamp'))
if datetime.now() - cache_time < timedelta(days=1):
print(f"🗂️ Using cached latest-per-channel plan from {cache_time} ({cache_file.name}).")
channel_plans = plan_data['channel_plans']
use_cache = True
except Exception as e:
print(f"⚠️ Could not load latest-per-channel plan cache: {e}")
if not use_cache:
print("\n🔎 Pre-scanning all channels for latest videos...")
channel_plans = []
for channel_url in channel_urls:
channel_name, channel_id = get_channel_info(channel_url)
print(f"\n🚦 Starting channel: {channel_name} ({channel_url})")
available_videos = self.tracker.get_channel_video_list(
channel_url,
yt_dlp_path=str(self.yt_dlp_path),
force_refresh=False
)
# Sort by upload order (assume yt-dlp returns in order, or sort by id if available)
latest_videos = available_videos[:limit]
print(f" → Found {len(latest_videos)} latest videos for this channel.")
channel_plans.append({
'channel_name': channel_name,
'channel_url': channel_url,
'videos': latest_videos
})
plan_data = {
'timestamp': datetime.now().isoformat(),
'channel_plans': channel_plans
}
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(plan_data, f, indent=2, ensure_ascii=False)
print(f"🗂️ Saved new latest-per-channel plan cache: {cache_file.name}")
# --- Download phase ---
total_channels = len(channel_plans)
for idx, channel_plan in enumerate(channel_plans):
channel_name = channel_plan['channel_name']
channel_url = channel_plan['channel_url']
videos = channel_plan['videos']
print(f"\n⬇️ Downloading {len(videos)} videos from channel {idx+1} of {total_channels}: {channel_name}")
for v_idx, video in enumerate(videos):
title = video['title']
video_id = video['id']
# Sanitize filename
safe_title = title
invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\']
for char in invalid_chars:
safe_title = safe_title.replace(char, "")
safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip()
filename = f"{channel_name} - {safe_title}.mp4"
if len(filename) > 100:
filename = f"{channel_name[:30]} - {safe_title[:60]}.mp4"
output_path = self.downloads_dir / channel_name / filename
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f" ({v_idx+1}/{len(videos)}) Downloading: {title} -> {output_path}")
video_url = f"https://www.youtube.com/watch?v={video_id}"
dlp_cmd = [
str(self.yt_dlp_path),
"--no-check-certificates",
"--ignore-errors",
"--no-warnings",
"-o", str(output_path),
"-f", self.config["download_settings"]["format"],
video_url
]
try:
result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True)
print(f" ✅ yt-dlp completed successfully")
except subprocess.CalledProcessError as e:
print(f" ❌ yt-dlp failed with exit code {e.returncode}")
print(f" ❌ yt-dlp stderr: {e.stderr}")
continue
if not output_path.exists() or output_path.stat().st_size == 0:
print(f" ❌ Download failed or file is empty: {output_path}")
continue
add_id3_tags(output_path, title, channel_name)
print(f" ✅ Downloaded and tagged: {title}")
# After channel is done, remove it from the plan and update cache
channel_plans[idx]['videos'] = []
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump({'timestamp': datetime.now().isoformat(), 'channel_plans': channel_plans}, f, indent=2, ensure_ascii=False)
print(f" 🗑️ Channel {channel_name} completed and removed from plan cache.")
# After all channels are done, delete the cache
if cache_file.exists():
try:
cache_file.unlink()
print(f"🗑️ Deleted latest-per-channel plan cache after completion: {cache_file.name}")
except Exception as e:
print(f"⚠️ Could not delete latest-per-channel plan cache: {e}")
print(f"🎉 All latest videos downloaded for all channels!")
return True
def _is_valid_mp4(self, file_path):