#!/usr/bin/env python3 """ Web UI for Karaoke Song Library Cleanup Tool Provides interactive interface for reviewing duplicates and making decisions. """ from flask import Flask, render_template, jsonify, request, send_from_directory import json import os import time from typing import Dict, List, Any from datetime import datetime # Import playlist validator import sys sys.path.append('../cli') from playlist_validator import PlaylistValidator app = Flask(__name__) # Configuration CONFIG_FILE = '../config/config.json' # Global variable to store progress progress_data = { 'status': 'idle', 'message': '', 'progress': 0, 'current_step': '', 'cli_output': [] } def reset_progress(): """Reset progress data to idle state.""" global progress_data progress_data = { 'status': 'idle', 'message': '', 'progress': 0, 'current_step': '', 'cli_output': [] } def load_json_file(file_path: str) -> Any: """Load JSON file safely.""" try: with open(file_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"Error loading {file_path}: {e}") return None def get_data_directory(): """Get data directory from config.""" config = load_json_file(CONFIG_FILE) if config and 'data_directory' in config: # When running from web/ directory, we need to go up one level data_dir = config['data_directory'] if not os.path.isabs(data_dir): return os.path.join('..', data_dir) return data_dir return '../data' DATA_DIR = get_data_directory() REPORTS_DIR = os.path.join(DATA_DIR, 'reports') def get_duplicate_groups(skip_songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Group skip songs by artist/title to show duplicates together.""" duplicate_groups = {} for skip_song in skip_songs: artist = skip_song.get('artist', 'Unknown') title = skip_song.get('title', 'Unknown') key = f"{artist} - {title}" if key not in duplicate_groups: duplicate_groups[key] = { 'artist': artist, 'title': title, 'kept_version': skip_song.get('kept_version', 'Unknown'), 'skipped_versions': [], 'total_duplicates': 0 } duplicate_groups[key]['skipped_versions'].append({ 'path': skip_song['path'], 'reason': skip_song.get('reason', 'duplicate'), 'file_type': get_file_type(skip_song['path']), 'channel': extract_channel(skip_song['path']) }) duplicate_groups[key]['total_duplicates'] = len(duplicate_groups[key]['skipped_versions']) # Convert to list and sort by artist first, then by title groups_list = list(duplicate_groups.values()) groups_list.sort(key=lambda x: (x['artist'].lower(), x['title'].lower())) return groups_list def generate_mp3_song_list(all_songs: List[Dict[str, Any]], skip_songs: List[Dict[str, Any]]) -> Dict[str, Any]: """Generate a list of MP3 songs that remain after cleanup.""" # Create a set of paths that are being skipped skip_paths = {song['path'] for song in skip_songs} # Filter for MP3/CDG songs that are NOT being skipped mp3_songs = [] for song in all_songs: path = song.get('path', '') if path.lower().endswith(('.mp3', '.cdg')) and path not in skip_paths: mp3_songs.append({ 'position': len(mp3_songs) + 1, 'title': song.get('title', 'Unknown'), 'artist': song.get('artist', 'Unknown') }) # Sort by artist, then by title mp3_songs.sort(key=lambda x: (x['artist'].lower(), x['title'].lower())) # Update positions after sorting for i, song in enumerate(mp3_songs): song['position'] = i + 1 return { 'title': 'MP3 Songs need to be found', 'songs': mp3_songs } def get_file_type(path: str) -> str: """Extract file type from path.""" path_lower = path.lower() if path_lower.endswith('.mp4'): return 'MP4' elif path_lower.endswith('.mp3'): return 'MP3' elif path_lower.endswith('.cdg'): return 'MP3' # Treat CDG as MP3 since they're paired return 'Unknown' def extract_channel(path: str) -> str: """Extract channel name from path with consolidated MP3/CDG handling.""" path_lower = path.lower() parts = path.split('\\') # Handle MP3/CDG files - consolidate all CDG folder content under "MP3" if path.lower().endswith(('.mp3', '.cdg')): # Look for CDG folder structure: CDG/ParentFolder/SubFolder/file.mp3 for i, part in enumerate(parts): if part.lower() == 'cdg' and i < len(parts) - 1: # If CDG is found, return "MP3" as the consolidated channel return 'MP3' # If not in CDG folder but still MP3/CDG, return "MP3" return 'MP3' # Handle MP4 files - keep existing channel logic # Look for specific known channels first known_channels = ['Sing King Karaoke', 'KaraFun Karaoke', 'Stingray Karaoke'] for channel in known_channels: if channel.lower() in path_lower: return channel # Look for MP4 folder structure: MP4/ChannelName/song.mp4 for i, part in enumerate(parts): if part.lower() == 'mp4' and i < len(parts) - 1: # If MP4 is found, return the next folder (the actual channel) if i + 1 < len(parts): next_part = parts[i + 1] # Check if the next part is a folder (no file extension) or a file if '.' not in next_part: return next_part # It's a folder, so it's the channel name else: return 'MP4 Root' # File is directly in MP4 folder else: return 'MP4 Root' # Look for any folder that contains 'karaoke' (fallback) for part in parts: if 'karaoke' in part.lower() and '.' not in part: return part # If no specific channel found, return the folder containing the file if len(parts) >= 2: parent_folder = parts[-2] # Second to last part (folder containing the file) # If parent folder is MP4, then file is in root if parent_folder.lower() == 'mp4': return 'MP4 Root' # Make sure we're returning a folder name, not a filename if '.' not in parent_folder: return parent_folder else: # If parent is also a file, go up one more level if len(parts) >= 3: grandparent = parts[-3] if '.' not in grandparent: return grandparent return 'Unknown' def normalize_path(file_path: str) -> str: """Normalize malformed file paths that have been corrupted with ://.""" # Debug logging to track path transformation - show original path first print(f"DEBUG: normalize_path ORIGINAL PATH: {file_path}") # Fix malformed paths that have been corrupted with :// # Since we fixed the HTML string literal escaping, paths should now come in correctly # Just handle the basic :// to :\ conversion if '://' in file_path: print(f"DEBUG: Detected malformed path with ://, attempting to fix: {file_path}") # Clean up special characters in the path first - but be more conservative import re # Only remove actual control characters, not legitimate path characters file_path = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', file_path) # Remove only control characters file_path = re.sub(r'\s+', ' ', file_path) # Normalize multiple spaces to single space file_path = file_path.strip() # Remove leading/trailing spaces # Simple fix: replace :// with :\ fixed_path = file_path.replace('://', ':\\') print(f"DEBUG: Fixed path (simple :// to :\\ conversion): {fixed_path}") return fixed_path # If no :// corruption detected, return the path as-is print(f"DEBUG: normalize_path output (no changes): {file_path}") return file_path @app.route('/') def index(): """Main dashboard page.""" return render_template('index.html') @app.route('/remaining-songs') def remaining_songs(): """Page showing remaining songs after cleanup.""" return render_template('remaining_songs.html') @app.route('/favorites') def favorites(): """Page showing favorites with matching songs.""" return render_template('favorites.html') @app.route('/history') def history(): """Page showing history with matching songs.""" return render_template('history.html') @app.route('/api/duplicates') def get_duplicates(): """API endpoint to get duplicate data.""" # Try to load detailed skip songs first, fallback to basic skip list skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')) if not skip_songs: skip_songs = load_json_file(os.path.join(DATA_DIR, 'skipSongs.json')) if not skip_songs: return jsonify({'error': 'No skip songs data found'}), 404 duplicate_groups = get_duplicate_groups(skip_songs) # Apply filters artist_filter = request.args.get('artist', '').lower() title_filter = request.args.get('title', '').lower() channel_filter = request.args.get('channel', '').lower() file_type_filter = request.args.get('file_type', '').lower() min_duplicates = int(request.args.get('min_duplicates', 0)) filtered_groups = [] for group in duplicate_groups: # Apply filters if artist_filter and artist_filter not in group['artist'].lower(): continue if title_filter and title_filter not in group['title'].lower(): continue if group['total_duplicates'] < min_duplicates: continue # Check if any version (kept or skipped) matches channel/file_type filters if channel_filter or file_type_filter: matches_filter = False # Special handling for mp3-only filter if file_type_filter == 'mp3-only': # Check if kept version is MP3 and there are no MP4 alternatives kept_file_type = get_file_type(group['kept_version']) has_mp4_alternative = False # Check if any skipped version is MP4 for version in group['skipped_versions']: if version['file_type'].upper() == 'MP4': has_mp4_alternative = True break # Only show if kept version is MP3 and no MP4 alternatives exist if kept_file_type.upper() != 'MP3' or has_mp4_alternative: continue # Apply channel filter if specified if channel_filter: kept_channel = extract_channel(group['kept_version']) if channel_filter not in kept_channel.lower(): continue elif file_type_filter == 'mp3': # Special handling for MP3 filter - show songs where primary is MP3 and has alternatives kept_file_type = get_file_type(group['kept_version']) # Only show if kept version is MP3 and there are alternatives if kept_file_type.upper() != 'MP3' or len(group['skipped_versions']) == 0: continue # Apply channel filter if specified if channel_filter: kept_channel = extract_channel(group['kept_version']) if channel_filter not in kept_channel.lower(): continue else: # Regular file type and channel filtering # Check kept version kept_channel = extract_channel(group['kept_version']) kept_file_type = get_file_type(group['kept_version']) if (not channel_filter or channel_filter in kept_channel.lower()) and \ (not file_type_filter or file_type_filter in kept_file_type.lower()): matches_filter = True # Check skipped versions if kept version doesn't match if not matches_filter: for version in group['skipped_versions']: if (not channel_filter or channel_filter in version['channel'].lower()) and \ (not file_type_filter or file_type_filter in version['file_type'].lower()): matches_filter = True break if not matches_filter: continue filtered_groups.append(group) # Pagination page = int(request.args.get('page', 1)) per_page = int(request.args.get('per_page', 50)) start_idx = (page - 1) * per_page end_idx = start_idx + per_page paginated_groups = filtered_groups[start_idx:end_idx] return jsonify({ 'duplicates': paginated_groups, 'total': len(filtered_groups), 'page': page, 'per_page': per_page, 'total_pages': (len(filtered_groups) + per_page - 1) // per_page }) @app.route('/api/stats') def get_stats(): """API endpoint to get overall statistics.""" # Try to load detailed skip songs first, fallback to basic skip list skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')) if not skip_songs: skip_songs = load_json_file(os.path.join(DATA_DIR, 'skipSongs.json')) if not skip_songs: return jsonify({'error': 'No skip songs data found'}), 404 # Load original all songs data to get total counts all_songs = load_json_file(os.path.join(DATA_DIR, 'songs.json')) if not all_songs: all_songs = [] duplicate_groups = get_duplicate_groups(skip_songs) # Calculate current statistics total_duplicates = len(duplicate_groups) total_files_to_skip = len(skip_songs) # File type breakdown for skipped files skip_file_types = {'MP4': 0, 'MP3': 0} channels = {} for group in duplicate_groups: # Include kept version in channel stats kept_channel = extract_channel(group['kept_version']) channels[kept_channel] = channels.get(kept_channel, 0) + 1 # Include skipped versions for version in group['skipped_versions']: skip_file_types[version['file_type']] += 1 channel = version['channel'] channels[channel] = channels.get(channel, 0) + 1 # Calculate total file type breakdown from all songs total_file_types = {'MP4': 0, 'MP3': 0} total_songs = len(all_songs) for song in all_songs: file_type = get_file_type(song.get('path', '')) if file_type in total_file_types: total_file_types[file_type] += 1 # Calculate what will remain after skipping remaining_file_types = { 'MP4': total_file_types['MP4'] - skip_file_types['MP4'], 'MP3': total_file_types['MP3'] - skip_file_types['MP3'] } total_remaining = sum(remaining_file_types.values()) # Most duplicated songs most_duplicated = sorted(duplicate_groups, key=lambda x: x['total_duplicates'], reverse=True)[:10] return jsonify({ 'total_songs': total_songs, 'total_duplicates': total_duplicates, 'total_files_to_skip': total_files_to_skip, 'total_remaining': total_remaining, 'total_file_types': total_file_types, 'skip_file_types': skip_file_types, 'remaining_file_types': remaining_file_types, 'channels': channels, 'most_duplicated': most_duplicated }) @app.route('/api/config') def get_config(): """API endpoint to get current configuration.""" config = load_json_file(CONFIG_FILE) return jsonify(config or {}) @app.route('/api/save-changes', methods=['POST']) def save_changes(): """API endpoint to save user changes to the skip list.""" try: data = request.get_json() changes = data.get('changes', []) # Load current skip list skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')) if not skip_songs: return jsonify({'error': 'No skip songs data found'}), 404 # Apply changes for change in changes: change_type = change.get('type') song_key = change.get('song_key') # artist - title file_path = change.get('file_path') if change_type == 'keep_file': # Remove this file from skip list skip_songs = [s for s in skip_songs if s['path'] != file_path] elif change_type == 'skip_file': # Add this file to skip list new_entry = { 'path': file_path, 'reason': 'manual_skip', 'artist': change.get('artist'), 'title': change.get('title'), 'kept_version': change.get('kept_version') } skip_songs.append(new_entry) # Save updated skip list backup_path = os.path.join(DATA_DIR, 'reports', f'skip_songs_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json') import shutil shutil.copy2(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'), backup_path) with open(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'), 'w', encoding='utf-8') as f: json.dump(skip_songs, f, indent=2, ensure_ascii=False) return jsonify({ 'success': True, 'message': f'Changes saved successfully. Backup created at: {backup_path}', 'total_files': len(skip_songs) }) except Exception as e: return jsonify({'error': f'Error saving changes: {str(e)}'}), 500 @app.route('/api/artists') def get_artists(): """API endpoint to get list of all artists for grouping.""" skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')) if not skip_songs: return jsonify({'error': 'No skip songs data found'}), 404 duplicate_groups = get_duplicate_groups(skip_songs) # Group by artist artists = {} for group in duplicate_groups: artist = group['artist'] if artist not in artists: artists[artist] = { 'name': artist, 'songs': [], 'total_duplicates': 0 } artists[artist]['songs'].append(group) artists[artist]['total_duplicates'] += group['total_duplicates'] # Convert to list and sort by artist name artists_list = list(artists.values()) artists_list.sort(key=lambda x: x['name'].lower()) return jsonify({ 'artists': artists_list, 'total_artists': len(artists_list) }) @app.route('/api/mp3-songs') def get_mp3_songs(): """API endpoint to get MP3 songs that remain after cleanup.""" # Load all songs and skip songs all_songs = load_json_file(os.path.join(DATA_DIR, 'songs.json')) skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')) if not all_songs: return jsonify({'error': 'No all songs data found'}), 404 if not skip_songs: skip_songs = [] # Generate MP3 song list mp3_song_list = generate_mp3_song_list(all_songs, skip_songs) return jsonify(mp3_song_list) @app.route('/api/remaining-songs') def get_remaining_songs(): """Get all remaining songs (MP4 and MP3) after cleanup with pagination.""" try: all_songs = load_json_file(os.path.join(DATA_DIR, 'songs.json')) skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')) if not all_songs: return jsonify({'error': 'No all songs data found'}), 404 if not skip_songs: skip_songs = [] # Get pagination parameters page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) search = request.args.get('search', '').lower() file_type_filter = request.args.get('file_type', 'all') artist_filter = request.args.get('artist', '') # Create a set of paths that are being skipped skip_paths = {song['path'] for song in skip_songs} # Filter for songs that are NOT being skipped remaining_songs = [] for song in all_songs: path = song.get('path', '') if path not in skip_paths: # Apply file type filter if file_type_filter != 'all': if file_type_filter == 'mp4' and not path.lower().endswith('.mp4'): continue elif file_type_filter == 'mp3' and not path.lower().endswith(('.mp3', '.cdg')): continue # Apply search filter if search: title = song.get('title', '').lower() artist = song.get('artist', '').lower() if search not in title and search not in artist: continue # Apply artist filter if artist_filter: artist = song.get('artist', '').lower() if artist_filter.lower() not in artist: continue remaining_songs.append({ 'title': song.get('title', 'Unknown'), 'artist': song.get('artist', 'Unknown'), 'path': song.get('path', ''), 'file_type': get_file_type(song.get('path', '')), 'channel': extract_channel(song.get('path', '')) }) # Sort by artist, then by title remaining_songs.sort(key=lambda x: (x['artist'].lower(), x['title'].lower())) # Calculate pagination total_songs = len(remaining_songs) total_pages = (total_songs + per_page - 1) // per_page start_idx = (page - 1) * per_page end_idx = start_idx + per_page # Get songs for current page page_songs = remaining_songs[start_idx:end_idx] return jsonify({ 'songs': page_songs, 'pagination': { 'current_page': page, 'per_page': per_page, 'total_songs': total_songs, 'total_pages': total_pages } }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/download/mp3-songs') def download_mp3_songs(): """Download MP3 songs list as JSON file.""" # Load all songs and skip songs all_songs = load_json_file(os.path.join(DATA_DIR, 'songs.json')) skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')) if not all_songs: return jsonify({'error': 'No all songs data found'}), 404 if not skip_songs: skip_songs = [] # Generate MP3 song list mp3_song_list = generate_mp3_song_list(all_songs, skip_songs) # Save to file output_path = os.path.join(DATA_DIR, 'reports', 'mp3SongList.json') with open(output_path, 'w', encoding='utf-8') as f: json.dump(mp3_song_list, f, indent=2, ensure_ascii=False) # Return the file for download return send_from_directory( os.path.join(DATA_DIR, 'reports'), 'mp3SongList.json', as_attachment=True, download_name='mp3SongList.json' ) @app.route('/api/save-priority-preferences', methods=['POST']) def save_priority_preferences(): """API endpoint to save user priority preferences.""" try: data = request.get_json() priority_changes = data.get('priority_changes', {}) if not priority_changes: return jsonify({'error': 'No priority changes provided'}), 400 # Create preferences directory if it doesn't exist preferences_dir = os.path.join(DATA_DIR, 'preferences') os.makedirs(preferences_dir, exist_ok=True) # Load existing preferences preferences_file = os.path.join(preferences_dir, 'priority_preferences.json') existing_preferences = {} if os.path.exists(preferences_file): with open(preferences_file, 'r', encoding='utf-8') as f: existing_preferences = json.load(f) # Update with new preferences existing_preferences.update(priority_changes) # Save updated preferences with open(preferences_file, 'w', encoding='utf-8') as f: json.dump(existing_preferences, f, indent=2, ensure_ascii=False) # Create backup backup_path = os.path.join(preferences_dir, f'priority_preferences_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json') with open(backup_path, 'w', encoding='utf-8') as f: json.dump(existing_preferences, f, indent=2, ensure_ascii=False) return jsonify({ 'success': True, 'message': f'Saved {len(priority_changes)} priority preferences. Backup created at: {backup_path}', 'total_preferences': len(existing_preferences) }) except Exception as e: return jsonify({'error': f'Error saving priority preferences: {str(e)}'}), 500 @app.route('/api/reset-priority-preferences', methods=['POST']) def reset_priority_preferences(): """API endpoint to reset all priority preferences.""" try: preferences_dir = os.path.join(DATA_DIR, 'preferences') preferences_file = os.path.join(preferences_dir, 'priority_preferences.json') if os.path.exists(preferences_file): # Create backup before deletion backup_path = os.path.join(preferences_dir, f'priority_preferences_reset_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json') import shutil shutil.copy2(preferences_file, backup_path) # Delete the preferences file os.remove(preferences_file) return jsonify({ 'success': True, 'message': f'Priority preferences reset successfully. Backup created at: {backup_path}' }) else: return jsonify({ 'success': True, 'message': 'No priority preferences found to reset' }) except Exception as e: return jsonify({'error': f'Error resetting priority preferences: {str(e)}'}), 500 @app.route('/api/load-priority-preferences') def load_priority_preferences(): """API endpoint to load current priority preferences.""" try: preferences_file = os.path.join(DATA_DIR, 'preferences', 'priority_preferences.json') if os.path.exists(preferences_file): with open(preferences_file, 'r', encoding='utf-8') as f: preferences = json.load(f) return jsonify({ 'success': True, 'preferences': preferences }) else: return jsonify({ 'success': True, 'preferences': {} }) except Exception as e: return jsonify({'error': f'Error loading priority preferences: {str(e)}'}), 500 def find_matching_songs(item: Dict[str, Any], all_songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Find all songs from songs.json that match a given item (favorite/history).""" matching_songs = [] # Validate input if item is None or not isinstance(item, dict): print(f"Warning: Invalid item passed to find_matching_songs: {item}") return matching_songs item_artist = item.get('artist', '').lower().strip() item_title = item.get('title', '').lower().strip() for song in all_songs: # Skip None songs if song is None or not isinstance(song, dict): continue song_artist = song.get('artist', '').lower().strip() song_title = song.get('title', '').lower().strip() # Exact match on artist and title if song_artist == item_artist and song_title == item_title: matching_songs.append({ 'path': song.get('path', ''), 'artist': song.get('artist', 'Unknown'), 'title': song.get('title', 'Unknown'), 'file_type': get_file_type(song.get('path', '')), 'channel': extract_channel(song.get('path', '')), 'is_current': song.get('path', '') == item.get('path', '') }) # Sort by priority: current path first, then by file type priority def sort_key(song): # Current path gets highest priority if song['is_current']: return (0, 0) # File type priority: MP4 > CDG/MP3 > standalone MP3 > standalone CDG file_type_priority = { 'MP4': 1, 'MP3': 2, 'Unknown': 3 } return (1, file_type_priority.get(song['file_type'], 3)) matching_songs.sort(key=sort_key) return matching_songs @app.route('/api/favorites') def get_favorites(): """Get favorites data with matching songs for each favorite.""" try: # Load data files favorites_file = os.path.join(DATA_DIR, 'favorites.json') all_songs_file = os.path.join(DATA_DIR, 'songs.json') favorites = load_json_file(favorites_file) all_songs = load_json_file(all_songs_file) if not favorites or not all_songs: return jsonify({'error': 'Failed to load data files'}), 500 # Process each favorite processed_favorites = [] for i, favorite in enumerate(favorites): # Skip None items or items without required fields if favorite is None: print(f"Warning: Skipping None item at index {i} in favorites") continue if not isinstance(favorite, dict): print(f"Warning: Skipping non-dict item at index {i} in favorites: {type(favorite)}") continue try: matching_songs = find_matching_songs(favorite, all_songs) processed_favorite = { 'index': i, 'original': favorite, 'matching_songs': matching_songs, 'artist': favorite.get('artist', 'Unknown'), 'title': favorite.get('title', 'Unknown'), 'current_path': favorite.get('path', ''), 'count': favorite.get('count', 0), 'favorite': favorite.get('favorite', False), 'disabled': favorite.get('disabled', False) } processed_favorites.append(processed_favorite) except Exception as item_error: print(f"Error processing favorite item {i}: {item_error}") print(f"Item data: {favorite}") continue # Sort by artist, then by title processed_favorites.sort(key=lambda x: (x['artist'].lower(), x['title'].lower())) return jsonify({ 'success': True, 'favorites': processed_favorites, 'total': len(processed_favorites) }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/history') def get_history(): """Get history data with matching songs for each history item.""" try: # Load data files history_file = os.path.join(DATA_DIR, 'history.json') all_songs_file = os.path.join(DATA_DIR, 'songs.json') history = load_json_file(history_file) all_songs = load_json_file(all_songs_file) if not history or not all_songs: return jsonify({'error': 'Failed to load data files'}), 500 # Process each history item processed_history = [] for i, item in enumerate(history): # Skip None items or items without required fields if item is None: print(f"Warning: Skipping None item at index {i} in history") continue if not isinstance(item, dict): print(f"Warning: Skipping non-dict item at index {i} in history: {type(item)}") continue try: matching_songs = find_matching_songs(item, all_songs) processed_item = { 'index': i, 'original': item, 'matching_songs': matching_songs, 'artist': item.get('artist', 'Unknown'), 'title': item.get('title', 'Unknown'), 'current_path': item.get('path', ''), 'count': item.get('count', 0), 'favorite': item.get('favorite', False), 'disabled': item.get('disabled', False) } processed_history.append(processed_item) except Exception as item_error: print(f"Error processing history item {i}: {item_error}") print(f"Item data: {item}") continue # Sort by artist, then by title processed_history.sort(key=lambda x: (x['artist'].lower(), x['title'].lower())) return jsonify({ 'success': True, 'history': processed_history, 'total': len(processed_history) }) except Exception as e: print(f"Error in get_history: {e}") import traceback print(f"Full traceback: {traceback.format_exc()}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/update-favorite-path', methods=['POST']) def update_favorite_path(): """Update the path of a favorite item.""" try: data = request.get_json() favorite_index = data.get('index') new_path = data.get('path') if favorite_index is None or new_path is None: return jsonify({'success': False, 'error': 'Missing index or path'}), 400 # Load favorites file favorites_file = os.path.join(DATA_DIR, 'favorites.json') favorites = load_json_file(favorites_file) if not favorites or favorite_index >= len(favorites): return jsonify({'success': False, 'error': 'Invalid favorite index'}), 400 # Update the path favorites[favorite_index]['path'] = new_path # Save back to file with open(favorites_file, 'w', encoding='utf-8') as f: json.dump(favorites, f, indent=2, ensure_ascii=False) return jsonify({'success': True}) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/update-history-path', methods=['POST']) def update_history_path(): """Update the path of a history item.""" try: data = request.get_json() history_index = data.get('index') new_path = data.get('path') if history_index is None or new_path is None: return jsonify({'success': False, 'error': 'Missing index or path'}), 400 # Load history file history_file = os.path.join(DATA_DIR, 'history.json') history = load_json_file(history_file) if not history or history_index >= len(history): return jsonify({'success': False, 'error': 'Invalid history index'}), 400 # Update the path history[history_index]['path'] = new_path # Save back to file with open(history_file, 'w', encoding='utf-8') as f: json.dump(history, f, indent=2, ensure_ascii=False) return jsonify({'success': True}) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/update-favorite-property', methods=['POST']) def update_favorite_property(): """Update any property of a favorite item.""" try: data = request.get_json() index = data.get('index') property_name = data.get('property') new_value = data.get('value') if index is None or property_name is None or new_value is None: return jsonify({'success': False, 'error': 'Missing index, property, or value'}), 400 # Load current favorites favorites_file = os.path.join(DATA_DIR, 'favorites.json') if not os.path.exists(favorites_file): return jsonify({'success': False, 'error': 'Favorites file not found'}), 404 with open(favorites_file, 'r', encoding='utf-8') as f: favorites = json.load(f) # Update the property if 0 <= index < len(favorites): favorites[index][property_name] = new_value # Special handling for path updates if property_name == 'path' and 'original_path' in favorites[index]: # Update original_path if it matches the old path if favorites[index]['original_path'] == favorites[index].get('path'): favorites[index]['original_path'] = new_value # Save updated favorites with open(favorites_file, 'w', encoding='utf-8') as f: json.dump(favorites, f, indent=2, ensure_ascii=False) return jsonify({'success': True}) else: return jsonify({'success': False, 'error': 'Invalid index'}), 400 except Exception as e: print(f"Error updating favorite property: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/update-history-property', methods=['POST']) def update_history_property(): """Update any property of a history item.""" try: data = request.get_json() index = data.get('index') property_name = data.get('property') new_value = data.get('value') if index is None or property_name is None or new_value is None: return jsonify({'success': False, 'error': 'Missing index, property, or value'}), 400 # Load current history history_file = os.path.join(DATA_DIR, 'history.json') if not os.path.exists(history_file): return jsonify({'success': False, 'error': 'History file not found'}), 404 with open(history_file, 'r', encoding='utf-8') as f: history = json.load(f) # Update the property if 0 <= index < len(history): history[index][property_name] = new_value # Special handling for path updates if property_name == 'path' and 'original_path' in history[index]: # Update original_path if it matches the old path if history[index]['original_path'] == history[index].get('path'): history[index]['original_path'] = new_value # Save updated history with open(history_file, 'w', encoding='utf-8') as f: json.dump(history, f, indent=2, ensure_ascii=False) return jsonify({'success': True}) else: return jsonify({'success': False, 'error': 'Invalid index'}), 400 except Exception as e: print(f"Error updating history property: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/delete-favorite', methods=['POST']) def delete_favorite(): """Delete a favorite item by index.""" try: data = request.get_json() index = data.get('index') if index is None: return jsonify({'success': False, 'error': 'Missing index'}), 400 # Load current favorites favorites_file = os.path.join(DATA_DIR, 'favorites.json') if not os.path.exists(favorites_file): return jsonify({'success': False, 'error': 'Favorites file not found'}), 404 with open(favorites_file, 'r', encoding='utf-8') as f: favorites = json.load(f) # Delete the item at the specified index if 0 <= index < len(favorites): deleted_item = favorites.pop(index) # Save updated favorites with open(favorites_file, 'w', encoding='utf-8') as f: json.dump(favorites, f, indent=2, ensure_ascii=False) return jsonify({ 'success': True, 'deleted_item': deleted_item, 'message': f'Deleted "{deleted_item.get("artist", "Unknown")} - {deleted_item.get("title", "Unknown")}"' }) else: return jsonify({'success': False, 'error': 'Invalid index'}), 400 except Exception as e: print(f"Error deleting favorite: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/delete-history', methods=['POST']) def delete_history(): """Delete a history item by index.""" try: data = request.get_json() index = data.get('index') if index is None: return jsonify({'success': False, 'error': 'Missing index'}), 400 # Load current history history_file = os.path.join(DATA_DIR, 'history.json') if not os.path.exists(history_file): return jsonify({'success': False, 'error': 'History file not found'}), 404 with open(history_file, 'r', encoding='utf-8') as f: history = json.load(f) # Delete the item at the specified index if 0 <= index < len(history): deleted_item = history.pop(index) # Save updated history with open(history_file, 'w', encoding='utf-8') as f: json.dump(history, f, indent=2, ensure_ascii=False) return jsonify({ 'success': True, 'deleted_item': deleted_item, 'message': f'Deleted "{deleted_item.get("artist", "Unknown")} - {deleted_item.get("title", "Unknown")}"' }) else: return jsonify({'success': False, 'error': 'Invalid index'}), 400 except Exception as e: print(f"Error deleting history item: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/merge-history', methods=['POST']) def merge_history(): """Merge history objects that match on artist, title, and path, summing their count properties.""" try: # Load current history history_file = os.path.join(DATA_DIR, 'history.json') if not os.path.exists(history_file): return jsonify({'success': False, 'error': 'History file not found'}), 404 with open(history_file, 'r', encoding='utf-8') as f: history_items = json.load(f) if not history_items: return jsonify({'success': False, 'error': 'No history items found to merge'}), 400 # Create a dictionary to group items by artist, title, and path grouped_items = {} merged_count = 0 total_merged_entries = 0 for item in history_items: if not isinstance(item, dict): continue artist = item.get('artist', '').strip() title = item.get('title', '').strip() path = item.get('path', '').strip() if not artist or not title or not path: continue # Create a key for grouping key = (artist.lower(), title.lower(), path.lower()) if key not in grouped_items: grouped_items[key] = [] grouped_items[key].append(item) # Process groups with multiple items merged_items = [] for key, items in grouped_items.items(): if len(items) == 1: # Single item, keep as is merged_items.append(items[0]) else: # Multiple items, merge them artist, title, path = key # Start with the first item as the base merged_item = items[0].copy() # Sum the counts (handle both int and string values) total_count = 0 for item in items: count_value = item.get('count', 0) if isinstance(count_value, str): try: total_count += int(count_value) except ValueError: total_count += 0 else: total_count += count_value merged_item['count'] = total_count # For boolean properties, if any are True, keep True merged_item['favorite'] = any(item.get('favorite', False) for item in items) merged_item['disabled'] = any(item.get('disabled', False) for item in items) # For other properties, keep the first non-empty value for prop in ['key', 'original_path', 'genre']: if prop in merged_item and merged_item[prop]: continue for item in items[1:]: # Skip first item since we already have it if item.get(prop): merged_item[prop] = item[prop] break merged_items.append(merged_item) merged_count += 1 total_merged_entries += len(items) # Save the merged history with open(history_file, 'w', encoding='utf-8') as f: json.dump(merged_items, f, indent=2, ensure_ascii=False) return jsonify({ 'success': True, 'merged_groups': merged_count, 'total_merged_entries': total_merged_entries, 'final_count': len(merged_items), 'message': f'Successfully merged {merged_count} groups ({total_merged_entries} total entries → {len(merged_items)} entries)' }) except Exception as e: print(f"Error merging history objects: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/video/') def serve_video(file_path): """Serve video files for playback in the web UI.""" try: # Decode the file path (it comes URL-encoded) import urllib.parse decoded_path = urllib.parse.unquote(file_path) # Normalize the path to fix any malformed paths normalized_path = normalize_path(decoded_path) # Debug logging - show original path first print(f"DEBUG: serve_video ORIGINAL PATH: {decoded_path}") print(f"DEBUG: Normalized path: {normalized_path}") print(f"DEBUG: Current working directory: {os.getcwd()}") print(f"DEBUG: Path transformation: '{decoded_path}' -> '{normalized_path}'") # Security check: ensure the path is within allowed directories # This prevents directory traversal attacks if '..' in normalized_path: print(f"DEBUG: Security check failed - path contains '..'") return jsonify({'error': 'Invalid file path'}), 400 # On Windows, allow absolute paths with drive letters # On Unix-like systems, block absolute paths if os.name == 'nt': # Windows if normalized_path.startswith('/') and not normalized_path[1:].startswith(':'): print(f"DEBUG: Security check failed - Unix-style absolute path on Windows") return jsonify({'error': 'Invalid file path'}), 400 else: # Unix-like systems if normalized_path.startswith('/'): print(f"DEBUG: Security check failed - absolute path on Unix") return jsonify({'error': 'Invalid file path'}), 400 # Check if file exists if not os.path.exists(normalized_path): print(f"DEBUG: File does not exist: {normalized_path}") # Try to list the directory to see what's available try: directory = os.path.dirname(normalized_path) if os.path.exists(directory): files_in_dir = os.listdir(directory) print(f"DEBUG: Files in directory {directory}: {files_in_dir[:10]}...") # Show first 10 files else: print(f"DEBUG: Directory does not exist: {directory}") except Exception as e: print(f"DEBUG: Error listing directory: {e}") return jsonify({'error': 'Video file not found'}), 404 # Check if it's a video file and determine MIME type file_extension = os.path.splitext(normalized_path)[1].lower() mime_types = { '.mp4': 'video/mp4', '.avi': 'video/x-msvideo', '.mkv': 'video/x-matroska', '.mov': 'video/quicktime', '.wmv': 'video/x-ms-wmv', '.flv': 'video/x-flv', '.webm': 'video/webm' } if file_extension not in mime_types: print(f"DEBUG: Invalid file type: {normalized_path}") return jsonify({'error': 'Invalid file type'}), 400 mime_type = mime_types[file_extension] # Get file info for debugging file_size = os.path.getsize(normalized_path) print(f"DEBUG: File exists, size: {file_size} bytes") print(f"DEBUG: MIME type: {mime_type}") # Serve the video file directory = os.path.dirname(normalized_path) filename = os.path.basename(normalized_path) print(f"DEBUG: Serving from directory: {directory}") print(f"DEBUG: Filename: {filename}") # Add headers for better video streaming response = send_from_directory( directory, filename, mimetype=mime_type ) # Add CORS headers to allow cross-origin requests response.headers['Access-Control-Allow-Origin'] = '*' response.headers['Access-Control-Allow-Methods'] = 'GET, HEAD, OPTIONS' response.headers['Access-Control-Allow-Headers'] = 'Range' # Add cache control headers response.headers['Cache-Control'] = 'public, max-age=3600' return response except Exception as e: print(f"DEBUG: Exception in serve_video: {str(e)}") import traceback print(f"DEBUG: Full traceback: {traceback.format_exc()}") return jsonify({'error': f'Error serving video: {str(e)}'}), 500 # Playlist Validation Endpoints @app.route('/playlist-validation') def playlist_validation(): """Playlist validation page.""" return render_template('playlist_validation.html') @app.route('/api/playlists') def get_playlists(): """Get list of all playlists.""" try: playlists_path = os.path.join(DATA_DIR, 'songList.json') with open(playlists_path, 'r', encoding='utf-8') as f: playlists = json.load(f) # Return basic playlist info playlist_list = [] for i, playlist in enumerate(playlists): playlist_list.append({ 'index': i, 'title': playlist.get('title', 'Unknown Playlist'), 'song_count': len(playlist.get('songs', [])) }) return jsonify(playlist_list) except Exception as e: return jsonify({'error': f'Error loading playlists: {str(e)}'}), 500 @app.route('/api/validate-playlist/') def validate_playlist(playlist_index): """Validate a specific playlist.""" try: # Load configuration config = load_json_file(CONFIG_FILE) if not config: return jsonify({'error': 'Could not load configuration'}), 500 # Create validator validator = PlaylistValidator(config, DATA_DIR) # Load playlists playlists_path = os.path.join(DATA_DIR, 'songList.json') with open(playlists_path, 'r', encoding='utf-8') as f: playlists = json.load(f) if playlist_index >= len(playlists): return jsonify({'error': f'Invalid playlist index: {playlist_index}'}), 400 # Validate the playlist result = validator.validate_playlist(playlists[playlist_index], dry_run=True) return jsonify(result) except Exception as e: return jsonify({'error': f'Error validating playlist: {str(e)}'}), 500 @app.route('/api/validate-all-playlists') def validate_all_playlists(): """Validate all playlists.""" try: # Load configuration config = load_json_file(CONFIG_FILE) if not config: return jsonify({'error': 'Could not load configuration'}), 500 # Create validator validator = PlaylistValidator(config, DATA_DIR) # Validate all playlists results = validator.validate_all_playlists(dry_run=True) return jsonify(results) except Exception as e: return jsonify({'error': f'Error validating playlists: {str(e)}'}), 500 @app.route('/api/update-playlist-song', methods=['POST']) def update_playlist_song(): """Update a playlist song with corrected artist/title.""" try: data = request.get_json() playlist_index = data.get('playlist_index') song_position = data.get('song_position') new_artist = data.get('new_artist') new_title = data.get('new_title') dry_run = data.get('dry_run', True) if any(x is None for x in [playlist_index, song_position, new_artist, new_title]): return jsonify({'error': 'Missing required parameters'}), 400 # Load configuration config = load_json_file(CONFIG_FILE) if not config: return jsonify({'error': 'Could not load configuration'}), 500 # Create validator validator = PlaylistValidator(config, DATA_DIR) # Update the song success = validator.update_playlist_song( playlist_index, song_position, new_artist, new_title, dry_run ) if success: return jsonify({'success': True, 'message': 'Playlist song updated successfully'}) else: return jsonify({'error': 'Failed to update playlist song'}), 500 except Exception as e: return jsonify({'error': f'Error updating playlist song: {str(e)}'}), 500 @app.route('/api/apply-all-updates', methods=['POST']) def apply_all_updates(): """Apply all pending playlist updates in batch.""" try: data = request.get_json() changes = data.get('changes', []) if not changes: return jsonify({'error': 'No changes to apply'}), 400 # Load configuration config = load_json_file(CONFIG_FILE) if not config: return jsonify({'error': 'Could not load configuration'}), 500 # Create validator validator = PlaylistValidator(config, DATA_DIR) # Apply all changes success_count = 0 failed_count = 0 for change in changes: success = validator.update_playlist_song( change['playlistIndex'], change['songPosition'], change['newArtist'], change['newTitle'], False # Always apply changes (not dry run) ) if success: success_count += 1 else: failed_count += 1 if failed_count == 0: message = f'Successfully applied {success_count} changes to playlists.' else: message = f'Applied {success_count} changes, {failed_count} failed.' return jsonify({ 'success': True, 'message': message, 'applied': success_count, 'failed': failed_count }) except Exception as e: return jsonify({'error': f'Error applying updates: {str(e)}'}), 500 @app.route('/api/reset-and-regenerate', methods=['POST']) def reset_and_regenerate(): """Delete all generated files and run the CLI tool again.""" try: import subprocess import shutil import os import threading # Reset progress data reset_progress() progress_data.update({ 'status': 'starting', 'message': 'Initializing reset and regenerate process...', 'progress': 0, 'current_step': 'Initializing', 'cli_output': [] }) # Get the project root directory (parent of web directory) project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Files and directories to delete files_to_delete = [ os.path.join(DATA_DIR, 'skipSongs.json'), os.path.join(DATA_DIR, 'reports'), os.path.join(DATA_DIR, 'preferences') ] deleted_items = [] # Update progress progress_data.update({ 'status': 'deleting', 'message': 'Deleting generated files...', 'progress': 10, 'current_step': 'Cleaning up old files' }) # Delete files and directories for item_path in files_to_delete: if os.path.exists(item_path): try: if os.path.isfile(item_path): os.remove(item_path) deleted_items.append(f"File: {os.path.basename(item_path)}") elif os.path.isdir(item_path): shutil.rmtree(item_path) deleted_items.append(f"Directory: {os.path.basename(item_path)}") except Exception as e: print(f"Warning: Could not delete {item_path}: {e}") # Run the CLI tool cli_dir = os.path.join(project_root, 'cli') cli_script = os.path.join(cli_dir, 'main.py') if not os.path.exists(cli_script): progress_data.update({ 'status': 'error', 'message': 'CLI script not found', 'progress': 0, 'current_step': 'Error' }) return jsonify({'error': 'CLI script not found'}), 500 # Update progress progress_data.update({ 'status': 'running', 'message': 'Running CLI tool to analyze songs...', 'progress': 20, 'current_step': 'Running CLI Analysis' }) # Change to CLI directory and run the tool original_cwd = os.getcwd() os.chdir(cli_dir) def run_cli_with_progress(): global progress_data try: # Run the CLI tool with process-all flag and capture output in real-time process = subprocess.Popen( [sys.executable, 'main.py', '--process-all', '--verbose'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True ) # Read output line by line line_count = 0 for line in iter(process.stdout.readline, ''): if line: line = line.strip() progress_data['cli_output'].append(line) line_count += 1 # Update progress based on output patterns if 'Loading songs' in line or 'loading' in line.lower(): progress_data.update({ 'progress': 30, 'current_step': 'Loading song library', 'message': f'Loading song library... ({line_count} lines processed)' }) elif 'Finding duplicates' in line or 'duplicate' in line.lower(): progress_data.update({ 'progress': 50, 'current_step': 'Finding duplicates', 'message': f'Finding duplicates... ({line_count} lines processed)' }) elif 'Saving reports' in line or 'saving' in line.lower(): progress_data.update({ 'progress': 80, 'current_step': 'Saving reports', 'message': f'Saving reports... ({line_count} lines processed)' }) elif 'Complete' in line or 'Finished' in line or 'done' in line.lower(): progress_data.update({ 'progress': 100, 'current_step': 'Complete', 'message': 'Process completed successfully!' }) else: # Update progress gradually based on line count if line_count < 100: progress_data.update({ 'progress': min(20 + (line_count * 2), 30), 'current_step': 'Initializing', 'message': f'Initializing... ({line_count} lines processed)' }) elif line_count < 500: progress_data.update({ 'progress': min(30 + ((line_count - 100) * 0.1), 50), 'current_step': 'Processing', 'message': f'Processing songs... ({line_count} lines processed)' }) elif line_count < 1000: progress_data.update({ 'progress': min(50 + ((line_count - 500) * 0.05), 80), 'current_step': 'Analyzing', 'message': f'Analyzing duplicates... ({line_count} lines processed)' }) else: progress_data.update({ 'progress': min(80 + ((line_count - 1000) * 0.02), 95), 'current_step': 'Finalizing', 'message': f'Finalizing... ({line_count} lines processed)' }) process.stdout.close() return_code = process.wait() if return_code == 0: progress_data.update({ 'status': 'completed', 'message': f'✅ Reset and regeneration completed successfully!\n\nDeleted items:\n' + "\n".join(deleted_items), 'progress': 100, 'current_step': 'Complete' }) else: progress_data.update({ 'status': 'error', 'message': f'CLI tool failed with return code {return_code}', 'progress': 0, 'current_step': 'Error' }) except Exception as e: progress_data.update({ 'status': 'error', 'message': f'Error during CLI execution: {str(e)}', 'progress': 0, 'current_step': 'Error' }) finally: # Restore original working directory os.chdir(original_cwd) # Run CLI in a separate thread cli_thread = threading.Thread(target=run_cli_with_progress) cli_thread.daemon = True cli_thread.start() return jsonify({ 'success': True, 'message': 'Reset and regenerate process started. Check progress endpoint for updates.', 'deleted_items': deleted_items }) except Exception as e: progress_data.update({ 'status': 'error', 'message': f'Error during reset and regenerate: {str(e)}', 'progress': 0, 'current_step': 'Error' }) return jsonify({'error': f'Error during reset and regenerate: {str(e)}'}), 500 @app.route('/api/progress') def get_progress(): """Get current progress of reset and regenerate process.""" global progress_data return jsonify(progress_data) @app.route('/api/progress/reset', methods=['POST']) def reset_progress_endpoint(): """Reset progress data to idle state.""" reset_progress() return jsonify({'success': True, 'message': 'Progress reset to idle'}) @app.route('/api/progress/stream') def progress_stream(): """Server-Sent Events endpoint for real-time progress updates.""" def generate(): global progress_data last_data = None while True: current_data = progress_data.copy() # Only send if data has changed if current_data != last_data: yield f"data: {json.dumps(current_data)}\n\n" last_data = current_data.copy() # If process is complete or error, stop streaming if current_data['status'] in ['completed', 'error']: break time.sleep(1) # Update every second return app.response_class( generate(), mimetype='text/plain' ) if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=5002)