from __future__ import annotations

"""
Cabanna API
===========

API para:
- Healthcheck contra SQL Server
- Catálogos (sucursales)
- Min/max de fechas disponibles en ventas
- ETL a Parquet (folios y líneas) por rangos de fechas
- Refresh diario (mes actual + opcional mes anterior)
- Lectura eficiente de Parquet (líneas) vía PyArrow Dataset con pushdown

Notas operativas (Render/free):
- Filesystem puede ser efímero (reinicios/deploys pueden borrar data).
- Evita cargar data completa a RAM: usar pushdown + escritura por batches.
- Preferir columnas necesarias (columns=...) cuando aplique.

Convención de fechas:
- start: inclusive (YYYY-MM-DD)
- end: exclusive (YYYY-MM-DD)
"""
from dotenv import load_dotenv
load_dotenv()
from dataclasses import asdict, dataclass
from typing import Iterable, Optional
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from fastapi import Depends, FastAPI, Query
from fastapi.responses import FileResponse
from app.db import get_engine
from app.security import require_api_key
from app.queries import QUERY_TEST, QUERY_SUCURSALES, QUERY_VENTAS_MINMAX
from app.dwh.etl_folios import build_folios_parquet
from app.dwh.etl_lineas import build_lineas_parquet
from app.dwh.paths import FOLIOS_DIR, LINEAS_DIR


# ------------------------------------------------------------------------------
# App
# ------------------------------------------------------------------------------
app = FastAPI(
    title="Cabanna API",
    version="0.1.0",
    description="API para extracción/ETL de Cabanna y lectura eficiente de Parquets.",
)

# ------------------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------------------
def _parse_date(s: str) -> pd.Timestamp:
    """Parsea YYYY-MM-DD a Timestamp (naive). Lanza ValueError si es inválido."""
    dt = pd.to_datetime(s, errors="raise")
    if pd.isna(dt):
        raise ValueError(f"Fecha inválida: {s}")
    # normaliza a fecha (sin hora)
    return dt.normalize()


def _month_ranges(start_dt: pd.Timestamp, end_dt: pd.Timestamp) -> Iterable[pd.Timestamp]:
    """
    Itera inicios de mes [start_dt, end_dt).
    start_dt y end_dt se asumen normalizados.
    """
    cur = start_dt.replace(day=1)
    while cur < end_dt:
        yield cur
        cur = (cur + pd.offsets.MonthBegin(1)).normalize()

def _refresh_month_window(include_prev_month: bool) -> dict:
    """
    Retorna rangos (start,end) para:
    - mes actual
    - mes previo (opcional)
    """
    # Naive local. Ajusta al TZ que realmente quieres.
    now = pd.Timestamp.now(tz="America/Los_Angeles").tz_convert(None)
    cur_ini = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    cur_fin = (cur_ini + pd.offsets.MonthBegin(1))

    out = {
        "current_month": (cur_ini, cur_fin),
        "prev_month": None,
    }
    if include_prev_month:
        prev_ini = (cur_ini - pd.offsets.MonthBegin(1))
        prev_fin = cur_ini
        out["prev_month"] = (prev_ini, prev_fin)

    return out

# ------------------------------------------------------------------------------
# Response models (simple, sin Pydantic para no inflar)
# ------------------------------------------------------------------------------
@dataclass
class HealthResponse:
    status: str
    db: int

@dataclass
class VentasMinMaxResponse:
    fecha_min: Optional[str]
    fecha_max: Optional[str]
# ------------------------------------------------------------------------------
# Core endpoints
# ------------------------------------------------------------------------------
@app.get("/health", dependencies=[Depends(require_api_key)], response_model=None)
def health():
    """
    Verifica conectividad a DB.

    Returns:
      {"status":"ok","db":1}
    """
    engine = get_engine()
    with engine.connect() as conn:
        df = pd.read_sql(QUERY_TEST, conn)

    res = HealthResponse(status="ok", db=int(df.loc[0, "ok"]))
    return asdict(res)

@app.get("/sucursales", dependencies=[Depends(require_api_key)], response_model=None)
def sucursales():
    """
    Regresa catálogo de sucursales (NombreSucursal normalizado).
    """
    engine = get_engine()
    with engine.connect() as conn:
        df = pd.read_sql(QUERY_SUCURSALES, conn)

    if "NombreSucursal" in df.columns:
        df["NombreSucursal"] = df["NombreSucursal"].astype(str).str.strip()

    return {"rows": df.to_dict(orient="records")}

@app.get("/ventas/minmax", dependencies=[Depends(require_api_key)], response_model=None)
def ventas_minmax():
    """
    Regresa el rango min/max de fechas disponibles en ventas.
    """
    engine = get_engine()
    with engine.connect() as conn:
        df = pd.read_sql(QUERY_VENTAS_MINMAX, conn)

    fecha_min = pd.to_datetime(df.loc[0, "fecha_min"], errors="coerce")
    fecha_max = pd.to_datetime(df.loc[0, "fecha_max"], errors="coerce")

    res = VentasMinMaxResponse(
        fecha_min=None if pd.isna(fecha_min) else fecha_min.isoformat(),
        fecha_max=None if pd.isna(fecha_max) else fecha_max.isoformat(),
    )
    return asdict(res)

# ------------------------------------------------------------------------------
# ETL - Folios
# ------------------------------------------------------------------------------
@app.post("/etl/folios", dependencies=[Depends(require_api_key)], response_model=None)
def etl_folios(
    start: str = Query(..., description="YYYY-MM-DD (inclusive)"),
    end: str = Query(..., description="YYYY-MM-DD (exclusive)"),
):
    """
    Construye parquets mensuales de folios en data/dwh/folios.

    Recomendación:
    - Correr por año o por rangos pequeños en entornos limitados (RAM).
    """
    # Validación de fechas
    start_dt = _parse_date(start)
    end_dt = _parse_date(end)
    if end_dt <= start_dt:
        return {"status": "error", "detail": "end debe ser mayor a start"}

    result = build_folios_parquet(
        start_dt.strftime("%Y-%m-%d"),
        end_dt.strftime("%Y-%m-%d"),
        stop_on_consecutive_empty=3,
        min_rows_to_write=1,
    )

    # evidencia post-escritura
    files = sorted(FOLIOS_DIR.glob("folios_*.parquet")) if FOLIOS_DIR.exists() else []
    result["dwh_dir"] = str(FOLIOS_DIR)
    result["parquets_now"] = len(files)
    result["last_parquet_now"] = files[-1].name if files else None

    return result

@app.post("/etl/folios/refresh_daily", dependencies=[Depends(require_api_key)], response_model=None)
def etl_folios_refresh_daily(
    include_prev_month: bool = Query(True, description="También refresca el mes anterior (por ajustes tardíos)"),
):
    """
    Refresca folios del mes actual y opcionalmente el mes anterior.
    """
    win = _refresh_month_window(include_prev_month)
    cur_ini, cur_fin = win["current_month"]

    build_folios_parquet(cur_ini.strftime("%Y-%m-%d"), cur_fin.strftime("%Y-%m-%d"))

    if win["prev_month"] is not None:
        prev_ini, prev_fin = win["prev_month"]
        build_folios_parquet(prev_ini.strftime("%Y-%m-%d"), prev_fin.strftime("%Y-%m-%d"))

    return {
        "status": "ok",
        "refreshed": {
            "current_month": f"{cur_ini.date()}..{cur_fin.date()}",
            "prev_month": bool(include_prev_month),
        },
    }

# ------------------------------------------------------------------------------
# ETL - Líneas
# ------------------------------------------------------------------------------
@app.post("/etl/lineas", dependencies=[Depends(require_api_key)], response_model=None)
def etl_lineas(
    start: str = Query(..., description="YYYY-MM-DD (inclusive)"),
    end: str = Query(..., description="YYYY-MM-DD (exclusive)"),
):
    """
    Construye parquets mensuales de líneas en data/dwh/lineas.
    """
    start_dt = _parse_date(start)
    end_dt = _parse_date(end)
    if end_dt <= start_dt:
        return {"status": "error", "detail": "end debe ser mayor a start"}

    build_lineas_parquet(start_dt.strftime("%Y-%m-%d"), end_dt.strftime("%Y-%m-%d"))
    return {"status": "ok", "start": start, "end": end}

@app.post("/etl/lineas/refresh_daily", dependencies=[Depends(require_api_key)], response_model=None)
def etl_lineas_refresh_daily(
    include_prev_month: bool = Query(True, description="También refresca el mes anterior (por ajustes tardíos)"),
):
    win = _refresh_month_window(include_prev_month)
    cur_ini, cur_fin = win["current_month"]

    r1 = build_lineas_parquet(cur_ini.strftime("%Y-%m-%d"), cur_fin.strftime("%Y-%m-%d"))

    r2 = None
    if win["prev_month"] is not None:
        prev_ini, prev_fin = win["prev_month"]
        r2 = build_lineas_parquet(prev_ini.strftime("%Y-%m-%d"), prev_fin.strftime("%Y-%m-%d"))

    return {
        "status": "ok",
        "current_month": r1,
        "prev_month": r2,
    }



