maneshtrader/web_core/data.py

146 lines
4.3 KiB
Python

from __future__ import annotations
from datetime import datetime, timedelta
import re
import pandas as pd
import streamlit as st
import yfinance as yf
# Matches day-count period strings such as "5d": a positive integer without a
# leading zero followed by "d". Group 1 captures the number of days.
_DAY_PERIOD_PATTERN = re.compile(r"^([1-9]\d*)d$")
# Upper bound on the number of history requests made while widening the
# lookback window for a day-count period.
_DAY_PERIOD_FETCH_ATTEMPTS = 5
def _utc_now() -> datetime:
return datetime.utcnow()
def _parse_day_period(period: str) -> int | None:
    """Extract the day count from a period string such as ``"5d"``.

    The input is stringified, stripped and lower-cased before being matched
    against ``_DAY_PERIOD_PATTERN``.

    Returns:
        The number of days as an ``int``, or ``None`` when the period is not
        a day-count period.
    """
    normalized = str(period).strip().lower()
    found = _DAY_PERIOD_PATTERN.match(normalized)
    return int(found.group(1)) if found else None
def _intraday_max_lookback_days(interval: str) -> int | None:
interval_key = str(interval).strip().lower()
if interval_key == "1m":
return 7
if interval_key in {"2m", "5m", "15m", "30m", "60m", "90m", "1h"}:
return 60
return None
def _initial_calendar_lookback_days(trading_days: int) -> int:
# Expand beyond target trading days to account for weekends and holidays.
return max(trading_days + 2, int((trading_days * 7) / 5) + 3)
def _trading_day_count(df: pd.DataFrame) -> int:
if df.empty:
return 0
index = pd.DatetimeIndex(df.index)
return int(index.normalize().nunique())
def _trim_to_recent_trading_days(df: pd.DataFrame, trading_days: int) -> pd.DataFrame:
if df.empty or trading_days <= 0:
return df.copy()
index = pd.DatetimeIndex(df.index)
session_days = pd.DatetimeIndex(index.normalize().unique()).sort_values()
if len(session_days) <= trading_days:
return df.copy()
keep_days = session_days[-trading_days:]
mask = index.normalize().isin(keep_days)
return df.loc[mask].copy()
def _fetch_history_for_period(ticker: yf.Ticker, interval: str, period: str) -> pd.DataFrame:
    """Fetch history for ``period``, widening the window for day-count periods.

    Periods that are not of the ``"<N>d"`` form are passed straight to
    ``ticker.history(period=...)``. For day-count periods an explicit
    start/end window is requested and grown (by roughly 1.6x per attempt, up
    to ``_DAY_PERIOD_FETCH_ATTEMPTS`` attempts and any intraday lookback cap)
    until the result contains at least ``N`` distinct days.

    Returns:
        The first result with enough distinct days; otherwise the largest
        result seen; otherwise, if every windowed request came back empty,
        the result of a plain ``period=`` request.
    """
    request_options = {"interval": interval, "auto_adjust": False, "actions": False}
    wanted_days = _parse_day_period(period)
    if wanted_days is None:
        return ticker.history(period=period, **request_options)

    window_days = _initial_calendar_lookback_days(wanted_days)
    window_cap = _intraday_max_lookback_days(interval)
    is_capped = window_cap is not None
    if is_capped:
        window_days = min(window_days, window_cap)

    # NOTE(review): `anchor` is a naive UTC datetime; how the data library
    # interprets naive start/end values (UTC vs. exchange-local) should be
    # confirmed against its documentation.
    anchor = _utc_now()
    largest_so_far = pd.DataFrame()
    for _ in range(_DAY_PERIOD_FETCH_ATTEMPTS):
        window_start = anchor - timedelta(days=window_days)
        fetched = ticker.history(start=window_start, end=anchor, **request_options)
        if len(fetched) > len(largest_so_far):
            largest_so_far = fetched
        if _trading_day_count(fetched) >= wanted_days:
            return fetched
        if is_capped and window_days >= window_cap:
            # Already at the widest window allowed for this interval.
            break

        widened = int(window_days * 1.6) + 1
        if is_capped:
            widened = min(widened, window_cap)
        if widened <= window_days:
            break
        window_days = widened

    if not largest_so_far.empty:
        return largest_so_far
    # Every windowed request was empty: fall back to a plain period request.
    return ticker.history(period=period, **request_options)
@st.cache_data(ttl=60, show_spinner=False)
def fetch_ohlc(symbol: str, interval: str, period: str) -> pd.DataFrame:
    """Fetch an OHLCV frame for ``symbol`` at ``interval`` over ``period``.

    Results are cached through ``st.cache_data`` with ``ttl=60``. Column
    names are title-cased, only Open/High/Low/Close/Volume are kept, and rows
    containing missing values are dropped. For day-count periods (``"<N>d"``)
    the result is trimmed to the most recent ``N`` distinct days.

    Raises:
        ValueError: if no data is returned, or if any required column is
            missing from the fetched frame.
    """
    raw = _fetch_history_for_period(
        ticker=yf.Ticker(symbol),
        interval=interval,
        period=period,
    )
    if raw.empty:
        raise ValueError("No data returned. Check symbol/interval/period compatibility.")

    titled = raw.rename(columns=str.title)
    required = ["Open", "High", "Low", "Close", "Volume"]
    missing = [column for column in required if column not in titled.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    cleaned = titled[required].dropna().copy()
    requested_days = _parse_day_period(period)
    if requested_days is None:
        return cleaned
    return _trim_to_recent_trading_days(cleaned, requested_days)
def maybe_drop_live_bar(df: pd.DataFrame, interval: str, enabled: bool) -> pd.DataFrame:
    """Drop the final bar when it is younger than one full interval.

    A bar whose timestamp is less than one interval old is treated as still
    forming and is removed.

    Args:
        df: Frame whose index holds the bar timestamps; for intraday
            intervals the last index entry must be a pandas ``Timestamp``.
        interval: Bar interval such as ``"5m"`` or ``"1h"``. Surrounding
            whitespace and letter case are ignored. Intervals outside the
            intraday table are never trimmed.
        enabled: When false the frame is returned unchanged.

    Returns:
        ``df`` itself when nothing is dropped, otherwise a copy without the
        last row.
    """
    # Function-scope import: the module only imports `datetime` and `timedelta`.
    from datetime import timezone

    if not enabled or len(df) < 2:
        return df

    intraday_minutes = {
        "1m": 1,
        "2m": 2,
        "5m": 5,
        "15m": 15,
        "30m": 30,
        "60m": 60,
        "90m": 90,
        "1h": 60,
    }
    # Normalise the key the same way the lookback helper does, so "5M" or
    # " 1h " are recognised as intraday intervals.
    minutes = intraday_minutes.get(str(interval).strip().lower())
    if minutes is None:
        return df

    last_ts = df.index[-1]
    if last_ts.tzinfo is None:
        # Naive timestamps are compared against naive UTC. Built from an
        # aware "now" because datetime.utcnow() is deprecated (Python 3.12+).
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    else:
        now = datetime.now(tz=last_ts.tzinfo)

    elapsed = now - last_ts.to_pydatetime()
    if elapsed.total_seconds() < minutes * 60:
        return df.iloc[:-1].copy()
    return df