from __future__ import annotations

from dataclasses import dataclass, asdict
from pathlib import Path
import os
import pandas as pd
import tempfile

from app.db import get_engine
from app.queries import QUERY_LINEAS_SUCURSAL
from app.dwh.paths import LINEAS_DIR, ensure_dirs

# ✅ Ideal: mover esto a app/dwh/drive_storage.py (recomendado)
# Por ahora reusamos lo que ya tienes (funciona).
from app.dwh.etl_folios import upload_or_replace_in_drive, get_setting


FOLDER_ID_LINEAS = get_setting("FOLDER_ID_LINEAS")
if not FOLDER_ID_LINEAS:
    raise RuntimeError("Falta FOLDER_ID_LINEAS en .env o variables de entorno.")


def month_start(dt: pd.Timestamp) -> pd.Timestamp:
    return dt.normalize().replace(day=1)


def next_month(dt: pd.Timestamp) -> pd.Timestamp:
    return (dt + pd.offsets.MonthBegin(1)).normalize()


def month_ranges(start_dt: pd.Timestamp, end_dt: pd.Timestamp):
    cur = month_start(start_dt)
    end_dt = pd.to_datetime(end_dt)
    while cur < end_dt:
        nxt = next_month(cur)
        ini = cur
        fin = min(nxt, end_dt)
        yield ini, fin
        cur = nxt


@dataclass
class EtlLineasResult:
    status: str
    start: str
    end: str
    files_written: int
    rows_written: int
    months_total: int
    months_empty: int
    empty_months: list[str]
    last_nonempty_end: str | None
    last_file: str | None
    warnings: list[str]


def build_lineas_parquet(
    start: str,
    end: str,
    *,
    stop_on_consecutive_empty: int = 3,
    min_rows_to_write: int = 1,
) -> dict:
    """
    Construye parquets mensuales lineas_YYYY_MM.parquet y los sube a Drive.
    - NO sube archivos vacíos.
    - Se puede detener tras N meses consecutivos vacíos.
    - Regresa un resumen para logging / endpoints.
    """

    ensure_dirs()
    engine = get_engine()

    start_dt = pd.to_datetime(start)
    end_dt = pd.to_datetime(end)

    files_written = 0
    rows_written = 0
    months_total = 0
    months_empty = 0
    empty_months: list[str] = []
    warnings: list[str] = []

    last_nonempty_end: pd.Timestamp | None = None
    last_file: Path | None = None

    consecutive_empty = 0

    for ini, fin in month_ranges(start_dt, end_dt):
        months_total += 1
        ym = ini.strftime("%Y-%m")
        out = LINEAS_DIR / f"lineas_{ini.strftime('%Y_%m')}.parquet"

        with engine.connect() as conn:
            df = pd.read_sql(QUERY_LINEAS_SUCURSAL, conn, params=(ini, fin))

        if not df.empty and "Fecha" in df.columns:
            df["Fecha"] = pd.to_datetime(df["Fecha"], errors="coerce")

        # ✅ no subas vacíos
        if df is None or len(df) < min_rows_to_write:
            months_empty += 1
            empty_months.append(ym)
            consecutive_empty += 1

            if stop_on_consecutive_empty and consecutive_empty >= stop_on_consecutive_empty:
                warnings.append(
                    f"Se detuvo por {consecutive_empty} meses consecutivos sin datos. "
                    f"Último mes procesado: {ym} (rango {ini.date()}→{fin.date()})."
                )
                break
            continue

        consecutive_empty = 0

        # ✅ write a temp parquet y súbelo
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".parquet")
        try:
            df.to_parquet(tmp.name, index=False)
            tmp.close()

            upload_or_replace_in_drive(
                folder_id=FOLDER_ID_LINEAS,
                filename=out.name,     # lineas_YYYY_MM.parquet
                local_path=tmp.name
            )

        finally:
            # limpia temp file
            try:
                os.unlink(tmp.name)
            except Exception:
                pass

        files_written += 1
        rows_written += len(df)
        last_nonempty_end = fin
        last_file = out

    res = EtlLineasResult(
        status="ok",
        start=start,
        end=end,
        files_written=files_written,
        rows_written=rows_written,
        months_total=months_total,
        months_empty=months_empty,
        empty_months=empty_months[-24:],
        last_nonempty_end=last_nonempty_end.isoformat() if last_nonempty_end is not None else None,
        last_file=str(last_file) if last_file is not None else None,
        warnings=warnings,
    )

    if files_written == 0:
        res.status = "no_data"
        res.warnings.append("No se escribió ningún parquet con datos. Revisa QUERY_LINEAS_SUCURSAL y/o el rango.")

    return asdict(res)
