from __future__ import annotations import json from pathlib import Path from typing import Any import pandas as pd import streamlit as st import yfinance as yf from streamlit_autorefresh import st_autorefresh from manesh_trader.analytics import backtest_signals from manesh_trader.charting import build_figure from manesh_trader.constants import INTERVAL_OPTIONS, PERIOD_OPTIONS from manesh_trader.data import fetch_ohlc, maybe_drop_live_bar from manesh_trader.exporting import df_for_export from manesh_trader.strategy import classify_bars, detect_trends SETTINGS_PATH = Path.home() / ".manesh_trader" / "settings.json" def _clamp_int(value: Any, fallback: int, minimum: int, maximum: int) -> int: try: parsed = int(value) except (TypeError, ValueError): return fallback return min(maximum, max(minimum, parsed)) def _clamp_float(value: Any, fallback: float, minimum: float, maximum: float) -> float: try: parsed = float(value) except (TypeError, ValueError): return fallback return min(maximum, max(minimum, parsed)) def _to_bool(value: Any, fallback: bool) -> bool: if isinstance(value, bool): return value if isinstance(value, (int, float)): return value != 0 if value is None: return fallback normalized = str(value).strip().lower() if normalized in {"1", "true", "yes", "y", "on"}: return True if normalized in {"0", "false", "no", "n", "off"}: return False return fallback def _clamp_max_bars(value: Any, fallback: int = 500) -> int: return _clamp_int(value=value, fallback=fallback, minimum=20, maximum=5000) def normalize_web_settings(raw: dict[str, Any] | None) -> dict[str, Any]: raw = raw or {} defaults: dict[str, Any] = { "symbol": "AAPL", "interval": "1d", "period": "6mo", "max_bars": 500, "drop_live": True, "use_body_range": False, "volume_filter_enabled": False, "volume_sma_window": 20, "volume_multiplier": 1.0, "gray_fake": True, "hide_market_closed_gaps": True, "enable_auto_refresh": False, "refresh_sec": 60, } symbol = str(raw.get("symbol", defaults["symbol"])).strip().upper() if not symbol: symbol = str(defaults["symbol"]) interval = str(raw.get("interval", defaults["interval"])) if interval not in INTERVAL_OPTIONS: interval = str(defaults["interval"]) period = str(raw.get("period", defaults["period"])) if period not in PERIOD_OPTIONS: period = str(defaults["period"]) max_bars = _clamp_max_bars(raw.get("max_bars"), fallback=int(defaults["max_bars"])) drop_live = _to_bool(raw.get("drop_live"), fallback=bool(defaults["drop_live"])) use_body_range = _to_bool(raw.get("use_body_range"), fallback=bool(defaults["use_body_range"])) volume_filter_enabled = _to_bool( raw.get("volume_filter_enabled"), fallback=bool(defaults["volume_filter_enabled"]) ) volume_sma_window = _clamp_int( raw.get("volume_sma_window"), fallback=int(defaults["volume_sma_window"]), minimum=2, maximum=100, ) volume_multiplier = round( _clamp_float( raw.get("volume_multiplier"), fallback=float(defaults["volume_multiplier"]), minimum=0.1, maximum=3.0, ), 1, ) gray_fake = _to_bool(raw.get("gray_fake"), fallback=bool(defaults["gray_fake"])) hide_market_closed_gaps = _to_bool( raw.get("hide_market_closed_gaps"), fallback=bool(defaults["hide_market_closed_gaps"]), ) enable_auto_refresh = _to_bool(raw.get("enable_auto_refresh"), fallback=bool(defaults["enable_auto_refresh"])) refresh_sec = _clamp_int( raw.get("refresh_sec"), fallback=int(defaults["refresh_sec"]), minimum=10, maximum=600, ) return { "symbol": symbol, "interval": interval, "period": period, "max_bars": max_bars, "drop_live": drop_live, "use_body_range": use_body_range, "volume_filter_enabled": volume_filter_enabled, "volume_sma_window": volume_sma_window, "volume_multiplier": volume_multiplier, "gray_fake": gray_fake, "hide_market_closed_gaps": hide_market_closed_gaps, "enable_auto_refresh": enable_auto_refresh, "refresh_sec": refresh_sec, } def load_web_settings() -> dict[str, Any]: if not SETTINGS_PATH.exists(): return normalize_web_settings(None) try: payload = json.loads(SETTINGS_PATH.read_text(encoding="utf-8")) if not isinstance(payload, dict): return normalize_web_settings(None) return normalize_web_settings(payload) except Exception: return normalize_web_settings(None) def save_web_settings(settings: dict[str, Any]) -> None: SETTINGS_PATH.parent.mkdir(parents=True, exist_ok=True) SETTINGS_PATH.write_text(json.dumps(normalize_web_settings(settings), indent=2), encoding="utf-8") @st.cache_data(show_spinner=False, ttl=3600) def lookup_symbol_candidates(query: str, max_results: int = 10) -> list[dict[str, str]]: cleaned = query.strip() if len(cleaned) < 2: return [] try: search = yf.Search(cleaned, max_results=max_results) quotes = getattr(search, "quotes", []) or [] except Exception: return [] seen_symbols: set[str] = set() candidates: list[dict[str, str]] = [] for quote in quotes: symbol = str(quote.get("symbol", "")).strip().upper() if not symbol or symbol in seen_symbols: continue name = str(quote.get("shortname") or quote.get("longname") or "").strip() exchange = str(quote.get("exchDisp") or quote.get("exchange") or "").strip() type_display = str(quote.get("typeDisp") or quote.get("quoteType") or "").strip() seen_symbols.add(symbol) candidates.append( { "symbol": symbol, "name": name, "exchange": exchange, "type": type_display, } ) return candidates @st.cache_data(show_spinner=False, ttl=3600) def resolve_symbol_identity(symbol: str) -> dict[str, str]: normalized_symbol = symbol.strip().upper() if not normalized_symbol: return {"symbol": "", "name": "", "exchange": ""} def _from_quote(quote: dict[str, Any]) -> dict[str, str]: return { "symbol": normalized_symbol, "name": str(quote.get("shortname") or quote.get("longname") or "").strip(), "exchange": str(quote.get("exchDisp") or quote.get("exchange") or "").strip(), } try: search = yf.Search(normalized_symbol, max_results=8) quotes = getattr(search, "quotes", []) or [] for quote in quotes: candidate_symbol = str(quote.get("symbol", "")).strip().upper() if candidate_symbol == normalized_symbol: return _from_quote(quote) if quotes: return _from_quote(quotes[0]) except Exception: pass try: info = yf.Ticker(normalized_symbol).info return { "symbol": normalized_symbol, "name": str(info.get("shortName") or info.get("longName") or "").strip(), "exchange": str(info.get("exchange") or "").strip(), } except Exception: return {"symbol": normalized_symbol, "name": "", "exchange": ""} @st.cache_data(show_spinner=False) def load_onboarding_markdown() -> str: onboarding_path = Path(__file__).with_name("ONBOARDING.md") if onboarding_path.exists(): return onboarding_path.read_text(encoding="utf-8") return "ONBOARDING.md not found in project root." @st.dialog("Onboarding Guide", width="large") def onboarding_dialog() -> None: st.markdown(load_onboarding_markdown()) def main() -> None: st.set_page_config(page_title="Real Bars vs Fake Bars Analyzer", layout="wide") st.title("Real Bars vs Fake Bars Trend Analyzer") st.caption( "Price-action tool that classifies closed bars, filters fake bars, and tracks trend persistence using only real bars." ) if st.button("Open ONBOARDING.md", type="tertiary"): onboarding_dialog() with st.expander("Help / Quick Start", expanded=False): st.markdown( """ **Start in 60 seconds** 1. Set a symbol like `AAPL` or `BTC-USD`. 2. Choose `Timeframe` (`1d` is a good default) and `Period` (`6mo`). 3. Keep **Ignore potentially live last bar** enabled. 4. Review trend status and markers: - Green triangle: `real_bull` - Red triangle: `real_bear` - `fake` bars are noise and ignored by trend logic 5. Use **Export** to download CSV/PDF outputs. **Rule summary** - `real_bull`: close > previous high - `real_bear`: close < previous low - `fake`: close inside previous range - Trend starts/reverses only after 2 consecutive real bars in that direction. """ ) with st.sidebar: st.header("Data Settings") query_params = st.query_params persisted_settings = load_web_settings() def first_query_value(key: str) -> str | None: raw = query_params.get(key) if raw is None: return None if isinstance(raw, list): return str(raw[0]) if raw else None return str(raw) query_overrides: dict[str, Any] = {} for key in persisted_settings: candidate = first_query_value(key) if candidate is not None: query_overrides[key] = candidate effective_defaults = normalize_web_settings({**persisted_settings, **query_overrides}) st.subheader("Find Symbol") symbol_search_query = st.text_input( "Search by company or ticker", value="", placeholder="Apple, Tesla, Bitcoin...", help="Type a name (e.g. Apple) and select a result to prefill Symbol.", ) symbol_from_search: str | None = None if symbol_search_query.strip(): candidates = lookup_symbol_candidates(symbol_search_query) if candidates: result_placeholder = "Select a result..." labels = [result_placeholder] + [ " | ".join( [ candidate["symbol"], candidate["name"] or "No name", candidate["exchange"] or "Unknown exchange", ] ) for candidate in candidates ] selected_label = st.selectbox("Matches", labels, index=0) if selected_label != result_placeholder: selected_index = labels.index(selected_label) - 1 symbol_from_search = candidates[selected_index]["symbol"] else: st.caption("No matches found. Try another company name.") symbol = st.text_input( "Symbol", value=symbol_from_search or str(effective_defaults["symbol"]), help="Ticker or pair to analyze, e.g. AAPL, MSFT, BTC-USD.", ).strip().upper() interval = st.selectbox( "Timeframe", INTERVAL_OPTIONS, index=INTERVAL_OPTIONS.index(str(effective_defaults["interval"])), help="Bar size for each candle. Shorter intervals are noisier; 1d is a good default.", ) period = st.selectbox( "Period", PERIOD_OPTIONS, index=PERIOD_OPTIONS.index(str(effective_defaults["period"])), help="How much history to load for trend analysis.", ) max_bars = st.number_input( "Max bars", min_value=20, max_value=5000, value=int(effective_defaults["max_bars"]), step=10, help="Limits loaded candles to keep charting responsive. 500 is a solid starting point.", ) drop_live = st.checkbox("Ignore potentially live last bar", value=bool(effective_defaults["drop_live"])) st.header("Classification Filters") use_body_range = st.checkbox( "Use previous body range (ignore wicks)", value=bool(effective_defaults["use_body_range"]), ) volume_filter_enabled = st.checkbox( "Enable volume filter", value=bool(effective_defaults["volume_filter_enabled"]), ) volume_sma_window = st.slider( "Volume SMA window", 2, 100, int(effective_defaults["volume_sma_window"]), ) volume_multiplier = st.slider( "Min volume / SMA multiplier", 0.1, 3.0, float(effective_defaults["volume_multiplier"]), 0.1, ) gray_fake = st.checkbox("Gray out fake bars", value=bool(effective_defaults["gray_fake"])) hide_market_closed_gaps = st.checkbox( "Hide market-closed gaps (stocks)", value=bool(effective_defaults["hide_market_closed_gaps"]), ) st.header("Monitoring") enable_auto_refresh = st.checkbox("Auto-refresh", value=bool(effective_defaults["enable_auto_refresh"])) refresh_sec = st.slider("Refresh interval (seconds)", 10, 600, int(effective_defaults["refresh_sec"]), 10) if enable_auto_refresh: st_autorefresh(interval=refresh_sec * 1000, key="data_refresh") try: save_web_settings( { "symbol": symbol, "interval": interval, "period": period, "max_bars": int(max_bars), "drop_live": bool(drop_live), "use_body_range": bool(use_body_range), "volume_filter_enabled": bool(volume_filter_enabled), "volume_sma_window": int(volume_sma_window), "volume_multiplier": float(volume_multiplier), "gray_fake": bool(gray_fake), "hide_market_closed_gaps": bool(hide_market_closed_gaps), "enable_auto_refresh": bool(enable_auto_refresh), "refresh_sec": int(refresh_sec), } ) except Exception: # Non-fatal: app should run even if local settings cannot be saved. pass if not symbol: st.error("Please enter a symbol.") st.stop() symbol_identity = resolve_symbol_identity(symbol) identity_name = symbol_identity["name"] identity_exchange = symbol_identity["exchange"] if identity_name: st.markdown(f"### {symbol} - {identity_name}") if identity_exchange: st.caption(f"Exchange: {identity_exchange}") else: st.markdown(f"### {symbol}") try: raw = fetch_ohlc(symbol=symbol, interval=interval, period=period) raw = maybe_drop_live_bar(raw, interval=interval, enabled=drop_live) if len(raw) > max_bars: raw = raw.iloc[-max_bars:].copy() if len(raw) < 3: st.error("Not enough bars to classify. Increase period or use a broader timeframe.") st.stop() classified = classify_bars( raw, use_body_range=use_body_range, volume_filter_enabled=volume_filter_enabled, volume_sma_window=volume_sma_window, volume_multiplier=volume_multiplier, ) analyzed, events = detect_trends(classified) except Exception as exc: st.error(f"Data error: {exc}") st.stop() latest = analyzed.iloc[-1] trend_now = str(latest["trend_state"]) bull_count = int((analyzed["classification"] == "real_bull").sum()) bear_count = int((analyzed["classification"] == "real_bear").sum()) fake_count = int((analyzed["classification"] == "fake").sum()) c1, c2, c3, c4 = st.columns(4) c1.metric("Current Trend", trend_now) c2.metric("Real Bullish Bars", bull_count) c3.metric("Real Bearish Bars", bear_count) c4.metric("Fake Bars", fake_count) alert_key = f"{symbol}-{interval}-{period}" newest_event = events[-1].event if events else "" previous_event = st.session_state.get(f"last_event-{alert_key}", "") if newest_event and newest_event != previous_event: st.warning(f"Alert: {newest_event}") st.session_state[f"last_event-{alert_key}"] = newest_event fig = build_figure( analyzed, gray_fake=gray_fake, interval=interval, hide_market_closed_gaps=hide_market_closed_gaps, ) st.plotly_chart(fig, use_container_width=True) bt = backtest_signals(analyzed) st.subheader("Backtest Snapshot") b1, b2, b3, b4 = st.columns(4) b1.metric("Signals", int(bt["trades"])) b2.metric("Wins", int(bt["wins"])) b3.metric("Losses", int(bt["losses"])) b4.metric("Win Rate", f"{bt['win_rate']}%") st.caption("Method: trend-change signal, scored by next-bar direction. Educational only; not a trading recommendation.") st.subheader("Trend Events") if events: event_df = pd.DataFrame( { "timestamp": [str(e.timestamp) for e in events[-25:]][::-1], "event": [e.event for e in events[-25:]][::-1], "trend_after": [e.trend_after for e in events[-25:]][::-1], } ) st.dataframe(event_df, use_container_width=True) else: st.info("No trend start/reversal events detected in the selected data window.") st.subheader("Export") export_df = df_for_export(analyzed) csv_bytes = export_df.to_csv(index=False).encode("utf-8") st.download_button( "Download classified data (CSV)", data=csv_bytes, file_name=f"{symbol}_{interval}_classified.csv", mime="text/csv", ) try: pdf_bytes = fig.to_image(format="pdf") st.download_button( "Download chart (PDF)", data=pdf_bytes, file_name=f"{symbol}_{interval}_chart.pdf", mime="application/pdf", ) except Exception: st.caption("PDF export unavailable. Install `kaleido` and rerun to enable chart PDF downloads.") with st.expander("Latest classified bars"): st.dataframe(export_df.tail(30), use_container_width=True) if __name__ == "__main__": main()