310 lines
13 KiB
Python
310 lines
13 KiB
Python
"""
|
|
Song matching and deduplication logic for the Karaoke Song Library Cleanup Tool.
|
|
"""
|
|
from collections import defaultdict
|
|
from typing import Dict, List, Any, Tuple, Optional
|
|
import difflib
|
|
|
|
try:
|
|
from fuzzywuzzy import fuzz
|
|
FUZZY_AVAILABLE = True
|
|
except ImportError:
|
|
FUZZY_AVAILABLE = False
|
|
|
|
from utils import (
|
|
normalize_artist_title,
|
|
extract_channel_from_path,
|
|
get_file_extension,
|
|
parse_multi_artist,
|
|
validate_song_data,
|
|
find_mp3_pairs
|
|
)
|
|
|
|
|
|
class SongMatcher:
    """Group karaoke songs by artist/title and pick one version per group.

    Configuration keys read from ``config``:

    * ``channel_priorities`` -- ordered list used to rank MP4 files; a lower
      index means a higher priority. Defaults to ``[]``.
    * ``matching.case_sensitive`` -- forwarded to ``normalize_artist_title``
      when building group keys. Defaults to ``False``.
    * ``matching.fuzzy_matching`` -- enable similarity-based grouping. It is
      forced to ``False`` when ``fuzzywuzzy`` could not be imported.
    * ``matching.fuzzy_threshold`` -- minimum similarity for artist AND title.
      Defaults to ``0.8``; values above 1 are read as percentages.
    """

    def __init__(self, config: Dict[str, Any]):
        """Read matching settings from *config* and validate fuzzy support."""
        self.config = config
        self.channel_priorities = config.get('channel_priorities', [])

        matching = config.get('matching', {})
        self.case_sensitive = matching.get('case_sensitive', False)
        self.fuzzy_matching = matching.get('fuzzy_matching', False)
        self.fuzzy_threshold = matching.get('fuzzy_threshold', 0.8)

        # Similarity scores are computed as fuzz.ratio / 100.0, i.e. on a
        # 0.0-1.0 scale. A threshold above 1 could never be reached and would
        # silently disable fuzzy grouping, so interpret it as a percentage.
        if isinstance(self.fuzzy_threshold, (int, float)) and self.fuzzy_threshold > 1:
            self.fuzzy_threshold = self.fuzzy_threshold / 100.0

        # Warn if fuzzy matching is enabled but not available
        if self.fuzzy_matching and not FUZZY_AVAILABLE:
            print("Warning: Fuzzy matching is enabled but fuzzywuzzy is not installed.")
            print("Install with: pip install fuzzywuzzy python-Levenshtein")
            self.fuzzy_matching = False

    def group_songs_by_artist_title(self, songs: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Group songs by normalized artist-title key, optionally fuzzily.

        Songs rejected by ``validate_song_data`` are ignored.

        Without fuzzy matching, a song is filed under the key of every artist
        variant returned by ``parse_multi_artist``, so a multi-artist song can
        appear in several groups.

        With fuzzy matching, every song belongs to exactly one group: songs
        are first bucketed by exact key, then buckets whose first songs are
        similar enough (see ``should_group_songs``) are merged.

        Returns:
            A plain ``dict`` mapping group key to the list of songs in it.
        """
        if not self.fuzzy_matching:
            return self._group_exact(songs)
        return self._group_fuzzy(songs)

    def _artist_variants(self, artist: str) -> List[str]:
        """Return the parsed artist names, or ``[artist]`` if parsing yields nothing."""
        return list(parse_multi_artist(artist) or [artist])

    def _group_exact(self, songs: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Group songs under the normalized key of each of their artist variants."""
        groups = defaultdict(list)

        for song in songs:
            if not validate_song_data(song):
                continue

            # Two variants may normalize to the same key; adding the song
            # twice to one group would make it a "duplicate" of itself.
            seen_keys = set()
            for artist in self._artist_variants(song['artist']):
                key = normalize_artist_title(artist, song['title'], self.case_sensitive)
                if key in seen_keys:
                    continue
                seen_keys.add(key)
                groups[key].append(song)

        return dict(groups)

    def _group_fuzzy(self, songs: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Bucket songs by exact key, then merge buckets that are fuzzily similar."""
        print("Using fuzzy matching - this may take a while for large datasets...")

        # First pass: bucket by exact normalized key. A song joins the first
        # existing bucket matching any of its artist variants; otherwise it
        # opens a new bucket under its first variant's key.
        exact_groups: Dict[str, List[Dict[str, Any]]] = {}
        # Artist variants of the song that opened each bucket, reused in pass 2.
        opener_artists: Dict[str, List[str]] = {}

        for i, song in enumerate(songs):
            # Show progress every 1000 songs
            if i % 1000 == 0 and i > 0:
                print(f"Processing song {i:,}/{len(songs):,}...")

            if not validate_song_data(song):
                continue

            variants = self._artist_variants(song['artist'])
            keys = [
                normalize_artist_title(artist, song['title'], self.case_sensitive)
                for artist in variants
            ]
            key = next((k for k in keys if k in exact_groups), None)
            if key is None:
                key = keys[0]
                exact_groups[key] = []
                opener_artists[key] = variants
            exact_groups[key].append(song)

        print(f"Exact matches found: {len(exact_groups)} groups")
        # Only one representative song per exact bucket is compared fuzzily.
        print(f"Songs requiring fuzzy matching: {len(exact_groups)}")

        # Second pass: greedily merge buckets whose representatives are similar.
        merged: Dict[str, List[Dict[str, Any]]] = {}
        # (key, artist, title) of the first song of every merged group.
        representatives: List[Tuple[str, str, str]] = []

        for i, (key, bucket) in enumerate(exact_groups.items()):
            if i % 100 == 0 and i > 0:
                print(f"Fuzzy matching song {i:,}/{len(exact_groups):,}...")

            first_song = bucket[0]
            target = self._find_similar_group(
                opener_artists[key], first_song['title'], representatives
            )
            if target is None:
                # Keys come straight from exact_groups, so they are unique and
                # no existing group can be overwritten here.
                merged[key] = list(bucket)
                representatives.append((key, first_song['artist'], first_song['title']))
            else:
                merged[target].extend(bucket)

        print(f"Total groups after fuzzy matching: {len(merged)}")
        return merged

    def _find_similar_group(
        self,
        artists: List[str],
        title: str,
        representatives: List[Tuple[str, str, str]],
    ) -> Optional[str]:
        """Return the key of the first representative matching any artist variant, else ``None``."""
        for artist in artists:
            for rep_key, rep_artist, rep_title in representatives:
                if self.should_group_songs(artist, title, rep_artist, rep_title):
                    return rep_key
        return None

    def fuzzy_match_strings(self, str1: str, str2: str) -> float:
        """Return the case-insensitive similarity of two strings in ``[0.0, 1.0]``.

        Returns ``0.0`` when fuzzy matching is disabled or unavailable.
        """
        if not self.fuzzy_matching or not FUZZY_AVAILABLE:
            return 0.0

        # fuzz.ratio is divided by 100 so it compares against fuzzy_threshold.
        return fuzz.ratio(str1.lower(), str2.lower()) / 100.0

    def should_group_songs(self, artist1: str, title1: str, artist2: str, title2: str) -> bool:
        """Decide whether two artist/title pairs denote the same song.

        Returns ``True`` when both artist and title are equal ignoring case,
        or, with fuzzy matching active, when both similarities reach
        ``self.fuzzy_threshold``.
        """
        # Exact match check
        if artist1.lower() == artist2.lower() and title1.lower() == title2.lower():
            return True

        # Fuzzy matching check
        if self.fuzzy_matching and FUZZY_AVAILABLE:
            artist_similarity = self.fuzzy_match_strings(artist1, artist2)
            title_similarity = self.fuzzy_match_strings(title1, title2)

            # Both artist and title must meet threshold
            return (
                artist_similarity >= self.fuzzy_threshold
                and title_similarity >= self.fuzzy_threshold
            )

        return False

    def get_channel_priority(self, file_path: str) -> int:
        """Return the channel priority rank of an MP4 file.

        Returns:
            ``-1`` if *file_path* does not end in ``.mp4`` (case-insensitive);
            the index of the file's channel in ``self.channel_priorities``
            (lower = higher priority); or ``len(self.channel_priorities)``
            when no configured channel is found for the path.
        """
        if not file_path.lower().endswith('.mp4'):
            return -1  # Not an MP4 file

        lowest_priority = len(self.channel_priorities)

        channel = extract_channel_from_path(file_path, self.channel_priorities)
        if not channel:
            return lowest_priority

        try:
            return self.channel_priorities.index(channel)
        except ValueError:
            # Channel was extracted but is not listed in the config.
            return lowest_priority

    def select_best_song(self, songs: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """Select the version to keep from a group and return the rest as skips.

        Priority order: MP4 (ranked by channel priority) > CDG/MP3 pairs >
        standalone MP3, using the buckets returned by ``find_mp3_pairs``.

        Returns:
            ``(best_song, skip_songs)``. ``best_song`` is ``None`` when the
            group has more than one song and none of the three buckets is
            populated.
        """
        if len(songs) == 1:
            return songs[0], []

        # Group songs into MP3 pairs and standalone files
        grouped = find_mp3_pairs(songs)

        best_song = None
        skip_songs = []

        # 1. First priority: MP4 files (with channel priority)
        if grouped['standalone_mp4']:
            # Stable sort: lower index = higher priority, ties keep input order.
            ranked_mp4 = sorted(
                grouped['standalone_mp4'],
                key=lambda s: self.get_channel_priority(s['path']),
            )
            best_song = ranked_mp4[0]
            skip_songs.extend(ranked_mp4[1:])
            # Skip all other formats when we have MP4
            skip_songs.extend(song for pair in grouped['pairs'] for song in pair)
            skip_songs.extend(grouped['standalone_mp3'])

        # 2. Second priority: MP3 pairs (CDG/MP3 pairs treated as MP3)
        elif grouped['pairs']:
            # The first element of the first pair represents the kept pair
            # (presumably the CDG file -- depends on find_mp3_pairs). Its
            # partner is kept too: it is not added to the skip list.
            best_song = grouped['pairs'][0][0]
            skip_songs.extend(song for pair in grouped['pairs'][1:] for song in pair)
            skip_songs.extend(grouped['standalone_mp3'])

        # 3. Third priority: Standalone MP3
        elif grouped['standalone_mp3']:
            best_song = grouped['standalone_mp3'][0]
            skip_songs.extend(grouped['standalone_mp3'][1:])

        return best_song, skip_songs

    def process_songs(self, songs: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], Dict[str, Any]]:
        """Process all songs and return best songs, skip entries, and statistics.

        Returns:
            ``(best_songs, skip_songs, stats)`` where each skip entry holds
            ``path``, ``reason`` (always ``'duplicate'``), ``artist``,
            ``title`` and ``kept_version``; ``stats`` holds ``total_songs``,
            ``unique_songs``, ``duplicates_found``, ``file_type_breakdown``,
            ``channel_breakdown`` and ``groups_with_duplicates``.
        """
        # Group songs by artist-title
        groups = self.group_songs_by_artist_title(songs)

        best_songs = []
        skip_songs = []
        stats = {
            'total_songs': len(songs),
            'unique_songs': len(groups),
            'duplicates_found': 0,
            'file_type_breakdown': defaultdict(int),
            'channel_breakdown': defaultdict(int),
            'groups_with_duplicates': 0,
        }

        for group_songs in groups.values():
            # Count file types (and channels for MP4 files)
            for song in group_songs:
                ext = get_file_extension(song['path'])
                stats['file_type_breakdown'][ext] += 1

                if ext == '.mp4':
                    channel = extract_channel_from_path(song['path'], self.channel_priorities)
                    if channel:
                        stats['channel_breakdown'][channel] += 1

            # Select best song and mark others for skipping
            best_song, group_skips = self.select_best_song(group_songs)
            if best_song is None:
                # No recognised format in this group; never emit a None entry.
                continue
            best_songs.append(best_song)

            if group_skips:
                stats['duplicates_found'] += len(group_skips)
                stats['groups_with_duplicates'] += 1

                # Add skip songs with reasons
                for skip_song in group_skips:
                    skip_songs.append({
                        'path': skip_song['path'],
                        'reason': 'duplicate',
                        'artist': skip_song['artist'],
                        'title': skip_song['title'],
                        'kept_version': best_song['path'],
                    })

        return best_songs, skip_songs, stats

    def get_detailed_duplicate_info(self, songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Describe every group that contains more than one version.

        Returns:
            One dict per duplicate group with ``artist``, ``title``,
            ``total_versions`` and ``versions``; each version has ``path``,
            ``file_type``, ``channel`` (MP4 only, else ``None``),
            ``priority_rank`` (1-based) and ``will_keep``.
        """
        groups = self.group_songs_by_artist_title(songs)
        duplicate_info = []

        # Non-MP4 ordering: CDG before MP3, anything else last.
        format_priority = {'.cdg': 0, '.mp3': 1}

        for group_key, group_songs in groups.items():
            if len(group_songs) <= 1:
                continue

            # The key is presumably "artist|title" (built by
            # normalize_artist_title). partition() never raises: a key without
            # '|' yields the whole key as artist and an empty title.
            artist, _, title = group_key.partition('|')

            # MP4s first, ordered by channel priority; the rest by format.
            mp4_songs = []
            other_songs = []
            for song in group_songs:
                if get_file_extension(song['path']) == '.mp4':
                    mp4_songs.append(song)
                else:
                    other_songs.append(song)

            mp4_songs.sort(key=lambda s: self.get_channel_priority(s['path']))
            other_songs.sort(
                key=lambda s: format_priority.get(get_file_extension(s['path']), 999)
            )

            versions = []
            for rank, song in enumerate(mp4_songs + other_songs, start=1):
                ext = get_file_extension(song['path'])
                channel = (
                    extract_channel_from_path(song['path'], self.channel_priorities)
                    if ext == '.mp4'
                    else None
                )
                versions.append({
                    'path': song['path'],
                    'file_type': ext,
                    'channel': channel,
                    'priority_rank': rank,
                    # NOTE(review): this marks the top-ranked entry of THIS
                    # ordering; it is not derived from select_best_song.
                    'will_keep': rank == 1,
                })

            duplicate_info.append({
                'artist': artist,
                'title': title,
                'total_versions': len(group_songs),
                'versions': versions,
            })

        return duplicate_info