"""
Song matching and deduplication logic for the Karaoke Song Library Cleanup Tool.
"""

from collections import defaultdict
from typing import Dict, List, Any, Tuple, Optional
import difflib

try:
    from fuzzywuzzy import fuzz
    FUZZY_AVAILABLE = True
except ImportError:
    FUZZY_AVAILABLE = False

from utils import (
    normalize_artist_title,
    extract_channel_from_path,
    extract_consolidated_channel_from_path,
    get_file_extension,
    parse_multi_artist,
    validate_song_data,
    find_mp3_pairs
)


class SongMatcher:
    """Handles song matching and deduplication logic."""

    def __init__(self, config: Dict[str, Any]):
        """Read matching settings from ``config``.

        Args:
            config: Tool configuration. The keys read here are
                ``channel_priorities`` (list, default ``[]``) and the
                ``matching`` section with ``case_sensitive`` (default
                ``False``), ``fuzzy_matching`` (default ``False``) and
                ``fuzzy_threshold`` (default ``0.8``).
        """
        self.config = config
        self.channel_priorities = config.get('channel_priorities', [])

        matching = config.get('matching', {})
        self.case_sensitive = matching.get('case_sensitive', False)
        self.fuzzy_matching = matching.get('fuzzy_matching', False)
        self.fuzzy_threshold = matching.get('fuzzy_threshold', 0.8)

        # Warn if fuzzy matching is enabled but not available
        if self.fuzzy_matching and not FUZZY_AVAILABLE:
            print("Warning: Fuzzy matching is enabled but fuzzywuzzy is not installed.")
            print("Install with: pip install fuzzywuzzy python-Levenshtein")
            self.fuzzy_matching = False

    @staticmethod
    def _artist_variations(song: Dict[str, Any]) -> List[str]:
        """Return the artist strings to try for ``song``.

        Uses whatever ``parse_multi_artist`` returns for the song's artist
        and falls back to the raw artist string when that result is empty.
        """
        return parse_multi_artist(song['artist']) or [song['artist']]

    def group_songs_by_artist_title(self, songs: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Group songs by normalized artist-title combination with optional fuzzy matching.

        Songs rejected by ``validate_song_data`` are ignored.

        Args:
            songs: Song dicts; ``artist`` and ``title`` are read here.

        Returns:
            Mapping of normalized key to the songs in that group. In exact
            mode a multi-artist song is listed under every distinct artist
            variation key; in fuzzy mode each song is in exactly one group.
        """
        if self.fuzzy_matching:
            return self._group_with_fuzzy_matching(songs)
        return self._group_by_exact_key(songs)

    def _group_by_exact_key(self, songs: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Group valid songs under the normalized key of each artist variation."""
        groups = defaultdict(list)

        for song in songs:
            if not validate_song_data(song):
                continue

            # Two variations may normalize to the same key; adding the song
            # twice would make it look like a duplicate of itself.
            seen_keys = set()
            for artist in self._artist_variations(song):
                normalized_key = normalize_artist_title(artist, song['title'], self.case_sensitive)
                if normalized_key in seen_keys:
                    continue
                seen_keys.add(normalized_key)
                groups[normalized_key].append(song)

        return dict(groups)

    def _group_with_fuzzy_matching(self, songs: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Group valid songs by exact key, then merge groups that match fuzzily."""
        print("Using fuzzy matching - this may take a while for large datasets...")

        # First pass: bucket by exact key. A song joins the first existing
        # bucket matching any of its artist variations; otherwise it starts a
        # new bucket under its first variation's key.
        total_songs = len(songs)
        exact_groups: Dict[str, List[Dict[str, Any]]] = {}
        for index, song in enumerate(songs):
            if not validate_song_data(song):
                continue

            # Show progress every 1000 songs
            if index % 1000 == 0 and index > 0:
                print(f"Processing song {index:,}/{total_songs:,}...")

            keys = [
                normalize_artist_title(artist, song['title'], self.case_sensitive)
                for artist in self._artist_variations(song)
            ]
            target_key = next((key for key in keys if key in exact_groups), keys[0])
            exact_groups.setdefault(target_key, []).append(song)

        print(f"Exact matches found: {len(exact_groups)} groups")

        # Second pass: fuzzy-compare one representative per exact bucket, so
        # the pairwise work scales with distinct keys rather than songs.
        merged_groups: Dict[str, List[Dict[str, Any]]] = {}
        representatives: List[Tuple[str, Dict[str, Any]]] = []
        total_groups = len(exact_groups)
        for index, (key, members) in enumerate(exact_groups.items()):
            if index % 100 == 0 and index > 0:
                print(f"Fuzzy matching group {index:,}/{total_groups:,}...")

            candidate = members[0]
            match_key = self._find_fuzzy_group_key(candidate, representatives)
            if match_key is None:
                # Exact bucket keys are unique, so this never overwrites an
                # existing merged group.
                merged_groups[key] = members
                representatives.append((key, candidate))
            else:
                merged_groups[match_key].extend(members)

        print(f"Total groups after fuzzy matching: {len(merged_groups)}")
        return merged_groups

    def _find_fuzzy_group_key(
        self,
        song: Dict[str, Any],
        representatives: List[Tuple[str, Dict[str, Any]]],
    ) -> Optional[str]:
        """Return the key of the first representative ``song`` should join, or ``None``."""
        for artist in self._artist_variations(song):
            for key, representative in representatives:
                if self.should_group_songs(
                    artist, song['title'],
                    representative['artist'], representative['title']
                ):
                    return key
        return None

    def fuzzy_match_strings(self, str1: str, str2: str) -> float:
        """Compare two strings using fuzzy matching if available.

        Returns:
            ``fuzz.ratio`` of the lower-cased strings scaled to 0.0-1.0, or
            ``0.0`` when fuzzy matching is disabled or fuzzywuzzy is missing.
        """
        if not self.fuzzy_matching or not FUZZY_AVAILABLE:
            return 0.0

        # Use fuzzywuzzy for comparison
        return fuzz.ratio(str1.lower(), str2.lower()) / 100.0

    def should_group_songs(self, artist1: str, title1: str, artist2: str, title2: str) -> bool:
        """Determine if two songs should be grouped together based on matching settings.

        The comparison lower-cases its inputs, independent of the
        ``case_sensitive`` setting.

        Returns:
            ``True`` when artist and title are equal ignoring case, or when
            fuzzy matching is active and both the artist and the title
            similarity reach ``fuzzy_threshold``; ``False`` otherwise.
        """
        # Exact match check
        if artist1.lower() == artist2.lower() and title1.lower() == title2.lower():
            return True

        # Fuzzy matching check
        if self.fuzzy_matching and FUZZY_AVAILABLE:
            artist_similarity = self.fuzzy_match_strings(artist1, artist2)
            title_similarity = self.fuzzy_match_strings(title1, title2)

            # Both artist and title must meet threshold
            if artist_similarity >= self.fuzzy_threshold and title_similarity >= self.fuzzy_threshold:
                return True

        return False

    def get_channel_priority(self, file_path: str) -> int:
        """Get channel priority for files based on configured folder names.

        Returns:
            Index of the file's channel in ``channel_priorities`` (lower is
            better), or ``len(channel_priorities)`` when no channel is found
            or the channel is not in the configured list.
        """
        channel = extract_consolidated_channel_from_path(file_path, self.channel_priorities)

        if not channel:
            return len(self.channel_priorities)  # Lowest priority if no channel found

        try:
            return self.channel_priorities.index(channel)
        except ValueError:
            return len(self.channel_priorities)  # Lowest priority if channel not in config

    def select_best_song(self, songs: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """Select the best song from a group of duplicates and return the rest as skips.

        Priority order: MP4 (by channel priority) > MP3 pairs > standalone MP3.
        When a pair wins, the pair's first file is returned and its partner
        is neither returned nor skipped.

        Args:
            songs: Songs of one duplicate group; ``path`` is read.

        Returns:
            ``(best_song, skip_songs)``. ``best_song`` is ``None`` only when
            ``songs`` is empty.
        """
        if len(songs) == 1:
            return songs[0], []

        # Group songs into MP3 pairs and standalone files
        grouped = find_mp3_pairs(songs)
        mp4_files = grouped['standalone_mp4']
        pairs = grouped['pairs']
        standalone_mp3 = grouped['standalone_mp3']

        best_song = None
        skip_songs = []

        # 1. First priority: MP4 files (with channel priority)
        if mp4_files:
            # Sort MP4s by channel priority (lower index = higher priority)
            ranked_mp4 = sorted(mp4_files, key=lambda s: self.get_channel_priority(s['path']))
            best_song = ranked_mp4[0]
            skip_songs.extend(ranked_mp4[1:])

            # Skip all other formats when we have MP4
            skip_songs.extend(song for pair in pairs for song in pair)
            skip_songs.extend(standalone_mp3)

        # 2. Second priority: MP3 pairs (CDG/MP3 pairs treated as MP3)
        elif pairs:
            # For pairs, we'll keep the CDG file as the representative
            # (since CDG contains the lyrics/graphics)
            best_song = pairs[0][0]  # First pair's CDG file
            skip_songs.extend(song for pair in pairs[1:] for song in pair)
            skip_songs.extend(standalone_mp3)

        # 3. Third priority: Standalone MP3
        elif standalone_mp3:
            best_song = standalone_mp3[0]
            skip_songs.extend(standalone_mp3[1:])

        # 4. Fallback: nothing was categorized above. Keep the first song so
        #    callers never receive None for a non-empty group.
        elif songs:
            best_song = songs[0]
            skip_songs.extend(songs[1:])

        return best_song, skip_songs

    def process_songs(self, songs: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], Dict[str, Any]]:
        """Process all songs and return best songs, skip songs, and statistics.

        Args:
            songs: Song dicts; ``artist``, ``title`` and ``path`` are read.

        Returns:
            ``(best_songs, skip_songs, stats)``. ``skip_songs`` holds dicts
            with ``path``, ``reason`` (always ``'duplicate'``), ``artist``,
            ``title`` and ``kept_version``. ``stats`` holds the counters
            initialized below; the two breakdowns are ``defaultdict(int)``.
        """
        # Group songs by artist-title
        groups = self.group_songs_by_artist_title(songs)

        best_songs = []
        skip_songs = []
        stats = {
            'total_songs': len(songs),
            'unique_songs': len(groups),
            'duplicates_found': 0,
            'file_type_breakdown': defaultdict(int),
            'channel_breakdown': defaultdict(int),
            'groups_with_duplicates': 0
        }

        for group_songs in groups.values():
            # Count file types
            for song in group_songs:
                ext = get_file_extension(song['path'])
                stats['file_type_breakdown'][ext] += 1

                # Use consolidated channel extraction for all file types
                channel = extract_consolidated_channel_from_path(song['path'], self.channel_priorities)
                if channel:
                    stats['channel_breakdown'][channel] += 1

            # Select best song and mark others for skipping
            best_song, group_skips = self.select_best_song(group_songs)
            best_songs.append(best_song)

            if not group_skips:
                continue

            stats['duplicates_found'] += len(group_skips)
            stats['groups_with_duplicates'] += 1

            # Add skip songs with reasons
            skip_songs.extend(
                {
                    'path': skipped['path'],
                    'reason': 'duplicate',
                    'artist': skipped['artist'],
                    'title': skipped['title'],
                    'kept_version': best_song['path']
                }
                for skipped in group_skips
            )

        return best_songs, skip_songs, stats

    def get_detailed_duplicate_info(self, songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Get detailed information about duplicate groups for reporting.

        Only groups with more than one song are reported.

        Returns:
            One dict per duplicate group with ``artist``, ``title``,
            ``total_versions`` and ``versions``. Each version has ``path``,
            ``file_type``, ``channel`` (MP4 only, else ``None``),
            ``priority_rank`` (1-based) and ``will_keep`` (``True`` for the
            first ranked version).
        """
        groups = self.group_songs_by_artist_title(songs)
        duplicate_info = []

        # Sort order for non-MP4 files; unknown extensions go last.
        format_priority = {'.cdg': 0, '.mp3': 1}

        for group_key, group_songs in groups.items():
            if len(group_songs) <= 1:
                continue

            # Parse the group key to get artist and title. partition() does
            # not raise when the key has no '|': title is then empty.
            # NOTE(review): assumes keys have the form "artist|title" - confirm
            # against normalize_artist_title.
            artist, _, title = group_key.partition('|')

            group_info = {
                'artist': artist,
                'title': title,
                'total_versions': len(group_songs),
                'versions': []
            }

            mp4_songs = []
            other_songs = []
            for song in group_songs:
                if get_file_extension(song['path']) == '.mp4':
                    mp4_songs.append(song)
                else:
                    other_songs.append(song)

            # Sort MP4s by channel priority
            mp4_songs.sort(key=lambda s: self.get_channel_priority(s['path']))

            # Sort others by format priority
            other_songs.sort(key=lambda s: format_priority.get(get_file_extension(s['path']), 999))

            # MP4s rank ahead of every other format
            for rank, song in enumerate(mp4_songs + other_songs, start=1):
                ext = get_file_extension(song['path'])
                channel = (
                    extract_channel_from_path(song['path'], self.channel_priorities)
                    if ext == '.mp4' else None
                )

                group_info['versions'].append({
                    'path': song['path'],
                    'file_type': ext,
                    'channel': channel,
                    'priority_rank': rank,
                    'will_keep': rank == 1  # First song will be kept
                })

            duplicate_info.append(group_info)

        return duplicate_info