# KaraokeMerge/web/app.py — 719 lines, 28 KiB, Python
# (file-listing metadata preserved as a comment so the module stays importable)
#!/usr/bin/env python3
"""
Web UI for Karaoke Song Library Cleanup Tool
Provides interactive interface for reviewing duplicates and making decisions.
"""
from flask import Flask, render_template, jsonify, request, send_from_directory
import json
import os
from typing import Dict, List, Any
from datetime import datetime
app = Flask(__name__)
# Configuration
DATA_DIR = '../data'
REPORTS_DIR = os.path.join(DATA_DIR, 'reports')
CONFIG_FILE = '../config/config.json'
def load_json_file(file_path: str) -> Any:
    """Read and parse a JSON file, returning None on any failure.

    Errors (missing file, bad JSON, bad encoding) are reported to stdout
    rather than raised, so callers can treat None as "no data".
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            return json.load(handle)
    except Exception as e:
        print(f"Error loading {file_path}: {e}")
        return None
def get_duplicate_groups(skip_songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group skip songs by "artist - title" so duplicates appear together.

    Each group carries the kept version, the list of skipped versions
    (annotated with file type and channel), and a running duplicate count.
    Groups are returned sorted by artist, then title (case-insensitive).
    """
    grouped: Dict[str, Dict[str, Any]] = {}
    for entry in skip_songs:
        artist = entry.get('artist', 'Unknown')
        title = entry.get('title', 'Unknown')
        group_key = f"{artist} - {title}"
        group = grouped.get(group_key)
        if group is None:
            group = {
                'artist': artist,
                'title': title,
                'kept_version': entry.get('kept_version', 'Unknown'),
                'skipped_versions': [],
                'total_duplicates': 0,
            }
            grouped[group_key] = group
        group['skipped_versions'].append({
            'path': entry['path'],
            'reason': entry.get('reason', 'duplicate'),
            'file_type': get_file_type(entry['path']),
            'channel': extract_channel(entry['path']),
        })
        group['total_duplicates'] = len(group['skipped_versions'])
    return sorted(grouped.values(),
                  key=lambda g: (g['artist'].lower(), g['title'].lower()))
def generate_mp3_song_list(all_songs: List[Dict[str, Any]], skip_songs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the list of MP3/CDG songs that remain after cleanup.

    Args:
        all_songs: Every scanned song; entries are matched by their 'path'.
        skip_songs: Songs scheduled for removal, also matched by 'path'.

    Returns:
        A dict with a fixed 'title' and 'songs', where each song dict has
        'position' (1-based, assigned after sorting by artist then title),
        'title', and 'artist'.
    """
    # Paths scheduled for removal — membership test must be O(1).
    skip_paths = {song['path'] for song in skip_songs}
    # Collect (artist, title) pairs for MP3/CDG files that are NOT skipped.
    remaining = [
        (song.get('artist', 'Unknown'), song.get('title', 'Unknown'))
        for song in all_songs
        if song.get('path', '').lower().endswith(('.mp3', '.cdg'))
        and song.get('path', '') not in skip_paths
    ]
    remaining.sort(key=lambda pair: (pair[0].lower(), pair[1].lower()))
    # Positions are only meaningful after sorting, so assign them exactly
    # once here (the original also assigned throwaway positions while
    # collecting, then overwrote them).
    mp3_songs = [
        {'position': position, 'title': title, 'artist': artist}
        for position, (artist, title) in enumerate(remaining, start=1)
    ]
    return {
        'title': 'MP3 Songs need to be found',
        'songs': mp3_songs
    }
def get_file_type(path: str) -> str:
    """Classify a path as 'MP4', 'MP3' (CDG counts as MP3), or 'Unknown'."""
    # CDG files are paired with MP3 audio, so both map to 'MP3'.
    suffix_labels = {'.mp4': 'MP4', '.mp3': 'MP3', '.cdg': 'MP3'}
    lowered = path.lower()
    for suffix, label in suffix_labels.items():
        if lowered.endswith(suffix):
            return label
    return 'Unknown'
def extract_channel(path: str) -> str:
    """Derive a display channel name from a backslash-separated file path.

    All MP3/CDG files are consolidated under the single 'MP3' channel.
    For other (video) files the channel is, in order of preference:
    a known channel name found anywhere in the path, the folder directly
    under an 'MP4' directory, any folder whose name contains 'karaoke',
    or the file's parent folder. Returns 'Unknown' when nothing matches.

    NOTE(review): only Windows-style '\\' separators are split — confirm
    forward-slash paths never reach this function.
    """
    path_lower = path.lower()
    # Consolidate all MP3/CDG content under "MP3". (The original also
    # scanned for a CDG folder here, but both branches returned 'MP3'.)
    if path_lower.endswith(('.mp3', '.cdg')):
        return 'MP3'
    parts = path.split('\\')
    # Known channels take priority regardless of folder layout.
    known_channels = ['Sing King Karaoke', 'KaraFun Karaoke', 'Stingray Karaoke']
    for channel in known_channels:
        if channel.lower() in path_lower:
            return channel
    # MP4/<Channel>/song.mp4 — the folder right under MP4 is the channel.
    # Iterating parts[:-1] enforces the original's i < len(parts) - 1 guard,
    # which also made the original's trailing 'MP4 Root' else-branch dead code.
    for i, part in enumerate(parts[:-1]):
        if part.lower() == 'mp4':
            next_part = parts[i + 1]
            # A dot means the next component is a file, so the file sits
            # directly in the MP4 folder.
            return next_part if '.' not in next_part else 'MP4 Root'
    # Fallback: any folder whose name mentions karaoke.
    for part in parts:
        if 'karaoke' in part.lower() and '.' not in part:
            return part
    # Last resort: the folder containing the file.
    if len(parts) >= 2:
        parent_folder = parts[-2]
        if parent_folder.lower() == 'mp4':
            return 'MP4 Root'
        if '.' not in parent_folder:
            return parent_folder
        # Parent looks like a file name too; try one level higher.
        if len(parts) >= 3 and '.' not in parts[-3]:
            return parts[-3]
    return 'Unknown'
def normalize_path(file_path: str) -> str:
    """Repair file paths that were corrupted into a '://' form.

    Example: 'z://MP4KaraFun KaraokeKaraoke I m Not In Love - 10CC.mp4'
    becomes 'z:\\MP4\\KaraFun Karaoke\\Karaoke I m Not In Love - 10CC.mp4'.
    Paths without '://' are returned unchanged. Five heuristics are tried
    in order; their ordering matters (pattern 1 must run before pattern 2,
    otherwise 'KaraokeKaraoke' would be split at the wrong place).
    """
    # Fix malformed paths that have been corrupted with ://
    # Example: z://MP4KaraFun KaraokeKaraoke I m Not In Love - 10CC.mp4
    # Should be: z:\MP4\KaraFun Karaoke\Karaoke I'm Not In Love - 10CC.mp4
    if '://' in file_path:
        print(f"DEBUG: Detected malformed path, attempting to fix: {file_path}")
        # Extract drive letter and rest of path
        import re
        match = re.match(r'^([a-zA-Z])://(.+)$', file_path)
        if match:
            drive_letter = match.group(1)
            rest_of_path = match.group(2)
            # Try to reconstruct the proper path structure by looking for
            # common patterns in the corrupted path.
            # Pattern 1: Handle the specific case where "Karaoke" appears twice
            # Example: "MP4KaraFun KaraokeKaraoke I m Not In Love - 10CC.mp4"
            # Should become: "MP4\KaraFun Karaoke\Karaoke I'm Not In Love - 10CC.mp4"
            karaoke_double_match = re.match(r'^MP4([A-Za-z\s]+)KaraokeKaraoke(.+)$', rest_of_path)
            if karaoke_double_match:
                # The first "Karaoke" belongs to the channel, the second to
                # the file name.
                channel_name = karaoke_double_match.group(1) + "Karaoke"
                file_name = "Karaoke" + karaoke_double_match.group(2)
                fixed_path = f"{drive_letter}:\\MP4\\{channel_name}\\{file_name}"
                print(f"DEBUG: Fixed path (pattern 1 - double karaoke): {fixed_path}")
                return fixed_path
            # Pattern 2: MP4 followed by channel name (e.g. MP4KaraFun Karaoke)
            mp4_match = re.match(r'^MP4([A-Za-z\s]+Karaoke)(.+)$', rest_of_path)
            if mp4_match:
                channel_name = mp4_match.group(1)
                file_name = mp4_match.group(2)
                fixed_path = f"{drive_letter}:\\MP4\\{channel_name}\\{file_name}"
                print(f"DEBUG: Fixed path (pattern 2): {fixed_path}")
                return fixed_path
            # Pattern 3: Direct channel name followed by filename (no MP4
            # prefix); the MP4 folder is assumed.
            channel_match = re.match(r'^([A-Za-z\s]+Karaoke)(.+)$', rest_of_path)
            if channel_match:
                channel_name = channel_match.group(1)
                file_name = channel_match.group(2)
                fixed_path = f"{drive_letter}:\\MP4\\{channel_name}\\{file_name}"
                print(f"DEBUG: Fixed path (pattern 3): {fixed_path}")
                return fixed_path
            # Pattern 4: Look for any known channel names. Comparison is
            # done with spaces stripped so "SingKingKaraoke" still matches,
            # but the slicing below indexes the ORIGINAL string — this
            # assumes the corrupted path kept its spaces. TODO confirm.
            known_channels = ['Sing King Karaoke', 'KaraFun Karaoke', 'Stingray Karaoke']
            for channel in known_channels:
                if channel.lower().replace(' ', '') in rest_of_path.lower().replace(' ', ''):
                    # Extract the part before and after the channel name
                    channel_lower = channel.lower().replace(' ', '')
                    rest_lower = rest_of_path.lower().replace(' ', '')
                    channel_index = rest_lower.find(channel_lower)
                    if channel_index >= 0:
                        # Reconstruct the path
                        before_channel = rest_of_path[:channel_index]
                        after_channel = rest_of_path[channel_index + len(channel):]
                        # If there's content before the channel, it might be
                        # a folder like "MP4"
                        if before_channel and before_channel.lower() in ['mp4']:
                            fixed_path = f"{drive_letter}:\\{before_channel}\\{channel}{after_channel}"
                        else:
                            fixed_path = f"{drive_letter}:\\MP4\\{channel}{after_channel}"
                        print(f"DEBUG: Fixed path (pattern 4): {fixed_path}")
                        return fixed_path
            # Pattern 5: Try to split at the first "karaoke" occurrence and
            # reconstruct, e.g. "MP4KaraFunKaraoke..." -> "MP4\KaraFun Karaoke..."
            if 'karaoke' in rest_of_path.lower():
                karaoke_index = rest_of_path.lower().find('karaoke')
                if karaoke_index > 0:
                    before_karaoke = rest_of_path[:karaoke_index]
                    after_karaoke = rest_of_path[karaoke_index + 7:]  # length of "karaoke"
                    # If before_karaoke starts with "MP4", extract the channel name
                    if before_karaoke.lower().startswith('mp4'):
                        channel_part = before_karaoke[3:]  # Remove "MP4"
                        if channel_part:
                            fixed_path = f"{drive_letter}:\\MP4\\{channel_part} Karaoke{after_karaoke}"
                            print(f"DEBUG: Fixed path (pattern 5): {fixed_path}")
                            return fixed_path
        # Fallback: just replace :// with :\ and hope for the best
        fallback_path = file_path.replace('://', ':\\')
        print(f"DEBUG: Fallback path fix: {fallback_path}")
        return fallback_path
    return file_path
@app.route('/')
def index():
    """Serve the main dashboard template."""
    return render_template('index.html')
@app.route('/api/duplicates')
def get_duplicates():
    """API endpoint to get duplicate data with filtering and pagination.

    Query params: artist/title/channel/file_type (case-insensitive
    substring filters), min_duplicates, page, per_page. Returns a page of
    duplicate groups plus pagination metadata, or 404 when no skip-song
    data exists.
    """
    # Try to load detailed skip songs first, fall back to the basic list.
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))
    if not skip_songs:
        skip_songs = load_json_file(os.path.join(DATA_DIR, 'skipSongs.json'))
    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404
    duplicate_groups = get_duplicate_groups(skip_songs)

    def _int_arg(name: str, default: int, minimum: int) -> int:
        # Malformed numeric query params fall back to the default instead
        # of raising ValueError (which surfaced as a 500 in the original).
        try:
            value = int(request.args.get(name, default))
        except (TypeError, ValueError):
            value = default
        return max(value, minimum)

    artist_filter = request.args.get('artist', '').lower()
    title_filter = request.args.get('title', '').lower()
    channel_filter = request.args.get('channel', '').lower()
    file_type_filter = request.args.get('file_type', '').lower()
    min_duplicates = _int_arg('min_duplicates', 0, 0)
    filtered_groups = []
    for group in duplicate_groups:
        if artist_filter and artist_filter not in group['artist'].lower():
            continue
        if title_filter and title_filter not in group['title'].lower():
            continue
        if group['total_duplicates'] < min_duplicates:
            continue
        # Channel/file-type filters match if ANY version (kept or skipped)
        # matches both active filters.
        if channel_filter or file_type_filter:
            matches_filter = False
            kept_channel = extract_channel(group['kept_version'])
            kept_file_type = get_file_type(group['kept_version'])
            if (not channel_filter or channel_filter in kept_channel.lower()) and \
               (not file_type_filter or file_type_filter in kept_file_type.lower()):
                matches_filter = True
            if not matches_filter:
                for version in group['skipped_versions']:
                    if (not channel_filter or channel_filter in version['channel'].lower()) and \
                       (not file_type_filter or file_type_filter in version['file_type'].lower()):
                        matches_filter = True
                        break
            if not matches_filter:
                continue
        filtered_groups.append(group)
    # Pagination. Clamping page >= 1 avoids negative slice starts for
    # page=0, and per_page >= 1 prevents a ZeroDivisionError in the
    # total_pages computation below (original bugs).
    page = _int_arg('page', 1, 1)
    per_page = _int_arg('per_page', 50, 1)
    start_idx = (page - 1) * per_page
    paginated_groups = filtered_groups[start_idx:start_idx + per_page]
    return jsonify({
        'duplicates': paginated_groups,
        'total': len(filtered_groups),
        'page': page,
        'per_page': per_page,
        'total_pages': (len(filtered_groups) + per_page - 1) // per_page
    })
@app.route('/api/stats')
def get_stats():
    """API endpoint to get overall statistics.

    Returns totals, file-type breakdowns for skipped and remaining files,
    per-channel counts, and the ten most-duplicated songs; 404 when no
    skip-song data exists.
    """
    # Try to load detailed skip songs first, fall back to the basic list.
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))
    if not skip_songs:
        skip_songs = load_json_file(os.path.join(DATA_DIR, 'skipSongs.json'))
    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404
    # Load original all-songs data to get total counts.
    all_songs = load_json_file(os.path.join(DATA_DIR, 'allSongs.json'))
    if not all_songs:
        all_songs = []
    duplicate_groups = get_duplicate_groups(skip_songs)
    total_duplicates = len(duplicate_groups)
    total_files_to_skip = len(skip_songs)
    # File type breakdown for skipped files.
    skip_file_types = {'MP4': 0, 'MP3': 0}
    channels = {}
    for group in duplicate_groups:
        # Include kept version in channel stats.
        kept_channel = extract_channel(group['kept_version'])
        channels[kept_channel] = channels.get(kept_channel, 0) + 1
        for version in group['skipped_versions']:
            # Guard the lookup: get_file_type can return 'Unknown', which
            # raised a KeyError in the original unconditional increment.
            file_type = version['file_type']
            if file_type in skip_file_types:
                skip_file_types[file_type] += 1
            channel = version['channel']
            channels[channel] = channels.get(channel, 0) + 1
    # Calculate total file type breakdown from all songs.
    total_file_types = {'MP4': 0, 'MP3': 0}
    total_songs = len(all_songs)
    for song in all_songs:
        file_type = get_file_type(song.get('path', ''))
        if file_type in total_file_types:
            total_file_types[file_type] += 1
    # Calculate what will remain after skipping.
    remaining_file_types = {
        'MP4': total_file_types['MP4'] - skip_file_types['MP4'],
        'MP3': total_file_types['MP3'] - skip_file_types['MP3']
    }
    total_remaining = sum(remaining_file_types.values())
    # Most duplicated songs.
    most_duplicated = sorted(duplicate_groups, key=lambda x: x['total_duplicates'], reverse=True)[:10]
    return jsonify({
        'total_songs': total_songs,
        'total_duplicates': total_duplicates,
        'total_files_to_skip': total_files_to_skip,
        'total_remaining': total_remaining,
        'total_file_types': total_file_types,
        'skip_file_types': skip_file_types,
        'remaining_file_types': remaining_file_types,
        'channels': channels,
        'most_duplicated': most_duplicated
    })
@app.route('/api/config')
def get_config():
    """API endpoint to get current configuration (empty dict if unreadable)."""
    current = load_json_file(CONFIG_FILE)
    return jsonify(current or {})
@app.route('/api/save-changes', methods=['POST'])
def save_changes():
    """API endpoint to save user changes to the skip list.

    Accepts JSON {'changes': [...]}, where each change is either
    'keep_file' (remove the path from the skip list) or 'skip_file'
    (append a manual entry). The previous file is backed up before the
    updated list is written.
    """
    try:
        payload = request.get_json()
        requested_changes = payload.get('changes', [])
        detailed_path = os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')
        skip_songs = load_json_file(detailed_path)
        if not skip_songs:
            return jsonify({'error': 'No skip songs data found'}), 404
        for change in requested_changes:
            change_type = change.get('type')
            song_key = change.get('song_key')  # artist - title
            file_path = change.get('file_path')
            if change_type == 'keep_file':
                # Keeping a file means dropping it from the skip list.
                skip_songs = [entry for entry in skip_songs if entry['path'] != file_path]
            elif change_type == 'skip_file':
                # Skipping a file means appending a manual entry.
                skip_songs.append({
                    'path': file_path,
                    'reason': 'manual_skip',
                    'artist': change.get('artist'),
                    'title': change.get('title'),
                    'kept_version': change.get('kept_version'),
                })
        # Back up the current on-disk file, then overwrite it.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = os.path.join(DATA_DIR, 'reports', f'skip_songs_backup_{timestamp}.json')
        import shutil
        shutil.copy2(detailed_path, backup_path)
        with open(detailed_path, 'w', encoding='utf-8') as out_file:
            json.dump(skip_songs, out_file, indent=2, ensure_ascii=False)
        return jsonify({
            'success': True,
            'message': f'Changes saved successfully. Backup created at: {backup_path}',
            'total_files': len(skip_songs)
        })
    except Exception as e:
        return jsonify({'error': f'Error saving changes: {str(e)}'}), 500
@app.route('/api/artists')
def get_artists():
    """API endpoint to get list of all artists for grouping.

    Groups duplicate entries per artist and returns them sorted by
    artist name (case-insensitive); 404 when no skip-song data exists.
    """
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))
    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404
    artist_index = {}
    for group in get_duplicate_groups(skip_songs):
        bucket = artist_index.setdefault(group['artist'], {
            'name': group['artist'],
            'songs': [],
            'total_duplicates': 0,
        })
        bucket['songs'].append(group)
        bucket['total_duplicates'] += group['total_duplicates']
    artists_list = sorted(artist_index.values(), key=lambda a: a['name'].lower())
    return jsonify({
        'artists': artists_list,
        'total_artists': len(artists_list)
    })
@app.route('/api/mp3-songs')
def get_mp3_songs():
    """API endpoint to get MP3 songs that remain after cleanup."""
    all_songs = load_json_file(os.path.join(DATA_DIR, 'allSongs.json'))
    if not all_songs:
        return jsonify({'error': 'No all songs data found'}), 404
    # Missing skip data just means nothing is filtered out.
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')) or []
    return jsonify(generate_mp3_song_list(all_songs, skip_songs))
@app.route('/api/download/mp3-songs')
def download_mp3_songs():
    """Download MP3 songs list as JSON file.

    Regenerates the list, writes it to data/reports/mp3SongList.json,
    then serves that file as an attachment.
    """
    all_songs = load_json_file(os.path.join(DATA_DIR, 'allSongs.json'))
    if not all_songs:
        return jsonify({'error': 'No all songs data found'}), 404
    # Missing skip data just means nothing is filtered out.
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')) or []
    mp3_song_list = generate_mp3_song_list(all_songs, skip_songs)
    # Persist the list so it can be served as a download.
    reports_dir = os.path.join(DATA_DIR, 'reports')
    with open(os.path.join(reports_dir, 'mp3SongList.json'), 'w', encoding='utf-8') as out_file:
        json.dump(mp3_song_list, out_file, indent=2, ensure_ascii=False)
    return send_from_directory(
        reports_dir,
        'mp3SongList.json',
        as_attachment=True,
        download_name='mp3SongList.json'
    )
@app.route('/api/save-priority-preferences', methods=['POST'])
def save_priority_preferences():
    """API endpoint to save user priority preferences.

    Expects JSON {'priority_changes': {...}}; merges the changes into
    data/preferences/priority_preferences.json. The previous preferences
    are written to a timestamped backup before the merge is saved.
    """
    try:
        data = request.get_json()
        priority_changes = data.get('priority_changes', {})
        if not priority_changes:
            return jsonify({'error': 'No priority changes provided'}), 400
        # Create preferences directory if it doesn't exist.
        preferences_dir = os.path.join(DATA_DIR, 'preferences')
        os.makedirs(preferences_dir, exist_ok=True)
        # Load existing preferences.
        preferences_file = os.path.join(preferences_dir, 'priority_preferences.json')
        existing_preferences = {}
        if os.path.exists(preferences_file):
            with open(preferences_file, 'r', encoding='utf-8') as f:
                existing_preferences = json.load(f)
        # Back up the PREVIOUS state before merging, so the backup can
        # actually undo this save. (The original wrote the merged data to
        # both files, making the backup a redundant copy of the new file.)
        backup_path = os.path.join(preferences_dir, f'priority_preferences_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
        with open(backup_path, 'w', encoding='utf-8') as f:
            json.dump(existing_preferences, f, indent=2, ensure_ascii=False)
        # Merge and save updated preferences.
        existing_preferences.update(priority_changes)
        with open(preferences_file, 'w', encoding='utf-8') as f:
            json.dump(existing_preferences, f, indent=2, ensure_ascii=False)
        return jsonify({
            'success': True,
            'message': f'Saved {len(priority_changes)} priority preferences. Backup created at: {backup_path}',
            'total_preferences': len(existing_preferences)
        })
    except Exception as e:
        return jsonify({'error': f'Error saving priority preferences: {str(e)}'}), 500
@app.route('/api/reset-priority-preferences', methods=['POST'])
def reset_priority_preferences():
    """API endpoint to reset all priority preferences.

    Backs up the current preferences file (if any) and deletes it.
    """
    try:
        preferences_dir = os.path.join(DATA_DIR, 'preferences')
        preferences_file = os.path.join(preferences_dir, 'priority_preferences.json')
        if not os.path.exists(preferences_file):
            return jsonify({
                'success': True,
                'message': 'No priority preferences found to reset'
            })
        # Keep a backup of the current preferences before deleting them.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = os.path.join(preferences_dir, f'priority_preferences_reset_backup_{stamp}.json')
        import shutil
        shutil.copy2(preferences_file, backup_path)
        os.remove(preferences_file)
        return jsonify({
            'success': True,
            'message': f'Priority preferences reset successfully. Backup created at: {backup_path}'
        })
    except Exception as e:
        return jsonify({'error': f'Error resetting priority preferences: {str(e)}'}), 500
@app.route('/api/load-priority-preferences')
def load_priority_preferences():
    """API endpoint to load current priority preferences.

    Returns an empty mapping when no preferences file exists yet.
    """
    try:
        preferences_file = os.path.join(DATA_DIR, 'preferences', 'priority_preferences.json')
        preferences = {}
        if os.path.exists(preferences_file):
            with open(preferences_file, 'r', encoding='utf-8') as f:
                preferences = json.load(f)
        return jsonify({
            'success': True,
            'preferences': preferences
        })
    except Exception as e:
        return jsonify({'error': f'Error loading priority preferences: {str(e)}'}), 500
@app.route('/api/video/<path:file_path>')
def serve_video(file_path):
    """Serve video files for playback in the web UI.

    The path arrives URL-encoded; it is decoded, repaired via
    normalize_path(), checked against basic traversal rules, and streamed
    with an extension-derived MIME type plus CORS/cache headers.
    """
    try:
        # Decode the file path (it comes URL-encoded).
        import urllib.parse
        decoded_path = urllib.parse.unquote(file_path)
        # Normalize the path to fix any malformed paths.
        normalized_path = normalize_path(decoded_path)
        print(f"DEBUG: Video request for path: {decoded_path}")
        print(f"DEBUG: Normalized path: {normalized_path}")
        print(f"DEBUG: Current working directory: {os.getcwd()}")
        # Security check: reject parent-directory segments to block simple
        # directory traversal. NOTE(review): this is a light check only —
        # drive-letter absolute paths to arbitrary local video files remain
        # reachable; confirm that is intentional for this local tool.
        if '..' in normalized_path:
            print(f"DEBUG: Security check failed - path contains '..'")
            return jsonify({'error': 'Invalid file path'}), 400
        # On Windows, allow absolute paths with drive letters; on
        # Unix-like systems, block absolute paths.
        if os.name == 'nt':  # Windows
            if normalized_path.startswith('/') and not normalized_path[1:].startswith(':'):
                print(f"DEBUG: Security check failed - Unix-style absolute path on Windows")
                return jsonify({'error': 'Invalid file path'}), 400
        else:  # Unix-like systems
            if normalized_path.startswith('/'):
                print(f"DEBUG: Security check failed - absolute path on Unix")
                return jsonify({'error': 'Invalid file path'}), 400
        # Check if file exists.
        if not os.path.exists(normalized_path):
            print(f"DEBUG: File does not exist: {normalized_path}")
            return jsonify({'error': 'Video file not found'}), 404
        # Check if it's a video file and determine MIME type.
        file_extension = os.path.splitext(normalized_path)[1].lower()
        mime_types = {
            '.mp4': 'video/mp4',
            '.avi': 'video/x-msvideo',
            '.mkv': 'video/x-matroska',
            '.mov': 'video/quicktime',
            '.wmv': 'video/x-ms-wmv',
            '.flv': 'video/x-flv',
            '.webm': 'video/webm'
        }
        if file_extension not in mime_types:
            print(f"DEBUG: Invalid file type: {normalized_path}")
            return jsonify({'error': 'Invalid file type'}), 400
        mime_type = mime_types[file_extension]
        # Get file info for debugging.
        file_size = os.path.getsize(normalized_path)
        print(f"DEBUG: File exists, size: {file_size} bytes")
        print(f"DEBUG: MIME type: {mime_type}")
        # Serve the video file.
        directory = os.path.dirname(normalized_path)
        filename = os.path.basename(normalized_path)
        print(f"DEBUG: Serving from directory: {directory}")
        # Bug fix: this debug line previously printed a literal placeholder
        # instead of the actual filename.
        print(f"DEBUG: Filename: {filename}")
        response = send_from_directory(
            directory,
            filename,
            mimetype=mime_type
        )
        # Add CORS headers so the player can make cross-origin/range requests.
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = 'GET, HEAD, OPTIONS'
        response.headers['Access-Control-Allow-Headers'] = 'Range'
        # Add cache control headers.
        response.headers['Cache-Control'] = 'public, max-age=3600'
        return response
    except Exception as e:
        print(f"DEBUG: Exception in serve_video: {str(e)}")
        return jsonify({'error': f'Error serving video: {str(e)}'}), 500
if __name__ == '__main__':
    # NOTE(review): debug=True together with host='0.0.0.0' exposes the
    # Werkzeug interactive debugger to every machine on the network —
    # confirm this tool is only ever run on a trusted local network.
    app.run(debug=True, host='0.0.0.0', port=5000)