from __future__ import annotations import json import urllib.request from datetime import datetime, timezone from typing import Any import pandas as pd import streamlit as st from web_core.constants import TREND_BEAR, TREND_BULL, TREND_NEUTRAL from web_core.data import fetch_ohlc, maybe_drop_live_bar from web_core.strategy import classify_bars, detect_trends def detect_regime(analyzed: pd.DataFrame, lookback: int = 30) -> str: if analyzed.empty: return "Unknown" window = analyzed.tail(max(5, lookback)) fake_ratio = float((window["classification"] == "fake").mean()) trend_changes = int((window["trend_state"] != window["trend_state"].shift(1)).sum()) if fake_ratio >= 0.55 or trend_changes >= max(3, len(window) // 8): return "Choppy" return "Trending" def compute_signal_quality( analyzed: pd.DataFrame, trend_now: str, volume_filter_enabled: bool, lookback: int = 20, ) -> dict[str, float | str]: if analyzed.empty: return {"score": 0.0, "label": "Low", "fake_ratio": 1.0} window = analyzed.tail(max(5, lookback)) fake_ratio = float((window["classification"] == "fake").mean()) real_ratio = 1.0 - fake_ratio trend_bonus = 0.2 if trend_now in {TREND_BULL, TREND_BEAR} else 0.0 volume_bonus = 0.1 if volume_filter_enabled else 0.0 score = min(1.0, max(0.0, real_ratio * 0.7 + trend_bonus + volume_bonus)) if score >= 0.75: label = "High" elif score >= 0.5: label = "Medium" else: label = "Low" return {"score": round(score * 100.0, 1), "label": label, "fake_ratio": round(fake_ratio * 100.0, 1)} @st.cache_data(show_spinner=False, ttl=120) def analyze_symbol_state( symbol: str, interval: str, period: str, use_body_range: bool, volume_filter_enabled: bool, volume_sma_window: int, volume_multiplier: float, ) -> dict[str, Any]: try: raw = fetch_ohlc(symbol=symbol, interval=interval, period=period) raw = maybe_drop_live_bar(raw, interval=interval, enabled=True) if len(raw) < 3: return {"symbol": symbol, "trend": "Insufficient", "regime": "Unknown"} analyzed, _ = detect_trends( classify_bars( raw, use_body_range=use_body_range, volume_filter_enabled=volume_filter_enabled, volume_sma_window=volume_sma_window, volume_multiplier=volume_multiplier, ) ) trend_now = str(analyzed.iloc[-1]["trend_state"]) return { "symbol": symbol, "trend": trend_now, "regime": detect_regime(analyzed), "fake_ratio": round(float((analyzed.tail(20)["classification"] == "fake").mean()) * 100.0, 1), } except Exception as exc: return {"symbol": symbol, "trend": "Error", "regime": "Unknown", "error": str(exc)} def multi_timeframe_confirmation( symbol: str, period: str, use_body_range: bool, volume_filter_enabled: bool, volume_sma_window: int, volume_multiplier: float, ) -> dict[str, Any]: intervals = ["1h", "4h", "1d"] states = [ analyze_symbol_state( symbol=symbol, interval=itv, period=period, use_body_range=use_body_range, volume_filter_enabled=volume_filter_enabled, volume_sma_window=volume_sma_window, volume_multiplier=volume_multiplier, ) for itv in intervals ] trends = [str(item.get("trend")) for item in states if str(item.get("trend")) in {TREND_BULL, TREND_BEAR}] bull_count = trends.count(TREND_BULL) bear_count = trends.count(TREND_BEAR) agreement = max(bull_count, bear_count) / 3.0 consensus = TREND_NEUTRAL if bull_count > bear_count: consensus = TREND_BULL elif bear_count > bull_count: consensus = TREND_BEAR return {"states": states, "agreement_pct": round(agreement * 100.0, 1), "consensus": consensus} def compare_symbols( symbols: list[str], interval: str, period: str, use_body_range: bool, volume_filter_enabled: bool, volume_sma_window: int, volume_multiplier: float, ) -> pd.DataFrame: rows: list[dict[str, Any]] = [] for symbol in symbols: state = analyze_symbol_state( symbol=symbol, interval=interval, period=period, use_body_range=use_body_range, volume_filter_enabled=volume_filter_enabled, volume_sma_window=volume_sma_window, volume_multiplier=volume_multiplier, ) rows.append(state) return pd.DataFrame(rows) def compute_session_stats( analyzed: pd.DataFrame, session_started_at: float, ) -> dict[str, float | int]: if analyzed.empty: return {"wins": 0, "losses": 0, "avg_move_pct": 0.0, "fake_ratio_pct": 0.0} idx = analyzed.index if idx.tz is None: cutoff = pd.Timestamp(datetime.fromtimestamp(session_started_at)) else: cutoff = pd.Timestamp(datetime.fromtimestamp(session_started_at, tz=timezone.utc)).tz_convert(idx.tz) window = analyzed[analyzed.index >= cutoff] if window.empty: window = analyzed.tail(30) wins = 0 losses = 0 moves: list[float] = [] for i in range(1, len(window) - 1): trend = str(window.iloc[i]["trend_state"]) prev = str(window.iloc[i - 1]["trend_state"]) if trend == prev or trend not in {TREND_BULL, TREND_BEAR}: continue entry = float(window.iloc[i]["Close"]) nxt = float(window.iloc[i + 1]["Close"]) move = ((nxt - entry) / entry) * 100.0 if trend == TREND_BULL else ((entry - nxt) / entry) * 100.0 moves.append(move) wins += int(move > 0) losses += int(move <= 0) fake_ratio_pct = float((window["classification"] == "fake").mean()) * 100.0 avg_move_pct = (sum(moves) / len(moves)) if moves else 0.0 return { "wins": wins, "losses": losses, "avg_move_pct": round(avg_move_pct, 2), "fake_ratio_pct": round(fake_ratio_pct, 2), } def send_webhook_alert(url: str, payload: dict[str, Any]) -> tuple[bool, str]: target = str(url or "").strip() if not target: return False, "No webhook URL configured." try: body = json.dumps(payload).encode("utf-8") request = urllib.request.Request( target, data=body, headers={"Content-Type": "application/json"}, method="POST", ) with urllib.request.urlopen(request, timeout=5) as response: code = int(response.status) return (200 <= code < 300), f"Webhook responded with status {code}" except Exception as exc: return False, str(exc)