"""Reporting and output generation for the Karaoke Song Library Cleanup Tool."""
from collections import defaultdict, Counter
from typing import Any, Dict, List, Optional

from utils import format_file_size, get_file_extension, extract_channel_from_path
class ReportGenerator:
    """Generates reports and statistics for the karaoke cleanup process."""

    def __init__(self, config: Dict[str, Any]):
        """Cache the configuration sections this reporter reads repeatedly."""
        self.config = config
        output_opts = config.get('output', {})
        # Console verbosity and whether skip reasons are echoed in reports.
        self.verbose = output_opts.get('verbose', False)
        self.include_reasons = output_opts.get('include_reasons', True)
        # Ordered channel names, highest priority first.
        self.channel_priorities = config.get('channel_priorities', [])
|
def analyze_skip_patterns(self, skip_songs: List[Dict[str, Any]]) -> Dict[str, Any]:
|
|
"""Analyze patterns in the skip list to understand duplicate distribution."""
|
|
analysis = {
|
|
'total_skipped': len(skip_songs),
|
|
'file_type_distribution': defaultdict(int),
|
|
'channel_distribution': defaultdict(int),
|
|
'duplicate_reasons': defaultdict(int),
|
|
'kept_vs_skipped_channels': defaultdict(lambda: {'kept': 0, 'skipped': 0}),
|
|
'folder_patterns': defaultdict(int),
|
|
'artist_duplicate_counts': defaultdict(int),
|
|
'title_duplicate_counts': defaultdict(int)
|
|
}
|
|
|
|
for skip_song in skip_songs:
|
|
# File type analysis
|
|
ext = get_file_extension(skip_song['path'])
|
|
analysis['file_type_distribution'][ext] += 1
|
|
|
|
# Channel analysis for MP4s
|
|
if ext == '.mp4':
|
|
channel = extract_channel_from_path(skip_song['path'], self.channel_priorities)
|
|
if channel:
|
|
analysis['channel_distribution'][channel] += 1
|
|
analysis['kept_vs_skipped_channels'][channel]['skipped'] += 1
|
|
|
|
# Reason analysis
|
|
reason = skip_song.get('reason', 'unknown')
|
|
analysis['duplicate_reasons'][reason] += 1
|
|
|
|
# Folder pattern analysis
|
|
path_parts = skip_song['path'].split('\\')
|
|
if len(path_parts) > 1:
|
|
folder = path_parts[-2] # Second to last part (folder name)
|
|
analysis['folder_patterns'][folder] += 1
|
|
|
|
# Artist/Title duplicate counts
|
|
artist = skip_song.get('artist', 'Unknown')
|
|
title = skip_song.get('title', 'Unknown')
|
|
analysis['artist_duplicate_counts'][artist] += 1
|
|
analysis['title_duplicate_counts'][title] += 1
|
|
|
|
return analysis
|
|
|
|
def analyze_channel_optimization(self, stats: Dict[str, Any], skip_analysis: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Analyze channel priorities and suggest optimizations."""
|
|
analysis = {
|
|
'current_priorities': self.channel_priorities.copy(),
|
|
'priority_effectiveness': {},
|
|
'suggested_priorities': [],
|
|
'unused_channels': [],
|
|
'missing_channels': []
|
|
}
|
|
|
|
# Analyze effectiveness of current priorities
|
|
for channel in self.channel_priorities:
|
|
kept_count = stats['channel_breakdown'].get(channel, 0)
|
|
skipped_count = skip_analysis['kept_vs_skipped_channels'].get(channel, {}).get('skipped', 0)
|
|
total_count = kept_count + skipped_count
|
|
|
|
if total_count > 0:
|
|
effectiveness = kept_count / total_count
|
|
analysis['priority_effectiveness'][channel] = {
|
|
'kept': kept_count,
|
|
'skipped': skipped_count,
|
|
'total': total_count,
|
|
'effectiveness': effectiveness
|
|
}
|
|
|
|
# Find channels not in current priorities
|
|
all_channels = set(stats['channel_breakdown'].keys())
|
|
used_channels = set(self.channel_priorities)
|
|
analysis['unused_channels'] = list(all_channels - used_channels)
|
|
|
|
# Suggest priority order based on effectiveness
|
|
if analysis['priority_effectiveness']:
|
|
sorted_channels = sorted(
|
|
analysis['priority_effectiveness'].items(),
|
|
key=lambda x: x[1]['effectiveness'],
|
|
reverse=True
|
|
)
|
|
analysis['suggested_priorities'] = [channel for channel, _ in sorted_channels]
|
|
|
|
return analysis
|
|
|
|
def generate_enhanced_summary_report(self, stats: Dict[str, Any], skip_analysis: Dict[str, Any]) -> str:
|
|
"""Generate an enhanced summary report with detailed statistics."""
|
|
report = []
|
|
report.append("=" * 80)
|
|
report.append("ENHANCED KARAOKE SONG LIBRARY ANALYSIS REPORT")
|
|
report.append("=" * 80)
|
|
report.append("")
|
|
|
|
# Basic statistics
|
|
report.append("📊 BASIC STATISTICS")
|
|
report.append("-" * 40)
|
|
report.append(f"Total songs processed: {stats['total_songs']:,}")
|
|
report.append(f"Unique songs found: {stats['unique_songs']:,}")
|
|
report.append(f"Duplicates identified: {stats['duplicates_found']:,}")
|
|
report.append(f"Groups with duplicates: {stats['groups_with_duplicates']:,}")
|
|
|
|
if stats['duplicates_found'] > 0:
|
|
duplicate_percentage = (stats['duplicates_found'] / stats['total_songs']) * 100
|
|
report.append(f"Duplicate rate: {duplicate_percentage:.1f}%")
|
|
report.append("")
|
|
|
|
# File type analysis
|
|
report.append("📁 FILE TYPE ANALYSIS")
|
|
report.append("-" * 40)
|
|
total_files = sum(stats['file_type_breakdown'].values())
|
|
for ext, count in sorted(stats['file_type_breakdown'].items()):
|
|
percentage = (count / total_files) * 100
|
|
skipped_count = skip_analysis['file_type_distribution'].get(ext, 0)
|
|
kept_count = count - skipped_count
|
|
report.append(f"{ext}: {count:,} total ({percentage:.1f}%) - {kept_count:,} kept, {skipped_count:,} skipped")
|
|
report.append("")
|
|
|
|
# Channel analysis
|
|
if stats['channel_breakdown']:
|
|
report.append("🎵 CHANNEL ANALYSIS")
|
|
report.append("-" * 40)
|
|
for channel, count in sorted(stats['channel_breakdown'].items()):
|
|
skipped_count = skip_analysis['kept_vs_skipped_channels'].get(channel, {}).get('skipped', 0)
|
|
kept_count = count - skipped_count
|
|
effectiveness = (kept_count / count * 100) if count > 0 else 0
|
|
report.append(f"{channel}: {count:,} total - {kept_count:,} kept ({effectiveness:.1f}%), {skipped_count:,} skipped")
|
|
report.append("")
|
|
|
|
# Skip pattern analysis
|
|
report.append("🗑️ SKIP PATTERN ANALYSIS")
|
|
report.append("-" * 40)
|
|
report.append(f"Total files to skip: {skip_analysis['total_skipped']:,}")
|
|
|
|
# Top folders with most skips
|
|
top_folders = sorted(skip_analysis['folder_patterns'].items(), key=lambda x: x[1], reverse=True)[:10]
|
|
if top_folders:
|
|
report.append("Top folders with most duplicates:")
|
|
for folder, count in top_folders:
|
|
report.append(f" {folder}: {count:,} files")
|
|
report.append("")
|
|
|
|
# Duplicate reasons
|
|
if skip_analysis['duplicate_reasons']:
|
|
report.append("Duplicate reasons:")
|
|
for reason, count in skip_analysis['duplicate_reasons'].items():
|
|
percentage = (count / skip_analysis['total_skipped']) * 100
|
|
report.append(f" {reason}: {count:,} ({percentage:.1f}%)")
|
|
report.append("")
|
|
|
|
report.append("=" * 80)
|
|
return "\n".join(report)
|
|
|
|
def generate_channel_optimization_report(self, channel_analysis: Dict[str, Any]) -> str:
|
|
"""Generate a report with channel priority optimization suggestions."""
|
|
report = []
|
|
report.append("🔧 CHANNEL PRIORITY OPTIMIZATION ANALYSIS")
|
|
report.append("=" * 80)
|
|
report.append("")
|
|
|
|
# Current priorities
|
|
report.append("📋 CURRENT PRIORITIES")
|
|
report.append("-" * 40)
|
|
for i, channel in enumerate(channel_analysis['current_priorities'], 1):
|
|
effectiveness = channel_analysis['priority_effectiveness'].get(channel, {})
|
|
if effectiveness:
|
|
report.append(f"{i}. {channel} - {effectiveness['effectiveness']:.1%} effectiveness "
|
|
f"({effectiveness['kept']:,} kept, {effectiveness['skipped']:,} skipped)")
|
|
else:
|
|
report.append(f"{i}. {channel} - No data available")
|
|
report.append("")
|
|
|
|
# Effectiveness analysis
|
|
if channel_analysis['priority_effectiveness']:
|
|
report.append("📈 EFFECTIVENESS ANALYSIS")
|
|
report.append("-" * 40)
|
|
for channel, data in sorted(channel_analysis['priority_effectiveness'].items(),
|
|
key=lambda x: x[1]['effectiveness'], reverse=True):
|
|
report.append(f"{channel}: {data['effectiveness']:.1%} effectiveness "
|
|
f"({data['kept']:,} kept, {data['skipped']:,} skipped)")
|
|
report.append("")
|
|
|
|
# Suggested optimizations
|
|
if channel_analysis['suggested_priorities']:
|
|
report.append("💡 SUGGESTED OPTIMIZATIONS")
|
|
report.append("-" * 40)
|
|
report.append("Recommended priority order based on effectiveness:")
|
|
for i, channel in enumerate(channel_analysis['suggested_priorities'], 1):
|
|
report.append(f"{i}. {channel}")
|
|
report.append("")
|
|
|
|
# Unused channels
|
|
if channel_analysis['unused_channels']:
|
|
report.append("🔍 UNUSED CHANNELS")
|
|
report.append("-" * 40)
|
|
report.append("Channels found in your library but not in current priorities:")
|
|
for channel in channel_analysis['unused_channels']:
|
|
report.append(f" - {channel}")
|
|
report.append("")
|
|
|
|
report.append("=" * 80)
|
|
return "\n".join(report)
|
|
|
|
def generate_duplicate_pattern_report(self, skip_analysis: Dict[str, Any]) -> str:
|
|
"""Generate a report analyzing duplicate patterns."""
|
|
report = []
|
|
report.append("🔄 DUPLICATE PATTERN ANALYSIS")
|
|
report.append("=" * 80)
|
|
report.append("")
|
|
|
|
# Most duplicated artists
|
|
top_artists = sorted(skip_analysis['artist_duplicate_counts'].items(),
|
|
key=lambda x: x[1], reverse=True)[:20]
|
|
if top_artists:
|
|
report.append("🎤 ARTISTS WITH MOST DUPLICATES")
|
|
report.append("-" * 40)
|
|
for artist, count in top_artists:
|
|
report.append(f"{artist}: {count:,} duplicate files")
|
|
report.append("")
|
|
|
|
# Most duplicated titles
|
|
top_titles = sorted(skip_analysis['title_duplicate_counts'].items(),
|
|
key=lambda x: x[1], reverse=True)[:20]
|
|
if top_titles:
|
|
report.append("🎵 TITLES WITH MOST DUPLICATES")
|
|
report.append("-" * 40)
|
|
for title, count in top_titles:
|
|
report.append(f"{title}: {count:,} duplicate files")
|
|
report.append("")
|
|
|
|
# File type duplicate patterns
|
|
report.append("📁 DUPLICATE PATTERNS BY FILE TYPE")
|
|
report.append("-" * 40)
|
|
for ext, count in sorted(skip_analysis['file_type_distribution'].items()):
|
|
percentage = (count / skip_analysis['total_skipped']) * 100
|
|
report.append(f"{ext}: {count:,} files ({percentage:.1f}% of all duplicates)")
|
|
report.append("")
|
|
|
|
# Channel duplicate patterns
|
|
if skip_analysis['channel_distribution']:
|
|
report.append("🎵 DUPLICATE PATTERNS BY CHANNEL")
|
|
report.append("-" * 40)
|
|
for channel, count in sorted(skip_analysis['channel_distribution'].items(),
|
|
key=lambda x: x[1], reverse=True):
|
|
percentage = (count / skip_analysis['total_skipped']) * 100
|
|
report.append(f"{channel}: {count:,} files ({percentage:.1f}% of all duplicates)")
|
|
report.append("")
|
|
|
|
report.append("=" * 80)
|
|
return "\n".join(report)
|
|
|
|
def generate_actionable_insights_report(self, stats: Dict[str, Any], skip_analysis: Dict[str, Any],
|
|
channel_analysis: Dict[str, Any]) -> str:
|
|
"""Generate actionable insights and recommendations."""
|
|
report = []
|
|
report.append("💡 ACTIONABLE INSIGHTS & RECOMMENDATIONS")
|
|
report.append("=" * 80)
|
|
report.append("")
|
|
|
|
# Space savings
|
|
duplicate_percentage = (stats['duplicates_found'] / stats['total_songs']) * 100
|
|
report.append("💾 STORAGE OPTIMIZATION")
|
|
report.append("-" * 40)
|
|
report.append(f"• {duplicate_percentage:.1f}% of your library consists of duplicates")
|
|
report.append(f"• Removing {stats['duplicates_found']:,} duplicate files will significantly reduce storage")
|
|
report.append(f"• This represents a major opportunity for library cleanup")
|
|
report.append("")
|
|
|
|
# Channel priority recommendations
|
|
if channel_analysis['suggested_priorities']:
|
|
report.append("🎯 CHANNEL PRIORITY RECOMMENDATIONS")
|
|
report.append("-" * 40)
|
|
report.append("Consider updating your channel priorities to:")
|
|
for i, channel in enumerate(channel_analysis['suggested_priorities'][:5], 1):
|
|
report.append(f"{i}. Prioritize '{channel}' (highest effectiveness)")
|
|
|
|
if channel_analysis['unused_channels']:
|
|
report.append("")
|
|
report.append("Add these channels to your priorities:")
|
|
for channel in channel_analysis['unused_channels'][:5]:
|
|
report.append(f"• '{channel}'")
|
|
report.append("")
|
|
|
|
# File type insights
|
|
report.append("📁 FILE TYPE INSIGHTS")
|
|
report.append("-" * 40)
|
|
mp4_count = stats['file_type_breakdown'].get('.mp4', 0)
|
|
mp3_count = stats['file_type_breakdown'].get('.mp3', 0)
|
|
|
|
if mp4_count > 0:
|
|
mp4_percentage = (mp4_count / stats['total_songs']) * 100
|
|
report.append(f"• {mp4_percentage:.1f}% of your library is MP4 format (highest quality)")
|
|
|
|
if mp3_count > 0:
|
|
report.append("• You have MP3 files (including CDG/MP3 pairs) - the tool correctly handles them")
|
|
|
|
# Most problematic areas
|
|
top_folders = sorted(skip_analysis['folder_patterns'].items(), key=lambda x: x[1], reverse=True)[:5]
|
|
if top_folders:
|
|
report.append("")
|
|
report.append("🔍 AREAS NEEDING ATTENTION")
|
|
report.append("-" * 40)
|
|
report.append("Folders with the most duplicates:")
|
|
for folder, count in top_folders:
|
|
report.append(f"• '{folder}': {count:,} duplicate files")
|
|
report.append("")
|
|
|
|
report.append("=" * 80)
|
|
return "\n".join(report)
|
|
|
|
def generate_summary_report(self, stats: Dict[str, Any]) -> str:
|
|
"""Generate a summary report of the cleanup process."""
|
|
report = []
|
|
report.append("=" * 60)
|
|
report.append("KARAOKE SONG LIBRARY CLEANUP SUMMARY")
|
|
report.append("=" * 60)
|
|
report.append("")
|
|
|
|
# Basic statistics
|
|
report.append(f"Total songs processed: {stats['total_songs']:,}")
|
|
report.append(f"Unique songs found: {stats['unique_songs']:,}")
|
|
report.append(f"Duplicates identified: {stats['duplicates_found']:,}")
|
|
report.append(f"Groups with duplicates: {stats['groups_with_duplicates']:,}")
|
|
report.append("")
|
|
|
|
# File type breakdown
|
|
report.append("FILE TYPE BREAKDOWN:")
|
|
for ext, count in sorted(stats['file_type_breakdown'].items()):
|
|
percentage = (count / stats['total_songs']) * 100
|
|
report.append(f" {ext}: {count:,} ({percentage:.1f}%)")
|
|
report.append("")
|
|
|
|
# Channel breakdown (for MP4s)
|
|
if stats['channel_breakdown']:
|
|
report.append("MP4 CHANNEL BREAKDOWN:")
|
|
for channel, count in sorted(stats['channel_breakdown'].items()):
|
|
report.append(f" {channel}: {count:,}")
|
|
report.append("")
|
|
|
|
# Duplicate statistics
|
|
if stats['duplicates_found'] > 0:
|
|
duplicate_percentage = (stats['duplicates_found'] / stats['total_songs']) * 100
|
|
report.append(f"DUPLICATE ANALYSIS:")
|
|
report.append(f" Duplicate rate: {duplicate_percentage:.1f}%")
|
|
report.append(f" Space savings potential: Significant")
|
|
report.append("")
|
|
|
|
report.append("=" * 60)
|
|
return "\n".join(report)
|
|
|
|
def generate_channel_priority_report(self, stats: Dict[str, Any], channel_priorities: List[str]) -> str:
|
|
"""Generate a report about channel priority matching."""
|
|
report = []
|
|
report.append("CHANNEL PRIORITY ANALYSIS")
|
|
report.append("=" * 60)
|
|
report.append("")
|
|
|
|
# Count songs with and without defined channel priorities
|
|
total_mp4s = sum(count for ext, count in stats['file_type_breakdown'].items() if ext == '.mp4')
|
|
songs_with_priority = sum(stats['channel_breakdown'].values())
|
|
songs_without_priority = total_mp4s - songs_with_priority
|
|
|
|
report.append(f"MP4 files with defined channel priorities: {songs_with_priority:,}")
|
|
report.append(f"MP4 files without defined channel priorities: {songs_without_priority:,}")
|
|
report.append("")
|
|
|
|
if songs_without_priority > 0:
|
|
report.append("Note: Songs without defined channel priorities will be marked for manual review.")
|
|
report.append("Consider adding their folder names to the channel_priorities configuration.")
|
|
report.append("")
|
|
|
|
# Show channel priority order
|
|
report.append("Channel Priority Order (highest to lowest):")
|
|
for i, channel in enumerate(channel_priorities, 1):
|
|
report.append(f" {i}. {channel}")
|
|
report.append("")
|
|
|
|
return "\n".join(report)
|
|
|
|
def generate_duplicate_details(self, duplicate_info: List[Dict[str, Any]]) -> str:
|
|
"""Generate detailed report of duplicate groups."""
|
|
if not duplicate_info:
|
|
return "No duplicates found."
|
|
|
|
report = []
|
|
report.append("DETAILED DUPLICATE ANALYSIS")
|
|
report.append("=" * 60)
|
|
report.append("")
|
|
|
|
for i, group in enumerate(duplicate_info, 1):
|
|
report.append(f"Group {i}: {group['artist']} - {group['title']}")
|
|
report.append(f" Total versions: {group['total_versions']}")
|
|
report.append(" Versions:")
|
|
|
|
for version in group['versions']:
|
|
status = "✓ KEEP" if version['will_keep'] else "✗ SKIP"
|
|
channel_info = f" ({version['channel']})" if version['channel'] else ""
|
|
report.append(f" {status} {version['priority_rank']}. {version['path']}{channel_info}")
|
|
|
|
report.append("")
|
|
|
|
return "\n".join(report)
|
|
|
|
def generate_skip_list_summary(self, skip_songs: List[Dict[str, Any]]) -> str:
|
|
"""Generate a summary of the skip list."""
|
|
if not skip_songs:
|
|
return "No songs marked for skipping."
|
|
|
|
report = []
|
|
report.append("SKIP LIST SUMMARY")
|
|
report.append("=" * 60)
|
|
report.append("")
|
|
|
|
# Group by reason
|
|
reasons = {}
|
|
for skip_song in skip_songs:
|
|
reason = skip_song.get('reason', 'unknown')
|
|
if reason not in reasons:
|
|
reasons[reason] = []
|
|
reasons[reason].append(skip_song)
|
|
|
|
for reason, songs in reasons.items():
|
|
report.append(f"{reason.upper()} ({len(songs)} songs):")
|
|
for song in songs[:10]: # Show first 10
|
|
report.append(f" {song['artist']} - {song['title']}")
|
|
report.append(f" Path: {song['path']}")
|
|
if 'kept_version' in song:
|
|
report.append(f" Kept: {song['kept_version']}")
|
|
report.append("")
|
|
|
|
if len(songs) > 10:
|
|
report.append(f" ... and {len(songs) - 10} more")
|
|
report.append("")
|
|
|
|
return "\n".join(report)
|
|
|
|
def generate_config_summary(self, config: Dict[str, Any]) -> str:
|
|
"""Generate a summary of the current configuration."""
|
|
report = []
|
|
report.append("CURRENT CONFIGURATION")
|
|
report.append("=" * 60)
|
|
report.append("")
|
|
|
|
# Channel priorities
|
|
report.append("Channel Priorities (MP4 files):")
|
|
for i, channel in enumerate(config.get('channel_priorities', [])):
|
|
report.append(f" {i + 1}. {channel}")
|
|
report.append("")
|
|
|
|
# Matching settings
|
|
matching = config.get('matching', {})
|
|
report.append("Matching Settings:")
|
|
report.append(f" Case sensitive: {matching.get('case_sensitive', False)}")
|
|
report.append(f" Fuzzy matching: {matching.get('fuzzy_matching', False)}")
|
|
if matching.get('fuzzy_matching'):
|
|
report.append(f" Fuzzy threshold: {matching.get('fuzzy_threshold', 0.8)}")
|
|
report.append("")
|
|
|
|
# Output settings
|
|
output = config.get('output', {})
|
|
report.append("Output Settings:")
|
|
report.append(f" Verbose mode: {output.get('verbose', False)}")
|
|
report.append(f" Include reasons: {output.get('include_reasons', True)}")
|
|
report.append("")
|
|
|
|
return "\n".join(report)
|
|
|
|
def generate_progress_report(self, current: int, total: int, message: str = "") -> str:
|
|
"""Generate a progress report."""
|
|
percentage = (current / total) * 100 if total > 0 else 0
|
|
bar_length = 30
|
|
filled_length = int(bar_length * current // total)
|
|
bar = '█' * filled_length + '-' * (bar_length - filled_length)
|
|
|
|
progress_line = f"\r[{bar}] {percentage:.1f}% ({current:,}/{total:,})"
|
|
if message:
|
|
progress_line += f" - {message}"
|
|
|
|
return progress_line
|
|
|
|
def print_report(self, report_type: str, data: Any) -> None:
|
|
"""Print a formatted report to console."""
|
|
if report_type == "summary":
|
|
print(self.generate_summary_report(data))
|
|
elif report_type == "duplicates":
|
|
if self.verbose:
|
|
print(self.generate_duplicate_details(data))
|
|
elif report_type == "skip_summary":
|
|
print(self.generate_skip_list_summary(data))
|
|
elif report_type == "config":
|
|
print(self.generate_config_summary(data))
|
|
else:
|
|
print(f"Unknown report type: {report_type}")
|
|
|
|
def save_report_to_file(self, report_content: str, file_path: str) -> None:
|
|
"""Save a report to a text file."""
|
|
import os
|
|
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
|
|
|
with open(file_path, 'w', encoding='utf-8') as f:
|
|
f.write(report_content)
|
|
|
|
print(f"Report saved to: {file_path}")
|
|
|
|
def generate_detailed_duplicate_analysis(self, skip_songs: List[Dict[str, Any]], best_songs: List[Dict[str, Any]]) -> str:
|
|
"""Generate a detailed analysis showing specific songs and their duplicate versions."""
|
|
report = []
|
|
report.append("=" * 100)
|
|
report.append("DETAILED DUPLICATE ANALYSIS - WHAT'S ACTUALLY HAPPENING")
|
|
report.append("=" * 100)
|
|
report.append("")
|
|
|
|
# Group skip songs by artist/title to show duplicates together
|
|
duplicate_groups = {}
|
|
for skip_song in skip_songs:
|
|
artist = skip_song.get('artist', 'Unknown')
|
|
title = skip_song.get('title', 'Unknown')
|
|
key = f"{artist} - {title}"
|
|
|
|
if key not in duplicate_groups:
|
|
duplicate_groups[key] = {
|
|
'artist': artist,
|
|
'title': title,
|
|
'skipped_versions': [],
|
|
'kept_version': skip_song.get('kept_version', 'Unknown')
|
|
}
|
|
|
|
duplicate_groups[key]['skipped_versions'].append({
|
|
'path': skip_song['path'],
|
|
'reason': skip_song.get('reason', 'duplicate')
|
|
})
|
|
|
|
# Sort by number of duplicates (most duplicates first)
|
|
sorted_groups = sorted(duplicate_groups.items(),
|
|
key=lambda x: len(x[1]['skipped_versions']),
|
|
reverse=True)
|
|
|
|
report.append(f"📊 FOUND {len(duplicate_groups)} SONGS WITH DUPLICATES")
|
|
report.append("")
|
|
|
|
# Show top 20 most duplicated songs
|
|
report.append("🎵 TOP 20 MOST DUPLICATED SONGS:")
|
|
report.append("-" * 80)
|
|
|
|
for i, (key, group) in enumerate(sorted_groups[:20], 1):
|
|
num_duplicates = len(group['skipped_versions'])
|
|
report.append(f"{i:2d}. {key}")
|
|
report.append(f" 📁 KEPT: {group['kept_version']}")
|
|
report.append(f" 🗑️ SKIPPING {num_duplicates} duplicate(s):")
|
|
|
|
for j, version in enumerate(group['skipped_versions'][:5], 1): # Show first 5
|
|
report.append(f" {j}. {version['path']}")
|
|
|
|
if num_duplicates > 5:
|
|
report.append(f" ... and {num_duplicates - 5} more")
|
|
report.append("")
|
|
|
|
# Show some examples of different duplicate patterns
|
|
report.append("🔍 DUPLICATE PATTERNS EXAMPLES:")
|
|
report.append("-" * 80)
|
|
|
|
# Find examples of different duplicate scenarios
|
|
mp4_vs_mp4 = []
|
|
mp4_vs_cdg_mp3 = []
|
|
same_channel_duplicates = []
|
|
|
|
for key, group in sorted_groups:
|
|
skipped_paths = [v['path'] for v in group['skipped_versions']]
|
|
kept_path = group['kept_version']
|
|
|
|
# Check for MP4 vs MP4 duplicates
|
|
if (kept_path.endswith('.mp4') and
|
|
any(p.endswith('.mp4') for p in skipped_paths)):
|
|
mp4_vs_mp4.append(key)
|
|
|
|
# Check for MP4 vs CDG/MP3 duplicates
|
|
if (kept_path.endswith('.mp4') and
|
|
any(p.endswith('.mp3') or p.endswith('.cdg') for p in skipped_paths)):
|
|
mp4_vs_cdg_mp3.append(key)
|
|
|
|
# Check for same channel duplicates
|
|
kept_channel = self._extract_channel(kept_path)
|
|
if kept_channel and any(self._extract_channel(p) == kept_channel for p in skipped_paths):
|
|
same_channel_duplicates.append(key)
|
|
|
|
report.append("📁 MP4 vs MP4 Duplicates (different channels):")
|
|
for song in mp4_vs_mp4[:5]:
|
|
report.append(f" • {song}")
|
|
report.append("")
|
|
|
|
report.append("🎵 MP4 vs MP3 Duplicates (format differences):")
|
|
for song in mp4_vs_cdg_mp3[:5]:
|
|
report.append(f" • {song}")
|
|
report.append("")
|
|
|
|
report.append("🔄 Same Channel Duplicates (exact duplicates):")
|
|
for song in same_channel_duplicates[:5]:
|
|
report.append(f" • {song}")
|
|
report.append("")
|
|
|
|
# Show file type distribution in duplicates
|
|
report.append("📊 DUPLICATE FILE TYPE BREAKDOWN:")
|
|
report.append("-" * 80)
|
|
|
|
file_types = {'mp4': 0, 'mp3': 0}
|
|
for group in duplicate_groups.values():
|
|
for version in group['skipped_versions']:
|
|
path = version['path'].lower()
|
|
if path.endswith('.mp4'):
|
|
file_types['mp4'] += 1
|
|
elif path.endswith('.mp3') or path.endswith('.cdg'):
|
|
file_types['mp3'] += 1
|
|
|
|
total_duplicates = sum(file_types.values())
|
|
for file_type, count in file_types.items():
|
|
percentage = (count / total_duplicates * 100) if total_duplicates > 0 else 0
|
|
report.append(f" {file_type.upper()}: {count:,} files ({percentage:.1f}%)")
|
|
report.append("")
|
|
|
|
report.append("=" * 100)
|
|
return "\n".join(report)
|
|
|
|
def _extract_channel(self, path: str) -> str:
|
|
"""Extract channel name from path for analysis."""
|
|
for channel in self.channel_priorities:
|
|
if channel.lower() in path.lower():
|
|
return channel
|
|
return None |