maneshtrader/app.py

517 lines
18 KiB
Python

from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pandas as pd
import streamlit as st
import yfinance as yf
from streamlit_autorefresh import st_autorefresh
from manesh_trader.analytics import backtest_signals
from manesh_trader.charting import build_figure
from manesh_trader.constants import INTERVAL_OPTIONS, PERIOD_OPTIONS
from manesh_trader.data import fetch_ohlc, maybe_drop_live_bar
from manesh_trader.exporting import df_for_export
from manesh_trader.strategy import classify_bars, detect_trends
SETTINGS_PATH = Path.home() / ".manesh_trader" / "settings.json"
def _clamp_int(value: Any, fallback: int, minimum: int, maximum: int) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
return fallback
return min(maximum, max(minimum, parsed))
def _clamp_float(value: Any, fallback: float, minimum: float, maximum: float) -> float:
try:
parsed = float(value)
except (TypeError, ValueError):
return fallback
return min(maximum, max(minimum, parsed))
def _to_bool(value: Any, fallback: bool) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return value != 0
if value is None:
return fallback
normalized = str(value).strip().lower()
if normalized in {"1", "true", "yes", "y", "on"}:
return True
if normalized in {"0", "false", "no", "n", "off"}:
return False
return fallback
def _clamp_max_bars(value: Any, fallback: int = 500) -> int:
return _clamp_int(value=value, fallback=fallback, minimum=20, maximum=5000)
def normalize_web_settings(raw: dict[str, Any] | None) -> dict[str, Any]:
raw = raw or {}
defaults: dict[str, Any] = {
"symbol": "AAPL",
"interval": "1d",
"period": "6mo",
"max_bars": 500,
"drop_live": True,
"use_body_range": False,
"volume_filter_enabled": False,
"volume_sma_window": 20,
"volume_multiplier": 1.0,
"gray_fake": True,
"hide_market_closed_gaps": True,
"enable_auto_refresh": False,
"refresh_sec": 60,
}
symbol = str(raw.get("symbol", defaults["symbol"])).strip().upper()
if not symbol:
symbol = str(defaults["symbol"])
interval = str(raw.get("interval", defaults["interval"]))
if interval not in INTERVAL_OPTIONS:
interval = str(defaults["interval"])
period = str(raw.get("period", defaults["period"]))
if period not in PERIOD_OPTIONS:
period = str(defaults["period"])
max_bars = _clamp_max_bars(raw.get("max_bars"), fallback=int(defaults["max_bars"]))
drop_live = _to_bool(raw.get("drop_live"), fallback=bool(defaults["drop_live"]))
use_body_range = _to_bool(raw.get("use_body_range"), fallback=bool(defaults["use_body_range"]))
volume_filter_enabled = _to_bool(
raw.get("volume_filter_enabled"), fallback=bool(defaults["volume_filter_enabled"])
)
volume_sma_window = _clamp_int(
raw.get("volume_sma_window"),
fallback=int(defaults["volume_sma_window"]),
minimum=2,
maximum=100,
)
volume_multiplier = round(
_clamp_float(
raw.get("volume_multiplier"),
fallback=float(defaults["volume_multiplier"]),
minimum=0.1,
maximum=3.0,
),
1,
)
gray_fake = _to_bool(raw.get("gray_fake"), fallback=bool(defaults["gray_fake"]))
hide_market_closed_gaps = _to_bool(
raw.get("hide_market_closed_gaps"),
fallback=bool(defaults["hide_market_closed_gaps"]),
)
enable_auto_refresh = _to_bool(raw.get("enable_auto_refresh"), fallback=bool(defaults["enable_auto_refresh"]))
refresh_sec = _clamp_int(
raw.get("refresh_sec"),
fallback=int(defaults["refresh_sec"]),
minimum=10,
maximum=600,
)
return {
"symbol": symbol,
"interval": interval,
"period": period,
"max_bars": max_bars,
"drop_live": drop_live,
"use_body_range": use_body_range,
"volume_filter_enabled": volume_filter_enabled,
"volume_sma_window": volume_sma_window,
"volume_multiplier": volume_multiplier,
"gray_fake": gray_fake,
"hide_market_closed_gaps": hide_market_closed_gaps,
"enable_auto_refresh": enable_auto_refresh,
"refresh_sec": refresh_sec,
}
def load_web_settings() -> dict[str, Any]:
if not SETTINGS_PATH.exists():
return normalize_web_settings(None)
try:
payload = json.loads(SETTINGS_PATH.read_text(encoding="utf-8"))
if not isinstance(payload, dict):
return normalize_web_settings(None)
return normalize_web_settings(payload)
except Exception:
return normalize_web_settings(None)
def save_web_settings(settings: dict[str, Any]) -> None:
SETTINGS_PATH.parent.mkdir(parents=True, exist_ok=True)
SETTINGS_PATH.write_text(json.dumps(normalize_web_settings(settings), indent=2), encoding="utf-8")
@st.cache_data(show_spinner=False, ttl=3600)
def lookup_symbol_candidates(query: str, max_results: int = 10) -> list[dict[str, str]]:
cleaned = query.strip()
if len(cleaned) < 2:
return []
try:
search = yf.Search(cleaned, max_results=max_results)
quotes = getattr(search, "quotes", []) or []
except Exception:
return []
seen_symbols: set[str] = set()
candidates: list[dict[str, str]] = []
for quote in quotes:
symbol = str(quote.get("symbol", "")).strip().upper()
if not symbol or symbol in seen_symbols:
continue
name = str(quote.get("shortname") or quote.get("longname") or "").strip()
exchange = str(quote.get("exchDisp") or quote.get("exchange") or "").strip()
type_display = str(quote.get("typeDisp") or quote.get("quoteType") or "").strip()
seen_symbols.add(symbol)
candidates.append(
{
"symbol": symbol,
"name": name,
"exchange": exchange,
"type": type_display,
}
)
return candidates
@st.cache_data(show_spinner=False, ttl=3600)
def resolve_symbol_identity(symbol: str) -> dict[str, str]:
normalized_symbol = symbol.strip().upper()
if not normalized_symbol:
return {"symbol": "", "name": "", "exchange": ""}
def _from_quote(quote: dict[str, Any]) -> dict[str, str]:
return {
"symbol": normalized_symbol,
"name": str(quote.get("shortname") or quote.get("longname") or "").strip(),
"exchange": str(quote.get("exchDisp") or quote.get("exchange") or "").strip(),
}
try:
search = yf.Search(normalized_symbol, max_results=8)
quotes = getattr(search, "quotes", []) or []
for quote in quotes:
candidate_symbol = str(quote.get("symbol", "")).strip().upper()
if candidate_symbol == normalized_symbol:
return _from_quote(quote)
if quotes:
return _from_quote(quotes[0])
except Exception:
pass
try:
info = yf.Ticker(normalized_symbol).info
return {
"symbol": normalized_symbol,
"name": str(info.get("shortName") or info.get("longName") or "").strip(),
"exchange": str(info.get("exchange") or "").strip(),
}
except Exception:
return {"symbol": normalized_symbol, "name": "", "exchange": ""}
@st.cache_data(show_spinner=False)
def load_onboarding_markdown() -> str:
onboarding_path = Path(__file__).with_name("ONBOARDING.md")
if onboarding_path.exists():
return onboarding_path.read_text(encoding="utf-8")
return "ONBOARDING.md not found in project root."
@st.dialog("Onboarding Guide", width="large")
def onboarding_dialog() -> None:
st.markdown(load_onboarding_markdown())
def main() -> None:
st.set_page_config(page_title="Real Bars vs Fake Bars Analyzer", layout="wide")
st.title("Real Bars vs Fake Bars Trend Analyzer")
st.caption(
"Price-action tool that classifies closed bars, filters fake bars, and tracks trend persistence using only real bars."
)
if st.button("Open ONBOARDING.md", type="tertiary"):
onboarding_dialog()
with st.expander("Help / Quick Start", expanded=False):
st.markdown(
"""
**Start in 60 seconds**
1. Set a symbol like `AAPL` or `BTC-USD`.
2. Choose `Timeframe` (`1d` is a good default) and `Period` (`6mo`).
3. Keep **Ignore potentially live last bar** enabled.
4. Review trend status and markers:
- Green triangle: `real_bull`
- Red triangle: `real_bear`
- `fake` bars are noise and ignored by trend logic
5. Use **Export** to download CSV/PDF outputs.
**Rule summary**
- `real_bull`: close > previous high
- `real_bear`: close < previous low
- `fake`: close inside previous range
- Trend starts/reverses only after 2 consecutive real bars in that direction.
"""
)
with st.sidebar:
st.header("Data Settings")
query_params = st.query_params
persisted_settings = load_web_settings()
def first_query_value(key: str) -> str | None:
raw = query_params.get(key)
if raw is None:
return None
if isinstance(raw, list):
return str(raw[0]) if raw else None
return str(raw)
query_overrides: dict[str, Any] = {}
for key in persisted_settings:
candidate = first_query_value(key)
if candidate is not None:
query_overrides[key] = candidate
effective_defaults = normalize_web_settings({**persisted_settings, **query_overrides})
st.subheader("Find Symbol")
symbol_search_query = st.text_input(
"Search by company or ticker",
value="",
placeholder="Apple, Tesla, Bitcoin...",
help="Type a name (e.g. Apple) and select a result to prefill Symbol.",
)
symbol_from_search: str | None = None
if symbol_search_query.strip():
candidates = lookup_symbol_candidates(symbol_search_query)
if candidates:
result_placeholder = "Select a result..."
labels = [result_placeholder] + [
" | ".join(
[
candidate["symbol"],
candidate["name"] or "No name",
candidate["exchange"] or "Unknown exchange",
]
)
for candidate in candidates
]
selected_label = st.selectbox("Matches", labels, index=0)
if selected_label != result_placeholder:
selected_index = labels.index(selected_label) - 1
symbol_from_search = candidates[selected_index]["symbol"]
else:
st.caption("No matches found. Try another company name.")
symbol = st.text_input(
"Symbol",
value=symbol_from_search or str(effective_defaults["symbol"]),
help="Ticker or pair to analyze, e.g. AAPL, MSFT, BTC-USD.",
).strip().upper()
interval = st.selectbox(
"Timeframe",
INTERVAL_OPTIONS,
index=INTERVAL_OPTIONS.index(str(effective_defaults["interval"])),
help="Bar size for each candle. Shorter intervals are noisier; 1d is a good default.",
)
period = st.selectbox(
"Period",
PERIOD_OPTIONS,
index=PERIOD_OPTIONS.index(str(effective_defaults["period"])),
help="How much history to load for trend analysis.",
)
max_bars = st.number_input(
"Max bars",
min_value=20,
max_value=5000,
value=int(effective_defaults["max_bars"]),
step=10,
help="Limits loaded candles to keep charting responsive. 500 is a solid starting point.",
)
drop_live = st.checkbox("Ignore potentially live last bar", value=bool(effective_defaults["drop_live"]))
st.header("Classification Filters")
use_body_range = st.checkbox(
"Use previous body range (ignore wicks)",
value=bool(effective_defaults["use_body_range"]),
)
volume_filter_enabled = st.checkbox(
"Enable volume filter",
value=bool(effective_defaults["volume_filter_enabled"]),
)
volume_sma_window = st.slider(
"Volume SMA window",
2,
100,
int(effective_defaults["volume_sma_window"]),
)
volume_multiplier = st.slider(
"Min volume / SMA multiplier",
0.1,
3.0,
float(effective_defaults["volume_multiplier"]),
0.1,
)
gray_fake = st.checkbox("Gray out fake bars", value=bool(effective_defaults["gray_fake"]))
hide_market_closed_gaps = st.checkbox(
"Hide market-closed gaps (stocks)",
value=bool(effective_defaults["hide_market_closed_gaps"]),
)
st.header("Monitoring")
enable_auto_refresh = st.checkbox("Auto-refresh", value=bool(effective_defaults["enable_auto_refresh"]))
refresh_sec = st.slider("Refresh interval (seconds)", 10, 600, int(effective_defaults["refresh_sec"]), 10)
if enable_auto_refresh:
st_autorefresh(interval=refresh_sec * 1000, key="data_refresh")
try:
save_web_settings(
{
"symbol": symbol,
"interval": interval,
"period": period,
"max_bars": int(max_bars),
"drop_live": bool(drop_live),
"use_body_range": bool(use_body_range),
"volume_filter_enabled": bool(volume_filter_enabled),
"volume_sma_window": int(volume_sma_window),
"volume_multiplier": float(volume_multiplier),
"gray_fake": bool(gray_fake),
"hide_market_closed_gaps": bool(hide_market_closed_gaps),
"enable_auto_refresh": bool(enable_auto_refresh),
"refresh_sec": int(refresh_sec),
}
)
except Exception:
# Non-fatal: app should run even if local settings cannot be saved.
pass
if not symbol:
st.error("Please enter a symbol.")
st.stop()
symbol_identity = resolve_symbol_identity(symbol)
identity_name = symbol_identity["name"]
identity_exchange = symbol_identity["exchange"]
if identity_name:
st.markdown(f"### {symbol} - {identity_name}")
if identity_exchange:
st.caption(f"Exchange: {identity_exchange}")
else:
st.markdown(f"### {symbol}")
try:
raw = fetch_ohlc(symbol=symbol, interval=interval, period=period)
raw = maybe_drop_live_bar(raw, interval=interval, enabled=drop_live)
if len(raw) > max_bars:
raw = raw.iloc[-max_bars:].copy()
if len(raw) < 3:
st.error("Not enough bars to classify. Increase period or use a broader timeframe.")
st.stop()
classified = classify_bars(
raw,
use_body_range=use_body_range,
volume_filter_enabled=volume_filter_enabled,
volume_sma_window=volume_sma_window,
volume_multiplier=volume_multiplier,
)
analyzed, events = detect_trends(classified)
except Exception as exc:
st.error(f"Data error: {exc}")
st.stop()
latest = analyzed.iloc[-1]
trend_now = str(latest["trend_state"])
bull_count = int((analyzed["classification"] == "real_bull").sum())
bear_count = int((analyzed["classification"] == "real_bear").sum())
fake_count = int((analyzed["classification"] == "fake").sum())
c1, c2, c3, c4 = st.columns(4)
c1.metric("Current Trend", trend_now)
c2.metric("Real Bullish Bars", bull_count)
c3.metric("Real Bearish Bars", bear_count)
c4.metric("Fake Bars", fake_count)
alert_key = f"{symbol}-{interval}-{period}"
newest_event = events[-1].event if events else ""
previous_event = st.session_state.get(f"last_event-{alert_key}", "")
if newest_event and newest_event != previous_event:
st.warning(f"Alert: {newest_event}")
st.session_state[f"last_event-{alert_key}"] = newest_event
fig = build_figure(
analyzed,
gray_fake=gray_fake,
interval=interval,
hide_market_closed_gaps=hide_market_closed_gaps,
)
st.plotly_chart(fig, use_container_width=True)
bt = backtest_signals(analyzed)
st.subheader("Backtest Snapshot")
b1, b2, b3, b4 = st.columns(4)
b1.metric("Signals", int(bt["trades"]))
b2.metric("Wins", int(bt["wins"]))
b3.metric("Losses", int(bt["losses"]))
b4.metric("Win Rate", f"{bt['win_rate']}%")
st.caption("Method: trend-change signal, scored by next-bar direction. Educational only; not a trading recommendation.")
st.subheader("Trend Events")
if events:
event_df = pd.DataFrame(
{
"timestamp": [str(e.timestamp) for e in events[-25:]][::-1],
"event": [e.event for e in events[-25:]][::-1],
"trend_after": [e.trend_after for e in events[-25:]][::-1],
}
)
st.dataframe(event_df, use_container_width=True)
else:
st.info("No trend start/reversal events detected in the selected data window.")
st.subheader("Export")
export_df = df_for_export(analyzed)
csv_bytes = export_df.to_csv(index=False).encode("utf-8")
st.download_button(
"Download classified data (CSV)",
data=csv_bytes,
file_name=f"{symbol}_{interval}_classified.csv",
mime="text/csv",
)
try:
pdf_bytes = fig.to_image(format="pdf")
st.download_button(
"Download chart (PDF)",
data=pdf_bytes,
file_name=f"{symbol}_{interval}_chart.pdf",
mime="application/pdf",
)
except Exception:
st.caption("PDF export unavailable. Install `kaleido` and rerun to enable chart PDF downloads.")
with st.expander("Latest classified bars"):
st.dataframe(export_df.tail(30), use_container_width=True)
if __name__ == "__main__":
main()