# KaraokeVideoDownloader/karaoke_downloader/downloader.py
#
# Source listing header: 469 lines, 25 KiB, Python.

import os
import sys
import subprocess
import json
import re
from pathlib import Path
from datetime import datetime
from karaoke_downloader.tracking_manager import TrackingManager, SongStatus, FormatType
from karaoke_downloader.id3_utils import add_id3_tags, extract_artist_title
from karaoke_downloader.songlist_manager import (
load_songlist, load_songlist_tracking, save_songlist_tracking,
is_songlist_song_downloaded, mark_songlist_song_downloaded, normalize_title,
load_server_songs, is_song_on_server
)
from karaoke_downloader.youtube_utils import get_channel_info, get_playlist_info
import logging
# Directory (relative to the working directory) that holds the JSON files used
# in this module: tracking, channel cache, config, and songlist tracking.
DATA_DIR = Path("data")
class KaraokeDownloader:
    """Download karaoke videos with yt-dlp and record which songlist songs were fetched.

    An instance holds the yt-dlp executable path, the download and log
    directories, the main tracker, the loaded configuration, the songlist
    tracking mapping, and the server song data used to skip songs that are
    already available.
    """

    def __init__(self):
        """Create the working directories and load tracking, config, and server data."""
        self.yt_dlp_path = Path("downloader/yt-dlp.exe")
        self.downloads_dir = Path("downloads")
        self.logs_dir = Path("logs")
        self.downloads_dir.mkdir(exist_ok=True)
        self.logs_dir.mkdir(exist_ok=True)
        self.tracker = TrackingManager(
            tracking_file=DATA_DIR / "karaoke_tracking.json",
            cache_file=DATA_DIR / "channel_cache.json",
        )
        self.config = self._load_config()
        self.songlist_tracking_file = DATA_DIR / "songlist_tracking.json"
        self.songlist_tracking = load_songlist_tracking(str(self.songlist_tracking_file))
        # Load server songs for availability checking
        self.server_songs = load_server_songs()

    def _load_config(self):
        """Return the parsed ``config.json`` from DATA_DIR, or built-in defaults.

        The defaults are returned when the file does not exist or cannot be
        read or parsed; in the latter case a warning is printed first.
        """
        config_file = DATA_DIR / "config.json"
        if config_file.exists():
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (json.JSONDecodeError, OSError) as e:
                # OSError (not just FileNotFoundError) so that an unreadable
                # file also falls back to the defaults instead of crashing.
                print(f"Warning: Could not load config.json: {e}")
        return {
            "download_settings": {
                "format": "best[height<=720][ext=mp4]/best[height<=720]/best[ext=mp4]/best",
                "preferred_resolution": "720p",
                "audio_format": "mp3",
                "audio_quality": "0",
                "subtitle_language": "en",
                "subtitle_format": "srt",
                "write_metadata": False,
                "write_thumbnail": False,
                "write_description": False,
                "write_annotations": False,
                "write_comments": False,
                "write_subtitles": False,
                "embed_metadata": False,
                "add_metadata": False,
                "continue_downloads": True,
                "no_overwrites": True,
                "ignore_errors": True,
                "no_warnings": False,
            },
            "folder_structure": {
                "downloads_dir": "downloads",
                "logs_dir": "logs",
                "tracking_file": str(DATA_DIR / "karaoke_tracking.json"),
            },
            "logging": {
                "level": "INFO",
                "format": "%(asctime)s - %(levelname)s - %(message)s",
                "include_console": True,
                "include_file": True,
            },
            "yt_dlp_path": "downloader/yt-dlp.exe",
        }

    def reset_channel_downloads(self, channel_name, reset_songlist=False, delete_files=False):
        """
        Reset all tracking and optionally files for a channel.

        Args:
            channel_name: Playlist name or playlist id as stored in the tracker.
            reset_songlist: If False, songs present in the songlist tracking are
                left untouched (tracking and files). If True, they are reset too
                and their songlist tracking entries are removed.
            delete_files: If True, delete each reset song's ``file_path`` from disk.

        Returns:
            None. Prints a message and returns early when the channel is unknown.
        """
        print(f"\n🔄 Resetting channel: {channel_name} (reset_songlist={reset_songlist}, delete_files={delete_files})")
        # Resolve the playlist id; the argument may be either a name or an id.
        playlists = self.tracker.data.get('playlists', {})
        channel_id = None
        for pid, playlist in playlists.items():
            if playlist.get('name') == channel_name or pid == channel_name:
                channel_id = pid
                break
        if not channel_id:
            print(f"❌ Channel '{channel_name}' not found in tracking.")
            return
        # Labels a songlist entry's 'channel' field may carry for this channel.
        # Including the stored playlist name makes a reset requested by id
        # also clear entries recorded under the channel's name.
        channel_labels = {channel_name, playlists[channel_id].get('name')}
        channel_labels.discard(None)

        # Collect the songs of this channel that should be reset.
        songs_to_reset = []
        for song_id, song in self.tracker.data.get('songs', {}).items():
            if song.get('playlist_id') != channel_id:
                continue
            artist, title = song.get('artist', ''), song.get('title', song.get('name', ''))
            key = f"{artist.lower()}_{normalize_title(title)}"
            is_songlist = key in self.songlist_tracking
            if is_songlist and not reset_songlist:
                continue  # skip songlist songs if not resetting them
            songs_to_reset.append((song_id, song, key, is_songlist))

        # Reset tracking and optionally delete files.
        files_preserved = 0  # tracked paths that had no file on disk
        files_deleted = 0
        for song_id, song, key, is_songlist in songs_to_reset:
            song.update({
                'status': 'NOT_DOWNLOADED',
                'formats': {},
                'last_error': '',
                'download_attempts': 0,
                'last_updated': None,
            })
            # Songlist songs only reach this loop when reset_songlist is True.
            if is_songlist:
                self.songlist_tracking.pop(key, None)
            if not delete_files:
                continue
            file_path = song.get('file_path')
            if not file_path:
                continue
            try:
                target = Path(file_path)
                if target.exists():
                    target.unlink()
                    files_deleted += 1
                else:
                    files_preserved += 1
            except OSError as e:
                print(f"⚠️  Could not delete file {file_path}: {e}")

        # Remove every remaining songlist tracking entry recorded for this channel.
        if reset_songlist:
            stale_keys = [
                k for k, v in self.songlist_tracking.items()
                if v.get('channel') in channel_labels
            ]
            for k in stale_keys:
                del self.songlist_tracking[k]

        # Save changes
        self.tracker.force_save()
        save_songlist_tracking(self.songlist_tracking, str(self.songlist_tracking_file))
        print(f"✅ Reset {len(songs_to_reset)} songs for channel '{channel_name}'.")
        if delete_files:
            print(f" Files deleted: {files_deleted}, files preserved: {files_preserved}")
        if not reset_songlist:
            print(" Songlist songs were preserved.")

    def download_from_file(self, file_path, force_refresh=False):
        """Process every URL listed in a text file via ``download_channel_videos``.

        Blank lines and lines starting with ``#`` are ignored.

        Returns:
            False if the file is missing, contains no URLs, or any URL fails;
            True otherwise.
        """
        file = Path(file_path)
        if not file.exists():
            print(f"❌ File not found: {file_path}")
            return False
        with open(file, "r", encoding="utf-8") as f:
            stripped = (line.strip() for line in f)
            urls = [line for line in stripped if line and not line.startswith("#")]
        if not urls:
            print(f"❌ No URLs found in {file_path}")
            return False
        all_success = True
        for url in urls:
            print(f"\n➡️ Processing: {url}")
            if not self.download_channel_videos(url, force_refresh=force_refresh):
                all_success = False
        return all_success

    @staticmethod
    def _parse_flat_playlist_lines(lines):
        """Parse ``title|id|url`` lines printed by yt-dlp into ``{'title', 'id'}`` dicts.

        The split is done from the right so that a ``|`` inside the video
        title stays part of the title instead of being mistaken for the id.
        Lines without any separator are skipped.
        """
        videos = []
        for line in lines:
            parts = line.rsplit('|', 2)
            if len(parts) >= 2:
                videos.append({'title': parts[0].strip(), 'id': parts[1].strip()})
        return videos

    def download_channel_videos(self, url, force_refresh=False):
        """
        Download videos from a channel or playlist URL that match the songlist.

        Only the first N matches are downloaded, where N is
        ``self.config.get('limit', 1)``. Songs already marked as downloaded in
        the songlist tracking, or reported as present on the server, are skipped.

        Args:
            url: Channel or playlist URL passed to yt-dlp.
            force_refresh: Accepted for interface compatibility; not used here.

        Returns:
            False when no songlist is loaded or the playlist listing fails;
            True otherwise (including when individual downloads fail).
        """
        channel_name, _channel_id = get_channel_info(url)
        print(f"\n🎬 Downloading from channel: {channel_name} ({url})")
        songlist = load_songlist()
        if not songlist:
            print("⚠️ No songlist loaded. Skipping.")
            return False
        # Get limit from config or default to 1
        limit = self.config.get('limit', 1)
        # --- Get channel video list with yt-dlp --flat-playlist ---
        cmd = [
            str(self.yt_dlp_path),
            '--flat-playlist',
            '--print', '%(title)s|%(id)s|%(url)s',
            url,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing or non-executable yt-dlp binary.
            print(f"❌ yt-dlp failed to fetch playlist: {e}")
            return False
        available_videos = self._parse_flat_playlist_lines(result.stdout.strip().splitlines())

        # Normalize songlist for matching
        normalized_songlist = {
            f"{s['artist'].lower()}_{normalize_title(s['title'])}": s for s in songlist
        }
        # Find matches
        matches = []
        for video in available_videos:
            artist, title = extract_artist_title(video['title'])
            key = f"{artist.lower()}_{normalize_title(title)}"
            if key not in normalized_songlist:
                continue
            # Check if already downloaded or on server
            if is_songlist_song_downloaded(self.songlist_tracking, artist, title):
                continue
            if is_song_on_server(self.server_songs, artist, title):
                print(f"🎵 Song already on server: {artist} - {title}")
                continue
            matches.append((video, normalized_songlist[key]))
            if len(matches) >= limit:
                break
        if not matches:
            print("🎵 No new songlist matches found for this channel.")
            return True

        # Download only the first N matches
        for video, song in matches:
            artist, title = song['artist'], song['title']
            output_path = self.downloads_dir / channel_name / f"{artist} - {title} (Karaoke Version).mp4"
            output_path.parent.mkdir(parents=True, exist_ok=True)
            print(f"⬇️ Downloading: {artist} - {title} -> {output_path}")
            # Download by video ID
            video_url = f"https://www.youtube.com/watch?v={video['id']}"
            cmd = [
                str(self.yt_dlp_path),
                "-o", str(output_path),
                "-f", self.config["download_settings"]["format"],
                video_url,
            ]
            try:
                subprocess.run(cmd, check=True)
            except (subprocess.CalledProcessError, OSError) as e:
                print(f"❌ yt-dlp failed: {e}")
                continue
            if not output_path.exists() or output_path.stat().st_size == 0:
                print(f"❌ Download failed or file is empty: {output_path}")
                continue
            # NOTE: MP4 validation via _is_valid_mp4 is intentionally not
            # performed here (it was disabled for debugging).
            add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name)
            mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path)
            print(f"✅ Downloaded and tracked: {artist} - {title}")
            print(f"🎉 All post-processing complete for: {output_path}")
        return True

    @staticmethod
    def _build_safe_filename(artist, title):
        """Return ``(safe_artist, safe_title, filename)`` for a song.

        Quotes, Windows-invalid characters and dots are removed, the
        ``"(From "`` prefix and ``")"`` are dropped from the title, and
        ``" - "`` inside the title becomes a single space. If the resulting
        ``"<artist> - <title>.mp4"`` exceeds 100 characters, the artist is cut
        to 30 and the title to 60 characters.
        """
        safe_title = (
            title.replace("(From ", "").replace(")", "").replace(" - ", " ")
            .replace(":", "").replace("'", "").replace('"', "")
        )
        safe_artist = artist.replace("'", "").replace('"', "")
        # Remove all Windows-invalid characters
        for char in ('?', ':', '*', '"', '<', '>', '|', '/', '\\'):
            safe_title = safe_title.replace(char, "")
            safe_artist = safe_artist.replace(char, "")
        # Removing every "." also covers the ".." and "..." sequences.
        safe_title = safe_title.replace(".", "").strip()
        safe_artist = safe_artist.strip()
        filename = f"{safe_artist} - {safe_title}.mp4"
        # Limit filename length to avoid Windows path issues
        if len(filename) > 100:
            filename = f"{safe_artist[:30]} - {safe_title[:60]}.mp4"
        return safe_artist, safe_title, filename

    @staticmethod
    def _video_matches_song(video_title, artist, title):
        """Return True if a video title matches the given songlist artist and title.

        A match is either the extracted artist and title both matching after
        normalization, or the whole normalized video title equalling the
        normalized ``"<artist> - <title>"``.
        """
        v_artist, v_title = extract_artist_title(video_title)
        if (normalize_title(v_artist) == normalize_title(artist)
                and normalize_title(v_title) == normalize_title(title)):
            return True
        return normalize_title(video_title) == normalize_title(f"{artist} - {title}")

    def _download_matched_video(self, video, artist, title, channel_name):
        """Download one matched video, tag it, and mark the song as downloaded.

        Returns:
            True if a non-empty file was produced and tracked, False otherwise.
        """
        # Create a shorter, safer filename - do this ONCE and use consistently
        safe_artist, safe_title, filename = self._build_safe_filename(artist, title)
        output_path = self.downloads_dir / channel_name / filename
        output_path.parent.mkdir(parents=True, exist_ok=True)
        print(f"⬇️ Downloading: {artist} - {title} -> {output_path}")
        video_url = f"https://www.youtube.com/watch?v={video['id']}"
        dlp_cmd = [
            str(self.yt_dlp_path),
            "--no-check-certificates",
            "--ignore-errors",
            "--no-warnings",
            "-o", str(output_path),
            "-f", self.config["download_settings"]["format"],
            video_url,
        ]
        print(f"🔧 Running command: {' '.join(dlp_cmd)}")
        print(f"📺 Resolution settings: {self.config.get('download_settings', {}).get('preferred_resolution', 'Unknown')}")
        print(f"🎬 Format string: {self.config.get('download_settings', {}).get('format', 'Unknown')}")
        # Debug: Show available formats (optional)
        if self.config.get('debug_show_formats', False):
            print(f"🔍 Checking available formats for: {video_url}")
            format_cmd = [str(self.yt_dlp_path), "--list-formats", video_url]
            try:
                format_result = subprocess.run(format_cmd, capture_output=True, text=True, timeout=30)
                print(f"📋 Available formats:\n{format_result.stdout}")
            except (subprocess.SubprocessError, OSError) as e:
                # Best effort only: a failed format listing must not stop the download.
                print(f"⚠️ Could not check formats: {e}")
        try:
            result = subprocess.run(dlp_cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            print(f"❌ yt-dlp failed with exit code {e.returncode}")
            print(f"❌ yt-dlp stderr: {e.stderr}")
            return False
        except OSError as e:
            # yt-dlp binary missing or not executable.
            print(f"❌ yt-dlp could not be started: {e}")
            return False
        print("✅ yt-dlp completed successfully")
        print(f"📄 yt-dlp stdout: {result.stdout}")

        if not output_path.exists():
            print(f"❌ Download failed: file does not exist: {output_path}")
            # Check if yt-dlp saved it somewhere else
            possible_files = list(output_path.parent.glob("*.mp4"))
            if not possible_files:
                return False
            print(f"🔍 Found these files in the directory: {[f.name for f in possible_files]}")
            # Look for a file that matches our pattern (artist - title).
            # NOTE(review): this is a loose match (artist substring plus any
            # one title word), so it may pick another song by the same artist.
            artist_part = safe_artist.lower()
            title_words = safe_title.lower().split()
            fallback = None
            for candidate in possible_files:
                stem = candidate.stem.lower()
                if artist_part in stem and any(word in stem for word in title_words):
                    fallback = candidate
                    break
            if fallback is None:
                print(f"❌ No matching file found for: {artist} - {title}")
                return False
            print(f"🎯 Found matching file: {fallback.name}")
            output_path = fallback
        if output_path.stat().st_size == 0:
            print(f"❌ Download failed: file is empty (0 bytes): {output_path}")
            return False
        # NOTE: MP4 validation via _is_valid_mp4 is intentionally not
        # performed here (it was disabled for debugging).
        add_id3_tags(output_path, f"{artist} - {title} (Karaoke Version)", channel_name)
        mark_songlist_song_downloaded(self.songlist_tracking, artist, title, channel_name, output_path)
        print(f"✅ Downloaded and tracked: {artist} - {title}")
        print(f"🎉 All post-processing complete for: {output_path}")
        return True

    def download_songlist_across_channels(self, channel_urls, limit=None):
        """
        For each song in the songlist, try each channel in order and download
        from the first channel where it is found.

        Downloads up to ``limit`` songs (all candidates when ``limit`` is None),
        skipping songs already tracked as downloaded or present on the server.

        Args:
            channel_urls: Channel URLs, searched in the given order.
            limit: Maximum number of songs to download, or None for no limit.

        Returns:
            False when no songlist is loaded, True otherwise.
        """
        songlist = load_songlist()
        if not songlist:
            print("⚠️ No songlist loaded. Skipping.")
            return False
        # Filter for songs not yet downloaded
        undownloaded = [
            s for s in songlist
            if not is_songlist_song_downloaded(self.songlist_tracking, s['artist'], s['title'])
        ]
        print(f"🎯 {len(songlist)} total unique songs in songlist.")
        print(f"🎯 {len(undownloaded)} unique songlist songs to download.")
        # Further filter out songs already on server
        not_on_server = [
            s for s in undownloaded
            if not is_song_on_server(self.server_songs, s['artist'], s['title'])
        ]
        server_available = len(undownloaded) - len(not_on_server)
        if server_available > 0:
            print(f"🎵 {server_available} songs already available on server, skipping.")
        undownloaded = not_on_server
        print(f"🎯 {len(undownloaded)} songs need to be downloaded.")
        if not undownloaded:
            print("🎵 All songlist songs already downloaded.")
            return True
        print("🔍 Songs to search for:")
        for song in undownloaded:
            print(f" - {song['artist']} - {song['title']}")

        downloaded_count = 0
        attempted = set()
        # Never announce more downloads than there are candidate songs.
        total_to_download = len(undownloaded) if limit is None else min(limit, len(undownloaded))
        print(f"\n🎬 Processing {len(channel_urls)} channels for song matches...")
        # A single pass is sufficient: each song is tried against every channel
        # once and then recorded in `attempted`, so repeating the pass could
        # never download anything new. (Looping until the list emptied would
        # spin forever once some songs were downloaded and others not found.)
        for song in list(undownloaded):
            if limit is not None and downloaded_count >= limit:
                break
            artist, title = song['artist'], song['title']
            if (artist, title) in attempted:
                continue  # duplicate songlist entry already handled
            found = False
            print(f"\n🔍 Searching for: {artist} - {title}")
            for channel_url in channel_urls:
                channel_name, _channel_id = get_channel_info(channel_url)
                available_videos = self.tracker.get_channel_video_list(
                    channel_url,
                    yt_dlp_path=str(self.yt_dlp_path),
                    force_refresh=False,
                )
                for video in available_videos:
                    if not self._video_matches_song(video['title'], artist, title):
                        continue
                    # Progress print statement
                    print(f"📥 Downloading {downloaded_count + 1} of {total_to_download} songlist songs...")
                    print(f"🎯 Found on channel: {channel_name}")
                    if self._download_matched_video(video, artist, title, channel_name):
                        downloaded_count += 1
                        found = True
                        break  # Only download from first channel where found
                    # On failure, keep looking at further videos of this channel.
                if found:
                    break
            attempted.add((artist, title))
            if found:
                undownloaded.remove(song)

        if undownloaded:
            print(f"⚠️ {len(undownloaded)} songlist songs could not be found in any channel:")
            for song in undownloaded:
                print(f" - {song['artist']} - {song['title']}")
        print(f"🎉 Downloaded {downloaded_count} songlist songs.")
        print(f"📊 Summary: Processed {len(channel_urls)} channels, found {downloaded_count} songs, {len(undownloaded)} songs not found.")
        return True

    def _is_valid_mp4(self, file_path):
        """Check if the file is a valid MP4 using ffprobe, if available.

        Returns:
            True if ffprobe cannot be started (the check is skipped) or if the
            codec name it reports for the first video stream contains "mp4",
            "h264" or "hevc". False if ffprobe exits with an error or reports
            no such codec.
        """
        cmd = [
            "ffprobe", "-v", "error", "-select_streams", "v:0",
            "-show_entries", "stream=codec_name",
            "-of", "default=noprint_wrappers=1:nokey=1", str(file_path),
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except OSError:
            # If ffprobe is not available, skip the check
            return True
        except subprocess.CalledProcessError:
            # ffprobe ran but rejected the file, so it is not a usable video.
            return False
        return any(codec in result.stdout for codec in ("mp4", "h264", "hevc"))
def reset_songlist_all():
    """Delete all files tracked in songlist_tracking.json, clear songlist_tracking.json, and remove songlist songs from karaoke_tracking.json.

    Paths are ``data/songlist_tracking.json`` and ``data/karaoke_tracking.json``
    relative to the working directory. A missing songlist tracking file is
    treated as empty; a missing karaoke tracking file is left alone.
    File deletion is best effort: failures are printed and skipped.
    """
    import json
    from pathlib import Path

    songlist_tracking_file = Path('data/songlist_tracking.json')
    karaoke_tracking_file = Path('data/karaoke_tracking.json')

    # Load songlist tracking
    if songlist_tracking_file.exists():
        with open(songlist_tracking_file, 'r', encoding='utf-8') as f:
            tracking = json.load(f)
    else:
        tracking = {}

    # Delete all files tracked
    for entry in tracking.values():
        file_path = entry.get('file_path')
        if not file_path:
            continue
        p = Path(file_path)
        try:
            if p.exists():
                p.unlink()
                print(f"🗑️ Deleted: {p}")
        except Exception as e:
            print(f"⚠️ Could not delete {p}: {e}")

    # Clear songlist_tracking.json. Create the data directory first so that a
    # reset on a fresh checkout (no data/ yet) does not fail on the write.
    songlist_tracking_file.parent.mkdir(parents=True, exist_ok=True)
    songlist_tracking_file.write_text("{}", encoding="utf-8")
    print("🧹 Cleared songlist_tracking.json")

    # Remove songlist songs from karaoke_tracking.json
    if karaoke_tracking_file.exists():
        with open(karaoke_tracking_file, 'r', encoding='utf-8') as f:
            karaoke_data = json.load(f)
        song_keys_to_remove = []
        for song_id, song in karaoke_data.get('songs', {}).items():
            artist = song.get('artist', '')
            title = song.get('title', song.get('name', ''))
            # Same key scheme as the songlist tracking: "<artist lower>_<normalized title>".
            key = f"{artist.lower()}_{normalize_title(title)}"
            if key in tracking:
                song_keys_to_remove.append(song_id)
        for song_id in song_keys_to_remove:
            del karaoke_data['songs'][song_id]
        with open(karaoke_tracking_file, 'w', encoding='utf-8') as f:
            json.dump(karaoke_data, f, indent=2, ensure_ascii=False)
        print(f"🧹 Removed {len(song_keys_to_remove)} songlist songs from karaoke_tracking.json")
    print("✅ Global songlist reset complete.")
# For brevity, the rest of the class methods should be copied here from the original download_karaoke.py,
# updating all references to use the new karaoke_downloader.* imports as needed.