# KaraokeVideoDownloader/karaoke_downloader/downloader.py
# (file metadata: 834 lines, 44 KiB, Python)
import os
import sys
import subprocess
import json
import re
from pathlib import Path
from datetime import datetime, timedelta
from karaoke_downloader.tracking_manager import TrackingManager, SongStatus, FormatType
from karaoke_downloader.id3_utils import add_id3_tags, extract_artist_title
from karaoke_downloader.songlist_manager import (
load_songlist, load_songlist_tracking, save_songlist_tracking,
is_songlist_song_downloaded, mark_songlist_song_downloaded, normalize_title,
load_server_songs, is_song_on_server
)
from karaoke_downloader.youtube_utils import get_channel_info, get_playlist_info
import logging
import hashlib
DATA_DIR = Path("data")
class KaraokeDownloader:
def __init__(self):
self.yt_dlp_path = Path("downloader/yt-dlp.exe")
self.downloads_dir = Path("downloads")
self.logs_dir = Path("logs")
self.downloads_dir.mkdir(exist_ok=True)
self.logs_dir.mkdir(exist_ok=True)
self.tracker = TrackingManager(tracking_file=DATA_DIR / "karaoke_tracking.json", cache_file=DATA_DIR / "channel_cache.json")
self.config = self._load_config()
self.songlist_tracking_file = DATA_DIR / "songlist_tracking.json"
self.songlist_tracking = load_songlist_tracking(str(self.songlist_tracking_file))
# Load server songs for availability checking
self.server_songs = load_server_songs()
def _load_config(self):
config_file = DATA_DIR / "config.json"
if config_file.exists():
try:
with open(config_file, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f"Warning: Could not load config.json: {e}")
return {
"download_settings": {
"format": "best[height<=720][ext=mp4]/best[height<=720]/best[ext=mp4]/best",
"preferred_resolution": "720p",
"audio_format": "mp3",
"audio_quality": "0",
"subtitle_language": "en",
"subtitle_format": "srt",
"write_metadata": False,
"write_thumbnail": False,
"write_description": False,
"write_annotations": False,
"write_comments": False,
"write_subtitles": False,
"embed_metadata": False,
"add_metadata": False,
"continue_downloads": True,
"no_overwrites": True,
"ignore_errors": True,
"no_warnings": False
},
"folder_structure": {
"downloads_dir": "downloads",
"logs_dir": "logs",
"tracking_file": str(DATA_DIR / "karaoke_tracking.json")
},
"logging": {
"level": "INFO",
"format": "%(asctime)s - %(levelname)s - %(message)s",
"include_console": True,
"include_file": True
},
"yt_dlp_path": "downloader/yt-dlp.exe"
}
def reset_channel_downloads(self, channel_name, reset_songlist=False, delete_files=False):
"""
Reset all tracking and optionally files for a channel.
If reset_songlist is False, songlist songs are preserved (tracking and files).
If reset_songlist is True, songlist songs for this channel are also reset/deleted.
"""
print(f"\n🔄 Resetting channel: {channel_name} (reset_songlist={reset_songlist}, delete_files={delete_files})")
# Find channel_id from channel_name
channel_id = None
for pid, playlist in self.tracker.data.get('playlists', {}).items():
if playlist['name'] == channel_name or pid == channel_name:
channel_id = pid
break
if not channel_id:
print(f"❌ Channel '{channel_name}' not found in tracking.")
return
# Get all songs for this channel
songs_to_reset = []
for song_id, song in self.tracker.data.get('songs', {}).items():
if song['playlist_id'] == channel_id:
# Check if this is a songlist song
artist, title = song.get('artist', ''), song.get('title', song.get('name', ''))
key = f"{artist.lower()}_{normalize_title(title)}"
is_songlist = key in self.songlist_tracking
if is_songlist and not reset_songlist:
continue # skip songlist songs if not resetting them
songs_to_reset.append((song_id, song, is_songlist))
# Reset tracking and optionally delete files
files_preserved = 0
files_deleted = 0
for song_id, song, is_songlist in songs_to_reset:
# Remove from main tracking
self.tracker.data['songs'][song_id]['status'] = 'NOT_DOWNLOADED'
self.tracker.data['songs'][song_id]['formats'] = {}
self.tracker.data['songs'][song_id]['last_error'] = ''
self.tracker.data['songs'][song_id]['download_attempts'] = 0
self.tracker.data['songs'][song_id]['last_updated'] = None
# Remove from songlist tracking if needed
if is_songlist and reset_songlist:
artist, title = song.get('artist', ''), song.get('title', song.get('name', ''))
key = f"{artist.lower()}_{normalize_title(title)}"
if key in self.songlist_tracking:
del self.songlist_tracking[key]
# Delete file if requested
if delete_files:
file_path = song.get('file_path')
if file_path:
try:
p = Path(file_path)
if p.exists():
p.unlink()
files_deleted += 1
else:
files_preserved += 1
except Exception as e:
print(f"⚠️ Could not delete file {file_path}: {e}")
# --- FIX: Remove all songlist_tracking entries for this channel if reset_songlist is True ---
if reset_songlist:
keys_to_remove = [k for k, v in self.songlist_tracking.items() if v.get('channel') == channel_name]
for k in keys_to_remove:
del self.songlist_tracking[k]
# Save changes
self.tracker.force_save()
save_songlist_tracking(self.songlist_tracking, str(self.songlist_tracking_file))
print(f"✅ Reset {len(songs_to_reset)} songs for channel '{channel_name}'.")
if delete_files:
print(f" Files deleted: {files_deleted}, files preserved: {files_preserved}")
if not reset_songlist:
print(f" Songlist songs were preserved.")
def download_from_file(self, file_path, force_refresh=False):
file = Path(file_path)
if not file.exists():
print(f"❌ File not found: {file_path}")
return False
with open(file, "r", encoding="utf-8") as f:
urls = [line.strip() for line in f if line.strip() and not line.strip().startswith("#")]
if not urls:
print(f"❌ No URLs found in {file_path}")
return False
all_success = True
for url in urls:
print(f"\n➡️ Processing: {url}")
success = self.download_channel_videos(url, force_refresh=force_refresh)
if not success:
all_success = False
return all_success
def download_channel_videos(self, url, force_refresh=False, fuzzy_match=False, fuzzy_threshold=90):
"""Download videos from a channel or playlist URL, respecting songlist-only and limit flags. Supports fuzzy matching."""
channel_name, channel_id = get_channel_info(url)
print(f"\n🎬 Downloading from channel: {channel_name} ({url})")
songlist = load_songlist()
if not songlist:
print("⚠️ No songlist loaded. Skipping.")
return False
limit = self.config.get('limit', 1)
cmd = [
str(self.yt_dlp_path),
'--flat-playlist',
'--print', '%(title)s|%(id)s|%(url)s',
url
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
lines = result.stdout.strip().splitlines()
except subprocess.CalledProcessError as e:
print(f"❌ yt-dlp failed to fetch playlist: {e}")
return False
available_videos = []
for line in lines:
parts = line.split('|')
if len(parts) >= 2:
title, video_id = parts[0].strip(), parts[1].strip()
available_videos.append({'title': title, 'id': video_id})
# Normalize songlist for matching
try:
from rapidfuzz import fuzz
def similarity(a, b):
return fuzz.ratio(a, b)
except ImportError:
import difflib
def similarity(a, b):
return int(difflib.SequenceMatcher(None, a, b).ratio() * 100)
normalized_songlist = {
f"{s['artist'].lower()}_{normalize_title(s['title'])}": s for s in songlist
}
matches = []
for video in available_videos:
artist, title = extract_artist_title(video['title'])
key = f"{artist.lower()}_{normalize_title(title)}"
if fuzzy_match:
# Fuzzy match against all songlist keys
best_score = 0
best_song = None
for song_key, song in normalized_songlist.items():
score = similarity(key, song_key)
if score > best_score:
best_score = score
best_song = song
if best_score >= fuzzy_threshold and best_song:
if not is_songlist_song_downloaded(self.songlist_tracking, best_song['artist'], best_song['title']):
matches.append((video, best_song))
print(f" → Fuzzy match: {artist} - {title} <-> {best_song['artist']} - {best_song['title']} (score: {best_score})")
if len(matches) >= limit:
break
else:
if key in normalized_songlist:
if not is_songlist_song_downloaded(self.songlist_tracking, artist, title):
matches.append((video, normalized_songlist[key]))
if len(matches) >= limit:
break
if not matches:
print("🎵 No new songlist matches found for this channel.")
return True
# Download only the first N matches
for video, song in matches:
artist, title = song['artist'], song['title']
output_path = self.downloads_dir / channel_name / f"{artist} - {title} (Karaoke Version).mp4"
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"⬇️ Downloading: {artist} - {title} -> {output_path}")
video_url = f"https://www.youtube.com/watch?v={video['id']}"
cmd = [
str(self.yt_dlp_path),
"-o", str(output_path),
"-f", self.config["download_settings"]["format"],
video_url
]
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
print(f"❌ yt-dlp failed: {e}")
continue
if not output_path.exists() or output_path.stat().st_size == 0:
print(f"❌ Download failed or file is empty: {output_path}")
continue
if not self._is_valid_mp4(output_path):
print(f"❌ File is not a valid MP4: {output_path}")
continue
add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name)
mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path)
print(f"✅ Downloaded and tracked: {artist} - {title}")
print(f"🎉 All post-processing complete for: {output_path}")
return True
def build_download_plan(self, channel_urls, undownloaded, fuzzy_match=False, fuzzy_threshold=90):
"""
For each song in undownloaded, scan all channels for a match.
Use fuzzy matching if enabled.
Return (download_plan, unmatched_songs):
- download_plan: list of dicts {artist, title, channel_name, channel_url, video_id, video_title}
- unmatched_songs: list of songs not found in any channel
"""
try:
from rapidfuzz import fuzz
def similarity(a, b):
return fuzz.ratio(a, b)
except ImportError:
import difflib
def similarity(a, b):
return int(difflib.SequenceMatcher(None, a, b).ratio() * 100)
plan = []
unmatched = []
channel_match_counts = {}
for channel_url in channel_urls:
channel_name, channel_id = get_channel_info(channel_url)
print(f"\n🚦 Starting channel: {channel_name} ({channel_url})")
available_videos = self.tracker.get_channel_video_list(
channel_url,
yt_dlp_path=str(self.yt_dlp_path),
force_refresh=False
)
matches_this_channel = 0
channel_fuzzy_matches = [] # For optional top-N reporting
for song in undownloaded:
artist, title = song['artist'], song['title']
found = False
song_key = f"{artist.lower()}_{normalize_title(title)}"
for video in available_videos:
v_artist, v_title = extract_artist_title(video['title'])
video_key = f"{v_artist.lower()}_{normalize_title(v_title)}"
if fuzzy_match:
score = similarity(song_key, video_key)
if score >= fuzzy_threshold:
if not any(p['artist'] == artist and p['title'] == title for p in plan):
plan.append({
'artist': artist,
'title': title,
'channel_name': channel_name,
'channel_url': channel_url,
'video_id': video['id'],
'video_title': video['title'],
'match_score': score
})
print(f" → Match: \"{artist} - {title}\" <-> \"{video['title']}\" (score: {score})")
matches_this_channel += 1
found = True
break
else:
if (normalize_title(v_artist) == normalize_title(artist) and normalize_title(v_title) == normalize_title(title)) or \
(normalize_title(video['title']) == normalize_title(f"{artist} - {title}")):
if not any(p['artist'] == artist and p['title'] == title for p in plan):
plan.append({
'artist': artist,
'title': title,
'channel_name': channel_name,
'channel_url': channel_url,
'video_id': video['id'],
'video_title': video['title'],
'match_score': 100
})
print(f" → Match: \"{artist} - {title}\" <-> \"{video['title']}\" (exact)")
matches_this_channel += 1
found = True
break
# Don't break here; keep looking for all matches in this channel
channel_match_counts[channel_name] = matches_this_channel
print(f" → Found {matches_this_channel} songlist matches in this channel.")
# Optionally, print top 3 fuzzy matches for review
# if fuzzy_match and channel_fuzzy_matches:
# top_matches = sorted(channel_fuzzy_matches, key=lambda x: -x[3])[:3]
# for a, t, vt, s in top_matches:
# print(f" Top match: {a} - {t} <-> {vt} (score: {s})")
# Now find unmatched songs
for song in undownloaded:
if not any(p['artist'] == song['artist'] and p['title'] == song['title'] for p in plan):
unmatched.append(song)
# Print summary table
print("\n📊 Channel match summary:")
for channel, count in channel_match_counts.items():
print(f" {channel}: {count} matches")
print(f" TOTAL: {sum(channel_match_counts.values())} matches across {len(channel_match_counts)} channels.")
return plan, unmatched
def get_download_plan_cache_file(self, mode, **kwargs):
"""Generate a unique cache filename based on mode and key parameters."""
parts = [f"plan_{mode}"]
for k, v in sorted(kwargs.items()):
parts.append(f"{k}{v}")
base = "_".join(parts)
# Hash for safety if string is long
if len(base) > 60:
base = base[:40] + "_" + hashlib.md5(base.encode()).hexdigest()
return Path(f"data/{base}.json")
def download_songlist_across_channels(self, channel_urls, limit=None, force_refresh_download_plan=False, fuzzy_match=False, fuzzy_threshold=90):
"""
For each song in the songlist, try each channel in order and download from the first channel where it is found.
Download up to 'limit' songs, skipping any that cannot be found, until the limit is reached or all possible matches are exhausted.
"""
songlist = load_songlist()
if not songlist:
print("⚠️ No songlist loaded. Skipping.")
return False
# Filter for songs not yet downloaded
undownloaded = [s for s in songlist if not is_songlist_song_downloaded(self.songlist_tracking, s['artist'], s['title'])]
print(f"🎯 {len(songlist)} total unique songs in songlist.")
print(f"🎯 {len(undownloaded)} unique songlist songs to download.")
# Further filter out songs already on server
not_on_server = [s for s in undownloaded if not is_song_on_server(self.server_songs, s['artist'], s['title'])]
server_available = len(undownloaded) - len(not_on_server)
if server_available > 0:
print(f"🎵 {server_available} songs already available on server, skipping.")
undownloaded = not_on_server
print(f"🎯 {len(undownloaded)} songs need to be downloaded.")
if not undownloaded:
print("🎵 All songlist songs already downloaded.")
return True
# --- FAST MODE: Early exit and deduplication if limit is set ---
if limit is not None:
print("\n⚡ Fast mode enabled: will stop as soon as limit is reached with successful downloads.")
try:
from rapidfuzz import fuzz
def similarity(a, b):
return fuzz.ratio(a, b)
except ImportError:
import difflib
def similarity(a, b):
return int(difflib.SequenceMatcher(None, a, b).ratio() * 100)
downloaded_count = 0
unique_keys = set()
total_attempted = 0
for channel_url in channel_urls:
channel_name, channel_id = get_channel_info(channel_url)
print(f"\n🚦 Starting channel: {channel_name} ({channel_url})")
available_videos = self.tracker.get_channel_video_list(
channel_url,
yt_dlp_path=str(self.yt_dlp_path),
force_refresh=False
)
for song in undownloaded:
artist, title = song['artist'], song['title']
key = f"{artist.lower()}_{normalize_title(title)}"
if key in unique_keys:
continue # Already downloaded or queued
found = False
for video in available_videos:
v_artist, v_title = extract_artist_title(video['title'])
video_key = f"{v_artist.lower()}_{normalize_title(v_title)}"
if fuzzy_match:
score = similarity(key, video_key)
if score >= fuzzy_threshold:
print(f" → Match: \"{artist} - {title}\" <-> \"{video['title']}\" (score: {score})")
found = True
else:
if (normalize_title(v_artist) == normalize_title(artist) and normalize_title(v_title) == normalize_title(title)) or \
(normalize_title(video['title']) == normalize_title(f"{artist} - {title}")):
print(f" → Match: \"{artist} - {title}\" <-> \"{video['title']}\" (exact)")
found = True
if found:
print(f"\n⬇️ Downloading {downloaded_count+1} of {limit}:")
print(f" 📋 Songlist: {artist} - {title}")
print(f" 🎬 Video: {video['title']} ({channel_name})")
# --- Download logic (reuse from below) ---
safe_title = title.replace("(From ", "").replace(")", "").replace(" - ", " ").replace(":", "").replace("'", "").replace('"', "")
safe_artist = artist.replace("'", "").replace('"', "")
invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\']
for char in invalid_chars:
safe_title = safe_title.replace(char, "")
safe_artist = safe_artist.replace(char, "")
safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip()
safe_artist = safe_artist.strip()
filename = f"{safe_artist} - {safe_title}.mp4"
# Call the actual download function (simulate the same as in the plan loop)
success = self._download_video_and_track(
channel_name, channel_url, video['id'], video['title'], artist, title, filename
)
total_attempted += 1
if success:
downloaded_count += 1
unique_keys.add(key)
print(f"✅ Downloaded and tracked: {artist} - {title}")
else:
print(f"❌ Download failed: {artist} - {title}")
if downloaded_count >= limit:
print(f"🎉 Reached download limit ({limit}). Stopping early.")
return True
break # Don't try to match this song to other videos in this channel
print(f"🎉 Downloaded {downloaded_count} unique songlist songs (limit was {limit}).")
if downloaded_count < limit:
print(f"⚠️ Only {downloaded_count} songs were downloaded. Some may not have been found or downloads failed.")
return True
# --- ORIGINAL FULL PLAN MODE (no limit) ---
# Removed per-song printout for cleaner output
# print("🔍 Songs to search for:")
# for song in undownloaded:
# print(f" - {song['artist']} - {song['title']}")
# --- Download plan cache logic ---
plan_mode = "songlist"
plan_kwargs = {"limit": limit or "all", "channels": len(channel_urls)}
cache_file = self.get_download_plan_cache_file(plan_mode, **plan_kwargs)
use_cache = False
if not force_refresh_download_plan and cache_file.exists():
try:
with open(cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
cache_time = datetime.fromisoformat(cache_data.get('timestamp'))
if datetime.now() - cache_time < timedelta(days=1):
print(f"🗂️ Using cached download plan from {cache_time} ({cache_file.name}).")
download_plan = cache_data['download_plan']
unmatched = cache_data['unmatched']
use_cache = True
except Exception as e:
print(f"⚠️ Could not load download plan cache: {e}")
if not use_cache:
print("\n🔎 Pre-scanning channels for matches...")
download_plan, unmatched = self.build_download_plan(channel_urls, undownloaded, fuzzy_match=fuzzy_match, fuzzy_threshold=fuzzy_threshold)
if download_plan:
cache_data = {
'timestamp': datetime.now().isoformat(),
'download_plan': download_plan,
'unmatched': unmatched
}
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(cache_data, f, indent=2, ensure_ascii=False)
print(f"🗂️ Saved new download plan cache: {cache_file.name}")
else:
if cache_file.exists():
cache_file.unlink()
print(f"🗂️ No matches found, not saving download plan cache.")
print(f"\n📊 Download plan ready: {len(download_plan)} songs will be downloaded.")
print(f"{len(unmatched)} songs could not be found in any channel.")
if unmatched:
print("Unmatched songs:")
for song in unmatched[:10]:
print(f" - {song['artist']} - {song['title']}")
if len(unmatched) > 10:
print(f" ...and {len(unmatched)-10} more.")
# --- Download phase ---
downloaded_count = 0
total_to_download = limit if limit is not None else len(download_plan)
for idx, item in enumerate(download_plan):
if limit is not None and downloaded_count >= limit:
break
artist = item['artist']
title = item['title']
channel_name = item['channel_name']
channel_url = item['channel_url']
video_id = item['video_id']
video_title = item['video_title']
print(f"\n⬇️ Downloading {idx+1} of {total_to_download}:")
print(f" 📋 Songlist: {artist} - {title}")
print(f" 🎬 Video: {video_title} ({channel_name})")
# --- Existing download logic here, using channel_name, video_id, etc. ---
# (Copy the download logic from the previous loop, using these variables)
# Create a shorter, safer filename - do this ONCE and use consistently
safe_title = title.replace("(From ", "").replace(")", "").replace(" - ", " ").replace(":", "").replace("'", "").replace('"', "")
safe_artist = artist.replace("'", "").replace('"', "")
# Remove all Windows-invalid characters
invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\']
for char in invalid_chars:
safe_title = safe_title.replace(char, "")
safe_artist = safe_artist.replace(char, "")
# Also remove any other potentially problematic characters
safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip()
safe_artist = safe_artist.strip()
filename = f"{safe_artist} - {safe_title}.mp4"
# Limit filename length to avoid Windows path issues
if len(filename) > 100:
filename = f"{safe_artist[:30]} - {safe_title[:60]}.mp4"
output_path = self.downloads_dir / channel_name / filename
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"⬇️ Downloading: {artist} - {title} -> {output_path}")
video_url = f"https://www.youtube.com/watch?v={video_id}"
dlp_cmd = [
str(self.yt_dlp_path),
"--no-check-certificates",
"--ignore-errors",
"--no-warnings",
"-o", str(output_path),
"-f", self.config["download_settings"]["format"],
video_url
]
print(f"🔧 Running command: {' '.join(dlp_cmd)}")
print(f"📺 Resolution settings: {self.config.get('download_settings', {}).get('preferred_resolution', 'Unknown')}")
print(f"🎬 Format string: {self.config.get('download_settings', {}).get('format', 'Unknown')}")
# Debug: Show available formats (optional)
if self.config.get('debug_show_formats', False):
print(f"🔍 Checking available formats for: {video_url}")
format_cmd = [
str(self.yt_dlp_path),
"--list-formats",
video_url
]
try:
format_result = subprocess.run(format_cmd, capture_output=True, text=True, timeout=30)
print(f"📋 Available formats:\n{format_result.stdout}")
except Exception as e:
print(f"⚠️ Could not check formats: {e}")
try:
result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True)
print(f"✅ yt-dlp completed successfully")
print(f"📄 yt-dlp stdout: {result.stdout}")
except subprocess.CalledProcessError as e:
print(f"❌ yt-dlp failed with exit code {e.returncode}")
print(f"❌ yt-dlp stderr: {e.stderr}")
continue
if not output_path.exists():
print(f"❌ Download failed: file does not exist: {output_path}")
# Check if yt-dlp saved it somewhere else
possible_files = list(output_path.parent.glob("*.mp4"))
if possible_files:
print(f"🔍 Found these files in the directory: {[f.name for f in possible_files]}")
# Look for a file that matches our pattern (artist - title)
artist_part = safe_artist.lower()
title_part = safe_title.lower()
for file in possible_files:
file_lower = file.stem.lower()
if artist_part in file_lower and any(word in file_lower for word in title_part.split()):
print(f"🎯 Found matching file: {file.name}")
output_path = file
break
else:
print(f"❌ No matching file found for: {artist} - {title}")
continue
else:
continue
if output_path.stat().st_size == 0:
print(f"❌ Download failed: file is empty (0 bytes): {output_path}")
continue
# TEMP: Skipping MP4 validation for debugging
# if not self._is_valid_mp4(output_path):
# print(f"❌ File is not a valid MP4: {output_path}")
# continue
add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name)
mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path)
print(f"✅ Downloaded and tracked: {artist} - {title}")
print(f"🎉 All post-processing complete for: {output_path}")
downloaded_count += 1
# After each download, if this was the last song, delete the cache
if idx + 1 == total_to_download:
if cache_file.exists():
try:
cache_file.unlink()
print(f"🗑️ Deleted download plan cache after last song downloaded: {cache_file.name}")
except Exception as e:
print(f"⚠️ Could not delete download plan cache: {e}")
print(f"🎉 Downloaded {downloaded_count} songlist songs.")
print(f"📊 Summary: Processed {len(channel_urls)} channels, found {downloaded_count} songs, {len(unmatched)} songs not found.")
# Delete the download plan cache if all planned downloads are done
if cache_file.exists():
try:
cache_file.unlink()
print(f"🗑️ Deleted download plan cache after completion: {cache_file.name}")
except Exception as e:
print(f"⚠️ Could not delete download plan cache: {e}")
return True
def download_latest_per_channel(self, channel_urls, limit=5, force_refresh_download_plan=False):
"""
Download the latest N videos from each channel in channel_urls.
- Pre-scan all channels for their latest N videos.
- Build a per-channel download plan and cache it.
- Resume robustly if interrupted (removes each channel from the plan as it completes).
- Deletes the plan cache when all channels are done.
"""
plan_mode = "latest_per_channel"
plan_kwargs = {"limit": limit, "channels": len(channel_urls)}
cache_file = self.get_download_plan_cache_file(plan_mode, **plan_kwargs)
use_cache = False
if not force_refresh_download_plan and cache_file.exists():
try:
with open(cache_file, 'r', encoding='utf-8') as f:
plan_data = json.load(f)
cache_time = datetime.fromisoformat(plan_data.get('timestamp'))
if datetime.now() - cache_time < timedelta(days=1):
print(f"🗂️ Using cached latest-per-channel plan from {cache_time} ({cache_file.name}).")
channel_plans = plan_data['channel_plans']
use_cache = True
except Exception as e:
print(f"⚠️ Could not load latest-per-channel plan cache: {e}")
if not use_cache:
print("\n🔎 Pre-scanning all channels for latest videos...")
channel_plans = []
for channel_url in channel_urls:
channel_name, channel_id = get_channel_info(channel_url)
print(f"\n🚦 Starting channel: {channel_name} ({channel_url})")
available_videos = self.tracker.get_channel_video_list(
channel_url,
yt_dlp_path=str(self.yt_dlp_path),
force_refresh=False
)
# Sort by upload order (assume yt-dlp returns in order, or sort by id if available)
latest_videos = available_videos[:limit]
print(f" → Found {len(latest_videos)} latest videos for this channel.")
channel_plans.append({
'channel_name': channel_name,
'channel_url': channel_url,
'videos': latest_videos
})
plan_data = {
'timestamp': datetime.now().isoformat(),
'channel_plans': channel_plans
}
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(plan_data, f, indent=2, ensure_ascii=False)
print(f"🗂️ Saved new latest-per-channel plan cache: {cache_file.name}")
# --- Download phase ---
total_channels = len(channel_plans)
for idx, channel_plan in enumerate(channel_plans):
channel_name = channel_plan['channel_name']
channel_url = channel_plan['channel_url']
videos = channel_plan['videos']
print(f"\n⬇️ Downloading {len(videos)} videos from channel {idx+1} of {total_channels}: {channel_name}")
for v_idx, video in enumerate(videos):
title = video['title']
video_id = video['id']
# Sanitize filename
safe_title = title
invalid_chars = ['?', ':', '*', '"', '<', '>', '|', '/', '\\']
for char in invalid_chars:
safe_title = safe_title.replace(char, "")
safe_title = safe_title.replace("...", "").replace("..", "").replace(".", "").strip()
filename = f"{channel_name} - {safe_title}.mp4"
if len(filename) > 100:
filename = f"{channel_name[:30]} - {safe_title[:60]}.mp4"
output_path = self.downloads_dir / channel_name / filename
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f" ({v_idx+1}/{len(videos)}) Downloading: {title} -> {output_path}")
video_url = f"https://www.youtube.com/watch?v={video_id}"
dlp_cmd = [
str(self.yt_dlp_path),
"--no-check-certificates",
"--ignore-errors",
"--no-warnings",
"-o", str(output_path),
"-f", self.config["download_settings"]["format"],
video_url
]
try:
result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True)
print(f" ✅ yt-dlp completed successfully")
except subprocess.CalledProcessError as e:
print(f" ❌ yt-dlp failed with exit code {e.returncode}")
print(f" ❌ yt-dlp stderr: {e.stderr}")
continue
if not output_path.exists() or output_path.stat().st_size == 0:
print(f" ❌ Download failed or file is empty: {output_path}")
continue
add_id3_tags(output_path, title, channel_name)
print(f" ✅ Downloaded and tagged: {title}")
# After channel is done, remove it from the plan and update cache
channel_plans[idx]['videos'] = []
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump({'timestamp': datetime.now().isoformat(), 'channel_plans': channel_plans}, f, indent=2, ensure_ascii=False)
print(f" 🗑️ Channel {channel_name} completed and removed from plan cache.")
# After all channels are done, delete the cache
if cache_file.exists():
try:
cache_file.unlink()
print(f"🗑️ Deleted latest-per-channel plan cache after completion: {cache_file.name}")
except Exception as e:
print(f"⚠️ Could not delete latest-per-channel plan cache: {e}")
print(f"🎉 All latest videos downloaded for all channels!")
return True
def _is_valid_mp4(self, file_path):
"""Check if the file is a valid MP4 using ffprobe, if available."""
try:
cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "default=noprint_wrappers=1:nokey=1", str(file_path)]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
return "mp4" in result.stdout or "h264" in result.stdout or "hevc" in result.stdout
except Exception:
# If ffprobe is not available, skip the check
return True
def _download_video_and_track(self, channel_name, channel_url, video_id, video_title, artist, title, filename):
"""
Helper to download a single video and track its status.
Returns True if successful, False otherwise.
"""
output_path = self.downloads_dir / channel_name / filename
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"⬇️ Downloading: {artist} - {title} -> {output_path}")
video_url = f"https://www.youtube.com/watch?v={video_id}"
dlp_cmd = [
str(self.yt_dlp_path),
"--no-check-certificates",
"--ignore-errors",
"--no-warnings",
"-o", str(output_path),
"-f", self.config["download_settings"]["format"],
video_url
]
try:
result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True)
print(f"✅ yt-dlp completed successfully")
print(f"📄 yt-dlp stdout: {result.stdout}")
except subprocess.CalledProcessError as e:
print(f"❌ yt-dlp failed with exit code {e.returncode}")
print(f"❌ yt-dlp stderr: {e.stderr}")
return False
if not output_path.exists():
print(f"❌ Download failed: file does not exist: {output_path}")
return False
if output_path.stat().st_size == 0:
print(f"❌ Download failed: file is empty (0 bytes): {output_path}")
return False
# TEMP: Skipping MP4 validation for debugging
# if not self._is_valid_mp4(output_path):
# print(f"❌ File is not a valid MP4: {output_path}")
# return False
add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name)
mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path)
print(f"✅ Downloaded and tracked: {artist} - {title}")
print(f"🎉 All post-processing complete for: {output_path}")
return True
def reset_songlist_all():
"""Delete all files tracked in songlist_tracking.json, clear songlist_tracking.json, and remove songlist songs from karaoke_tracking.json."""
import json
from pathlib import Path
# Load songlist tracking
songlist_tracking_file = Path('data/songlist_tracking.json')
karaoke_tracking_file = Path('data/karaoke_tracking.json')
if songlist_tracking_file.exists():
with open(songlist_tracking_file, 'r', encoding='utf-8') as f:
tracking = json.load(f)
else:
tracking = {}
# Delete all files tracked
for entry in tracking.values():
file_path = entry.get('file_path')
if file_path:
p = Path(file_path)
try:
if p.exists():
p.unlink()
print(f"🗑️ Deleted: {p}")
except Exception as e:
print(f"⚠️ Could not delete {p}: {e}")
# Clear songlist_tracking.json
songlist_tracking_file.write_text("{}", encoding="utf-8")
print("🧹 Cleared songlist_tracking.json")
# Remove songlist songs from karaoke_tracking.json
if karaoke_tracking_file.exists():
with open(karaoke_tracking_file, 'r', encoding='utf-8') as f:
karaoke_data = json.load(f)
song_keys_to_remove = []
for song_id, song in karaoke_data.get('songs', {}).items():
artist = song.get('artist', '')
title = song.get('title', song.get('name', ''))
key = f"{artist.lower()}_{normalize_title(title)}"
if key in tracking:
song_keys_to_remove.append(song_id)
for song_id in song_keys_to_remove:
del karaoke_data['songs'][song_id]
with open(karaoke_tracking_file, 'w', encoding='utf-8') as f:
json.dump(karaoke_data, f, indent=2, ensure_ascii=False)
print(f"🧹 Removed {len(song_keys_to_remove)} songlist songs from karaoke_tracking.json")
print("✅ Global songlist reset complete.")
# For brevity, the rest of the class methods should be copied here from the original download_karaoke.py,
# updating all references to use the new karaoke_downloader.* imports as needed.