KaraokeVideoDownloader/karaoke_downloader/cli.py

770 lines
31 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Karaoke Video Downloader CLI
Command-line interface for the karaoke video downloader.
"""
import argparse
import json
import os
import sys
from pathlib import Path
from typing import List, Optional

from karaoke_downloader.channel_parser import ChannelParser
from karaoke_downloader.config_manager import AppConfig
from karaoke_downloader.data_path_manager import get_data_path_manager
from karaoke_downloader.downloader import KaraokeDownloader
# Constants
# Number of videos requested per channel when --limit is not supplied in the
# --latest-per-channel mode and in the default (no explicit mode) run.
DEFAULT_LATEST_PER_CHANNEL_LIMIT = 10
# Default value of the --fuzzy-threshold option (its help text documents the
# accepted range as 0-100).
DEFAULT_FUZZY_THRESHOLD = 85
def load_channels_from_json(channels_file: str = None) -> List[str]:
    """
    Read channel URLs from a JSON channel configuration.

    The file is handed to ``ChannelParser``; the URLs are taken from the
    ``"url"`` key of every entry under the ``"channels"`` key of the parsed
    configuration.

    Args:
        channels_file: Path to the channels.json file. When None, the path
            reported by the data path manager is used.

    Returns:
        The channel URLs in configuration order. Any error while parsing or
        while reading an entry is printed and results in an empty list.
    """
    path = channels_file
    if path is None:
        path = str(get_data_path_manager().get_channels_json_path())

    try:
        entries = ChannelParser(path).channels_config.get("channels", [])
        urls = []
        for entry in entries:
            urls.append(entry["url"])
        return urls
    except Exception as e:
        print(f"❌ Error loading channels from {path}: {e}")
        return []
def load_channels_from_text(channels_file: str = None) -> List[str]:
    """
    Read channel URLs from the legacy plain-text channel list.

    Every line is stripped of surrounding whitespace; empty lines and lines
    that start with ``#`` after stripping are ignored.

    Args:
        channels_file: Path to the channels.txt file. When None, the path
            reported by the data path manager is used.

    Returns:
        The remaining lines in file order. If the file cannot be opened or
        read, the error is printed and an empty list is returned.
    """
    path = channels_file
    if path is None:
        path = str(get_data_path_manager().get_channels_txt_path())

    urls = []
    try:
        with open(path, "r", encoding="utf-8") as handle:
            for raw_line in handle:
                entry = raw_line.strip()
                if not entry or entry.startswith("#"):
                    continue
                urls.append(entry)
    except Exception as e:
        print(f"❌ Error loading channels from {path}: {e}")
        return []
    return urls
def load_channels(channel_file: str = None) -> List[str]:
    """
    Load channel URLs from an explicit file or from the default configuration.

    Args:
        channel_file: Optional path to a channel list. A path ending in
            ``.json`` is read with ``load_channels_from_json``; any other
            path is read with ``load_channels_from_text``. When None, the
            default ``channels.json`` is used if the data path manager
            reports that it exists.

    Returns:
        The list of channel URLs, or an empty list when no file was given
        and no default ``channels.json`` exists.
    """
    # Explicit file: choose the loader from the file extension.
    if channel_file is not None:
        if channel_file.endswith(".json"):
            return load_channels_from_json(channel_file)
        return load_channels_from_text(channel_file)

    # No file given: only the JSON configuration is considered.
    if get_data_path_manager().file_exists("channels.json"):
        return load_channels_from_json()
    return []
def get_channel_url_by_name(channel_name: str) -> Optional[str]:
    """
    Look up a channel URL by its name from the channels configuration.

    The requested name is lower-cased and stripped of ``@`` and of the word
    ``karaoke`` before comparison. Only URLs containing ``/@`` are
    considered; the handle is the text between ``/@`` and the next ``/``.
    A URL matches when its lower-cased handle equals the normalized name
    either as-is or after removing ``karaoke`` from the handle as well.

    Args:
        channel_name: Channel name or handle as typed by the user
            (e.g. ``"SingKingKaraoke"`` or ``"@SingKing"``).

    Returns:
        The first matching channel URL from ``load_channels()``, or None when
        no configured channel matches.
    """
    wanted = channel_name.lower().replace("@", "").replace("karaoke", "").strip()

    for url in load_channels():
        # URLs without an "@handle" segment cannot be matched by name.
        if "/@" not in url:
            continue
        handle = url.split("/@")[1].split("/")[0].lower()
        if wanted in (handle, handle.replace("karaoke", "").strip()):
            return url
    return None
def main():
    """
    Entry point of the karaoke downloader command-line interface.

    The function runs these steps in order:

    1. Parse the command-line arguments and validate the worker counts.
    2. Verify that the yt-dlp executable from the loaded configuration is
       usable (either a ``python ... -m yt_dlp`` style command or a file).
    3. Create a ``KaraokeDownloader`` and copy the option flags onto it.
    4. Run maintenance commands that exit immediately (channel reset,
       songlist reset, server-duplicate reset, songlist generation, status,
       cache commands, report, cleanup, songlist status).
    5. Otherwise run exactly one download mode (manual collection, focused
       channel, songlist across channels, latest per channel, single URL,
       or the default latest-per-channel run).
    6. Optionally generate the unmatched-songs report.
    7. Persist tracking data and exit.

    The process always terminates through ``sys.exit``: status 0 when a
    maintenance command finished or the selected download mode reported
    success, status 1 on invalid arguments, missing yt-dlp, missing channel
    configuration, or when the download mode reported failure.
    """
    parser = argparse.ArgumentParser(
        description="Karaoke Video Downloader - Download YouTube playlists and channel videos for karaoke (default: downloads latest videos from all channels)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python download_karaoke.py --limit 10 # Download latest 10 videos from all channels
  python download_karaoke.py --songlist-only --limit 10 # Download only songlist songs across channels
  python download_karaoke.py --channel-focus SingKingKaraoke --limit 5 # Download from specific channel
  python download_karaoke.py --channel-focus SingKingKaraoke --all-videos # Download ALL videos from channel
  python download_karaoke.py https://www.youtube.com/@SingKingKaraoke/videos # Download from specific channel URL
  python download_karaoke.py --file data/channels.txt # Download from custom channel list
  python download_karaoke.py --reset-channel SingKingKaraoke --delete-files
""",
    )
    parser.add_argument(
        "url", nargs="?", help="YouTube playlist or channel URL to download"
    )
    parser.add_argument(
        "--file",
        "-f",
        help="Text file containing playlist or channel URLs (one per line)",
    )
    parser.add_argument(
        "--status",
        "-s",
        action="store_true",
        help="Show download status and statistics",
    )
    parser.add_argument(
        "--report",
        "-r",
        metavar="PLAYLIST_ID",
        help="Generate a detailed report for a specific playlist",
    )
    parser.add_argument(
        "--sync",
        "--sync-only",
        action="store_true",
        help="Only sync playlist without downloading (update tracking)",
    )
    parser.add_argument(
        "--cleanup", action="store_true", help="Clean up orphaned tracking entries"
    )
    parser.add_argument(
        "--resolution",
        "--res",
        choices=["480p", "720p", "1080p", "1440p", "2160p"],
        default="720p",
        help="Preferred video resolution (default: 720p)",
    )
    parser.add_argument(
        "--limit",
        "-l",
        type=int,
        help="Limit the number of videos to download (e.g., --limit 10)",
    )
    parser.add_argument(
        "--refresh",
        "--force-refresh",
        action="store_true",
        help="Force refresh channel cache (ignore cached data)",
    )
    parser.add_argument(
        "--cache-info", action="store_true", help="Show channel cache information"
    )
    parser.add_argument(
        "--clear-cache",
        metavar="CHANNEL_ID",
        nargs="?",
        const="all",
        help="Clear cache for specific channel or all channels (use --clear-cache all)",
    )
    parser.add_argument(
        "--cache-duration",
        type=int,
        metavar="HOURS",
        help="Set cache duration in hours (default: 24)",
    )
    parser.add_argument(
        "--songlist-priority",
        action="store_true",
        help="Prioritize downloads based on songList.json in the data directory (default: enabled)",
    )
    parser.add_argument(
        "--no-songlist-priority",
        action="store_true",
        help="Disable songlist prioritization",
    )
    parser.add_argument(
        "--generate-unmatched-report",
        action="store_true",
        help="Generate a report of songs that couldn't be found in any channel (runs after downloads)",
    )
    parser.add_argument(
        "--show-pagination",
        action="store_true",
        help="Show page-by-page progress when downloading channel video lists (slower but more detailed)",
    )
    parser.add_argument(
        "--parallel-channels",
        action="store_true",
        help="Enable parallel channel scanning for faster channel processing (scans multiple channels simultaneously)",
    )
    parser.add_argument(
        "--channel-workers",
        type=int,
        default=3,
        help="Number of parallel channel scanning workers (default: 3, max: 10)",
    )
    parser.add_argument(
        "--songlist-only",
        action="store_true",
        help="Only download songs that are in the songlist (skip all others)",
    )
    parser.add_argument(
        "--songlist-focus",
        nargs="+",
        metavar="PLAYLIST_TITLE",
        help='Focus on specific playlists by title (e.g., --songlist-focus "2025 - Apple Top 50" "2024 - Billboard Hot 100")',
    )
    parser.add_argument(
        "--songlist-file",
        metavar="FILE_PATH",
        help="Custom songlist file path to use with --songlist-focus (default: songList.json in the data directory)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force download from channels regardless of whether songs are already downloaded, on server, or marked as duplicates",
    )
    parser.add_argument(
        "--songlist-status",
        action="store_true",
        help="Show songlist download status and statistics",
    )
    parser.add_argument(
        "--reset-channel",
        metavar="CHANNEL_NAME",
        help="Reset all tracking and files for a channel",
    )
    parser.add_argument(
        "--reset-songlist",
        action="store_true",
        help="When used with --reset-channel, also reset songlist songs for this channel",
    )
    parser.add_argument(
        "--reset-songlist-all",
        action="store_true",
        help="Reset all songlist tracking and delete all songlist-downloaded files (global)",
    )
    parser.add_argument(
        "--clear-server-duplicates",
        action="store_true",
        help="Clear server duplicates tracking (allows re-checking songs against server)",
    )
    parser.add_argument(
        "--version", "-v", action="version", version="Karaoke Playlist Downloader v1.0"
    )
    parser.add_argument(
        "--force-download-plan",
        action="store_true",
        help="Force refresh the download plan cache (re-scan all channels for matches)",
    )
    parser.add_argument(
        "--latest-per-channel",
        action="store_true",
        help="Download the latest N videos from each channel (use with --limit) [DEPRECATED: This is now the default behavior]",
    )
    parser.add_argument(
        "--fuzzy-match",
        action="store_true",
        help="Enable fuzzy matching for songlist-to-video matching (uses rapidfuzz if available)",
    )
    parser.add_argument(
        "--fuzzy-threshold",
        type=int,
        default=DEFAULT_FUZZY_THRESHOLD,
        help=f"Fuzzy match threshold (0-100, default {DEFAULT_FUZZY_THRESHOLD})",
    )
    parser.add_argument(
        "--parallel",
        action="store_true",
        help="Enable parallel downloads for improved speed (3-5x faster for large batches, defaults to 3 workers)",
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=3,
        help="Number of parallel download workers (default: 3, max: 10, only used with --parallel)",
    )
    parser.add_argument(
        "--generate-songlist",
        nargs="+",
        metavar="DIRECTORY",
        help="Generate song list from MP4 files with ID3 tags in specified directories",
    )
    parser.add_argument(
        "--no-append-songlist",
        action="store_true",
        help="Create a new song list instead of appending when using --generate-songlist",
    )
    parser.add_argument(
        "--manual",
        action="store_true",
        help="Download from manual videos collection (manual_videos.json in the data directory)",
    )
    parser.add_argument(
        "--channel-focus",
        type=str,
        help="Download from a specific channel by name (e.g., 'SingKingKaraoke')",
    )
    parser.add_argument(
        "--all-videos",
        action="store_true",
        help="Download all videos from channel (not just songlist matches), skipping existing files",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Build download plan and show what would be downloaded without actually downloading anything",
    )
    args = parser.parse_args()

    # Validate workers argument
    if args.workers < 1 or args.workers > 10:
        print("❌ Error: --workers must be between 1 and 10")
        sys.exit(1)

    # Validate channel workers argument
    if args.channel_workers < 1 or args.channel_workers > 10:
        print("❌ Error: --channel-workers must be between 1 and 10")
        sys.exit(1)

    # Load configuration to get platform-aware yt-dlp path
    from karaoke_downloader.config_manager import load_config

    config = load_config()
    yt_dlp_path = config.yt_dlp_path

    # Human-readable platform label for error messages. Unknown platforms
    # fall back to the raw sys.platform value instead of being mislabeled.
    platform_name = {"darwin": "macOS", "win32": "Windows"}.get(
        sys.platform, sys.platform
    )

    # Check if it's a command string (like "python3 -m yt_dlp") or a file path
    if yt_dlp_path.startswith("python"):
        # It's a command string, test if it works
        try:
            import subprocess

            cmd = yt_dlp_path.split() + ["--version"]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
            if result.returncode != 0:
                raise Exception(f"Command failed: {result.stderr}")
        except Exception as e:
            print(f"❌ Error: yt-dlp command failed: {yt_dlp_path}")
            print(f"Please ensure yt-dlp is properly installed for {platform_name}")
            print(f"Error: {e}")
            sys.exit(1)
    else:
        # It's a file path, check if it exists
        yt_dlp_file = Path(yt_dlp_path)
        if not yt_dlp_file.exists():
            binary_name = yt_dlp_file.name
            print(f"❌ Error: {binary_name} not found in downloader/ directory")
            print(f"Please ensure {binary_name} is present in the downloader/ folder for {platform_name}")
            print(f"Expected path: {yt_dlp_file}")
            sys.exit(1)

    downloader = KaraokeDownloader()

    # Set parallel download options
    if args.parallel:
        downloader.enable_parallel_downloads = True
        downloader.parallel_workers = args.workers
        print(f"⚡ Parallel downloads enabled with {args.workers} workers")
    else:
        downloader.enable_parallel_downloads = False
        print("🐌 Sequential downloads enabled")

    if args.limit:
        downloader.download_limit = args.limit
        print(f"🎯 Download limit set to: {args.limit} videos")

    if args.no_songlist_priority:
        downloader.use_songlist_priority = False
        print("🎯 Songlist prioritization disabled")
    else:
        downloader.use_songlist_priority = True
        print("🎯 Songlist prioritization enabled")

    if args.songlist_only:
        downloader.songlist_only = True
        print("🎯 Songlist-only mode enabled (will only download songlist songs)")

    if args.songlist_focus:
        downloader.songlist_focus_titles = args.songlist_focus
        downloader.songlist_only = True  # Enable songlist-only mode when focusing
        args.songlist_only = True  # Also set the args flag to ensure CLI logic works
        print(
            f"🎯 Songlist focus mode enabled for playlists: {', '.join(args.songlist_focus)}"
        )

    if args.songlist_file:
        downloader.songlist_file_path = args.songlist_file
        print(f"📁 Using custom songlist file: {args.songlist_file}")

    if args.force:
        downloader.force_download = True
        print("💪 Force mode enabled - will download regardless of existing files or server duplicates")

    if args.dry_run:
        downloader.dry_run = True
        print("🔍 Dry run mode enabled - will show download plan without downloading")

    if args.resolution != "720p":
        downloader.config_manager.update_resolution(args.resolution)

    # Reset channel CLI command
    if args.reset_channel:
        downloader.reset_channel_downloads(
            args.reset_channel,
            reset_songlist=args.reset_songlist,
            delete_files=True,  # Always delete files if they exist
        )
        sys.exit(0)

    # If no URL or file is provided, but a songlist mode is set, all
    # configured channels are used; fail early when none are configured.
    if (args.songlist_only or args.songlist_focus) and not args.url and not args.file:
        channel_urls = load_channels()
        if channel_urls:
            print(
                "📋 No URL or --file provided, defaulting to all configured channels for songlist mode."
            )
        else:
            print(
                "❌ No URL, --file, or channel configuration found. Please provide a channel URL or create channels.json in the data directory."
            )
            sys.exit(1)

    if args.reset_songlist_all:
        from karaoke_downloader.downloader import reset_songlist_all

        reset_songlist_all()
        print("✅ All songlist tracking and files have been reset.")
        sys.exit(0)

    if args.clear_server_duplicates:
        from karaoke_downloader.server_manager import save_server_duplicates_tracking

        save_server_duplicates_tracking({})
        print("✅ Server duplicates tracking has been cleared.")
        print(" Songs will be re-checked against the server on next run.")
        sys.exit(0)

    if args.generate_songlist:
        from karaoke_downloader.songlist_generator import SongListGenerator

        print("🎵 Generating song list from MP4 files with ID3 tags...")
        generator = SongListGenerator()
        try:
            generator.generate_songlist_from_multiple_directories(
                args.generate_songlist,
                append=not args.no_append_songlist,
            )
            print("✅ Song list generation completed successfully!")
        except Exception as e:
            print(f"❌ Error generating song list: {e}")
            sys.exit(1)
        sys.exit(0)

    # The result of the selected download mode. It must be initialized
    # BEFORE the dispatch below so the mode's return value is what decides
    # the final message and exit status.
    success = False

    if args.status:
        stats = downloader.tracker.get_statistics()
        print("🎤 Karaoke Downloader Status")
        print("=" * 40)
        print(f"Total Songs: {stats['total_songs']}")
        print(f"Total Playlists: {stats['total_playlists']}")
        print(f"Downloaded Songs: {stats['downloaded_songs']}")
        print(f"Failed Songs: {stats['failed_songs']}")
        print(f"Partial Downloads: {stats['partial_songs']}")
        print(f"Total Size: {stats['total_size_mb']} MB")
        print(f"Last Updated: {stats['last_updated']}")
        sys.exit(0)
    elif args.cache_info:
        cache_info = downloader.tracker.get_cache_info()
        print("💾 Channel Cache Information")
        print("=" * 40)
        print(f"Total Channels: {cache_info['total_channels']}")
        print(f"Total Cached Videos: {cache_info['total_videos']}")
        print("\n📋 Channel Details:")
        for channel in cache_info['channels']:
            print(f"{channel['channel']}: {channel['videos']} videos (updated: {channel['last_updated']})")
        sys.exit(0)
    elif args.clear_cache:
        if args.clear_cache == "all":
            downloader.tracker.clear_channel_cache()
            print("🧹 Cleared all channel caches")
        else:
            downloader.tracker.clear_channel_cache(args.clear_cache)
            print(f"🧹 Cleared cache for channel: {args.clear_cache}")
        sys.exit(0)
    elif args.cache_duration:
        downloader.tracker.set_cache_duration(args.cache_duration)
        print(f"⏰ Cache duration set to {args.cache_duration} hours")
        sys.exit(0)
    elif args.report:
        report = downloader.tracker.export_playlist_report(args.report)
        print(report)
        sys.exit(0)
    elif args.cleanup:
        orphaned = downloader.tracker.cleanup_orphaned_files(downloader.downloads_dir)
        print(f"🧹 Cleaned up {len(orphaned)} orphaned tracking entries")
        sys.exit(0)
    elif args.songlist_status:
        songlist = downloader._load_songlist()
        tracking = downloader.songlist_tracking
        total_songs = len(songlist)
        downloaded_songs = len(tracking)
        # Guard against an empty songlist, which would otherwise divide by zero.
        progress = (downloaded_songs / total_songs * 100) if total_songs else 0.0
        print("📋 Songlist Download Status")
        print("=" * 40)
        print(f"Total songs in list: {total_songs}")
        print(f"Songs downloaded: {downloaded_songs}")
        print(f"Songs remaining: {total_songs - downloaded_songs}")
        print(f"Progress: {progress:.1f}%")
        if tracking:
            print("\n📁 Downloaded songs:")
            for key, info in list(tracking.items())[:10]:
                print(
                    f"{info['artist']} - {info['title']} (from {info['channel']})"
                )
            if downloaded_songs > 10:
                print(f" ... and {downloaded_songs - 10} more")
        sys.exit(0)
    elif args.manual:
        # Download from manual videos collection
        print("🎤 Downloading from manual videos collection...")
        success = downloader.download_channel_videos(
            "manual://static",
            force_refresh=args.refresh,
            fuzzy_match=args.fuzzy_match,
            fuzzy_threshold=args.fuzzy_threshold,
            force_download=args.force,
        )
    elif args.channel_focus:
        # Download from a specific channel by name
        print(f"🎤 Looking up channel: {args.channel_focus}")
        channel_url = get_channel_url_by_name(args.channel_focus)
        if not channel_url:
            print(f"❌ Channel '{args.channel_focus}' not found in configuration")
            print("Available channels:")
            for url in load_channels():
                if "/@" in url:
                    channel_name = url.split("/@")[1].split("/")[0]
                    print(f"{channel_name}")
            sys.exit(1)
        if args.all_videos:
            # Download ALL videos from the channel (not just songlist matches)
            print(f"🎤 Downloading ALL videos from channel: {args.channel_focus} ({channel_url})")
            success = downloader.download_all_channel_videos(
                channel_url,
                force_refresh=args.refresh,
                force_download=args.force,
                limit=args.limit,
                dry_run=args.dry_run,
            )
        else:
            # Download only songlist matches from the channel
            print(f"🎤 Downloading from channel: {args.channel_focus} ({channel_url})")
            success = downloader.download_channel_videos(
                channel_url,
                force_refresh=args.refresh,
                fuzzy_match=args.fuzzy_match,
                fuzzy_threshold=args.fuzzy_threshold,
                force_download=args.force,
                dry_run=args.dry_run,
            )
    elif args.songlist_only or args.songlist_focus:
        # Use provided file or default to channels configuration
        channel_urls = load_channels(args.file)
        if not channel_urls:
            print("❌ No channels found in configuration")
            sys.exit(1)
        success = downloader.download_songlist_across_channels(
            channel_urls,
            limit=args.limit,
            force_refresh_download_plan=args.force_download_plan,
            fuzzy_match=args.fuzzy_match,
            fuzzy_threshold=args.fuzzy_threshold,
            force_download=args.force,
            show_pagination=args.show_pagination,
            parallel_channels=args.parallel_channels,
            max_channel_workers=args.channel_workers,
            dry_run=args.dry_run,
        )
    elif args.latest_per_channel:
        # Use provided file or default to channels configuration
        channel_urls = load_channels(args.file)
        if not channel_urls:
            print("❌ No channels found in configuration")
            sys.exit(1)
        success = downloader.download_latest_per_channel(
            channel_urls,
            limit=args.limit if args.limit else DEFAULT_LATEST_PER_CHANNEL_LIMIT,
            force_refresh_download_plan=args.force_download_plan,
            fuzzy_match=args.fuzzy_match,
            fuzzy_threshold=args.fuzzy_threshold,
            force_download=args.force,
            dry_run=args.dry_run,
        )
    elif args.url:
        success = downloader.download_channel_videos(
            args.url, force_refresh=args.refresh, dry_run=args.dry_run
        )
    else:
        # Default behavior: download from channels (equivalent to --latest-per-channel)
        print("🎯 No specific mode specified, defaulting to download from channels")
        channel_urls = load_channels(args.file)
        if not channel_urls:
            print("❌ No channels found in configuration")
            print("Please provide a channel URL or create channels.json in the data directory")
            sys.exit(1)
        success = downloader.download_latest_per_channel(
            channel_urls,
            limit=args.limit if args.limit else DEFAULT_LATEST_PER_CHANNEL_LIMIT,
            force_refresh_download_plan=args.force_download_plan,
            fuzzy_match=args.fuzzy_match,
            fuzzy_threshold=args.fuzzy_threshold,
            force_download=args.force,
            dry_run=args.dry_run,
        )

    # Generate unmatched report if requested (additive feature)
    if args.generate_unmatched_report:
        from karaoke_downloader.download_planner import generate_unmatched_report, build_download_plan
        from karaoke_downloader.songlist_manager import load_songlist

        print("\n🔍 Generating unmatched songs report...")
        songlist_path = (
            args.songlist_file
            if args.songlist_file
            else str(get_data_path_manager().get_songlist_path())
        )
        # Always bound, so a missing songlist file falls through to the
        # "No songlist available" message instead of raising.
        songlist = []

        # Load songlist based on focus mode
        if args.songlist_focus:
            # Load focused playlists
            songlist_file = Path(songlist_path)
            if not songlist_file.exists():
                print(f"⚠️ Songlist file not found: {songlist_path}")
            else:
                try:
                    with open(songlist_file, "r", encoding="utf-8") as f:
                        raw_data = json.load(f)
                    # Filter playlists by title
                    focused_playlists = [
                        playlist
                        for playlist in raw_data
                        if playlist.get("title", "") in args.songlist_focus
                    ]
                    if focused_playlists:
                        # Flatten the focused playlists into songs, keeping
                        # the first occurrence of each artist/title pair.
                        focused_songs = []
                        seen = set()
                        for playlist in focused_playlists:
                            if "songs" not in playlist:
                                continue
                            for song in playlist["songs"]:
                                if "artist" not in song or "title" not in song:
                                    continue
                                artist = song["artist"].strip()
                                title = song["title"].strip()
                                key = f"{artist.lower()}_{title.lower()}"
                                if key in seen:
                                    continue
                                seen.add(key)
                                focused_songs.append(
                                    {
                                        "artist": artist,
                                        "title": title,
                                        "position": song.get("position", 0),
                                    }
                                )
                        songlist = focused_songs
                    else:
                        print(f"⚠️ No playlists found matching: {', '.join(args.songlist_focus)}")
                except (json.JSONDecodeError, OSError) as e:
                    print(f"⚠️ Could not load songlist for report: {e}")
                    songlist = []
        else:
            # Load all songs from songlist
            songlist = load_songlist(songlist_path)

        if songlist:
            # Resolve channels the same way the download modes do, so that a
            # JSON --file (or the default channels.json) is parsed properly
            # instead of being read line-by-line as plain text.
            channel_urls = load_channels(args.file)
            if not channel_urls and not args.file:
                # Backward compatibility: fall back to the legacy channels.txt.
                legacy_channels = str(get_data_path_manager().get_channels_txt_path())
                if os.path.exists(legacy_channels):
                    channel_urls = load_channels_from_text(legacy_channels)

            if channel_urls:
                print(f"📋 Analyzing {len(songlist)} songs against {len(channel_urls)} channels...")
                # Build download plan to get unmatched songs
                try:
                    download_plan, unmatched = build_download_plan(
                        channel_urls,
                        songlist,
                        downloader.tracker,
                        downloader.yt_dlp_path,
                        fuzzy_match=args.fuzzy_match,
                        fuzzy_threshold=args.fuzzy_threshold,
                    )
                    if unmatched:
                        report_file = generate_unmatched_report(unmatched)
                        print("\n📋 Unmatched songs report generated successfully!")
                        print(f"📁 Report saved to: {report_file}")
                        print(f"📊 Summary: {len(download_plan)} songs found, {len(unmatched)} songs not found")
                        print("\n🔍 First 10 unmatched songs:")
                        for i, song in enumerate(unmatched[:10], 1):
                            print(f" {i:2d}. {song['artist']} - {song['title']}")
                        if len(unmatched) > 10:
                            print(f" ... and {len(unmatched) - 10} more songs")
                    else:
                        print(f"\n✅ All {len(songlist)} songs were found in the channels!")
                except Exception as e:
                    print(f"❌ Error generating report: {e}")
            else:
                print("❌ No channels found for report generation")
        else:
            print("❌ No songlist available for report generation")

    # Persist tracking data, then report the outcome of the download mode.
    downloader.tracker.force_save()
    if success:
        print("\n🎤 All downloads completed successfully!")
        sys.exit(0)
    else:
        print("\n⚠️ Some downloads may have failed. Check the logs for details.")
        sys.exit(1)