82 lines
2.8 KiB
Python
82 lines
2.8 KiB
Python
from __future__ import annotations
|
|
|
|
import pandas as pd
|
|
|
|
from .constants import TREND_BEAR, TREND_BULL, TREND_NEUTRAL
|
|
from .models import TrendEvent
|
|
|
|
|
|
def classify_bars(
    df: pd.DataFrame,
    use_body_range: bool,
    volume_filter_enabled: bool,
    volume_sma_window: int,
    volume_multiplier: float,
) -> pd.DataFrame:
    """Label each bar by how its close relates to the previous bar's range.

    Every bar after the first is labelled:

    * ``"real_bull"`` - volume check passed and ``Close`` is above the
      previous bar's reference high;
    * ``"real_bear"`` - volume check passed and ``Close`` is below the
      previous bar's reference low;
    * ``"fake"`` - anything else (including a failed volume check).

    The first bar has no predecessor and is always ``"unclassified"``.

    Args:
        df: Bar data. Reads ``Close`` plus either ``Open`` (when
            ``use_body_range`` is true) or ``High``/``Low``; reads ``Volume``
            when ``volume_filter_enabled`` is true. The input is not mutated.
        use_body_range: If true, the previous bar's reference range is its
            body (the larger/smaller of ``Open`` and ``Close``); otherwise it
            is ``High``/``Low``.
        volume_filter_enabled: If true, a bar passes the volume check only
            when its ``Volume`` is at least ``volume_multiplier`` times the
            rolling mean of ``Volume``.
        volume_sma_window: Window length of the rolling volume mean
            (computed with ``min_periods=1``).
        volume_multiplier: Factor applied to the rolling volume mean.

    Returns:
        A copy of ``df`` with two added columns: ``classification`` and the
        boolean ``volume_ok``. An empty input yields an empty result with
        both columns present.
    """
    result = df.copy()
    result["classification"] = "unclassified"

    if volume_filter_enabled:
        vol_sma = result["Volume"].rolling(volume_sma_window, min_periods=1).mean()
        result["volume_ok"] = result["Volume"] >= (vol_sma * volume_multiplier)
    else:
        result["volume_ok"] = True

    # With zero or one bar there is no "previous bar" to compare against, so
    # everything stays "unclassified". This also avoids indexing row 0 of an
    # empty frame.
    if len(result) < 2:
        return result

    # Work on positionally indexed copies so that duplicate or unsorted index
    # labels in the caller's frame cannot affect the comparisons.
    close = result["Close"].reset_index(drop=True)
    volume_ok = result["volume_ok"].reset_index(drop=True).astype(bool)

    if use_body_range:
        open_ = result["Open"].reset_index(drop=True)
        # Equivalent to max(Open, Close) / min(Open, Close) evaluated per row.
        ref_high = close.where(close > open_, open_)
        ref_low = close.where(close < open_, open_)
    else:
        ref_high = result["High"].reset_index(drop=True)
        ref_low = result["Low"].reset_index(drop=True)

    # Align each bar with the reference range of the bar before it. The first
    # row becomes NaN, which compares false and is overwritten below anyway.
    prev_high = ref_high.shift(1)
    prev_low = ref_low.shift(1)

    is_bull = volume_ok & (close > prev_high)
    # Bullish takes precedence over bearish.
    is_bear = volume_ok & ~is_bull & (close < prev_low)

    labels = pd.Series("fake", index=close.index, dtype=object)
    labels[is_bull] = "real_bull"
    labels[is_bear] = "real_bear"
    labels.iloc[0] = "unclassified"

    # Assign by position (plain array) rather than by label.
    result["classification"] = labels.to_numpy()
    return result
|
|
|
|
|
|
def detect_trends(classified_df: pd.DataFrame) -> tuple[pd.DataFrame, list[TrendEvent]]:
    """Walk the classified bars in order and track the prevailing trend.

    The state starts at ``TREND_NEUTRAL``. Two counters track runs of
    ``"real_bull"`` and ``"real_bear"`` bars; a bar of one kind resets the
    other kind's counter, while any other classification leaves both counters
    untouched. When a counter reaches 2 while the state is neutral or the
    opposite trend, the state flips and a ``TrendEvent`` is recorded.

    Args:
        classified_df: Frame with a ``classification`` column. The input is
            not mutated.

    Returns:
        A tuple ``(out, events)`` where ``out`` is a copy of the input with a
        ``trend_state`` column holding the state in effect after each bar,
        and ``events`` lists the state changes in the order they occurred,
        each built as ``TrendEvent(index_label, message, new_state)``.
    """
    out = classified_df.copy()
    out["trend_state"] = TREND_NEUTRAL

    trend_state = TREND_NEUTRAL
    bull_run = 0
    bear_run = 0
    events: list[TrendEvent] = []
    states = []

    # Only the index label and the classification are needed per bar, so
    # iterate those directly instead of materialising a row Series each time.
    for idx, classification in zip(out.index, out["classification"]):
        if classification == "real_bull":
            bull_run += 1
            bear_run = 0

            if trend_state == TREND_NEUTRAL and bull_run >= 2:
                trend_state = TREND_BULL
                events.append(TrendEvent(idx, "Bullish trend started", trend_state))
            elif trend_state == TREND_BEAR and bull_run >= 2:
                trend_state = TREND_BULL
                events.append(
                    TrendEvent(
                        idx,
                        "Bullish reversal confirmed (2 real bullish bars)",
                        trend_state,
                    )
                )

        elif classification == "real_bear":
            bear_run += 1
            bull_run = 0

            if trend_state == TREND_NEUTRAL and bear_run >= 2:
                trend_state = TREND_BEAR
                events.append(TrendEvent(idx, "Bearish trend started", trend_state))
            elif trend_state == TREND_BULL and bear_run >= 2:
                trend_state = TREND_BEAR
                events.append(
                    TrendEvent(
                        idx,
                        "Bearish reversal confirmed (2 real bearish bars)",
                        trend_state,
                    )
                )

        states.append(trend_state)

    # Assign positionally in one step. A label-based write per row would
    # overwrite every row sharing a duplicated index label.
    if states:
        out["trend_state"] = states

    return out, events
|