#!/usr/bin/env python3
"""
Web UI for Karaoke Song Library Cleanup Tool
Provides interactive interface for reviewing duplicates and making decisions.
"""

from flask import Flask, render_template, jsonify, request, send_from_directory
import json
import os
import shutil
from typing import Dict, List, Any
from datetime import datetime

app = Flask(__name__)

# Configuration
DATA_DIR = '../data'
REPORTS_DIR = os.path.join(DATA_DIR, 'reports')
CONFIG_FILE = '../config/config.json'
# Authoritative (detailed) skip list — read by the API endpoints and
# rewritten by /api/save-changes.
SKIP_DETAILED_FILE = os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')


def load_json_file(file_path: str) -> Any:
    """Load a JSON file safely.

    Returns the parsed value, or None on any error (missing file, bad JSON,
    encoding problems). Errors are logged to stdout so callers can simply
    treat None as "no data available".
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error loading {file_path}: {e}")
        return None


def load_skip_songs() -> Any:
    """Load skip-song data: detailed report first, basic skip list as fallback.

    Returns the parsed list, or a falsy value if neither source is usable.
    Shared by the API endpoints so the fallback behavior stays consistent.
    """
    skip_songs = load_json_file(SKIP_DETAILED_FILE)
    if not skip_songs:
        skip_songs = load_json_file(os.path.join(DATA_DIR, 'skipSongs.json'))
    return skip_songs


def get_duplicate_groups(skip_songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group skip songs by artist/title to show duplicates together.

    Each group records the kept version (taken from the first skip entry seen
    for that artist/title) and every skipped version annotated with its file
    type and channel. Groups are sorted case-insensitively by artist, then
    title.
    """
    duplicate_groups: Dict[str, Dict[str, Any]] = {}
    for skip_song in skip_songs:
        artist = skip_song.get('artist', 'Unknown')
        title = skip_song.get('title', 'Unknown')
        key = f"{artist} - {title}"
        if key not in duplicate_groups:
            duplicate_groups[key] = {
                'artist': artist,
                'title': title,
                'kept_version': skip_song.get('kept_version', 'Unknown'),
                'skipped_versions': [],
                'total_duplicates': 0
            }
        duplicate_groups[key]['skipped_versions'].append({
            'path': skip_song['path'],
            'reason': skip_song.get('reason', 'duplicate'),
            'file_type': get_file_type(skip_song['path']),
            'channel': extract_channel(skip_song['path'])
        })
        duplicate_groups[key]['total_duplicates'] = len(duplicate_groups[key]['skipped_versions'])

    # Convert to list and sort by artist first, then by title
    groups_list = list(duplicate_groups.values())
    groups_list.sort(key=lambda x: (x['artist'].lower(), x['title'].lower()))
    return groups_list


def get_file_type(path: str) -> str:
    """Return 'MP4', 'MP3', or 'Unknown' based on the file extension.

    CDG files are reported as 'MP3' since they are paired with MP3 files.
    """
    path_lower = path.lower()
    if path_lower.endswith('.mp4'):
        return 'MP4'
    if path_lower.endswith(('.mp3', '.cdg')):  # Treat CDG as MP3 since they're paired
        return 'MP3'
    return 'Unknown'


def extract_channel(path: str) -> str:
    """Extract a channel name from a path, consolidating MP3/CDG under 'MP3'.

    Resolution order for MP4 files: known channel names, the folder directly
    under an 'MP4' directory, any folder mentioning 'karaoke', and finally
    the file's parent (or grandparent) folder.
    NOTE: paths are split on backslashes — this assumes Windows-style paths.
    """
    path_lower = path.lower()
    parts = path.split('\\')

    # All MP3/CDG content is consolidated under the single 'MP3' channel.
    # (The original code scanned for a 'CDG' folder, but every branch of
    # that scan returned 'MP3' anyway.)
    if path_lower.endswith(('.mp3', '.cdg')):
        return 'MP3'

    # Known channels take precedence over folder-structure heuristics.
    known_channels = ['Sing King Karaoke', 'KaraFun Karaoke', 'Stingray Karaoke']
    for channel in known_channels:
        if channel.lower() in path_lower:
            return channel

    # MP4 folder structure: MP4/ChannelName/song.mp4
    for i, part in enumerate(parts):
        if part.lower() == 'mp4' and i < len(parts) - 1:
            next_part = parts[i + 1]
            # A part without a dot is treated as a folder (the channel);
            # otherwise the file sits directly in the MP4 root.
            return next_part if '.' not in next_part else 'MP4 Root'

    # Fallback: any folder whose name mentions 'karaoke'.
    for part in parts:
        if 'karaoke' in part.lower() and '.' not in part:
            return part

    # Last resort: the folder containing the file.
    if len(parts) >= 2:
        parent_folder = parts[-2]
        if parent_folder.lower() == 'mp4':
            return 'MP4 Root'
        if '.' not in parent_folder:
            return parent_folder
        # Parent looks like a file too — go up one more level.
        if len(parts) >= 3:
            grandparent = parts[-3]
            if '.' not in grandparent:
                return grandparent
    return 'Unknown'


@app.route('/')
def index():
    """Main dashboard page."""
    return render_template('index.html')


@app.route('/api/duplicates')
def get_duplicates():
    """API endpoint to get duplicate data (filterable and paginated)."""
    skip_songs = load_skip_songs()
    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404

    duplicate_groups = get_duplicate_groups(skip_songs)

    # Filters — all optional, matched as case-insensitive substrings.
    artist_filter = request.args.get('artist', '').lower()
    title_filter = request.args.get('title', '').lower()
    channel_filter = request.args.get('channel', '').lower()
    file_type_filter = request.args.get('file_type', '').lower()
    # type=int makes malformed values fall back to the default instead of
    # raising ValueError (previously a bad query string caused a 500).
    min_duplicates = request.args.get('min_duplicates', 0, type=int)

    filtered_groups = []
    for group in duplicate_groups:
        if artist_filter and artist_filter not in group['artist'].lower():
            continue
        if title_filter and title_filter not in group['title'].lower():
            continue
        if group['total_duplicates'] < min_duplicates:
            continue
        # Channel / file-type filters pass when ANY version (kept or
        # skipped) satisfies both of them.
        if channel_filter or file_type_filter:
            matches_filter = False
            # Check kept version
            kept_channel = extract_channel(group['kept_version'])
            kept_file_type = get_file_type(group['kept_version'])
            if (not channel_filter or channel_filter in kept_channel.lower()) and \
               (not file_type_filter or file_type_filter in kept_file_type.lower()):
                matches_filter = True
            # Check skipped versions if kept version doesn't match
            if not matches_filter:
                for version in group['skipped_versions']:
                    if (not channel_filter or channel_filter in version['channel'].lower()) and \
                       (not file_type_filter or file_type_filter in version['file_type'].lower()):
                        matches_filter = True
                        break
            if not matches_filter:
                continue
        filtered_groups.append(group)

    # Pagination — clamped to sane minimums so a crafted query (e.g.
    # per_page=0) cannot trigger a ZeroDivisionError below.
    page = max(1, request.args.get('page', 1, type=int))
    per_page = max(1, request.args.get('per_page', 50, type=int))
    start_idx = (page - 1) * per_page
    paginated_groups = filtered_groups[start_idx:start_idx + per_page]

    return jsonify({
        'duplicates': paginated_groups,
        'total': len(filtered_groups),
        'page': page,
        'per_page': per_page,
        'total_pages': (len(filtered_groups) + per_page - 1) // per_page
    })


@app.route('/api/stats')
def get_stats():
    """API endpoint to get overall statistics."""
    skip_songs = load_skip_songs()
    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404

    # Original library listing; only used for totals, so missing data is OK.
    all_songs = load_json_file(os.path.join(DATA_DIR, 'allSongs.json')) or []

    duplicate_groups = get_duplicate_groups(skip_songs)

    total_duplicates = len(duplicate_groups)
    total_files_to_skip = len(skip_songs)

    # File type breakdown for skipped files
    skip_file_types = {'MP4': 0, 'MP3': 0}
    channels: Dict[str, int] = {}
    for group in duplicate_groups:
        # Include kept version in channel stats
        kept_channel = extract_channel(group['kept_version'])
        channels[kept_channel] = channels.get(kept_channel, 0) + 1
        # Include skipped versions
        for version in group['skipped_versions']:
            # .get() guards against file types outside MP4/MP3 ('Unknown'),
            # which previously raised KeyError and turned this into a 500.
            file_type = version['file_type']
            skip_file_types[file_type] = skip_file_types.get(file_type, 0) + 1
            channel = version['channel']
            channels[channel] = channels.get(channel, 0) + 1

    # Calculate total file type breakdown from all songs
    total_file_types = {'MP4': 0, 'MP3': 0}
    total_songs = len(all_songs)
    for song in all_songs:
        file_type = get_file_type(song.get('path', ''))
        if file_type in total_file_types:
            total_file_types[file_type] += 1

    # Calculate what will remain after skipping
    remaining_file_types = {
        'MP4': total_file_types['MP4'] - skip_file_types['MP4'],
        'MP3': total_file_types['MP3'] - skip_file_types['MP3']
    }
    total_remaining = sum(remaining_file_types.values())

    # Most duplicated songs
    most_duplicated = sorted(duplicate_groups, key=lambda x: x['total_duplicates'], reverse=True)[:10]

    return jsonify({
        'total_songs': total_songs,
        'total_duplicates': total_duplicates,
        'total_files_to_skip': total_files_to_skip,
        'total_remaining': total_remaining,
        'total_file_types': total_file_types,
        'skip_file_types': skip_file_types,
        'remaining_file_types': remaining_file_types,
        'channels': channels,
        'most_duplicated': most_duplicated
    })


@app.route('/api/config')
def get_config():
    """API endpoint to get current configuration."""
    config = load_json_file(CONFIG_FILE)
    return jsonify(config or {})


@app.route('/api/save-changes', methods=['POST'])
def save_changes():
    """API endpoint to save user changes to the skip list.

    Accepts JSON {'changes': [...]} where each change is either
    'keep_file' (remove that path from the skip list) or 'skip_file'
    (append a manual skip entry). A timestamped backup of the previous
    skip list is written before the file is overwritten.
    """
    try:
        data = request.get_json()
        changes = data.get('changes', [])

        # Load current skip list
        skip_songs = load_json_file(SKIP_DETAILED_FILE)
        if not skip_songs:
            return jsonify({'error': 'No skip songs data found'}), 404

        # Apply changes
        for change in changes:
            change_type = change.get('type')
            file_path = change.get('file_path')
            if change_type == 'keep_file':
                # Remove this file from skip list
                skip_songs = [s for s in skip_songs if s['path'] != file_path]
            elif change_type == 'skip_file':
                # Add this file to skip list
                skip_songs.append({
                    'path': file_path,
                    'reason': 'manual_skip',
                    'artist': change.get('artist'),
                    'title': change.get('title'),
                    'kept_version': change.get('kept_version')
                })

        # Back up the current list before overwriting it.
        backup_path = os.path.join(
            DATA_DIR, 'reports',
            f'skip_songs_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
        shutil.copy2(SKIP_DETAILED_FILE, backup_path)

        with open(SKIP_DETAILED_FILE, 'w', encoding='utf-8') as f:
            json.dump(skip_songs, f, indent=2, ensure_ascii=False)

        return jsonify({
            'success': True,
            'message': f'Changes saved successfully. Backup created at: {backup_path}',
            'total_files': len(skip_songs)
        })
    except Exception as e:
        return jsonify({'error': f'Error saving changes: {str(e)}'}), 500


@app.route('/api/artists')
def get_artists():
    """API endpoint to get list of all artists for grouping."""
    # Use the same detailed-then-basic fallback as the other endpoints
    # (previously this endpoint read only the detailed report).
    skip_songs = load_skip_songs()
    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404

    duplicate_groups = get_duplicate_groups(skip_songs)

    # Group by artist
    artists: Dict[str, Dict[str, Any]] = {}
    for group in duplicate_groups:
        artist = group['artist']
        if artist not in artists:
            artists[artist] = {
                'name': artist,
                'songs': [],
                'total_duplicates': 0
            }
        artists[artist]['songs'].append(group)
        artists[artist]['total_duplicates'] += group['total_duplicates']

    # Convert to list and sort by artist name
    artists_list = sorted(artists.values(), key=lambda x: x['name'].lower())

    return jsonify({
        'artists': artists_list,
        'total_artists': len(artists_list)
    })


if __name__ == '__main__':
    # NOTE(security): debug=True combined with host='0.0.0.0' exposes the
    # Werkzeug interactive debugger (arbitrary code execution) to the whole
    # network. Acceptable only on a trusted LAN — disable debug before any
    # wider deployment.
    app.run(debug=True, host='0.0.0.0', port=5000)