Signed-off-by: Matt Bruce <mbrucedogs@gmail.com>
This commit is contained in:
parent
4bbd03eab7
commit
4e45ef0280
@ -541,68 +541,4 @@ class MusicBrainzDatabase:
|
||||
print(f"Database connection test failed: {e}")
|
||||
return False
|
||||
|
||||
def find_artist_credit(self, artist_name: str, title: str) -> Optional[Tuple[str, str, str]]:
|
||||
"""
|
||||
Find artist credit for collaboration.
|
||||
Returns (artist_credit_id, artist_string, recording_mbid) or None.
|
||||
"""
|
||||
if not self.connection:
|
||||
return None
|
||||
|
||||
try:
|
||||
# Clean artist name
|
||||
clean_name = artist_name.strip()
|
||||
|
||||
# Parse complex collaboration
|
||||
main_artist, collaborators = parse_complex_collaboration(clean_name)
|
||||
collaboration = len(collaborators) > 0
|
||||
|
||||
# Generate title variations using the helper method
|
||||
search_titles = generate_title_variations(title)
|
||||
|
||||
# Build the expected artist credit string
|
||||
expected_artist_string = main_artist
|
||||
if collaborators:
|
||||
expected_artist_string += " & " + " & ".join(collaborators)
|
||||
|
||||
# Try to find recording with matching artist credit using all title variations
|
||||
for search_title in search_titles:
|
||||
# First, find all recordings with matching title (handle both apostrophe types)
|
||||
title_query = """
|
||||
SELECT r.gid as recording_mbid, r.artist_credit as artist_credit_id
|
||||
FROM recording r
|
||||
WHERE r.name ILIKE %s OR r.name ILIKE %s
|
||||
"""
|
||||
# Create versions with both straight and curly apostrophes
|
||||
search_title_curly = search_title.replace("'", "'").replace("'", "'")
|
||||
self.cursor.execute(title_query, (f"%{search_title}%", f"%{search_title_curly}%"))
|
||||
recordings = self.cursor.fetchall()
|
||||
|
||||
for recording in recordings:
|
||||
# Check if this recording's artist credit matches our expected artist string
|
||||
artist_query = """
|
||||
SELECT a.name, acn.join_phrase, acn.position
|
||||
FROM artist_credit_name acn
|
||||
JOIN artist a ON acn.artist = a.id
|
||||
WHERE acn.artist_credit = %s
|
||||
ORDER BY acn.position
|
||||
"""
|
||||
self.cursor.execute(artist_query, (recording['artist_credit_id'],))
|
||||
artists = self.cursor.fetchall()
|
||||
|
||||
# Build the actual artist string for this recording
|
||||
actual_artist_string = ""
|
||||
for artist in artists:
|
||||
actual_artist_string += artist['name']
|
||||
if artist['join_phrase']:
|
||||
actual_artist_string += artist['join_phrase']
|
||||
|
||||
# Check if the artist strings match
|
||||
if actual_artist_string.lower() == expected_artist_string.lower():
|
||||
return (str(recording['artist_credit_id']), actual_artist_string, recording['recording_mbid'])
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error in find_artist_credit: {e}")
|
||||
return None
|
||||
|
||||
252
src/cli/main.py
252
src/cli/main.py
@ -7,7 +7,6 @@ Now uses direct database access with fuzzy search for better performance.
|
||||
import sys
|
||||
import json
|
||||
import time
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional, Any, Tuple, List
|
||||
from datetime import datetime
|
||||
@ -22,13 +21,12 @@ from ..config.constants import (
|
||||
ExitCode
|
||||
)
|
||||
|
||||
# Import data loader
|
||||
from ..utils.data_loader import data_loader
|
||||
|
||||
|
||||
# Import database and API clients
|
||||
from ..api.database import MusicBrainzDatabase
|
||||
from ..api.api_client import MusicBrainzAPIClient
|
||||
from src.utils.artist_title_processing import parse_complex_collaboration, parse_collaborators, generate_title_variations
|
||||
from ..core.song_processor import SongProcessor
|
||||
|
||||
|
||||
class MusicBrainzCleaner:
|
||||
@ -48,247 +46,19 @@ class MusicBrainzCleaner:
|
||||
# Initialize API client (fallback method)
|
||||
if not self.use_database:
|
||||
self.api = MusicBrainzAPIClient(base_url)
|
||||
|
||||
# Initialize centralized song processor
|
||||
self.song_processor = SongProcessor(
|
||||
database_client=self.db if use_database else None,
|
||||
api_client=self.api if not use_database else None
|
||||
)
|
||||
|
||||
def _clean_artist_name(self, artist_name: str) -> str:
    """
    Normalize an artist name using the name variations from the data file.

    Each variation is replaced as a whole word, ignoring case, after the
    name has been stripped of surrounding whitespace.
    """
    cleaned = artist_name.strip()
    replacements = data_loader.load_name_variations()

    for variant, canonical in replacements.items():
        whole_word = re.compile(rf'\b{re.escape(variant)}\b', flags=re.IGNORECASE)
        cleaned = whole_word.sub(canonical, cleaned)

    return cleaned
|
||||
|
||||
def _clean_title(self, title: str) -> str:
|
||||
"""
|
||||
Clean title by removing karaoke patterns.
|
||||
Note: The database search now tries multiple title variations including
|
||||
the full title and versions with parenthetical content removed.
|
||||
"""
|
||||
title = title.strip()
|
||||
karaoke_patterns = [
|
||||
r'\s*\(Karaoke Version\)',
|
||||
r'\s*\(Karaoke\)',
|
||||
r'\s*\(Instrumental\)',
|
||||
r'\s*\(Backing Track\)',
|
||||
]
|
||||
for pattern in karaoke_patterns:
|
||||
title = re.sub(pattern, '', title, flags=re.IGNORECASE)
|
||||
return title.strip()
|
||||
|
||||
def find_artist_mbid(self, artist_name: str) -> Optional[str]:
    """
    Look up the MBID for an artist, or for the main artist of a collaboration.

    The name is cleaned, then parsed with the same collaboration parser the
    database layer uses. For a collaboration only the main artist is looked
    up; if that fails None is returned so the caller can fall back to the
    artist-credit approach.

    Returns:
        The MBID, or None when nothing is found or the API call fails.
    """
    clean_name = self._clean_artist_name(artist_name)
    main_artist, collaborators = parse_complex_collaboration(clean_name)

    if collaborators:
        if not main_artist:
            return None
        search_name = main_artist
    else:
        search_name = clean_name

    if self.use_database:
        result = self.db.fuzzy_search_artist(search_name)
        # Expected shape: (artist_name, mbid, score)
        if result and isinstance(result, tuple) and len(result) >= 2:
            return result[1]
        return None

    # Fallback to API. The lookup is best-effort, so failures mean "not found",
    # but KeyboardInterrupt/SystemExit are deliberately not swallowed.
    try:
        result = self.api.search_artist(search_name)
        if result:
            return result['id']
    except Exception:
        return None

    return None
|
||||
|
||||
def find_recording_mbid(self, artist_mbid: str, title: str, original_artist: str = None) -> Optional[str]:
    """
    Find a recording MBID for a title, trying every generated title variation.

    Order of attempts:
      1. the known-recordings table, keyed by (artist MBID, title);
      2. with database access and no artist MBID but an original artist
         string (collaboration case): search by title only and accept a hit
         whose artist credit equals the expected "A & B" string or contains
         the main artist plus at least one collaborator;
      3. with database access otherwise: search by title and artist MBID;
      4. without database access: the API, per variation.

    Returns:
        The recording MBID, or None when nothing matches.
    """
    title_variations = generate_title_variations(title)
    lowered_variations = {variation.lower() for variation in title_variations}

    # Known recordings take precedence over any search.
    known_recordings = data_loader.load_known_recordings()
    for (known_artist_mbid, known_title), recording_mbid in known_recordings.items():
        if artist_mbid == known_artist_mbid and known_title.lower() in lowered_variations:
            return recording_mbid

    if not self.use_database:
        for variation in title_variations:
            # Best-effort per variation: a failed request moves on to the next one.
            try:
                result = self.api.search_recording(variation, artist_mbid)
                if result:
                    return result['id']
            except Exception:
                continue
        return None

    if artist_mbid or not original_artist:
        # Regular case: search constrained by the artist MBID.
        for variation in title_variations:
            result = self.db.fuzzy_search_recording(variation, artist_mbid)
            # Expected shape: (recording_name, mbid, score)
            if result and isinstance(result, tuple) and len(result) >= 2:
                return result[1]
        return None

    # Collaboration case: find by title, then verify the artist credit.
    expected_credit = original_artist.replace(',', ' & ').replace(' and ', ' & ').lower()
    main_artist, collaborators = parse_complex_collaboration(original_artist)

    for variation in title_variations:
        result = self.db.fuzzy_search_recording(variation)
        if not (result and isinstance(result, tuple) and len(result) >= 2):
            continue
        recording_mbid = result[1]

        recording_info = self.get_recording_info(recording_mbid)
        credit = (recording_info or {}).get('artist_credit')
        if not credit:
            continue
        credit = credit.lower()

        if credit == expected_credit:
            return recording_mbid

        # Partial match: main artist plus at least one collaborator appear in the credit.
        if main_artist.lower() in credit and any(
            collaborator.lower() in credit for collaborator in collaborators
        ):
            return recording_mbid

    return None
|
||||
|
||||
def get_artist_info(self, mbid: str) -> Optional[Dict[str, Any]]:
|
||||
if self.use_database:
|
||||
return self.db.get_artist_info(mbid)
|
||||
else:
|
||||
return self.api.get_artist_info(mbid)
|
||||
|
||||
def get_recording_info(self, mbid: str) -> Optional[Dict[str, Any]]:
|
||||
if self.use_database:
|
||||
return self.db.get_recording_info(mbid)
|
||||
else:
|
||||
return self.api.get_recording_info(mbid)
|
||||
|
||||
def _build_artist_string(self, artist_credit: list) -> str:
|
||||
if not artist_credit:
|
||||
return ""
|
||||
parts = []
|
||||
for credit in artist_credit:
|
||||
parts.append(credit['name'])
|
||||
if credit.get('joinphrase'):
|
||||
parts.append(credit['joinphrase'])
|
||||
return ''.join(parts).strip()
|
||||
|
||||
def clean_song(self, song: Dict[str, Any]) -> Tuple[Dict[str, Any], bool]:
|
||||
"""
|
||||
Clean a single song and return (cleaned_song, success_status)
|
||||
Clean a single song using the centralized song processor.
|
||||
Returns (cleaned_song, success_status)
|
||||
"""
|
||||
original_artist = song.get('artist', '')
|
||||
original_title = song.get('title', '')
|
||||
|
||||
# Find artist MBID
|
||||
artist_mbid = self.find_artist_mbid(song.get('artist', ''))
|
||||
|
||||
# Handle collaboration case (artist_mbid is None)
|
||||
# Use the same complex collaboration parsing as the database
|
||||
artist_name = song.get('artist', '')
|
||||
main_artist, collaborators = parse_complex_collaboration(artist_name)
|
||||
has_collaboration = len(collaborators) > 0
|
||||
|
||||
if artist_mbid is None and has_collaboration:
|
||||
# Try to find recording using the improved find_recording_mbid method
|
||||
recording_mbid = self.find_recording_mbid(None, song.get('title', ''), original_artist)
|
||||
if recording_mbid:
|
||||
# Get recording info to update the song
|
||||
recording_info = self.get_recording_info(recording_mbid)
|
||||
if recording_info:
|
||||
# Update with the correct artist credit and title
|
||||
if self.use_database and recording_info.get('artist_credit'):
|
||||
song['artist'] = recording_info['artist_credit']
|
||||
elif not self.use_database and recording_info.get('artist-credit'):
|
||||
artist_string = self._build_artist_string(recording_info['artist-credit'])
|
||||
if artist_string:
|
||||
song['artist'] = artist_string
|
||||
|
||||
song['title'] = recording_info['title']
|
||||
song['recording_mbid'] = recording_mbid
|
||||
|
||||
# For collaborations, try to get the main artist's MBID
|
||||
if self.use_database:
|
||||
main_artist, collaborators = parse_complex_collaboration(song.get('artist', ''))
|
||||
if main_artist:
|
||||
artist_result = self.db.fuzzy_search_artist(main_artist)
|
||||
if artist_result and isinstance(artist_result, tuple) and len(artist_result) >= 2:
|
||||
song['mbid'] = artist_result[1] # Set the main artist's MBID
|
||||
|
||||
return song, True
|
||||
return song, False
|
||||
|
||||
# Regular case (non-collaboration or collaboration not found)
|
||||
if not artist_mbid:
|
||||
return song, False
|
||||
|
||||
# Get artist info
|
||||
artist_info = self.get_artist_info(artist_mbid)
|
||||
if artist_info:
|
||||
song['artist'] = artist_info['name']
|
||||
song['mbid'] = artist_mbid
|
||||
|
||||
# Find recording MBID
|
||||
recording_mbid = self.find_recording_mbid(artist_mbid, song.get('title', ''))
|
||||
if not recording_mbid:
|
||||
return song, False
|
||||
|
||||
# Get recording info
|
||||
recording_info = self.get_recording_info(recording_mbid)
|
||||
if recording_info:
|
||||
# Update artist string if there are multiple artists, but preserve the artist MBID
|
||||
if self.use_database and recording_info.get('artist_credit'):
|
||||
song['artist'] = recording_info['artist_credit']
|
||||
# Keep the original artist MBID even when updating artist name
|
||||
if 'mbid' not in song:
|
||||
song['mbid'] = artist_mbid
|
||||
elif not self.use_database and recording_info.get('artist-credit'):
|
||||
artist_string = self._build_artist_string(recording_info['artist-credit'])
|
||||
if artist_string:
|
||||
song['artist'] = artist_string
|
||||
# Keep the original artist MBID even when updating artist name
|
||||
if 'mbid' not in song:
|
||||
song['mbid'] = artist_mbid
|
||||
|
||||
song['title'] = recording_info['title']
|
||||
song['recording_mbid'] = recording_mbid
|
||||
return song, True
|
||||
|
||||
return song, False
|
||||
return self.song_processor.clean_song(song)
|
||||
|
||||
def process_songs(self, source_file: Path, output_success: Path = None, output_failure: Path = None, limit: Optional[int] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
|
||||
249
src/core/song_processor.py
Normal file
249
src/core/song_processor.py
Normal file
@ -0,0 +1,249 @@
|
||||
"""
|
||||
Centralized song processing logic for MusicBrainz Data Cleaner.
|
||||
This module contains all song processing methods that should be shared
|
||||
between CLI and database interactions to ensure consistency.
|
||||
"""
|
||||
|
||||
from typing import Dict, Optional, Tuple, Any, List
|
||||
from ..utils.artist_title_processing import parse_complex_collaboration, generate_title_variations
|
||||
from ..utils.data_loader import data_loader
|
||||
|
||||
|
||||
class SongProcessor:
    """Centralized song processing with consistent logic across all interfaces."""

    def __init__(self, database_client=None, api_client=None):
        """
        Initialize with optional database and API clients.

        Args:
            database_client: MusicBrainzDatabase instance for direct DB access
            api_client: MusicBrainzAPIClient instance for API fallback
        """
        self.db = database_client
        self.api = api_client
        # The database is preferred whenever a client was supplied.
        self.use_database = database_client is not None

    @staticmethod
    def _is_fuzzy_hit(result: Any) -> bool:
        """Return True when a fuzzy-search result is a (name, mbid, ...) tuple."""
        return bool(result) and isinstance(result, tuple) and len(result) >= 2

    def find_artist_mbid(self, artist_name: str) -> Optional[str]:
        """
        Find artist MBID using consistent logic across all interfaces.

        The database is tried first when available; if it yields no hit and
        an API client is configured, the API is queried.

        Args:
            artist_name: Name of the artist to search for

        Returns:
            Artist MBID if found, None otherwise
        """
        if not artist_name:
            return None

        if self.use_database:
            result = self.db.fuzzy_search_artist(artist_name)
            if self._is_fuzzy_hit(result):
                return result[1]  # tuple is (artist_name, mbid, score)

        if self.api:
            # Best-effort lookup: an API failure means "not found", but
            # KeyboardInterrupt/SystemExit are deliberately not swallowed.
            try:
                result = self.api.search_artist(artist_name)
                if result:
                    return result['id']
            except Exception:
                return None

        return None

    def find_recording_mbid(self, artist_mbid: str, title: str, original_artist: str = None) -> Optional[str]:
        """
        Find recording MBID using consistent logic across all interfaces.

        Known recordings are consulted first. With database access, a
        collaboration (no artist MBID but an original artist string) is
        searched by title and verified against the artist credit; otherwise
        the search is constrained by the artist MBID. Without database
        access the API is queried for each title variation.

        Args:
            artist_mbid: Artist MBID (None for collaborations)
            title: Song title
            original_artist: Original artist string (for collaboration verification)

        Returns:
            Recording MBID if found, None otherwise
        """
        if not title:
            return None

        title_variations = generate_title_variations(title)
        lowered_variations = {variation.lower() for variation in title_variations}

        # Known recordings take precedence over any search.
        known_recordings = data_loader.load_known_recordings()
        for (known_artist_mbid, known_title), recording_mbid in known_recordings.items():
            if artist_mbid == known_artist_mbid and known_title.lower() in lowered_variations:
                return recording_mbid

        if self.use_database:
            if not artist_mbid and original_artist:
                return self._find_collaboration_recording(title_variations, original_artist)

            # Regular case with artist_mbid - try all title variations
            for variation in title_variations:
                result = self.db.fuzzy_search_recording(variation, artist_mbid)
                if self._is_fuzzy_hit(result):
                    return result[1]  # tuple is (recording_name, mbid, score)
            return None

        if self.api:
            for variation in title_variations:
                # Best-effort per variation: a failed request moves on to the next one.
                try:
                    result = self.api.search_recording(variation, artist_mbid)
                    if result:
                        return result['id']
                except Exception:
                    continue

        return None

    def _find_collaboration_recording(self, title_variations: List[str], original_artist: str) -> Optional[str]:
        """Search by title only and return the first hit whose artist credit fits the collaboration."""
        expected_credit = original_artist.replace(',', ' & ').replace(' and ', ' & ').lower()
        main_artist, collaborators = parse_complex_collaboration(original_artist)

        for variation in title_variations:
            result = self.db.fuzzy_search_recording(variation)
            if not self._is_fuzzy_hit(result):
                continue
            recording_mbid = result[1]

            recording_info = self.get_recording_info(recording_mbid)
            credit = (recording_info or {}).get('artist_credit')
            if not credit:
                continue
            credit = credit.lower()

            if credit == expected_credit:
                return recording_mbid

            # Partial match: main artist plus at least one collaborator appear in the credit.
            if main_artist.lower() in credit and any(
                collaborator.lower() in credit for collaborator in collaborators
            ):
                return recording_mbid

        return None

    def get_artist_info(self, mbid: str) -> Optional[Dict[str, Any]]:
        """Get artist info from the database if enabled, else the API, else None."""
        if self.use_database:
            return self.db.get_artist_info(mbid)
        if self.api:
            return self.api.get_artist_info(mbid)
        return None

    def get_recording_info(self, mbid: str) -> Optional[Dict[str, Any]]:
        """Get recording info from the database if enabled, else the API, else None."""
        if self.use_database:
            return self.db.get_recording_info(mbid)
        if self.api:
            return self.api.get_recording_info(mbid)
        return None

    def _build_artist_string(self, artist_credit: list) -> str:
        """Build artist string from artist credit list (API format)."""
        if not artist_credit:
            return ""
        return ''.join(
            credit['name'] + (credit.get('joinphrase') or '')
            for credit in artist_credit
        ).strip()

    def _apply_artist_credit(self, song: Dict[str, Any], recording_info: Dict[str, Any]) -> bool:
        """Copy the recording's artist credit into song['artist']; return True if it was updated."""
        if self.use_database:
            # Database rows carry the credit as a ready-made string.
            credit = recording_info.get('artist_credit')
            if credit:
                song['artist'] = credit
                return True
            return False

        # API responses carry a list of credits that has to be joined.
        if recording_info.get('artist-credit'):
            artist_string = self._build_artist_string(recording_info['artist-credit'])
            if artist_string:
                song['artist'] = artist_string
                return True
        return False

    def clean_song(self, song: Dict[str, Any]) -> Tuple[Dict[str, Any], bool]:
        """
        Clean a single song using consistent logic across all interfaces.

        The song dictionary is updated in place with the canonical artist,
        title, 'mbid' (artist) and 'recording_mbid' where they can be found.

        Args:
            song: Song dictionary with 'artist' and 'title' keys

        Returns:
            Tuple of (cleaned_song, success_status)
        """
        original_artist = song.get('artist', '')

        artist_mbid = self.find_artist_mbid(original_artist)
        _, collaborators = parse_complex_collaboration(original_artist)

        # Collaboration whose full artist string is unknown: find the recording
        # by title and verify it through its artist credit.
        if artist_mbid is None and collaborators:
            recording_mbid = self.find_recording_mbid(None, song.get('title', ''), original_artist)
            recording_info = self.get_recording_info(recording_mbid) if recording_mbid else None
            if not recording_info:
                return song, False

            self._apply_artist_credit(song, recording_info)
            song['title'] = recording_info['title']
            song['recording_mbid'] = recording_mbid

            # For collaborations, try to get the main artist's MBID
            if self.use_database:
                main_artist, _ = parse_complex_collaboration(song.get('artist', ''))
                if main_artist:
                    artist_result = self.db.fuzzy_search_artist(main_artist)
                    if self._is_fuzzy_hit(artist_result):
                        song['mbid'] = artist_result[1]

            return song, True

        # Regular case (non-collaboration or collaboration not found)
        if not artist_mbid:
            return song, False

        artist_info = self.get_artist_info(artist_mbid)
        if artist_info:
            song['artist'] = artist_info['name']
            song['mbid'] = artist_mbid

        recording_mbid = self.find_recording_mbid(artist_mbid, song.get('title', ''))
        if not recording_mbid:
            return song, False

        recording_info = self.get_recording_info(recording_mbid)
        if not recording_info:
            return song, False

        # The recording's credit may list several artists; keep the artist MBID
        # that was resolved above even when the display name changes.
        if self._apply_artist_credit(song, recording_info) and 'mbid' not in song:
            song['mbid'] = artist_mbid

        song['title'] = recording_info['title']
        song['recording_mbid'] = recording_mbid
        return song, True
|
||||
173
src/utils/artist_title_processing.py
Normal file
173
src/utils/artist_title_processing.py
Normal file
@ -0,0 +1,173 @@
|
||||
import re
|
||||
import json
|
||||
import os
|
||||
from typing import List, Tuple
|
||||
|
||||
def parse_complex_collaboration(artist_string: str) -> Tuple[str, List[str]]:
    """
    Parse complex collaboration strings like "Pitbull ft. Ne-Yo, Afrojack & Nayer".

    Detection order:
      1. malformed strings that start with "ft " / "feat " and contain a
         comma ("ft Jamie Foxx West, Kanye"): the last comma part is taken
         as the main artist;
      2. primary markers (ft., feat., featuring, ft, feat), matched as whole
         tokens so names such as "Taylor Swift" or "Daft Punk" are not split
         on an embedded "ft";
      3. " & " / " and " with exactly two sides, unless the whole string is
         a known band name or (for "and") the right side contains "the ";
      4. plain comma-separated lists.

    Returns: (main_artist, [collaborators]); the collaborator list is empty
    when the string is not recognized as a collaboration.
    """
    # Order matters: dotted and longer markers are tried before their prefixes.
    primary_patterns = ['ft.', 'feat.', 'featuring', 'ft', 'feat']

    # Special case: Handle malformed artist names like "ft Jamie Foxx West, Kanye"
    # This should be "Kanye West ft. Jamie Foxx"
    if artist_string.lower().startswith(('ft ', 'feat ')):
        remaining = artist_string[artist_string.find(' ') + 1:].strip()
        if ',' in remaining:
            # A comma guarantees at least two parts; the last one is assumed
            # to be the main artist.
            parts = [part.strip() for part in remaining.split(',')]
            return (parts[-1], parts[:-1])

    for pattern in primary_patterns:
        # Require a non-word character (or string edge) before the marker, and
        # after it when the marker ends in a letter. Markers ending in '.' are
        # already delimited by the dot.
        trailing = r'(?!\w)' if pattern[-1].isalnum() else ''
        marker = re.search(
            r'(?<!\w)' + re.escape(pattern) + trailing, artist_string, re.IGNORECASE
        )
        if marker:
            main_artist = artist_string[:marker.start()].strip()
            collaborators = parse_collaborators(artist_string[marker.end():].strip())
            return (main_artist, collaborators)

    # Secondary indicators need more care: "&" and "and" also occur in band names.
    for pattern in ('&', 'and'):
        parts = re.split(r'\s' + re.escape(pattern) + r'\s', artist_string, flags=re.IGNORECASE)
        if len(parts) != 2:
            continue
        part1, part2 = parts[0].strip(), parts[1].strip()

        # A comma on either side means a list of artists, so no band-name check is needed.
        if ',' not in part1 and ',' not in part2:
            if artist_string.lower() in _load_known_band_names():
                continue
            # "X and the Ys" is most likely a single act.
            if pattern == 'and' and 'the ' in part2.lower():
                continue

        return (part1, parse_collaborators(part2))

    # Comma-separated artists (e.g., "Ariana Grande, Normani, Nicki Minaj")
    if ',' in artist_string:
        parts = [part.strip() for part in artist_string.split(',')]
        return (parts[0], parts[1:])

    return (artist_string, [])


def _load_known_band_names() -> set:
    """Return lower-cased names of groups whose names contain '&' or 'and'."""
    json_path = os.path.join(os.path.dirname(__file__), '..', '..', 'data', 'known_artists.json')
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            known_data = json.load(f)
        return {name.lower() for name in known_data.get('groups', [])}
    except (FileNotFoundError, json.JSONDecodeError, KeyError):
        # Built-in fallback when the data file is missing or unreadable.
        return {
            "gerry and the pacemakers", "simon & garfunkel", "hall & oates",
            "brooks & dunn", "the everly brothers"
        }
|
||||
|
||||
def parse_collaborators(collaborators_string: str) -> List[str]:
    """
    Split a collaborator string into individual, stripped names.

    Names are separated by ',', '&' or the whole word "and" (any case);
    empty fragments are dropped.
    """
    if not collaborators_string:
        return []

    names = []
    for fragment in re.split(r'[,&]|\b(?:and)\b', collaborators_string, flags=re.IGNORECASE):
        name = fragment.strip()
        if name:
            names.append(name)
    return names
|
||||
|
||||
def generate_title_variations(title: str) -> List[str]:
    """
    Build the ordered list of title strings to try when searching.

    The stripped original always comes first, followed (without duplicates
    or empty strings) by:
      1. the title with typographic apostrophes replaced by straight ones;
      2. the title with common missing-apostrophe spellings repaired
         ("Dont" -> "Don't", ...), applied cumulatively;
      3. the original with each known parenthetical marker removed;
      4. the original with all parenthetical content removed, only when
         nothing has been added so far;
      5. the repaired title with each core parenthetical marker removed.

    Args:
        title: Raw song title.

    Returns:
        Candidate titles in search order.
    """
    # Markers removed from both the original and the repaired title.
    # Matching is case-insensitive, so one spelling per marker is enough.
    core_patterns = [
        r'\s*\(Karaoke Version\)', r'\s*\(Karaoke\)', r'\s*\(Instrumental\)', r'\s*\(Backing Track\)',
        r'\s*\(live [^)]*\)', r'\s*\(Acoustic\)', r'\s*\(Without Backing Vocals\)',
        r'\s*\(Clean\)', r'\s*\(Remix\)',
    ]
    # Markers removed from the original title only.
    extra_patterns = [
        r'\s*\(Radio Edit\)', r'\s*\(Extended Mix\)', r'\s*\(Single Version\)',
        r'\s*\(Album Version\)', r'\s*\(Original Mix\)', r'\s*\(John Lewis Christmas Ad \d+\)',
        r'\s*\(from the movie [^)]*\)', r'\s*\(from the [^)]*\)', r'\s*\(feat\. [^)]*\)',
        r'\s*\(featuring [^)]*\)', r'\s*\(ft\. [^)]*\)', r'\s*\(duet\)',
        r'\s*\(Two Semitones Down\)', r'\s*\(Minus Piano\)', r'\s*\(Cut Down\)',
        r'\s*\(Boone & Speedy Vocals\)', r'\s*\(My Heart Belongs to You\)',
    ]
    word_fixes = [
        (r'\bDont\b', "Don't"), (r'\bCant\b', "Can't"), (r'\bWont\b', "Won't"),
        (r'\bArent\b', "Aren't"), (r'\bIsnt\b', "Isn't"), (r'\bWasnt\b', "Wasn't"),
        (r'\bDidnt\b', "Didn't"), (r'\bDoesnt\b', "Doesn't"), (r'\bHavent\b', "Haven't"),
        (r'\bHasnt\b', "Hasn't"), (r'\bWouldnt\b', "Wouldn't"), (r'\bCouldnt\b', "Couldn't"),
        (r'\bShouldnt\b', "Shouldn't"), (r'\bPhunk\b', "Funk"), (r'\bBout\b', "About"),
    ]

    original = title.strip()
    search_titles = [original]

    def add(candidate: str) -> None:
        # Keep first-seen order; skip empties and repeats.
        if candidate and candidate not in search_titles:
            search_titles.append(candidate)

    # Normalize apostrophes (curly U+2019 / U+2018 to straight)
    title_fixes = original.replace('\u2019', "'").replace('\u2018', "'")
    add(title_fixes)

    # Fixes accumulate: each intermediate spelling becomes a candidate.
    for pattern, replacement in word_fixes:
        title_fixes = re.sub(pattern, replacement, title_fixes, flags=re.IGNORECASE)
        add(title_fixes)

    # Remove specific patterns first, then general parentheses
    for pattern in core_patterns + extra_patterns:
        add(re.sub(pattern, '', original, flags=re.IGNORECASE).strip())

    # Only remove general parentheses if nothing else produced a variation
    if len(search_titles) == 1:
        add(re.sub(r'\s*\([^)]*\)', '', original).strip())

    for pattern in core_patterns:
        add(re.sub(pattern, '', title_fixes, flags=re.IGNORECASE).strip())

    return search_titles
|
||||
Loading…
Reference in New Issue
Block a user