204 lines
6.7 KiB
Python
204 lines
6.7 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
import pandas as pd
|
|
import streamlit as st
|
|
|
|
from web_core.constants import TREND_BEAR, TREND_BULL, TREND_NEUTRAL
|
|
from web_core.data import fetch_ohlc, maybe_drop_live_bar
|
|
from web_core.strategy import classify_bars, detect_trends
|
|
|
|
|
|
def detect_regime(analyzed: pd.DataFrame, lookback: int = 30) -> str:
    """Label the most recent bars as a "Choppy" or "Trending" regime.

    Only the last ``max(5, lookback)`` rows of ``analyzed`` are inspected.
    The window is "Choppy" when at least 55% of its bars are classified
    ``"fake"``, or when ``trend_state`` changes between consecutive bars at
    least ``max(3, len(window) // 8)`` times. Otherwise it is "Trending".

    Args:
        analyzed: Frame with ``"classification"`` and ``"trend_state"``
            columns.
        lookback: Requested window size in bars (a minimum of 5 is enforced).

    Returns:
        ``"Unknown"`` for an empty frame, else ``"Choppy"`` or ``"Trending"``.
    """
    if analyzed.empty:
        return "Unknown"

    window = analyzed.tail(max(5, lookback))
    fake_ratio = float((window["classification"] == "fake").mean())

    states = window["trend_state"]
    # shift(1) leaves NaN in the first slot, which always compares unequal.
    # The first bar has no predecessor inside the window, so drop that
    # comparison instead of counting it as a trend change.
    flipped = (states != states.shift(1)).iloc[1:]
    trend_changes = int(flipped.sum())

    change_limit = max(3, len(window) // 8)
    if fake_ratio >= 0.55 or trend_changes >= change_limit:
        return "Choppy"
    return "Trending"
|
|
|
|
|
|
def compute_signal_quality(
|
|
analyzed: pd.DataFrame,
|
|
trend_now: str,
|
|
volume_filter_enabled: bool,
|
|
lookback: int = 20,
|
|
) -> dict[str, float | str]:
|
|
if analyzed.empty:
|
|
return {"score": 0.0, "label": "Low", "fake_ratio": 1.0}
|
|
|
|
window = analyzed.tail(max(5, lookback))
|
|
fake_ratio = float((window["classification"] == "fake").mean())
|
|
real_ratio = 1.0 - fake_ratio
|
|
trend_bonus = 0.2 if trend_now in {TREND_BULL, TREND_BEAR} else 0.0
|
|
volume_bonus = 0.1 if volume_filter_enabled else 0.0
|
|
score = min(1.0, max(0.0, real_ratio * 0.7 + trend_bonus + volume_bonus))
|
|
|
|
if score >= 0.75:
|
|
label = "High"
|
|
elif score >= 0.5:
|
|
label = "Medium"
|
|
else:
|
|
label = "Low"
|
|
return {"score": round(score * 100.0, 1), "label": label, "fake_ratio": round(fake_ratio * 100.0, 1)}
|
|
|
|
|
|
@st.cache_data(show_spinner=False, ttl=120)
def analyze_symbol_state(
    symbol: str,
    interval: str,
    period: str,
    use_body_range: bool,
    volume_filter_enabled: bool,
    volume_sma_window: int,
    volume_multiplier: float,
) -> dict[str, Any]:
    """Fetch bars for one symbol/interval and summarise its current state.

    The pipeline is ``fetch_ohlc`` -> ``maybe_drop_live_bar`` ->
    ``classify_bars`` -> ``detect_trends``. The result is wrapped in
    ``st.cache_data`` with ``ttl=120``.

    Returns:
        On success, a dict with ``"symbol"``, ``"trend"`` (the last row's
        ``trend_state``), ``"regime"`` (from ``detect_regime``) and
        ``"fake_ratio"`` (percent of ``"fake"`` bars in the last 20 rows).
        With fewer than 3 bars, ``"trend"`` is ``"Insufficient"`` and
        ``"regime"`` is ``"Unknown"``. On any exception, ``"trend"`` is
        ``"Error"`` and the message is stored under ``"error"``.
    """
    # NOTE(review): failures are returned as a normal value rather than
    # raised, so they are presumably cached like any other result - confirm
    # that is intended.
    try:
        bars = maybe_drop_live_bar(
            fetch_ohlc(symbol=symbol, interval=interval, period=period),
            interval=interval,
            enabled=True,
        )
        if len(bars) < 3:
            return {"symbol": symbol, "trend": "Insufficient", "regime": "Unknown"}

        classified = classify_bars(
            bars,
            use_body_range=use_body_range,
            volume_filter_enabled=volume_filter_enabled,
            volume_sma_window=volume_sma_window,
            volume_multiplier=volume_multiplier,
        )
        analyzed, _ = detect_trends(classified)

        latest_trend = str(analyzed.iloc[-1]["trend_state"])
        regime = detect_regime(analyzed)
        recent_is_fake = analyzed.tail(20)["classification"] == "fake"
        return {
            "symbol": symbol,
            "trend": latest_trend,
            "regime": regime,
            "fake_ratio": round(float(recent_is_fake.mean()) * 100.0, 1),
        }
    except Exception as exc:
        return {"symbol": symbol, "trend": "Error", "regime": "Unknown", "error": str(exc)}
|
|
|
|
|
|
def multi_timeframe_confirmation(
    symbol: str,
    period: str,
    use_body_range: bool,
    volume_filter_enabled: bool,
    volume_sma_window: int,
    volume_multiplier: float,
) -> dict[str, Any]:
    """Check whether the 1h, 4h and 1d timeframes agree on a trend.

    ``analyze_symbol_state`` is run once per timeframe. Only states whose
    trend is ``TREND_BULL`` or ``TREND_BEAR`` count as votes.

    Returns:
        A dict with ``"states"`` (the per-timeframe results, in 1h/4h/1d
        order), ``"agreement_pct"`` (the larger vote count as a percentage of
        the three timeframes, one decimal) and ``"consensus"``
        (``TREND_BULL``, ``TREND_BEAR``, or ``TREND_NEUTRAL`` on a tie).
    """
    timeframes = ("1h", "4h", "1d")

    states = []
    for timeframe in timeframes:
        states.append(
            analyze_symbol_state(
                symbol=symbol,
                interval=timeframe,
                period=period,
                use_body_range=use_body_range,
                volume_filter_enabled=volume_filter_enabled,
                volume_sma_window=volume_sma_window,
                volume_multiplier=volume_multiplier,
            )
        )

    # Non-directional labels simply never match either count below.
    reported = [str(state.get("trend")) for state in states]
    bull_votes = reported.count(TREND_BULL)
    bear_votes = reported.count(TREND_BEAR)

    if bull_votes > bear_votes:
        consensus = TREND_BULL
    elif bear_votes > bull_votes:
        consensus = TREND_BEAR
    else:
        consensus = TREND_NEUTRAL

    agreement = max(bull_votes, bear_votes) / len(timeframes)
    return {
        "states": states,
        "agreement_pct": round(agreement * 100.0, 1),
        "consensus": consensus,
    }
|
|
|
|
|
|
def compare_symbols(
    symbols: list[str],
    interval: str,
    period: str,
    use_body_range: bool,
    volume_filter_enabled: bool,
    volume_sma_window: int,
    volume_multiplier: float,
) -> pd.DataFrame:
    """Build a table with one ``analyze_symbol_state`` result per symbol.

    Every symbol is analysed with the same interval, period and classifier
    settings. Rows appear in the order of ``symbols``.

    Returns:
        A DataFrame whose rows are the per-symbol state dicts (empty when
        ``symbols`` is empty).
    """
    return pd.DataFrame(
        [
            analyze_symbol_state(
                symbol=ticker,
                interval=interval,
                period=period,
                use_body_range=use_body_range,
                volume_filter_enabled=volume_filter_enabled,
                volume_sma_window=volume_sma_window,
                volume_multiplier=volume_multiplier,
            )
            for ticker in symbols
        ]
    )
|
|
|
|
|
|
def compute_session_stats(
|
|
analyzed: pd.DataFrame,
|
|
session_started_at: float,
|
|
) -> dict[str, float | int]:
|
|
if analyzed.empty:
|
|
return {"wins": 0, "losses": 0, "avg_move_pct": 0.0, "fake_ratio_pct": 0.0}
|
|
|
|
idx = analyzed.index
|
|
if idx.tz is None:
|
|
cutoff = pd.Timestamp(datetime.fromtimestamp(session_started_at))
|
|
else:
|
|
cutoff = pd.Timestamp(datetime.fromtimestamp(session_started_at, tz=timezone.utc)).tz_convert(idx.tz)
|
|
|
|
window = analyzed[analyzed.index >= cutoff]
|
|
if window.empty:
|
|
window = analyzed.tail(30)
|
|
|
|
wins = 0
|
|
losses = 0
|
|
moves: list[float] = []
|
|
for i in range(1, len(window) - 1):
|
|
trend = str(window.iloc[i]["trend_state"])
|
|
prev = str(window.iloc[i - 1]["trend_state"])
|
|
if trend == prev or trend not in {TREND_BULL, TREND_BEAR}:
|
|
continue
|
|
entry = float(window.iloc[i]["Close"])
|
|
nxt = float(window.iloc[i + 1]["Close"])
|
|
move = ((nxt - entry) / entry) * 100.0 if trend == TREND_BULL else ((entry - nxt) / entry) * 100.0
|
|
moves.append(move)
|
|
wins += int(move > 0)
|
|
losses += int(move <= 0)
|
|
|
|
fake_ratio_pct = float((window["classification"] == "fake").mean()) * 100.0
|
|
avg_move_pct = (sum(moves) / len(moves)) if moves else 0.0
|
|
return {
|
|
"wins": wins,
|
|
"losses": losses,
|
|
"avg_move_pct": round(avg_move_pct, 2),
|
|
"fake_ratio_pct": round(fake_ratio_pct, 2),
|
|
}
|
|
|
|
|
|
def send_webhook_alert(url: str, payload: dict[str, Any]) -> tuple[bool, str]:
    """POST ``payload`` as JSON to a webhook and report the outcome.

    This function never raises: every failure is reported through the
    returned tuple.

    Args:
        url: Webhook endpoint. Blank or ``None`` means "not configured".
            Only absolute ``http``/``https`` URLs are accepted.
        payload: JSON-serialisable body.

    Returns:
        ``(ok, message)``. ``ok`` is True only for a 2xx response. The
        message carries the HTTP status when the server answered, or the
        error text otherwise.
    """
    from urllib.error import HTTPError
    from urllib.parse import urlsplit

    target = str(url or "").strip()
    if not target:
        return False, "No webhook URL configured."

    try:
        # urlopen would also follow file:, ftp: and other schemes, so
        # restrict the target to real HTTP endpoints before doing any I/O.
        parts = urlsplit(target)
        if parts.scheme not in {"http", "https"} or not parts.netloc:
            return False, "Webhook URL must be an absolute http:// or https:// URL."

        body = json.dumps(payload).encode("utf-8")
        request = urllib.request.Request(
            target,
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=5) as response:
            code = int(response.status)
        return (200 <= code < 300), f"Webhook responded with status {code}"
    except HTTPError as exc:
        # urlopen raises for error statuses instead of returning them, so
        # report the status here in the same format as the success path.
        return False, f"Webhook responded with status {exc.code}"
    except Exception as exc:
        return False, str(exc)
|