KaraokeMerge/cli/matching.py

310 lines
13 KiB
Python

"""
Song matching and deduplication logic for the Karaoke Song Library Cleanup Tool.
"""
from collections import defaultdict
from typing import Dict, List, Any, Tuple, Optional
import difflib
try:
from fuzzywuzzy import fuzz
FUZZY_AVAILABLE = True
except ImportError:
FUZZY_AVAILABLE = False
from utils import (
normalize_artist_title,
extract_channel_from_path,
get_file_extension,
parse_multi_artist,
validate_song_data,
find_mp3_pairs
)
class SongMatcher:
    """Handles song matching and deduplication logic.

    Songs are plain dicts; the methods here read the ``'artist'``,
    ``'title'`` and ``'path'`` keys. Normalisation, validation, multi-artist
    parsing and MP3/CDG pairing are delegated to helpers imported from
    ``utils`` whose exact behaviour is not visible in this module.
    """

    def __init__(self, config: Dict[str, Any]):
        """Read matching options from ``config``.

        Args:
            config: Configuration mapping. Recognised keys are
                ``'channel_priorities'`` (ordered list, most preferred first)
                and ``'matching'`` (mapping with ``'case_sensitive'``,
                ``'fuzzy_matching'`` and ``'fuzzy_threshold'``). Missing or
                ``None`` values fall back to defaults.
        """
        self.config = config
        # ``or`` also covers keys that are present but explicitly None.
        self.channel_priorities = config.get('channel_priorities') or []
        matching = config.get('matching') or {}
        self.case_sensitive = matching.get('case_sensitive', False)
        self.fuzzy_matching = matching.get('fuzzy_matching', False)
        # Compared against fuzz.ratio()/100, i.e. a value in the range 0..1.
        self.fuzzy_threshold = matching.get('fuzzy_threshold', 0.8)

        # Warn if fuzzy matching is enabled but not available
        if self.fuzzy_matching and not FUZZY_AVAILABLE:
            print("Warning: Fuzzy matching is enabled but fuzzywuzzy is not installed.")
            print("Install with: pip install fuzzywuzzy python-Levenshtein")
            self.fuzzy_matching = False

    @staticmethod
    def _artist_variations(song: Dict[str, Any]) -> List[str]:
        """Return the artist names to match on, falling back to the raw artist."""
        # presumably parse_multi_artist returns a list of names - verify in utils
        return parse_multi_artist(song['artist']) or [song['artist']]

    def group_songs_by_artist_title(self, songs: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Group songs by normalized artist-title combination with optional fuzzy matching.

        Args:
            songs: Song dicts; entries rejected by ``validate_song_data`` are
                ignored.

        Returns:
            Mapping of normalized key to the songs in that group. With exact
            matching a multi-artist song is listed under every artist key;
            with fuzzy matching each song belongs to exactly one group.
        """
        if self.fuzzy_matching:
            return self._group_with_fuzzy_matching(songs)

        # Exact matching: one bucket per normalized artist/title key.
        groups = defaultdict(list)
        for song in songs:
            if not validate_song_data(song):
                continue
            for artist in self._artist_variations(song):
                normalized_key = normalize_artist_title(artist, song['title'], self.case_sensitive)
                groups[normalized_key].append(song)
        return dict(groups)

    def _group_with_fuzzy_matching(self, songs: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Group songs by exact key first, fuzzy-matching only unseen keys."""
        print("Using fuzzy matching - this may take a while for large datasets...")

        # Result mapping: canonical key -> group. Keys are unique, so no group
        # can overwrite another one.
        groups: Dict[str, List[Dict[str, Any]]] = {}
        # Every normalized key seen so far -> the group that owns it. This is
        # what lets exact duplicates skip the expensive fuzzy scan.
        key_index: Dict[str, List[Dict[str, Any]]] = {}
        exact_hits = 0
        fuzzy_candidates = 0
        total = len(songs)

        for i, song in enumerate(songs):
            if not validate_song_data(song):
                continue
            # Show progress every 1000 songs
            if i % 1000 == 0 and i > 0:
                print(f"Processing song {i:,}/{total:,}...")

            artists = self._artist_variations(song)
            keys = [
                normalize_artist_title(artist, song['title'], self.case_sensitive)
                for artist in artists
            ]

            # Cheap path: an identical normalized key was already grouped.
            group = next((key_index[key] for key in keys if key in key_index), None)
            if group is not None:
                exact_hits += 1
            else:
                # Expensive path: compare against each group's first song.
                fuzzy_candidates += 1
                group = self._find_fuzzy_group(artists, song['title'], groups)
                if group is None:
                    group = groups[keys[0]] = []
            group.append(song)

            # Remember this song's keys so later exact duplicates are O(1).
            for key in keys:
                key_index.setdefault(key, group)

        print(f"Exact matches found: {exact_hits:,} songs")
        print(f"Songs requiring fuzzy matching: {fuzzy_candidates:,}")
        print(f"Total groups after fuzzy matching: {len(groups)}")
        return groups

    def _find_fuzzy_group(
        self,
        artists: List[str],
        title: str,
        groups: Dict[str, List[Dict[str, Any]]],
    ) -> Optional[List[Dict[str, Any]]]:
        """Return the first group whose first song matches any artist/title, else None."""
        for artist in artists:
            for group in groups.values():
                representative = group[0]
                if self.should_group_songs(
                    artist, title,
                    representative['artist'], representative['title']
                ):
                    return group
        return None

    def fuzzy_match_strings(self, str1: str, str2: str) -> float:
        """Compare two strings using fuzzy matching if available.

        Returns:
            ``fuzz.ratio`` of the lower-cased strings divided by 100, or
            ``0.0`` when fuzzy matching is disabled or unavailable.
        """
        if not self.fuzzy_matching or not FUZZY_AVAILABLE:
            return 0.0
        # Use fuzzywuzzy for comparison
        return fuzz.ratio(str1.lower(), str2.lower()) / 100.0

    def should_group_songs(self, artist1: str, title1: str, artist2: str, title2: str) -> bool:
        """Determine if two songs should be grouped together based on matching settings.

        A case-insensitive exact match on both fields always groups. Otherwise,
        when fuzzy matching is active, both the artist and the title similarity
        must reach ``self.fuzzy_threshold``.
        """
        # Exact match check (always case-insensitive, regardless of config)
        if artist1.lower() == artist2.lower() and title1.lower() == title2.lower():
            return True

        # Fuzzy matching check
        if self.fuzzy_matching and FUZZY_AVAILABLE:
            artist_similarity = self.fuzzy_match_strings(artist1, artist2)
            title_similarity = self.fuzzy_match_strings(title1, title2)
            # Both artist and title must meet threshold
            return (
                artist_similarity >= self.fuzzy_threshold
                and title_similarity >= self.fuzzy_threshold
            )
        return False

    def get_channel_priority(self, file_path: str) -> int:
        """Get channel priority for MP4 files based on configured folder names.

        Returns:
            ``-1`` when ``file_path`` is not an ``.mp4``; otherwise the index
            of its channel in ``self.channel_priorities`` (lower is better),
            or ``len(self.channel_priorities)`` when no channel is found or
            the channel is not in the configured list.
        """
        if not file_path.lower().endswith('.mp4'):
            return -1  # Not an MP4 file

        lowest_priority = len(self.channel_priorities)
        channel = extract_channel_from_path(file_path, self.channel_priorities)
        if not channel:
            return lowest_priority  # Lowest priority if no channel found
        try:
            return self.channel_priorities.index(channel)
        except ValueError:
            return lowest_priority  # Lowest priority if channel not in config

    def select_best_song(self, songs: List[Dict[str, Any]]) -> Tuple[Optional[Dict[str, Any]], List[Dict[str, Any]]]:
        """Select the best song from a group of duplicates and return the rest as skips.

        Priority order: MP4 (by channel priority) > MP3/CDG pairs > standalone MP3.

        Returns:
            ``(best_song, skip_songs)``. ``best_song`` is ``None`` when
            ``songs`` is empty or when ``find_mp3_pairs`` places none of the
            songs in a recognised category.
        """
        if not songs:
            return None, []
        if len(songs) == 1:
            return songs[0], []

        # Group songs into MP3 pairs and standalone files
        grouped = find_mp3_pairs(songs)
        best_song = None
        skip_songs: List[Dict[str, Any]] = []

        # 1. First priority: MP4 files (with channel priority)
        if grouped['standalone_mp4']:
            # Stable sort: lower index = higher priority, ties keep input order.
            ranked_mp4 = sorted(
                grouped['standalone_mp4'],
                key=lambda s: self.get_channel_priority(s['path']),
            )
            best_song = ranked_mp4[0]
            skip_songs.extend(ranked_mp4[1:])
            # Skip all other formats when we have MP4
            skip_songs.extend(song for pair in grouped['pairs'] for song in pair)
            skip_songs.extend(grouped['standalone_mp3'])
        # 2. Second priority: MP3 pairs (CDG/MP3 pairs treated as MP3)
        elif grouped['pairs']:
            # Keep the first element of the first pair as the representative;
            # presumably that is the CDG file - confirm against find_mp3_pairs.
            # The other member of that pair is neither kept nor skipped here.
            best_song = grouped['pairs'][0][0]
            skip_songs.extend(song for pair in grouped['pairs'][1:] for song in pair)
            skip_songs.extend(grouped['standalone_mp3'])
        # 3. Third priority: Standalone MP3
        elif grouped['standalone_mp3']:
            best_song = grouped['standalone_mp3'][0]
            skip_songs.extend(grouped['standalone_mp3'][1:])

        return best_song, skip_songs

    def process_songs(self, songs: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], Dict[str, Any]]:
        """Process all songs and return best songs, skip songs, and statistics.

        Returns:
            ``(best_songs, skip_songs, stats)`` where ``skip_songs`` holds one
            entry per skipped file (path, reason, artist, title and the path of
            the kept version) and ``stats`` holds the counters built below.
        """
        # Group songs by artist-title
        groups = self.group_songs_by_artist_title(songs)
        best_songs = []
        skip_songs = []
        stats = {
            'total_songs': len(songs),
            'unique_songs': len(groups),
            'duplicates_found': 0,
            'file_type_breakdown': defaultdict(int),
            'channel_breakdown': defaultdict(int),
            'groups_with_duplicates': 0
        }

        for group_songs in groups.values():
            # Count file types (and channels for MP4 files)
            for song in group_songs:
                ext = get_file_extension(song['path'])
                stats['file_type_breakdown'][ext] += 1
                if ext == '.mp4':
                    channel = extract_channel_from_path(song['path'], self.channel_priorities)
                    if channel:
                        stats['channel_breakdown'][channel] += 1

            # Select best song and mark others for skipping
            best_song, group_skips = self.select_best_song(group_songs)
            best_songs.append(best_song)
            if not group_skips:
                continue

            stats['duplicates_found'] += len(group_skips)
            stats['groups_with_duplicates'] += 1
            # Add skip songs with reasons
            for skip_song in group_skips:
                skip_songs.append({
                    'path': skip_song['path'],
                    'reason': 'duplicate',
                    'artist': skip_song['artist'],
                    'title': skip_song['title'],
                    'kept_version': best_song['path']
                })

        return best_songs, skip_songs, stats

    def get_detailed_duplicate_info(self, songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Get detailed information about duplicate groups for reporting.

        Returns:
            One dict per group that has more than one song, containing the
            artist and title taken from the group key, the version count, and
            a ranked ``'versions'`` list (MP4s by channel priority first, then
            ``.cdg``, ``.mp3``, and any other extension last).
        """
        groups = self.group_songs_by_artist_title(songs)
        # Ranking applied to the non-MP4 files.
        format_priority = {'.cdg': 0, '.mp3': 1}
        duplicate_info = []

        for group_key, group_songs in groups.items():
            if len(group_songs) <= 1:
                continue

            # The key is expected to look like "artist|title"; partition never
            # raises, so a key without '|' yields an empty title instead.
            artist, _, title = group_key.partition('|')
            group_info = {
                'artist': artist,
                'title': title,
                'total_versions': len(group_songs),
                'versions': []
            }

            mp4_songs = [s for s in group_songs if get_file_extension(s['path']) == '.mp4']
            other_songs = [s for s in group_songs if get_file_extension(s['path']) != '.mp4']
            # Sort MP4s by channel priority
            mp4_songs.sort(key=lambda s: self.get_channel_priority(s['path']))
            # Sort others by format priority
            other_songs.sort(key=lambda s: format_priority.get(get_file_extension(s['path']), 999))

            for rank, song in enumerate(mp4_songs + other_songs, start=1):
                ext = get_file_extension(song['path'])
                channel = (
                    extract_channel_from_path(song['path'], self.channel_priorities)
                    if ext == '.mp4' else None
                )
                group_info['versions'].append({
                    'path': song['path'],
                    'file_type': ext,
                    'channel': channel,
                    'priority_rank': rank,
                    'will_keep': rank == 1  # First song will be kept
                })

            duplicate_info.append(group_info)

        return duplicate_info