# KaraokeMerge/web/app.py
# (viewer metadata: 1726 lines, 66 KiB, Python)
#!/usr/bin/env python3
"""
Web UI for Karaoke Song Library Cleanup Tool
Provides interactive interface for reviewing duplicates and making decisions.
"""
from flask import Flask, render_template, jsonify, request, send_from_directory
import json
import os
import time
from typing import Dict, List, Any
from datetime import datetime
# Import playlist validator
import sys
sys.path.append('../cli')
from playlist_validator import PlaylistValidator
# Flask application instance serving both the HTML pages and the JSON API.
app = Flask(__name__)
# Configuration
# Path to the shared tool configuration, relative to the web/ working directory.
CONFIG_FILE = '../config/config.json'
# Global variable to store progress
# Module-level mutable state; reset to this same shape by reset_progress().
progress_data = {
    'status': 'idle',       # current run state (presumably idle/running/done -- confirm against writers)
    'message': '',          # human-readable status message
    'progress': 0,          # numeric progress value
    'current_step': '',     # label of the step currently executing
    'cli_output': []        # captured CLI output lines
}
def reset_progress():
    """Rebind the module-level progress_data to a fresh idle-state dict."""
    global progress_data
    progress_data = dict(
        status='idle',
        message='',
        progress=0,
        current_step='',
        cli_output=[],
    )
def load_json_file(file_path: str) -> Any:
    """Load and parse a JSON file, returning None on failure.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The parsed JSON value, or None when the file is missing,
        unreadable, or not valid JSON.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    # Narrowed from bare Exception: OSError covers missing/unreadable files,
    # ValueError covers JSONDecodeError and UnicodeDecodeError. Programming
    # errors no longer get silently swallowed.
    except (OSError, ValueError) as e:
        print(f"Error loading {file_path}: {e}")
        return None
def get_data_directory():
    """Resolve the data directory from config, defaulting to '../data'.

    A relative path from the config is re-anchored one level up because the
    web app runs from the web/ directory.
    """
    config = load_json_file(CONFIG_FILE)
    if not config or 'data_directory' not in config:
        return '../data'
    data_dir = config['data_directory']
    if os.path.isabs(data_dir):
        return data_dir
    return os.path.join('..', data_dir)
# Resolved once at import time; every API route reads data relative to this.
DATA_DIR = get_data_directory()
# Generated reports (detailed skip lists, backups, exports) live here.
REPORTS_DIR = os.path.join(DATA_DIR, 'reports')
def get_duplicate_groups(skip_songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group skip entries under "artist - title" keys so duplicates show together.

    The first entry seen for a key supplies the group's kept_version; each
    entry contributes one skipped version. Groups are returned sorted by
    artist, then title (case-insensitive).
    """
    grouped: Dict[str, Dict[str, Any]] = {}
    for entry in skip_songs:
        artist = entry.get('artist', 'Unknown')
        title = entry.get('title', 'Unknown')
        key = f"{artist} - {title}"
        group = grouped.setdefault(key, {
            'artist': artist,
            'title': title,
            'kept_version': entry.get('kept_version', 'Unknown'),
            'skipped_versions': [],
            'total_duplicates': 0,
        })
        group['skipped_versions'].append({
            'path': entry['path'],
            'reason': entry.get('reason', 'duplicate'),
            'file_type': get_file_type(entry['path']),
            'channel': extract_channel(entry['path']),
        })
        group['total_duplicates'] = len(group['skipped_versions'])
    return sorted(grouped.values(),
                  key=lambda g: (g['artist'].lower(), g['title'].lower()))
def generate_mp3_song_list(all_songs: List[Dict[str, Any]], skip_songs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Generate the list of MP3/CDG songs that remain after cleanup.

    Args:
        all_songs: Every scanned song (dicts with 'path', 'title', 'artist').
        skip_songs: Entries whose 'path' values are excluded from the result.

    Returns:
        {'title': ..., 'songs': [...]} where each song has 'position',
        'title', and 'artist', sorted by artist then title.

    Fix: positions were previously assigned inside the build loop and then
    immediately overwritten after sorting; they are now assigned only once,
    after the sort.
    """
    skip_paths = {song['path'] for song in skip_songs}
    mp3_songs = [
        {'title': song.get('title', 'Unknown'),
         'artist': song.get('artist', 'Unknown')}
        for song in all_songs
        if song.get('path', '').lower().endswith(('.mp3', '.cdg'))
        and song.get('path', '') not in skip_paths
    ]
    mp3_songs.sort(key=lambda s: (s['artist'].lower(), s['title'].lower()))
    # Positions are 1-based and only meaningful post-sort.
    for position, song in enumerate(mp3_songs, start=1):
        song['position'] = position
    return {
        'title': 'MP3 Songs need to be found',
        'songs': mp3_songs
    }
def get_file_type(path: str) -> str:
    """Classify a media path as 'MP4', 'MP3', or 'Unknown' by extension.

    CDG files are paired with MP3 audio, so both extensions report 'MP3'.
    """
    lowered = path.lower()
    for suffix, label in (('.mp4', 'MP4'), ('.mp3', 'MP3'), ('.cdg', 'MP3')):
        if lowered.endswith(suffix):
            return label
    return 'Unknown'
def extract_channel(path: str) -> str:
    """Extract the channel name from a (Windows-style) library path.

    MP3/CDG content is consolidated under the single channel 'MP3'. MP4
    content resolves to a known channel name, the folder under 'MP4',
    a folder containing 'karaoke', or the file's parent folder.

    Fixes: removed the dead CDG-folder scan (every branch returned 'MP3'
    regardless), the unreachable 'MP4 Root' else-branch (the loop guard
    already ensures i + 1 < len(parts)), and a redundant path.lower() call.
    """
    path_lower = path.lower()
    parts = path.split('\\')  # library paths use Windows separators
    # All MP3/CDG files are reported as one consolidated 'MP3' channel.
    if path_lower.endswith(('.mp3', '.cdg')):
        return 'MP3'
    # Known channels win over folder-structure heuristics.
    known_channels = ['Sing King Karaoke', 'KaraFun Karaoke', 'Stingray Karaoke']
    for channel in known_channels:
        if channel.lower() in path_lower:
            return channel
    # MP4/ChannelName/song.mp4 -> 'ChannelName'; file directly in MP4/ -> 'MP4 Root'.
    for i, part in enumerate(parts):
        if part.lower() == 'mp4' and i < len(parts) - 1:
            next_part = parts[i + 1]
            if '.' not in next_part:
                return next_part  # folder under MP4 is the channel
            return 'MP4 Root'     # file sits directly in the MP4 folder
    # Fallback: any folder whose name mentions 'karaoke'.
    for part in parts:
        if 'karaoke' in part.lower() and '.' not in part:
            return part
    # Last resort: the folder containing the file (or its parent).
    if len(parts) >= 2:
        parent_folder = parts[-2]
        if parent_folder.lower() == 'mp4':
            return 'MP4 Root'
        if '.' not in parent_folder:
            return parent_folder
        # Parent looks like a file name; try one level further up.
        if len(parts) >= 3 and '.' not in parts[-3]:
            return parts[-3]
    return 'Unknown'
def normalize_path(file_path: str) -> str:
    """Normalize malformed file paths that have been corrupted with ://."""
    import re

    print(f"DEBUG: normalize_path ORIGINAL PATH: {file_path}")
    # Paths without the :// corruption pass through untouched.
    if '://' not in file_path:
        print(f"DEBUG: normalize_path output (no changes): {file_path}")
        return file_path
    print(f"DEBUG: Detected malformed path with ://, attempting to fix: {file_path}")
    # Conservative cleanup: strip control characters only, then collapse
    # whitespace runs and trim the ends.
    cleaned = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', file_path)
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    # The actual repair: turn the corrupted :// back into :\ .
    repaired = cleaned.replace('://', ':\\')
    print(f"DEBUG: Fixed path (simple :// to :\\ conversion): {repaired}")
    return repaired
# --- Page routes: each simply renders its template; the page's data is
# --- fetched client-side from the /api endpoints below.
@app.route('/')
def index():
    """Main dashboard page."""
    return render_template('index.html')


@app.route('/remaining-songs')
def remaining_songs():
    """Page showing remaining songs after cleanup."""
    return render_template('remaining_songs.html')


@app.route('/favorites')
def favorites():
    """Page showing favorites with matching songs."""
    return render_template('favorites.html')


@app.route('/history')
def history():
    """Page showing history with matching songs."""
    return render_template('history.html')
@app.route('/api/duplicates')
def get_duplicates():
    """API endpoint to get duplicate data.

    Query params: artist/title/channel (case-insensitive substring filters),
    file_type ('mp3-only', 'mp3', or a substring), min_duplicates, and
    page/per_page for pagination. Returns the filtered duplicate groups for
    the requested page plus pagination metadata.
    """
    # Try to load detailed skip songs first, fallback to basic skip list
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))
    if not skip_songs:
        skip_songs = load_json_file(os.path.join(DATA_DIR, 'skipSongs.json'))
    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404
    duplicate_groups = get_duplicate_groups(skip_songs)
    # Apply filters
    artist_filter = request.args.get('artist', '').lower()
    title_filter = request.args.get('title', '').lower()
    channel_filter = request.args.get('channel', '').lower()
    file_type_filter = request.args.get('file_type', '').lower()
    # NOTE(review): int() raises ValueError (-> HTTP 500) on a non-numeric
    # value; other routes use request.args.get(..., type=int) -- confirm.
    min_duplicates = int(request.args.get('min_duplicates', 0))
    filtered_groups = []
    for group in duplicate_groups:
        # Apply filters
        if artist_filter and artist_filter not in group['artist'].lower():
            continue
        if title_filter and title_filter not in group['title'].lower():
            continue
        if group['total_duplicates'] < min_duplicates:
            continue
        # Check if any version (kept or skipped) matches channel/file_type filters
        if channel_filter or file_type_filter:
            matches_filter = False
            # Special handling for mp3-only filter
            if file_type_filter == 'mp3-only':
                # Check if kept version is MP3 and there are no MP4 alternatives
                kept_file_type = get_file_type(group['kept_version'])
                has_mp4_alternative = False
                # Check if any skipped version is MP4
                for version in group['skipped_versions']:
                    if version['file_type'].upper() == 'MP4':
                        has_mp4_alternative = True
                        break
                # Only show if kept version is MP3 and no MP4 alternatives exist
                if kept_file_type.upper() != 'MP3' or has_mp4_alternative:
                    continue
                # Apply channel filter if specified
                if channel_filter:
                    kept_channel = extract_channel(group['kept_version'])
                    if channel_filter not in kept_channel.lower():
                        continue
            elif file_type_filter == 'mp3':
                # Special handling for MP3 filter - show songs where primary is MP3 and has alternatives
                kept_file_type = get_file_type(group['kept_version'])
                # Only show if kept version is MP3 and there are alternatives
                if kept_file_type.upper() != 'MP3' or len(group['skipped_versions']) == 0:
                    continue
                # Apply channel filter if specified
                if channel_filter:
                    kept_channel = extract_channel(group['kept_version'])
                    if channel_filter not in kept_channel.lower():
                        continue
            else:
                # Regular file type and channel filtering
                # Check kept version
                kept_channel = extract_channel(group['kept_version'])
                kept_file_type = get_file_type(group['kept_version'])
                if (not channel_filter or channel_filter in kept_channel.lower()) and \
                   (not file_type_filter or file_type_filter in kept_file_type.lower()):
                    matches_filter = True
                # Check skipped versions if kept version doesn't match
                if not matches_filter:
                    for version in group['skipped_versions']:
                        if (not channel_filter or channel_filter in version['channel'].lower()) and \
                           (not file_type_filter or file_type_filter in version['file_type'].lower()):
                            matches_filter = True
                            break
                # matches_filter only governs this generic branch; the
                # 'mp3'/'mp3-only' branches above filter via their own continues.
                if not matches_filter:
                    continue
        filtered_groups.append(group)
    # Pagination
    page = int(request.args.get('page', 1))
    per_page = int(request.args.get('per_page', 50))
    start_idx = (page - 1) * per_page
    end_idx = start_idx + per_page
    paginated_groups = filtered_groups[start_idx:end_idx]
    return jsonify({
        'duplicates': paginated_groups,
        'total': len(filtered_groups),
        'page': page,
        'per_page': per_page,
        'total_pages': (len(filtered_groups) + per_page - 1) // per_page
    })
@app.route('/api/stats')
def get_stats():
    """API endpoint to get overall statistics.

    Returns totals, skipped/remaining file-type breakdowns, per-channel
    counts, and the ten most-duplicated songs.

    Fix: a skipped version with file_type 'Unknown' previously raised
    KeyError on skip_file_types (only MP4/MP3 keys exist); unknown types
    are now skipped, mirroring the total_file_types guard below.
    """
    # Try to load detailed skip songs first, fallback to basic skip list
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))
    if not skip_songs:
        skip_songs = load_json_file(os.path.join(DATA_DIR, 'skipSongs.json'))
    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404
    # Load original all songs data to get total counts
    all_songs = load_json_file(os.path.join(DATA_DIR, 'songs.json')) or []
    duplicate_groups = get_duplicate_groups(skip_songs)
    total_duplicates = len(duplicate_groups)
    total_files_to_skip = len(skip_songs)
    # File type breakdown for skipped files
    skip_file_types = {'MP4': 0, 'MP3': 0}
    channels = {}
    for group in duplicate_groups:
        # The kept version counts toward channel stats too
        kept_channel = extract_channel(group['kept_version'])
        channels[kept_channel] = channels.get(kept_channel, 0) + 1
        for version in group['skipped_versions']:
            # Only count tracked file types; get_file_type may return 'Unknown'
            if version['file_type'] in skip_file_types:
                skip_file_types[version['file_type']] += 1
            channel = version['channel']
            channels[channel] = channels.get(channel, 0) + 1
    # Total file type breakdown from all scanned songs
    total_file_types = {'MP4': 0, 'MP3': 0}
    total_songs = len(all_songs)
    for song in all_songs:
        file_type = get_file_type(song.get('path', ''))
        if file_type in total_file_types:
            total_file_types[file_type] += 1
    # What will remain after skipping
    remaining_file_types = {
        'MP4': total_file_types['MP4'] - skip_file_types['MP4'],
        'MP3': total_file_types['MP3'] - skip_file_types['MP3']
    }
    total_remaining = sum(remaining_file_types.values())
    # Top ten groups by duplicate count
    most_duplicated = sorted(duplicate_groups, key=lambda x: x['total_duplicates'], reverse=True)[:10]
    return jsonify({
        'total_songs': total_songs,
        'total_duplicates': total_duplicates,
        'total_files_to_skip': total_files_to_skip,
        'total_remaining': total_remaining,
        'total_file_types': total_file_types,
        'skip_file_types': skip_file_types,
        'remaining_file_types': remaining_file_types,
        'channels': channels,
        'most_duplicated': most_duplicated
    })
@app.route('/api/config')
def get_config():
    """API endpoint to get current configuration (empty object if unreadable)."""
    cfg = load_json_file(CONFIG_FILE)
    return jsonify(cfg if cfg else {})
@app.route('/api/save-changes', methods=['POST'])
def save_changes():
    """Apply user keep/skip decisions to the detailed skip list and persist it.

    Each change is {'type': 'keep_file'|'skip_file', 'file_path': ..., ...}.
    The current file is backed up before being overwritten.
    """
    try:
        import shutil

        payload = request.get_json()
        changes = payload.get('changes', [])
        detailed_path = os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')
        skip_songs = load_json_file(detailed_path)
        if not skip_songs:
            return jsonify({'error': 'No skip songs data found'}), 404
        for change in changes:
            change_type = change.get('type')
            file_path = change.get('file_path')
            if change_type == 'keep_file':
                # Keeping a file removes every skip entry pointing at it.
                skip_songs = [entry for entry in skip_songs if entry['path'] != file_path]
            elif change_type == 'skip_file':
                skip_songs.append({
                    'path': file_path,
                    'reason': 'manual_skip',
                    'artist': change.get('artist'),
                    'title': change.get('title'),
                    'kept_version': change.get('kept_version'),
                })
        # Snapshot the current on-disk file before overwriting it.
        backup_path = os.path.join(
            DATA_DIR, 'reports',
            f'skip_songs_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
        shutil.copy2(detailed_path, backup_path)
        with open(detailed_path, 'w', encoding='utf-8') as f:
            json.dump(skip_songs, f, indent=2, ensure_ascii=False)
        return jsonify({
            'success': True,
            'message': f'Changes saved successfully. Backup created at: {backup_path}',
            'total_files': len(skip_songs)
        })
    except Exception as e:
        return jsonify({'error': f'Error saving changes: {str(e)}'}), 500
@app.route('/api/artists')
def get_artists():
    """API endpoint to get all artists with their duplicate groups."""
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))
    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404
    # Bucket duplicate groups under their artist.
    by_artist: Dict[str, Dict[str, Any]] = {}
    for group in get_duplicate_groups(skip_songs):
        bucket = by_artist.setdefault(group['artist'], {
            'name': group['artist'],
            'songs': [],
            'total_duplicates': 0,
        })
        bucket['songs'].append(group)
        bucket['total_duplicates'] += group['total_duplicates']
    artists_list = sorted(by_artist.values(), key=lambda a: a['name'].lower())
    return jsonify({
        'artists': artists_list,
        'total_artists': len(artists_list)
    })
@app.route('/api/mp3-songs')
def get_mp3_songs():
    """API endpoint to get MP3 songs that remain after cleanup."""
    all_songs = load_json_file(os.path.join(DATA_DIR, 'songs.json'))
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')) or []
    if not all_songs:
        return jsonify({'error': 'No all songs data found'}), 404
    return jsonify(generate_mp3_song_list(all_songs, skip_songs))
@app.route('/api/remaining-songs')
def get_remaining_songs():
    """Get all remaining songs (MP4 and MP3) after cleanup with pagination.

    Query params: page, per_page, search (substring of title/artist),
    file_type ('all' | 'mp4' | 'mp3'), artist (substring match).

    Fixes: the artist filter is lower-cased once instead of once per song,
    and the already-bound `path` local is reused instead of re-fetching
    song['path'] for the file_type/channel lookups.
    """
    try:
        all_songs = load_json_file(os.path.join(DATA_DIR, 'songs.json'))
        skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))
        if not all_songs:
            return jsonify({'error': 'No all songs data found'}), 404
        if not skip_songs:
            skip_songs = []
        # Pagination / filter parameters (all normalized once, up front).
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 50, type=int)
        search = request.args.get('search', '').lower()
        file_type_filter = request.args.get('file_type', 'all')
        artist_filter = request.args.get('artist', '').lower()
        # Paths that the cleanup marked for skipping.
        skip_paths = {song['path'] for song in skip_songs}
        remaining_songs = []
        for song in all_songs:
            path = song.get('path', '')
            if path in skip_paths:
                continue
            path_lower = path.lower()
            # File type filter ('all' passes everything through).
            if file_type_filter == 'mp4' and not path_lower.endswith('.mp4'):
                continue
            if file_type_filter == 'mp3' and not path_lower.endswith(('.mp3', '.cdg')):
                continue
            artist = song.get('artist', '').lower()
            # Free-text search over title and artist.
            if search:
                title = song.get('title', '').lower()
                if search not in title and search not in artist:
                    continue
            # Artist substring filter.
            if artist_filter and artist_filter not in artist:
                continue
            remaining_songs.append({
                'title': song.get('title', 'Unknown'),
                'artist': song.get('artist', 'Unknown'),
                'path': path,
                'file_type': get_file_type(path),
                'channel': extract_channel(path)
            })
        # Sort by artist, then by title
        remaining_songs.sort(key=lambda x: (x['artist'].lower(), x['title'].lower()))
        total_songs = len(remaining_songs)
        total_pages = (total_songs + per_page - 1) // per_page
        start_idx = (page - 1) * per_page
        page_songs = remaining_songs[start_idx:start_idx + per_page]
        return jsonify({
            'songs': page_songs,
            'pagination': {
                'current_page': page,
                'per_page': per_page,
                'total_songs': total_songs,
                'total_pages': total_pages
            }
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/download/mp3-songs')
def download_mp3_songs():
    """Write the remaining-MP3 list to reports/ and serve it as an attachment."""
    all_songs = load_json_file(os.path.join(DATA_DIR, 'songs.json'))
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')) or []
    if not all_songs:
        return jsonify({'error': 'No all songs data found'}), 404
    song_list = generate_mp3_song_list(all_songs, skip_songs)
    # Persist the export, then hand the same file back for download.
    reports_dir = os.path.join(DATA_DIR, 'reports')
    with open(os.path.join(reports_dir, 'mp3SongList.json'), 'w', encoding='utf-8') as f:
        json.dump(song_list, f, indent=2, ensure_ascii=False)
    return send_from_directory(
        reports_dir,
        'mp3SongList.json',
        as_attachment=True,
        download_name='mp3SongList.json'
    )
@app.route('/api/save-priority-preferences', methods=['POST'])
def save_priority_preferences():
    """API endpoint to save user priority preferences.

    Fix: the backup now snapshots the preferences as they were BEFORE this
    request's changes were merged in. Previously the backup was written
    after the merge, so it merely duplicated the new file and could not be
    used to roll back (the reset endpoint, by contrast, backs up first).
    """
    try:
        data = request.get_json()
        priority_changes = data.get('priority_changes', {})
        if not priority_changes:
            return jsonify({'error': 'No priority changes provided'}), 400
        # Create preferences directory if it doesn't exist
        preferences_dir = os.path.join(DATA_DIR, 'preferences')
        os.makedirs(preferences_dir, exist_ok=True)
        # Load existing preferences
        preferences_file = os.path.join(preferences_dir, 'priority_preferences.json')
        existing_preferences = {}
        if os.path.exists(preferences_file):
            with open(preferences_file, 'r', encoding='utf-8') as f:
                existing_preferences = json.load(f)
        # Write the pre-merge snapshot first so it can restore the old state.
        backup_path = os.path.join(
            preferences_dir,
            f'priority_preferences_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
        with open(backup_path, 'w', encoding='utf-8') as f:
            json.dump(existing_preferences, f, indent=2, ensure_ascii=False)
        # Merge and persist the updated preferences.
        existing_preferences.update(priority_changes)
        with open(preferences_file, 'w', encoding='utf-8') as f:
            json.dump(existing_preferences, f, indent=2, ensure_ascii=False)
        return jsonify({
            'success': True,
            'message': f'Saved {len(priority_changes)} priority preferences. Backup created at: {backup_path}',
            'total_preferences': len(existing_preferences)
        })
    except Exception as e:
        return jsonify({'error': f'Error saving priority preferences: {str(e)}'}), 500
@app.route('/api/reset-priority-preferences', methods=['POST'])
def reset_priority_preferences():
    """Delete all priority preferences, backing up the file first."""
    try:
        import shutil

        prefs_dir = os.path.join(DATA_DIR, 'preferences')
        prefs_file = os.path.join(prefs_dir, 'priority_preferences.json')
        if not os.path.exists(prefs_file):
            return jsonify({
                'success': True,
                'message': 'No priority preferences found to reset'
            })
        # Back up before deleting so the reset can be undone.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = os.path.join(prefs_dir, f'priority_preferences_reset_backup_{stamp}.json')
        shutil.copy2(prefs_file, backup_path)
        os.remove(prefs_file)
        return jsonify({
            'success': True,
            'message': f'Priority preferences reset successfully. Backup created at: {backup_path}'
        })
    except Exception as e:
        return jsonify({'error': f'Error resetting priority preferences: {str(e)}'}), 500
@app.route('/api/load-priority-preferences')
def load_priority_preferences():
    """Return the saved priority preferences (empty mapping when none exist)."""
    try:
        prefs_path = os.path.join(DATA_DIR, 'preferences', 'priority_preferences.json')
        prefs: Dict[str, Any] = {}
        if os.path.exists(prefs_path):
            with open(prefs_path, 'r', encoding='utf-8') as fh:
                prefs = json.load(fh)
        return jsonify({
            'success': True,
            'preferences': prefs
        })
    except Exception as e:
        return jsonify({'error': f'Error loading priority preferences: {str(e)}'}), 500
def find_matching_songs(item: Dict[str, Any], all_songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Find all songs from songs.json that match a given item (favorite/history).

    Matching is an exact case-insensitive artist+title comparison. Results
    are sorted with the item's current path first, then MP4 before MP3
    before unknown types.
    """
    matches: List[Dict[str, Any]] = []
    if item is None or not isinstance(item, dict):
        print(f"Warning: Invalid item passed to find_matching_songs: {item}")
        return matches
    wanted_artist = item.get('artist', '').lower().strip()
    wanted_title = item.get('title', '').lower().strip()
    current_path = item.get('path', '')
    for song in all_songs:
        # Malformed entries (None / non-dict) are simply skipped.
        if not isinstance(song, dict):
            continue
        if song.get('artist', '').lower().strip() != wanted_artist:
            continue
        if song.get('title', '').lower().strip() != wanted_title:
            continue
        song_path = song.get('path', '')
        matches.append({
            'path': song_path,
            'artist': song.get('artist', 'Unknown'),
            'title': song.get('title', 'Unknown'),
            'file_type': get_file_type(song_path),
            'channel': extract_channel(song_path),
            'is_current': song_path == current_path,
        })
    # (0, 0) pins the current path to the front; everything else ranks by
    # file-type priority (MP4 > MP3 > Unknown).
    priorities = {'MP4': 1, 'MP3': 2, 'Unknown': 3}
    matches.sort(key=lambda m: (0, 0) if m['is_current']
                 else (1, priorities.get(m['file_type'], 3)))
    return matches
@app.route('/api/favorites')
def get_favorites():
    """Get favorites data with matching songs for each favorite."""
    try:
        favorites = load_json_file(os.path.join(DATA_DIR, 'favorites.json'))
        all_songs = load_json_file(os.path.join(DATA_DIR, 'songs.json'))
        if not favorites or not all_songs:
            return jsonify({'error': 'Failed to load data files'}), 500
        processed_favorites = []
        for i, favorite in enumerate(favorites):
            # Defend against malformed entries in the JSON file.
            if favorite is None:
                print(f"Warning: Skipping None item at index {i} in favorites")
                continue
            if not isinstance(favorite, dict):
                print(f"Warning: Skipping non-dict item at index {i} in favorites: {type(favorite)}")
                continue
            try:
                processed_favorites.append({
                    'index': i,
                    'original': favorite,
                    'matching_songs': find_matching_songs(favorite, all_songs),
                    'artist': favorite.get('artist', 'Unknown'),
                    'title': favorite.get('title', 'Unknown'),
                    'current_path': favorite.get('path', ''),
                    'count': favorite.get('count', 0),
                    'favorite': favorite.get('favorite', False),
                    'disabled': favorite.get('disabled', False),
                })
            except Exception as item_error:
                print(f"Error processing favorite item {i}: {item_error}")
                print(f"Item data: {favorite}")
        processed_favorites.sort(key=lambda e: (e['artist'].lower(), e['title'].lower()))
        return jsonify({
            'success': True,
            'favorites': processed_favorites,
            'total': len(processed_favorites)
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/history')
def get_history():
    """Get history data with matching songs for each history item."""
    try:
        history = load_json_file(os.path.join(DATA_DIR, 'history.json'))
        all_songs = load_json_file(os.path.join(DATA_DIR, 'songs.json'))
        if not history or not all_songs:
            return jsonify({'error': 'Failed to load data files'}), 500
        processed_history = []
        for i, item in enumerate(history):
            # Defend against malformed entries in the JSON file.
            if item is None:
                print(f"Warning: Skipping None item at index {i} in history")
                continue
            if not isinstance(item, dict):
                print(f"Warning: Skipping non-dict item at index {i} in history: {type(item)}")
                continue
            try:
                processed_history.append({
                    'index': i,
                    'original': item,
                    'matching_songs': find_matching_songs(item, all_songs),
                    'artist': item.get('artist', 'Unknown'),
                    'title': item.get('title', 'Unknown'),
                    'current_path': item.get('path', ''),
                    'count': item.get('count', 0),
                    'favorite': item.get('favorite', False),
                    'disabled': item.get('disabled', False),
                })
            except Exception as item_error:
                print(f"Error processing history item {i}: {item_error}")
                print(f"Item data: {item}")
        processed_history.sort(key=lambda e: (e['artist'].lower(), e['title'].lower()))
        return jsonify({
            'success': True,
            'history': processed_history,
            'total': len(processed_history)
        })
    except Exception as e:
        print(f"Error in get_history: {e}")
        import traceback
        print(f"Full traceback: {traceback.format_exc()}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/update-favorite-path', methods=['POST'])
def update_favorite_path():
    """Update the path of a favorite item.

    Fix: the index is now validated as 0 <= index < len(favorites),
    consistent with update_favorite_property. Previously only the upper
    bound was checked, so a negative index silently updated the wrong
    item via Python's negative indexing.
    """
    try:
        data = request.get_json()
        favorite_index = data.get('index')
        new_path = data.get('path')
        if favorite_index is None or new_path is None:
            return jsonify({'success': False, 'error': 'Missing index or path'}), 400
        # Load favorites file
        favorites_file = os.path.join(DATA_DIR, 'favorites.json')
        favorites = load_json_file(favorites_file)
        if not favorites or not (0 <= favorite_index < len(favorites)):
            return jsonify({'success': False, 'error': 'Invalid favorite index'}), 400
        # Update the path and persist.
        favorites[favorite_index]['path'] = new_path
        with open(favorites_file, 'w', encoding='utf-8') as f:
            json.dump(favorites, f, indent=2, ensure_ascii=False)
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/update-history-path', methods=['POST'])
def update_history_path():
    """Update the path of a history item.

    Fix: the index is now validated as 0 <= index < len(history),
    consistent with update_history_property. Previously only the upper
    bound was checked, so a negative index silently updated the wrong
    item via Python's negative indexing.
    """
    try:
        data = request.get_json()
        history_index = data.get('index')
        new_path = data.get('path')
        if history_index is None or new_path is None:
            return jsonify({'success': False, 'error': 'Missing index or path'}), 400
        # Load history file
        history_file = os.path.join(DATA_DIR, 'history.json')
        history = load_json_file(history_file)
        if not history or not (0 <= history_index < len(history)):
            return jsonify({'success': False, 'error': 'Invalid history index'}), 400
        # Update the path and persist.
        history[history_index]['path'] = new_path
        with open(history_file, 'w', encoding='utf-8') as f:
            json.dump(history, f, indent=2, ensure_ascii=False)
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/update-favorite-property', methods=['POST'])
def update_favorite_property():
    """Update any property of a favorite item.

    Fix: the original_path sync compared original_path against the path
    AFTER it had already been overwritten with the new value, so it never
    fired on a real path change. The pre-update path is now captured first.
    """
    try:
        data = request.get_json()
        index = data.get('index')
        property_name = data.get('property')
        new_value = data.get('value')
        if index is None or property_name is None or new_value is None:
            return jsonify({'success': False, 'error': 'Missing index, property, or value'}), 400
        # Load current favorites
        favorites_file = os.path.join(DATA_DIR, 'favorites.json')
        if not os.path.exists(favorites_file):
            return jsonify({'success': False, 'error': 'Favorites file not found'}), 404
        with open(favorites_file, 'r', encoding='utf-8') as f:
            favorites = json.load(f)
        if not (0 <= index < len(favorites)):
            return jsonify({'success': False, 'error': 'Invalid index'}), 400
        item = favorites[index]
        old_path = item.get('path')  # captured before the overwrite below
        item[property_name] = new_value
        # Keep original_path in step with path when it still pointed at the
        # pre-update location.
        if property_name == 'path' and 'original_path' in item and item['original_path'] == old_path:
            item['original_path'] = new_value
        # Save updated favorites
        with open(favorites_file, 'w', encoding='utf-8') as f:
            json.dump(favorites, f, indent=2, ensure_ascii=False)
        return jsonify({'success': True})
    except Exception as e:
        print(f"Error updating favorite property: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/update-history-property', methods=['POST'])
def update_history_property():
    """Update any property of a history item.

    Fix: the original_path sync compared original_path against the path
    AFTER it had already been overwritten with the new value, so it never
    fired on a real path change. The pre-update path is now captured first.
    """
    try:
        data = request.get_json()
        index = data.get('index')
        property_name = data.get('property')
        new_value = data.get('value')
        if index is None or property_name is None or new_value is None:
            return jsonify({'success': False, 'error': 'Missing index, property, or value'}), 400
        # Load current history
        history_file = os.path.join(DATA_DIR, 'history.json')
        if not os.path.exists(history_file):
            return jsonify({'success': False, 'error': 'History file not found'}), 404
        with open(history_file, 'r', encoding='utf-8') as f:
            history = json.load(f)
        if not (0 <= index < len(history)):
            return jsonify({'success': False, 'error': 'Invalid index'}), 400
        item = history[index]
        old_path = item.get('path')  # captured before the overwrite below
        item[property_name] = new_value
        # Keep original_path in step with path when it still pointed at the
        # pre-update location.
        if property_name == 'path' and 'original_path' in item and item['original_path'] == old_path:
            item['original_path'] = new_value
        # Save updated history
        with open(history_file, 'w', encoding='utf-8') as f:
            json.dump(history, f, indent=2, ensure_ascii=False)
        return jsonify({'success': True})
    except Exception as e:
        print(f"Error updating history property: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/delete-favorite', methods=['POST'])
def delete_favorite():
    """Delete a favorite item by index and persist the updated list."""
    try:
        payload = request.get_json()
        index = payload.get('index')
        if index is None:
            return jsonify({'success': False, 'error': 'Missing index'}), 400
        favorites_file = os.path.join(DATA_DIR, 'favorites.json')
        if not os.path.exists(favorites_file):
            return jsonify({'success': False, 'error': 'Favorites file not found'}), 404
        with open(favorites_file, 'r', encoding='utf-8') as f:
            favorites = json.load(f)
        if not (0 <= index < len(favorites)):
            return jsonify({'success': False, 'error': 'Invalid index'}), 400
        # Remove the entry, then write the shortened list back.
        deleted_item = favorites.pop(index)
        with open(favorites_file, 'w', encoding='utf-8') as f:
            json.dump(favorites, f, indent=2, ensure_ascii=False)
        return jsonify({
            'success': True,
            'deleted_item': deleted_item,
            'message': f'Deleted "{deleted_item.get("artist", "Unknown")} - {deleted_item.get("title", "Unknown")}"'
        })
    except Exception as e:
        print(f"Error deleting favorite: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/delete-history', methods=['POST'])
def delete_history():
    """Remove a single history entry, identified by its list index."""
    try:
        payload = request.get_json()
        idx = payload.get('index')
        if idx is None:
            return jsonify({'success': False, 'error': 'Missing index'}), 400
        history_file = os.path.join(DATA_DIR, 'history.json')
        if not os.path.exists(history_file):
            return jsonify({'success': False, 'error': 'History file not found'}), 404
        with open(history_file, 'r', encoding='utf-8') as f:
            history = json.load(f)
        # Guard against out-of-range indices before mutating anything.
        if not (0 <= idx < len(history)):
            return jsonify({'success': False, 'error': 'Invalid index'}), 400
        removed = history.pop(idx)
        with open(history_file, 'w', encoding='utf-8') as f:
            json.dump(history, f, indent=2, ensure_ascii=False)
        return jsonify({
            'success': True,
            'deleted_item': removed,
            'message': f'Deleted "{removed.get("artist", "Unknown")} - {removed.get("title", "Unknown")}"'
        })
    except Exception as e:
        print(f"Error deleting history item: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/merge-history', methods=['POST'])
def merge_history():
    """Merge history entries that share artist, title, and path.

    Matching is case-insensitive. Within a group: 'count' values are
    summed (string counts are parsed, unparseable ones ignored), the
    'favorite'/'disabled' flags are OR-ed, and for 'key',
    'original_path', and 'genre' the first non-empty value wins.

    BUGFIX: entries that cannot be keyed (non-dicts, or entries whose
    artist/title/path is missing, empty, or not a string) were previously
    silently dropped when the merged list was saved. They are now
    preserved unchanged at the end of the file.
    """
    try:
        history_file = os.path.join(DATA_DIR, 'history.json')
        if not os.path.exists(history_file):
            return jsonify({'success': False, 'error': 'History file not found'}), 404
        with open(history_file, 'r', encoding='utf-8') as f:
            history_items = json.load(f)
        if not history_items:
            return jsonify({'success': False, 'error': 'No history items found to merge'}), 400
        # Group mergeable items by (artist, title, path), lower-cased.
        grouped_items = {}
        passthrough_items = []  # entries we cannot key; kept verbatim
        for item in history_items:
            if not isinstance(item, dict):
                passthrough_items.append(item)
                continue
            artist = item.get('artist', '')
            title = item.get('title', '')
            path = item.get('path', '')
            if not all(isinstance(v, str) and v.strip() for v in (artist, title, path)):
                passthrough_items.append(item)
                continue
            key = (artist.strip().lower(), title.strip().lower(), path.strip().lower())
            grouped_items.setdefault(key, []).append(item)
        # Collapse each multi-item group into a single merged entry.
        merged_items = []
        merged_count = 0
        total_merged_entries = 0
        for key, items in grouped_items.items():
            if len(items) == 1:
                merged_items.append(items[0])
                continue
            merged_item = items[0].copy()
            # Sum counts, tolerating both int and numeric-string values.
            total_count = 0
            for item in items:
                count_value = item.get('count', 0)
                if isinstance(count_value, str):
                    try:
                        total_count += int(count_value)
                    except ValueError:
                        pass  # unparseable string counts contribute nothing
                elif isinstance(count_value, (int, float)):
                    total_count += count_value
            merged_item['count'] = total_count
            # Boolean flags: True wins if any entry has it set.
            merged_item['favorite'] = any(item.get('favorite', False) for item in items)
            merged_item['disabled'] = any(item.get('disabled', False) for item in items)
            # Other properties: first non-empty value across the group.
            for prop in ('key', 'original_path', 'genre'):
                if merged_item.get(prop):
                    continue
                for item in items[1:]:
                    if item.get(prop):
                        merged_item[prop] = item[prop]
                        break
            merged_items.append(merged_item)
            merged_count += 1
            total_merged_entries += len(items)
        # Re-append unmergeable entries so no history data is lost.
        merged_items.extend(passthrough_items)
        with open(history_file, 'w', encoding='utf-8') as f:
            json.dump(merged_items, f, indent=2, ensure_ascii=False)
        return jsonify({
            'success': True,
            'merged_groups': merged_count,
            'total_merged_entries': total_merged_entries,
            'final_count': len(merged_items),
            'message': f'Successfully merged {merged_count} groups ({total_merged_entries} total entries → {len(merged_items)} entries)'
        })
    except Exception as e:
        print(f"Error merging history objects: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/video/<path:file_path>')
def serve_video(file_path):
    """Serve a video file for playback in the web UI.

    The path arrives URL-encoded; it is decoded, normalized, and checked
    against simple traversal/absolute-path rules before being streamed
    with the MIME type matching its extension.

    Returns the video response on success; JSON errors with 400 (bad
    path or type), 404 (missing file), or 500 (unexpected failure).
    """
    try:
        # Decode the file path (it comes URL-encoded)
        import urllib.parse
        decoded_path = urllib.parse.unquote(file_path)
        # Normalize the path to fix any malformed paths
        normalized_path = normalize_path(decoded_path)
        # Debug logging - show original path first
        print(f"DEBUG: serve_video ORIGINAL PATH: {decoded_path}")
        print(f"DEBUG: Normalized path: {normalized_path}")
        print(f"DEBUG: Current working directory: {os.getcwd()}")
        print(f"DEBUG: Path transformation: '{decoded_path}' -> '{normalized_path}'")
        # Security check: reject parent-directory traversal outright.
        if '..' in normalized_path:
            print(f"DEBUG: Security check failed - path contains '..'")
            return jsonify({'error': 'Invalid file path'}), 400
        # Windows may legitimately use absolute drive-letter paths;
        # Unix-style absolute paths are rejected everywhere else.
        if os.name == 'nt':  # Windows
            if normalized_path.startswith('/') and not normalized_path[1:].startswith(':'):
                print(f"DEBUG: Security check failed - Unix-style absolute path on Windows")
                return jsonify({'error': 'Invalid file path'}), 400
        else:  # Unix-like systems
            if normalized_path.startswith('/'):
                print(f"DEBUG: Security check failed - absolute path on Unix")
                return jsonify({'error': 'Invalid file path'}), 400
        # Check if file exists
        if not os.path.exists(normalized_path):
            print(f"DEBUG: File does not exist: {normalized_path}")
            # List the parent directory to aid debugging of bad paths.
            try:
                directory = os.path.dirname(normalized_path)
                if os.path.exists(directory):
                    files_in_dir = os.listdir(directory)
                    print(f"DEBUG: Files in directory {directory}: {files_in_dir[:10]}...")  # Show first 10 files
                else:
                    print(f"DEBUG: Directory does not exist: {directory}")
            except Exception as e:
                print(f"DEBUG: Error listing directory: {e}")
            return jsonify({'error': 'Video file not found'}), 404
        # Map extension -> MIME type; anything else is refused.
        file_extension = os.path.splitext(normalized_path)[1].lower()
        mime_types = {
            '.mp4': 'video/mp4',
            '.avi': 'video/x-msvideo',
            '.mkv': 'video/x-matroska',
            '.mov': 'video/quicktime',
            '.wmv': 'video/x-ms-wmv',
            '.flv': 'video/x-flv',
            '.webm': 'video/webm'
        }
        if file_extension not in mime_types:
            print(f"DEBUG: Invalid file type: {normalized_path}")
            return jsonify({'error': 'Invalid file type'}), 400
        mime_type = mime_types[file_extension]
        # Get file info for debugging
        file_size = os.path.getsize(normalized_path)
        print(f"DEBUG: File exists, size: {file_size} bytes")
        print(f"DEBUG: MIME type: {mime_type}")
        # Serve the video file
        directory = os.path.dirname(normalized_path)
        filename = os.path.basename(normalized_path)
        print(f"DEBUG: Serving from directory: {directory}")
        # BUGFIX: previously printed a literal placeholder instead of
        # interpolating the filename into the f-string.
        print(f"DEBUG: Filename: {filename}")
        # Add headers for better video streaming
        response = send_from_directory(
            directory,
            filename,
            mimetype=mime_type
        )
        # CORS + Range headers so the <video> element can stream/seek.
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = 'GET, HEAD, OPTIONS'
        response.headers['Access-Control-Allow-Headers'] = 'Range'
        # Allow clients to cache the video for an hour.
        response.headers['Cache-Control'] = 'public, max-age=3600'
        return response
    except Exception as e:
        print(f"DEBUG: Exception in serve_video: {str(e)}")
        import traceback
        print(f"DEBUG: Full traceback: {traceback.format_exc()}")
        return jsonify({'error': f'Error serving video: {str(e)}'}), 500
# Playlist Validation Endpoints
@app.route('/playlist-validation')
def playlist_validation():
    """Render the playlist validation review page."""
    template_name = 'playlist_validation.html'
    return render_template(template_name)
@app.route('/api/playlists')
def get_playlists():
    """Return index, title, and song count for every playlist."""
    try:
        playlists_path = os.path.join(DATA_DIR, 'songList.json')
        with open(playlists_path, 'r', encoding='utf-8') as f:
            playlists = json.load(f)
        # Summarize each playlist for the picker UI.
        summary = [
            {
                'index': position,
                'title': entry.get('title', 'Unknown Playlist'),
                'song_count': len(entry.get('songs', [])),
            }
            for position, entry in enumerate(playlists)
        ]
        return jsonify(summary)
    except Exception as e:
        return jsonify({'error': f'Error loading playlists: {str(e)}'}), 500
@app.route('/api/validate-playlist/<int:playlist_index>')
def validate_playlist(playlist_index):
    """Validate one playlist (dry run) and return the validator's result."""
    try:
        cfg = load_json_file(CONFIG_FILE)
        if not cfg:
            return jsonify({'error': 'Could not load configuration'}), 500
        checker = PlaylistValidator(cfg, DATA_DIR)
        playlists_path = os.path.join(DATA_DIR, 'songList.json')
        with open(playlists_path, 'r', encoding='utf-8') as f:
            all_playlists = json.load(f)
        # Flask's <int:> converter guarantees non-negative indices.
        if playlist_index >= len(all_playlists):
            return jsonify({'error': f'Invalid playlist index: {playlist_index}'}), 400
        outcome = checker.validate_playlist(all_playlists[playlist_index], dry_run=True)
        return jsonify(outcome)
    except Exception as e:
        return jsonify({'error': f'Error validating playlist: {str(e)}'}), 500
@app.route('/api/validate-all-playlists')
def validate_all_playlists():
    """Run a dry-run validation across every playlist and return the results."""
    try:
        cfg = load_json_file(CONFIG_FILE)
        if not cfg:
            return jsonify({'error': 'Could not load configuration'}), 500
        checker = PlaylistValidator(cfg, DATA_DIR)
        all_results = checker.validate_all_playlists(dry_run=True)
        return jsonify(all_results)
    except Exception as e:
        return jsonify({'error': f'Error validating playlists: {str(e)}'}), 500
@app.route('/api/update-playlist-song', methods=['POST'])
def update_playlist_song():
    """Apply a corrected artist/title to one song inside a playlist."""
    try:
        payload = request.get_json()
        playlist_index = payload.get('playlist_index')
        song_position = payload.get('song_position')
        new_artist = payload.get('new_artist')
        new_title = payload.get('new_title')
        dry_run = payload.get('dry_run', True)
        # All four positional values are mandatory.
        required = (playlist_index, song_position, new_artist, new_title)
        if any(value is None for value in required):
            return jsonify({'error': 'Missing required parameters'}), 400
        cfg = load_json_file(CONFIG_FILE)
        if not cfg:
            return jsonify({'error': 'Could not load configuration'}), 500
        checker = PlaylistValidator(cfg, DATA_DIR)
        updated = checker.update_playlist_song(
            playlist_index, song_position, new_artist, new_title, dry_run
        )
        if not updated:
            return jsonify({'error': 'Failed to update playlist song'}), 500
        return jsonify({'success': True, 'message': 'Playlist song updated successfully'})
    except Exception as e:
        return jsonify({'error': f'Error updating playlist song: {str(e)}'}), 500
@app.route('/api/apply-all-updates', methods=['POST'])
def apply_all_updates():
    """Apply a batch of pending playlist corrections in one request."""
    try:
        payload = request.get_json()
        changes = payload.get('changes', [])
        if not changes:
            return jsonify({'error': 'No changes to apply'}), 400
        cfg = load_json_file(CONFIG_FILE)
        if not cfg:
            return jsonify({'error': 'Could not load configuration'}), 500
        checker = PlaylistValidator(cfg, DATA_DIR)
        applied = 0
        failures = 0
        for change in changes:
            ok = checker.update_playlist_song(
                change['playlistIndex'],
                change['songPosition'],
                change['newArtist'],
                change['newTitle'],
                False  # always a real write, never a dry run
            )
            if ok:
                applied += 1
            else:
                failures += 1
        if failures:
            message = f'Applied {applied} changes, {failures} failed.'
        else:
            message = f'Successfully applied {applied} changes to playlists.'
        return jsonify({
            'success': True,
            'message': message,
            'applied': applied,
            'failed': failures
        })
    except Exception as e:
        return jsonify({'error': f'Error applying updates: {str(e)}'}), 500
def _update_cli_progress(line, line_count):
    """Map one line of CLI output to a progress percentage and step label.

    Recognizes known phase markers in the output; otherwise advances the
    bar gradually based on how many lines have been seen.
    """
    if 'Loading songs' in line or 'loading' in line.lower():
        progress_data.update({
            'progress': 30,
            'current_step': 'Loading song library',
            'message': f'Loading song library... ({line_count} lines processed)'
        })
    elif 'Finding duplicates' in line or 'duplicate' in line.lower():
        progress_data.update({
            'progress': 50,
            'current_step': 'Finding duplicates',
            'message': f'Finding duplicates... ({line_count} lines processed)'
        })
    elif 'Saving reports' in line or 'saving' in line.lower():
        progress_data.update({
            'progress': 80,
            'current_step': 'Saving reports',
            'message': f'Saving reports... ({line_count} lines processed)'
        })
    elif 'Complete' in line or 'Finished' in line or 'done' in line.lower():
        progress_data.update({
            'progress': 100,
            'current_step': 'Complete',
            'message': 'Process completed successfully!'
        })
    else:
        # No phase marker: advance gradually based on output volume.
        if line_count < 100:
            progress_data.update({
                'progress': min(20 + (line_count * 2), 30),
                'current_step': 'Initializing',
                'message': f'Initializing... ({line_count} lines processed)'
            })
        elif line_count < 500:
            progress_data.update({
                'progress': min(30 + ((line_count - 100) * 0.1), 50),
                'current_step': 'Processing',
                'message': f'Processing songs... ({line_count} lines processed)'
            })
        elif line_count < 1000:
            progress_data.update({
                'progress': min(50 + ((line_count - 500) * 0.05), 80),
                'current_step': 'Analyzing',
                'message': f'Analyzing duplicates... ({line_count} lines processed)'
            })
        else:
            progress_data.update({
                'progress': min(80 + ((line_count - 1000) * 0.02), 95),
                'current_step': 'Finalizing',
                'message': f'Finalizing... ({line_count} lines processed)'
            })
@app.route('/api/reset-and-regenerate', methods=['POST'])
def reset_and_regenerate():
    """Delete all generated files and re-run the CLI analysis tool.

    Removes skipSongs.json and the reports/preferences directories, then
    launches cli/main.py in a background thread. Returns immediately;
    clients poll /api/progress (or the SSE stream) for status.
    """
    try:
        import subprocess
        import shutil
        import threading
        # Reset progress data before starting a new run.
        reset_progress()
        progress_data.update({
            'status': 'starting',
            'message': 'Initializing reset and regenerate process...',
            'progress': 0,
            'current_step': 'Initializing',
            'cli_output': []
        })
        # Project root is the parent of this web/ directory.
        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        # Generated artifacts to remove before regeneration.
        files_to_delete = [
            os.path.join(DATA_DIR, 'skipSongs.json'),
            os.path.join(DATA_DIR, 'reports'),
            os.path.join(DATA_DIR, 'preferences')
        ]
        deleted_items = []
        progress_data.update({
            'status': 'deleting',
            'message': 'Deleting generated files...',
            'progress': 10,
            'current_step': 'Cleaning up old files'
        })
        # Best-effort cleanup: log failures and continue with the rest.
        for item_path in files_to_delete:
            if os.path.exists(item_path):
                try:
                    if os.path.isfile(item_path):
                        os.remove(item_path)
                        deleted_items.append(f"File: {os.path.basename(item_path)}")
                    elif os.path.isdir(item_path):
                        shutil.rmtree(item_path)
                        deleted_items.append(f"Directory: {os.path.basename(item_path)}")
                except Exception as e:
                    print(f"Warning: Could not delete {item_path}: {e}")
        cli_dir = os.path.join(project_root, 'cli')
        cli_script = os.path.join(cli_dir, 'main.py')
        if not os.path.exists(cli_script):
            progress_data.update({
                'status': 'error',
                'message': 'CLI script not found',
                'progress': 0,
                'current_step': 'Error'
            })
            return jsonify({'error': 'CLI script not found'}), 500
        progress_data.update({
            'status': 'running',
            'message': 'Running CLI tool to analyze songs...',
            'progress': 20,
            'current_step': 'Running CLI Analysis'
        })
        def run_cli_with_progress():
            """Run the CLI subprocess, streaming its output into progress_data."""
            global progress_data
            try:
                # BUGFIX: use cwd= on the subprocess instead of os.chdir().
                # os.chdir() changes the working directory of the WHOLE
                # process, which broke relative paths (DATA_DIR, config)
                # for every other request while the CLI was running.
                process = subprocess.Popen(
                    [sys.executable, 'main.py', '--process-all', '--verbose'],
                    cwd=cli_dir,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    bufsize=1,
                    universal_newlines=True
                )
                # Stream output line by line, updating progress as we go.
                line_count = 0
                for line in iter(process.stdout.readline, ''):
                    if line:
                        line = line.strip()
                        progress_data['cli_output'].append(line)
                        line_count += 1
                        _update_cli_progress(line, line_count)
                process.stdout.close()
                return_code = process.wait()
                if return_code == 0:
                    progress_data.update({
                        'status': 'completed',
                        'message': f'✅ Reset and regeneration completed successfully!\n\nDeleted items:\n' + "\n".join(deleted_items),
                        'progress': 100,
                        'current_step': 'Complete'
                    })
                else:
                    progress_data.update({
                        'status': 'error',
                        'message': f'CLI tool failed with return code {return_code}',
                        'progress': 0,
                        'current_step': 'Error'
                    })
            except Exception as e:
                progress_data.update({
                    'status': 'error',
                    'message': f'Error during CLI execution: {str(e)}',
                    'progress': 0,
                    'current_step': 'Error'
                })
        # Run the CLI in a daemon thread so the request returns promptly.
        cli_thread = threading.Thread(target=run_cli_with_progress)
        cli_thread.daemon = True
        cli_thread.start()
        return jsonify({
            'success': True,
            'message': 'Reset and regenerate process started. Check progress endpoint for updates.',
            'deleted_items': deleted_items
        })
    except Exception as e:
        progress_data.update({
            'status': 'error',
            'message': f'Error during reset and regenerate: {str(e)}',
            'progress': 0,
            'current_step': 'Error'
        })
        return jsonify({'error': f'Error during reset and regenerate: {str(e)}'}), 500
@app.route('/api/progress')
def get_progress():
    """Return a JSON snapshot of the current reset/regenerate progress."""
    # Read-only access; no `global` statement is needed for a read.
    return jsonify(progress_data)
@app.route('/api/progress/reset', methods=['POST'])
def reset_progress_endpoint():
    """Reset the shared progress tracker back to its idle state."""
    reset_progress()
    response_body = {'success': True, 'message': 'Progress reset to idle'}
    return jsonify(response_body)
@app.route('/api/progress/stream')
def progress_stream():
    """Server-Sent Events endpoint for real-time progress updates.

    Emits a `data:` frame whenever progress_data changes, polling once
    per second, and closes the stream once the run completes or errors.
    """
    def generate():
        global progress_data
        last_data = None
        while True:
            current_data = progress_data.copy()
            # Only emit a frame when the snapshot has changed.
            if current_data != last_data:
                yield f"data: {json.dumps(current_data)}\n\n"
                last_data = current_data.copy()
            # Terminal states end the stream.
            if current_data['status'] in ['completed', 'error']:
                break
            time.sleep(1)  # Poll once per second
    # BUGFIX: SSE must be served as text/event-stream; browsers'
    # EventSource rejects responses with mimetype text/plain.
    response = app.response_class(
        generate(),
        mimetype='text/event-stream'
    )
    # Streams must not be cached by clients or proxies.
    response.headers['Cache-Control'] = 'no-cache'
    return response
if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug interactive debugger
    # (arbitrary code execution) and host='0.0.0.0' binds all interfaces —
    # this combination must not be exposed beyond a trusted local network.
    app.run(debug=True, host='0.0.0.0', port=5002)