1452 lines
56 KiB
Python
1452 lines
56 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Web UI for Karaoke Song Library Cleanup Tool
|
|
Provides interactive interface for reviewing duplicates and making decisions.
|
|
"""
|
|
|
|
from flask import Flask, render_template, jsonify, request, send_from_directory
|
|
import json
|
|
import os
|
|
from typing import Dict, List, Any
|
|
from datetime import datetime
|
|
|
|
# Import playlist validator
|
|
import sys
|
|
sys.path.append('../cli')
|
|
from playlist_validator import PlaylistValidator
|
|
|
|
# Flask application object; every @app.route handler in this module
# registers itself against it.
app = Flask(__name__)

# Configuration
# NOTE: these are relative paths, so anything opened through open() resolves
# them against the process's current working directory.
DATA_DIR = '../data'
REPORTS_DIR = os.path.join(DATA_DIR, 'reports')
CONFIG_FILE = '../config/config.json'
|
|
|
|
def load_json_file(file_path: str) -> Any:
    """Read and decode a UTF-8 JSON file, returning ``None`` on any failure.

    Args:
        file_path: Location of the JSON document to read.

    Returns:
        The decoded JSON value, or ``None`` when the file cannot be opened
        or parsed. The error is printed rather than raised.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            parsed = json.load(handle)
    except Exception as err:
        # Best-effort loader: callers treat None as "no data available".
        print(f"Error loading {file_path}: {err}")
        return None
    else:
        return parsed
|
|
|
|
def get_duplicate_groups(skip_songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group skip-list entries by artist/title so duplicates appear together.

    Args:
        skip_songs: Skip-list entries. Each entry must have a ``'path'`` key;
            ``'artist'``, ``'title'``, ``'kept_version'`` and ``'reason'``
            are optional.

    Returns:
        One dict per distinct "artist - title" key with the keys ``artist``,
        ``title``, ``kept_version`` (taken from the first entry seen for the
        key), ``skipped_versions`` (a list of dicts with ``path``,
        ``reason``, ``file_type`` and ``channel``) and ``total_duplicates``.
        The list is sorted case-insensitively by artist, then title.

    Raises:
        KeyError: If an entry has no ``'path'`` key.
    """
    def _text(entry: Dict[str, Any], field: str) -> Any:
        # dict.get() only falls back when the key is absent; an explicit JSON
        # null would otherwise reach the .lower() sort below and crash it.
        value = entry.get(field)
        return 'Unknown' if value is None else value

    duplicate_groups: Dict[str, Dict[str, Any]] = {}

    for skip_song in skip_songs:
        artist = _text(skip_song, 'artist')
        title = _text(skip_song, 'title')
        key = f"{artist} - {title}"

        if key not in duplicate_groups:
            duplicate_groups[key] = {
                'artist': artist,
                'title': title,
                'kept_version': _text(skip_song, 'kept_version'),
                'skipped_versions': [],
                'total_duplicates': 0,
            }

        group = duplicate_groups[key]
        path = skip_song['path']
        group['skipped_versions'].append({
            'path': path,
            'reason': skip_song.get('reason', 'duplicate'),
            'file_type': get_file_type(path),
            'channel': extract_channel(path),
        })
        group['total_duplicates'] = len(group['skipped_versions'])

    # Sort by artist first, then by title (both case-insensitive).
    return sorted(
        duplicate_groups.values(),
        key=lambda g: (g['artist'].lower(), g['title'].lower()),
    )
|
|
|
|
|
|
def generate_mp3_song_list(all_songs: List[Dict[str, Any]], skip_songs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the list of MP3/CDG songs that remain after cleanup.

    Args:
        all_songs: Library entries. Entries that are not dicts are ignored,
            and a null ``path`` is treated as an empty path.
        skip_songs: Skip-list entries; their ``'path'`` values are excluded.

    Returns:
        ``{'title': 'MP3 Songs need to be found', 'songs': [...]}`` where each
        song is ``{'position', 'title', 'artist'}``. Songs are sorted
        case-insensitively by artist then title, and ``position`` is the
        1-based rank after sorting.
    """
    def _text(value: Any) -> Any:
        # A JSON null must not reach the .lower() sort key below.
        return 'Unknown' if value is None else value

    # Paths that are being skipped.
    skip_paths = {
        entry['path']
        for entry in skip_songs
        if isinstance(entry, dict) and 'path' in entry
    }

    mp3_songs = []
    for song in all_songs:
        if not isinstance(song, dict):
            continue
        path = song.get('path') or ''
        if path.lower().endswith(('.mp3', '.cdg')) and path not in skip_paths:
            mp3_songs.append({
                'position': 0,  # assigned once the final order is known
                'title': _text(song.get('title')),
                'artist': _text(song.get('artist')),
            })

    mp3_songs.sort(key=lambda s: (s['artist'].lower(), s['title'].lower()))

    for position, song in enumerate(mp3_songs, start=1):
        song['position'] = position

    return {
        'title': 'MP3 Songs need to be found',
        'songs': mp3_songs,
    }
|
|
|
|
def get_file_type(path: str) -> str:
    """Classify a path by its extension (case-insensitive).

    Returns ``'MP4'`` for ``.mp4``, ``'MP3'`` for ``.mp3`` and also for
    ``.cdg`` (CDG files are paired with an MP3), otherwise ``'Unknown'``.
    """
    lowered = path.lower()
    suffix_labels = (
        ('.mp4', 'MP4'),
        ('.mp3', 'MP3'),
        ('.cdg', 'MP3'),  # Treat CDG as MP3 since they're paired
    )
    for suffix, label in suffix_labels:
        if lowered.endswith(suffix):
            return label
    return 'Unknown'
|
|
|
|
def extract_channel(path: str) -> str:
    """Derive a channel name from a backslash-separated file path.

    Resolution order:
      1. Any ``.mp3``/``.cdg`` file is reported as ``'MP3'``.
      2. A known channel name appearing anywhere in the path
         (case-insensitive) is returned.
      3. The folder right after an ``MP4`` folder is returned, or
         ``'MP4 Root'`` when the next component contains a dot (assumed to
         be a file sitting directly in the MP4 folder).
      4. The first dot-free component containing "karaoke" is returned.
      5. The parent folder is returned, or the grandparent when the parent
         contains a dot.
      6. Otherwise ``'Unknown'``.

    Only ``\\`` is treated as a separator; a path using ``/`` is seen as a
    single component.
    """
    path_lower = path.lower()

    # All MP3/CDG content is consolidated under "MP3", whether or not it
    # lives under a CDG folder, so no folder scan is needed.
    if path_lower.endswith(('.mp3', '.cdg')):
        return 'MP3'

    # Look for specific known channels first.
    for channel in ('Sing King Karaoke', 'KaraFun Karaoke', 'Stingray Karaoke'):
        if channel.lower() in path_lower:
            return channel

    parts = path.split('\\')

    # MP4 folder structure: MP4/ChannelName/song.mp4. The last component is
    # excluded so an "mp4" match always has a following component.
    for i, part in enumerate(parts[:-1]):
        if part.lower() == 'mp4':
            next_part = parts[i + 1]
            # No dot: treated as a folder, i.e. the channel name.
            return next_part if '.' not in next_part else 'MP4 Root'

    # Fallback: any folder whose name contains 'karaoke'.
    for part in parts:
        if 'karaoke' in part.lower() and '.' not in part:
            return part

    # Last resort: the folder containing the file. A parent named "mp4" can
    # never reach this point, because the MP4 scan above already returned.
    if len(parts) >= 2:
        parent_folder = parts[-2]
        if '.' not in parent_folder:
            return parent_folder
        # Parent looks like a file name; go up one more level.
        if len(parts) >= 3 and '.' not in parts[-3]:
            return parts[-3]

    return 'Unknown'
|
|
|
|
def normalize_path(file_path: str) -> str:
    """Repair paths in which the drive separator was corrupted to ``://``.

    Paths without ``://`` are returned untouched. Otherwise control
    characters are removed, whitespace runs are collapsed to single spaces,
    the ends are trimmed and every ``://`` is replaced with ``:\\``. Each
    step is logged with DEBUG prints.
    """
    # Debug logging to track path transformation - show original path first
    print(f"DEBUG: normalize_path ORIGINAL PATH: {file_path}")

    # Guard clause: nothing to repair when no :// corruption is present.
    if '://' not in file_path:
        print(f"DEBUG: normalize_path output (no changes): {file_path}")
        return file_path

    print(f"DEBUG: Detected malformed path with ://, attempting to fix: {file_path}")

    import re
    # Drop only real control characters; tab, LF and CR are left for the
    # whitespace pass below, which turns them into plain spaces.
    cleaned = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', file_path)
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()

    # Simple fix: replace :// with :\
    fixed_path = cleaned.replace('://', ':\\')
    print(f"DEBUG: Fixed path (simple :// to :\\ conversion): {fixed_path}")
    return fixed_path
|
|
|
|
|
|
@app.route('/')
def index():
    """Serve the main dashboard by rendering the ``index.html`` template."""
    dashboard_page = render_template('index.html')
    return dashboard_page
|
|
|
|
@app.route('/remaining-songs')
def remaining_songs():
    """Serve the remaining-songs page (``remaining_songs.html`` template)."""
    remaining_page = render_template('remaining_songs.html')
    return remaining_page
|
|
|
|
@app.route('/favorites')
def favorites():
    """Serve the favorites page (``favorites.html`` template)."""
    favorites_page = render_template('favorites.html')
    return favorites_page
|
|
|
|
@app.route('/history')
def history():
    """Serve the history page (``history.html`` template)."""
    history_page = render_template('history.html')
    return history_page
|
|
|
|
def _duplicate_group_matches(group, channel_filter, file_type_filter):
    """Return True when *group* passes the channel and file-type filters.

    Both filters are expected to be lower-cased already; an empty string
    means "no filter".
    """
    if not (channel_filter or file_type_filter):
        return True

    kept_version = group['kept_version']
    skipped_versions = group['skipped_versions']

    if file_type_filter in ('mp3-only', 'mp3'):
        # Both special modes require the kept (primary) version to be MP3.
        if get_file_type(kept_version).upper() != 'MP3':
            return False
        if file_type_filter == 'mp3-only':
            # mp3-only: no MP4 alternative may exist among the skipped files.
            if any(v['file_type'].upper() == 'MP4' for v in skipped_versions):
                return False
        elif not skipped_versions:
            # mp3: there must be at least one alternative version.
            return False
        # In the special modes the channel filter applies to the kept file.
        return (not channel_filter
                or channel_filter in extract_channel(kept_version).lower())

    def _version_matches(channel, file_type):
        return ((not channel_filter or channel_filter in channel.lower())
                and (not file_type_filter or file_type_filter in file_type.lower()))

    # Regular filtering: the kept version or any skipped version may match.
    if _version_matches(extract_channel(kept_version), get_file_type(kept_version)):
        return True
    return any(_version_matches(v['channel'], v['file_type']) for v in skipped_versions)


@app.route('/api/duplicates')
def get_duplicates():
    """API endpoint returning filtered, paginated duplicate groups.

    Query parameters:
        artist, title: case-insensitive substring filters.
        channel: case-insensitive substring filter on the channel name.
        file_type: ``mp3-only``, ``mp3`` or a substring of a file type.
        min_duplicates: minimum number of skipped versions (default 0).
        page, per_page: 1-based pagination (defaults 1 and 50). Values that
            are not integers fall back to the default, and values below 1
            are raised to 1.

    Returns:
        JSON with ``duplicates``, ``total``, ``page``, ``per_page`` and
        ``total_pages``, or a 404 JSON error when no skip list can be loaded.
    """
    # Try the detailed skip list first, then fall back to the basic one.
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))
    if not skip_songs:
        skip_songs = load_json_file(os.path.join(DATA_DIR, 'skipSongs.json'))

    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404

    duplicate_groups = get_duplicate_groups(skip_songs)

    args = request.args
    artist_filter = args.get('artist', '').lower()
    title_filter = args.get('title', '').lower()
    channel_filter = args.get('channel', '').lower()
    file_type_filter = args.get('file_type', '').lower()
    # type=int returns the default instead of raising on non-numeric input.
    min_duplicates = args.get('min_duplicates', 0, type=int)

    filtered_groups = []
    for group in duplicate_groups:
        if artist_filter and artist_filter not in group['artist'].lower():
            continue
        if title_filter and title_filter not in group['title'].lower():
            continue
        if group['total_duplicates'] < min_duplicates:
            continue
        if not _duplicate_group_matches(group, channel_filter, file_type_filter):
            continue
        filtered_groups.append(group)

    # Pagination. Clamping avoids division by zero and negative slices.
    page = max(args.get('page', 1, type=int), 1)
    per_page = max(args.get('per_page', 50, type=int), 1)
    start_idx = (page - 1) * per_page
    end_idx = start_idx + per_page

    return jsonify({
        'duplicates': filtered_groups[start_idx:end_idx],
        'total': len(filtered_groups),
        'page': page,
        'per_page': per_page,
        'total_pages': (len(filtered_groups) + per_page - 1) // per_page,
    })
|
|
|
|
@app.route('/api/stats')
def get_stats():
    """API endpoint returning overall library and skip-list statistics.

    Returns:
        JSON with total song counts, per-file-type totals for all, skipped
        and remaining files, per-channel counts and the ten most duplicated
        groups; or a 404 JSON error when no skip list can be loaded.
        Files whose type is neither MP4 nor MP3 are left out of all three
        file-type breakdowns.
    """
    # Try the detailed skip list first, then fall back to the basic one.
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))
    if not skip_songs:
        skip_songs = load_json_file(os.path.join(DATA_DIR, 'skipSongs.json'))

    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404

    # Original library data, used for the total counts.
    all_songs = load_json_file(os.path.join(DATA_DIR, 'allSongs.json'))
    if not all_songs:
        all_songs = []

    duplicate_groups = get_duplicate_groups(skip_songs)

    total_duplicates = len(duplicate_groups)
    total_files_to_skip = len(skip_songs)

    # File type and channel breakdown for the duplicate groups.
    skip_file_types = {'MP4': 0, 'MP3': 0}
    channels = {}

    for group in duplicate_groups:
        # The kept version counts towards the channel statistics too.
        kept_channel = extract_channel(group['kept_version'])
        channels[kept_channel] = channels.get(kept_channel, 0) + 1

        for version in group['skipped_versions']:
            file_type = version['file_type']
            # get_file_type() can return 'Unknown'; ignore such files here,
            # exactly as the library totals below do, instead of raising
            # KeyError.
            if file_type in skip_file_types:
                skip_file_types[file_type] += 1
            channel = version['channel']
            channels[channel] = channels.get(channel, 0) + 1

    # Total file type breakdown across the whole library.
    total_file_types = {'MP4': 0, 'MP3': 0}
    total_songs = len(all_songs)

    for song in all_songs:
        if not isinstance(song, dict):
            continue  # tolerate null / malformed library entries
        file_type = get_file_type(song.get('path') or '')
        if file_type in total_file_types:
            total_file_types[file_type] += 1

    # What will remain after skipping.
    remaining_file_types = {
        'MP4': total_file_types['MP4'] - skip_file_types['MP4'],
        'MP3': total_file_types['MP3'] - skip_file_types['MP3'],
    }
    total_remaining = sum(remaining_file_types.values())

    # Most duplicated songs.
    most_duplicated = sorted(
        duplicate_groups, key=lambda g: g['total_duplicates'], reverse=True
    )[:10]

    return jsonify({
        'total_songs': total_songs,
        'total_duplicates': total_duplicates,
        'total_files_to_skip': total_files_to_skip,
        'total_remaining': total_remaining,
        'total_file_types': total_file_types,
        'skip_file_types': skip_file_types,
        'remaining_file_types': remaining_file_types,
        'channels': channels,
        'most_duplicated': most_duplicated,
    })
|
|
|
|
@app.route('/api/config')
def get_config():
    """API endpoint returning the current configuration as JSON.

    An empty object is returned when the config file is missing, unreadable
    or decodes to a falsy value.
    """
    loaded = load_json_file(CONFIG_FILE)
    if not loaded:
        loaded = {}
    return jsonify(loaded)
|
|
|
|
@app.route('/api/save-changes', methods=['POST'])
def save_changes():
    """API endpoint applying user changes to the detailed skip list.

    Expects a JSON object with a ``changes`` list. Each change has a
    ``type`` (``keep_file`` removes ``file_path`` from the skip list,
    ``skip_file`` adds it) plus ``file_path`` and, for ``skip_file``,
    optional ``artist``, ``title`` and ``kept_version``.

    The existing file is copied to a timestamped backup before it is
    overwritten.

    Returns:
        JSON ``{'success', 'message', 'total_files'}`` on success; 400 when
        the body is not a JSON object; 404 when no skip list exists; 500 on
        any other error.
    """
    try:
        # silent=True yields None for a missing or invalid body instead of
        # raising, so the client gets a 400 rather than a generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        changes = data.get('changes', [])

        # Load current skip list
        skip_list_path = os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json')
        skip_songs = load_json_file(skip_list_path)
        if not skip_songs:
            return jsonify({'error': 'No skip songs data found'}), 404

        # Apply changes
        for change in changes:
            change_type = change.get('type')
            file_path = change.get('file_path')

            if change_type == 'keep_file':
                # Remove this file from skip list
                skip_songs = [s for s in skip_songs if s['path'] != file_path]
            elif change_type == 'skip_file':
                # An entry without a path would break every reader of the
                # skip list, so ignore such a change.
                if not file_path:
                    continue
                # Don't list the same file twice; it would inflate counts.
                if any(s.get('path') == file_path for s in skip_songs):
                    continue
                skip_songs.append({
                    'path': file_path,
                    'reason': 'manual_skip',
                    'artist': change.get('artist'),
                    'title': change.get('title'),
                    'kept_version': change.get('kept_version'),
                })

        # Back up the current file, then save the updated skip list.
        backup_path = os.path.join(DATA_DIR, 'reports', f'skip_songs_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
        import shutil
        shutil.copy2(skip_list_path, backup_path)

        with open(skip_list_path, 'w', encoding='utf-8') as f:
            json.dump(skip_songs, f, indent=2, ensure_ascii=False)

        return jsonify({
            'success': True,
            'message': f'Changes saved successfully. Backup created at: {backup_path}',
            'total_files': len(skip_songs)
        })

    except Exception as e:
        return jsonify({'error': f'Error saving changes: {str(e)}'}), 500
|
|
|
|
@app.route('/api/artists')
def get_artists():
    """API endpoint listing duplicate groups bucketed by artist.

    Returns:
        JSON ``{'artists': [...], 'total_artists': n}`` where each artist
        entry holds ``name``, its ``songs`` (duplicate groups) and the summed
        ``total_duplicates``; sorted case-insensitively by name. Responds
        with a 404 JSON error when the detailed skip list cannot be loaded.
    """
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))
    if not skip_songs:
        return jsonify({'error': 'No skip songs data found'}), 404

    # Bucket the duplicate groups by artist name.
    by_artist = {}
    for group in get_duplicate_groups(skip_songs):
        name = group['artist']
        bucket = by_artist.setdefault(name, {
            'name': name,
            'songs': [],
            'total_duplicates': 0
        })
        bucket['songs'].append(group)
        bucket['total_duplicates'] += group['total_duplicates']

    # Order the buckets by artist name, ignoring case.
    artists_list = sorted(by_artist.values(), key=lambda entry: entry['name'].lower())

    return jsonify({
        'artists': artists_list,
        'total_artists': len(artists_list)
    })
|
|
|
|
|
|
@app.route('/api/mp3-songs')
def get_mp3_songs():
    """API endpoint returning the MP3 songs that remain after cleanup.

    Responds with a 404 JSON error when the library file cannot be loaded;
    a missing skip list is treated as empty.
    """
    # Both files are loaded up front, library first.
    library = load_json_file(os.path.join(DATA_DIR, 'allSongs.json'))
    skipped = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))

    if not library:
        return jsonify({'error': 'No all songs data found'}), 404

    return jsonify(generate_mp3_song_list(library, skipped if skipped else []))
|
|
|
|
@app.route('/api/remaining-songs')
def get_remaining_songs():
    """Get all remaining songs (MP4 and MP3) after cleanup with pagination.

    Query parameters:
        page, per_page: 1-based pagination (defaults 1 and 50); values below
            1 are raised to 1.
        search: case-insensitive substring matched against title or artist.
        file_type: ``all`` (default), ``mp4`` or ``mp3`` (includes ``.cdg``).
        artist: case-insensitive substring matched against the artist.

    Returns:
        JSON ``{'songs': [...], 'pagination': {...}}``; a 404 JSON error when
        the library cannot be loaded; a 500 JSON error on any other failure.
        Library entries that are not dicts are ignored.
    """
    try:
        all_songs = load_json_file(os.path.join(DATA_DIR, 'allSongs.json'))
        skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))

        if not all_songs:
            return jsonify({'error': 'No all songs data found'}), 404

        if not skip_songs:
            skip_songs = []

        # Pagination parameters. Clamping avoids a ZeroDivisionError for
        # per_page=0 and negative slice indices for page<1.
        page = max(request.args.get('page', 1, type=int), 1)
        per_page = max(request.args.get('per_page', 50, type=int), 1)
        search = request.args.get('search', '').lower()
        file_type_filter = request.args.get('file_type', 'all')
        artist_filter = request.args.get('artist', '').lower()

        # Paths that are being skipped.
        skip_paths = {song['path'] for song in skip_songs}

        remaining_songs = []
        for song in all_songs:
            if not isinstance(song, dict):
                continue  # tolerate null / malformed library entries

            path = song.get('path') or ''
            if path in skip_paths:
                continue

            # File type filter (any value other than mp4/mp3 means "all").
            path_lower = path.lower()
            if file_type_filter == 'mp4' and not path_lower.endswith('.mp4'):
                continue
            if file_type_filter == 'mp3' and not path_lower.endswith(('.mp3', '.cdg')):
                continue

            title = song.get('title')
            artist = song.get('artist')
            # Filters see a missing/null field as empty text, while the
            # response shows it as 'Unknown'.
            title_text = (title or '').lower()
            artist_text = (artist or '').lower()

            if search and search not in title_text and search not in artist_text:
                continue
            if artist_filter and artist_filter not in artist_text:
                continue

            remaining_songs.append({
                'title': 'Unknown' if title is None else title,
                'artist': 'Unknown' if artist is None else artist,
                'path': path,
                'file_type': get_file_type(path),
                'channel': extract_channel(path)
            })

        # Sort by artist, then by title
        remaining_songs.sort(key=lambda x: (x['artist'].lower(), x['title'].lower()))

        # Calculate pagination
        total_songs = len(remaining_songs)
        total_pages = (total_songs + per_page - 1) // per_page
        start_idx = (page - 1) * per_page
        end_idx = start_idx + per_page

        return jsonify({
            'songs': remaining_songs[start_idx:end_idx],
            'pagination': {
                'current_page': page,
                'per_page': per_page,
                'total_songs': total_songs,
                'total_pages': total_pages
            }
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/download/mp3-songs')
def download_mp3_songs():
    """Download the remaining-MP3 song list as a JSON attachment.

    The list is regenerated, written to ``mp3SongList.json`` in the reports
    directory, and that file is then sent back. Responds with a 404 JSON
    error when the library file cannot be loaded; a missing skip list is
    treated as empty.
    """
    all_songs = load_json_file(os.path.join(DATA_DIR, 'allSongs.json'))
    skip_songs = load_json_file(os.path.join(DATA_DIR, 'reports', 'skip_songs_detailed.json'))

    if not all_songs:
        return jsonify({'error': 'No all songs data found'}), 404

    if not skip_songs:
        skip_songs = []

    mp3_song_list = generate_mp3_song_list(all_songs, skip_songs)

    # open() resolves a relative path against the working directory, whereas
    # send_from_directory() resolves a relative directory against the
    # application's root path. Making the directory absolute guarantees the
    # file written here is the file that gets sent.
    reports_dir = os.path.abspath(os.path.join(DATA_DIR, 'reports'))
    os.makedirs(reports_dir, exist_ok=True)

    output_path = os.path.join(reports_dir, 'mp3SongList.json')
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(mp3_song_list, f, indent=2, ensure_ascii=False)

    # Return the file for download
    return send_from_directory(
        reports_dir,
        'mp3SongList.json',
        as_attachment=True,
        download_name='mp3SongList.json'
    )
|
|
|
|
@app.route('/api/save-priority-preferences', methods=['POST'])
def save_priority_preferences():
    """API endpoint merging new priority preferences into the saved ones.

    Expects a JSON object with a non-empty ``priority_changes`` mapping. The
    merged preferences are written to ``priority_preferences.json`` and the
    same merged content is also written to a timestamped backup file.

    Returns:
        JSON ``{'success', 'message', 'total_preferences'}`` on success; 400
        when the body is not a JSON object or carries no changes; 500 on any
        other error.
    """
    try:
        # silent=True yields None for a missing or invalid body instead of
        # raising, so the client gets a 400 rather than a generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        priority_changes = data.get('priority_changes', {})
        if not priority_changes:
            return jsonify({'error': 'No priority changes provided'}), 400

        # Create preferences directory if it doesn't exist
        preferences_dir = os.path.join(DATA_DIR, 'preferences')
        os.makedirs(preferences_dir, exist_ok=True)

        # Load existing preferences
        preferences_file = os.path.join(preferences_dir, 'priority_preferences.json')
        existing_preferences = {}
        if os.path.exists(preferences_file):
            with open(preferences_file, 'r', encoding='utf-8') as f:
                existing_preferences = json.load(f)

        # Update with new preferences
        existing_preferences.update(priority_changes)

        # Save updated preferences
        with open(preferences_file, 'w', encoding='utf-8') as f:
            json.dump(existing_preferences, f, indent=2, ensure_ascii=False)

        # NOTE(review): the backup is written after the update, so it is a
        # snapshot of the new state, not of the previous one - confirm this
        # is intended.
        backup_path = os.path.join(preferences_dir, f'priority_preferences_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
        with open(backup_path, 'w', encoding='utf-8') as f:
            json.dump(existing_preferences, f, indent=2, ensure_ascii=False)

        return jsonify({
            'success': True,
            'message': f'Saved {len(priority_changes)} priority preferences. Backup created at: {backup_path}',
            'total_preferences': len(existing_preferences)
        })

    except Exception as e:
        return jsonify({'error': f'Error saving priority preferences: {str(e)}'}), 500
|
|
|
|
@app.route('/api/reset-priority-preferences', methods=['POST'])
def reset_priority_preferences():
    """API endpoint deleting the saved priority preferences.

    When the preferences file exists it is first copied to a timestamped
    backup and then removed. A missing file is reported as success. Any
    exception is turned into a 500 JSON error.
    """
    try:
        preferences_dir = os.path.join(DATA_DIR, 'preferences')
        preferences_file = os.path.join(preferences_dir, 'priority_preferences.json')

        # Guard clause: nothing to reset.
        if not os.path.exists(preferences_file):
            return jsonify({
                'success': True,
                'message': 'No priority preferences found to reset'
            })

        # Keep a copy before the file is deleted.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = os.path.join(preferences_dir, f'priority_preferences_reset_backup_{stamp}.json')
        import shutil
        shutil.copy2(preferences_file, backup_path)

        os.remove(preferences_file)

        return jsonify({
            'success': True,
            'message': f'Priority preferences reset successfully. Backup created at: {backup_path}'
        })

    except Exception as e:
        return jsonify({'error': f'Error resetting priority preferences: {str(e)}'}), 500
|
|
|
|
@app.route('/api/load-priority-preferences')
def load_priority_preferences():
    """API endpoint returning the saved priority preferences.

    Responds with ``{'success': True, 'preferences': ...}``; the preferences
    are an empty object when no preferences file exists. Any exception is
    turned into a 500 JSON error.
    """
    try:
        preferences_file = os.path.join(DATA_DIR, 'preferences', 'priority_preferences.json')

        # Start from "nothing saved" and overwrite only if a file is there.
        stored = {}
        if os.path.exists(preferences_file):
            with open(preferences_file, 'r', encoding='utf-8') as handle:
                stored = json.load(handle)

        return jsonify({
            'success': True,
            'preferences': stored
        })

    except Exception as e:
        return jsonify({'error': f'Error loading priority preferences: {str(e)}'}), 500
|
|
|
|
def find_matching_songs(item: Dict[str, Any], all_songs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Find all library songs whose artist and title match *item*.

    Matching is exact after lower-casing and stripping both sides.

    Args:
        item: A favorite/history entry. A non-dict item yields an empty list
            (with a printed warning).
        all_songs: Library entries. Entries that are not dicts are ignored,
            and null ``artist``/``title``/``path`` values are treated as
            empty strings so one bad library row cannot fail every lookup.

    Returns:
        Dicts with ``path``, ``artist``, ``title``, ``file_type``,
        ``channel`` and ``is_current`` (True when the song's path equals the
        item's path). The item's current path sorts first, then MP4 files,
        then MP3/CDG files, then unknown types.

    Raises:
        AttributeError: If the item's own ``artist`` or ``title`` is present
            but not a string (e.g. null). This is kept as-is because the
            callers in this file catch per-item exceptions and skip the item.
    """
    matching_songs = []

    # Validate input
    if item is None or not isinstance(item, dict):
        print(f"Warning: Invalid item passed to find_matching_songs: {item}")
        return matching_songs

    item_artist = item.get('artist', '').lower().strip()
    item_title = item.get('title', '').lower().strip()
    item_path = item.get('path', '')

    for song in all_songs:
        # Skip None / malformed songs
        if not isinstance(song, dict):
            continue

        # dict.get() does not apply its default to an explicit null.
        song_artist = (song.get('artist') or '').lower().strip()
        song_title = (song.get('title') or '').lower().strip()

        # Exact match on artist and title
        if song_artist == item_artist and song_title == item_title:
            song_path = song.get('path') or ''
            matching_songs.append({
                'path': song_path,
                'artist': song.get('artist', 'Unknown'),
                'title': song.get('title', 'Unknown'),
                'file_type': get_file_type(song_path),
                'channel': extract_channel(song_path),
                'is_current': song_path == item_path
            })

    # File type priority: MP4 first, then MP3 (which includes CDG), then
    # anything unrecognised.
    file_type_priority = {'MP4': 1, 'MP3': 2, 'Unknown': 3}

    def sort_key(song):
        # The item's current path gets the highest priority.
        if song['is_current']:
            return (0, 0)
        return (1, file_type_priority.get(song['file_type'], 3))

    matching_songs.sort(key=sort_key)
    return matching_songs
|
|
|
|
@app.route('/api/favorites')
def get_favorites():
    """Get favorites data with the matching library songs for each favorite.

    Returns:
        JSON ``{'success': True, 'favorites': [...], 'total': n}`` sorted
        case-insensitively by artist then title. Null, non-dict or failing
        entries are skipped with a printed message. Responds with a 500 JSON
        error when either data file is missing/empty or on any other error.
    """
    try:
        # Load data files
        favorites = load_json_file(os.path.join(DATA_DIR, 'favorites.json'))
        all_songs = load_json_file(os.path.join(DATA_DIR, 'allSongs.json'))

        if not (favorites and all_songs):
            return jsonify({'error': 'Failed to load data files'}), 500

        processed_favorites = []
        for i, favorite in enumerate(favorites):
            # Skip None items or items without required fields
            if favorite is None:
                print(f"Warning: Skipping None item at index {i} in favorites")
                continue

            if not isinstance(favorite, dict):
                print(f"Warning: Skipping non-dict item at index {i} in favorites: {type(favorite)}")
                continue

            try:
                matches = find_matching_songs(favorite, all_songs)
                entry = {
                    'index': i,
                    'original': favorite,
                    'matching_songs': matches,
                    'artist': favorite.get('artist', 'Unknown'),
                    'title': favorite.get('title', 'Unknown'),
                    'current_path': favorite.get('path', ''),
                    'count': favorite.get('count', 0),
                    'favorite': favorite.get('favorite', False),
                    'disabled': favorite.get('disabled', False)
                }
            except Exception as item_error:
                # One bad entry must not take down the whole listing.
                print(f"Error processing favorite item {i}: {item_error}")
                print(f"Item data: {favorite}")
                continue

            processed_favorites.append(entry)

        # Sort by artist, then by title
        processed_favorites.sort(key=lambda entry: (entry['artist'].lower(), entry['title'].lower()))

        return jsonify({
            'success': True,
            'favorites': processed_favorites,
            'total': len(processed_favorites)
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
@app.route('/api/history')
def get_history():
    """Get history data with the matching library songs for each entry.

    Returns:
        JSON ``{'success': True, 'history': [...], 'total': n}`` sorted
        case-insensitively by artist then title. Null, non-dict or failing
        entries are skipped with a printed message. Responds with a 500 JSON
        error when either data file is missing/empty or on any other error
        (the traceback is printed in the latter case).
    """
    try:
        # Load data files
        history = load_json_file(os.path.join(DATA_DIR, 'history.json'))
        all_songs = load_json_file(os.path.join(DATA_DIR, 'allSongs.json'))

        if not (history and all_songs):
            return jsonify({'error': 'Failed to load data files'}), 500

        processed_history = []
        for i, item in enumerate(history):
            # Skip None items or items without required fields
            if item is None:
                print(f"Warning: Skipping None item at index {i} in history")
                continue

            if not isinstance(item, dict):
                print(f"Warning: Skipping non-dict item at index {i} in history: {type(item)}")
                continue

            try:
                matches = find_matching_songs(item, all_songs)
                entry = {
                    'index': i,
                    'original': item,
                    'matching_songs': matches,
                    'artist': item.get('artist', 'Unknown'),
                    'title': item.get('title', 'Unknown'),
                    'current_path': item.get('path', ''),
                    'count': item.get('count', 0),
                    'favorite': item.get('favorite', False),
                    'disabled': item.get('disabled', False)
                }
            except Exception as item_error:
                # One bad entry must not take down the whole listing.
                print(f"Error processing history item {i}: {item_error}")
                print(f"Item data: {item}")
                continue

            processed_history.append(entry)

        # Sort by artist, then by title
        processed_history.sort(key=lambda entry: (entry['artist'].lower(), entry['title'].lower()))

        return jsonify({
            'success': True,
            'history': processed_history,
            'total': len(processed_history)
        })
    except Exception as e:
        print(f"Error in get_history: {e}")
        import traceback
        print(f"Full traceback: {traceback.format_exc()}")
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
@app.route('/api/update-favorite-path', methods=['POST'])
def update_favorite_path():
    """Update the path of a favorite item.

    Expects a JSON object with ``index`` (0-based position in
    ``favorites.json``) and ``path`` (the new path).

    Returns:
        ``{'success': True}`` on success; 400 when the body, index or path
        is missing, or when the index does not address a dict entry in the
        favorites list; 500 on any other error.
    """
    try:
        # silent=True yields None for a missing or invalid body instead of
        # raising, so the client gets a 400 rather than a generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Missing index or path'}), 400

        favorite_index = data.get('index')
        new_path = data.get('path')

        if favorite_index is None or new_path is None:
            return jsonify({'success': False, 'error': 'Missing index or path'}), 400

        # Load favorites file
        favorites_file = os.path.join(DATA_DIR, 'favorites.json')
        favorites = load_json_file(favorites_file)

        # Require a real, non-negative integer index: a negative index would
        # silently edit an entry counted from the end of the list. The
        # addressed entry must also be a dict (the list may hold nulls).
        index_is_valid = (
            isinstance(favorites, list)
            and isinstance(favorite_index, int)
            and not isinstance(favorite_index, bool)
            and 0 <= favorite_index < len(favorites)
            and isinstance(favorites[favorite_index], dict)
        )
        if not index_is_valid:
            return jsonify({'success': False, 'error': 'Invalid favorite index'}), 400

        # Update the path
        favorites[favorite_index]['path'] = new_path

        # Save back to file
        with open(favorites_file, 'w', encoding='utf-8') as f:
            json.dump(favorites, f, indent=2, ensure_ascii=False)

        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
@app.route('/api/update-history-path', methods=['POST'])
def update_history_path():
    """Update the path of a history item.

    Expects a JSON object with ``index`` (0-based position in
    ``history.json``) and ``path`` (the new path).

    Returns:
        ``{'success': True}`` on success; 400 when the body, index or path
        is missing, or when the index does not address a dict entry in the
        history list; 500 on any other error.
    """
    try:
        # silent=True yields None for a missing or invalid body instead of
        # raising, so the client gets a 400 rather than a generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Missing index or path'}), 400

        history_index = data.get('index')
        new_path = data.get('path')

        if history_index is None or new_path is None:
            return jsonify({'success': False, 'error': 'Missing index or path'}), 400

        # Load history file
        history_file = os.path.join(DATA_DIR, 'history.json')
        history = load_json_file(history_file)

        # Require a real, non-negative integer index: a negative index would
        # silently edit an entry counted from the end of the list. The
        # addressed entry must also be a dict (the list may hold nulls).
        index_is_valid = (
            isinstance(history, list)
            and isinstance(history_index, int)
            and not isinstance(history_index, bool)
            and 0 <= history_index < len(history)
            and isinstance(history[history_index], dict)
        )
        if not index_is_valid:
            return jsonify({'success': False, 'error': 'Invalid history index'}), 400

        # Update the path
        history[history_index]['path'] = new_path

        # Save back to file
        with open(history_file, 'w', encoding='utf-8') as f:
            json.dump(history, f, indent=2, ensure_ascii=False)

        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/update-favorite-property', methods=['POST'])
def update_favorite_property():
    """Update any property of a favorite item.

    Expects a JSON object with ``index`` (0-based position in
    ``favorites.json``), ``property`` (the key to set) and ``value``.

    When ``property`` is ``'path'`` and the entry has an ``original_path``
    equal to the path being replaced, ``original_path`` is moved to the new
    value as well.

    Returns:
        ``{'success': True}`` on success; 400 when the body or a field is
        missing or the index is invalid; 404 when the favorites file does
        not exist; 500 on any other error.
    """
    try:
        # silent=True yields None for a missing or invalid body instead of
        # raising, so the client gets a 400 rather than a generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Missing index, property, or value'}), 400

        index = data.get('index')
        property_name = data.get('property')
        new_value = data.get('value')

        if index is None or property_name is None or new_value is None:
            return jsonify({'success': False, 'error': 'Missing index, property, or value'}), 400

        # Load current favorites
        favorites_file = os.path.join(DATA_DIR, 'favorites.json')
        if not os.path.exists(favorites_file):
            return jsonify({'success': False, 'error': 'Favorites file not found'}), 404

        with open(favorites_file, 'r', encoding='utf-8') as f:
            favorites = json.load(f)

        # The index must be a real integer addressing a dict entry; a string
        # index would otherwise raise TypeError in the range comparison.
        index_is_valid = (
            isinstance(favorites, list)
            and isinstance(index, int)
            and not isinstance(index, bool)
            and 0 <= index < len(favorites)
            and isinstance(favorites[index], dict)
        )
        if not index_is_valid:
            return jsonify({'success': False, 'error': 'Invalid index'}), 400

        entry = favorites[index]

        # Capture the old path BEFORE overwriting it. Reading it afterwards
        # would compare original_path against the new value and make the
        # special handling below a no-op.
        previous_path = entry.get('path')
        entry[property_name] = new_value

        # Special handling for path updates: keep original_path in step when
        # it matched the old path.
        if (property_name == 'path'
                and 'original_path' in entry
                and entry['original_path'] == previous_path):
            entry['original_path'] = new_value

        # Save updated favorites
        with open(favorites_file, 'w', encoding='utf-8') as f:
            json.dump(favorites, f, indent=2, ensure_ascii=False)

        return jsonify({'success': True})

    except Exception as e:
        print(f"Error updating favorite property: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/update-history-property', methods=['POST'])
def update_history_property():
    """Set one property of a history entry to a new value.

    Expects a JSON body with ``index``, ``property`` and ``value``. When the
    property being changed is ``path`` and the entry's ``original_path`` was
    equal to the path it had *before* this update, ``original_path`` is moved
    along to the new value as well.

    Returns:
        ``{'success': True}`` on success; 400 for a missing field or an index
        that does not address an entry; 404 when ``history.json`` does not
        exist; 500 with the exception text for anything unexpected.
    """
    try:
        data = request.get_json()
        index = data.get('index')
        property_name = data.get('property')
        new_value = data.get('value')

        if index is None or property_name is None or new_value is None:
            return jsonify({'success': False, 'error': 'Missing index, property, or value'}), 400

        # Load current history
        history_file = os.path.join(DATA_DIR, 'history.json')
        if not os.path.exists(history_file):
            return jsonify({'success': False, 'error': 'History file not found'}), 404

        with open(history_file, 'r', encoding='utf-8') as f:
            history = json.load(f)

        # A non-integer index is a client error, not a server failure.
        index_is_integer = isinstance(index, int) and not isinstance(index, bool)
        if not index_is_integer or not 0 <= index < len(history):
            return jsonify({'success': False, 'error': 'Invalid index'}), 400

        entry = history[index]

        # Capture the path BEFORE overwriting it. Comparing after the
        # assignment would test original_path against the new value, so the
        # synchronisation below could never do anything useful.
        previous_path = entry.get('path')
        entry[property_name] = new_value

        # Special handling for path updates: keep original_path in step when
        # it was tracking the old path.
        if (
            property_name == 'path'
            and 'original_path' in entry
            and entry['original_path'] == previous_path
        ):
            entry['original_path'] = new_value

        # Save updated history
        with open(history_file, 'w', encoding='utf-8') as f:
            json.dump(history, f, indent=2, ensure_ascii=False)

        return jsonify({'success': True})

    except Exception as e:
        print(f"Error updating history property: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/delete-favorite', methods=['POST'])
def delete_favorite():
    """Remove the favorite at the requested position and persist the result.

    Reads ``index`` from the JSON request body, pops that entry from
    ``favorites.json`` and writes the shortened list back. The response
    echoes the removed entry together with a human-readable message.
    """
    try:
        payload = request.get_json()
        index = payload.get('index')

        # Guard: the client must say which entry to remove.
        if index is None:
            return jsonify({'success': False, 'error': 'Missing index'}), 400

        # Guard: nothing to delete from when the file is absent.
        favorites_file = os.path.join(DATA_DIR, 'favorites.json')
        if not os.path.exists(favorites_file):
            return jsonify({'success': False, 'error': 'Favorites file not found'}), 404

        with open(favorites_file, 'r', encoding='utf-8') as source:
            favorites = json.load(source)

        # Guard: the index has to address an existing entry.
        if not (0 <= index < len(favorites)):
            return jsonify({'success': False, 'error': 'Invalid index'}), 400

        removed = favorites.pop(index)

        # Persist the list without the removed entry.
        with open(favorites_file, 'w', encoding='utf-8') as target:
            json.dump(favorites, target, indent=2, ensure_ascii=False)

        removed_artist = removed.get("artist", "Unknown")
        removed_title = removed.get("title", "Unknown")
        return jsonify({
            'success': True,
            'deleted_item': removed,
            'message': f'Deleted "{removed_artist} - {removed_title}"'
        })

    except Exception as e:
        print(f"Error deleting favorite: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/delete-history', methods=['POST'])
def delete_history():
    """Remove the history entry at the requested position and persist the result.

    Reads ``index`` from the JSON request body, pops that entry from
    ``history.json`` and writes the shortened list back. The response echoes
    the removed entry together with a human-readable message.
    """
    try:
        payload = request.get_json()
        index = payload.get('index')

        # Guard: the client must say which entry to remove.
        if index is None:
            return jsonify({'success': False, 'error': 'Missing index'}), 400

        # Guard: nothing to delete from when the file is absent.
        history_file = os.path.join(DATA_DIR, 'history.json')
        if not os.path.exists(history_file):
            return jsonify({'success': False, 'error': 'History file not found'}), 404

        with open(history_file, 'r', encoding='utf-8') as source:
            history = json.load(source)

        # Guard: the index has to address an existing entry.
        if not (0 <= index < len(history)):
            return jsonify({'success': False, 'error': 'Invalid index'}), 400

        removed = history.pop(index)

        # Persist the list without the removed entry.
        with open(history_file, 'w', encoding='utf-8') as target:
            json.dump(history, target, indent=2, ensure_ascii=False)

        removed_artist = removed.get("artist", "Unknown")
        removed_title = removed.get("title", "Unknown")
        return jsonify({
            'success': True,
            'deleted_item': removed,
            'message': f'Deleted "{removed_artist} - {removed_title}"'
        })

    except Exception as e:
        print(f"Error deleting history item: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
def _history_merge_key(item):
    """Return the case-insensitive (artist, title, path) key, or None if *item* cannot be merged."""
    if not isinstance(item, dict):
        return None
    parts = []
    for field in ('artist', 'title', 'path'):
        value = item.get(field)
        # Non-string or blank values cannot take part in a match.
        if not isinstance(value, str) or not value.strip():
            return None
        parts.append(value.strip().lower())
    return tuple(parts)


def _history_count(value):
    """Coerce a stored ``count`` to a number; unusable values contribute 0."""
    if isinstance(value, str):
        try:
            return int(value)
        except ValueError:
            return 0
    if isinstance(value, (int, float)):
        return value
    # None or any other type: ignore rather than abort the whole merge.
    return 0


def _merge_history_items(history_items):
    """Collapse matching history entries; return (merged_items, merged_groups, merged_entries)."""
    slots = []        # output order: one list of entries per resulting item
    slot_by_key = {}  # merge key -> the slot collecting its entries

    for item in history_items:
        key = _history_merge_key(item)
        if key is None:
            # Unmergeable entries are kept untouched in their original
            # position instead of being dropped from the saved file.
            slots.append([item])
            continue
        if key not in slot_by_key:
            slot_by_key[key] = []
            slots.append(slot_by_key[key])
        slot_by_key[key].append(item)

    merged_items = []
    merged_groups = 0
    merged_entries = 0

    for items in slots:
        if len(items) == 1:
            # Single item, keep as is
            merged_items.append(items[0])
            continue

        # Start with the first item as the base
        merged = items[0].copy()
        merged['count'] = sum(_history_count(entry.get('count', 0)) for entry in items)

        # For boolean properties, if any are True, keep True
        merged['favorite'] = any(entry.get('favorite', False) for entry in items)
        merged['disabled'] = any(entry.get('disabled', False) for entry in items)

        # For other properties, keep the first non-empty value
        for prop in ('key', 'original_path', 'genre'):
            if merged.get(prop):
                continue
            for other in items[1:]:
                if other.get(prop):
                    merged[prop] = other[prop]
                    break

        merged_items.append(merged)
        merged_groups += 1
        merged_entries += len(items)

    return merged_items, merged_groups, merged_entries


@app.route('/api/merge-history', methods=['POST'])
def merge_history():
    """Merge history entries that match on artist, title and path.

    Matching is case-insensitive and ignores surrounding whitespace. For each
    group of matching entries the first entry is the base: ``count`` values
    are summed, ``favorite``/``disabled`` become True if any member has them
    set, and ``key``/``original_path``/``genre`` take the first non-empty
    value. Entries that cannot be matched (not an object, or a missing/blank
    artist, title or path) are written back unchanged rather than discarded.

    Returns:
        A JSON summary (``merged_groups``, ``total_merged_entries``,
        ``final_count``, ``message``) on success; 404 when ``history.json``
        does not exist; 400 when it holds no entries or is not a list; 500
        with the exception text for anything unexpected.
    """
    try:
        # Load current history
        history_file = os.path.join(DATA_DIR, 'history.json')
        if not os.path.exists(history_file):
            return jsonify({'success': False, 'error': 'History file not found'}), 404

        with open(history_file, 'r', encoding='utf-8') as f:
            history_items = json.load(f)

        if not history_items:
            return jsonify({'success': False, 'error': 'No history items found to merge'}), 400

        # Refuse to rewrite a file whose top level is not a list; iterating
        # anything else would replace its content with an empty list.
        if not isinstance(history_items, list):
            return jsonify({'success': False, 'error': 'History file does not contain a list of entries'}), 400

        merged_items, merged_count, total_merged_entries = _merge_history_items(history_items)

        # Save the merged history
        with open(history_file, 'w', encoding='utf-8') as f:
            json.dump(merged_items, f, indent=2, ensure_ascii=False)

        return jsonify({
            'success': True,
            'merged_groups': merged_count,
            'total_merged_entries': total_merged_entries,
            'final_count': len(merged_items),
            'message': f'Successfully merged {merged_count} groups ({total_merged_entries} total entries → {len(merged_items)} entries)'
        })

    except Exception as e:
        print(f"Error merging history objects: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
# Extensions this endpoint is willing to serve, mapped to their MIME types.
_VIDEO_MIME_TYPES = {
    '.mp4': 'video/mp4',
    '.avi': 'video/x-msvideo',
    '.mkv': 'video/x-matroska',
    '.mov': 'video/quicktime',
    '.wmv': 'video/x-ms-wmv',
    '.flv': 'video/x-flv',
    '.webm': 'video/webm'
}


@app.route('/api/video/<path:file_path>')
def serve_video(file_path):
    """Serve a video file for playback in the web UI.

    The requested path is URL-decoded, passed through ``normalize_path`` and
    then checked: it must not contain ``..``, must not be a Unix-style
    absolute path, must point at an existing regular file, and must carry one
    of the extensions in ``_VIDEO_MIME_TYPES``.

    Args:
        file_path: Path segment captured from the URL.

    Returns:
        The file response with CORS and cache headers on success; 400 for a
        rejected path or extension; 404 when the file does not exist; 500
        with the exception text for anything unexpected.
    """
    # NOTE(review): the directory handed to send_from_directory is derived
    # from client input, which Flask's documentation warns against. The checks
    # below are the only containment - confirm that is acceptable for the
    # intended deployment.
    try:
        # Decode the file path (it comes URL-encoded)
        import urllib.parse
        decoded_path = urllib.parse.unquote(file_path)

        # Normalize the path to fix any malformed paths
        normalized_path = normalize_path(decoded_path)

        # Debug logging - show original path first
        print(f"DEBUG: serve_video ORIGINAL PATH: {decoded_path}")
        print(f"DEBUG: Normalized path: {normalized_path}")
        print(f"DEBUG: Current working directory: {os.getcwd()}")
        print(f"DEBUG: Path transformation: '{decoded_path}' -> '{normalized_path}'")

        # Security check: ensure the path is within allowed directories
        # This prevents directory traversal attacks
        if '..' in normalized_path:
            print("DEBUG: Security check failed - path contains '..'")
            return jsonify({'error': 'Invalid file path'}), 400

        # On Windows, allow absolute paths with drive letters
        # On Unix-like systems, block absolute paths
        if os.name == 'nt':  # Windows
            if normalized_path.startswith('/') and not normalized_path[1:].startswith(':'):
                print("DEBUG: Security check failed - Unix-style absolute path on Windows")
                return jsonify({'error': 'Invalid file path'}), 400
        else:  # Unix-like systems
            if normalized_path.startswith('/'):
                print("DEBUG: Security check failed - absolute path on Unix")
                return jsonify({'error': 'Invalid file path'}), 400

        # Require a regular file: a directory would pass a plain existence
        # check and only fail later inside send_from_directory as a 500.
        if not os.path.isfile(normalized_path):
            print(f"DEBUG: File does not exist: {normalized_path}")
            # Try to list the directory to see what's available
            try:
                directory = os.path.dirname(normalized_path)
                if os.path.exists(directory):
                    files_in_dir = os.listdir(directory)
                    print(f"DEBUG: Files in directory {directory}: {files_in_dir[:10]}...")  # Show first 10 files
                else:
                    print(f"DEBUG: Directory does not exist: {directory}")
            except Exception as e:
                print(f"DEBUG: Error listing directory: {e}")
            return jsonify({'error': 'Video file not found'}), 404

        # Check if it's a video file and determine MIME type
        file_extension = os.path.splitext(normalized_path)[1].lower()
        if file_extension not in _VIDEO_MIME_TYPES:
            print(f"DEBUG: Invalid file type: {normalized_path}")
            return jsonify({'error': 'Invalid file type'}), 400

        mime_type = _VIDEO_MIME_TYPES[file_extension]

        # Get file info for debugging
        file_size = os.path.getsize(normalized_path)
        print(f"DEBUG: File exists, size: {file_size} bytes")
        print(f"DEBUG: MIME type: {mime_type}")

        # Resolve the directory against the working directory - the same base
        # the checks above used. send_from_directory resolves a relative
        # directory against the application root instead, so without this the
        # file that was validated and the file that is served could differ.
        directory = os.path.abspath(os.path.dirname(normalized_path))
        filename = os.path.basename(normalized_path)

        print(f"DEBUG: Serving from directory: {directory}")
        print(f"DEBUG: Filename: {filename}")

        response = send_from_directory(
            directory,
            filename,
            mimetype=mime_type
        )

        # Add CORS headers to allow cross-origin requests
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = 'GET, HEAD, OPTIONS'
        response.headers['Access-Control-Allow-Headers'] = 'Range'

        # Add cache control headers
        response.headers['Cache-Control'] = 'public, max-age=3600'

        return response

    except Exception as e:
        print(f"DEBUG: Exception in serve_video: {str(e)}")
        import traceback
        print(f"DEBUG: Full traceback: {traceback.format_exc()}")
        return jsonify({'error': f'Error serving video: {str(e)}'}), 500
|
|
|
|
|
|
# Playlist Validation Endpoints
|
|
|
|
@app.route('/playlist-validation')
def playlist_validation():
    """Render the playlist validation page."""
    page_template = 'playlist_validation.html'
    return render_template(page_template)
|
|
|
|
|
|
@app.route('/api/playlists')
def get_playlists():
    """Return a summary (index, title, song count) for every playlist.

    The playlists are read from ``songLists.json`` inside the data directory.
    Any failure is reported as a 500 response with an ``error`` message.
    """
    try:
        source_path = os.path.join(DATA_DIR, 'songLists.json')
        with open(source_path, 'r', encoding='utf-8') as handle:
            playlists = json.load(handle)

        # One compact summary per playlist, in file order.
        summaries = [
            {
                'index': position,
                'title': entry.get('title', 'Unknown Playlist'),
                'song_count': len(entry.get('songs', []))
            }
            for position, entry in enumerate(playlists)
        ]
        return jsonify(summaries)
    except Exception as e:
        reason = str(e)
        return jsonify({'error': f'Error loading playlists: {reason}'}), 500
|
|
|
|
|
|
@app.route('/api/validate-playlist/<int:playlist_index>')
def validate_playlist(playlist_index):
    """Run a dry-run validation of the playlist at ``playlist_index``.

    Builds a ``PlaylistValidator`` from the configuration file, loads
    ``songLists.json`` and returns the validator's result as JSON. An index
    past the end of the list yields a 400; a configuration that cannot be
    loaded, or any other failure, yields a 500.
    """
    try:
        config = load_json_file(CONFIG_FILE)
        if not config:
            return jsonify({'error': 'Could not load configuration'}), 500

        validator = PlaylistValidator(config, DATA_DIR)

        source_path = os.path.join(DATA_DIR, 'songLists.json')
        with open(source_path, 'r', encoding='utf-8') as handle:
            playlists = json.load(handle)

        # Guard: the index has to fall inside the loaded list.
        if not playlist_index < len(playlists):
            return jsonify({'error': f'Invalid playlist index: {playlist_index}'}), 400

        selected = playlists[playlist_index]
        outcome = validator.validate_playlist(selected, dry_run=True)
        return jsonify(outcome)
    except Exception as e:
        reason = str(e)
        return jsonify({'error': f'Error validating playlist: {reason}'}), 500
|
|
|
|
|
|
@app.route('/api/validate-all-playlists')
def validate_all_playlists():
    """Run a dry-run validation over every playlist and return the results.

    Responds with a 500 when the configuration cannot be loaded or when the
    validator raises.
    """
    try:
        config = load_json_file(CONFIG_FILE)
        if config:
            # Configuration available: delegate the work to the validator.
            validator = PlaylistValidator(config, DATA_DIR)
            outcome = validator.validate_all_playlists(dry_run=True)
            return jsonify(outcome)

        return jsonify({'error': 'Could not load configuration'}), 500
    except Exception as e:
        reason = str(e)
        return jsonify({'error': f'Error validating playlists: {reason}'}), 500
|
|
|
|
|
|
@app.route('/api/update-playlist-song', methods=['POST'])
def update_playlist_song():
    """Replace the artist/title of one song inside a playlist.

    Reads ``playlist_index``, ``song_position``, ``new_artist``, ``new_title``
    and the optional ``dry_run`` flag (defaults to True) from the JSON body
    and forwards them to ``PlaylistValidator.update_playlist_song``.
    """
    try:
        payload = request.get_json()
        playlist_index = payload.get('playlist_index')
        song_position = payload.get('song_position')
        new_artist = payload.get('new_artist')
        new_title = payload.get('new_title')
        dry_run = payload.get('dry_run', True)

        # Guard: all four identifying fields must be supplied.
        required = (playlist_index, song_position, new_artist, new_title)
        if not all(value is not None for value in required):
            return jsonify({'error': 'Missing required parameters'}), 400

        config = load_json_file(CONFIG_FILE)
        if not config:
            return jsonify({'error': 'Could not load configuration'}), 500

        validator = PlaylistValidator(config, DATA_DIR)
        updated = validator.update_playlist_song(
            playlist_index, song_position, new_artist, new_title, dry_run
        )

        if not updated:
            return jsonify({'error': 'Failed to update playlist song'}), 500

        return jsonify({'success': True, 'message': 'Playlist song updated successfully'})

    except Exception as e:
        reason = str(e)
        return jsonify({'error': f'Error updating playlist song: {reason}'}), 500
|
|
|
|
|
|
@app.route('/api/apply-all-updates', methods=['POST'])
def apply_all_updates():
    """Apply all pending playlist updates in one batch.

    Expects a JSON body with ``changes``: a list of objects, each carrying
    ``playlistIndex``, ``songPosition``, ``newArtist`` and ``newTitle``. The
    whole batch is validated before the first change is applied, so a
    malformed entry is rejected without leaving the playlists half-updated.

    Returns:
        A JSON summary with ``applied`` and ``failed`` counts; 400 when there
        are no changes or a change is malformed; 500 when the configuration
        cannot be loaded or something unexpected fails.
    """
    try:
        data = request.get_json()
        changes = data.get('changes', [])

        if not changes:
            return jsonify({'error': 'No changes to apply'}), 400

        # Validate everything up front. Indexing a missing key inside the
        # apply loop would abort with a 500 after earlier changes had already
        # been written.
        required_keys = ('playlistIndex', 'songPosition', 'newArtist', 'newTitle')
        for position, change in enumerate(changes):
            if not isinstance(change, dict):
                return jsonify({'error': f'Change {position} is not an object'}), 400
            missing = [key for key in required_keys if key not in change]
            if missing:
                return jsonify({'error': f'Change {position} is missing: {", ".join(missing)}'}), 400

        # Load configuration
        config = load_json_file(CONFIG_FILE)
        if not config:
            return jsonify({'error': 'Could not load configuration'}), 500

        # Create validator
        validator = PlaylistValidator(config, DATA_DIR)

        # Apply all changes
        success_count = 0
        failed_count = 0

        for change in changes:
            success = validator.update_playlist_song(
                change['playlistIndex'],
                change['songPosition'],
                change['newArtist'],
                change['newTitle'],
                False  # Always apply changes (not dry run)
            )
            if success:
                success_count += 1
            else:
                failed_count += 1

        if failed_count == 0:
            message = f'Successfully applied {success_count} changes to playlists.'
        else:
            message = f'Applied {success_count} changes, {failed_count} failed.'

        # 'success' reports that the batch was processed; per-change outcomes
        # are carried by the 'applied' and 'failed' counters.
        return jsonify({
            'success': True,
            'message': message,
            'applied': success_count,
            'failed': failed_count
        })

    except Exception as e:
        return jsonify({'error': f'Error applying updates: {str(e)}'}), 500
|
|
|
|
|
|
if __name__ == '__main__':
    # Development entry point: start Flask's built-in server.
    # NOTE(review): debug=True combined with host='0.0.0.0' makes the
    # interactive debugger reachable from other machines - confirm this is
    # only ever run on a trusted network.
    run_options = {'debug': True, 'host': '0.0.0.0', 'port': 5000}
    app.run(**run_options)