777 lines
33 KiB
Python
777 lines
33 KiB
Python
"""
|
||
Direct PostgreSQL database access for MusicBrainz Data Cleaner.
|
||
Provides fuzzy search capabilities for artists and recordings.
|
||
"""
|
||
|
||
import psycopg2
|
||
import psycopg2.extras
|
||
from typing import List, Dict, Optional, Tuple, Any
|
||
from fuzzywuzzy import fuzz
|
||
from ..config.constants import (
|
||
DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD, DB_CONNECTION_TIMEOUT,
|
||
FUZZY_SEARCH_THRESHOLD, MAX_SEARCH_RESULTS,
|
||
TITLE_SIMILARITY_THRESHOLD, ARTIST_SIMILARITY_THRESHOLD,
|
||
SUCCESS_MESSAGES, ERROR_MESSAGES
|
||
)
|
||
|
||
|
||
class MusicBrainzDatabase:
|
||
"""Direct PostgreSQL database access for MusicBrainz with fuzzy search."""
|
||
|
||
def __init__(self, host: str = DB_HOST, port: int = DB_PORT,
             database: str = DB_NAME, user: str = DB_USER,
             password: str = DB_PASSWORD):
    """Remember the connection settings; nothing is opened here.

    Args:
        host: Database host name.
        port: Database port.
        database: Database name.
        user: Login user.
        password: Login password.
    """
    self.host, self.port = host, port
    self.database = database
    self.user, self.password = user, password
    # Both handles stay None until connect() assigns them.
    self.connection = self.cursor = None
|
||
|
||
def connect(self) -> bool:
    """Open a connection and a RealDictCursor on the MusicBrainz database.

    Prints the configured success or failure message.

    Returns:
        True when both the connection and the cursor were created,
        False if anything raised along the way.
    """
    try:
        # NOTE(review): the host is pinned to the Docker service name "db";
        # self.host is stored by __init__ but is not consulted here.
        connection_settings = dict(
            host='db',
            port=self.port,
            database=self.database,
            user=self.user,
            password=self.password,
            connect_timeout=DB_CONNECTION_TIMEOUT,
        )
        self.connection = psycopg2.connect(**connection_settings)
        # Rows come back as dict-like objects, which the query methods rely on
        # (they index rows by column name).
        row_cursor = psycopg2.extras.RealDictCursor
        self.cursor = self.connection.cursor(cursor_factory=row_cursor)
        print(SUCCESS_MESSAGES['db_connected'])
    except Exception as exc:
        print(ERROR_MESSAGES['db_connection_failed'])
        print(f"Connection error: {exc}")
        return False
    return True
|
||
|
||
def disconnect(self):
    """Close the cursor and the connection, then forget both handles.

    The handles are reset to None so that the ``if not self.connection``
    guards used by the query methods see the instance as disconnected, and
    so that calling disconnect() twice is harmless. The connection is closed
    even if closing the cursor raises.
    """
    cursor, connection = self.cursor, self.connection
    # Drop the references first: whatever happens below, the instance must
    # not keep pointing at half-closed handles.
    self.cursor = None
    self.connection = None
    try:
        if cursor:
            cursor.close()
    finally:
        if connection:
            connection.close()
|
||
|
||
def __enter__(self):
|
||
"""Context manager entry."""
|
||
self.connect()
|
||
return self
|
||
|
||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||
"""Context manager exit."""
|
||
self.disconnect()
|
||
|
||
def fuzzy_search_artist(self, artist_name: str) -> Optional[Tuple[str, str, float]]:
    """
    Fuzzy search for artist by name.

    Any "ft." / "feat." suffix is dropped first (case-insensitively), then
    the stages below run in order and the first hit wins:

    1. exact name; several exact hits are ranked by recording count (1.0)
    2. case-insensitive name (0.99)
    3. ASCII hyphen <-> Unicode hyphen swap (0.98)
    4. "&" <-> "and" swap (0.97)
    5. name variants (hyphen swaps, trailing number dropped) matched exactly
       against name (0.95), alias (0.94) and sort_name (0.93)
    6. ILIKE candidates scored with fuzz.ratio, preferring artists with many
       recordings when scores are close; accepted from a ratio of 80 up

    Returns (artist_name, mbid, similarity_score) or None. Errors are
    printed, the open transaction is rolled back, and None is returned.
    """
    import re

    if not self.connection:
        return None

    exact_one_sql = "SELECT name, gid FROM artist WHERE name = %s LIMIT 1"
    alias_exact_sql = """
        SELECT a.name, a.gid
        FROM artist a
        JOIN artist_alias aa ON a.id = aa.artist
        WHERE aa.name = %s
        LIMIT 1
    """
    sort_name_exact_sql = "SELECT name, gid FROM artist WHERE sort_name = %s LIMIT 1"
    recording_count_sql = """
        SELECT COUNT(r.id) as recording_count
        FROM artist a
        JOIN artist_credit_name acn ON a.id = acn.artist
        JOIN artist_credit ac ON acn.artist_credit = ac.id
        JOIN recording r ON ac.id = r.artist_credit
        WHERE a.gid = %s
    """
    # Candidate queries for stage 6, tried in this order for every pattern.
    candidate_sqls = (
        "SELECT name, gid FROM artist WHERE name ILIKE %s LIMIT 20",
        """
        SELECT a.name, a.gid
        FROM artist a
        JOIN artist_alias aa ON a.id = aa.artist
        WHERE aa.name ILIKE %s
        LIMIT 20
        """,
        "SELECT name, gid FROM artist WHERE sort_name ILIKE %s LIMIT 20",
    )

    def fetch_one(sql, value):
        # Run a single-parameter query and return its first row (or None).
        self.cursor.execute(sql, (value,))
        return self.cursor.fetchone()

    def fetch_all(sql, value):
        # Run a single-parameter query and return every row.
        self.cursor.execute(sql, (value,))
        return self.cursor.fetchall()

    def with_recording_count(row):
        # Attach (and log) the number of recordings credited to this artist.
        count_row = fetch_one(recording_count_sql, row['gid'])
        recording_count = count_row['recording_count'] if count_row else 0
        print(f" 📊 {row['name']} ({row['gid'][:8]}...): {recording_count} recordings")
        return {
            'name': row['name'],
            'gid': row['gid'],
            'recording_count': recording_count,
        }

    def accept(row, score):
        # Log the match and build the tuple handed back to the caller.
        print(SUCCESS_MESSAGES['fuzzy_match_found'].format(
            original=artist_name,
            matched=row['name'],
            score=score
        ))
        return (row['name'], row['gid'], score)

    try:
        clean_name = artist_name.strip()

        # Keep only the main artist of a collaboration. The split is
        # case-insensitive ("FT.", "Feat.") and anchored at a word start so
        # that names merely containing "ft." (e.g. "Left.") stay intact.
        main_artist = re.split(
            r'\b(?:ft|feat)\.', clean_name, maxsplit=1, flags=re.IGNORECASE
        )[0].strip()

        # Stage 1: exact match, possibly ambiguous.
        exact_rows = fetch_all("SELECT name, gid FROM artist WHERE name = %s", main_artist)
        if len(exact_rows) == 1:
            return accept(exact_rows[0], 1.0)
        if exact_rows:
            print(f"🔍 Found {len(exact_rows)} exact matches for '{main_artist}', checking recording counts...")
            ranked = [with_recording_count(row) for row in exact_rows]
            best_artist = max(ranked, key=lambda entry: entry['recording_count'])
            print(f" 🎯 Selected: {best_artist['name']} with {best_artist['recording_count']} recordings")
            return accept(best_artist, 1.0)

        # Stage 2: case-insensitive match.
        row = fetch_one(
            "SELECT name, gid FROM artist WHERE LOWER(name) = LOWER(%s) LIMIT 1",
            main_artist,
        )
        if row:
            return accept(row, 0.99)

        # Stage 3: the same name with the other kind of hyphen.
        hyphen_swaps = [
            main_artist.replace('-', '‐'),  # Regular dash to Unicode dash
            main_artist.replace('‐', '-'),  # Unicode dash to regular dash
        ]
        for variant in hyphen_swaps:
            if variant != main_artist:
                row = fetch_one(exact_one_sql, variant)
                if row:
                    return accept(row, 0.98)

        # Stage 4: "&" vs "and". The " and " replacement is case-insensitive
        # so that it agrees with the case-insensitive detection.
        if '&' in main_artist:
            swapped = main_artist.replace('&', 'and')
        elif re.search(r' and ', main_artist, re.IGNORECASE):
            swapped = re.sub(r' and ', ' & ', main_artist, flags=re.IGNORECASE)
        else:
            swapped = None
        if swapped is not None:
            row = fetch_one(exact_one_sql, swapped)
            if row:
                return accept(row, 0.97)

        # Stage 5: name variants, e.g. "S Club 7" -> "S Club".
        name_variants = []
        for variant in [main_artist] + hyphen_swaps:
            name_variants.append(variant)
            words = variant.split()
            if len(words) > 1 and words[-1].isdigit():
                name_variants.append(' '.join(words[:-1]))
        # Without hyphens all swaps equal the original name; drop the
        # duplicates (order preserved) so each query runs only once.
        name_variants = list(dict.fromkeys(name_variants))

        for variant in name_variants:
            for sql, score in ((exact_one_sql, 0.95),
                               (alias_exact_sql, 0.94),
                               (sort_name_exact_sql, 0.93)):
                row = fetch_one(sql, variant)
                if row:
                    return accept(row, score)

        # Stage 6: collect ILIKE candidates, most precise pattern first.
        patterns = []
        for variant in name_variants:
            patterns.extend([
                f"{variant}%",     # Starts with (most precise)
                f"% {variant} %",  # Whole word inside a longer name
                f"%{variant}%",    # Contains (least precise)
            ])

        all_artists = []
        for pattern in patterns:
            for sql in candidate_sqls:
                all_artists.extend(fetch_all(sql, pattern))

        # Remove duplicates based on MBID, keeping the first occurrence.
        by_mbid = {}
        for artist in all_artists:
            by_mbid.setdefault(artist['gid'], artist)
        unique_artists = list(by_mbid.values())

        # Recording counts let well-known artists win close calls.
        print(f"🔍 Found {len(unique_artists)} unique artists, checking recording counts...")
        artists_with_counts = [with_recording_count(artist) for artist in unique_artists]

        wanted = main_artist.lower()
        best_match = None
        best_score = 0
        best_recording_count = 0

        for artist in artists_with_counts:
            candidate = artist['name'].lower()
            score = fuzz.ratio(wanted, candidate)

            # A clearly longer name that merely contains the search term
            # (e.g. "Sleazy-E" vs "Eazy-E") must score very high to count.
            is_longer_superstring = (
                wanted in candidate
                and candidate != wanted
                and len(artist['name']) > len(main_artist) + 2
            )
            if is_longer_superstring and score < 90:
                continue

            # Higher score wins; within 5 points, an artist with more than
            # twice the recordings of the current best takes over.
            if (score > best_score or
                    (score >= best_score - 5 and
                     artist['recording_count'] > best_recording_count * 2)):
                best_score = score
                best_match = artist
                best_recording_count = artist['recording_count']
                print(f" 🎯 New best match: {artist['name']} (score: {score}, recordings: {artist['recording_count']})")

        if best_score >= 80:
            return accept(best_match, best_score / 100.0)

        return None

    except Exception as e:
        print(f"Error in fuzzy_search_artist: {e}")
        # psycopg2 keeps a failed transaction aborted until rollback(), which
        # would make every later query on this connection fail as well.
        try:
            self.connection.rollback()
        except Exception as rollback_error:
            print(f"Rollback failed: {rollback_error}")
        return None
|
||
|
||
def fuzzy_search_recording(self, title: str, artist_mbid: Optional[str] = None) -> Optional[Tuple[str, str, float]]:
    """
    Fuzzy search for recording by title and optionally artist MBID.

    With an artist MBID: the first recording of that artist whose name
    contains one of the title variations is returned with score 1.0;
    otherwise up to 100 of the artist's recordings are scored with
    fuzz.ratio and the best one is returned if it reaches 80.

    Without an artist MBID: exact (1.0) and case-insensitive (0.99) title
    matches are tried per variation, then substring candidates are scored
    with fuzz.ratio (accepted from 80).

    Empty title variations are skipped: an empty string would turn the
    substring pattern into '%%', which matches every recording.

    Returns (recording_name, mbid, similarity_score) or None. Errors are
    printed, the open transaction is rolled back, and None is returned.
    """
    if not self.connection:
        return None

    try:
        # Generate title variations using the helper method
        search_titles = [
            variation
            for variation in self._generate_title_variations(title)
            if variation
        ]
        if not search_titles:
            return None

        if artist_mbid:
            by_artist_and_title_sql = """
                SELECT r.name, r.gid
                FROM recording r
                JOIN artist_credit_name acn ON r.artist_credit = acn.artist_credit
                JOIN artist a ON acn.artist = a.id
                WHERE a.gid = %s AND r.name ILIKE %s
                LIMIT 1
            """
            for search_title in search_titles:
                self.cursor.execute(by_artist_and_title_sql,
                                    (artist_mbid, f"%{search_title}%"))
                row = self.cursor.fetchone()
                if row:
                    return (row['name'], row['gid'], 1.0)

            # No substring hit: score a sample of the artist's recordings.
            by_artist_sql = """
                SELECT r.name, r.gid
                FROM recording r
                JOIN artist_credit_name acn ON r.artist_credit = acn.artist_credit
                JOIN artist a ON acn.artist = a.id
                WHERE a.gid = %s
                LIMIT 100
            """
            self.cursor.execute(by_artist_sql, (artist_mbid,))
            recordings = self.cursor.fetchall()

            best_match = None
            best_score = 0
            for recording in recordings:
                recording_name = recording['name'].lower()
                for search_title in search_titles:
                    score = fuzz.ratio(search_title.lower(), recording_name)
                    if score > best_score:
                        best_score = score
                        best_match = recording

            if best_score >= 80:
                return (best_match['name'], best_match['gid'], best_score / 100.0)
        else:
            # No artist constraint - search by title only with all variations
            for search_title in search_titles:
                self.cursor.execute(
                    "SELECT name, gid FROM recording WHERE name = %s LIMIT 1",
                    (search_title,))
                row = self.cursor.fetchone()
                if row:
                    return (row['name'], row['gid'], 1.0)

                self.cursor.execute(
                    "SELECT name, gid FROM recording WHERE LOWER(name) = LOWER(%s) LIMIT 1",
                    (search_title,))
                row = self.cursor.fetchone()
                if row:
                    return (row['name'], row['gid'], 0.99)

            # Try partial match with all variations
            best_match = None
            best_score = 0
            for search_title in search_titles:
                self.cursor.execute(
                    "SELECT name, gid FROM recording WHERE name ILIKE %s LIMIT 50",
                    (f"%{search_title}%",))
                for recording in self.cursor.fetchall():
                    score = fuzz.ratio(search_title.lower(), recording['name'].lower())
                    if score > best_score:
                        best_score = score
                        best_match = recording

            if best_score >= 80:
                return (best_match['name'], best_match['gid'], best_score / 100.0)

        return None

    except Exception as e:
        print(f"Error in fuzzy_search_recording: {e}")
        # psycopg2 keeps a failed transaction aborted until rollback(), which
        # would make every later query on this connection fail as well.
        try:
            self.connection.rollback()
        except Exception as rollback_error:
            print(f"Rollback failed: {rollback_error}")
        return None
|
||
|
||
def get_artist_info(self, mbid: str) -> Optional[Dict[str, Any]]:
    """Look up a single artist by MBID.

    Returns:
        A plain dict with the keys ``name``, ``mbid`` and ``sort_name``,
        or None when there is no connection, no such artist, or the query
        raised (in which case the configured error message is printed).
    """
    if not self.connection:
        return None

    lookup_sql = """
        SELECT name, gid as mbid, sort_name
        FROM artist
        WHERE gid = %s
    """
    try:
        self.cursor.execute(lookup_sql, (mbid,))
        row = self.cursor.fetchone()
        # Copy the cursor row into an ordinary dict for the caller.
        return dict(row) if row else None
    except Exception as exc:
        print(ERROR_MESSAGES['db_query_failed'].format(error=str(exc)))
        return None
|
||
|
||
def get_recording_info(self, mbid: str) -> Optional[Dict[str, Any]]:
    """Look up a recording by MBID together with its rendered artist credit.

    Returns:
        ``{'title', 'mbid', 'artist_credit'}`` where ``artist_credit`` is
        each credited artist name followed by its join phrase, in credit
        position order; None when there is no connection, no such
        recording, or a query raised (the error is printed).
    """
    if not self.connection:
        return None

    recording_sql = """
        SELECT r.name as title, r.gid as mbid, ac.id as artist_credit_id
        FROM recording r
        JOIN artist_credit ac ON r.artist_credit = ac.id
        WHERE r.gid = %s
        LIMIT 1
    """
    credit_sql = """
        SELECT a.name, acn.join_phrase, acn.position
        FROM artist_credit_name acn
        JOIN artist a ON acn.artist = a.id
        WHERE acn.artist_credit = %s
        ORDER BY acn.position
    """
    try:
        self.cursor.execute(recording_sql, (mbid,))
        recording = self.cursor.fetchone()
        if not recording:
            return None

        self.cursor.execute(credit_sql, (recording['artist_credit_id'],))
        credited = self.cursor.fetchall()

        # Name + join phrase per credited artist; a missing or empty join
        # phrase contributes nothing.
        credit_text = "".join(
            entry['name'] + (entry['join_phrase'] or "")
            for entry in credited
        )
        return {
            'title': recording['title'],
            'mbid': recording['mbid'],
            'artist_credit': credit_text,
        }
    except Exception as exc:
        print(f"Error in get_recording_info: {exc}")
        return None
|
||
|
||
def test_connection(self) -> bool:
    """Check that the database can be reached.

    With an existing connection a trivial ``SELECT 1`` is executed;
    without one, the result of connect() is returned instead.

    Returns:
        True if the probe (or the fresh connect) succeeded, False if it
        raised; the error is printed in that case.
    """
    try:
        if self.connection:
            self.cursor.execute("SELECT 1")
            return True
        return self.connect()
    except Exception as exc:
        print(f"Database connection test failed: {exc}")
        return False
|
||
|
||
def find_artist_credit(self, artist_name: str, title: str) -> Optional[Tuple[str, str, str]]:
    """
    Find artist credit for collaboration.

    The main artist is extracted from ``artist_name`` and the first
    recording credited to an artist of exactly that name whose title
    contains one of the title variations is looked up. Empty title
    variations are skipped: an empty string would turn the substring
    pattern into '%%', which matches every recording of the artist.

    Returns (artist_credit_id, artist_string, recording_mbid) or None.
    Errors are printed, the open transaction is rolled back, and None is
    returned.
    """
    if not self.connection:
        return None

    try:
        # Only the main artist is used for the lookup; the collaborator
        # list returned by the parser is not needed here.
        main_artist, _collaborators = self._parse_complex_collaboration(artist_name.strip())

        recording_sql = """
            SELECT ac.id as artist_credit_id, r.gid as recording_mbid
            FROM recording r
            JOIN artist_credit ac ON r.artist_credit = ac.id
            JOIN artist_credit_name acn ON ac.id = acn.artist_credit
            JOIN artist a ON acn.artist = a.id
            WHERE a.name = %s AND r.name ILIKE %s
            LIMIT 1
        """
        match = None
        for search_title in self._generate_title_variations(title):
            if not search_title:
                continue
            self.cursor.execute(recording_sql, (main_artist, f"%{search_title}%"))
            match = self.cursor.fetchone()
            if match:
                break

        if not match:
            return None

        # Get the full artist string for this credit
        credit_sql = """
            SELECT a.name, acn.join_phrase, acn.position
            FROM artist_credit_name acn
            JOIN artist a ON acn.artist = a.id
            WHERE acn.artist_credit = %s
            ORDER BY acn.position
        """
        self.cursor.execute(credit_sql, (match['artist_credit_id'],))
        credited = self.cursor.fetchall()

        # Name + join phrase per credited artist, in credit order.
        artist_string = "".join(
            entry['name'] + (entry['join_phrase'] or "")
            for entry in credited
        )
        return (str(match['artist_credit_id']), artist_string, match['recording_mbid'])

    except Exception as e:
        print(f"Error in find_artist_credit: {e}")
        # psycopg2 keeps a failed transaction aborted until rollback(), which
        # would make every later query on this connection fail as well.
        try:
            self.connection.rollback()
        except Exception as rollback_error:
            print(f"Rollback failed: {rollback_error}")
        return None
|
||
|
||
def _parse_complex_collaboration(self, artist_string: str) -> Tuple[str, List[str]]:
|
||
"""
|
||
Parse complex collaboration strings like "Pitbull ft. Ne-Yo, Afrojack & Nayer"
|
||
Returns: (main_artist, [collaborators])
|
||
"""
|
||
import re
|
||
|
||
# Primary collaboration indicators
|
||
primary_patterns = ['ft.', 'feat.', 'featuring']
|
||
|
||
# Secondary collaboration indicators (need more careful handling)
|
||
secondary_patterns = ['&', 'and']
|
||
|
||
# Check if this is a collaboration
|
||
is_collaboration = False
|
||
split_pattern = None
|
||
|
||
for pattern in primary_patterns:
|
||
if pattern.lower() in artist_string.lower():
|
||
is_collaboration = True
|
||
# Find the actual pattern in the original string (case-insensitive)
|
||
import re
|
||
match = re.search(re.escape(pattern), artist_string, re.IGNORECASE)
|
||
if match:
|
||
split_pattern = match.group(0)
|
||
else:
|
||
split_pattern = pattern
|
||
break
|
||
|
||
# If no primary collaboration found, check secondary patterns
|
||
if not is_collaboration:
|
||
for pattern in secondary_patterns:
|
||
# Use word boundaries to avoid splitting within words like "Orlando"
|
||
import re
|
||
pattern_regex = r'\b' + re.escape(pattern) + r'\b'
|
||
if re.search(pattern_regex, artist_string, re.IGNORECASE):
|
||
# For secondary patterns, be more careful
|
||
# Check if this looks like a band name vs collaboration
|
||
parts = re.split(pattern_regex, artist_string, flags=re.IGNORECASE)
|
||
if len(parts) == 2:
|
||
part1 = parts[0].strip()
|
||
part2 = parts[1].strip()
|
||
|
||
# Check if this looks like a band name vs collaboration
|
||
# Band names typically have simple, short parts
|
||
# Collaborations often have more complex artist names
|
||
part1_words = len(part1.split())
|
||
part2_words = len(part2.split())
|
||
|
||
# Check if this looks like a band name vs collaboration
|
||
# Band names typically have simple, short parts
|
||
# Collaborations often have more complex artist names
|
||
|
||
# If one part has a comma, it's likely a collaboration
|
||
if ',' in part1 or ',' in part2:
|
||
is_collaboration = True
|
||
split_pattern = pattern
|
||
break
|
||
|
||
# For "&" and "and" separators, check against known band names
|
||
# Load band names from JSON file
|
||
try:
|
||
import json
|
||
import os
|
||
json_path = os.path.join(os.path.dirname(__file__), '..', '..', 'data', 'known_artists.json')
|
||
with open(json_path, 'r') as f:
|
||
known_data = json.load(f)
|
||
known_band_names = {name.lower() for name in known_data.get('groups', [])}
|
||
except (FileNotFoundError, json.JSONDecodeError, KeyError):
|
||
# Fallback to a minimal list if file can't be loaded
|
||
known_band_names = {
|
||
"gerry and the pacemakers", "simon & garfunkel", "hall & oates",
|
||
"brooks & dunn", "the everly brothers"
|
||
}
|
||
|
||
if artist_string.lower() in known_band_names:
|
||
# This is a known band name, not a collaboration
|
||
continue
|
||
|
||
# For "and" separators, be more conservative
|
||
# Many band names use "and" (e.g., "Gerry and the Pacemakers")
|
||
if pattern.lower() == 'and':
|
||
# Only treat as collaboration if it looks like individual artists
|
||
# Band names often have "the" in the second part
|
||
if 'the ' in part2.lower():
|
||
# Likely a band name, skip
|
||
continue
|
||
|
||
# Likely a collaboration
|
||
is_collaboration = True
|
||
split_pattern = pattern
|
||
break
|
||
|
||
if not is_collaboration:
|
||
return (artist_string, [])
|
||
|
||
# Split on the pattern
|
||
parts = artist_string.split(split_pattern)
|
||
if len(parts) < 2:
|
||
return (artist_string, [])
|
||
|
||
main_artist = parts[0].strip()
|
||
collaborators_string = split_pattern.join(parts[1:]).strip()
|
||
|
||
# Now parse the collaborators string which might have multiple separators
|
||
collaborators = self._parse_collaborators(collaborators_string)
|
||
|
||
return (main_artist, collaborators)
|
||
|
||
def _generate_title_variations(self, title: str) -> List[str]:
|
||
"""
|
||
Generate title variations by removing parenthetical content.
|
||
Returns list of title variations to try.
|
||
"""
|
||
import re
|
||
|
||
search_titles = [title.strip()]
|
||
|
||
# Remove complete parentheses (content)
|
||
clean_title = re.sub(r'\s*\([^)]*\)', '', title.strip())
|
||
clean_title = clean_title.strip()
|
||
if clean_title != title.strip() and clean_title:
|
||
search_titles.append(clean_title)
|
||
|
||
# Remove unmatched opening parenthesis at end
|
||
clean_title2 = re.sub(r'\s*\([^)]*$', '', title.strip())
|
||
clean_title2 = clean_title2.strip()
|
||
if clean_title2 != title.strip() and clean_title2 and clean_title2 not in search_titles:
|
||
search_titles.append(clean_title2)
|
||
|
||
# Remove unmatched closing parenthesis at start
|
||
clean_title3 = re.sub(r'^[^)]*\)\s*', '', title.strip())
|
||
clean_title3 = clean_title3.strip()
|
||
if clean_title3 != title.strip() and clean_title3 and clean_title3 not in search_titles:
|
||
search_titles.append(clean_title3)
|
||
|
||
# Also try with specific karaoke patterns removed
|
||
karaoke_patterns = [
|
||
r'\s*\(Karaoke Version\)',
|
||
r'\s*\(Karaoke\)',
|
||
r'\s*\(Instrumental\)',
|
||
r'\s*\(Backing Track\)',
|
||
]
|
||
for pattern in karaoke_patterns:
|
||
karaoke_clean = re.sub(pattern, '', title.strip(), flags=re.IGNORECASE)
|
||
karaoke_clean = karaoke_clean.strip()
|
||
if karaoke_clean != title.strip() and karaoke_clean not in search_titles:
|
||
search_titles.append(karaoke_clean)
|
||
|
||
return search_titles
|
||
|
||
def _parse_collaborators(self, collaborators_string: str) -> List[str]:
|
||
"""
|
||
Parse a string like "Ne-Yo, Afrojack & Nayer" into individual artists
|
||
"""
|
||
import re
|
||
|
||
if not collaborators_string:
|
||
return []
|
||
|
||
# Split on common separators: comma, ampersand, "and"
|
||
# Use regex to handle multiple separators
|
||
# Use word boundaries to avoid splitting within words like "Orlando"
|
||
separators = r'[,&]|\b(?:and)\b'
|
||
|
||
# Split and clean up each part
|
||
parts = re.split(separators, collaborators_string, flags=re.IGNORECASE)
|
||
|
||
collaborators = []
|
||
for part in parts:
|
||
part = part.strip()
|
||
if part: # Skip empty parts
|
||
collaborators.append(part)
|
||
|
||
return collaborators |