Signed-off-by: mbrucedogs <mbrucedogs@gmail.com>

mbrucedogs 2025-07-23 22:02:30 -05:00
commit fe576965b7
15 changed files with 36404 additions and 0 deletions

91
.gitignore vendored Normal file

@ -0,0 +1,91 @@
# Downloaded karaoke videos and media files
downloads/
*.mp4
*.mp3
*.webm
*.mkv
*.avi
*.mov
*.wmv
*.flv
# Log files
logs/
*.log
# Tracking and cache files
karaoke_tracking.json
karaoke_tracking.json.backup
songlist_tracking.json
*.cache
# yt-dlp temporary files
*.info.json
*.meta
*.description
*.thumbnail
*.annotations
*.comments
*.srt
*.vtt
# Python cache and build files
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Virtual environments
venv/
env/
ENV/
env.bak/
venv.bak/
# IDE and editor files
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store
Thumbs.db
# Temporary files
*.tmp
*.temp
*.bak
*.backup
# Configuration files with sensitive data (optional - uncomment if needed)
# config.json
# Test files
test_*.py
*_test.py
.pytest_cache/
# Coverage reports
htmlcov/
.coverage
.coverage.*
coverage.xml
*.cover
.hypothesis/

248
README.md Normal file

@ -0,0 +1,248 @@
# 🎤 Karaoke Playlist Downloader
A Python-based Windows command-line tool that wraps `yt-dlp.exe` to batch-download karaoke videos from YouTube playlists.
## ✨ Features
- 🚀 **Fast Batch Downloads**: Download entire YouTube playlists with a single command
- 📁 **Organized Storage**: Each playlist gets its own folder with proper naming
- 🔄 **Smart Skipping**: Avoid re-downloading videos you already have
- 📝 **Comprehensive Logging**: Detailed logs for each playlist download
- 🎵 **Multiple Formats**: Download as MP4 video or extract MP3 audio
- 📊 **Progress Tracking**: Real-time progress updates and error handling
## 📋 Requirements
- **Windows** (tested on Windows 10/11)
- **Python 3.7+** (installed and available on your `PATH`)
- **yt-dlp.exe** (included in `downloader/` folder)
## 🚀 Quick Start
### 1. Download a Single Playlist
```bash
python download_karaoke.py https://www.youtube.com/playlist?list=YOUR_PLAYLIST_ID
```
### 2. Download Multiple Playlists
Create a `playlists.txt` file with your playlist URLs (one per line):
```txt
https://www.youtube.com/playlist?list=PLAYLIST1_ID
https://www.youtube.com/playlist?list=PLAYLIST2_ID
https://www.youtube.com/playlist?list=PLAYLIST3_ID
```
Then run:
```bash
python download_karaoke.py --file playlists.txt
```
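For reference, reading a one-URL-per-line file can be sketched roughly as follows. This is an illustration, not the script's actual code: the helper name `read_playlist_urls` is hypothetical, and skipping `#` comment lines is an assumption that the real script may not make.

```python
from pathlib import Path
from typing import List


def read_playlist_urls(path: str) -> List[str]:
    """Return the non-empty lines of a playlist file, in file order."""
    urls = []
    for raw_line in Path(path).read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        # Assumption: lines starting with '#' are treated as comments.
        if not line or line.startswith("#"):
            continue
        urls.append(line)
    return urls
```

Each returned URL could then be passed to the downloader one at a time.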
## 📁 Project Structure
```
KaroakeVideoDownloader/
├── download_karaoke.py # Main script
├── tracking_manager.py # Advanced tracking system
├── manage_tracking.py # Tracking management utility
├── update_resolution.py # Resolution configuration utility
├── config.json # Configuration file
├── downloader/
│   └── yt-dlp.exe # Downloader binary
├── playlists.txt # Sample playlist list
├── downloads/ # All video output
│ └── [playlist_name]/ # Folders per playlist
├── logs/
│ └── [playlist_name].log # Download logs
└── karaoke_tracking.json # Advanced tracking database
```
## 🎯 Usage Examples
### Basic Usage
```bash
# Download a single playlist (720p MP4)
python download_karaoke.py https://www.youtube.com/playlist?list=PLxxxxxxxx
# Download with specific resolution
python download_karaoke.py --resolution 1080p https://www.youtube.com/playlist?list=PLxxxxxxxx
# Download from file
python download_karaoke.py --file playlists.txt
# Show download status and statistics
python download_karaoke.py --status
# Generate playlist report
python download_karaoke.py --report PLAYLIST_ID
# Clean up orphaned tracking entries
python download_karaoke.py --cleanup
# Show help
python download_karaoke.py --help
```
### Tracking Management
```bash
# Show overall statistics
python manage_tracking.py --stats
# List all playlists
python manage_tracking.py --list-playlists
# Show playlist details
python manage_tracking.py --playlist PLAYLIST_ID
# Show failed songs
python manage_tracking.py --failed
# Show partial downloads
python manage_tracking.py --partial
# Clean up orphaned entries
python manage_tracking.py --cleanup
# Export database backup
python manage_tracking.py --export backup.json
```
### Resolution Management
```bash
# Show current resolution setting
python update_resolution.py --show
# Update to 1080p resolution
python update_resolution.py --resolution 1080p
# Update to 720p resolution (default)
python update_resolution.py --resolution 720p
```
### Advanced Features
The tool automatically:
- ✅ Creates organized folder structure
- ✅ Skips already downloaded videos
- ✅ Logs all activities
- ✅ Handles errors gracefully
- ✅ Shows real-time progress
- ✅ Extracts metadata and thumbnails
- ✅ Downloads subtitles when available
## 📊 Output Format
### Video Files
- **Format**: MP4 (720p by default, configurable)
- **Resolution**: 720p (upgradeable to 1080p, 1440p, 2160p)
- **Naming**: Original YouTube video title
- **Location**: `downloads/[playlist_name]/`
### Additional Files
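- **Metadata**: JSON files with video info (off by default; see `write_metadata` in `config.json`)
- **Thumbnails**: Video thumbnails (off by default; see `write_thumbnail` in `config.json`)
- **Subtitles**: English SRT files, if available (off by default; see `write_subtitles` in `config.json`)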
- **Logs**: Detailed download logs
## 🔧 Configuration
### Customizing Download Options
The script uses optimized yt-dlp settings for karaoke videos:
- **Format**: 720p MP4 by default (configurable via `config.json` or `--resolution`)
- **Resolution Options**: 480p, 720p, 1080p, 1440p, 2160p
- **Audio**: MP3 extraction as fallback
- **Metadata**: Full metadata embedding
- **Subtitles**: English SRT format
- **Error Handling**: Graceful error recovery
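The default `format` value in `config.json` follows a simple pattern: prefer MP4 at or below the target height, then any container at that height, then any MP4, then whatever is best. A minimal sketch of building that selector for another resolution is shown below. The function name `format_for_resolution` is hypothetical, and whether `update_resolution.py` builds the string this way is an assumption.

```python
SUPPORTED_RESOLUTIONS = {"480p", "720p", "1080p", "1440p", "2160p"}


def format_for_resolution(resolution: str) -> str:
    """Build a yt-dlp format selector capped at the given height, e.g. '1080p'."""
    if resolution not in SUPPORTED_RESOLUTIONS:
        raise ValueError(f"Unsupported resolution: {resolution}")
    height = int(resolution.rstrip("p"))
    # Same fallback chain as the default "format" entry in config.json.
    return (
        f"best[height<={height}][ext=mp4]/best[height<={height}]"
        "/best[ext=mp4]/best"
    )
```

For `"720p"` this reproduces the default selector shipped in `config.json`.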
### Resolution Configuration
You can easily change the video resolution:
1. **Command Line**: Use `--resolution` flag
```bash
python download_karaoke.py --resolution 1080p https://www.youtube.com/playlist?list=XYZ
```
2. **Config File**: Edit `config.json` or use the utility
```bash
python update_resolution.py --resolution 1080p
```
3. **Supported Resolutions**: 480p, 720p, 1080p, 1440p, 2160p
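With two ways to set the resolution, it helps to know which one wins. The sketch below assumes the command-line flag takes precedence over `config.json`, with 720p as the final fallback. That ordering is an assumption, not confirmed behavior, and `resolve_resolution` is a hypothetical helper name.

```python
from typing import Optional

DEFAULT_RESOLUTION = "720p"


def resolve_resolution(cli_value: Optional[str], config: dict) -> str:
    """Pick the effective resolution: CLI flag, then config file, then default."""
    if cli_value:
        return cli_value
    settings = config.get("download_settings", {})
    return settings.get("preferred_resolution", DEFAULT_RESOLUTION)
```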
### File Locations
- **Downloads**: `downloads/` folder
- **Logs**: `logs/` folder
- **Tracking**: `karaoke_tracking.json` (songlist matches are tracked in `songlist_tracking.json`)
## 🐛 Troubleshooting
### Common Issues
1. **yt-dlp.exe not found**
- Ensure `yt-dlp.exe` is in the `downloader/` folder
- Download from [yt-dlp releases](https://github.com/yt-dlp/yt-dlp/releases)
2. **Permission errors**
- Run as administrator if needed
- Check folder write permissions
3. **Network issues**
- Check internet connection
- Try again later (YouTube rate limiting)
4. **Playlist not found**
- Verify playlist URL is correct
- Ensure playlist is public
### Log Files
Check the log files in `logs/` for detailed error information:
```bash
# View latest log
type logs\playlist_name.log
```
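For long logs, filtering for errors can be quicker than reading the whole file. The sketch below assumes the log lines follow the `%(asctime)s - %(levelname)s - %(message)s` format from `config.json`; the helper name `error_lines` is hypothetical.

```python
from typing import List


def error_lines(log_text: str) -> List[str]:
    """Return the messages of ERROR-level entries in the given log text."""
    messages = []
    for line in log_text.splitlines():
        # Split into timestamp, level, and message; the message itself may
        # contain " - " (for example "Artist - Title"), so split at most twice.
        parts = line.split(" - ", 2)
        if len(parts) == 3 and parts[1] == "ERROR":
            messages.append(parts[2])
    return messages
```

The text could come from something like `Path("logs/playlist_name.log").read_text(encoding="utf-8")`.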
## 🔄 Updating
### Update yt-dlp
Download the latest `yt-dlp.exe` from [GitHub releases](https://github.com/yt-dlp/yt-dlp/releases) and replace the file in `downloader/`.
### Update Script
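The Python script needs only the standard library to download videos. The optional `mutagen` package (`pip install mutagen`) enables tagging of downloaded MP4 files; without it, tagging is skipped.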
## 📝 License
This project is open source. Feel free to modify and distribute.
## 🤝 Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Test thoroughly
5. Submit a pull request
## 🆘 Support
For issues and questions:
1. Check the troubleshooting section
2. Review log files for errors
3. Ensure all requirements are met
4. Try with a simple playlist first
---
**Happy Karaoke! 🎵**

29972
channel_cache.json Normal file

File diff suppressed because it is too large

2
channels.txt Normal file

@ -0,0 +1,2 @@
https://www.youtube.com/@SingKingKaraoke/videos
https://www.youtube.com/@karafun/videos

34
config.json Normal file

@ -0,0 +1,34 @@
{
"download_settings": {
"format": "best[height<=720][ext=mp4]/best[height<=720]/best[ext=mp4]/best",
"preferred_resolution": "720p",
"audio_format": "mp3",
"audio_quality": "0",
"subtitle_language": "en",
"subtitle_format": "srt",
"write_metadata": false,
"write_thumbnail": false,
"write_description": false,
"write_annotations": false,
"write_comments": false,
"write_subtitles": false,
"embed_metadata": false,
"add_metadata": false,
"continue_downloads": true,
"no_overwrites": true,
"ignore_errors": true,
"no_warnings": false
},
"folder_structure": {
"downloads_dir": "downloads",
"logs_dir": "logs",
"tracking_file": "downloaded_videos.json"
},
"logging": {
"level": "INFO",
"format": "%(asctime)s - %(levelname)s - %(message)s",
"include_console": true,
"include_file": true
},
"yt_dlp_path": "downloader/yt-dlp.exe"
}

74
docs/PRD.md Normal file

@ -0,0 +1,74 @@
# 🎤 Karaoke Playlist Downloader PRD (v1.0)
## ✅ Overview
A Python-based Windows command-line tool that wraps `yt-dlp.exe` to batch-download karaoke videos from YouTube playlists.
---
## 📌 Goals
- Quickly download full YouTube playlists containing karaoke videos.
- Organize downloads by playlist.
- Avoid re-downloading the same videos.
---
## 🧑‍💻 Target Users
- Tech-savvy users, developers, or power users familiar with command-line tools.
- Use case: Karaoke DJs, home karaoke setups, offline playlist prep.
---
## ⚙️ Platform & Stack
- **Platform:** Windows
- **Interface:** Command-line tool (CLI)
- **Tech Stack:** Python + yt-dlp.exe
---
## 📥 Input
- Accepts YouTube playlist URLs via command line.
- Optional: Accept a text file containing multiple playlist URLs (one per line).
### 🔹 Example Usage
```bash
python download_karaoke.py https://www.youtube.com/playlist?list=XYZ
python download_karaoke.py --file playlists.txt
```
---
## 📤 Output
- Downloads videos as **MP4** using `yt-dlp.exe`.
- Each playlist gets its own folder: `downloads/<playlist_name>/`
- Filenames are preserved as per the original YouTube video title.
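Playlist names can contain characters that Windows does not allow in folder names. A minimal sketch of deriving a safe per-playlist folder is shown below; it uses the same character class the script applies to channel names, and `playlist_folder` is a hypothetical helper name.

```python
import re
from pathlib import Path

# Characters that Windows does not allow in file or folder names.
_INVALID_CHARS = re.compile(r'[<>:"/\\|?*]')


def playlist_folder(downloads_dir: str, playlist_name: str) -> Path:
    """Return downloads/<playlist_name>/ with invalid characters replaced by '_'."""
    safe_name = _INVALID_CHARS.sub("_", playlist_name).strip()
    return Path(downloads_dir) / safe_name
```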
---
## 🛠️ Features
- ✅ Skips already downloaded videos (uses video ID log).
- ✅ Logs each download to `logs/<playlist_name>.log`.
- ✅ Shows progress and errors in the terminal.
- ✅ Lightweight and fast to execute.
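The "video ID log" idea can be sketched as a small JSON file holding the IDs that have already been fetched. This is an illustration only: the file layout and the helper names below are assumptions, and the actual tool may track downloads differently.

```python
import json
from pathlib import Path
from typing import Set


def load_downloaded_ids(log_path: Path) -> Set[str]:
    """Read the set of already-downloaded video IDs (empty if no log yet)."""
    if not log_path.exists():
        return set()
    return set(json.loads(log_path.read_text(encoding="utf-8")))


def record_download(log_path: Path, video_id: str) -> None:
    """Add a video ID to the log, rewriting the file as a sorted list."""
    ids = load_downloaded_ids(log_path)
    ids.add(video_id)
    log_path.write_text(json.dumps(sorted(ids), indent=2), encoding="utf-8")


def needs_download(log_path: Path, video_id: str) -> bool:
    """Return True when the video ID is not in the log yet."""
    return video_id not in load_downloaded_ids(log_path)
```

Keying on the video ID rather than the title keeps the check stable when a video is renamed.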
---
## 📁 Folder Structure (Suggested)
```
karaoke_downloader/
├── download_karaoke.py # main script
├── yt-dlp.exe # downloader binary
├── playlists.txt # optional playlist list
├── downloads/ # all video output
│ └── [playlist_name]/ # folders per playlist
└── logs/
    └── [playlist_name].log # download log
```
---
## 🚀 Future Enhancements (Optional)
- [ ] `--audio` flag to extract MP3 only.
- [ ] Smart filename cleanup (`[Karaoke]` tag).
- [ ] Graceful skip of unavailable/private videos.
- [ ] Retry logic for failed downloads.
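One possible shape for the retry item is a small wrapper with exponential backoff. This is a sketch under assumptions: the attempt count, the delays, and the name `run_with_retries` are all illustrative, and nothing here is part of the current tool.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    action: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call action() until it succeeds, doubling the wait after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception:
            # Re-raise the last error once all attempts are used up.
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

The `sleep` parameter exists so the waiting can be replaced in tests.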

4022
docs/songList.json Normal file

File diff suppressed because it is too large Load Diff

34
download_karaoke.bat Normal file

@ -0,0 +1,34 @@
@echo off
REM Karaoke Playlist Downloader - Windows Batch File
REM This file makes it easy to run the karaoke downloader on Windows
echo.
echo ========================================
echo Karaoke Playlist Downloader
echo ========================================
echo.
REM Check if Python is available
python --version >nul 2>&1
if errorlevel 1 (
echo ERROR: Python is not installed or not in PATH
echo Please install Python 3.7+ from https://python.org
pause
exit /b 1
)
REM Check if yt-dlp.exe exists
if not exist "downloader\yt-dlp.exe" (
echo ERROR: yt-dlp.exe not found in downloader\ directory
echo Please ensure yt-dlp.exe is present in the downloader\ folder
pause
exit /b 1
)
REM Run the downloader with all arguments passed through
python download_karaoke.py %*
REM Pause to show results
echo.
echo Press any key to exit...
pause >nul

986
download_karaoke.py Normal file

@ -0,0 +1,986 @@
#!/usr/bin/env python3
"""
Karaoke Playlist Downloader
A Python-based Windows command-line tool that wraps yt-dlp.exe to batch-download karaoke videos from YouTube playlists.
"""
import os
import sys
import argparse
import subprocess
import json
import re
from pathlib import Path
from urllib.parse import urlparse, parse_qs
import logging
from datetime import datetime
from tracking_manager import TrackingManager, SongStatus, FormatType
# Try to import mutagen for ID3 tagging
try:
    from mutagen.mp4 import MP4, MP4Tags
    MUTAGEN_AVAILABLE = True
except ImportError:
    MUTAGEN_AVAILABLE = False
    print("⚠️ mutagen not available - ID3 tagging will be disabled")
    print(" Install with: pip install mutagen")
class KaraokeDownloader:
def __init__(self):
self.yt_dlp_path = Path("downloader/yt-dlp.exe")
self.downloads_dir = Path("downloads")
self.logs_dir = Path("logs")
# Create necessary directories
self.downloads_dir.mkdir(exist_ok=True)
self.logs_dir.mkdir(exist_ok=True)
# Initialize tracking manager
self.tracker = TrackingManager()
# Load configuration
self.config = self._load_config()
# Songlist tracking file
self.songlist_tracking_file = Path("songlist_tracking.json")
self.songlist_tracking = self._load_songlist_tracking()
def _get_channel_info(self, channel_url):
"""Extract channel information from URL."""
# Fallback: extract channel info from URL (faster and more reliable)
if '@' in channel_url:
# Handle @username format
channel_name = channel_url.split('@')[1].split('/')[0]
channel_id = f"@{channel_name}"
else:
# Handle other formats
channel_name = "unknown_channel"
channel_id = "unknown_channel"
# Clean the channel name for use as directory name
channel_name = re.sub(r'[<>:"/\\|?*]', '_', channel_name)
return channel_name, channel_id
def _get_playlist_info(self, playlist_url):
"""Extract playlist information from URL (backward compatibility)."""
return self._get_channel_info(playlist_url)
def _load_config(self):
"""Load configuration from config.json file."""
config_file = Path("config.json")
if config_file.exists():
try:
with open(config_file, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f"Warning: Could not load config.json: {e}")
# Return default configuration
return {
"download_settings": {
"format": "best[height<=720][ext=mp4]/best[height<=720]/best[ext=mp4]/best",
"preferred_resolution": "720p",
"audio_format": "mp3",
"audio_quality": "0",
"subtitle_language": "en",
"subtitle_format": "srt",
"write_metadata": False,
"write_thumbnail": False,
"write_description": False,
"write_annotations": False,
"write_comments": False,
"write_subtitles": False,
"embed_metadata": False,
"add_metadata": False,
"continue_downloads": True,
"no_overwrites": True,
"ignore_errors": True,
"no_warnings": False
},
"folder_structure": {
"downloads_dir": "downloads",
"logs_dir": "logs",
"tracking_file": "karaoke_tracking.json"
},
"logging": {
"level": "INFO",
"format": "%(asctime)s - %(levelname)s - %(message)s",
"include_console": True,
"include_file": True
},
"yt_dlp_path": "downloader/yt-dlp.exe"
}
def _load_songlist(self):
"""Load songlist from docs/songList.json file."""
songlist_file = Path("docs/songList.json")
if not songlist_file.exists():
print("⚠️ Songlist file not found: docs/songList.json")
return []
try:
with open(songlist_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Extract all songs from all categories
all_songs = []
for category in data:
if "songs" in category:
for song in category["songs"]:
if "artist" in song and "title" in song:
all_songs.append({
"artist": song["artist"].strip(),
"title": song["title"].strip(),
"position": song.get("position", 0)
})
print(f"📋 Loaded {len(all_songs)} songs from songlist")
return all_songs
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f"⚠️ Could not load songlist: {e}")
return []
def _load_songlist_tracking(self):
"""Load songlist tracking data."""
if not self.songlist_tracking_file.exists():
return {}
try:
with open(self.songlist_tracking_file, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f"⚠️ Could not load songlist tracking: {e}")
return {}
def _save_songlist_tracking(self):
"""Save songlist tracking data."""
try:
with open(self.songlist_tracking_file, 'w', encoding='utf-8') as f:
json.dump(self.songlist_tracking, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"⚠️ Could not save songlist tracking: {e}")
def _is_songlist_song_downloaded(self, artist, title):
"""Check if a songlist song has been downloaded from any channel."""
key = f"{artist.lower()}_{self._normalize_title(title)}"
return key in self.songlist_tracking
def _mark_songlist_song_downloaded(self, artist, title, channel_name, file_path):
"""Mark a songlist song as downloaded."""
key = f"{artist.lower()}_{self._normalize_title(title)}"
self.songlist_tracking[key] = {
"artist": artist,
"title": title,
"channel": channel_name,
"file_path": str(file_path),
"downloaded_at": datetime.now().isoformat()
}
self._save_songlist_tracking()
def _normalize_title(self, title):
"""Normalize title for comparison (remove karaoke suffixes, etc.)."""
# Remove common karaoke suffixes
normalized = title.replace("(Karaoke Version)", "").replace("(Karaoke)", "").strip()
# Remove extra spaces and convert to lowercase for comparison
return " ".join(normalized.split()).lower()
def _find_songlist_matches(self, cached_videos, songlist, songlist_only=False):
"""Find videos that match songs in the songlist."""
matches = []
songlist_lookup = {}
unique_matches = set()
already_downloaded = 0
new_downloads = 0
# Create lookup for songlist songs (normalized)
for song in songlist:
normalized_title = self._normalize_title(song["title"])
key = f"{song['artist'].lower()}_{normalized_title}"
songlist_lookup[key] = song
print(f"🔍 Searching for {len(songlist)} songlist songs in {len(cached_videos)} cached videos...")
for video in cached_videos:
# Extract artist and title from video title
artist, title = self._extract_artist_title(video['title'])
normalized_title = self._normalize_title(title)
key = f"{artist.lower()}_{normalized_title}"
if key in songlist_lookup:
songlist_song = songlist_lookup[key]
# Check if already downloaded from any channel
if self._is_songlist_song_downloaded(artist, title):
already_downloaded += 1
print(f"⏭️ Already downloaded: {artist} - {title}")
continue
matches.append({
'video': video,
'songlist_song': songlist_song,
'priority_score': songlist_song.get('position', 9999) # Lower position = higher priority
})
unique_matches.add(key)
new_downloads += 1
print(f"🎯 Found match: {artist} - {title} (position {songlist_song.get('position', 'N/A')})")
print(f"🎯 Matching Results:")
print(f" • Total matches found: {len(matches) + already_downloaded}")
print(f" • Unique songs matched: {len(unique_matches) + already_downloaded}")
print(f" • Already downloaded: {already_downloaded}")
print(f" • New downloads available: {new_downloads}")
print(f" • Duplicate matches: {len(matches) - len(unique_matches)}")
return matches
def _get_playlist_name(self, playlist_url):
"""Extract playlist name from URL (backward compatibility)."""
return self._get_playlist_info(playlist_url)[0]
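    def _setup_logging(self, playlist_name):
        """Set up logging for the current playlist."""
        log_file = self.logs_dir / f"{playlist_name}.log"
        # Use a dedicated logger per playlist. logging.basicConfig() only takes
        # effect on its first call, so later playlists would otherwise keep
        # writing to the first playlist's log file.
        logger = logging.getLogger(f"{__name__}.{playlist_name}")
        logger.setLevel(logging.INFO)
        logger.propagate = False
        if not logger.handlers:
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            for handler in (
                logging.FileHandler(log_file, encoding='utf-8'),
                logging.StreamHandler(sys.stdout),
            ):
                handler.setFormatter(formatter)
                logger.addHandler(handler)
        return logger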
def _get_channel_videos(self, channel_url):
"""Get list of videos from a channel."""
try:
cmd = [
str(self.yt_dlp_path),
"--flat-playlist",
"--print", "id,title,duration,upload_date",
channel_url
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if result.returncode == 0 and result.stdout.strip():
videos = []
lines = result.stdout.strip().split('\n')
for i in range(0, len(lines), 4):
if i + 3 < len(lines):
video_id = lines[i].strip()
title = lines[i + 1].strip()
duration = lines[i + 2].strip()
upload_date = lines[i + 3].strip()
try:
duration_int = int(duration) if duration != 'NA' else None
except ValueError:
duration_int = None
videos.append({
'video_id': video_id,
'title': title,
'duration': duration_int,
'upload_date': upload_date
})
return videos
except (subprocess.TimeoutExpired, subprocess.CalledProcessError) as e:
print(f"Error getting channel videos: {e}")
return []
def _get_playlist_songs(self, playlist_url):
"""Get list of songs in a playlist (backward compatibility)."""
return self._get_channel_videos(playlist_url)
def _extract_artist_title(self, video_title):
"""Extract artist and title from karaoke video title."""
# Remove common karaoke suffixes
title = video_title.replace("(Karaoke Version)", "").replace("(Karaoke)", "").strip()
# Look for common patterns
if " - " in title:
# Format: "Artist - Song Title"
parts = title.split(" - ", 1)
if len(parts) == 2:
artist = parts[0].strip()
song_title = parts[1].strip()
# Handle special cases
if artist == "d4vd, Hyunjin":
return "d4vd, Hyunjin", song_title
elif artist == "Adrianne Lenker":
return "Adrianne Lenker", song_title
else:
return artist, song_title
# Handle special cases without clear artist - title pattern
if "Nothing Beats A Jet2 Holiday" in title:
return "Jet2", "Nothing Beats A Jet2 Holiday"
elif "Spin The Wheel" in title:
return "SingKingKaraoke", "Spin The Wheel - Your Song Requests"
# If no clear pattern, assume the whole title is the song name
return "Unknown Artist", title
def _add_id3_tags(self, file_path, video_title, channel_name):
"""Add metadata tags to the downloaded MP4 file (MP4 atoms written via mutagen, not ID3 frames)."""
if not MUTAGEN_AVAILABLE:
print("⚠️ mutagen not available - skipping ID3 tagging")
return
try:
artist, title = self._extract_artist_title(video_title)
# Load the MP4 file
mp4 = MP4(str(file_path))
# Add metadata tags
mp4['\xa9nam'] = title # Title
mp4['\xa9ART'] = artist # Artist
mp4['\xa9alb'] = f"{channel_name} Karaoke" # Album
mp4['\xa9gen'] = "Karaoke" # Genre
mp4['\xa9day'] = str(datetime.now().year) # Year
# Save the metadata
mp4.save()
print(f"📝 Added ID3 tags: Artist='{artist}', Title='{title}'")
except Exception as e:
print(f"⚠️ Could not add ID3 tags: {e}")
    def _cleanup_extra_files(self, mp4_file_path):
        """Remove extra files (info.json, meta) that yt-dlp creates alongside MP4 files."""
        try:
            # Build sibling names from the stem. Path.with_suffix() would treat
            # any dot inside the title (e.g. "Mr. Brightside") as a suffix and
            # truncate the name.
            stem = mp4_file_path.stem
            # Files to remove
            extra_files = [
                mp4_file_path.with_name(f"{stem}.info.json"),
                mp4_file_path.with_name(f"{stem}.meta"),
            ]
            removed_count = 0
            for extra_file in extra_files:
                if extra_file.exists():
                    extra_file.unlink()
                    removed_count += 1
                    print(f"🧹 Removed extra file: {extra_file.name}")
            if removed_count > 0:
                print(f"🧹 Cleaned up {removed_count} extra file(s)")
        except Exception as e:
            print(f"⚠️ Could not clean up extra files: {e}")
def download_channel_videos(self, channel_url, force_refresh=False):
"""Download all videos from a YouTube channel."""
print(f"🎤 Starting download for channel: {channel_url}")
# Get channel info
print("🔍 Extracting channel information...")
channel_name, channel_id = self._get_channel_info(channel_url)
print(f"📺 Channel name: {channel_name}")
print(f"🆔 Channel ID: {channel_id}")
# Setup logging
print("📝 Setting up logging...")
logger = self._setup_logging(channel_name)
logger.info(f"Starting download for channel: {channel_url}")
# Add channel to tracking
print("📋 Adding channel to tracking system...")
self.tracker.add_playlist(channel_id, channel_name, channel_url)
# Check cache first (unless force refresh is requested)
print("💾 Checking channel cache...")
current_videos = None
if not force_refresh:
current_videos = self.tracker.get_cached_channel_videos(channel_id)
if current_videos:
print(f"📋 Using cached channel data: {len(current_videos)} videos")
else:
print("📋 Cache miss or expired, fetching fresh channel data...")
# Fetch fresh data if needed
if not current_videos:
print("📋 Fetching channel videos from YouTube...")
current_videos = self._get_channel_videos(channel_url)
print(f"🎵 Found {len(current_videos)} videos in channel")
# Cache the results
if current_videos:
print("💾 Caching channel data for future use...")
self.tracker.cache_channel_videos(channel_id, current_videos)
print(f"💾 Cached channel data for future use")
else:
print(f"🎵 Using {len(current_videos)} cached videos")
# Sync channel with tracking
print("🔄 Syncing channel with tracking system...")
sync_result = self.tracker.sync_playlist(channel_id, current_videos)
print(f"🔄 Sync result: {sync_result['added']} new, {sync_result['removed']} removed, {sync_result['total']} total")
# Create channel-specific downloads directory
print("📁 Creating downloads directory...")
downloads_dir = self.downloads_dir / channel_name
downloads_dir.mkdir(parents=True, exist_ok=True)
print(f"📁 Downloads will be saved to: {downloads_dir}")
# Load songlist for prioritization (if enabled)
songlist = []
if hasattr(self, 'use_songlist_priority') and self.use_songlist_priority:
print("📋 Loading songlist for prioritization...")
songlist = self._load_songlist()
# Show songlist statistics
unique_songs = len(set(f"{song['artist']}_{song['title']}" for song in songlist))
print(f"📊 Songlist Statistics:")
print(f" • Total songs in list: {len(songlist)}")
print(f" • Unique songs: {unique_songs}")
print(f" • Duplicates in list: {len(songlist) - unique_songs}")
else:
print("📋 Songlist prioritization disabled")
# Get videos that need downloading
print("🔍 Checking which videos need to be downloaded...")
videos_to_download = []
found_existing = 0
# Get list of existing MP4 files once (more efficient)
existing_files = set()
for file_path in downloads_dir.glob(f"*.mp4"):
if file_path.is_file():
existing_files.add(file_path.stem)
# Also check root downloads directory
for file_path in self.downloads_dir.glob(f"*.mp4"):
if file_path.is_file():
existing_files.add(file_path.stem)
print(f"📁 Found {len(existing_files)} existing MP4 files in directories")
# Separate videos into songlist matches and others
songlist_matches = []
other_videos = []
for video in current_videos:
if not self.tracker.is_song_downloaded(video['video_id'], channel_id):
if video['title'] in existing_files:
# File exists but not tracked, mark as downloaded
self.tracker.update_song_status(
video['video_id'],
channel_id,
SongStatus.DOWNLOADED,
FormatType.MP4,
downloads_dir / f"{video['title']}.mp4" # Assume it's in channel dir
)
found_existing += 1
else:
other_videos.append(video)
# Find songlist matches and prioritize them
if songlist:
songlist_matches = self._find_songlist_matches(other_videos, songlist, songlist_only=getattr(self, 'songlist_only', False))
# Remove matched videos from other_videos to avoid duplicates
matched_video_ids = {match['video']['video_id'] for match in songlist_matches}
other_videos = [v for v in other_videos if v['video_id'] not in matched_video_ids]
# Sort songlist matches by priority (lower position = higher priority)
songlist_matches.sort(key=lambda x: x['priority_score'])
# Add songlist matches first, then other videos (unless songlist_only is enabled)
if getattr(self, 'songlist_only', False):
videos_to_download = [match['video'] for match in songlist_matches]
print(f"🎯 Songlist-only mode: {len(songlist_matches)} songlist matches")
else:
videos_to_download = [match['video'] for match in songlist_matches] + other_videos
print(f"🎯 Prioritized {len(songlist_matches)} songlist matches")
print(f"📝 Added {len(other_videos)} other videos")
else:
videos_to_download = other_videos
if found_existing > 0:
print(f"📁 Found {found_existing} existing files that weren't tracked")
# Apply limit if specified
if hasattr(self, 'download_limit') and self.download_limit:
videos_to_download = videos_to_download[:self.download_limit]
print(f"📥 Videos to download: {len(videos_to_download)} (limited to {self.download_limit})")
else:
print(f"📥 Videos to download: {len(videos_to_download)}")
if not videos_to_download:
print("✅ All videos already downloaded!")
return True
# Build yt-dlp command for each video
for i, video in enumerate(videos_to_download, 1):
print(f"\n🎵 Downloading {i} of {len(videos_to_download)}: {video['title']}")
# Update status to downloading
self.tracker.update_song_status(
video['video_id'],
channel_id,
SongStatus.DOWNLOADING
)
# Build yt-dlp command for this specific video using config
download_settings = self.config["download_settings"]
cmd = [
str(self.yt_dlp_path),
"--format", download_settings["format"],
"--output", str(downloads_dir / "%(title)s.%(ext)s"),
]
# Only add metadata options if enabled in config
if download_settings.get("add_metadata", False):
cmd.append("--add-metadata")
if download_settings.get("embed_metadata", False):
cmd.append("--embed-metadata")
if download_settings.get("write_metadata", False):
cmd.append("--write-info-json")
# Add optional settings based on config (only essential ones)
if download_settings.get("no_overwrites", True):
cmd.append("--no-overwrites")
if download_settings.get("continue_downloads", True):
cmd.append("--continue")
if download_settings.get("ignore_errors", True):
cmd.append("--ignore-errors")
if download_settings.get("no_warnings", False):
cmd.append("--no-warnings")
# Add progress and display options
cmd.extend([
"--progress",
"--newline",
"--console-title",
f"https://www.youtube.com/watch?v={video['video_id']}"
])
try:
print(f"🚀 Starting download for: {video['title']}")
logger.info(f"Starting download for video ID: {video['video_id']}")
# Run yt-dlp
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True
)
# Process output in real-time
for line in process.stdout:
line = line.strip()
if line:
print(line)
logger.info(line)
process.wait()
# Check if download was successful (more lenient - if MP4 exists, consider success)
downloaded_file = None
# Look for the file in the channel directory by title (since yt-dlp uses title as filename)
for file_path in downloads_dir.glob(f"*.mp4"):
if file_path.is_file():
# Check if this file matches the video title
file_name = file_path.stem # filename without extension
if file_name == video['title']:
downloaded_file = file_path
break
# If not found, also check the root downloads directory
if not downloaded_file:
for file_path in self.downloads_dir.glob(f"*.mp4"):
if file_path.is_file():
# Check if this file matches the video title
file_name = file_path.stem # filename without extension
if file_name == video['title']:
downloaded_file = file_path
break
if downloaded_file:
# MP4 file exists, consider download successful
format_type = FormatType.MP4
# Update tracking with successful download
self.tracker.update_song_status(
video['video_id'],
channel_id,
SongStatus.DOWNLOADED,
format_type,
downloaded_file
)
print(f"✅ Successfully downloaded: {video['title']} ({format_type})")
logger.info(f"Successfully downloaded: {video['title']} ({format_type})")
# Add ID3 tags to the downloaded file
self._add_id3_tags(downloaded_file, video['title'], channel_name)
# Clean up extra files (info.json, meta files)
self._cleanup_extra_files(downloaded_file)
# Mark as downloaded in songlist tracking if it's a songlist song
if hasattr(self, 'use_songlist_priority') and self.use_songlist_priority:
artist, title = self._extract_artist_title(video['title'])
self._mark_songlist_song_downloaded(artist, title, channel_name, downloaded_file)
print(f"📋 Marked songlist song as downloaded: {artist} - {title}")
else:
# No MP4 file found, mark as failed
self.tracker.update_song_status(
video['video_id'],
channel_id,
SongStatus.FAILED,
error_message=f"Download failed - no MP4 file found (return code: {process.returncode})"
)
print(f"❌ Download failed for: {video['title']} (return code: {process.returncode})")
logger.error(f"Download failed for: {video['title']} (return code: {process.returncode})")
except (OSError, subprocess.SubprocessError) as e:  # Popen raises OSError if yt-dlp cannot be started
error_msg = f"Error running yt-dlp for {video['title']}: {e}"
print(f"{error_msg}")
logger.error(error_msg)
self.tracker.update_song_status(
video['video_id'],
channel_id,
SongStatus.FAILED,
error_message=str(e)
)
except KeyboardInterrupt:
print("\n⏹️ Download interrupted by user")
logger.info("Download interrupted by user")
# Mark current video as partial
self.tracker.update_song_status(
video['video_id'],
channel_id,
SongStatus.PARTIAL
)
return False
# Force save tracking data
self.tracker.force_save()
# Show final statistics
stats = self.tracker.get_statistics()
print(f"\n📊 Download Statistics:")
print(f" Total songs: {stats['total_songs']}")
print(f" Downloaded: {stats['downloaded_songs']}")
print(f" Failed: {stats['failed_songs']}")
print(f" Partial: {stats['partial_songs']}")
print(f" Total size: {stats['total_size_mb']} MB")
return True
    def download_playlist(self, playlist_url):
        """Download all videos from a YouTube playlist (kept for backward compatibility)."""
        return self.download_channel_videos(playlist_url)

    def download_from_file(self, file_path, force_refresh=False):
        """Download every playlist or channel URL listed in a text file (one URL per line)."""
        file_path = Path(file_path)
        if not file_path.exists():
            print(f"❌ File not found: {file_path}")
            return False
        print(f"📄 Reading playlists from: {file_path}")
        # Read as UTF-8 explicitly so the result does not depend on the Windows code page
        with open(file_path, 'r', encoding='utf-8') as f:
            playlist_urls = [line.strip() for line in f if line.strip()]
        print(f"📋 Found {len(playlist_urls)} URL(s) to download")
        success_count = 0
        for i, url in enumerate(playlist_urls, 1):
            print(f"\n{'='*50}")
            print(f"📥 Processing URL {i}/{len(playlist_urls)}: {url}")
            print(f"{'='*50}")
            if self.download_channel_videos(url, force_refresh=force_refresh):
                success_count += 1
        print(f"\n🎉 Download summary: {success_count}/{len(playlist_urls)} URLs completed successfully")
        # Force save any pending tracking data
        self.tracker.force_save()
        return success_count == len(playlist_urls)
def main():
parser = argparse.ArgumentParser(
description="Karaoke Video Downloader - Download YouTube playlists and channel videos for karaoke",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python download_karaoke.py https://www.youtube.com/playlist?list=XYZ
python download_karaoke.py https://www.youtube.com/@SingKingKaraoke/videos
python download_karaoke.py --file playlists.txt
python download_karaoke.py --limit 5 https://www.youtube.com/@SingKingKaraoke/videos
python download_karaoke.py --refresh https://www.youtube.com/@SingKingKaraoke/videos
python download_karaoke.py --cache-info
python download_karaoke.py --clear-cache all
"""
)
parser.add_argument(
'url',
nargs='?',
help='YouTube playlist or channel URL to download'
)
parser.add_argument(
'--file', '-f',
help='Text file containing playlist or channel URLs (one per line)'
)
parser.add_argument(
'--status', '-s',
action='store_true',
help='Show download status and statistics'
)
parser.add_argument(
'--report', '-r',
metavar='PLAYLIST_ID',
help='Generate a detailed report for a specific playlist'
)
parser.add_argument(
'--sync', '--sync-only',
action='store_true',
help='Only sync playlist without downloading (update tracking)'
)
parser.add_argument(
'--cleanup',
action='store_true',
help='Clean up orphaned tracking entries'
)
parser.add_argument(
'--resolution', '--res',
choices=['480p', '720p', '1080p', '1440p', '2160p'],
default='720p',
help='Preferred video resolution (default: 720p)'
)
parser.add_argument(
'--limit', '-l',
type=int,
help='Limit the number of videos to download (e.g., --limit 10)'
)
parser.add_argument(
'--refresh', '--force-refresh',
action='store_true',
help='Force refresh channel cache (ignore cached data)'
)
parser.add_argument(
'--cache-info',
action='store_true',
help='Show channel cache information'
)
parser.add_argument(
'--clear-cache',
metavar='CHANNEL_ID',
nargs='?',
const='all',
help='Clear cache for specific channel or all channels (use --clear-cache all)'
)
parser.add_argument(
'--cache-duration',
type=int,
metavar='HOURS',
help='Set cache duration in hours (default: 24)'
)
parser.add_argument(
'--songlist-priority',
action='store_true',
help='Prioritize downloads based on docs/songList.json (default: enabled)'
)
parser.add_argument(
'--no-songlist-priority',
action='store_true',
help='Disable songlist prioritization'
)
parser.add_argument(
'--songlist-only',
action='store_true',
help='Only download songs that are in the songlist (skip all others)'
)
parser.add_argument(
'--songlist-status',
action='store_true',
help='Show songlist download status and statistics'
)
parser.add_argument(
'--version', '-v',
action='version',
version='Karaoke Playlist Downloader v1.0'
)
args = parser.parse_args()
# Check if yt-dlp.exe exists
yt_dlp_path = Path("downloader/yt-dlp.exe")
if not yt_dlp_path.exists():
print("❌ Error: yt-dlp.exe not found in downloader/ directory")
print("Please ensure yt-dlp.exe is present in the downloader/ folder")
sys.exit(1)
# Initialize downloader
downloader = KaraokeDownloader()
# Set download limit if specified
if args.limit:
downloader.download_limit = args.limit
print(f"🎯 Download limit set to: {args.limit} videos")
# Set songlist priority setting
if args.no_songlist_priority:
downloader.use_songlist_priority = False
print("🎯 Songlist prioritization disabled")
else:
downloader.use_songlist_priority = True
print("🎯 Songlist prioritization enabled")
# Set songlist-only mode
if args.songlist_only:
downloader.songlist_only = True
print("🎯 Songlist-only mode enabled (will only download songlist songs)")
# Update resolution if specified
if args.resolution != '720p':
resolution_map = {
'480p': '480',
'720p': '720',
'1080p': '1080',
'1440p': '1440',
'2160p': '2160'
}
height = resolution_map[args.resolution]
downloader.config["download_settings"]["format"] = f"best[height<={height}][ext=mp4]/best[height<={height}]/best[ext=mp4]/best"
downloader.config["download_settings"]["preferred_resolution"] = args.resolution
print(f"🎬 Using resolution: {args.resolution}")
# Process arguments
if args.status:
# Show status and statistics
stats = downloader.tracker.get_statistics()
print("🎤 Karaoke Downloader Status")
print("=" * 40)
print(f"Total Songs: {stats['total_songs']}")
print(f"Total Playlists: {stats['total_playlists']}")
print(f"Downloaded Songs: {stats['downloaded_songs']}")
print(f"Failed Songs: {stats['failed_songs']}")
print(f"Partial Downloads: {stats['partial_songs']}")
print(f"Total Size: {stats['total_size_mb']} MB")
print(f"Last Updated: {stats['last_updated']}")
sys.exit(0)
elif args.cache_info:
# Show cache information
cache_info = downloader.tracker.get_cache_info()
print("💾 Channel Cache Information")
print("=" * 40)
print(f"Total Channels: {cache_info['total_channels']}")
print(f"Total Cached Videos: {cache_info['total_cached_videos']}")
print(f"Cache Duration: {cache_info['cache_duration_hours']} hours")
print(f"Last Updated: {cache_info['last_updated']}")
sys.exit(0)
elif args.clear_cache:
# Clear cache
if args.clear_cache == 'all':
downloader.tracker.clear_channel_cache()
print("🧹 Cleared all channel caches")
else:
downloader.tracker.clear_channel_cache(args.clear_cache)
print(f"🧹 Cleared cache for channel: {args.clear_cache}")
sys.exit(0)
elif args.cache_duration:
# Set cache duration
downloader.tracker.set_cache_duration(args.cache_duration)
print(f"⏰ Cache duration set to {args.cache_duration} hours")
sys.exit(0)
elif args.report:
# Generate playlist report
report = downloader.tracker.export_playlist_report(args.report)
print(report)
sys.exit(0)
elif args.cleanup:
# Clean up orphaned entries
orphaned = downloader.tracker.cleanup_orphaned_files(downloader.downloads_dir)
print(f"🧹 Cleaned up {len(orphaned)} orphaned tracking entries")
sys.exit(0)
    elif args.songlist_status:
        # Show songlist status and statistics
        songlist = downloader._load_songlist()
        tracking = downloader.songlist_tracking
        total = len(songlist)
        done = len(tracking)
        # Guard against an empty songlist, which would raise ZeroDivisionError
        progress = (done / total * 100) if total else 0.0
        print("📋 Songlist Download Status")
        print("=" * 40)
        print(f"Total songs in list: {total}")
        print(f"Songs downloaded: {done}")
        print(f"Songs remaining: {total - done}")
        print(f"Progress: {progress:.1f}%")
        if tracking:
            print("\n📁 Downloaded songs:")
            for key, info in list(tracking.items())[:10]:  # Show first 10
                print(f"{info['artist']} - {info['title']} (from {info['channel']})")
            if done > 10:
                print(f" ... and {done - 10} more")
        sys.exit(0)
elif args.file:
success = downloader.download_from_file(args.file, force_refresh=args.refresh)
elif args.url:
success = downloader.download_channel_videos(args.url, force_refresh=args.refresh)
else:
parser.print_help()
sys.exit(1)
# Force save any pending tracking data before exit
downloader.tracker.force_save()
if success:
print("\n🎤 All downloads completed successfully!")
sys.exit(0)
else:
print("\n⚠️ Some downloads may have failed. Check the logs for details.")
sys.exit(1)
if __name__ == "__main__":
main()
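
The `--resolution` handling in `main()` builds a yt-dlp format selector that caps the video height and then falls back through progressively looser choices. A minimal standalone sketch of that mapping follows; the helper name `build_format_selector`, the `RESOLUTION_HEIGHTS` constant, and the `ValueError` are assumptions for illustration and are not part of the repository.

```python
# Hypothetical helper: mirrors the format string built in main(), nothing more.
RESOLUTION_HEIGHTS = {
    "480p": 480,
    "720p": 720,
    "1080p": 1080,
    "1440p": 1440,
    "2160p": 2160,
}


def build_format_selector(resolution: str) -> str:
    """Return the yt-dlp format selector string for a resolution label."""
    if resolution not in RESOLUTION_HEIGHTS:
        valid = ", ".join(RESOLUTION_HEIGHTS)
        raise ValueError(f"Invalid resolution {resolution!r}; valid options: {valid}")
    height = RESOLUTION_HEIGHTS[resolution]
    # Preference order: height-capped MP4, height-capped any container,
    # any MP4, then whatever yt-dlp considers best.
    return (
        f"best[height<={height}][ext=mp4]/"
        f"best[height<={height}]/"
        "best[ext=mp4]/best"
    )


if __name__ == "__main__":
    print(build_format_selector("1080p"))
```

Keeping the mapping in one function would let `download_karaoke.py` and `update_resolution.py` share it instead of each carrying its own copy of `resolution_map`.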

BIN
downloader/yt-dlp.exe Normal file

Binary file not shown.

263
manage_tracking.py Normal file

@ -0,0 +1,263 @@
#!/usr/bin/env python3
"""
Tracking Management Utility for Karaoke Playlist Downloader
Provides tools to manage and analyze the tracking database.
"""
import argparse
import json
import sys
from pathlib import Path

from tracking_manager import TrackingManager, SongStatus
def show_statistics(tracker):
"""Show overall statistics."""
stats = tracker.get_statistics()
print("🎤 Karaoke Downloader Statistics")
print("=" * 50)
print(f"📊 Total Songs: {stats['total_songs']}")
print(f"📁 Total Playlists: {stats['total_playlists']}")
print(f"✅ Downloaded Songs: {stats['downloaded_songs']}")
print(f"❌ Failed Songs: {stats['failed_songs']}")
print(f"⚠️ Partial Downloads: {stats['partial_songs']}")
print(f"💾 Total Size: {stats['total_size_mb']} MB")
print(f"🕒 Last Updated: {stats['last_updated']}")
if stats['total_songs'] > 0:
success_rate = (stats['downloaded_songs'] / stats['total_songs']) * 100
print(f"📈 Success Rate: {success_rate:.1f}%")
def list_playlists(tracker):
"""List all playlists with their statistics."""
playlists = tracker.data['playlists']
if not playlists:
print("📭 No playlists found in tracking database.")
return
print("📋 Playlists in Database")
print("=" * 50)
for playlist_id, playlist in playlists.items():
print(f"\n🎵 {playlist['name']}")
print(f" ID: {playlist_id}")
print(f" URL: {playlist['url']}")
print(f" Songs: {playlist['total_songs']}")
print(f" Downloaded: {playlist['downloaded_songs']}")
print(f" Failed: {playlist['failed_songs']}")
print(f" Added: {playlist['added_date']}")
print(f" Last Synced: {playlist['last_synced'] or 'Never'}")
def show_playlist_details(tracker, playlist_id):
"""Show detailed information about a specific playlist."""
if playlist_id not in tracker.data['playlists']:
print(f"❌ Playlist '{playlist_id}' not found in tracking database.")
return
playlist = tracker.data['playlists'][playlist_id]
songs = tracker.get_playlist_songs(playlist_id)
print(f"🎵 Playlist Details: {playlist['name']}")
print("=" * 50)
print(f"ID: {playlist_id}")
print(f"URL: {playlist['url']}")
print(f"Total Songs: {playlist['total_songs']}")
print(f"Downloaded: {playlist['downloaded_songs']}")
print(f"Failed: {playlist['failed_songs']}")
print(f"Added: {playlist['added_date']}")
print(f"Last Synced: {playlist['last_synced'] or 'Never'}")
print(f"\n📝 Songs:")
for i, song in enumerate(songs, 1):
status_icon = {
SongStatus.DOWNLOADED: "",
SongStatus.CONVERTED: "",
SongStatus.DOWNLOADING: "",
SongStatus.PARTIAL: "⚠️",
SongStatus.FAILED: "",
SongStatus.NOT_DOWNLOADED: "⏸️"
}.get(song['status'], "")
formats = ", ".join(song['formats'].keys()) if song['formats'] else "None"
print(f" {i:2d}. {status_icon} {song['title']}")
print(f" Video ID: {song['video_id']}")
print(f" Status: {song['status']}")
print(f" Formats: {formats}")
if song['last_error']:
print(f" Error: {song['last_error']}")
print()
def show_failed_songs(tracker, playlist_id=None):
    """Show failed songs, optionally limited to one playlist."""
    if playlist_id:
        songs = tracker.get_failed_songs(playlist_id)
        if not songs:
            print(f"✅ No failed songs found in playlist '{playlist_id}'.")
            return
        print(f"❌ Failed Songs in Playlist: {playlist_id}")
    else:
        songs = [song for song in tracker.data['songs'].values()
                 if song['status'] == SongStatus.FAILED]
        if not songs:
            print("✅ No failed songs found in any playlist.")
            return
        print("❌ All Failed Songs")
    print("=" * 50)
    for song in songs:
        # Fall back to the raw playlist ID if the playlist entry is missing
        playlist = tracker.data['playlists'].get(song['playlist_id'], {})
        playlist_name = playlist.get('name', song['playlist_id'])
        print(f"\n🎵 {song['title']}")
        print(f" Playlist: {playlist_name}")
        print(f" Video ID: {song['video_id']}")
        print(f" Attempts: {song['download_attempts']}")
        print(f" Error: {song['last_error']}")
        print(f" Last Updated: {song['last_updated']}")


def show_partial_downloads(tracker, playlist_id=None):
    """Show partial downloads, optionally limited to one playlist."""
    if playlist_id:
        songs = tracker.get_partial_downloads(playlist_id)
        if not songs:
            print(f"✅ No partial downloads found in playlist '{playlist_id}'.")
            return
        print(f"⚠️ Partial Downloads in Playlist: {playlist_id}")
    else:
        songs = [song for song in tracker.data['songs'].values()
                 if song['status'] == SongStatus.PARTIAL]
        if not songs:
            print("✅ No partial downloads found in any playlist.")
            return
        print("⚠️ All Partial Downloads")
    print("=" * 50)
    for song in songs:
        # Fall back to the raw playlist ID if the playlist entry is missing
        playlist = tracker.data['playlists'].get(song['playlist_id'], {})
        playlist_name = playlist.get('name', song['playlist_id'])
        print(f"\n🎵 {song['title']}")
        print(f" Playlist: {playlist_name}")
        print(f" Video ID: {song['video_id']}")
        print(f" Formats: {', '.join(song['formats'].keys())}")
        print(f" Last Updated: {song['last_updated']}")
def cleanup_orphaned_entries(tracker, downloads_dir):
"""Clean up orphaned tracking entries."""
orphaned = tracker.cleanup_orphaned_files(downloads_dir)
if orphaned:
print(f"🧹 Cleaned up {len(orphaned)} orphaned tracking entries:")
for song_id in orphaned:
song = tracker.data['songs'].get(song_id)
if song:
print(f" - {song['title']} (ID: {song['video_id']})")
else:
print("✅ No orphaned entries found.")
def export_database(tracker, output_file):
"""Export the tracking database to a JSON file."""
try:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(tracker.data, f, indent=2, ensure_ascii=False)
print(f"💾 Database exported to: {output_file}")
except Exception as e:
print(f"❌ Failed to export database: {e}")
def main():
parser = argparse.ArgumentParser(
description="Tracking Management Utility for Karaoke Playlist Downloader",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python manage_tracking.py --stats
python manage_tracking.py --list-playlists
python manage_tracking.py --playlist PLAYLIST_ID
python manage_tracking.py --failed
python manage_tracking.py --partial
python manage_tracking.py --cleanup
python manage_tracking.py --export backup.json
"""
)
parser.add_argument(
'--stats', '--statistics',
action='store_true',
help='Show overall statistics'
)
parser.add_argument(
'--list-playlists',
action='store_true',
help='List all playlists in the database'
)
parser.add_argument(
'--playlist',
metavar='PLAYLIST_ID',
help='Show detailed information about a specific playlist'
)
parser.add_argument(
'--failed',
action='store_true',
help='Show all failed songs'
)
parser.add_argument(
'--partial',
action='store_true',
help='Show all partial downloads'
)
parser.add_argument(
'--cleanup',
action='store_true',
help='Clean up orphaned tracking entries'
)
parser.add_argument(
'--export',
metavar='FILE',
help='Export tracking database to JSON file'
)
parser.add_argument(
'--tracking-file',
default='karaoke_tracking.json',
help='Path to tracking database file (default: karaoke_tracking.json)'
)
args = parser.parse_args()
# Initialize tracking manager
tracker = TrackingManager(args.tracking_file)
# Process commands
if args.stats:
show_statistics(tracker)
elif args.list_playlists:
list_playlists(tracker)
elif args.playlist:
show_playlist_details(tracker, args.playlist)
elif args.failed:
show_failed_songs(tracker)
elif args.partial:
show_partial_downloads(tracker)
elif args.cleanup:
downloads_dir = Path("downloads")
cleanup_orphaned_entries(tracker, downloads_dir)
elif args.export:
export_database(tracker, args.export)
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
main()
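
The reporting commands in `manage_tracking.py` all filter `tracker.data['songs']` by each song's `status` field. A minimal sketch of that pattern against a plain dictionary shaped like the tracking file is shown here. The function names `summarize_statuses` and `success_rate` are hypothetical, and the rate uses the top-level `status` string only, which is a simplification of `TrackingManager.is_song_downloaded` (that method inspects per-format status).

```python
from collections import Counter
from typing import Dict


def summarize_statuses(data: Dict) -> Dict[str, int]:
    """Count songs per status string in a tracking-style dictionary."""
    songs = data.get("songs", {})
    return dict(Counter(song["status"] for song in songs.values()))


def success_rate(data: Dict) -> float:
    """Percentage of songs with status 'downloaded'; 0.0 when there are no songs."""
    songs = data.get("songs", {})
    if not songs:
        # Avoid dividing by zero on an empty database
        return 0.0
    downloaded = sum(1 for song in songs.values() if song["status"] == "downloaded")
    return downloaded / len(songs) * 100


if __name__ == "__main__":
    # Illustrative data only; keys follow the "<video_id>_<playlist_id>" scheme.
    sample = {
        "songs": {
            "vid1_pl1": {"status": "downloaded"},
            "vid2_pl1": {"status": "failed"},
            "vid3_pl1": {"status": "downloaded"},
        }
    }
    print(summarize_statuses(sample))
    print(f"{success_rate(sample):.1f}%")
```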

21
requirements.txt Normal file

@ -0,0 +1,21 @@
# Karaoke Playlist Downloader Requirements
# Python 3.7+ required for:
# - pathlib.Path (3.4+)
# - f-strings (3.6+)
# - subprocess.run with capture_output (3.7+)
# - datetime.fromisoformat (3.7+)
# Required packages:
mutagen>=1.45.0
# Standard library modules used:
# - os
# - sys
# - argparse
# - subprocess
# - json
# - re
# - hashlib
# - pathlib
# - typing
# - urllib.parse
# - logging
# - datetime

1
test_channel.txt Normal file

@ -0,0 +1 @@
https://www.youtube.com/@SingKingKaraoke/videos

501
tracking_manager.py Normal file

@ -0,0 +1,501 @@
#!/usr/bin/env python3
"""
Tracking Manager for Karaoke Playlist Downloader
Handles song tracking, multiple formats, partial downloads, and playlist synchronization.
"""
import json
import hashlib
import os
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
import logging
class SongStatus:
"""Enum-like class for song download statuses."""
NOT_DOWNLOADED = "not_downloaded"
DOWNLOADING = "downloading"
DOWNLOADED = "downloaded"
PARTIAL = "partial"
FAILED = "failed"
CONVERTING = "converting"
CONVERTED = "converted"
class FormatType:
"""Enum-like class for format types."""
MP4 = "mp4"
MP3 = "mp3"
M4A = "m4a"
WEBM = "webm"
class TrackingManager:
def __init__(self, tracking_file: str = "karaoke_tracking.json", cache_file: str = "channel_cache.json"):
self.tracking_file = Path(tracking_file)
self.cache_file = Path(cache_file)
# Cache settings (must be set before loading cache)
self.cache_duration_hours = 24 # Cache for 24 hours by default
self.data = self._load_tracking_data()
self.cache = self._load_cache_data()
self.logger = logging.getLogger(__name__)
self._batch_save_pending = False
self._save_timer = None
    def _load_tracking_data(self) -> Dict:
        """Load tracking data from the JSON file, falling back to an empty structure."""
        if self.tracking_file.exists():
            try:
                with open(self.tracking_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # Ensure all required top-level keys exist
                data.setdefault('version', '1.0')
                data.setdefault('last_updated', datetime.now().isoformat())
                data.setdefault('songs', {})
                data.setdefault('playlists', {})
                data.setdefault('formats', {})
                data.setdefault('statistics', {})
                return data
            except (json.JSONDecodeError, FileNotFoundError) as e:
                print(f"Warning: Could not load tracking file: {e}")
                # If the file is corrupted, back it up and start fresh
                if self.tracking_file.exists():
                    backup_file = self.tracking_file.with_suffix('.json.backup')
                    try:
                        # replace() overwrites an existing backup; rename() raises
                        # FileExistsError on Windows when the target already exists
                        self.tracking_file.replace(backup_file)
                        print(f"Backed up corrupted file to: {backup_file}")
                    except Exception as backup_error:
                        print(f"Could not backup corrupted file: {backup_error}")
        # Return default structure
        return {
            'version': '1.0',
            'last_updated': datetime.now().isoformat(),
            'songs': {},
            'playlists': {},
            'formats': {},
            'statistics': {}
        }
    def _save_tracking_data(self, force_save: bool = False):
        """Write tracking data to disk, or only mark it dirty when force_save is False.

        Nothing in this class flushes the pending flag automatically, so callers
        must invoke force_save() before exiting or unsaved changes are lost.
        """
        if not force_save:
            # Defer the write: only record that unsaved changes exist
            self._batch_save_pending = True
            return
        self.data['last_updated'] = datetime.now().isoformat()
        try:
            with open(self.tracking_file, 'w', encoding='utf-8') as f:
                json.dump(self.data, f, indent=2, ensure_ascii=False)
            self._batch_save_pending = False
        except Exception as e:
            self.logger.error(f"Failed to save tracking data: {e}")
def force_save(self):
"""Force save the tracking data immediately."""
self._save_tracking_data(force_save=True)
    def _load_cache_data(self) -> Dict:
        """Load channel cache data from JSON file."""
        if self.cache_file.exists():
            try:
                with open(self.cache_file, 'r', encoding='utf-8') as f:
                    cache = json.load(f)
                # Ensure required structure
                cache.setdefault('channels', {})
                settings = cache.setdefault('settings', {})
                settings.setdefault('cache_duration_hours', self.cache_duration_hours)
                settings.setdefault('last_updated', datetime.now().isoformat())
                # Honor a duration persisted by set_cache_duration(); without this
                # the saved value is ignored and every run falls back to the default
                self.cache_duration_hours = settings['cache_duration_hours']
                return cache
            except (json.JSONDecodeError, FileNotFoundError) as e:
                print(f"Warning: Could not load cache file: {e}")
        # Return default cache structure
        return {
            'channels': {},
            'settings': {
                'cache_duration_hours': self.cache_duration_hours,
                'last_updated': datetime.now().isoformat()
            }
        }
def _save_cache_data(self):
"""Save channel cache data to JSON file."""
try:
self.cache['settings']['last_updated'] = datetime.now().isoformat()
with open(self.cache_file, 'w', encoding='utf-8') as f:
json.dump(self.cache, f, indent=2, ensure_ascii=False)
except Exception as e:
self.logger.error(f"Failed to save cache data: {e}")
def get_cached_channel_videos(self, channel_id: str) -> Optional[List[Dict]]:
"""Get cached videos for a channel if cache is still valid."""
if channel_id not in self.cache['channels']:
return None
channel_cache = self.cache['channels'][channel_id]
cached_time = datetime.fromisoformat(channel_cache['cached_at'])
cache_duration = timedelta(hours=self.cache_duration_hours)
# Check if cache is still valid
if datetime.now() - cached_time < cache_duration:
self.logger.info(f"Using cached channel data for {channel_id} (cached {cached_time})")
return channel_cache['videos']
self.logger.info(f"Channel cache expired for {channel_id}, will refresh")
return None
def cache_channel_videos(self, channel_id: str, videos: List[Dict]):
"""Cache videos for a channel."""
self.cache['channels'][channel_id] = {
'videos': videos,
'cached_at': datetime.now().isoformat(),
'video_count': len(videos)
}
self._save_cache_data()
self.logger.info(f"Cached {len(videos)} videos for channel {channel_id}")
def clear_channel_cache(self, channel_id: Optional[str] = None):
"""Clear cache for a specific channel or all channels."""
if channel_id:
if channel_id in self.cache['channels']:
del self.cache['channels'][channel_id]
self._save_cache_data()
self.logger.info(f"Cleared cache for channel {channel_id}")
else:
self.cache['channels'] = {}
self._save_cache_data()
self.logger.info("Cleared all channel caches")
def set_cache_duration(self, hours: int):
"""Set cache duration in hours."""
self.cache_duration_hours = hours
self.cache['settings']['cache_duration_hours'] = hours
self._save_cache_data()
self.logger.info(f"Cache duration set to {hours} hours")
def get_cache_info(self) -> Dict:
"""Get cache information."""
total_channels = len(self.cache['channels'])
total_cached_videos = sum(channel['video_count'] for channel in self.cache['channels'].values())
return {
'total_channels': total_channels,
'total_cached_videos': total_cached_videos,
'cache_duration_hours': self.cache_duration_hours,
'last_updated': self.cache['settings']['last_updated']
}
def _generate_song_id(self, video_id: str, playlist_id: str) -> str:
"""Generate a unique song ID."""
return f"{video_id}_{playlist_id}"
def _generate_file_hash(self, file_path: Path) -> str:
"""Generate MD5 hash of file for integrity checking."""
try:
if file_path.exists():
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
except Exception as e:
self.logger.warning(f"Could not generate hash for {file_path}: {e}")
return ""
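    def add_playlist(self, playlist_id: str, playlist_name: str, playlist_url: str) -> Dict:
        """Add a playlist to tracking, or update the name and URL of an existing one."""
        existing = self.data['playlists'].get(playlist_id)
        if existing is not None:
            # Keep song_ids, counters, and dates so re-adding does not wipe history
            existing['name'] = playlist_name
            existing['url'] = playlist_url
            self._save_tracking_data()
            return existing
        playlist_data = {
            'name': playlist_name,
            'url': playlist_url,
            'added_date': datetime.now().isoformat(),
            'last_synced': None,
            'total_songs': 0,
            'downloaded_songs': 0,
            'failed_songs': 0,
            'song_ids': []
        }
        self.data['playlists'][playlist_id] = playlist_data
        self._save_tracking_data()
        return playlist_data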
def add_song(self, video_id: str, playlist_id: str, title: str, duration: Optional[int] = None) -> Dict:
"""Add a song to tracking."""
song_id = self._generate_song_id(video_id, playlist_id)
song_data = {
'video_id': video_id,
'playlist_id': playlist_id,
'title': title,
'duration': duration,
'added_date': datetime.now().isoformat(),
'last_updated': datetime.now().isoformat(),
'status': SongStatus.NOT_DOWNLOADED,
'formats': {},
'file_paths': {},
'file_hashes': {},
'file_sizes': {},
'download_attempts': 0,
'last_error': None,
'metadata': {}
}
self.data['songs'][song_id] = song_data
# Add to playlist's song list
if playlist_id not in self.data['playlists']:
self.add_playlist(playlist_id, f"playlist_{playlist_id}", "")
if song_id not in self.data['playlists'][playlist_id]['song_ids']:
self.data['playlists'][playlist_id]['song_ids'].append(song_id)
self._update_playlist_statistics(playlist_id)
self._save_tracking_data()
return song_data
def update_song_status(self, video_id: str, playlist_id: str, status: str,
format_type: Optional[str] = None, file_path: Optional[Path] = None,
error_message: Optional[str] = None) -> bool:
"""Update song status and optionally add format information."""
song_id = self._generate_song_id(video_id, playlist_id)
if song_id not in self.data['songs']:
self.logger.warning(f"Song {song_id} not found in tracking")
return False
song = self.data['songs'][song_id]
song['status'] = status
song['last_updated'] = datetime.now().isoformat()
if error_message:
song['last_error'] = error_message
if status == SongStatus.DOWNLOADING:
song['download_attempts'] += 1
# Handle format-specific information
if format_type and file_path:
if format_type not in song['formats']:
song['formats'][format_type] = {
'status': status,
'file_path': str(file_path),
'file_size': file_path.stat().st_size if file_path.exists() else 0,
'file_hash': self._generate_file_hash(file_path),
'added_date': datetime.now().isoformat()
}
else:
song['formats'][format_type].update({
'status': status,
'file_path': str(file_path),
'file_size': file_path.stat().st_size if file_path.exists() else 0,
'file_hash': self._generate_file_hash(file_path)
})
song['file_paths'][format_type] = str(file_path)
song['file_hashes'][format_type] = song['formats'][format_type]['file_hash']
song['file_sizes'][format_type] = song['formats'][format_type]['file_size']
self._update_playlist_statistics(playlist_id)
self._save_tracking_data()
return True
def get_song_status(self, video_id: str, playlist_id: str) -> Optional[Dict]:
"""Get current status of a song."""
song_id = self._generate_song_id(video_id, playlist_id)
return self.data['songs'].get(song_id)
def is_song_downloaded(self, video_id: str, playlist_id: str, format_type: Optional[str] = None) -> bool:
"""Check if a song is downloaded in the specified format."""
song = self.get_song_status(video_id, playlist_id)
if not song:
return False
if format_type:
return (format_type in song['formats'] and
song['formats'][format_type]['status'] in [SongStatus.DOWNLOADED, SongStatus.CONVERTED])
# Check if any format is downloaded
return any(fmt['status'] in [SongStatus.DOWNLOADED, SongStatus.CONVERTED]
for fmt in song['formats'].values())
def get_playlist_songs(self, playlist_id: str) -> List[Dict]:
"""Get all songs for a playlist."""
if playlist_id not in self.data['playlists']:
return []
song_ids = self.data['playlists'][playlist_id]['song_ids']
return [self.data['songs'].get(song_id) for song_id in song_ids
if song_id in self.data['songs']]
def get_downloaded_songs(self, playlist_id: str, format_type: Optional[str] = None) -> List[Dict]:
"""Get all downloaded songs for a playlist."""
songs = self.get_playlist_songs(playlist_id)
if format_type:
return [song for song in songs if self.is_song_downloaded(song['video_id'], playlist_id, format_type)]
return [song for song in songs if self.is_song_downloaded(song['video_id'], playlist_id)]
def get_failed_songs(self, playlist_id: str) -> List[Dict]:
"""Get all failed songs for a playlist."""
songs = self.get_playlist_songs(playlist_id)
return [song for song in songs if song['status'] == SongStatus.FAILED]
def get_partial_downloads(self, playlist_id: str) -> List[Dict]:
"""Get songs with partial downloads."""
songs = self.get_playlist_songs(playlist_id)
return [song for song in songs if song['status'] == SongStatus.PARTIAL]
    def sync_playlist(self, playlist_id: str, current_songs: List[Dict]) -> Dict:
        """Sync playlist with current songs from YouTube."""
        if playlist_id not in self.data['playlists']:
            return {'added': len(current_songs), 'removed': 0, 'total': len(current_songs)}
        existing_song_ids = set(self.data['playlists'][playlist_id]['song_ids'])
        current_song_ids = set()
        added_songs = 0
        for song_info in current_songs:
            song_id = self._generate_song_id(song_info['video_id'], playlist_id)
            current_song_ids.add(song_id)
            if song_id not in existing_song_ids:
                self.add_song(
                    song_info['video_id'],
                    playlist_id,
                    song_info['title'],
                    song_info.get('duration')
                )
                added_songs += 1
        # Remove songs that no longer exist in the playlist, both from the song
        # table and from the playlist's own ID list, so a video that later
        # returns to the playlist is detected as new again
        removed_ids = existing_song_ids - current_song_ids
        for song_id in removed_ids:
            self.data['songs'].pop(song_id, None)
        playlist = self.data['playlists'][playlist_id]
        playlist['song_ids'] = [sid for sid in playlist['song_ids'] if sid not in removed_ids]
        # Update playlist sync time
        playlist['last_synced'] = datetime.now().isoformat()
        self._update_playlist_statistics(playlist_id)
        # Force save after batch operations
        self.force_save()
        return {
            'added': added_songs,
            'removed': len(removed_ids),
            'total': len(current_songs)
        }
def _update_playlist_statistics(self, playlist_id: str):
"""Update playlist statistics."""
if playlist_id not in self.data['playlists']:
return
songs = self.get_playlist_songs(playlist_id)
playlist = self.data['playlists'][playlist_id]
playlist['total_songs'] = len(songs)
playlist['downloaded_songs'] = len([s for s in songs if self.is_song_downloaded(s['video_id'], playlist_id)])
playlist['failed_songs'] = len([s for s in songs if s['status'] == SongStatus.FAILED])
def get_statistics(self) -> Dict:
"""Get overall statistics."""
total_songs = len(self.data['songs'])
total_playlists = len(self.data['playlists'])
downloaded_songs = sum(1 for song in self.data['songs'].values()
if self.is_song_downloaded(song['video_id'], song['playlist_id']))
failed_songs = sum(1 for song in self.data['songs'].values()
if song['status'] == SongStatus.FAILED)
partial_songs = sum(1 for song in self.data['songs'].values()
if song['status'] == SongStatus.PARTIAL)
total_size = 0
for song in self.data['songs'].values():
for size in song['file_sizes'].values():
total_size += size
return {
'total_songs': total_songs,
'total_playlists': total_playlists,
'downloaded_songs': downloaded_songs,
'failed_songs': failed_songs,
'partial_songs': partial_songs,
'total_size_bytes': total_size,
'total_size_mb': round(total_size / (1024 * 1024), 2),
'last_updated': self.data['last_updated']
}
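    def cleanup_orphaned_files(self, downloads_dir: Path) -> List[str]:
        """Remove tracking entries for files that no longer exist on disk.

        Returns the IDs of songs left with no downloaded format.
        """
        orphaned_songs = []
        for song_id, song in self.data['songs'].items():
            # Iterate over a snapshot: entries are deleted from file_paths below,
            # and mutating a dict while iterating it raises RuntimeError
            for format_type, file_path in list(song['file_paths'].items()):
                if Path(file_path).exists():
                    continue
                # Remove every record of the missing file
                song['formats'].pop(format_type, None)
                song['file_paths'].pop(format_type, None)
                song['file_hashes'].pop(format_type, None)
                song['file_sizes'].pop(format_type, None)
                # Update status if no formats remain
                if not song['formats'] and song_id not in orphaned_songs:
                    song['status'] = SongStatus.NOT_DOWNLOADED
                    orphaned_songs.append(song_id)
        # Refresh the per-playlist counters for every playlist that lost a song
        for playlist_id in {self.data['songs'][sid]['playlist_id'] for sid in orphaned_songs}:
            self._update_playlist_statistics(playlist_id)
        # Callers exit right after cleanup, so write the result immediately
        self.force_save()
        return orphaned_songs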
def export_playlist_report(self, playlist_id: str, output_file: Optional[str] = None) -> str:
"""Export a detailed report for a playlist."""
if playlist_id not in self.data['playlists']:
return "Playlist not found"
playlist = self.data['playlists'][playlist_id]
songs = self.get_playlist_songs(playlist_id)
report = f"""
Karaoke Playlist Report
======================
Playlist: {playlist['name']}
URL: {playlist['url']}
Total Songs: {playlist['total_songs']}
Downloaded: {playlist['downloaded_songs']}
Failed: {playlist['failed_songs']}
Last Synced: {playlist['last_synced'] or 'Never'}
Song Details:
"""
for song in songs:
status_icon = {
SongStatus.DOWNLOADED: "",
SongStatus.CONVERTED: "",
SongStatus.DOWNLOADING: "",
SongStatus.PARTIAL: "⚠️",
SongStatus.FAILED: "",
SongStatus.NOT_DOWNLOADED: "⏸️"
}.get(song['status'], "")
formats = ", ".join(song['formats'].keys()) if song['formats'] else "None"
report += f"{status_icon} {song['title']} ({formats})\n"
if output_file:
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report)
return report
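
`get_cached_channel_videos` treats a channel's cache as valid while the time since `cached_at` is strictly less than the configured duration. A minimal sketch of that check in isolation follows, assuming a hypothetical helper `is_cache_fresh` with an injectable `now` argument (added here only so the logic can be exercised without waiting on the clock; the class itself calls `datetime.now()` directly).

```python
from datetime import datetime, timedelta
from typing import Optional


def is_cache_fresh(cached_at: str, duration_hours: int,
                   now: Optional[datetime] = None) -> bool:
    """Return True while the cache entry is younger than duration_hours."""
    if now is None:
        now = datetime.now()
    # cached_at is the ISO 8601 string written by datetime.isoformat()
    cached_time = datetime.fromisoformat(cached_at)
    return now - cached_time < timedelta(hours=duration_hours)


if __name__ == "__main__":
    stamp = "2025-07-23T10:00:00"
    # One second before the 24-hour mark: still fresh
    print(is_cache_fresh(stamp, 24, now=datetime(2025, 7, 24, 9, 59, 59)))
    # Exactly 24 hours later: expired, because the comparison is strict
    print(is_cache_fresh(stamp, 24, now=datetime(2025, 7, 24, 10, 0, 0)))
```

Passing `now` explicitly is a testing convenience, not something the original class does.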

155
update_resolution.py Normal file

@ -0,0 +1,155 @@
#!/usr/bin/env python3
"""
Resolution Update Utility for Karaoke Playlist Downloader
Easily update the preferred video resolution in the configuration.
"""
import json
import argparse
from pathlib import Path


def update_resolution(resolution):
    """Update the resolution in config.json file."""
    config_file = Path("config.json")
    # Load existing config or create default
    if config_file.exists():
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except json.JSONDecodeError:
            print("❌ Error: Invalid config.json file")
            return False
    else:
        print("📝 Creating new config.json file...")
        config = {
            "download_settings": {
                "format": "best[height<=720][ext=mp4]/best[height<=720]/best[ext=mp4]/best",
                "preferred_resolution": "720p",
                "audio_format": "mp3",
                "audio_quality": "0",
                "subtitle_language": "en",
                "subtitle_format": "srt",
                "write_metadata": True,
                "write_thumbnail": True,
                "write_description": True,
                "write_annotations": True,
                "write_comments": True,
                "write_subtitles": True,
                "embed_metadata": True,
                "add_metadata": True,
                "continue_downloads": True,
                "no_overwrites": True,
                "ignore_errors": True,
                "no_warnings": False
            },
            "folder_structure": {
                "downloads_dir": "downloads",
                "logs_dir": "logs",
                "tracking_file": "karaoke_tracking.json"
            },
            "logging": {
                "level": "INFO",
                "format": "%(asctime)s - %(levelname)s - %(message)s",
                "include_console": True,
                "include_file": True
            },
            "yt_dlp_path": "downloader/yt-dlp.exe"
        }
    # Resolution mapping (label -> maximum video height in pixels)
    resolution_map = {
        '480p': '480',
        '720p': '720',
        '1080p': '1080',
        '1440p': '1440',
        '2160p': '2160'
    }
    if resolution not in resolution_map:
        print(f"❌ Error: Invalid resolution '{resolution}'")
        print(f"Valid options: {', '.join(resolution_map.keys())}")
        return False
    height = resolution_map[resolution]
    # An existing config.json may lack the "download_settings" section;
    # create it instead of failing with KeyError.
    download_settings = config.setdefault("download_settings", {})
    old_resolution = download_settings.get("preferred_resolution", "720p")
    # Update the format string
    download_settings["format"] = f"best[height<={height}][ext=mp4]/best[height<={height}]/best[ext=mp4]/best"
    download_settings["preferred_resolution"] = resolution
    # Save the updated config
    try:
        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2, ensure_ascii=False)
        print(f"✅ Successfully updated resolution from {old_resolution} to {resolution}")
        print(f"📝 Format string: {download_settings['format']}")
        return True
    except Exception as e:
        print(f"❌ Error saving config: {e}")
        return False


def show_current_resolution():
    """Show the current resolution setting."""
    config_file = Path("config.json")
    if not config_file.exists():
        print("📝 No config.json file found. Using default 720p resolution.")
        return
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        current_resolution = config["download_settings"].get("preferred_resolution", "720p")
        current_format = config["download_settings"].get("format", "best[height<=720][ext=mp4]/best[height<=720]/best[ext=mp4]/best")
        print(f"🎬 Current resolution: {current_resolution}")
        print(f"📝 Format string: {current_format}")
    except json.JSONDecodeError:
        print("❌ Error: Invalid config.json file")
    except Exception as e:
        print(f"❌ Error reading config: {e}")


def main():
    parser = argparse.ArgumentParser(
        description="Update video resolution for Karaoke Playlist Downloader",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python update_resolution.py --show
python update_resolution.py --resolution 1080p
python update_resolution.py --resolution 720p
"""
    )
    parser.add_argument(
        '--resolution', '-r',
        choices=['480p', '720p', '1080p', '1440p', '2160p'],
        help='Set the preferred video resolution'
    )
    parser.add_argument(
        '--show', '-s',
        action='store_true',
        help='Show current resolution setting'
    )
    args = parser.parse_args()
    if args.show:
        show_current_resolution()
    elif args.resolution:
        update_resolution(args.resolution)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
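
The format string written by `update_resolution()` is a chain of yt-dlp selectors separated by `/`, tried left to right until one matches an available format. The sketch below isolates that construction so it can be checked without touching `config.json`. The helper name `build_format_selector` and the constant `RESOLUTION_HEIGHTS` are hypothetical and are not part of this commit; only the selector string itself comes from the source.

```python
# Hypothetical helper, not part of the commit: rebuilds the selector string
# that update_resolution() stores under download_settings["format"].
RESOLUTION_HEIGHTS = {
    "480p": 480,
    "720p": 720,
    "1080p": 1080,
    "1440p": 1440,
    "2160p": 2160,
}


def build_format_selector(resolution: str) -> str:
    """Return a yt-dlp format selector capped at the given resolution."""
    if resolution not in RESOLUTION_HEIGHTS:
        valid = ", ".join(RESOLUTION_HEIGHTS)
        raise ValueError(f"Invalid resolution {resolution!r}; valid options: {valid}")
    height = RESOLUTION_HEIGHTS[resolution]
    # Alternatives in order of preference; yt-dlp uses the first that matches.
    alternatives = [
        f"best[height<={height}][ext=mp4]",  # MP4 within the height cap
        f"best[height<={height}]",           # any container within the cap
        "best[ext=mp4]",                     # any MP4, ignoring the cap
        "best",                              # last resort
    ]
    return "/".join(alternatives)


if __name__ == "__main__":
    print(build_format_selector("1080p"))
```

Keeping the alternatives in a list makes the fallback order explicit: the height cap is only dropped when no format satisfies it.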