# KaraokeMerge/cli/report.py
"""
Reporting and output generation for the Karaoke Song Library Cleanup Tool.
"""
from collections import Counter, defaultdict
from typing import Any, Dict, List, Optional

from utils import extract_channel_from_path, format_file_size, get_file_extension
class ReportGenerator:
    """Generates reports and statistics for the karaoke cleanup process."""

    def __init__(self, config: Dict[str, Any]):
        """Cache the configuration plus the settings read throughout reporting.

        Args:
            config: Parsed tool configuration; the 'output' and
                'channel_priorities' sections are read here.
        """
        self.config = config
        output_settings = config.get('output', {})
        self.verbose = output_settings.get('verbose', False)
        self.include_reasons = output_settings.get('include_reasons', True)
        self.channel_priorities = config.get('channel_priorities', [])
def analyze_skip_patterns(self, skip_songs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyze patterns in the skip list to understand duplicate distribution.

    Args:
        skip_songs: Skip-list entries; each needs a 'path' key and may carry
            'reason', 'artist', and 'title'.

    Returns:
        Dict of aggregate counters: file-type/channel/reason distributions,
        kept-vs-skipped tallies per channel, folder patterns, and per-artist
        and per-title duplicate counts.
    """
    analysis = {
        'total_skipped': len(skip_songs),
        'file_type_distribution': defaultdict(int),
        'channel_distribution': defaultdict(int),
        'duplicate_reasons': defaultdict(int),
        'kept_vs_skipped_channels': defaultdict(lambda: {'kept': 0, 'skipped': 0}),
        'folder_patterns': defaultdict(int),
        'artist_duplicate_counts': defaultdict(int),
        'title_duplicate_counts': defaultdict(int)
    }
    for skip_song in skip_songs:
        # File type analysis
        ext = get_file_extension(skip_song['path'])
        analysis['file_type_distribution'][ext] += 1
        # Channel analysis is only meaningful for MP4s
        if ext == '.mp4':
            channel = extract_channel_from_path(skip_song['path'], self.channel_priorities)
            if channel:
                analysis['channel_distribution'][channel] += 1
                analysis['kept_vs_skipped_channels'][channel]['skipped'] += 1
        # Reason analysis
        analysis['duplicate_reasons'][skip_song.get('reason', 'unknown')] += 1
        # Folder pattern analysis. BUG FIX: the original split on '\\' only,
        # so POSIX-style paths ('a/b/c.mp4') never yielded a folder name.
        # Normalize both separators before splitting.
        path_parts = skip_song['path'].replace('\\', '/').split('/')
        if len(path_parts) > 1:
            # The second-to-last component is the containing folder.
            analysis['folder_patterns'][path_parts[-2]] += 1
        # Artist/Title duplicate counts
        analysis['artist_duplicate_counts'][skip_song.get('artist', 'Unknown')] += 1
        analysis['title_duplicate_counts'][skip_song.get('title', 'Unknown')] += 1
    return analysis
def analyze_channel_optimization(self, stats: Dict[str, Any], skip_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze channel priorities and suggest optimizations."""
analysis = {
'current_priorities': self.channel_priorities.copy(),
'priority_effectiveness': {},
'suggested_priorities': [],
'unused_channels': [],
'missing_channels': []
}
# Analyze effectiveness of current priorities
for channel in self.channel_priorities:
kept_count = stats['channel_breakdown'].get(channel, 0)
skipped_count = skip_analysis['kept_vs_skipped_channels'].get(channel, {}).get('skipped', 0)
total_count = kept_count + skipped_count
if total_count > 0:
effectiveness = kept_count / total_count
analysis['priority_effectiveness'][channel] = {
'kept': kept_count,
'skipped': skipped_count,
'total': total_count,
'effectiveness': effectiveness
}
# Find channels not in current priorities
all_channels = set(stats['channel_breakdown'].keys())
used_channels = set(self.channel_priorities)
analysis['unused_channels'] = list(all_channels - used_channels)
# Suggest priority order based on effectiveness
if analysis['priority_effectiveness']:
sorted_channels = sorted(
analysis['priority_effectiveness'].items(),
key=lambda x: x[1]['effectiveness'],
reverse=True
)
analysis['suggested_priorities'] = [channel for channel, _ in sorted_channels]
return analysis
def generate_enhanced_summary_report(self, stats: Dict[str, Any], skip_analysis: Dict[str, Any]) -> str:
"""Generate an enhanced summary report with detailed statistics."""
report = []
report.append("=" * 80)
report.append("ENHANCED KARAOKE SONG LIBRARY ANALYSIS REPORT")
report.append("=" * 80)
report.append("")
# Basic statistics
report.append("📊 BASIC STATISTICS")
report.append("-" * 40)
report.append(f"Total songs processed: {stats['total_songs']:,}")
report.append(f"Unique songs found: {stats['unique_songs']:,}")
report.append(f"Duplicates identified: {stats['duplicates_found']:,}")
report.append(f"Groups with duplicates: {stats['groups_with_duplicates']:,}")
if stats['duplicates_found'] > 0:
duplicate_percentage = (stats['duplicates_found'] / stats['total_songs']) * 100
report.append(f"Duplicate rate: {duplicate_percentage:.1f}%")
report.append("")
# File type analysis
report.append("📁 FILE TYPE ANALYSIS")
report.append("-" * 40)
total_files = sum(stats['file_type_breakdown'].values())
for ext, count in sorted(stats['file_type_breakdown'].items()):
percentage = (count / total_files) * 100
skipped_count = skip_analysis['file_type_distribution'].get(ext, 0)
kept_count = count - skipped_count
report.append(f"{ext}: {count:,} total ({percentage:.1f}%) - {kept_count:,} kept, {skipped_count:,} skipped")
report.append("")
# Channel analysis
if stats['channel_breakdown']:
report.append("🎵 CHANNEL ANALYSIS")
report.append("-" * 40)
for channel, count in sorted(stats['channel_breakdown'].items()):
skipped_count = skip_analysis['kept_vs_skipped_channels'].get(channel, {}).get('skipped', 0)
kept_count = count - skipped_count
effectiveness = (kept_count / count * 100) if count > 0 else 0
report.append(f"{channel}: {count:,} total - {kept_count:,} kept ({effectiveness:.1f}%), {skipped_count:,} skipped")
report.append("")
# Skip pattern analysis
report.append("🗑️ SKIP PATTERN ANALYSIS")
report.append("-" * 40)
report.append(f"Total files to skip: {skip_analysis['total_skipped']:,}")
# Top folders with most skips
top_folders = sorted(skip_analysis['folder_patterns'].items(), key=lambda x: x[1], reverse=True)[:10]
if top_folders:
report.append("Top folders with most duplicates:")
for folder, count in top_folders:
report.append(f" {folder}: {count:,} files")
report.append("")
# Duplicate reasons
if skip_analysis['duplicate_reasons']:
report.append("Duplicate reasons:")
for reason, count in skip_analysis['duplicate_reasons'].items():
percentage = (count / skip_analysis['total_skipped']) * 100
report.append(f" {reason}: {count:,} ({percentage:.1f}%)")
report.append("")
report.append("=" * 80)
return "\n".join(report)
def generate_channel_optimization_report(self, channel_analysis: Dict[str, Any]) -> str:
"""Generate a report with channel priority optimization suggestions."""
report = []
report.append("🔧 CHANNEL PRIORITY OPTIMIZATION ANALYSIS")
report.append("=" * 80)
report.append("")
# Current priorities
report.append("📋 CURRENT PRIORITIES")
report.append("-" * 40)
for i, channel in enumerate(channel_analysis['current_priorities'], 1):
effectiveness = channel_analysis['priority_effectiveness'].get(channel, {})
if effectiveness:
report.append(f"{i}. {channel} - {effectiveness['effectiveness']:.1%} effectiveness "
f"({effectiveness['kept']:,} kept, {effectiveness['skipped']:,} skipped)")
else:
report.append(f"{i}. {channel} - No data available")
report.append("")
# Effectiveness analysis
if channel_analysis['priority_effectiveness']:
report.append("📈 EFFECTIVENESS ANALYSIS")
report.append("-" * 40)
for channel, data in sorted(channel_analysis['priority_effectiveness'].items(),
key=lambda x: x[1]['effectiveness'], reverse=True):
report.append(f"{channel}: {data['effectiveness']:.1%} effectiveness "
f"({data['kept']:,} kept, {data['skipped']:,} skipped)")
report.append("")
# Suggested optimizations
if channel_analysis['suggested_priorities']:
report.append("💡 SUGGESTED OPTIMIZATIONS")
report.append("-" * 40)
report.append("Recommended priority order based on effectiveness:")
for i, channel in enumerate(channel_analysis['suggested_priorities'], 1):
report.append(f"{i}. {channel}")
report.append("")
# Unused channels
if channel_analysis['unused_channels']:
report.append("🔍 UNUSED CHANNELS")
report.append("-" * 40)
report.append("Channels found in your library but not in current priorities:")
for channel in channel_analysis['unused_channels']:
report.append(f" - {channel}")
report.append("")
report.append("=" * 80)
return "\n".join(report)
def generate_duplicate_pattern_report(self, skip_analysis: Dict[str, Any]) -> str:
"""Generate a report analyzing duplicate patterns."""
report = []
report.append("🔄 DUPLICATE PATTERN ANALYSIS")
report.append("=" * 80)
report.append("")
# Most duplicated artists
top_artists = sorted(skip_analysis['artist_duplicate_counts'].items(),
key=lambda x: x[1], reverse=True)[:20]
if top_artists:
report.append("🎤 ARTISTS WITH MOST DUPLICATES")
report.append("-" * 40)
for artist, count in top_artists:
report.append(f"{artist}: {count:,} duplicate files")
report.append("")
# Most duplicated titles
top_titles = sorted(skip_analysis['title_duplicate_counts'].items(),
key=lambda x: x[1], reverse=True)[:20]
if top_titles:
report.append("🎵 TITLES WITH MOST DUPLICATES")
report.append("-" * 40)
for title, count in top_titles:
report.append(f"{title}: {count:,} duplicate files")
report.append("")
# File type duplicate patterns
report.append("📁 DUPLICATE PATTERNS BY FILE TYPE")
report.append("-" * 40)
for ext, count in sorted(skip_analysis['file_type_distribution'].items()):
percentage = (count / skip_analysis['total_skipped']) * 100
report.append(f"{ext}: {count:,} files ({percentage:.1f}% of all duplicates)")
report.append("")
# Channel duplicate patterns
if skip_analysis['channel_distribution']:
report.append("🎵 DUPLICATE PATTERNS BY CHANNEL")
report.append("-" * 40)
for channel, count in sorted(skip_analysis['channel_distribution'].items(),
key=lambda x: x[1], reverse=True):
percentage = (count / skip_analysis['total_skipped']) * 100
report.append(f"{channel}: {count:,} files ({percentage:.1f}% of all duplicates)")
report.append("")
report.append("=" * 80)
return "\n".join(report)
def generate_actionable_insights_report(self, stats: Dict[str, Any], skip_analysis: Dict[str, Any],
channel_analysis: Dict[str, Any]) -> str:
"""Generate actionable insights and recommendations."""
report = []
report.append("💡 ACTIONABLE INSIGHTS & RECOMMENDATIONS")
report.append("=" * 80)
report.append("")
# Space savings
duplicate_percentage = (stats['duplicates_found'] / stats['total_songs']) * 100
report.append("💾 STORAGE OPTIMIZATION")
report.append("-" * 40)
report.append(f"{duplicate_percentage:.1f}% of your library consists of duplicates")
report.append(f"• Removing {stats['duplicates_found']:,} duplicate files will significantly reduce storage")
report.append(f"• This represents a major opportunity for library cleanup")
report.append("")
# Channel priority recommendations
if channel_analysis['suggested_priorities']:
report.append("🎯 CHANNEL PRIORITY RECOMMENDATIONS")
report.append("-" * 40)
report.append("Consider updating your channel priorities to:")
for i, channel in enumerate(channel_analysis['suggested_priorities'][:5], 1):
report.append(f"{i}. Prioritize '{channel}' (highest effectiveness)")
if channel_analysis['unused_channels']:
report.append("")
report.append("Add these channels to your priorities:")
for channel in channel_analysis['unused_channels'][:5]:
report.append(f"'{channel}'")
report.append("")
# File type insights
report.append("📁 FILE TYPE INSIGHTS")
report.append("-" * 40)
mp4_count = stats['file_type_breakdown'].get('.mp4', 0)
mp3_count = stats['file_type_breakdown'].get('.mp3', 0)
if mp4_count > 0:
mp4_percentage = (mp4_count / stats['total_songs']) * 100
report.append(f"{mp4_percentage:.1f}% of your library is MP4 format (highest quality)")
if mp3_count > 0:
report.append("• You have MP3 files (including CDG/MP3 pairs) - the tool correctly handles them")
# Most problematic areas
top_folders = sorted(skip_analysis['folder_patterns'].items(), key=lambda x: x[1], reverse=True)[:5]
if top_folders:
report.append("")
report.append("🔍 AREAS NEEDING ATTENTION")
report.append("-" * 40)
report.append("Folders with the most duplicates:")
for folder, count in top_folders:
report.append(f"'{folder}': {count:,} duplicate files")
report.append("")
report.append("=" * 80)
return "\n".join(report)
def generate_summary_report(self, stats: Dict[str, Any]) -> str:
"""Generate a summary report of the cleanup process."""
report = []
report.append("=" * 60)
report.append("KARAOKE SONG LIBRARY CLEANUP SUMMARY")
report.append("=" * 60)
report.append("")
# Basic statistics
report.append(f"Total songs processed: {stats['total_songs']:,}")
report.append(f"Unique songs found: {stats['unique_songs']:,}")
report.append(f"Duplicates identified: {stats['duplicates_found']:,}")
report.append(f"Groups with duplicates: {stats['groups_with_duplicates']:,}")
report.append("")
# File type breakdown
report.append("FILE TYPE BREAKDOWN:")
for ext, count in sorted(stats['file_type_breakdown'].items()):
percentage = (count / stats['total_songs']) * 100
report.append(f" {ext}: {count:,} ({percentage:.1f}%)")
report.append("")
# Channel breakdown (for MP4s)
if stats['channel_breakdown']:
report.append("MP4 CHANNEL BREAKDOWN:")
for channel, count in sorted(stats['channel_breakdown'].items()):
report.append(f" {channel}: {count:,}")
report.append("")
# Duplicate statistics
if stats['duplicates_found'] > 0:
duplicate_percentage = (stats['duplicates_found'] / stats['total_songs']) * 100
report.append(f"DUPLICATE ANALYSIS:")
report.append(f" Duplicate rate: {duplicate_percentage:.1f}%")
report.append(f" Space savings potential: Significant")
report.append("")
report.append("=" * 60)
return "\n".join(report)
def generate_channel_priority_report(self, stats: Dict[str, Any], channel_priorities: List[str]) -> str:
"""Generate a report about channel priority matching."""
report = []
report.append("CHANNEL PRIORITY ANALYSIS")
report.append("=" * 60)
report.append("")
# Count songs with and without defined channel priorities
total_mp4s = sum(count for ext, count in stats['file_type_breakdown'].items() if ext == '.mp4')
songs_with_priority = sum(stats['channel_breakdown'].values())
songs_without_priority = total_mp4s - songs_with_priority
report.append(f"MP4 files with defined channel priorities: {songs_with_priority:,}")
report.append(f"MP4 files without defined channel priorities: {songs_without_priority:,}")
report.append("")
if songs_without_priority > 0:
report.append("Note: Songs without defined channel priorities will be marked for manual review.")
report.append("Consider adding their folder names to the channel_priorities configuration.")
report.append("")
# Show channel priority order
report.append("Channel Priority Order (highest to lowest):")
for i, channel in enumerate(channel_priorities, 1):
report.append(f" {i}. {channel}")
report.append("")
return "\n".join(report)
def generate_duplicate_details(self, duplicate_info: List[Dict[str, Any]]) -> str:
"""Generate detailed report of duplicate groups."""
if not duplicate_info:
return "No duplicates found."
report = []
report.append("DETAILED DUPLICATE ANALYSIS")
report.append("=" * 60)
report.append("")
for i, group in enumerate(duplicate_info, 1):
report.append(f"Group {i}: {group['artist']} - {group['title']}")
report.append(f" Total versions: {group['total_versions']}")
report.append(" Versions:")
for version in group['versions']:
status = "✓ KEEP" if version['will_keep'] else "✗ SKIP"
channel_info = f" ({version['channel']})" if version['channel'] else ""
report.append(f" {status} {version['priority_rank']}. {version['path']}{channel_info}")
report.append("")
return "\n".join(report)
def generate_skip_list_summary(self, skip_songs: List[Dict[str, Any]]) -> str:
"""Generate a summary of the skip list."""
if not skip_songs:
return "No songs marked for skipping."
report = []
report.append("SKIP LIST SUMMARY")
report.append("=" * 60)
report.append("")
# Group by reason
reasons = {}
for skip_song in skip_songs:
reason = skip_song.get('reason', 'unknown')
if reason not in reasons:
reasons[reason] = []
reasons[reason].append(skip_song)
for reason, songs in reasons.items():
report.append(f"{reason.upper()} ({len(songs)} songs):")
for song in songs[:10]: # Show first 10
report.append(f" {song['artist']} - {song['title']}")
report.append(f" Path: {song['path']}")
if 'kept_version' in song:
report.append(f" Kept: {song['kept_version']}")
report.append("")
if len(songs) > 10:
report.append(f" ... and {len(songs) - 10} more")
report.append("")
return "\n".join(report)
def generate_config_summary(self, config: Dict[str, Any]) -> str:
"""Generate a summary of the current configuration."""
report = []
report.append("CURRENT CONFIGURATION")
report.append("=" * 60)
report.append("")
# Channel priorities
report.append("Channel Priorities (MP4 files):")
for i, channel in enumerate(config.get('channel_priorities', [])):
report.append(f" {i + 1}. {channel}")
report.append("")
# Matching settings
matching = config.get('matching', {})
report.append("Matching Settings:")
report.append(f" Case sensitive: {matching.get('case_sensitive', False)}")
report.append(f" Fuzzy matching: {matching.get('fuzzy_matching', False)}")
if matching.get('fuzzy_matching'):
report.append(f" Fuzzy threshold: {matching.get('fuzzy_threshold', 0.8)}")
report.append("")
# Output settings
output = config.get('output', {})
report.append("Output Settings:")
report.append(f" Verbose mode: {output.get('verbose', False)}")
report.append(f" Include reasons: {output.get('include_reasons', True)}")
report.append("")
return "\n".join(report)
def generate_progress_report(self, current: int, total: int, message: str = "") -> str:
    """Return a one-line textual progress bar for in-place console updates.

    Args:
        current: Number of items processed so far.
        total: Total number of items; 0 renders an empty bar.
        message: Optional status text appended after the counters.

    Returns:
        A carriage-return-prefixed progress line.
    """
    percentage = (current / total) * 100 if total > 0 else 0
    bar_length = 30
    # BUG FIX: the original divided by `total` unconditionally here
    # (ZeroDivisionError when total == 0 despite the guard above) and used
    # '' as the fill character, so the rendered bar was always too short.
    filled_length = int(bar_length * current // total) if total > 0 else 0
    bar = '█' * filled_length + '-' * (bar_length - filled_length)
    progress_line = f"\r[{bar}] {percentage:.1f}% ({current:,}/{total:,})"
    if message:
        progress_line += f" - {message}"
    return progress_line
def print_report(self, report_type: str, data: Any) -> None:
"""Print a formatted report to console."""
if report_type == "summary":
print(self.generate_summary_report(data))
elif report_type == "duplicates":
if self.verbose:
print(self.generate_duplicate_details(data))
elif report_type == "skip_summary":
print(self.generate_skip_list_summary(data))
elif report_type == "config":
print(self.generate_config_summary(data))
else:
print(f"Unknown report type: {report_type}")
def save_report_to_file(self, report_content: str, file_path: str) -> None:
    """Save a report to a text file, creating parent directories on demand.

    Args:
        report_content: The full report text to write (UTF-8).
        file_path: Destination path; may be a bare filename.
    """
    import os
    parent = os.path.dirname(file_path)
    # BUG FIX: os.makedirs('') raises FileNotFoundError, so only create
    # directories when the path actually has a parent component.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(report_content)
    print(f"Report saved to: {file_path}")
def generate_detailed_duplicate_analysis(self, skip_songs: List[Dict[str, Any]], best_songs: List[Dict[str, Any]]) -> str:
    """Generate a detailed analysis showing specific songs and their duplicate versions.

    Args:
        skip_songs: Skip-list entries; each needs 'path' and may carry
            'artist', 'title', 'reason', and 'kept_version'.
        best_songs: Currently unreferenced by this method — kept for
            interface compatibility. # NOTE(review): confirm whether this was meant to feed the analysis

    Returns:
        A multi-section text report: top duplicated songs, example
        duplicate patterns, and a file-type breakdown of duplicates.
    """
    report = []
    report.append("=" * 100)
    report.append("DETAILED DUPLICATE ANALYSIS - WHAT'S ACTUALLY HAPPENING")
    report.append("=" * 100)
    report.append("")
    # Group skip songs by artist/title to show duplicates together.
    duplicate_groups = {}
    for skip_song in skip_songs:
        artist = skip_song.get('artist', 'Unknown')
        title = skip_song.get('title', 'Unknown')
        key = f"{artist} - {title}"
        if key not in duplicate_groups:
            # 'kept_version' is taken from the FIRST skip entry seen for the
            # group; kept_version values on later entries are ignored.
            duplicate_groups[key] = {
                'artist': artist,
                'title': title,
                'skipped_versions': [],
                'kept_version': skip_song.get('kept_version', 'Unknown')
            }
        duplicate_groups[key]['skipped_versions'].append({
            'path': skip_song['path'],
            'reason': skip_song.get('reason', 'duplicate')
        })
    # Sort by number of duplicates (most duplicates first).
    sorted_groups = sorted(duplicate_groups.items(),
                           key=lambda x: len(x[1]['skipped_versions']),
                           reverse=True)
    report.append(f"📊 FOUND {len(duplicate_groups)} SONGS WITH DUPLICATES")
    report.append("")
    # Show the top 20 most duplicated songs, listing at most 5 paths each.
    report.append("🎵 TOP 20 MOST DUPLICATED SONGS:")
    report.append("-" * 80)
    for i, (key, group) in enumerate(sorted_groups[:20], 1):
        num_duplicates = len(group['skipped_versions'])
        report.append(f"{i:2d}. {key}")
        report.append(f" 📁 KEPT: {group['kept_version']}")
        report.append(f" 🗑️ SKIPPING {num_duplicates} duplicate(s):")
        for j, version in enumerate(group['skipped_versions'][:5], 1):  # Show first 5
            report.append(f" {j}. {version['path']}")
        if num_duplicates > 5:
            report.append(f" ... and {num_duplicates - 5} more")
        report.append("")
    # Show some examples of different duplicate patterns.
    report.append("🔍 DUPLICATE PATTERNS EXAMPLES:")
    report.append("-" * 80)
    # Classify every group into the scenarios below; a group can fall into
    # more than one bucket.
    mp4_vs_mp4 = []
    mp4_vs_cdg_mp3 = []
    same_channel_duplicates = []
    for key, group in sorted_groups:
        skipped_paths = [v['path'] for v in group['skipped_versions']]
        kept_path = group['kept_version']
        # MP4 kept while another MP4 was skipped (cross-channel duplicate).
        if (kept_path.endswith('.mp4') and
            any(p.endswith('.mp4') for p in skipped_paths)):
            mp4_vs_mp4.append(key)
        # MP4 kept while an MP3/CDG version was skipped (format duplicate).
        if (kept_path.endswith('.mp4') and
            any(p.endswith('.mp3') or p.endswith('.cdg') for p in skipped_paths)):
            mp4_vs_cdg_mp3.append(key)
        # Kept and skipped versions share the same channel (exact duplicate).
        kept_channel = self._extract_channel(kept_path)
        if kept_channel and any(self._extract_channel(p) == kept_channel for p in skipped_paths):
            same_channel_duplicates.append(key)
    report.append("📁 MP4 vs MP4 Duplicates (different channels):")
    for song in mp4_vs_mp4[:5]:
        report.append(f"{song}")
    report.append("")
    report.append("🎵 MP4 vs MP3 Duplicates (format differences):")
    for song in mp4_vs_cdg_mp3[:5]:
        report.append(f"{song}")
    report.append("")
    report.append("🔄 Same Channel Duplicates (exact duplicates):")
    for song in same_channel_duplicates[:5]:
        report.append(f"{song}")
    report.append("")
    # Show file type distribution in duplicates. '.cdg' files are counted in
    # the 'mp3' bucket — presumably because CDG graphics pair with MP3 audio.
    report.append("📊 DUPLICATE FILE TYPE BREAKDOWN:")
    report.append("-" * 80)
    file_types = {'mp4': 0, 'mp3': 0}
    for group in duplicate_groups.values():
        for version in group['skipped_versions']:
            path = version['path'].lower()
            if path.endswith('.mp4'):
                file_types['mp4'] += 1
            elif path.endswith('.mp3') or path.endswith('.cdg'):
                file_types['mp3'] += 1
    total_duplicates = sum(file_types.values())
    for file_type, count in file_types.items():
        percentage = (count / total_duplicates * 100) if total_duplicates > 0 else 0
        report.append(f" {file_type.upper()}: {count:,} files ({percentage:.1f}%)")
    report.append("")
    report.append("=" * 100)
    return "\n".join(report)
def _extract_channel(self, path: str) -> str:
"""Extract channel name from path for analysis."""
for channel in self.channel_priorities:
if channel.lower() in path.lower():
return channel
return None