KaraokeVideoDownloader/karaoke_downloader/cli.py

677 lines
26 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Karaoke Video Downloader CLI
Command-line interface for the karaoke video downloader.
"""
import argparse
import os
import sys
from pathlib import Path
from typing import List
from karaoke_downloader.channel_parser import ChannelParser
from karaoke_downloader.config_manager import AppConfig
from karaoke_downloader.downloader import KaraokeDownloader
# Constants
# Number of videos fetched per channel when --limit is not supplied
# (fallback used by main() for the latest-per-channel download modes).
DEFAULT_LATEST_PER_CHANNEL_LIMIT = 10
# Default value of the --fuzzy-threshold option (documented range: 0-100).
DEFAULT_FUZZY_THRESHOLD = 85
def load_channels_from_json(channels_file: str = "data/channels.json") -> List[str]:
    """
    Collect channel URLs from a JSON channel configuration.

    The file is handed to ``ChannelParser``; the ``"url"`` value of every
    entry under the ``"channels"`` key of its ``channels_config`` is returned.

    Args:
        channels_file: Path to the channels.json file

    Returns:
        List of channel URLs, or an empty list if anything goes wrong
        (the error is printed, never raised).
    """
    urls: List[str] = []
    try:
        config = ChannelParser(channels_file).channels_config
        for entry in config.get("channels", []):
            urls.append(entry["url"])
    except Exception as err:
        # Any failure (parser error, entry without "url", ...) discards the
        # partial result so callers never see a half-loaded list.
        print(f"❌ Error loading channels from {channels_file}: {err}")
        return []
    return urls
def load_channels_from_text(channels_file: str = "data/channels.txt") -> List[str]:
    """
    Collect channel URLs from the legacy one-URL-per-line text format.

    Surrounding whitespace is stripped from every line; blank lines and
    lines whose first non-blank character is ``#`` are skipped.

    Args:
        channels_file: Path to the channels.txt file

    Returns:
        List of channel URLs, or an empty list if the file cannot be read
        (the error is printed, never raised).
    """
    try:
        with open(channels_file, "r", encoding="utf-8") as handle:
            # Strip each line once, then filter on the stripped value.
            stripped_lines = (raw.strip() for raw in handle)
            return [
                entry
                for entry in stripped_lines
                if entry and not entry.startswith("#")
            ]
    except Exception as err:
        print(f"❌ Error loading channels from {channels_file}: {err}")
        return []
def load_channels(channel_file: str = None) -> List[str]:
    """
    Resolve the channel list from an explicit file or the default locations.

    With an explicit path, a name ending in ``.json`` is read as JSON and
    anything else as the text format. Without one, ``data/channels.json`` is
    preferred and ``data/channels.txt`` is the fallback.

    Args:
        channel_file: Path to the channel file (optional)

    Returns:
        List of channel URLs (empty when no usable file is found)
    """
    # Explicit path: the extension alone decides which loader runs.
    if channel_file:
        if channel_file.endswith('.json'):
            loader = load_channels_from_json
        else:
            loader = load_channels_from_text
        return loader(channel_file)

    # No path given: probe the default locations, JSON first.
    default_json = "data/channels.json"
    default_text = "data/channels.txt"

    if os.path.exists(default_json):
        print(f"📋 Using new JSON format: {default_json}")
        return load_channels_from_json(default_json)

    if os.path.exists(default_text):
        print(f"📋 Using legacy text format: {default_text}")
        return load_channels_from_text(default_text)

    print("❌ No channel file found. Please create data/channels.json or data/channels.txt")
    return []
def main():
parser = argparse.ArgumentParser(
description="Karaoke Video Downloader - Download YouTube playlists and channel videos for karaoke (default: downloads latest videos from all channels)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python download_karaoke.py --limit 10 # Download latest 10 videos from all channels
python download_karaoke.py --songlist-only --limit 10 # Download only songlist songs across channels
python download_karaoke.py https://www.youtube.com/@SingKingKaraoke/videos # Download from specific channel
python download_karaoke.py --file data/channels.txt # Download from custom channel list
python download_karaoke.py --manual --limit 5 # Download from manual videos collection
python download_karaoke.py --reset-channel SingKingKaraoke --delete-files
""",
)
parser.add_argument(
"url", nargs="?", help="YouTube playlist or channel URL to download"
)
parser.add_argument(
"--file",
"-f",
help="Text file containing playlist or channel URLs (one per line)",
)
parser.add_argument(
"--status",
"-s",
action="store_true",
help="Show download status and statistics",
)
parser.add_argument(
"--report",
"-r",
metavar="PLAYLIST_ID",
help="Generate a detailed report for a specific playlist",
)
parser.add_argument(
"--sync",
"--sync-only",
action="store_true",
help="Only sync playlist without downloading (update tracking)",
)
parser.add_argument(
"--cleanup", action="store_true", help="Clean up orphaned tracking entries"
)
parser.add_argument(
"--resolution",
"--res",
choices=["480p", "720p", "1080p", "1440p", "2160p"],
default="720p",
help="Preferred video resolution (default: 720p)",
)
parser.add_argument(
"--limit",
"-l",
type=int,
help="Limit the number of videos to download (e.g., --limit 10)",
)
parser.add_argument(
"--refresh",
"--force-refresh",
action="store_true",
help="Force refresh channel cache (ignore cached data)",
)
parser.add_argument(
"--cache-info", action="store_true", help="Show channel cache information"
)
parser.add_argument(
"--clear-cache",
metavar="CHANNEL_ID",
nargs="?",
const="all",
help="Clear cache for specific channel or all channels (use --clear-cache all)",
)
parser.add_argument(
"--cache-duration",
type=int,
metavar="HOURS",
help="Set cache duration in hours (default: 24)",
)
parser.add_argument(
"--songlist-priority",
action="store_true",
help="Prioritize downloads based on data/songList.json (default: enabled)",
)
parser.add_argument(
"--no-songlist-priority",
action="store_true",
help="Disable songlist prioritization",
)
parser.add_argument(
"--generate-unmatched-report",
action="store_true",
help="Generate a report of songs that couldn't be found in any channel (runs after downloads)",
)
parser.add_argument(
"--show-pagination",
action="store_true",
help="Show page-by-page progress when downloading channel video lists (slower but more detailed)",
)
parser.add_argument(
"--parallel-channels",
action="store_true",
help="Enable parallel channel scanning for faster channel processing (scans multiple channels simultaneously)",
)
parser.add_argument(
"--channel-workers",
type=int,
default=3,
help="Number of parallel channel scanning workers (default: 3, max: 10)",
)
parser.add_argument(
"--songlist-only",
action="store_true",
help="Only download songs that are in the songlist (skip all others)",
)
parser.add_argument(
"--songlist-focus",
nargs="+",
metavar="PLAYLIST_TITLE",
help='Focus on specific playlists by title (e.g., --songlist-focus "2025 - Apple Top 50" "2024 - Billboard Hot 100")',
)
parser.add_argument(
"--songlist-file",
metavar="FILE_PATH",
help="Custom songlist file path to use with --songlist-focus (default: data/songList.json)",
)
parser.add_argument(
"--force",
action="store_true",
help="Force download from channels regardless of whether songs are already downloaded, on server, or marked as duplicates",
)
parser.add_argument(
"--songlist-status",
action="store_true",
help="Show songlist download status and statistics",
)
parser.add_argument(
"--reset-channel",
metavar="CHANNEL_NAME",
help="Reset all tracking and files for a channel",
)
parser.add_argument(
"--reset-songlist",
action="store_true",
help="When used with --reset-channel, also reset songlist songs for this channel",
)
parser.add_argument(
"--reset-songlist-all",
action="store_true",
help="Reset all songlist tracking and delete all songlist-downloaded files (global)",
)
parser.add_argument(
"--clear-server-duplicates",
action="store_true",
help="Clear server duplicates tracking (allows re-checking songs against server)",
)
parser.add_argument(
"--version", "-v", action="version", version="Karaoke Playlist Downloader v1.0"
)
parser.add_argument(
"--force-download-plan",
action="store_true",
help="Force refresh the download plan cache (re-scan all channels for matches)",
)
parser.add_argument(
"--latest-per-channel",
action="store_true",
help="Download the latest N videos from each channel (use with --limit) [DEPRECATED: This is now the default behavior]",
)
parser.add_argument(
"--fuzzy-match",
action="store_true",
help="Enable fuzzy matching for songlist-to-video matching (uses rapidfuzz if available)",
)
parser.add_argument(
"--fuzzy-threshold",
type=int,
default=DEFAULT_FUZZY_THRESHOLD,
help=f"Fuzzy match threshold (0-100, default {DEFAULT_FUZZY_THRESHOLD})",
)
parser.add_argument(
"--parallel",
action="store_true",
help="Enable parallel downloads for improved speed (3-5x faster for large batches, defaults to 3 workers)",
)
parser.add_argument(
"--workers",
type=int,
default=3,
help="Number of parallel download workers (default: 3, max: 10, only used with --parallel)",
)
parser.add_argument(
"--generate-songlist",
nargs="+",
metavar="DIRECTORY",
help="Generate song list from MP4 files with ID3 tags in specified directories",
)
parser.add_argument(
"--no-append-songlist",
action="store_true",
help="Create a new song list instead of appending when using --generate-songlist",
)
parser.add_argument(
"--manual",
action="store_true",
help="Download from manual videos collection (data/manual_videos.json)",
)
args = parser.parse_args()
# Validate workers argument
if args.workers < 1 or args.workers > 10:
print("❌ Error: --workers must be between 1 and 10")
sys.exit(1)
# Validate channel workers argument
if args.channel_workers < 1 or args.channel_workers > 10:
print("❌ Error: --channel-workers must be between 1 and 10")
sys.exit(1)
yt_dlp_path = Path("downloader/yt-dlp.exe")
if not yt_dlp_path.exists():
print("❌ Error: yt-dlp.exe not found in downloader/ directory")
print("Please ensure yt-dlp.exe is present in the downloader/ folder")
sys.exit(1)
downloader = KaraokeDownloader()
# Set parallel download options
if args.parallel:
downloader.enable_parallel_downloads = True
downloader.parallel_workers = args.workers
print(f"⚡ Parallel downloads enabled with {args.workers} workers")
else:
downloader.enable_parallel_downloads = False
print("🐌 Sequential downloads enabled")
if args.limit:
downloader.download_limit = args.limit
print(f"🎯 Download limit set to: {args.limit} videos")
if args.no_songlist_priority:
downloader.use_songlist_priority = False
print("🎯 Songlist prioritization disabled")
else:
downloader.use_songlist_priority = True
print("🎯 Songlist prioritization enabled")
if args.songlist_only:
downloader.songlist_only = True
print("🎯 Songlist-only mode enabled (will only download songlist songs)")
if args.songlist_focus:
downloader.songlist_focus_titles = args.songlist_focus
downloader.songlist_only = True # Enable songlist-only mode when focusing
args.songlist_only = True # Also set the args flag to ensure CLI logic works
print(
f"🎯 Songlist focus mode enabled for playlists: {', '.join(args.songlist_focus)}"
)
if args.songlist_file:
downloader.songlist_file_path = args.songlist_file
print(f"📁 Using custom songlist file: {args.songlist_file}")
if args.force:
downloader.force_download = True
print("💪 Force mode enabled - will download regardless of existing files or server duplicates")
if args.resolution != "720p":
downloader.config_manager.update_resolution(args.resolution)
# --- NEW: Reset channel CLI command ---
if args.reset_channel:
downloader.reset_channel_downloads(
args.reset_channel,
reset_songlist=args.reset_songlist,
delete_files=True, # Always delete files if they exist
)
sys.exit(0)
# --- END NEW ---
# --- NEW: If no URL or file is provided, but --songlist-only is set, use all channels ---
if (args.songlist_only or args.songlist_focus) and not args.url and not args.file:
channel_urls = load_channels()
if channel_urls:
print(
"📋 No URL or --file provided, defaulting to all configured channels for songlist mode."
)
else:
print(
"❌ No URL, --file, or channel configuration found. Please provide a channel URL or create data/channels.json."
)
sys.exit(1)
# --- END NEW ---
if args.reset_songlist_all:
from karaoke_downloader.downloader import reset_songlist_all
reset_songlist_all()
print("✅ All songlist tracking and files have been reset.")
sys.exit(0)
if args.clear_server_duplicates:
from karaoke_downloader.server_manager import save_server_duplicates_tracking
save_server_duplicates_tracking({})
print("✅ Server duplicates tracking has been cleared.")
print(" Songs will be re-checked against the server on next run.")
sys.exit(0)
if args.generate_songlist:
from karaoke_downloader.songlist_generator import SongListGenerator
print("🎵 Generating song list from MP4 files with ID3 tags...")
generator = SongListGenerator()
try:
generator.generate_songlist_from_multiple_directories(
args.generate_songlist,
append=not args.no_append_songlist
)
print("✅ Song list generation completed successfully!")
except Exception as e:
print(f"❌ Error generating song list: {e}")
sys.exit(1)
sys.exit(0)
if args.status:
stats = downloader.tracker.get_statistics()
print("🎤 Karaoke Downloader Status")
print("=" * 40)
print(f"Total Songs: {stats['total_songs']}")
print(f"Total Playlists: {stats['total_playlists']}")
print(f"Downloaded Songs: {stats['downloaded_songs']}")
print(f"Failed Songs: {stats['failed_songs']}")
print(f"Partial Downloads: {stats['partial_songs']}")
print(f"Total Size: {stats['total_size_mb']} MB")
print(f"Last Updated: {stats['last_updated']}")
sys.exit(0)
elif args.cache_info:
cache_info = downloader.tracker.get_cache_info()
print("💾 Channel Cache Information")
print("=" * 40)
print(f"Total Channels: {cache_info['total_channels']}")
print(f"Total Cached Videos: {cache_info['total_cached_videos']}")
print(f"Cache Duration: {cache_info['cache_duration_hours']} hours")
print(f"Last Updated: {cache_info['last_updated']}")
sys.exit(0)
elif args.clear_cache:
if args.clear_cache == "all":
downloader.tracker.clear_channel_cache()
print("🧹 Cleared all channel caches")
else:
downloader.tracker.clear_channel_cache(args.clear_cache)
print(f"🧹 Cleared cache for channel: {args.clear_cache}")
sys.exit(0)
elif args.cache_duration:
downloader.tracker.set_cache_duration(args.cache_duration)
print(f"⏰ Cache duration set to {args.cache_duration} hours")
sys.exit(0)
elif args.report:
report = downloader.tracker.export_playlist_report(args.report)
print(report)
sys.exit(0)
elif args.cleanup:
orphaned = downloader.tracker.cleanup_orphaned_files(downloader.downloads_dir)
print(f"🧹 Cleaned up {len(orphaned)} orphaned tracking entries")
sys.exit(0)
elif args.songlist_status:
songlist = downloader._load_songlist()
tracking = downloader.songlist_tracking
print("📋 Songlist Download Status")
print("=" * 40)
print(f"Total songs in list: {len(songlist)}")
print(f"Songs downloaded: {len(tracking)}")
print(f"Songs remaining: {len(songlist) - len(tracking)}")
print(f"Progress: {(len(tracking) / len(songlist) * 100):.1f}%")
if tracking:
print(f"\n📁 Downloaded songs:")
for key, info in list(tracking.items())[:10]:
print(
f"{info['artist']} - {info['title']} (from {info['channel']})"
)
if len(tracking) > 10:
print(f" ... and {len(tracking) - 10} more")
sys.exit(0)
elif args.manual:
# Download from manual videos collection
print("🎤 Downloading from manual videos collection...")
success = downloader.download_channel_videos(
"manual://static",
force_refresh=args.refresh,
fuzzy_match=args.fuzzy_match,
fuzzy_threshold=args.fuzzy_threshold,
)
elif args.songlist_only or args.songlist_focus:
# Use provided file or default to channels configuration
channel_urls = load_channels(args.file)
if not channel_urls:
print(f"❌ No channels found in configuration")
sys.exit(1)
limit = args.limit if args.limit else None
success = downloader.download_songlist_across_channels(
channel_urls,
limit=args.limit,
force_refresh_download_plan=args.force_download_plan if hasattr(args, "force_download_plan") else False,
fuzzy_match=args.fuzzy_match,
fuzzy_threshold=args.fuzzy_threshold,
force_download=args.force,
show_pagination=args.show_pagination,
parallel_channels=args.parallel_channels,
max_channel_workers=args.channel_workers,
)
elif args.latest_per_channel:
# Use provided file or default to channels configuration
channel_urls = load_channels(args.file)
if not channel_urls:
print(f"❌ No channels found in configuration")
sys.exit(1)
limit = args.limit if args.limit else DEFAULT_LATEST_PER_CHANNEL_LIMIT
force_refresh_download_plan = (
args.force_download_plan if hasattr(args, "force_download_plan") else False
)
fuzzy_match = args.fuzzy_match if hasattr(args, "fuzzy_match") else False
fuzzy_threshold = (
args.fuzzy_threshold
if hasattr(args, "fuzzy_threshold")
else DEFAULT_FUZZY_THRESHOLD
)
success = downloader.download_latest_per_channel(
channel_urls,
limit=limit,
force_refresh_download_plan=force_refresh_download_plan,
fuzzy_match=fuzzy_match,
fuzzy_threshold=fuzzy_threshold,
force_download=args.force,
)
elif args.url:
success = downloader.download_channel_videos(
args.url, force_refresh=args.refresh
)
else:
# Default behavior: download from channels (equivalent to --latest-per-channel)
print("🎯 No specific mode specified, defaulting to download from channels")
channel_urls = load_channels(args.file)
if not channel_urls:
print(f"❌ No channels found in configuration")
print("Please provide a channel URL or create data/channels.json")
sys.exit(1)
limit = args.limit if args.limit else DEFAULT_LATEST_PER_CHANNEL_LIMIT
force_refresh_download_plan = (
args.force_download_plan if hasattr(args, "force_download_plan") else False
)
fuzzy_match = args.fuzzy_match if hasattr(args, "fuzzy_match") else False
fuzzy_threshold = (
args.fuzzy_threshold
if hasattr(args, "fuzzy_threshold")
else DEFAULT_FUZZY_THRESHOLD
)
success = downloader.download_latest_per_channel(
channel_urls,
limit=limit,
force_refresh_download_plan=force_refresh_download_plan,
fuzzy_match=fuzzy_match,
fuzzy_threshold=fuzzy_threshold,
force_download=args.force,
)
# Generate unmatched report if requested (additive feature)
if args.generate_unmatched_report:
from karaoke_downloader.download_planner import generate_unmatched_report, build_download_plan
from karaoke_downloader.songlist_manager import load_songlist
print("\n🔍 Generating unmatched songs report...")
# Load songlist based on focus mode
if args.songlist_focus:
# Load focused playlists
songlist_file_path = args.songlist_file if args.songlist_file else "data/songList.json"
songlist_file = Path(songlist_file_path)
if not songlist_file.exists():
print(f"⚠️ Songlist file not found: {songlist_file_path}")
else:
try:
with open(songlist_file, "r", encoding="utf-8") as f:
raw_data = json.load(f)
# Filter playlists by title
focused_playlists = []
for playlist in raw_data:
playlist_title = playlist.get("title", "")
if playlist_title in args.songlist_focus:
focused_playlists.append(playlist)
if focused_playlists:
# Flatten the focused playlists into songs
focused_songs = []
seen = set()
for playlist in focused_playlists:
if "songs" in playlist:
for song in playlist["songs"]:
if "artist" in song and "title" in song:
artist = song["artist"].strip()
title = song["title"].strip()
key = f"{artist.lower()}_{title.lower()}"
if key in seen:
continue
seen.add(key)
focused_songs.append(
{
"artist": artist,
"title": title,
"position": song.get("position", 0),
}
)
songlist = focused_songs
else:
print(f"⚠️ No playlists found matching: {', '.join(args.songlist_focus)}")
songlist = []
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f"⚠️ Could not load songlist for report: {e}")
songlist = []
else:
# Load all songs from songlist
songlist_path = args.songlist_file if args.songlist_file else "data/songList.json"
songlist = load_songlist(songlist_path)
if songlist:
# Load channel URLs
channel_file = args.file if args.file else "data/channels.txt"
if os.path.exists(channel_file):
with open(channel_file, "r", encoding='utf-8') as f:
channel_urls = [
line.strip()
for line in f
if line.strip() and not line.strip().startswith("#")
]
print(f"📋 Analyzing {len(songlist)} songs against {len(channel_urls)} channels...")
# Build download plan to get unmatched songs
fuzzy_match = args.fuzzy_match if hasattr(args, "fuzzy_match") else False
fuzzy_threshold = (
args.fuzzy_threshold
if hasattr(args, "fuzzy_threshold")
else DEFAULT_FUZZY_THRESHOLD
)
try:
download_plan, unmatched = build_download_plan(
channel_urls,
songlist,
downloader.tracker,
downloader.yt_dlp_path,
fuzzy_match=fuzzy_match,
fuzzy_threshold=fuzzy_threshold,
)
if unmatched:
report_file = generate_unmatched_report(unmatched)
print(f"\n📋 Unmatched songs report generated successfully!")
print(f"📁 Report saved to: {report_file}")
print(f"📊 Summary: {len(download_plan)} songs found, {len(unmatched)} songs not found")
print(f"\n🔍 First 10 unmatched songs:")
for i, song in enumerate(unmatched[:10], 1):
print(f" {i:2d}. {song['artist']} - {song['title']}")
if len(unmatched) > 10:
print(f" ... and {len(unmatched) - 10} more songs")
else:
print(f"\n✅ All {len(songlist)} songs were found in the channels!")
except Exception as e:
print(f"❌ Error generating report: {e}")
else:
print(f"❌ Channel file not found: {channel_file}")
else:
print("❌ No songlist available for report generation")
# Initialize success variable
success = False
downloader.tracker.force_save()
if success:
print("\n🎤 All downloads completed successfully!")
sys.exit(0)
else:
print("\n⚠️ Some downloads may have failed. Check the logs for details.")
sys.exit(1)