Signed-off-by: mbrucedogs <mbrucedogs@gmail.com>

This commit is contained in:
mbrucedogs 2025-07-29 08:45:12 -05:00
parent ec95b24a69
commit 42e7a6a09c
9 changed files with 8490 additions and 592 deletions

42
PRD.md
View File

@@ -197,6 +197,8 @@ KaroakeVideoDownloader/
- `--parallel`: **Enable parallel downloads for improved speed**
- `--workers <N>`: **Number of parallel download workers (1-10, default: 3, only used with --parallel)**
- `--manual`: **Download from manual videos collection (data/manual_videos.json)**
- `--channel-focus <CHANNEL_NAME>`: **Download from a specific channel by name (e.g., 'SingKingKaraoke')**
- `--all-videos`: **Download all videos from channel (not just songlist matches), skipping existing files and songs in songs.json**
---
@@ -389,3 +391,43 @@ The codebase has been comprehensively refactored to improve maintainability and
- **Update documentation immediately when implementing new features**
- **Remove outdated information and consolidate related changes**
- **Ensure all CLI options and features are documented in both files**
## 🔧 Recent Bug Fixes & Improvements (v3.4.4)
### **All Videos Download Mode**
- **New `--all-videos` parameter**: Download all videos from a channel, not just songlist matches
- **Smart MP3/MP4 detection**: Automatically detects if you have MP3 versions in songs.json and downloads MP4 video versions
- **Existing file skipping**: Skips videos that already exist on the filesystem
- **Progress tracking**: Shows clear progress with "Downloading X/Y videos" format
- **Parallel processing support**: Works with `--parallel --workers N` for faster downloads
- **Channel focus integration**: Works with `--channel-focus` to target specific channels
- **Limit support**: Works with `--limit N` to control download batch size
### **Smart Songlist Integration**
- **MP4 version detection**: Checks if MP4 version already exists in songs.json before downloading
- **MP3 upgrade path**: Downloads MP4 video versions when only MP3 versions exist in songlist
- **Duplicate prevention**: Skips downloads when MP4 versions already exist
- **Efficient filtering**: Only processes videos that need to be downloaded
### **Benefits of All Videos Mode**
- **Complete channel downloads**: Download entire channels without songlist restrictions
- **Automatic format upgrading**: Upgrade MP3 collections to MP4 video versions
- **Efficient processing**: Only downloads videos that don't already exist
- **Flexible control**: Use with limits, parallel processing, and channel targeting
- **Clear progress feedback**: Real-time progress tracking for large downloads
---
## 🚀 Future Enhancements
- [ ] Web UI for easier management
- [ ] More advanced song matching (multi-language)
- [ ] Download scheduling and retry logic
- [ ] More granular status reporting
- [x] **Parallel downloads for improved speed** ✅ **COMPLETED**
- [x] **Enhanced fuzzy matching with improved video title parsing** ✅ **COMPLETED**
- [x] **Consolidated extract_artist_title function** ✅ **COMPLETED**
- [x] **Duplicate file prevention and filename consistency** ✅ **COMPLETED**
- [ ] Unit tests for all modules
- [ ] Integration tests for end-to-end workflows
- [ ] Plugin system for custom file operations
- [ ] Advanced configuration UI
- [ ] Real-time download progress visualization

View File

@@ -145,6 +145,21 @@ python data/cleanup_duplicate_files.py
python download_karaoke.py https://www.youtube.com/@SingKingKaraoke/videos
```
### Download ALL Videos from a Channel (Not Just Songlist Matches)
```bash
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos
```
### Download ALL Videos with Parallel Processing
```bash
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos --parallel --workers 10
```
### Download ALL Videos with Limit
```bash
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos --limit 100
```
### Download Only Songlist Songs (Fast Mode)
```bash
python download_karaoke.py --songlist-only --limit 5
@@ -325,6 +340,8 @@ KaroakeVideoDownloader/
- `--generate-songlist <DIR1> <DIR2>...`: **Generate song list from MP4 files with ID3 tags in specified directories**
- `--no-append-songlist`: **Create a new song list instead of appending when using --generate-songlist**
- `--force`: **Force download from channels, bypassing all existing file checks and re-downloading if necessary**
- `--channel-focus <CHANNEL_NAME>`: **Download from a specific channel by name (e.g., 'SingKingKaraoke')**
- `--all-videos`: **Download all videos from channel (not just songlist matches), skipping existing files**
## 📝 Example Usage
@@ -358,6 +375,11 @@ python download_karaoke.py --reset-channel SingKingKaraoke --reset-songlist
python download_karaoke.py --clear-cache all
python download_karaoke.py --clear-server-duplicates
# Download ALL videos from a specific channel
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos --parallel --workers 10
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos --limit 100
# Song list generation from MP4 files
python download_karaoke.py --generate-songlist /path/to/mp4/directory
python download_karaoke.py --generate-songlist /path/to/dir1 /path/to/dir2 --no-append-songlist

View File

@@ -1,6 +1,6 @@
# 🎤 Karaoke Video Downloader - CLI Commands Reference
# Copy and paste these commands into your terminal
# Updated: v3.4.3 (includes manual video collection, channel parsing rules, and all previous improvements)
# Updated: v3.4.4 (includes all videos download mode, manual video collection, channel parsing rules, and all previous improvements)
## 📥 BASIC DOWNLOADS
@@ -45,6 +45,26 @@ python add_manual_video.py list
# Remove a video from manual collection
python add_manual_video.py remove "Artist - Song Title (Karaoke Version)"
## 🎬 ALL VIDEOS DOWNLOAD MODE (v3.4.4)
# Download ALL videos from a specific channel (not just songlist matches)
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos
# Download ALL videos with parallel processing for speed
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos --parallel --workers 10
# Download ALL videos with limit (download first N videos)
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos --limit 100
# Download ALL videos with parallel processing and limit
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos --parallel --workers 5 --limit 50
# Download ALL videos from ZoomKaraokeOfficial channel
python download_karaoke.py --channel-focus ZoomKaraokeOfficial --all-videos
# Download ALL videos with custom resolution
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos --resolution 1080p
## 📋 SONG LIST GENERATION
# Generate song list from MP4 files in a directory (append to existing song list)

View File

@@ -20038,6 +20038,6 @@
"id": "iPt5vUHVROI"
}
],
"last_updated": "2025-07-27T20:36:32.831332",
"last_updated": "2025-07-28T20:36:32.831332",
"video_count": 5009
}

View File

@@ -38974,6 +38974,6 @@
"id": "_38gtxtjvPI"
}
],
"last_updated": "2025-07-27T20:50:52.833995",
"last_updated": "2025-07-28T20:50:52.833995",
"video_count": 9743
}

View File

@@ -26,6 +26,46 @@
"upload_date": "2024-01-01",
"duration": 180,
"view_count": 1000
},
{
"title": "AC/DC - Big Balls",
"url": "https://www.youtube.com/watch?v=kiSDpVmu4Bk",
"id": "kiSDpVmu4Bk",
"upload_date": "2024-01-01",
"duration": 180,
"view_count": 1000
},
{
"title": "Jon Bon Jovi - Blaze of Glory",
"url": "https://www.youtube.com/watch?v=SzRAoDMlQY",
"id": "SzRAoDMlQY",
"upload_date": "2024-01-01",
"duration": 180,
"view_count": 1000
},
{
"title": "ZZ Top - Sharp Dressed Man",
"url": "https://www.youtube.com/watch?v=prRalwto9iY",
"id": "prRalwto9iY",
"upload_date": "2024-01-01",
"duration": 180,
"view_count": 1000
},
{
"title": "Nickelback - Photograph",
"url": "https://www.youtube.com/watch?v=qTphCTAUhUg",
"id": "qTphCTAUhUg",
"upload_date": "2024-01-01",
"duration": 180,
"view_count": 1000
},
{
"title": "Billy Joel - Shes Got A Way",
"url": "https://www.youtube.com/watch?v=DeeTFIgKuC8",
"id": "DeeTFIgKuC8",
"upload_date": "2024-01-01",
"duration": 180,
"view_count": 1000
}
],
"parsing_rules": {

File diff suppressed because it is too large

View File

@@ -61,35 +61,37 @@ def load_channels_from_text(channels_file: str = "data/channels.txt") -> List[st
def load_channels(channel_file: str = None) -> List[str]:
"""
Load channel URLs from either JSON or text format.
Args:
channel_file: Path to the channel file (optional)
Returns:
List of channel URLs
"""
if channel_file:
# Use the specified file
if channel_file.endswith('.json'):
"""Load channel URLs from file."""
if channel_file is None:
# Try JSON first, then fall back to text
if os.path.exists("data/channels.json"):
return load_channels_from_json("data/channels.json")
elif os.path.exists("data/channels.txt"):
return load_channels_from_text("data/channels.txt")
else:
return []
else:
if channel_file.endswith(".json"):
return load_channels_from_json(channel_file)
else:
return load_channels_from_text(channel_file)
else:
# Try JSON first, then fall back to text
json_file = "data/channels.json"
txt_file = "data/channels.txt"
if os.path.exists(json_file):
print(f"📋 Using new JSON format: {json_file}")
return load_channels_from_json(json_file)
elif os.path.exists(txt_file):
print(f"📋 Using legacy text format: {txt_file}")
return load_channels_from_text(txt_file)
else:
print("❌ No channel file found. Please create data/channels.json or data/channels.txt")
return []
def get_channel_url_by_name(channel_name: str) -> str:
"""Look up a channel URL by its name from the channels configuration."""
channel_urls = load_channels()
# Normalize the channel name for comparison
normalized_name = channel_name.lower().replace("@", "").replace("karaoke", "").strip()
for url in channel_urls:
# Extract channel name from URL
if "/@" in url:
url_channel_name = url.split("/@")[1].split("/")[0].lower()
if url_channel_name == normalized_name or url_channel_name.replace("karaoke", "").strip() == normalized_name:
return url
return None
def main():
@@ -100,9 +102,10 @@ def main():
Examples:
python download_karaoke.py --limit 10 # Download latest 10 videos from all channels
python download_karaoke.py --songlist-only --limit 10 # Download only songlist songs across channels
python download_karaoke.py https://www.youtube.com/@SingKingKaraoke/videos # Download from specific channel
python download_karaoke.py --channel-focus SingKingKaraoke --limit 5 # Download from specific channel
python download_karaoke.py --channel-focus SingKingKaraoke --all-videos # Download ALL videos from channel
python download_karaoke.py https://www.youtube.com/@SingKingKaraoke/videos # Download from specific channel URL
python download_karaoke.py --file data/channels.txt # Download from custom channel list
python download_karaoke.py --manual --limit 5 # Download from manual videos collection
python download_karaoke.py --reset-channel SingKingKaraoke --delete-files
""",
)
@@ -298,6 +301,16 @@ Examples:
action="store_true",
help="Download from manual videos collection (data/manual_videos.json)",
)
parser.add_argument(
"--channel-focus",
type=str,
help="Download from a specific channel by name (e.g., 'SingKingKaraoke')",
)
parser.add_argument(
"--all-videos",
action="store_true",
help="Download all videos from channel (not just songlist matches), skipping existing files",
)
args = parser.parse_args()
# Validate workers argument
@@ -428,9 +441,10 @@ Examples:
print("💾 Channel Cache Information")
print("=" * 40)
print(f"Total Channels: {cache_info['total_channels']}")
print(f"Total Cached Videos: {cache_info['total_cached_videos']}")
print(f"Cache Duration: {cache_info['cache_duration_hours']} hours")
print(f"Last Updated: {cache_info['last_updated']}")
print(f"Total Cached Videos: {cache_info['total_videos']}")
print("\n📋 Channel Details:")
for channel in cache_info['channels']:
print(f"{channel['channel']}: {channel['videos']} videos (updated: {channel['last_updated']})")
sys.exit(0)
elif args.clear_cache:
if args.clear_cache == "all":
@@ -478,7 +492,42 @@ Examples:
force_refresh=args.refresh,
fuzzy_match=args.fuzzy_match,
fuzzy_threshold=args.fuzzy_threshold,
force_download=args.force,
)
elif args.channel_focus:
# Download from a specific channel by name
print(f"🎤 Looking up channel: {args.channel_focus}")
channel_url = get_channel_url_by_name(args.channel_focus)
if not channel_url:
print(f"❌ Channel '{args.channel_focus}' not found in configuration")
print("Available channels:")
channel_urls = load_channels()
for url in channel_urls:
if "/@" in url:
channel_name = url.split("/@")[1].split("/")[0]
print(f"{channel_name}")
sys.exit(1)
if args.all_videos:
# Download ALL videos from the channel (not just songlist matches)
print(f"🎤 Downloading ALL videos from channel: {args.channel_focus} ({channel_url})")
success = downloader.download_all_channel_videos(
channel_url,
force_refresh=args.refresh,
force_download=args.force,
limit=args.limit,
)
else:
# Download only songlist matches from the channel
print(f"🎤 Downloading from channel: {args.channel_focus} ({channel_url})")
success = downloader.download_channel_videos(
channel_url,
force_refresh=args.refresh,
fuzzy_match=args.fuzzy_match,
fuzzy_threshold=args.fuzzy_threshold,
force_download=args.force,
)
elif args.songlist_only or args.songlist_focus:
# Use provided file or default to channels configuration
channel_urls = load_channels(args.file)

View File

@@ -185,6 +185,7 @@ class KaraokeDownloader:
force_refresh=False,
fuzzy_match=False,
fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
force_download=False,
):
"""Download videos from a channel or playlist URL, respecting songlist-only and limit flags. Supports fuzzy matching."""
@@ -213,111 +214,177 @@ class KaraokeDownloader:
print(f"📋 Found {len(available_videos)} manual videos")
# Process manual videos (skip yt-dlp)
return self._process_videos_for_download(available_videos, channel_name, force_refresh, fuzzy_match, fuzzy_threshold)
return self._process_videos_for_download(available_videos, channel_name, force_refresh, fuzzy_match, fuzzy_threshold, force_download)
# Regular YouTube channel processing
# Regular YouTube channel processing - USE TRACKING MANAGER CACHE
channel_name, channel_id = get_channel_info(url)
print(f"\n🎬 Downloading from channel: {channel_name} ({url})")
songlist = load_songlist(self.songlist_file_path)
if not songlist:
print("⚠️ No songlist loaded. Skipping.")
# Use tracking manager to get videos (with cache support)
available_videos = self.tracker.get_channel_video_list(
url,
str(self.yt_dlp_path),
force_refresh=force_refresh
)
if not available_videos:
print("⚠️ No videos found for this channel. Skipping.")
return False
print(f"📋 Found {len(available_videos)} videos from channel")
# Process videos for download
return self._process_videos_for_download(available_videos, channel_name, force_refresh, fuzzy_match, fuzzy_threshold, force_download)
def download_all_channel_videos(
self,
url,
force_refresh=False,
force_download=False,
limit=None,
):
"""Download ALL videos from a channel, skipping existing files and songs in songs.json."""
# Check if this is a manual channel
from karaoke_downloader.manual_video_manager import is_manual_channel, get_manual_channel_info, get_manual_videos_for_channel
if is_manual_channel(url):
channel_name, channel_id = get_manual_channel_info(url)
print(f"\n🎬 Downloading ALL videos from manual channel: {channel_name} ({url})")
# Load manual videos
manual_videos = get_manual_videos_for_channel(channel_name)
if not manual_videos:
print("⚠️ No manual videos found. Skipping.")
return False
# Convert to the expected format
available_videos = []
for video in manual_videos:
available_videos.append({
"title": video.get("title", ""),
"id": video.get("id", ""),
"url": video.get("url", "")
})
print(f"📋 Found {len(available_videos)} manual videos")
# Process all manual videos (skip songlist filtering)
return self._process_all_videos_for_download(available_videos, channel_name, force_refresh, force_download, limit)
# Regular YouTube channel processing - USE TRACKING MANAGER CACHE
channel_name, channel_id = get_channel_info(url)
print(f"\n🎬 Downloading ALL videos from channel: {channel_name} ({url})")
# Use tracking manager to get videos (with cache support)
available_videos = self.tracker.get_channel_video_list(
url,
str(self.yt_dlp_path),
force_refresh=force_refresh
)
if not available_videos:
print("⚠️ No videos found for this channel. Skipping.")
return False
print(f"📋 Found {len(available_videos)} videos from channel")
# Process all videos for download (skip songlist filtering)
return self._process_all_videos_for_download(available_videos, channel_name, force_refresh, force_download, limit)
def _process_all_videos_for_download(self, available_videos, channel_name, force_refresh=False, force_download=False, limit=None):
"""Process ALL videos for download (no songlist filtering)."""
print(f"🔍 Processing {len(available_videos)} videos for download (no songlist filtering)...")
# Load server songs and duplicates tracking for availability checking
server_songs = load_server_songs()
server_duplicates_tracking = load_server_duplicates_tracking()
limit = getattr(self.config, "limit", 1)
cmd = [
str(self.yt_dlp_path),
"--flat-playlist",
"--print",
"%(title)s|%(id)s|%(url)s",
url,
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
lines = result.stdout.strip().splitlines()
except subprocess.CalledProcessError as e:
print(f"❌ yt-dlp failed to fetch playlist: {e}")
return False
available_videos = []
for line in lines:
parts = line.split("|")
if len(parts) >= 2:
title, video_id = parts[0].strip(), parts[1].strip()
available_videos.append({"title": title, "id": video_id})
# Normalize songlist for matching
normalized_songlist = {
create_song_key(s["artist"], s["title"]): s for s in songlist
}
matches = []
similarity = get_similarity_function()
# Load songlist to check if songs are already in songs.json
songlist = load_songlist(self.songlist_file_path)
songlist_artists_titles = set()
if songlist:
for song in songlist:
song_key = f"{song['artist']} - {song['title']}"
songlist_artists_titles.add(song_key)
# Process all videos, skipping existing files and songs in songs.json
videos_to_download = []
skipped_count = 0
for video in available_videos:
artist, title = self.channel_parser.extract_artist_title(video["title"], channel_name)
key = create_song_key(artist, title)
if fuzzy_match:
# Fuzzy match against all songlist keys
best_score = 0
best_song = None
for song_key, song in normalized_songlist.items():
score = similarity(key, song_key)
if score > best_score:
best_score = score
best_song = song
if best_score >= fuzzy_threshold and best_song:
# Check if already downloaded or on server (unless force mode is enabled)
if self.force_download or not is_songlist_song_downloaded(
self.songlist_tracking, best_song["artist"], best_song["title"]
):
# Check if already marked as server duplicate (unless force mode is enabled)
if self.force_download or not is_song_marked_as_server_duplicate(
server_duplicates_tracking,
best_song["artist"],
best_song["title"],
):
# Check if already on server and mark for future skipping (unless force mode is enabled)
if self.force_download or not check_and_mark_server_duplicate(
server_songs,
server_duplicates_tracking,
best_song["artist"],
best_song["title"],
video["title"],
channel_name,
):
matches.append((video, best_song))
print(
f" → Fuzzy match: {artist} - {title} <-> {best_song['artist']} - {best_song['title']} (score: {best_score})"
)
if len(matches) >= limit:
break
else:
if key in normalized_songlist:
song = normalized_songlist[key]
# Check if already downloaded or on server (unless force mode is enabled)
if self.force_download or not is_songlist_song_downloaded(
self.songlist_tracking, song["artist"], song["title"]
):
# Check if already marked as server duplicate (unless force mode is enabled)
if self.force_download or not is_song_marked_as_server_duplicate(
server_duplicates_tracking, song["artist"], song["title"]
):
# Check if already on server and mark for future skipping (unless force mode is enabled)
if self.force_download or not check_and_mark_server_duplicate(
server_songs,
server_duplicates_tracking,
song["artist"],
song["title"],
video["title"],
channel_name,
):
matches.append((video, song))
if len(matches) >= limit:
break
if not matches:
print("🎵 No new songlist matches found for this channel.")
video_title = video["title"]
video_id = video["id"]
# Extract artist and title
artist, extracted_title = self.channel_parser.extract_artist_title(video_title, channel_name)
if not artist and not extracted_title:
# Fallback: use the full title
artist = ""
extracted_title = video_title
# Create filename
filename = f"{artist} - {extracted_title}.mp4" if artist and extracted_title else f"{video_title}.mp4"
output_path = self.downloads_dir / channel_name / filename
# Check if file already exists
if output_path.exists() and not force_download:
print(f" ⏭️ Skipping existing file: {filename}")
skipped_count += 1
continue
# Check if song is already in songs.json
song_key = f"{artist} - {extracted_title}"
if song_key in songlist_artists_titles and not force_download:
# Check if there's already an MP4 version in songs.json
if song_key in server_songs:
song_info = server_songs[song_key]
if song_info.get("is_mp4", False):
print(f" ⏭️ Skipping song in songs.json (MP4): {song_key}")
skipped_count += 1
continue
else:
print(f" 📥 Found MP3 version in songs.json, will download MP4: {song_key}")
# Continue to download the MP4 version
else:
print(f" 📥 Found song in songlist, will download MP4: {song_key}")
# Continue to download the MP4 version
# Check if already downloaded (tracking)
if self.tracker.is_song_downloaded(artist, extracted_title, channel_name, video_id) and not force_download:
print(f" ⏭️ Skipping already tracked: {song_key}")
skipped_count += 1
continue
# Check if on server
if not force_download and check_and_mark_server_duplicate(
server_songs,
server_duplicates_tracking,
artist,
extracted_title,
video_title,
channel_name,
):
print(f" ⏭️ Skipping server duplicate: {song_key}")
skipped_count += 1
continue
# Add to download list
videos_to_download.append((video, artist, extracted_title, filename))
print(f"📊 Found {len(videos_to_download)} videos to download (skipped {skipped_count})")
# Apply limit if specified
if limit is not None:
original_count = len(videos_to_download)
videos_to_download = videos_to_download[:limit]
print(f"🎯 Limited to first {limit} videos (was {original_count} total)")
if not videos_to_download:
print("🎵 No new videos to download.")
return True
# Download only the first N matches using the new pipeline
# Download videos using the download pipeline
pipeline = DownloadPipeline(
yt_dlp_path=str(self.yt_dlp_path),
config=self.config,
@@ -326,9 +393,9 @@ class KaraokeDownloader:
tracker=self.tracker,
)
for video, song in matches:
artist, title = song["artist"], song["title"]
print(f"🎵 Processing: {artist} - {title}")
success_count = 0
for i, (video, artist, title, filename) in enumerate(videos_to_download, 1):
print(f"⬇️ Downloading {i}/{len(videos_to_download)}: {artist} - {title}")
if pipeline.execute_pipeline(
video_id=video["id"],
@@ -337,10 +404,13 @@ class KaraokeDownloader:
channel_name=channel_name,
video_title=video.get("title", ""),
):
print(f"✅ Successfully processed: {artist} - {title}")
print(f"✅ Successfully downloaded: {artist} - {title}")
success_count += 1
else:
print(f"❌ Failed to process: {artist} - {title}")
return True
print(f"❌ Failed to download: {artist} - {title}")
print(f"\n🎉 Download complete! {success_count}/{len(videos_to_download)} videos downloaded successfully")
return success_count > 0
def download_songlist_across_channels(
self,
@@ -587,463 +657,40 @@ class KaraokeDownloader:
)
return success
def execute_download_plan_parallel(
self,
download_plan,
unmatched,
cache_file,
limit=None,
):
"""
Execute a download plan using parallel downloads.
Args:
download_plan: List of download items to process
unmatched: List of unmatched songs
cache_file: Path to cache file for progress tracking
limit: Optional limit on number of downloads
Returns:
tuple: (downloaded_count, success)
"""
if not self.enable_parallel_downloads:
# Fall back to sequential downloads
return execute_download_plan(
download_plan=download_plan,
unmatched=unmatched,
cache_file=cache_file,
config=self.config,
yt_dlp_path=self.yt_dlp_path,
downloads_dir=self.downloads_dir,
songlist_tracking=self.songlist_tracking,
limit=limit,
)
# Create parallel downloader
parallel_downloader = create_parallel_downloader(
yt_dlp_path=str(self.yt_dlp_path),
config=self.config,
downloads_dir=self.downloads_dir,
max_workers=self.parallel_workers,
songlist_tracking=self.songlist_tracking,
tracker=self.tracker,
)
# Convert download plan to tasks
tasks = []
total_to_download = limit if limit is not None else len(download_plan)
for idx, item in enumerate(download_plan):
if limit is not None and len(tasks) >= limit:
break
task = DownloadTask(
video_id=item["video_id"],
artist=item["artist"],
title=item["title"],
channel_name=item["channel_name"],
video_title=item.get("video_title"),
priority=len(download_plan) - idx, # Higher priority for earlier items
)
tasks.append(task)
print(f"🚀 Starting parallel downloads with {self.parallel_workers} workers...")
print(f"📋 Total tasks: {len(tasks)}")
# Add all tasks to the parallel downloader
parallel_downloader.add_download_tasks(tasks)
# Execute parallel downloads
results = parallel_downloader.execute_downloads(show_progress=True)
# Process results and update tracking
successful_downloads = 0
for result in results:
if result.success:
successful_downloads += 1
# Remove completed item from plan and update cache
for idx, item in enumerate(download_plan):
if (item["video_id"] == result.task.video_id and
item["artist"] == result.task.artist and
item["title"] == result.task.title):
download_plan.pop(idx)
break
# Update cache after each successful download
save_plan_cache(cache_file, download_plan, unmatched)
print(f"🎉 Parallel downloads completed!")
print(f" ✅ Successful: {successful_downloads}")
print(f" ❌ Failed: {len(results) - successful_downloads}")
# Delete cache if all items are complete
if len(download_plan) == 0:
cleanup_cache(cache_file)
return successful_downloads, True
def execute_latest_per_channel_parallel(
self,
channel_plans,
cache_file,
):
"""
Execute latest-per-channel downloads using parallel processing.
Args:
channel_plans: List of channel plans with videos to download
cache_file: Path to cache file for progress tracking
Returns:
bool: True if successful
"""
if not self.enable_parallel_downloads:
# Fall back to sequential processing
return self._execute_latest_per_channel_sequential(channel_plans, cache_file)
# Create parallel downloader
parallel_downloader = create_parallel_downloader(
yt_dlp_path=str(self.yt_dlp_path),
config=self.config,
downloads_dir=self.downloads_dir,
max_workers=self.parallel_workers,
songlist_tracking=self.songlist_tracking,
tracker=self.tracker,
)
# Convert all videos to tasks
all_tasks = []
task_to_channel_mapping = {} # Map task identifiers to their channel index
for channel_idx, channel_plan in enumerate(channel_plans):
channel_name = channel_plan["channel_name"]
videos = channel_plan["videos"]
for v_idx, video in enumerate(videos):
title = video["title"]
video_id = video["id"]
# Sanitize filename
safe_title = title
invalid_chars = ["?", ":", "*", '"', "<", ">", "|", "/", "\\"]
for char in invalid_chars:
safe_title = safe_title.replace(char, "")
safe_title = (
safe_title.replace("...", "")
.replace("..", "")
.replace(".", "")
.strip()
)
# Extract artist and title for tracking
artist, title_clean = self.channel_parser.extract_artist_title(title, channel_name)
task = DownloadTask(
video_id=video_id,
artist=artist,
title=title_clean,
channel_name=channel_name,
video_title=title,
priority=len(channel_plans) - channel_idx, # Higher priority for earlier channels
)
all_tasks.append(task)
# Use a unique identifier for the mapping
task_id = f"{video_id}_{channel_name}_{title_clean}"
task_to_channel_mapping[task_id] = channel_idx
print(f"🚀 Starting parallel latest-per-channel downloads with {self.parallel_workers} workers...")
print(f"📋 Total tasks: {len(all_tasks)} across {len(channel_plans)} channels")
# Add all tasks to the parallel downloader
parallel_downloader.add_download_tasks(all_tasks)
# Execute parallel downloads
results = parallel_downloader.execute_downloads(show_progress=True)
# Process results and update channel plans
successful_downloads = 0
channel_completion_status = {i: False for i in range(len(channel_plans))}
for result in results:
if result.success:
successful_downloads += 1
# Create the same task identifier used in the mapping
task_id = f"{result.task.video_id}_{result.task.channel_name}_{result.task.title}"
channel_idx = task_to_channel_mapping[task_id]
# Mark this video as completed in the channel plan
channel_plan = channel_plans[channel_idx]
for v_idx, video in enumerate(channel_plan["videos"]):
if (video["id"] == result.task.video_id and
video["title"] == result.task.video_title):
channel_plan["videos"].pop(v_idx)
break
# Check if channel is complete
if len(channel_plan["videos"]) == 0 and not channel_completion_status[channel_idx]:
channel_completion_status[channel_idx] = True
print(f" 🗑️ Channel {result.task.channel_name} completed and removed from plan cache.")
# Update cache with remaining videos
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(
{
"timestamp": datetime.now().isoformat(),
"channel_plans": channel_plans,
},
f,
indent=2,
ensure_ascii=False,
)
print(f"🎉 Parallel latest-per-channel downloads completed!")
print(f" ✅ Successful: {successful_downloads}")
print(f" ❌ Failed: {len(results) - successful_downloads}")
# Delete cache if all channels are complete
all_complete = all(channel_completion_status.values())
if all_complete:
if cache_file.exists():
try:
cache_file.unlink()
print(f"🗑️ Deleted latest-per-channel plan cache after completion: {cache_file.name}")
except Exception as e:
print(f"⚠️ Could not delete latest-per-channel plan cache: {e}")
return True
def _execute_latest_per_channel_sequential(
self,
channel_plans,
cache_file,
):
"""
Execute latest-per-channel downloads sequentially (fallback method).
"""
total_channels = len(channel_plans)
for idx, channel_plan in enumerate(channel_plans):
channel_name = channel_plan["channel_name"]
channel_url = channel_plan["channel_url"]
videos = channel_plan["videos"]
print(
f"\n⬇️ Downloading {len(videos)} videos from channel {idx+1} of {total_channels}: {channel_name}"
)
for v_idx, video in enumerate(videos):
title = video["title"]
video_id = video["id"]
# Sanitize filename
safe_title = title
invalid_chars = ["?", ":", "*", '"', "<", ">", "|", "/", "\\"]
for char in invalid_chars:
safe_title = safe_title.replace(char, "")
safe_title = (
safe_title.replace("...", "")
.replace("..", "")
.replace(".", "")
.strip()
)
filename = f"{channel_name} - {safe_title}.mp4"
# Extract artist and title for tracking
artist, title_clean = self.channel_parser.extract_artist_title(title, channel_name)
print(
f" ({v_idx+1}/{len(videos)}) Processing: {artist} - {title_clean}"
)
# Use the new pipeline for consistent processing
pipeline = DownloadPipeline(
yt_dlp_path=str(self.yt_dlp_path),
config=self.config,
downloads_dir=self.downloads_dir,
songlist_tracking=self.songlist_tracking,
tracker=self.tracker,
)
if pipeline.execute_pipeline(
video_id=video_id,
artist=artist,
title=title_clean,
channel_name=channel_name,
video_title=title,
):
print(f" ✅ Successfully processed: {artist} - {title_clean}")
else:
print(f" ❌ Failed to process: {artist} - {title_clean}")
# After channel is done, remove it from the plan and update cache
channel_plans[idx]["videos"] = []
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(
{
"timestamp": datetime.now().isoformat(),
"channel_plans": channel_plans,
},
f,
indent=2,
ensure_ascii=False,
)
print(
f" 🗑️ Channel {channel_name} completed and removed from plan cache."
)
# After all channels are done, delete the cache
if cache_file.exists():
try:
cache_file.unlink()
print(
f"🗑️ Deleted latest-per-channel plan cache after completion: {cache_file.name}"
)
except Exception as e:
print(f"⚠️ Could not delete latest-per-channel plan cache: {e}")
print(f"🎉 All latest videos downloaded for all channels!")
return True
def download_latest_per_channel(
self,
channel_urls,
limit=5,
force_refresh_download_plan=False,
fuzzy_match=False,
fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
force_download=False,
):
"""
Download the latest N videos from each channel in channel_urls.
- Pre-scan all channels for their latest N videos.
- Check against local songs file to avoid duplicates.
- Build a per-channel download plan and cache it.
- Resume robustly if interrupted (removes each channel from the plan as it completes).
- Deletes the plan cache when all channels are done.
"""
# Set force download flag
self.force_download = force_download
print(f"\n🎬 Downloading latest {limit} videos from each channel...")
# Load server songs for availability checking
server_songs = load_server_songs()
server_duplicates_tracking = load_server_duplicates_tracking()
plan_mode = "latest_per_channel"
# Include all parameters that affect the plan generation
plan_kwargs = {
"limit": limit,
"channels": len(channel_urls),
"fuzzy": fuzzy_match,
"threshold": fuzzy_threshold,
}
# Hash the sorted channel URLs so the same set of channels maps to the same
# cache file regardless of the order in which they were supplied
channels_hash = hashlib.md5(
"|".join(sorted(channel_urls)).encode()
).hexdigest()[:8]
plan_kwargs["channels_hash"] = channels_hash
cache_file = get_download_plan_cache_file(plan_mode, **plan_kwargs)
use_cache = False
if not force_refresh_download_plan and cache_file.exists():
try:
with open(cache_file, "r", encoding="utf-8") as f:
plan_data = json.load(f)
cache_time = datetime.fromisoformat(plan_data.get("timestamp"))
if datetime.now() - cache_time < timedelta(
days=DEFAULT_CACHE_EXPIRATION_DAYS
):
print(
f"🗂️ Using cached latest-per-channel plan from {cache_time} ({cache_file.name})."
)
channel_plans = plan_data["channel_plans"]
use_cache = True
except Exception as e:
print(f"⚠️ Could not load latest-per-channel plan cache: {e}")
if not use_cache:
print("\n🔎 Pre-scanning all channels for latest videos...")
channel_plans = []
total_found = 0
total_filtered = 0
total_marked = 0
for channel_url in channel_urls:
channel_name, channel_id = get_channel_info(channel_url)
print(f"\n🚦 Starting channel: {channel_name} ({channel_url})")
available_videos = self.tracker.get_channel_video_list(
channel_url, yt_dlp_path=str(self.yt_dlp_path), force_refresh=False
)
print(
f" → Found {len(available_videos)} total videos for this channel."
)
# Pre-filter: set of known duplicate keys for O(1) membership checks
known_duplicate_keys = set(server_duplicates_tracking)
# Pre-filter videos to exclude known duplicates before processing
pre_filtered_videos = []
for video in available_videos:
artist, title = self.channel_parser.extract_artist_title(video["title"], channel_name)
song_key = create_song_key(artist, title)
if song_key not in known_duplicate_keys:
pre_filtered_videos.append(video)
print(
f" → After pre-filtering: {len(pre_filtered_videos)} videos not previously marked as duplicates."
)
# Process videos until we reach the limit for this channel
filtered_videos = []
videos_checked = 0
for video in pre_filtered_videos:
if len(filtered_videos) >= limit:
break # We have enough videos for this channel
videos_checked += 1
artist, title = self.channel_parser.extract_artist_title(video["title"], channel_name)
# Check if should skip this song during planning phase
should_skip, reason, filtered_count = self._should_skip_song(
artist,
title,
channel_name,
video["id"],
video["title"],
server_songs,
server_duplicates_tracking,
)
if should_skip:
total_filtered += 1
if reason == "on server":
total_marked += filtered_count
continue
filtered_videos.append(video)
print(
f" → After processing: {len(filtered_videos)} videos to download (checked {videos_checked} videos, filtered out {videos_checked - len(filtered_videos)} already on server)."
)
total_found += len(filtered_videos)
channel_plans.append(
{
"channel_name": channel_name,
"channel_url": channel_url,
"videos": filtered_videos,
}
)
print(
f"\n📊 Summary: {total_found} videos to download across {len(channel_plans)} channels (filtered out {total_filtered} already on server, marked {total_marked} new duplicates for future skipping)."
)
plan_data = {
"timestamp": datetime.now().isoformat(),
"channel_plans": channel_plans,
}
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(plan_data, f, indent=2, ensure_ascii=False)
print(f"🗂️ Saved new latest-per-channel plan cache: {cache_file.name}")
# --- Download phase ---
return self.execute_latest_per_channel_parallel(channel_plans, cache_file)
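# Example invocation (hypothetical channel list; assumes an already
# initialized downloader instance named `downloader`):
#
#   downloader.download_latest_per_channel(
#       channel_urls=["https://www.youtube.com/@SingKingKaraoke"],
#       limit=5,
#       force_refresh_download_plan=False,
#   )
#
# Interrupting mid-run is safe: completed channels are removed from the
# cached plan, so a re-run with the same arguments resumes where it left off.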
def _process_videos_for_download(self, available_videos, channel_name, force_refresh=False, fuzzy_match=False, fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD, force_download=False):
"""Process videos for download (used for both manual and regular channels)."""
songlist = load_songlist(self.songlist_file_path)
# For manual videos with force_download, bypass songlist filtering
if force_download and channel_name == "@ManualVideos":
print(f"💪 Force mode enabled - downloading all {len(available_videos)} manual videos")
# Download all videos directly without songlist filtering
success_count = 0
for i, video in enumerate(available_videos, 1):
title = video["title"]
video_id = video["id"]
print(f"\n⬇️ Downloading {i}/{len(available_videos)}: {title}")
print(f" 🎬 Video: {title} ({channel_name})")
# Create filename from title
artist, extracted_title = self.channel_parser.extract_artist_title(title, channel_name)
if not artist and not extracted_title:
# Fallback: use the full title as filename
filename = sanitize_filename("", title)
else:
filename = sanitize_filename(artist, extracted_title)
# Download the video
success = self._download_single_video(video, channel_name, filename, force_download=True)
if success:
success_count += 1
print(f"\n✅ Downloaded {success_count}/{len(available_videos)} manual videos")
return success_count > 0
# Regular songlist-based processing
if not songlist:
print("⚠️ No songlist loaded. Skipping.")
return False
# … (songlist matching and per-match download loop elided in this hunk) …
print(f"\n🎉 Download complete! {success_count}/{len(matches)} videos downloaded successfully")
return success_count > 0
def _download_single_video(self, video, channel_name, filename, force_download=False):
"""Download a single video using the download pipeline."""
video_id = video["id"]
video_title = video["title"]
# Extract artist and title for tracking
artist, extracted_title = self.channel_parser.extract_artist_title(video_title, channel_name)
if not artist and not extracted_title:
# Fallback: use the full title
artist = ""
extracted_title = video_title
output_path = self.downloads_dir / channel_name / filename
# Use the download pipeline
pipeline = DownloadPipeline(
yt_dlp_path=str(self.yt_dlp_path),
config=self.config,
downloads_dir=self.downloads_dir,
songlist_tracking=self.songlist_tracking,
tracker=self.tracker,
)
success = pipeline.execute_pipeline(
video_id=video_id,
artist=artist,
title=extracted_title,
channel_name=channel_name,
video_title=video_title
)
if success:
print(f"✅ Successfully downloaded: {video_title}")
else:
print(f"❌ Failed to download: {video_title}")
return success
def reset_songlist_all():
"""Delete all files tracked in songlist_tracking.json, clear songlist_tracking.json, and remove songlist songs from karaoke_tracking.json."""