maneshtrader/web_core/auth/profile_store.py

233 lines
8.7 KiB
Python

from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Any
import pandas as pd
from web_core.auth.profile_auth import (
DEFAULT_PROFILE_ID,
PROFILE_SESSION_TIMEOUT_SEC,
find_existing_profile_id,
first_query_param_value,
hash_profile_pin,
is_truthy_flag,
is_profile_session_expired,
normalize_pin,
normalize_profile_id,
profile_exists,
resolve_login_profile,
)
from web_core.settings.settings_schema import normalize_web_settings
from web_core.time_display import format_timestamp
SETTINGS_PATH = Path.home() / ".web_local_shell" / "settings.json"
LEGACY_SETTINGS_PATH = Path.home() / ".manesh_trader" / "settings.json"
def _normalize_epoch(value: Any) -> int | None:
if not isinstance(value, (int, float)):
return None
parsed = int(value)
return parsed if parsed >= 0 else None
def format_epoch(
value: Any,
display_timezone: str = "America/Chicago",
use_24h_time: bool = False,
) -> str:
parsed = _normalize_epoch(value)
if parsed is None:
return "n/a"
return format_timestamp(
pd.to_datetime(parsed, unit="s", utc=True),
display_timezone=display_timezone,
use_24h_time=use_24h_time,
)
def _load_raw_settings_payload() -> dict[str, Any] | None:
source_path = SETTINGS_PATH if SETTINGS_PATH.exists() else LEGACY_SETTINGS_PATH
if not source_path.exists():
return None
try:
payload = json.loads(source_path.read_text(encoding="utf-8"))
return payload if isinstance(payload, dict) else None
except Exception:
return None
def normalize_profile_record(profile_id: str, raw_profile_data: Any) -> dict[str, Any]:
now_epoch = int(time.time())
fallback_settings = normalize_web_settings(None)
settings: dict[str, Any]
pin_hash = ""
raw_audit: Any = {}
if isinstance(raw_profile_data, dict) and isinstance(raw_profile_data.get("settings"), dict):
settings = normalize_web_settings(raw_profile_data.get("settings"))
pin_hash = str(raw_profile_data.get("pin_hash") or "").strip()
if not pin_hash and isinstance(raw_profile_data.get("auth"), dict):
pin_hash = str(raw_profile_data["auth"].get("pin_hash") or "").strip()
raw_audit = raw_profile_data.get("audit")
elif isinstance(raw_profile_data, dict):
settings = normalize_web_settings(raw_profile_data)
else:
settings = fallback_settings
created_at = now_epoch
updated_at = now_epoch
last_login_at = None
last_symbol = str(settings.get("symbol", fallback_settings["symbol"]))
if isinstance(raw_audit, dict):
created_at = _normalize_epoch(raw_audit.get("created_at")) or now_epoch
updated_at = _normalize_epoch(raw_audit.get("updated_at")) or created_at
last_login_at = _normalize_epoch(raw_audit.get("last_login_at"))
raw_last_symbol = str(raw_audit.get("last_symbol") or "").strip().upper()
if raw_last_symbol:
last_symbol = raw_last_symbol
return {
"settings": settings,
"pin_hash": pin_hash,
"audit": {
"created_at": created_at,
"updated_at": updated_at,
"last_login_at": last_login_at,
"last_symbol": last_symbol,
},
}
def _write_settings_store(profile_records: dict[str, dict[str, Any]], last_profile: str) -> None:
serialized_profiles: dict[str, dict[str, Any]] = {}
for profile_id, record in profile_records.items():
serialized_profiles[profile_id] = normalize_profile_record(profile_id=profile_id, raw_profile_data=record)
payload = {
"last_profile": normalize_profile_id(last_profile),
"profiles": serialized_profiles,
}
SETTINGS_PATH.parent.mkdir(parents=True, exist_ok=True)
SETTINGS_PATH.write_text(json.dumps(payload, indent=2), encoding="utf-8")
def _load_profile_records() -> tuple[dict[str, dict[str, Any]], str]:
payload = _load_raw_settings_payload()
if not payload:
return {}, DEFAULT_PROFILE_ID
raw_profiles = payload.get("profiles")
if isinstance(raw_profiles, dict):
profile_records: dict[str, dict[str, Any]] = {}
for raw_profile_id, raw_profile_data in raw_profiles.items():
if isinstance(raw_profile_data, dict):
profile_id = normalize_profile_id(raw_profile_id)
profile_records[profile_id] = normalize_profile_record(profile_id, raw_profile_data)
last_profile = normalize_profile_id(payload.get("last_profile"))
if profile_records and last_profile not in profile_records:
last_profile = next(iter(profile_records))
return profile_records, last_profile
legacy_record = normalize_profile_record(profile_id=DEFAULT_PROFILE_ID, raw_profile_data=payload)
return {DEFAULT_PROFILE_ID: legacy_record}, DEFAULT_PROFILE_ID
def list_web_profiles() -> list[str]:
profile_records, _ = _load_profile_records()
profile_ids = list(profile_records.keys())
if DEFAULT_PROFILE_ID not in profile_ids:
profile_ids.append(DEFAULT_PROFILE_ID)
return sorted(profile_ids, key=lambda item: (item != DEFAULT_PROFILE_ID, item.lower()))
def load_web_settings(profile_id: str | None = None) -> dict[str, Any]:
profile_records, last_profile = _load_profile_records()
selected_profile = normalize_profile_id(profile_id if profile_id is not None else last_profile)
if selected_profile not in profile_records:
return normalize_web_settings(None)
return normalize_web_settings(profile_records[selected_profile].get("settings"))
def profile_requires_pin(profile_id: str) -> bool:
selected = normalize_profile_id(profile_id)
profile_records, _ = _load_profile_records()
record = profile_records.get(selected)
return isinstance(record, dict) and bool(str(record.get("pin_hash") or "").strip())
def verify_profile_pin(profile_id: str, pin: Any) -> bool:
selected = normalize_profile_id(profile_id)
profile_records, _ = _load_profile_records()
record = profile_records.get(selected)
if not isinstance(record, dict):
return False
stored_hash = str(record.get("pin_hash") or "").strip()
if not stored_hash:
return True
normalized_pin = normalize_pin(pin)
if normalized_pin is None:
return False
return stored_hash == hash_profile_pin(selected, normalized_pin)
def get_profile_audit(profile_id: str) -> dict[str, Any]:
selected = normalize_profile_id(profile_id)
profile_records, _ = _load_profile_records()
record = profile_records.get(selected)
if not isinstance(record, dict):
return {}
audit = record.get("audit")
return audit if isinstance(audit, dict) else {}
def create_profile(profile_id: str, pin: Any = None, now_epoch: int | None = None) -> None:
selected = normalize_profile_id(profile_id)
profile_records, _ = _load_profile_records()
epoch = int(now_epoch if now_epoch is not None else time.time())
normalized_pin = normalize_pin(pin)
profile_records[selected] = {
"settings": normalize_web_settings(None),
"pin_hash": hash_profile_pin(selected, normalized_pin) if normalized_pin else "",
"audit": {
"created_at": epoch,
"updated_at": epoch,
"last_login_at": epoch,
"last_symbol": "AAPL",
},
}
_write_settings_store(profile_records, last_profile=selected)
def mark_profile_login(profile_id: str, now_epoch: int | None = None) -> None:
selected = normalize_profile_id(profile_id)
profile_records, _ = _load_profile_records()
epoch = int(now_epoch if now_epoch is not None else time.time())
existing = profile_records.get(selected, normalize_profile_record(selected, {}))
normalized = normalize_profile_record(selected, existing)
normalized["audit"]["last_login_at"] = epoch
profile_records[selected] = normalized
_write_settings_store(profile_records, last_profile=selected)
def save_web_settings(settings: dict[str, Any], profile_id: str | None = None) -> None:
selected_profile = normalize_profile_id(profile_id)
now_epoch = int(time.time())
profile_records, _ = _load_profile_records()
existing = profile_records.get(selected_profile, normalize_profile_record(selected_profile, {}))
normalized_existing = normalize_profile_record(selected_profile, existing)
normalized_settings = normalize_web_settings(settings)
normalized_existing["settings"] = normalized_settings
normalized_existing["audit"]["updated_at"] = now_epoch
normalized_existing["audit"]["last_symbol"] = str(normalized_settings.get("symbol", "AAPL"))
profile_records[selected_profile] = normalized_existing
_write_settings_store(profile_records, last_profile=selected_profile)