# Karaoke downloader main module: channel scanning, songlist matching, and download orchestration.
import hashlib
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
import json
|
|
import subprocess
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from karaoke_downloader.cache_manager import (
|
|
delete_plan_cache,
|
|
get_download_plan_cache_file,
|
|
load_cached_plan,
|
|
save_plan_cache,
|
|
)
|
|
from karaoke_downloader.channel_manager import (
|
|
download_from_file,
|
|
reset_channel_downloads,
|
|
)
|
|
from karaoke_downloader.config_manager import get_config_manager, load_config
|
|
from karaoke_downloader.download_pipeline import DownloadPipeline
|
|
from karaoke_downloader.download_planner import build_download_plan
|
|
from karaoke_downloader.error_utils import handle_yt_dlp_error, log_error
|
|
from karaoke_downloader.file_utils import ensure_directory_exists, sanitize_filename
|
|
from karaoke_downloader.fuzzy_matcher import (
|
|
create_song_key,
|
|
create_video_key,
|
|
get_similarity_function,
|
|
is_exact_match,
|
|
is_fuzzy_match,
|
|
)
|
|
from karaoke_downloader.id3_utils import add_id3_tags, extract_artist_title
|
|
from karaoke_downloader.server_manager import (
|
|
check_and_mark_server_duplicate,
|
|
is_song_marked_as_server_duplicate,
|
|
is_song_on_server,
|
|
load_server_duplicates_tracking,
|
|
load_server_songs,
|
|
)
|
|
from karaoke_downloader.song_validator import create_song_validator
|
|
from karaoke_downloader.songlist_manager import (
|
|
is_songlist_song_downloaded,
|
|
load_songlist,
|
|
load_songlist_tracking,
|
|
mark_songlist_song_downloaded,
|
|
normalize_title,
|
|
save_songlist_tracking,
|
|
)
|
|
from karaoke_downloader.tracking_manager import FormatType, SongStatus, TrackingManager
|
|
from karaoke_downloader.video_downloader import (
|
|
download_video_and_track,
|
|
execute_download_plan,
|
|
is_valid_mp4,
|
|
cleanup_cache,
|
|
)
|
|
from karaoke_downloader.parallel_downloader import (
|
|
ParallelDownloader,
|
|
DownloadTask,
|
|
create_parallel_downloader,
|
|
)
|
|
from karaoke_downloader.youtube_utils import get_channel_info, get_playlist_info
|
|
|
|
# Constants
DEFAULT_FUZZY_THRESHOLD = 85  # Minimum fuzzy-match score (0-100) required to accept a songlist match
DEFAULT_CACHE_EXPIRATION_DAYS = 1  # Maximum age of a cached latest-per-channel plan before it is rebuilt
DEFAULT_DISPLAY_LIMIT = 10  # Max number of unmatched songs printed before truncating the list

# Base directory for tracking files, songlist data, and plan caches
DATA_DIR = Path("data")
class KaraokeDownloader:
    """Orchestrates karaoke video downloads.

    Wires together configuration, tracking, server-duplicate detection and
    songlist matching, and exposes the high-level download entry points
    (per-channel, songlist-across-channels, latest-per-channel).
    """

    def __init__(self):
        # Load configuration via the shared config manager singleton.
        self.config_manager = get_config_manager()
        self.config = self.config_manager.load_config()

        # Initialize paths from the loaded configuration.
        self.yt_dlp_path = Path(self.config.yt_dlp_path)
        self.downloads_dir = Path(self.config.folder_structure.downloads_dir)
        self.logs_dir = Path(self.config.folder_structure.logs_dir)

        # Ensure directories exist (created on first run).
        ensure_directory_exists(self.downloads_dir)
        ensure_directory_exists(self.logs_dir)

        # Initialize tracking of downloaded songs and the channel video cache.
        tracking_file = DATA_DIR / "karaoke_tracking.json"
        cache_dir = DATA_DIR / "channel_cache"
        self.tracker = TrackingManager(
            tracking_file=tracking_file, cache_dir=cache_dir
        )

        # Initialize song validator (centralizes skip/failure decisions).
        self.song_validator = create_song_validator(self.tracker, self.downloads_dir)

        # Load songlist tracking (which songlist entries are already downloaded).
        self.songlist_tracking_file = DATA_DIR / "songlist_tracking.json"
        self.songlist_tracking = load_songlist_tracking(
            str(self.songlist_tracking_file)
        )

        # Load server songs for availability checking.
        # NOTE(review): also reloaded locally inside several methods; this copy
        # appears unused by those paths — confirm whether it is still needed.
        self.server_songs = load_server_songs()

        # Parallel download settings (off by default; toggled externally).
        self.enable_parallel_downloads = False
        self.parallel_workers = 3

        # Songlist focus mode attributes (set externally, e.g. by a CLI layer).
        self.songlist_focus_titles = None  # Optional list of playlist titles to restrict to
        self.songlist_only = False
        self.use_songlist_priority = True
        self.download_limit = None
        self.force_download = False  # When True, bypasses downloaded/server-duplicate checks
        self.songlist_file_path = "data/songList.json"  # Default songlist file path
def _load_config(self):
|
|
"""Load configuration using the config manager."""
|
|
return self.config_manager.load_config()
|
|
|
|
def _should_skip_song(
|
|
self,
|
|
artist,
|
|
title,
|
|
channel_name,
|
|
video_id,
|
|
video_title,
|
|
server_songs=None,
|
|
server_duplicates_tracking=None,
|
|
):
|
|
"""
|
|
Check if a song should be skipped using the centralized SongValidator.
|
|
|
|
Returns:
|
|
tuple: (should_skip, reason, total_filtered)
|
|
"""
|
|
return self.song_validator.should_skip_song(
|
|
artist,
|
|
title,
|
|
channel_name,
|
|
video_id,
|
|
video_title,
|
|
server_songs,
|
|
server_duplicates_tracking,
|
|
self.force_download,
|
|
)
|
|
|
|
def _mark_song_failed(self, artist, title, video_id, channel_name, error_message):
|
|
"""
|
|
Mark a song as failed in tracking using the SongValidator.
|
|
"""
|
|
self.song_validator.mark_song_failed(
|
|
artist, title, video_id, channel_name, error_message
|
|
)
|
|
|
|
def _handle_download_failure(
|
|
self, artist, title, video_id, channel_name, error_type, error_details=""
|
|
):
|
|
"""
|
|
Handle download failures using the SongValidator.
|
|
|
|
Args:
|
|
artist: Song artist
|
|
title: Song title
|
|
video_id: YouTube video ID
|
|
channel_name: Channel name
|
|
error_type: Type of error (e.g., "yt-dlp failed", "file verification failed")
|
|
error_details: Additional error details
|
|
"""
|
|
self.song_validator.handle_download_failure(
|
|
artist, title, video_id, channel_name, error_type, error_details
|
|
)
|
|
|
|
    def download_channel_videos(
        self,
        url,
        force_refresh=False,
        fuzzy_match=False,
        fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
    ):
        """Download videos from a channel or playlist URL, respecting songlist-only and limit flags. Supports fuzzy matching.

        Args:
            url: Channel or playlist URL to list via yt-dlp.
            force_refresh: NOTE(review): accepted but never read in this
                method — confirm whether it was meant to be forwarded to the
                listing step.
            fuzzy_match: When True, match videos to songlist entries by
                similarity score rather than exact key equality.
            fuzzy_threshold: Minimum similarity score to accept a fuzzy match.

        Returns:
            bool: False if the songlist is empty or yt-dlp listing fails;
            True otherwise (including when nothing new matched).
        """
        # channel_id is unused here; get_channel_info returns (name, id).
        channel_name, channel_id = get_channel_info(url)
        print(f"\n🎬 Downloading from channel: {channel_name} ({url})")
        songlist = load_songlist(self.songlist_file_path)
        if not songlist:
            print("⚠️ No songlist loaded. Skipping.")
            return False

        # Load server songs and duplicates tracking for availability checking
        server_songs = load_server_songs()
        server_duplicates_tracking = load_server_duplicates_tracking()

        # Max matches to download for this channel; defaults to 1 when the
        # config exposes no "limit" attribute.
        limit = getattr(self.config, "limit", 1)
        # Flat listing: titles/ids only, no media download.
        cmd = [
            str(self.yt_dlp_path),
            "--flat-playlist",
            "--print",
            "%(title)s|%(id)s|%(url)s",
            url,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            lines = result.stdout.strip().splitlines()
        except subprocess.CalledProcessError as e:
            print(f"❌ yt-dlp failed to fetch playlist: {e}")
            return False
        # Parse "title|id|url" lines; only title and id are used.
        available_videos = []
        for line in lines:
            parts = line.split("|")
            if len(parts) >= 2:
                title, video_id = parts[0].strip(), parts[1].strip()
                available_videos.append({"title": title, "id": video_id})
        # Normalize songlist for matching: canonical key -> songlist entry.
        normalized_songlist = {
            create_song_key(s["artist"], s["title"]): s for s in songlist
        }
        matches = []
        similarity = get_similarity_function()
        for video in available_videos:
            artist, title = extract_artist_title(video["title"])
            key = create_song_key(artist, title)
            if fuzzy_match:
                # Fuzzy match against all songlist keys (linear scan, keep best).
                best_score = 0
                best_song = None
                for song_key, song in normalized_songlist.items():
                    score = similarity(key, song_key)
                    if score > best_score:
                        best_score = score
                        best_song = song
                if best_score >= fuzzy_threshold and best_song:
                    # Check if already downloaded or on server (unless force mode is enabled)
                    if self.force_download or not is_songlist_song_downloaded(
                        self.songlist_tracking, best_song["artist"], best_song["title"]
                    ):
                        # Check if already marked as server duplicate (unless force mode is enabled)
                        if self.force_download or not is_song_marked_as_server_duplicate(
                            server_duplicates_tracking,
                            best_song["artist"],
                            best_song["title"],
                        ):
                            # Check if already on server and mark for future skipping (unless force mode is enabled)
                            if self.force_download or not check_and_mark_server_duplicate(
                                server_songs,
                                server_duplicates_tracking,
                                best_song["artist"],
                                best_song["title"],
                                video["title"],
                                channel_name,
                            ):
                                matches.append((video, best_song))
                                print(
                                    f" → Fuzzy match: {artist} - {title} <-> {best_song['artist']} - {best_song['title']} (score: {best_score})"
                                )
                                if len(matches) >= limit:
                                    break
            else:
                # Exact-key matching path.
                if key in normalized_songlist:
                    song = normalized_songlist[key]
                    # Check if already downloaded or on server (unless force mode is enabled)
                    if self.force_download or not is_songlist_song_downloaded(
                        self.songlist_tracking, song["artist"], song["title"]
                    ):
                        # Check if already marked as server duplicate (unless force mode is enabled)
                        if self.force_download or not is_song_marked_as_server_duplicate(
                            server_duplicates_tracking, song["artist"], song["title"]
                        ):
                            # Check if already on server and mark for future skipping (unless force mode is enabled)
                            if self.force_download or not check_and_mark_server_duplicate(
                                server_songs,
                                server_duplicates_tracking,
                                song["artist"],
                                song["title"],
                                video["title"],
                                channel_name,
                            ):
                                matches.append((video, song))
                                if len(matches) >= limit:
                                    break
        if not matches:
            print("🎵 No new songlist matches found for this channel.")
            return True
        # Download only the first N matches using the new pipeline
        pipeline = DownloadPipeline(
            yt_dlp_path=str(self.yt_dlp_path),
            config=self.config,
            downloads_dir=self.downloads_dir,
            songlist_tracking=self.songlist_tracking,
            tracker=self.tracker,
        )

        for video, song in matches:
            artist, title = song["artist"], song["title"]
            print(f"🎵 Processing: {artist} - {title}")

            if pipeline.execute_pipeline(
                video_id=video["id"],
                artist=artist,
                title=title,
                channel_name=channel_name,
                video_title=video.get("title", ""),
            ):
                print(f"✅ Successfully processed: {artist} - {title}")
            else:
                print(f"❌ Failed to process: {artist} - {title}")
        return True
    def download_songlist_across_channels(
        self,
        channel_urls,
        limit=None,
        force_refresh_download_plan=False,
        fuzzy_match=False,
        fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
        force_download=False,
        show_pagination=False,
        parallel_channels=False,
        max_channel_workers=3,
    ):
        """
        Download songs from the songlist across multiple channels.

        Phases:
          1. Load the songlist (optionally restricted to the playlists named
             in ``self.songlist_focus_titles``).
          2. Filter out songs already downloaded or already on the server
             (every filter is bypassed when ``force_download`` is True).
          3. Build a download plan mapping songs to channel videos, or reuse
             a cached plan keyed on the scan parameters and channel set.
          4. Execute the plan (parallel or sequential fallback).

        Args:
            channel_urls: Channels to scan for matching videos.
            limit: Optional cap on how many songs to process this run.
            force_refresh_download_plan: Ignore any cached plan and rescan.
            fuzzy_match / fuzzy_threshold: Forwarded to the plan builder.
            force_download: Bypass downloaded/server-duplicate filtering.
            show_pagination / parallel_channels / max_channel_workers:
                Forwarded to build_download_plan.

        Returns:
            bool: False on load errors or empty songlist; otherwise the
            success flag from plan execution.
        """
        # Set force download flag (also read by helper methods).
        self.force_download = force_download

        # Apply songlist focus filtering if specified
        if self.songlist_focus_titles:
            # Load the raw songlist data to filter by playlist titles
            songlist_file = Path(self.songlist_file_path)
            if not songlist_file.exists():
                print(f"⚠️ Songlist file not found: {self.songlist_file_path}")
                return False

            try:
                with open(songlist_file, "r", encoding="utf-8") as f:
                    raw_data = json.load(f)

                # Filter playlists by title
                focused_playlists = []
                print(f"🔍 Looking for playlists: {self.songlist_focus_titles}")
                print(f"🔍 Available playlists in songList.json:")
                for i, playlist in enumerate(raw_data[:5]):  # Show first 5 playlists
                    print(f" {i+1}. '{playlist.get('title', 'NO TITLE')}'")
                if len(raw_data) > 5:
                    print(f" ... and {len(raw_data) - 5} more playlists")

                for playlist in raw_data:
                    playlist_title = playlist.get("title", "")
                    if playlist_title in self.songlist_focus_titles:
                        focused_playlists.append(playlist)
                        print(f"✅ Found matching playlist: '{playlist_title}'")

                if not focused_playlists:
                    print(
                        f"⚠️ No playlists found matching the specified titles: {', '.join(self.songlist_focus_titles)}"
                    )
                    return False

                # Flatten the focused playlists into songs, de-duplicating on
                # a lowercase "artist_title" key.
                focused_songs = []
                seen = set()
                for playlist in focused_playlists:
                    if "songs" in playlist:
                        for song in playlist["songs"]:
                            if "artist" in song and "title" in song:
                                artist = song["artist"].strip()
                                title = song["title"].strip()
                                key = f"{artist.lower()}_{title.lower()}"
                                if key in seen:
                                    continue
                                seen.add(key)
                                focused_songs.append(
                                    {
                                        "artist": artist,
                                        "title": title,
                                        "position": song.get("position", 0),
                                    }
                                )

                songlist = focused_songs
                print(
                    f"\n🎯 Songlist focus mode: {len(focused_songs)} songs from {len(focused_playlists)} playlists selected"
                )
                print(f"🎯 Focused playlists: {', '.join(self.songlist_focus_titles)}")

            except (json.JSONDecodeError, FileNotFoundError) as e:
                print(f"⚠️ Could not load songlist for filtering from {self.songlist_file_path}: {e}")
                return False
        else:
            # Load songlist normally (flattened from all playlists)
            songlist = load_songlist(self.songlist_file_path)
            if not songlist:
                print("⚠️ No songlist loaded. Skipping.")
                return False

        # Filter for songs not yet downloaded
        if self.force_download:
            undownloaded = songlist
            print(f"\n🎯 {len(songlist)} total unique songs in songlist.")
            print(f"💪 Force mode enabled - will download all songs regardless of existing status")
        else:
            undownloaded = [
                s
                for s in songlist
                if not is_songlist_song_downloaded(
                    self.songlist_tracking, s["artist"], s["title"]
                )
            ]
            print(f"\n🎯 {len(songlist)} total unique songs in songlist.")
            print(f"\n🎯 {len(undownloaded)} unique songlist songs to download.")

        # Load server songs and duplicates tracking for availability checking
        server_songs = load_server_songs()
        server_duplicates_tracking = load_server_duplicates_tracking()

        # Initialize counters for logging
        server_available_mp4 = 0
        server_available_other = 0
        marked_duplicates = 0

        # Further filter out songs already on server or marked as duplicates
        if self.force_download:
            not_on_server = undownloaded
            print(f"💪 Force mode enabled - will download all songs regardless of server status")
        else:
            not_on_server = []

            for song in undownloaded:
                artist, title = song["artist"], song["title"]

                # Check if already marked as server duplicate
                if is_song_marked_as_server_duplicate(
                    server_duplicates_tracking, artist, title
                ):
                    marked_duplicates += 1
                    continue

                # Check if already on server and mark for future skipping
                if check_and_mark_server_duplicate(
                    server_songs,
                    server_duplicates_tracking,
                    artist,
                    title,
                    f"{artist} - {title}",
                    "songlist",
                ):
                    server_available_mp4 += 1
                    continue

                # Check if song is on server but in different format (MP3/CDG);
                # these are still downloaded (video version wanted).
                key = f"{artist.lower()}_{normalize_title(title)}"
                if key in server_songs:
                    song_info = server_songs[key]
                    if song_info.get("is_mp3", False) or song_info.get("is_cdg", False):
                        server_available_other += 1
                        print(f"🎵 Found {artist} - {title} on server as {song_info.get('is_mp3', False) and 'MP3' or 'CDG'} format, will download video version")

                not_on_server.append(song)

        # Apply limit to undownloaded list before logging
        # This ensures that only the specified number of songs are processed and logged,
        # providing accurate counts when using --limit
        if limit is not None:
            original_count = len(not_on_server)
            not_on_server = not_on_server[:limit]
            print(f"\n🎯 Limited to first {limit} songs (was {original_count} total)")

        undownloaded = not_on_server

        # Now log the counts based on the limited list
        if server_available_mp4 > 0:
            print(
                f"\n🎵 {server_available_mp4} songs already available as MP4 on server, skipping."
            )
        if server_available_other > 0:
            # Only count songs that are in the limited list
            limited_server_other = sum(1 for song in not_on_server
                if f"{song['artist'].lower()}_{normalize_title(song['title'])}" in server_songs)
            if limited_server_other > 0:
                print(
                    f"\n🎵 {limited_server_other} songs found on server as MP3/CDG, will download video versions."
                )
        if marked_duplicates > 0:
            print(
                f"\n🏷️ {marked_duplicates} songs previously marked as server duplicates, skipping."
            )

        print(f"\n🎯 {len(undownloaded)} songs need to be downloaded.")
        if not undownloaded:
            print("🎵 All songlist songs already downloaded.")
            return True

        # --- Download plan building (same for both normal and focus modes) ---
        # --- Download plan cache logic ---
        plan_mode = "songlist"
        # Include only parameters that affect the plan generation (exclude limit since it only affects execution)
        plan_kwargs = {
            "channels": len(channel_urls),
            "fuzzy": fuzzy_match,
            "threshold": fuzzy_threshold,
        }
        # Add channel URLs hash to ensure same channels = same cache
        channels_hash = hashlib.md5(
            "|".join(sorted(channel_urls)).encode()
        ).hexdigest()[:8]
        plan_kwargs["channels_hash"] = channels_hash
        cache_file = get_download_plan_cache_file(plan_mode, **plan_kwargs)
        use_cache = False
        download_plan, unmatched = load_cached_plan(cache_file)
        if (
            not force_refresh_download_plan
            and download_plan is not None
            and unmatched is not None
        ):
            use_cache = True
            print(f"\n📋 Using cached download plan from: {cache_file}")
        if not use_cache:
            print(f"\n🔍 Pre-scanning {len(channel_urls)} channels for matches...")
            print(f"🔍 Scanning {len(undownloaded)} songs against all channels...")
            download_plan, unmatched = build_download_plan(
                channel_urls,
                undownloaded,
                self.tracker,
                self.yt_dlp_path,
                fuzzy_match=fuzzy_match,
                fuzzy_threshold=fuzzy_threshold,
                show_pagination=show_pagination,
                parallel_channels=parallel_channels,
                max_channel_workers=max_channel_workers,
            )
            save_plan_cache(cache_file, download_plan, unmatched)
            print(f"💾 Download plan cached to: {cache_file}")
        print(
            f"\n📊 Download plan ready: {len(download_plan)} songs will be downloaded."
        )
        print(f"❌ {len(unmatched)} songs could not be found in any channel.")
        if unmatched:
            print("Unmatched songs:")
            for song in unmatched[:DEFAULT_DISPLAY_LIMIT]:
                print(f" - {song['artist']} - {song['title']}")
            if len(unmatched) > DEFAULT_DISPLAY_LIMIT:
                print(f" ...and {len(unmatched)-DEFAULT_DISPLAY_LIMIT} more.")

        # --- Download phase ---
        downloaded_count, success = self.execute_download_plan_parallel(
            download_plan=download_plan,
            unmatched=unmatched,
            cache_file=cache_file,
            limit=limit,
        )
        return success
    def execute_download_plan_parallel(
        self,
        download_plan,
        unmatched,
        cache_file,
        limit=None,
    ):
        """
        Execute a download plan using parallel downloads.

        Falls back to the sequential ``execute_download_plan`` when
        ``self.enable_parallel_downloads`` is False.

        Args:
            download_plan: List of download items to process (each a dict
                with "video_id", "artist", "title", "channel_name" and
                optionally "video_title"). Mutated in place as items complete.
            unmatched: List of unmatched songs (persisted alongside the plan).
            cache_file: Path to cache file for progress tracking
            limit: Optional limit on number of downloads

        Returns:
            tuple: (downloaded_count, success)
            NOTE(review): this path always returns success=True even when
            some tasks failed — confirm that is intentional.
        """
        if not self.enable_parallel_downloads:
            # Fall back to sequential downloads
            return execute_download_plan(
                download_plan=download_plan,
                unmatched=unmatched,
                cache_file=cache_file,
                config=self.config,
                yt_dlp_path=self.yt_dlp_path,
                downloads_dir=self.downloads_dir,
                songlist_tracking=self.songlist_tracking,
                limit=limit,
            )

        # Create parallel downloader
        parallel_downloader = create_parallel_downloader(
            yt_dlp_path=str(self.yt_dlp_path),
            config=self.config,
            downloads_dir=self.downloads_dir,
            max_workers=self.parallel_workers,
            songlist_tracking=self.songlist_tracking,
            tracker=self.tracker,
        )

        # Convert download plan to tasks (respecting the optional limit).
        tasks = []
        # NOTE(review): total_to_download is computed but never used below.
        total_to_download = limit if limit is not None else len(download_plan)

        for idx, item in enumerate(download_plan):
            if limit is not None and len(tasks) >= limit:
                break

            task = DownloadTask(
                video_id=item["video_id"],
                artist=item["artist"],
                title=item["title"],
                channel_name=item["channel_name"],
                video_title=item.get("video_title"),
                priority=len(download_plan) - idx,  # Higher priority for earlier items
            )
            tasks.append(task)

        print(f"🚀 Starting parallel downloads with {self.parallel_workers} workers...")
        print(f"📋 Total tasks: {len(tasks)}")

        # Add all tasks to the parallel downloader
        parallel_downloader.add_download_tasks(tasks)

        # Execute parallel downloads
        results = parallel_downloader.execute_downloads(show_progress=True)

        # Process results and update tracking
        successful_downloads = 0
        for result in results:
            if result.success:
                successful_downloads += 1
                # Remove completed item from plan and update cache
                for idx, item in enumerate(download_plan):
                    if (item["video_id"] == result.task.video_id and
                        item["artist"] == result.task.artist and
                        item["title"] == result.task.title):
                        download_plan.pop(idx)
                        break

                # Update cache after each successful download (resume support)
                save_plan_cache(cache_file, download_plan, unmatched)

        print(f"🎉 Parallel downloads completed!")
        print(f" ✅ Successful: {successful_downloads}")
        print(f" ❌ Failed: {len(results) - successful_downloads}")

        # Delete cache if all items are complete
        if len(download_plan) == 0:
            cleanup_cache(cache_file)

        return successful_downloads, True
    def execute_latest_per_channel_parallel(
        self,
        channel_plans,
        cache_file,
    ):
        """
        Execute latest-per-channel downloads using parallel processing.

        Falls back to the sequential helper when
        ``self.enable_parallel_downloads`` is False.

        Args:
            channel_plans: List of channel plans with videos to download
                (dicts with "channel_name", "channel_url", "videos").
                Mutated in place: completed videos are popped from each plan.
            cache_file: Path to cache file for progress tracking

        Returns:
            bool: True if successful
        """
        if not self.enable_parallel_downloads:
            # Fall back to sequential processing
            return self._execute_latest_per_channel_sequential(channel_plans, cache_file)

        # Create parallel downloader
        parallel_downloader = create_parallel_downloader(
            yt_dlp_path=str(self.yt_dlp_path),
            config=self.config,
            downloads_dir=self.downloads_dir,
            max_workers=self.parallel_workers,
            songlist_tracking=self.songlist_tracking,
            tracker=self.tracker,
        )

        # Convert all videos to tasks
        all_tasks = []
        task_to_channel_mapping = {}  # Map task identifiers to their channel index

        for channel_idx, channel_plan in enumerate(channel_plans):
            channel_name = channel_plan["channel_name"]
            videos = channel_plan["videos"]

            for v_idx, video in enumerate(videos):
                title = video["title"]
                video_id = video["id"]

                # Sanitize filename
                # NOTE(review): safe_title is computed here but never used in
                # this method — appears to be dead code carried over from the
                # sequential path.
                safe_title = title
                invalid_chars = ["?", ":", "*", '"', "<", ">", "|", "/", "\\"]
                for char in invalid_chars:
                    safe_title = safe_title.replace(char, "")
                safe_title = (
                    safe_title.replace("...", "")
                    .replace("..", "")
                    .replace(".", "")
                    .strip()
                )

                # Extract artist and title for tracking
                artist, title_clean = extract_artist_title(title)

                task = DownloadTask(
                    video_id=video_id,
                    artist=artist,
                    title=title_clean,
                    channel_name=channel_name,
                    video_title=title,
                    priority=len(channel_plans) - channel_idx,  # Higher priority for earlier channels
                )
                all_tasks.append(task)
                # Use a unique identifier for the mapping
                task_id = f"{video_id}_{channel_name}_{title_clean}"
                task_to_channel_mapping[task_id] = channel_idx

        print(f"🚀 Starting parallel latest-per-channel downloads with {self.parallel_workers} workers...")
        print(f"📋 Total tasks: {len(all_tasks)} across {len(channel_plans)} channels")

        # Add all tasks to the parallel downloader
        parallel_downloader.add_download_tasks(all_tasks)

        # Execute parallel downloads
        results = parallel_downloader.execute_downloads(show_progress=True)

        # Process results and update channel plans
        successful_downloads = 0
        channel_completion_status = {i: False for i in range(len(channel_plans))}

        for result in results:
            if result.success:
                successful_downloads += 1
                # Create the same task identifier used in the mapping
                task_id = f"{result.task.video_id}_{result.task.channel_name}_{result.task.title}"
                channel_idx = task_to_channel_mapping[task_id]

                # Mark this video as completed in the channel plan
                channel_plan = channel_plans[channel_idx]
                for v_idx, video in enumerate(channel_plan["videos"]):
                    if (video["id"] == result.task.video_id and
                        video["title"] == result.task.video_title):
                        channel_plan["videos"].pop(v_idx)
                        break

                # Check if channel is complete
                if len(channel_plan["videos"]) == 0 and not channel_completion_status[channel_idx]:
                    channel_completion_status[channel_idx] = True
                    print(f" 🗑️ Channel {result.task.channel_name} completed and removed from plan cache.")

        # Update cache with remaining videos (resume support)
        with open(cache_file, "w", encoding="utf-8") as f:
            json.dump(
                {
                    "timestamp": datetime.now().isoformat(),
                    "channel_plans": channel_plans,
                },
                f,
                indent=2,
                ensure_ascii=False,
            )

        print(f"🎉 Parallel latest-per-channel downloads completed!")
        print(f" ✅ Successful: {successful_downloads}")
        print(f" ❌ Failed: {len(results) - successful_downloads}")

        # Delete cache if all channels are complete
        # NOTE(review): a channel whose plan started with zero videos is never
        # marked complete (status only flips inside the results loop), so the
        # cache may never be deleted in that case — confirm intended behavior.
        all_complete = all(channel_completion_status.values())
        if all_complete:
            if cache_file.exists():
                try:
                    cache_file.unlink()
                    print(f"🗑️ Deleted latest-per-channel plan cache after completion: {cache_file.name}")
                except Exception as e:
                    print(f"⚠️ Could not delete latest-per-channel plan cache: {e}")

        return True
def _execute_latest_per_channel_sequential(
|
|
self,
|
|
channel_plans,
|
|
cache_file,
|
|
):
|
|
"""
|
|
Execute latest-per-channel downloads sequentially (fallback method).
|
|
"""
|
|
total_channels = len(channel_plans)
|
|
for idx, channel_plan in enumerate(channel_plans):
|
|
channel_name = channel_plan["channel_name"]
|
|
channel_url = channel_plan["channel_url"]
|
|
videos = channel_plan["videos"]
|
|
print(
|
|
f"\n⬇️ Downloading {len(videos)} videos from channel {idx+1} of {total_channels}: {channel_name}"
|
|
)
|
|
for v_idx, video in enumerate(videos):
|
|
title = video["title"]
|
|
video_id = video["id"]
|
|
# Sanitize filename
|
|
safe_title = title
|
|
invalid_chars = ["?", ":", "*", '"', "<", ">", "|", "/", "\\"]
|
|
for char in invalid_chars:
|
|
safe_title = safe_title.replace(char, "")
|
|
safe_title = (
|
|
safe_title.replace("...", "")
|
|
.replace("..", "")
|
|
.replace(".", "")
|
|
.strip()
|
|
)
|
|
filename = f"{channel_name} - {safe_title}.mp4"
|
|
# Extract artist and title for tracking
|
|
artist, title_clean = extract_artist_title(title)
|
|
|
|
print(
|
|
f" ({v_idx+1}/{len(videos)}) Processing: {artist} - {title_clean}"
|
|
)
|
|
|
|
# Use the new pipeline for consistent processing
|
|
pipeline = DownloadPipeline(
|
|
yt_dlp_path=str(self.yt_dlp_path),
|
|
config=self.config,
|
|
downloads_dir=self.downloads_dir,
|
|
songlist_tracking=self.songlist_tracking,
|
|
tracker=self.tracker,
|
|
)
|
|
|
|
if pipeline.execute_pipeline(
|
|
video_id=video_id,
|
|
artist=artist,
|
|
title=title_clean,
|
|
channel_name=channel_name,
|
|
video_title=title,
|
|
):
|
|
print(f" ✅ Successfully processed: {artist} - {title_clean}")
|
|
else:
|
|
print(f" ❌ Failed to process: {artist} - {title_clean}")
|
|
# After channel is done, remove it from the plan and update cache
|
|
channel_plans[idx]["videos"] = []
|
|
with open(cache_file, "w", encoding="utf-8") as f:
|
|
json.dump(
|
|
{
|
|
"timestamp": datetime.now().isoformat(),
|
|
"channel_plans": channel_plans,
|
|
},
|
|
f,
|
|
indent=2,
|
|
ensure_ascii=False,
|
|
)
|
|
print(
|
|
f" 🗑️ Channel {channel_name} completed and removed from plan cache."
|
|
)
|
|
# After all channels are done, delete the cache
|
|
if cache_file.exists():
|
|
try:
|
|
cache_file.unlink()
|
|
print(
|
|
f"🗑️ Deleted latest-per-channel plan cache after completion: {cache_file.name}"
|
|
)
|
|
except Exception as e:
|
|
print(f"⚠️ Could not delete latest-per-channel plan cache: {e}")
|
|
print(f"🎉 All latest videos downloaded for all channels!")
|
|
return True
|
|
|
|
    def download_latest_per_channel(
        self,
        channel_urls,
        limit=5,
        force_refresh_download_plan=False,
        fuzzy_match=False,
        fuzzy_threshold=DEFAULT_FUZZY_THRESHOLD,
        force_download=False,
    ):
        """
        Download the latest N videos from each channel in channel_urls.

        - Pre-scan all channels for their latest N videos.
        - Check against local songs file to avoid duplicates.
        - Build a per-channel download plan and cache it.
        - Resume robustly if interrupted (removes each channel from the plan as it completes).
        - Deletes the plan cache when all channels are done.

        Args:
            channel_urls: Channels to scan.
            limit: Number of videos to take per channel.
            force_refresh_download_plan: Ignore any cached plan and rescan.
            fuzzy_match / fuzzy_threshold: NOTE(review): included in the plan
                cache key but not otherwise read in this method — confirm.
            force_download: Bypass skip checks via ``self.force_download``.

        Returns:
            bool: Result of the download phase.
        """
        # Set force download flag (read by _should_skip_song via the validator).
        self.force_download = force_download

        print(f"\n🎬 Downloading latest {limit} videos from each channel...")
        # Load server songs for availability checking
        server_songs = load_server_songs()
        server_duplicates_tracking = load_server_duplicates_tracking()

        plan_mode = "latest_per_channel"
        # Include all parameters that affect the plan generation
        plan_kwargs = {
            "limit": limit,
            "channels": len(channel_urls),
            "fuzzy": fuzzy_match,
            "threshold": fuzzy_threshold,
        }
        # Add channel URLs hash to ensure same channels = same cache
        channels_hash = hashlib.md5(
            "|".join(sorted(channel_urls)).encode()
        ).hexdigest()[:8]
        plan_kwargs["channels_hash"] = channels_hash
        cache_file = get_download_plan_cache_file(plan_mode, **plan_kwargs)
        use_cache = False
        if not force_refresh_download_plan and cache_file.exists():
            try:
                with open(cache_file, "r", encoding="utf-8") as f:
                    plan_data = json.load(f)
                cache_time = datetime.fromisoformat(plan_data.get("timestamp"))
                # Reuse the cached plan only while it is fresh enough.
                if datetime.now() - cache_time < timedelta(
                    days=DEFAULT_CACHE_EXPIRATION_DAYS
                ):
                    print(
                        f"🗂️ Using cached latest-per-channel plan from {cache_time} ({cache_file.name})."
                    )
                    channel_plans = plan_data["channel_plans"]
                    use_cache = True
            except Exception as e:
                # Any parse/time error falls through to a fresh scan.
                print(f"⚠️ Could not load latest-per-channel plan cache: {e}")
        if not use_cache:
            print("\n🔎 Pre-scanning all channels for latest videos...")
            channel_plans = []
            total_found = 0
            total_filtered = 0
            total_marked = 0

            for channel_url in channel_urls:
                channel_name, channel_id = get_channel_info(channel_url)
                print(f"\n🚦 Starting channel: {channel_name} ({channel_url})")
                available_videos = self.tracker.get_channel_video_list(
                    channel_url, yt_dlp_path=str(self.yt_dlp_path), force_refresh=False
                )
                print(
                    f" → Found {len(available_videos)} total videos for this channel."
                )

                # Pre-filter: Create a set of known duplicate keys for O(1) lookup
                known_duplicate_keys = set()
                for song_key in server_duplicates_tracking.keys():
                    known_duplicate_keys.add(song_key)

                # Pre-filter videos to exclude known duplicates before processing
                pre_filtered_videos = []
                for video in available_videos:
                    artist, title = extract_artist_title(video["title"])
                    song_key = create_song_key(artist, title)
                    if song_key not in known_duplicate_keys:
                        pre_filtered_videos.append(video)

                print(
                    f" → After pre-filtering: {len(pre_filtered_videos)} videos not previously marked as duplicates."
                )

                # Process videos until we reach the limit for this channel
                filtered_videos = []
                videos_checked = 0

                for video in pre_filtered_videos:
                    if len(filtered_videos) >= limit:
                        break  # We have enough videos for this channel

                    videos_checked += 1
                    artist, title = extract_artist_title(video["title"])

                    # Check if should skip this song during planning phase
                    should_skip, reason, filtered_count = self._should_skip_song(
                        artist,
                        title,
                        channel_name,
                        video["id"],
                        video["title"],
                        server_songs,
                        server_duplicates_tracking,
                    )
                    if should_skip:
                        total_filtered += 1
                        if reason == "on server":
                            total_marked += filtered_count
                        continue

                    filtered_videos.append(video)

                print(
                    f" → After processing: {len(filtered_videos)} videos to download (checked {videos_checked} videos, filtered out {videos_checked - len(filtered_videos)} already on server)."
                )
                total_found += len(filtered_videos)

                channel_plans.append(
                    {
                        "channel_name": channel_name,
                        "channel_url": channel_url,
                        "videos": filtered_videos,
                    }
                )

            print(
                f"\n📊 Summary: {total_found} videos to download across {len(channel_plans)} channels (filtered out {total_filtered} already on server, marked {total_marked} new duplicates for future skipping)."
            )

            plan_data = {
                "timestamp": datetime.now().isoformat(),
                "channel_plans": channel_plans,
            }
            with open(cache_file, "w", encoding="utf-8") as f:
                json.dump(plan_data, f, indent=2, ensure_ascii=False)
            print(f"🗂️ Saved new latest-per-channel plan cache: {cache_file.name}")
        # --- Download phase ---
        return self.execute_latest_per_channel_parallel(channel_plans, cache_file)
|
|
def reset_songlist_all():
    """Reset all songlist downloads.

    Deletes every file recorded in data/songlist_tracking.json, clears that
    tracking file, and removes the corresponding entries from
    data/karaoke_tracking.json so the songs can be re-downloaded from scratch.

    File deletion is best-effort: files that cannot be removed are reported
    and skipped. Uses the module-level ``json``, ``Path`` and
    ``normalize_title`` imports (the previous version redundantly re-imported
    json and Path locally).
    """
    # Load songlist tracking (empty dict when the file does not exist yet).
    songlist_tracking_file = Path("data/songlist_tracking.json")
    karaoke_tracking_file = Path("data/karaoke_tracking.json")
    if songlist_tracking_file.exists():
        with open(songlist_tracking_file, "r", encoding="utf-8") as f:
            tracking = json.load(f)
    else:
        tracking = {}
    # Delete every file the songlist tracking knows about.
    for entry in tracking.values():
        file_path = entry.get("file_path")
        if file_path:
            p = Path(file_path)
            try:
                if p.exists():
                    p.unlink()
                    print(f"🗑️ Deleted: {p}")
            except Exception as e:
                print(f"⚠️ Could not delete {p}: {e}")
    # Clear songlist_tracking.json (created if it did not exist).
    songlist_tracking_file.write_text("{}", encoding="utf-8")
    print("🧹 Cleared songlist_tracking.json")
    # Remove songlist songs from karaoke_tracking.json
    if karaoke_tracking_file.exists():
        with open(karaoke_tracking_file, "r", encoding="utf-8") as f:
            karaoke_data = json.load(f)
        song_keys_to_remove = []
        for song_id, song in karaoke_data.get("songs", {}).items():
            artist = song.get("artist", "")
            title = song.get("title", song.get("name", ""))
            # Key format assumed to match how songlist_tracking keys are
            # built ("<artist>_<normalized title>") — TODO confirm.
            key = f"{artist.lower()}_{normalize_title(title)}"
            if key in tracking:
                song_keys_to_remove.append(song_id)
        for song_id in song_keys_to_remove:
            del karaoke_data["songs"][song_id]
        with open(karaoke_tracking_file, "w", encoding="utf-8") as f:
            json.dump(karaoke_data, f, indent=2, ensure_ascii=False)
        print(
            f"🧹 Removed {len(song_keys_to_remove)} songlist songs from karaoke_tracking.json"
        )
    print("✅ Global songlist reset complete.")
|
|
# For brevity, the rest of the class methods should be copied here from the original download_karaoke.py,
|
|
# updating all references to use the new karaoke_downloader.* imports as needed.
|