534 lines
20 KiB
Python
534 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Web UI for Karaoke Song Library Cleanup Tool
|
|
Provides interactive interface for reviewing duplicates and making decisions.
|
|
"""
|
|
|
|
from flask import Flask, render_template, jsonify, request, send_from_directory
|
|
import json
|
|
import os
|
|
from typing import Dict, List, Any
|
|
from datetime import datetime
|
|
|
|
# Flask application object; the @app.route decorators below register on it.
app = Flask(__name__)

# Configuration
# NOTE: these are relative paths, so they are resolved against the process's
# current working directory when files are opened, not against this file.
DATA_DIR = '../data'
REPORTS_DIR = os.path.join(DATA_DIR, 'reports')
CONFIG_FILE = '../config/config.json'
|
|
|
|
def load_json_file(file_path: str) -> Any:
    """Read and parse a UTF-8 JSON file, returning ``None`` on any failure.

    Args:
        file_path: Location of the JSON document to read.

    Returns:
        The decoded JSON value, or ``None`` when the file cannot be opened
        or parsed (the error is printed to stdout in that case).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            parsed = json.load(handle)
    except Exception as exc:
        # Best-effort loader: callers treat None as "no data available".
        print(f"Error loading {file_path}: {exc}")
        return None
    return parsed
|
|
|
|
def get_duplicate_groups(skip_songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group skip-list entries by artist/title so duplicates appear together.

    Args:
        skip_songs: Skip-list entries. Each entry must have a ``path`` key;
            ``artist``, ``title``, ``kept_version`` and ``reason`` are
            optional.

    Returns:
        One dict per distinct (artist, title) pair, sorted case-insensitively
        by artist and then title. Each dict carries ``artist``, ``title``,
        ``kept_version`` (taken from the first entry seen for that pair),
        ``skipped_versions`` (a list of ``path``/``reason``/``file_type``/
        ``channel`` dicts) and ``total_duplicates`` (the length of that list).

    Raises:
        KeyError: If an entry has no ``path`` key.
    """
    def _or_unknown(value: Any) -> Any:
        # .get(key, default) only covers a *missing* key; an explicit JSON
        # null would otherwise reach .lower() in the sort below and crash.
        return 'Unknown' if value is None else value

    # Keyed by the (artist, title) tuple rather than a joined string so that
    # e.g. ("A - B", "C") and ("A", "B - C") are not merged into one group.
    duplicate_groups: Dict[Any, Dict[str, Any]] = {}

    for skip_song in skip_songs:
        artist = _or_unknown(skip_song.get('artist'))
        title = _or_unknown(skip_song.get('title'))
        path = skip_song['path']
        key = (artist, title)

        if key not in duplicate_groups:
            duplicate_groups[key] = {
                'artist': artist,
                'title': title,
                'kept_version': _or_unknown(skip_song.get('kept_version')),
                'skipped_versions': [],
                'total_duplicates': 0,
            }

        group = duplicate_groups[key]
        group['skipped_versions'].append({
            'path': path,
            'reason': skip_song.get('reason', 'duplicate'),
            'file_type': get_file_type(path),
            'channel': extract_channel(path),
        })
        group['total_duplicates'] += 1

    # Convert to list and sort by artist first, then by title
    groups_list = list(duplicate_groups.values())
    groups_list.sort(key=lambda g: (str(g['artist']).lower(), str(g['title']).lower()))

    return groups_list
|
|
|
|
|
|
def generate_mp3_song_list(all_songs: List[Dict[str, Any]], skip_songs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Generate a list of MP3 songs that remain after cleanup.

    Args:
        all_songs: Song entries; ``path``, ``title`` and ``artist`` are read
            from each. Entries with a missing or null ``path`` are ignored.
        skip_songs: Skip-list entries; only their ``path`` is used. Entries
            without a ``path`` are ignored.

    Returns:
        ``{'title': ..., 'songs': [...]}`` where ``songs`` holds one
        ``{'position', 'title', 'artist'}`` dict per ``.mp3``/``.cdg`` file
        that is not on the skip list, sorted case-insensitively by artist
        then title, with ``position`` numbered from 1 in that order.
    """
    # Create a set of paths that are being skipped
    skip_paths = {song.get('path') for song in skip_songs}

    # Filter for MP3/CDG songs that are NOT being skipped
    remaining = []
    for song in all_songs:
        # "or ''" also covers an explicit null path, not just a missing key.
        path = song.get('path') or ''
        if not path.lower().endswith(('.mp3', '.cdg')) or path in skip_paths:
            continue
        title = song.get('title')
        artist = song.get('artist')
        remaining.append((
            'Unknown' if title is None else title,
            'Unknown' if artist is None else artist,
        ))

    # Sort by artist, then by title (str() guards against non-string values)
    remaining.sort(key=lambda pair: (str(pair[1]).lower(), str(pair[0]).lower()))

    # Positions are assigned once, after sorting, so they follow display order.
    mp3_songs = [
        {'position': position, 'title': title, 'artist': artist}
        for position, (title, artist) in enumerate(remaining, start=1)
    ]

    return {
        'title': 'MP3 Songs need to be found',
        'songs': mp3_songs,
    }
|
|
|
|
def get_file_type(path: str) -> str:
    """Classify a file path by its extension.

    Args:
        path: File path (or bare file name). A non-string value such as
            ``None`` is accepted and classified as ``'Unknown'``.

    Returns:
        ``'MP4'`` for ``.mp4`` files, ``'MP3'`` for ``.mp3`` and ``.cdg``
        files, and ``'Unknown'`` for anything else. The extension check is
        case-insensitive.
    """
    # Callers pass values read from JSON, which may be null.
    if not isinstance(path, str):
        return 'Unknown'

    path_lower = path.lower()
    if path_lower.endswith('.mp4'):
        return 'MP4'
    # Treat CDG as MP3 since they're paired
    if path_lower.endswith(('.mp3', '.cdg')):
        return 'MP3'
    return 'Unknown'
|
|
|
|
def extract_channel(path: str) -> str:
    """Extract channel name from path with consolidated MP3/CDG handling.

    Resolution order:
        1. ``.mp3``/``.cdg`` files always report the channel ``'MP3'``.
        2. A known channel name appearing anywhere in the path
           (case-insensitive) is returned as-is.
        3. For an ``MP4/<Channel>/file`` layout, the folder right after the
           first ``MP4`` folder is the channel; a file directly inside the
           ``MP4`` folder reports ``'MP4 Root'``.
        4. The first folder whose name contains ``karaoke``.
        5. The folder that directly contains the file.

    Args:
        path: File path using ``\\`` and/or ``/`` as separators. Non-string
            or empty values are accepted.

    Returns:
        The channel name, or ``'Unknown'`` when none can be determined.
    """
    # Values come from JSON and may be null or empty.
    if not isinstance(path, str) or not path:
        return 'Unknown'

    path_lower = path.lower()

    # Handle MP3/CDG files - consolidate everything under "MP3", regardless
    # of which folder the file lives in.
    if path_lower.endswith(('.mp3', '.cdg')):
        return 'MP3'

    # Look for specific known channels first
    known_channels = ('Sing King Karaoke', 'KaraFun Karaoke', 'Stingray Karaoke')
    for channel in known_channels:
        if channel.lower() in path_lower:
            return channel

    # Accept both separator styles. The last component is the file itself,
    # so everything before it is a folder. Deciding folder-vs-file by
    # position (not by "contains a dot") keeps names like "Vol. 1" working.
    folders = path.replace('/', '\\').split('\\')[:-1]

    # Look for MP4 folder structure: MP4/ChannelName/song.mp4
    for i, folder in enumerate(folders):
        if folder.lower() == 'mp4':
            if i + 1 < len(folders):
                return folders[i + 1]
            return 'MP4 Root'  # File is directly in MP4 folder

    # Look for any folder that contains 'karaoke' (fallback)
    for folder in folders:
        if 'karaoke' in folder.lower():
            return folder

    # If no specific channel found, return the folder containing the file
    if folders and folders[-1]:
        return folders[-1]

    return 'Unknown'
|
|
|
|
@app.route('/')
def index():
    """Serve the dashboard by rendering the ``index.html`` template."""
    dashboard_page = render_template('index.html')
    return dashboard_page
|
|
|
|
@app.route('/api/duplicates')
def get_duplicates():
    """API endpoint returning a filtered, paginated list of duplicate groups.

    Query parameters (all optional):
        artist, title: Case-insensitive substring filters on the group.
        channel, file_type: Case-insensitive substring filters. A group is
            kept when its kept version, or any one skipped version,
            satisfies both of them.
        min_duplicates: Minimum number of skipped versions (default 0).
        page, per_page: 1-based pagination (defaults 1 and 50).

    Non-numeric values for the numeric parameters fall back to their
    defaults, and ``page``/``per_page`` are clamped to at least 1, so a bad
    query string cannot trigger a ValueError or a division by zero.

    Returns:
        JSON with ``duplicates``, ``total``, ``page``, ``per_page`` and
        ``total_pages``, or a 404 JSON error when no skip list can be loaded.
    """
    # Try to load detailed skip songs first, fallback to basic skip list
    skip_songs = load_json_file(os.path.join(REPORTS_DIR, 'skip_songs_detailed.json'))
    if not skip_songs:
        skip_songs = load_json_file(os.path.join(DATA_DIR, 'skipSongs.json'))

    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404

    duplicate_groups = get_duplicate_groups(skip_songs)

    args = request.args
    artist_filter = args.get('artist', '').lower()
    title_filter = args.get('title', '').lower()
    channel_filter = args.get('channel', '').lower()
    file_type_filter = args.get('file_type', '').lower()
    # type=int returns the default when the value is not a valid integer.
    min_duplicates = args.get('min_duplicates', 0, type=int)
    page = max(args.get('page', 1, type=int), 1)
    per_page = max(args.get('per_page', 50, type=int), 1)

    def version_matches(channel, file_type):
        """Return True when one version passes the channel and type filters."""
        return ((not channel_filter or channel_filter in channel.lower())
                and (not file_type_filter or file_type_filter in file_type.lower()))

    filtered_groups = []
    for group in duplicate_groups:
        if artist_filter and artist_filter not in group['artist'].lower():
            continue
        if title_filter and title_filter not in group['title'].lower():
            continue
        if group['total_duplicates'] < min_duplicates:
            continue

        # Check if any version (kept or skipped) matches channel/file_type filters
        if channel_filter or file_type_filter:
            # "or ''" keeps a null kept_version from reaching .lower().
            kept_version = group['kept_version'] or ''
            matches_filter = version_matches(
                extract_channel(kept_version), get_file_type(kept_version)
            ) or any(
                version_matches(version['channel'], version['file_type'])
                for version in group['skipped_versions']
            )
            if not matches_filter:
                continue

        filtered_groups.append(group)

    # Pagination
    start_idx = (page - 1) * per_page
    paginated_groups = filtered_groups[start_idx:start_idx + per_page]

    return jsonify({
        'duplicates': paginated_groups,
        'total': len(filtered_groups),
        'page': page,
        'per_page': per_page,
        'total_pages': (len(filtered_groups) + per_page - 1) // per_page
    })
|
|
|
|
@app.route('/api/stats')
def get_stats():
    """API endpoint to get overall statistics.

    Returns:
        JSON with the total song count, the number of duplicate groups, the
        number of files to skip, per-type counts (total / skipped /
        remaining) for MP4 and MP3, a per-channel count covering kept and
        skipped versions, and the ten groups with the most duplicates.
        Responds with a 404 JSON error when no skip list can be loaded.
    """
    # Try to load detailed skip songs first, fallback to basic skip list
    skip_songs = load_json_file(os.path.join(REPORTS_DIR, 'skip_songs_detailed.json'))
    if not skip_songs:
        skip_songs = load_json_file(os.path.join(DATA_DIR, 'skipSongs.json'))

    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404

    # Load original all songs data to get total counts
    all_songs = load_json_file(os.path.join(DATA_DIR, 'allSongs.json')) or []

    duplicate_groups = get_duplicate_groups(skip_songs)

    # Calculate current statistics
    total_duplicates = len(duplicate_groups)
    total_files_to_skip = len(skip_songs)

    # File type breakdown for skipped files
    skip_file_types = {'MP4': 0, 'MP3': 0}
    channels = {}

    for group in duplicate_groups:
        # Include kept version in channel stats ("or ''" guards a null value)
        kept_channel = extract_channel(group['kept_version'] or '')
        channels[kept_channel] = channels.get(kept_channel, 0) + 1

        # Include skipped versions
        for version in group['skipped_versions']:
            file_type = version['file_type']
            # Files of any other type (reported as 'Unknown') are left out of
            # the breakdown, as in the all-songs totals below; indexing the
            # dict directly would raise KeyError for them.
            if file_type in skip_file_types:
                skip_file_types[file_type] += 1
            channel = version['channel']
            channels[channel] = channels.get(channel, 0) + 1

    # Calculate total file type breakdown from all songs
    total_file_types = {'MP4': 0, 'MP3': 0}
    total_songs = len(all_songs)

    for song in all_songs:
        file_type = get_file_type(song.get('path') or '')
        if file_type in total_file_types:
            total_file_types[file_type] += 1

    # Calculate what will remain after skipping
    remaining_file_types = {
        file_type: total_file_types[file_type] - skip_file_types[file_type]
        for file_type in ('MP4', 'MP3')
    }
    total_remaining = sum(remaining_file_types.values())

    # Most duplicated songs
    most_duplicated = sorted(
        duplicate_groups, key=lambda g: g['total_duplicates'], reverse=True
    )[:10]

    return jsonify({
        'total_songs': total_songs,
        'total_duplicates': total_duplicates,
        'total_files_to_skip': total_files_to_skip,
        'total_remaining': total_remaining,
        'total_file_types': total_file_types,
        'skip_file_types': skip_file_types,
        'remaining_file_types': remaining_file_types,
        'channels': channels,
        'most_duplicated': most_duplicated
    })
|
|
|
|
@app.route('/api/config')
def get_config():
    """Return the contents of the configuration file as JSON.

    An empty JSON object is returned when the file cannot be loaded or
    holds an empty/falsy value.
    """
    loaded = load_json_file(CONFIG_FILE)
    if not loaded:
        return jsonify({})
    return jsonify(loaded)
|
|
|
|
@app.route('/api/save-changes', methods=['POST'])
def save_changes():
    """API endpoint to save user changes to the skip list.

    Expects a JSON object with a ``changes`` list. Each change is a dict
    with ``type`` (``'keep_file'`` or ``'skip_file'``) and ``file_path``;
    ``skip_file`` changes may also carry ``artist``, ``title`` and
    ``kept_version``. Changes without a ``file_path`` are ignored, and a
    ``skip_file`` for a path that is already on the list is not added again.

    The current skip list file is copied to a timestamped backup before it
    is overwritten.

    Returns:
        JSON ``{'success', 'message', 'total_files'}`` on success; a 400
        JSON error for a malformed request body; a 404 JSON error when the
        detailed skip list cannot be loaded; a 500 JSON error on any other
        failure.
    """
    import shutil

    try:
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so the problem can be reported as a client error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        changes = data.get('changes', [])
        if not isinstance(changes, list):
            return jsonify({'error': "'changes' must be a list"}), 400

        # Load current skip list. An empty list is valid data (every file
        # kept), so only a failed load / wrong shape is treated as missing.
        skip_list_path = os.path.join(REPORTS_DIR, 'skip_songs_detailed.json')
        skip_songs = load_json_file(skip_list_path)
        if not isinstance(skip_songs, list):
            return jsonify({'error': 'No skip songs data found'}), 404

        # Apply changes
        for change in changes:
            if not isinstance(change, dict):
                continue
            file_path = change.get('file_path')
            if not file_path:
                # Without a path there is nothing to match or store.
                continue

            change_type = change.get('type')
            if change_type == 'keep_file':
                # Remove this file from skip list
                skip_songs = [s for s in skip_songs if s.get('path') != file_path]
            elif change_type == 'skip_file':
                # Add this file to skip list, unless it is already there
                if any(s.get('path') == file_path for s in skip_songs):
                    continue
                skip_songs.append({
                    'path': file_path,
                    'reason': 'manual_skip',
                    'artist': change.get('artist'),
                    'title': change.get('title'),
                    'kept_version': change.get('kept_version')
                })

        # Back up the current file, then save the updated skip list
        backup_path = os.path.join(REPORTS_DIR, f'skip_songs_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
        shutil.copy2(skip_list_path, backup_path)

        with open(skip_list_path, 'w', encoding='utf-8') as f:
            json.dump(skip_songs, f, indent=2, ensure_ascii=False)

        return jsonify({
            'success': True,
            'message': f'Changes saved successfully. Backup created at: {backup_path}',
            'total_files': len(skip_songs)
        })

    except Exception as e:
        return jsonify({'error': f'Error saving changes: {str(e)}'}), 500
|
|
|
|
@app.route('/api/artists')
def get_artists():
    """Return duplicate groups bucketed per artist.

    Responds with ``artists`` (one record per artist holding ``name``,
    ``songs`` and a summed ``total_duplicates``, ordered case-insensitively
    by name) and ``total_artists``. A 404 JSON error is returned when the
    detailed skip list cannot be loaded or is empty.
    """
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))
    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404

    by_artist = {}
    for entry in get_duplicate_groups(skip_songs):
        name = entry['artist']
        # Create the artist record on first sight, then accumulate into it.
        record = by_artist.setdefault(
            name, {'name': name, 'songs': [], 'total_duplicates': 0}
        )
        record['songs'].append(entry)
        record['total_duplicates'] += entry['total_duplicates']

    ordered = sorted(by_artist.values(), key=lambda record: record['name'].lower())

    return jsonify({
        'artists': ordered,
        'total_artists': len(ordered)
    })
|
|
|
|
|
|
@app.route('/api/mp3-songs')
def get_mp3_songs():
    """Return, as JSON, the MP3/CDG songs that are not on the skip list.

    Responds with a 404 JSON error when the all-songs data cannot be loaded
    or is empty; a missing skip list is treated as empty.
    """
    # Both files are read up front, before any validation.
    library = load_json_file(os.path.join(DATA_DIR, 'allSongs.json'))
    skipped = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))

    if not library:
        return jsonify({'error': 'No all songs data found'}), 404

    return jsonify(generate_mp3_song_list(library, skipped if skipped else []))
|
|
|
|
|
|
@app.route('/api/download/mp3-songs')
def download_mp3_songs():
    """Download MP3 songs list as JSON file.

    The list is regenerated on each request, written to
    ``mp3SongList.json`` in the reports directory, and then sent back as an
    attachment.

    Returns:
        The generated file as a download, or a 404 JSON error when the
        all-songs data cannot be loaded or is empty. A missing skip list is
        treated as empty.
    """
    # Load all songs and skip songs
    all_songs = load_json_file(os.path.join(DATA_DIR, 'allSongs.json'))
    skip_songs = load_json_file(os.path.join(REPORTS_DIR, 'skip_songs_detailed.json'))

    if not all_songs:
        return jsonify({'error': 'No all songs data found'}), 404

    if not skip_songs:
        skip_songs = []

    # Generate MP3 song list
    mp3_song_list = generate_mp3_song_list(all_songs, skip_songs)

    # open() resolves a relative path against the working directory, while
    # send_from_directory resolves a relative directory against the
    # application root. Using one absolute path for both guarantees the
    # file that is served is the file that was just written.
    reports_dir = os.path.abspath(REPORTS_DIR)
    os.makedirs(reports_dir, exist_ok=True)
    filename = 'mp3SongList.json'

    # Save to file
    with open(os.path.join(reports_dir, filename), 'w', encoding='utf-8') as f:
        json.dump(mp3_song_list, f, indent=2, ensure_ascii=False)

    # Return the file for download
    return send_from_directory(
        reports_dir,
        filename,
        as_attachment=True,
        download_name=filename
    )
|
|
|
|
@app.route('/api/save-priority-preferences', methods=['POST'])
def save_priority_preferences():
    """API endpoint to save user priority preferences.

    Expects a JSON object with a non-empty ``priority_changes`` object. The
    changes are merged over any preferences already stored in
    ``priority_preferences.json`` (new values win), the merged result is
    written back, and a timestamped copy of that merged result is written
    next to it.

    Returns:
        JSON ``{'success', 'message', 'total_preferences'}`` on success; a
        400 JSON error when the body is not a JSON object or carries no
        usable ``priority_changes``; a 500 JSON error on any other failure.
    """
    try:
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so the problem can be reported as a client error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        priority_changes = data.get('priority_changes', {})

        if not priority_changes:
            return jsonify({'error': 'No priority changes provided'}), 400
        if not isinstance(priority_changes, dict):
            # dict.update() below needs a mapping.
            return jsonify({'error': "'priority_changes' must be an object"}), 400

        # Create preferences directory if it doesn't exist
        preferences_dir = os.path.join(DATA_DIR, 'preferences')
        os.makedirs(preferences_dir, exist_ok=True)

        # Load existing preferences
        preferences_file = os.path.join(preferences_dir, 'priority_preferences.json')
        existing_preferences = {}
        if os.path.exists(preferences_file):
            with open(preferences_file, 'r', encoding='utf-8') as f:
                existing_preferences = json.load(f)

        # Update with new preferences
        existing_preferences.update(priority_changes)

        # Save updated preferences
        with open(preferences_file, 'w', encoding='utf-8') as f:
            json.dump(existing_preferences, f, indent=2, ensure_ascii=False)

        # Create backup (a timestamped copy of the merged preferences)
        backup_path = os.path.join(preferences_dir, f'priority_preferences_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
        with open(backup_path, 'w', encoding='utf-8') as f:
            json.dump(existing_preferences, f, indent=2, ensure_ascii=False)

        return jsonify({
            'success': True,
            'message': f'Saved {len(priority_changes)} priority preferences. Backup created at: {backup_path}',
            'total_preferences': len(existing_preferences)
        })

    except Exception as e:
        return jsonify({'error': f'Error saving priority preferences: {str(e)}'}), 500
|
|
|
|
@app.route('/api/reset-priority-preferences', methods=['POST'])
def reset_priority_preferences():
    """Delete the stored priority preferences, keeping a timestamped copy.

    When the preferences file exists it is first copied to a
    ``priority_preferences_reset_backup_<timestamp>.json`` file in the same
    directory and then removed. When it does not exist, a success response
    saying so is returned. Any failure produces a 500 JSON error.
    """
    try:
        import shutil

        prefs_dir = os.path.join(DATA_DIR, 'preferences')
        prefs_path = os.path.join(prefs_dir, 'priority_preferences.json')

        # Guard clause: nothing stored means nothing to reset.
        if not os.path.exists(prefs_path):
            return jsonify({
                'success': True,
                'message': 'No priority preferences found to reset'
            })

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = os.path.join(prefs_dir, f'priority_preferences_reset_backup_{stamp}.json')

        # Copy first, delete second, so the data survives a failed copy.
        shutil.copy2(prefs_path, backup_path)
        os.remove(prefs_path)

        return jsonify({
            'success': True,
            'message': f'Priority preferences reset successfully. Backup created at: {backup_path}'
        })

    except Exception as e:
        return jsonify({'error': f'Error resetting priority preferences: {str(e)}'}), 500
|
|
|
|
@app.route('/api/load-priority-preferences')
def load_priority_preferences():
    """Return the stored priority preferences as JSON.

    Responds with ``{'success': True, 'preferences': ...}``; the preferences
    are an empty object when no preferences file exists. Any failure while
    reading or parsing the file produces a 500 JSON error.
    """
    try:
        prefs_path = os.path.join(DATA_DIR, 'preferences', 'priority_preferences.json')

        # Start from "no preferences" and replace it only if a file exists.
        stored = {}
        if os.path.exists(prefs_path):
            with open(prefs_path, 'r', encoding='utf-8') as handle:
                stored = json.load(handle)

        return jsonify({
            'success': True,
            'preferences': stored
        })

    except Exception as e:
        return jsonify({'error': f'Error loading priority preferences: {str(e)}'}), 500
|
|
|
|
if __name__ == '__main__':
    # The server listens on all interfaces (0.0.0.0). Flask's debug mode
    # enables an interactive debugger that can execute arbitrary code, so it
    # must not be on by default for a network-reachable server. Set the
    # FLASK_DEBUG environment variable (1/true/yes/on) to opt in locally.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)