musicbrainz-cleaner/src/cli/main.py

455 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Command-line interface for MusicBrainz Data Cleaner.
Now uses direct database access with fuzzy search for better performance.
"""
import sys
import json
import time
from pathlib import Path
from typing import Dict, Optional, Any, Tuple, List
from datetime import datetime
# Import constants
from ..config.constants import (
DEFAULT_MUSICBRAINZ_URL,
API_REQUEST_DELAY,
PROGRESS_SEPARATOR,
SUCCESS_MESSAGES,
ERROR_MESSAGES,
ExitCode
)
# Import core components
from ..core.song_processor import SongProcessor
from ..core.factory import DataProviderFactory
class MusicBrainzCleaner:
    """Clean song records against MusicBrainz and write result files and reports.

    A data provider is obtained from ``DataProviderFactory`` and handed to a
    ``SongProcessor``, which performs the actual per-song lookup.
    """

    def __init__(self, use_database: bool = True, base_url: str = DEFAULT_MUSICBRAINZ_URL, force: bool = False):
        """Create the data provider and the song processor.

        Args:
            use_database: Forwarded to the provider factory. ``process_songs``
                only sleeps between songs when this is False.
            base_url: Forwarded to the provider factory.
            force: When True, existing ``mbid`` / ``recording_mbid`` keys are
                removed from every song before it is processed.
        """
        self.use_database = use_database
        self.base_url = base_url
        self.force = force
        # Create data provider using factory
        self.data_provider = DataProviderFactory.create_provider(use_database, base_url)
        # Initialize centralized song processor with the data provider
        self.song_processor = SongProcessor(self.data_provider)

    def clean_song(self, song: Dict[str, Any]) -> Tuple[Dict[str, Any], bool]:
        """Clean a single song using the centralized song processor.

        The input dict is never mutated: a shallow copy is processed instead.

        Returns:
            Whatever ``SongProcessor.clean_song`` returns; ``process_songs``
            unpacks it as ``(cleaned_song, success_status)``.
        """
        song_copy = song.copy()
        # If force flag is set, remove ALL existing MBIDs to force complete reprocessing
        if self.force:
            song_copy.pop('mbid', None)
            song_copy.pop('recording_mbid', None)
        return self.song_processor.clean_song(song_copy)

    @staticmethod
    def _percent(part: int, whole: int) -> float:
        """Return ``part`` as a percentage of ``whole`` (0 when ``whole`` is not positive)."""
        return (part / whole * 100) if whole > 0 else 0

    @staticmethod
    def _song_label(song: Any) -> Tuple[Any, Any]:
        """Return ``(artist, title)`` for display, tolerating non-dict entries."""
        if isinstance(song, dict):
            return song.get('artist', 'Unknown'), song.get('title', 'Unknown')
        # A malformed entry (string, number, list...) has no artist/title keys.
        return 'Unknown', 'Unknown'

    def process_songs(self, source_file: Path, output_success: Optional[Path] = None, output_failure: Optional[Path] = None, limit: Optional[int] = None) -> Dict[str, Any]:
        """Process songs from ``source_file`` and write success/failure files plus reports.

        Args:
            source_file: JSON file containing an array of song objects.
            output_success: Destination for songs that got both an ``mbid`` and
                a ``recording_mbid``. Defaults to ``<stem>-success.json`` next
                to the source file.
            output_failure: Destination for the remaining songs, stored in
                their original form. Defaults to ``<stem>-failure.json`` next
                to the source file.
            limit: When given, only the first ``limit`` songs are processed.

        Returns:
            A dict with ``summary`` and ``files`` sections (the same data that
            is written to the JSON report), or ``{}`` when the source file is
            missing, is not a JSON array, or contains no songs.

        Raises:
            json.JSONDecodeError: The source file is not valid JSON.
            OSError: A file could not be read or written.
        """
        if not source_file.exists():
            print(f'❌ Source file not found: {source_file}')
            return {}
        print('🚀 Starting song processing...')
        # Read as UTF-8 explicitly: the outputs below are written as UTF-8, and
        # the platform default encoding may not be.
        with open(source_file, 'r', encoding='utf-8') as f:
            all_songs = json.load(f)
        if not isinstance(all_songs, list):
            print("Error: Source file should contain a JSON array of songs")
            return {}
        # Apply limit if specified
        if limit is not None:
            all_songs = all_songs[:limit]
            print(f"⚠️ Limiting processing to first {limit} songs")
        total_songs = len(all_songs)
        print(f'📊 Total songs to process: {total_songs:,}')
        # Check if file is empty
        if total_songs == 0:
            print('⚠️ No songs to process - file is empty')
            return {}
        print(f'Using {"database" if self.use_database else "API"} connection')
        if self.force:
            print('🔄 Force mode enabled - ignoring all existing MBIDs')
        print(PROGRESS_SEPARATOR)

        successful_songs: List[Dict[str, Any]] = []
        failed_songs: List[Any] = []
        stats = {
            'total_processed': 0,
            'artists_found': 0,
            'recordings_found': 0,
            'start_time': time.time()
        }

        for index, song in enumerate(all_songs, 1):
            try:
                if not isinstance(song, dict):
                    # Fail this entry with a readable message instead of an
                    # AttributeError raised from deep inside clean_song.
                    raise TypeError(f'expected a JSON object, got {type(song).__name__}')
                cleaned_song, _ = self.clean_song(song)
                artist_found = 'mbid' in cleaned_song
                recording_found = 'recording_mbid' in cleaned_song
                artist_name, title = self._song_label(song)
                # Count the two lookups independently so that the separately
                # reported artist and recording rates are meaningful.
                if artist_found:
                    stats['artists_found'] += 1
                if recording_found:
                    stats['recordings_found'] += 1
                if artist_found and recording_found:
                    successful_songs.append(cleaned_song)
                    print(f'[{index:,} of {total_songs:,}] ✅ PASS: {artist_name} - {title}')
                else:
                    # Keep the original song in failed_songs array (same format as source)
                    failed_songs.append(song)
                    print(f'[{index:,} of {total_songs:,}] ❌ FAIL: {artist_name} - {title}')
                stats['total_processed'] += 1
                # Progress update every 100 songs
                if index % 100 == 0:
                    elapsed = time.time() - stats['start_time']
                    rate = index / elapsed if elapsed > 0 else 0
                    success_rate = self._percent(len(successful_songs), index)
                    print(f' 📈 Progress: {index:,}/{total_songs:,} ({index/total_songs*100:.1f}%) - '
                          f'Success: {success_rate:.1f}% - Rate: {rate:.1f} songs/sec')
            except Exception as e:
                print(f' ❌ Error processing song {index}: {e}')
                # Keep the original song in failed_songs array
                failed_songs.append(song)
                stats['total_processed'] += 1
            # Only add delay for API calls, not database queries
            if not self.use_database:
                time.sleep(API_REQUEST_DELAY)

        # Determine output file paths
        if output_success is None:
            output_success = source_file.parent / f"{source_file.stem}-success.json"
        if output_failure is None:
            output_failure = source_file.parent / f"{source_file.stem}-failure.json"
        # Save both result sets in the same array format as the source
        with open(output_success, 'w', encoding='utf-8') as f:
            json.dump(successful_songs, f, indent=2, ensure_ascii=False)
        with open(output_failure, 'w', encoding='utf-8') as f:
            json.dump(failed_songs, f, indent=2, ensure_ascii=False)

        total_time = time.time() - stats['start_time']
        # One clock reading feeds both the report file names and the report
        # body, so they can never disagree.
        finished_at = datetime.now()
        timestamp = finished_at.strftime("%Y%m%d_%H%M%S")
        report_file = source_file.parent / f"processing_report_{timestamp}.txt"
        json_report_file = source_file.parent / f"processing_report_{timestamp}.json"

        self._write_text_report(
            report_file, source_file, finished_at, total_time, stats,
            len(successful_songs), failed_songs, output_success, output_failure,
        )

        # Save detailed JSON report for programmatic access
        processed = stats['total_processed']
        final_stats = {
            'summary': {
                'total_tested': processed,
                'artists_found': stats['artists_found'],
                'recordings_found': stats['recordings_found'],
                'failed_count': len(failed_songs),
                'artist_success_rate': self._percent(stats['artists_found'], processed),
                'recording_success_rate': self._percent(stats['recordings_found'], processed),
                'processing_time_seconds': total_time,
                'songs_per_second': processed / total_time if total_time > 0 else 0
            },
            'files': {
                'source': str(source_file),
                'successful_songs': str(output_success),
                'failed_songs': str(output_failure),
                'text_report': str(report_file),
                'json_report': str(json_report_file)
            }
        }
        with open(json_report_file, 'w', encoding='utf-8') as f:
            json.dump(final_stats, f, indent=2)

        self._print_summary(
            total_time, stats, len(failed_songs),
            output_success, output_failure, report_file, json_report_file,
        )
        return final_stats

    def _write_text_report(self, report_file: Path, source_file: Path, finished_at: datetime,
                           total_time: float, stats: Dict[str, Any], successful_count: int,
                           failed_songs: List[Any], output_success: Path, output_failure: Path) -> None:
        """Write the human-readable processing report to ``report_file``."""
        processed = stats['total_processed']
        artists = stats['artists_found']
        recordings = stats['recordings_found']
        speed = processed / total_time if total_time > 0 else 0
        processing_date = finished_at.strftime('%Y-%m-%d %H:%M:%S')
        rule = "-" * 20 + "\n"
        lines = [
            "MusicBrainz Data Cleaner - Processing Report\n",
            "=" * 50 + "\n\n",
            f"Source File: {source_file}\n",
            f"Processing Date: {processing_date}\n",
            f"Processing Time: {total_time:.1f} seconds\n\n",
            "SUMMARY\n",
            rule,
            f"Total Songs Processed: {processed:,}\n",
            f"Successful Songs: {successful_count:,}\n",
            f"Failed Songs: {len(failed_songs):,}\n",
            f"Success Rate: {self._percent(successful_count, processed):.1f}%\n\n",
            "DETAILED STATISTICS\n",
            rule,
            f"Artists Found: {artists:,}/{processed:,} ({self._percent(artists, processed):.1f}%)\n",
            f"Recordings Found: {recordings:,}/{processed:,} ({self._percent(recordings, processed):.1f}%)\n",
            f"Processing Speed: {speed:.1f} songs/second\n\n",
            "OUTPUT FILES\n",
            rule,
            f"Successful Songs: {output_success}\n",
            f"Failed Songs: {output_failure}\n",
            f"Report File: {report_file}\n\n",
        ]
        if failed_songs:
            lines.append("FAILED SONGS (First 50)\n")
            lines.append(rule)
            for position, song in enumerate(failed_songs[:50], 1):
                # _song_label copes with malformed (non-dict) entries, which
                # also end up in failed_songs.
                artist, title = self._song_label(song)
                lines.append(f"{position:3d}. {artist} - {title}\n")
            if len(failed_songs) > 50:
                lines.append(f"... and {len(failed_songs) - 50} more failed songs\n")
        with open(report_file, 'w', encoding='utf-8') as f:
            f.writelines(lines)

    def _print_summary(self, total_time: float, stats: Dict[str, Any], failed_count: int,
                       output_success: Path, output_failure: Path,
                       report_file: Path, json_report_file: Path) -> None:
        """Print the end-of-run summary and the list of files written."""
        processed = stats['total_processed']
        artists = stats['artists_found']
        recordings = stats['recordings_found']
        speed = processed / total_time if total_time > 0 else 0
        print(f'\n{PROGRESS_SEPARATOR}')
        print('🎉 Processing completed!')
        print('📊 Final Results:')
        print(f' ⏱️ Total processing time: {total_time:.1f} seconds')
        print(f' 🚀 Average speed: {speed:.1f} songs/second')
        print(f' ✅ Artists found: {artists:,}/{processed:,} ({self._percent(artists, processed):.1f}%)')
        print(f' ✅ Recordings found: {recordings:,}/{processed:,} ({self._percent(recordings, processed):.1f}%)')
        print(f' ❌ Failed songs: {failed_count:,} ({self._percent(failed_count, processed):.1f}%)')
        print('📄 Files saved:')
        print(f' ✅ Successful songs: {output_success}')
        print(f' ❌ Failed songs: {output_failure}')
        print(f' 📋 Text report: {report_file}')
        print(f' 📊 JSON report: {json_report_file}')
def print_help() -> None:
    """Print the usage, options, examples and requirements text to stdout."""
    # The leading and trailing empty entries reproduce the blank line that
    # frames the help text.
    usage_lines = (
        "",
        "MusicBrainz Data Cleaner - Clean and normalize song data using MusicBrainz",
        "USAGE:",
        "musicbrainz-cleaner [options]",
        "OPTIONS:",
        "--source FILE Source JSON file (default: data/songs.json)",
        "--output-success FILE Output file for successful songs (default: source-success.json)",
        "--output-failure FILE Output file for failed songs (default: source-failure.json)",
        "--limit N Process only the first N songs (default: all songs)",
        "--use-api Force use of HTTP API instead of direct database access",
        "--force Force reprocessing, ignoring all existing MBIDs",
        "--test-connection Test connection to MusicBrainz server",
        "--help, -h Show this help message",
        "--version, -v Show version information",
        "EXAMPLES:",
        "# Process all songs with default settings",
        "musicbrainz-cleaner",
        "# Process specific file",
        "musicbrainz-cleaner --source data/my_songs.json",
        "# Process with custom output files",
        "musicbrainz-cleaner --source data/songs.json --output-success cleaned.json --output-failure failed.json",
        "# Process only first 1000 songs",
        "musicbrainz-cleaner --limit 1000",
        "# Test connection",
        "musicbrainz-cleaner --test-connection",
        "# Force API mode",
        "musicbrainz-cleaner --use-api",
        "# Force reprocessing (ignore all existing MBIDs)",
        "musicbrainz-cleaner --force",
        "REQUIREMENTS:",
        "- MusicBrainz server running on http://localhost:5001",
        "- PostgreSQL database accessible (for direct database access)",
        "- Python 3.6+ with requests, psycopg2-binary, and fuzzywuzzy libraries",
        "PERFORMANCE:",
        "- Direct database access is much faster than API calls",
        "- Fuzzy search provides better matching for similar names",
        "- No rate limiting with database access",
        "",
    )
    print("\n".join(usage_lines))
def print_version() -> None:
    """Print the version banner, feature list and license notice to stdout."""
    # The leading and trailing empty entries reproduce the blank line that
    # frames the banner.
    banner_lines = (
        "",
        "MusicBrainz Data Cleaner v3.0.0",
        "Enhanced with:",
        "- Direct PostgreSQL database access",
        "- Fuzzy search for better matching",
        "- Improved performance and accuracy",
        "- Separate output files for successful and failed songs",
        "- Detailed progress tracking and reporting",
        "Copyright (c) 2024 MusicBrainz Data Cleaner Contributors",
        "MIT License - see LICENSE file for details",
        "Built with Python 3.6+",
        "",
    )
    print("\n".join(banner_lines))
def parse_arguments(args: List[str]) -> Dict[str, Any]:
    """Parse command line arguments into a dictionary.

    Args:
        args: The argument list without the program name.

    Returns:
        A dict with the keys ``source``, ``output_success``, ``output_failure``,
        ``limit``, ``use_api``, ``test_connection``, ``force``, ``help`` and
        ``version``. Options that were not given keep their defaults.

    Raises:
        SystemExit: With ``ExitCode.USAGE_ERROR`` when an option that needs a
            value has none (or is followed by another ``--option``), or when
            ``--limit`` is not a positive integer.

    Unrecognized arguments do not abort parsing; each one is reported on
    stderr so that a mistyped option is not silently dropped.
    """
    parsed: Dict[str, Any] = {
        'source': 'data/songs.json',
        'output_success': None,
        'output_failure': None,
        'limit': None,
        'use_api': False,
        'test_connection': False,
        'force': False,
        'help': False,
        'version': False
    }
    # Every accepted spelling of a boolean switch -> the key it turns on.
    switches = {
        '--help': 'help', '-h': 'help', 'help': 'help',
        '--version': 'version', '-v': 'version', 'version': 'version',
        '--test-connection': 'test_connection',
        '--use-api': 'use_api',
        '--force': 'force',
    }
    # Options that take a file path -> the key that stores it.
    path_options = {
        '--source': 'source',
        '--output-success': 'output_success',
        '--output-failure': 'output_failure',
    }
    i = 0
    while i < len(args):
        arg = args[i]
        # A following token only counts as this option's value when it is not
        # itself a long option.
        has_value = i + 1 < len(args) and not args[i + 1].startswith('--')
        if arg in switches:
            parsed[switches[arg]] = True
        elif arg in path_options:
            if not has_value:
                print(f"Error: {arg} requires a file path")
                sys.exit(ExitCode.USAGE_ERROR)
            parsed[path_options[arg]] = args[i + 1]
            i += 1  # skip the consumed value
        elif arg == '--limit':
            if not has_value:
                print("Error: --limit requires a number")
                sys.exit(ExitCode.USAGE_ERROR)
            try:
                limit = int(args[i + 1])
            except ValueError:
                print("Error: --limit requires a valid number")
                sys.exit(ExitCode.USAGE_ERROR)
            if limit <= 0:
                print("Error: --limit must be a positive number")
                sys.exit(ExitCode.USAGE_ERROR)
            parsed['limit'] = limit
            i += 1  # skip the consumed value
        else:
            # Keep going, as before, but tell the user the token was ignored.
            print(f"Warning: ignoring unrecognized argument: {arg}", file=sys.stderr)
        i += 1
    return parsed
def main() -> int:
    """Run the CLI: parse ``sys.argv``, dispatch, and return an exit code.

    Help and version requests are answered first, then a connection test if
    requested. Otherwise the source path is validated and the songs are
    processed. Keyboard interrupts and unexpected exceptions are reported and
    mapped to ``ExitCode.ERROR``.
    """
    try:
        options = parse_arguments(sys.argv[1:])

        # Handle help and version flags
        if options['help']:
            print_help()
            return ExitCode.SUCCESS
        if options['version']:
            print_version()
            return ExitCode.SUCCESS

        use_api = options['use_api']

        # Handle test connection
        if options['test_connection']:
            provider = DataProviderFactory.create_provider(not use_api)
            provider_type = "API server" if use_api else "database"
            if provider.test_connection():
                print(f"✅ Connection to MusicBrainz {provider_type} successful")
                return ExitCode.SUCCESS
            print(f"❌ Connection to MusicBrainz {provider_type} failed")
            return ExitCode.ERROR

        # Process songs (main functionality)
        source_path = Path(options['source'])
        success_path = Path(options['output_success']) if options['output_success'] else None
        failure_path = Path(options['output_failure']) if options['output_failure'] else None

        # Checks run in order; the first one that fails decides the message.
        if not source_path.exists():
            problem = f"Error: Source file does not exist: {source_path}"
        elif not source_path.is_file():
            problem = f"Error: Source path is not a file: {source_path}"
        elif source_path.suffix.lower() != '.json':
            problem = f"Error: Source file must be a JSON file: {source_path}"
        else:
            problem = None
        if problem is not None:
            print(problem)
            return ExitCode.USAGE_ERROR

        # Process the file
        cleaner = MusicBrainzCleaner(use_database=not use_api, force=options['force'])
        cleaner.process_songs(source_path, success_path, failure_path, options['limit'])
        return ExitCode.SUCCESS
    except KeyboardInterrupt:
        print("\nOperation cancelled by user")
        return ExitCode.ERROR
    except Exception as exc:
        print(f"Unexpected error: {exc}")
        return ExitCode.ERROR
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())