import asyncio import uuid from fastapi import FastAPI, HTTPException, Request, UploadFile, File, Depends, Form from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles import os, shutil from sqlalchemy import text from datetime import date import re from fastapi.responses import StreamingResponse from io import BytesIO from app.models import ParametrosFormula from sqlalchemy.future import select from app.database import AsyncSessionLocal from app.models import Fatura from app.processor import ( fila_processamento, processar_em_lote, status_arquivos, limpar_arquivos_processados ) from fastapi.responses import FileResponse from app.models import Fatura, SelicMensal, ParametrosFormula from datetime import date from app.utils import avaliar_formula from app.routes import clientes from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_session from fastapi import Query from sqlalchemy import select as sqla_select from app.models import AliquotaUF import pandas as pd app = FastAPI() templates = Jinja2Templates(directory="app/templates") app.state.templates = templates app.mount("/static", StaticFiles(directory="app/static"), name="static") UPLOAD_DIR = os.path.join("app", "uploads", "temp") os.makedirs(UPLOAD_DIR, exist_ok=True) def _parse_referencia(ref: str): """Aceita 'JAN/2024', '01/2024', '202401' etc. Retorna (ano, mes).""" meses = {'JAN':1,'FEV':2,'MAR':3,'ABR':4,'MAI':5,'JUN':6,'JUL':7,'AGO':8,'SET':9,'OUT':10,'NOV':11,'DEZ':12} ref = (ref or "").strip().upper() if "/" in ref: a, b = [p.strip() for p in ref.split("/", 1)] mes = meses.get(a, None) if mes is None: mes = int(re.sub(r"\D", "", a) or 1) ano = int(re.sub(r"\D", "", b) or 0) if ano < 100: ano += 2000 else: num = re.sub(r"\D", "", ref) if len(num) >= 6: ano, mes = int(num[:4]), int(num[4:6]) elif len(num) == 4: ano, mes = int(num), 1 else: ano, mes = 0, 0 return ano, mes async def _carregar_selic_map(session): res = await session.execute(text("SELECT ano, mes, percentual FROM faturas.selic_mensal")) rows = res.mappings().all() return {(int(r["ano"]), int(r["mes"])): float(r["percentual"]) for r in rows} def _fator_selic_from_map(selic_map: dict, ano_inicio: int, mes_inicio: int, hoje: date) -> float: try: ano, mes = int(ano_inicio), int(mes_inicio) except Exception: return 1.0 if ano > hoje.year or (ano == hoje.year and mes > hoje.month): return 1.0 fator = 1.0 while (ano < hoje.year) or (ano == hoje.year and mes <= hoje.month): perc = selic_map.get((ano, mes)) if perc is not None: fator *= (1 + (perc / 100.0)) mes += 1 if mes > 12: mes = 1 ano += 1 return fator def _avaliar_formula(texto_formula: str | None, contexto: dict) -> float: if not texto_formula: return 0.0 expr = str(texto_formula) # Substitui nomes de campos por valores numéricos (None -> 0) for campo, valor in contexto.items(): v = valor if v is None or v == "": v = 0 # aceita vírgula como decimal vindo do banco if isinstance(v, str): v = v.replace(".", "").replace(",", ".") if re.search(r"[0-9],[0-9]", v) else v # nome do campo escapado na regex pat = rf'\b{re.escape(str(campo))}\b' # normaliza o valor para número; se não der, vira 0 val = v if val is None or val == "": num = 0.0 else: if isinstance(val, str): # troca vírgula decimal e remove separador de milhar simples val_norm = val.replace(".", "").replace(",", ".") else: val_norm = val try: num = float(val_norm) except Exception: num = 0.0 # usa lambda para evitar interpretação de backslashes no replacement expr = re.sub(pat, lambda m: str(num), expr) try: return float(eval(expr, {"__builtins__": {}}, {})) except Exception: return 0.0 @app.get("/", response_class=HTMLResponse) async def dashboard(request: Request, cliente: str | None = None): print("DBG /: inicio", flush=True) try: async with AsyncSessionLocal() as session: print("DBG /: abrindo sessão", flush=True) r = await session.execute(text(""" SELECT id, nome_fantasia FROM faturas.clientes WHERE ativo = TRUE ORDER BY nome_fantasia """)) clientes = [{"id": id_, "nome": nome} for id_, nome in r.fetchall()] print(f"DBG /: clientes={len(clientes)}", flush=True) # Fórmulas fp = await session.execute(text(""" SELECT formula FROM faturas.parametros_formula WHERE nome = 'Cálculo PIS sobre ICMS' AND ativo = TRUE LIMIT 1 """)) formula_pis = fp.scalar_one_or_none() fc = await session.execute(text(""" SELECT formula FROM faturas.parametros_formula WHERE nome = 'Cálculo COFINS sobre ICMS' AND ativo = TRUE LIMIT 1 """)) formula_cofins = fc.scalar_one_or_none() print(f"DBG /: tem_formulas pis={bool(formula_pis)} cofins={bool(formula_cofins)}", flush=True) sql = "SELECT * FROM faturas.faturas" params = {} if cliente: sql += " WHERE cliente_id = :cliente" params["cliente"] = cliente print("DBG /: SQL faturas ->", sql, params, flush=True) ftrs = (await session.execute(text(sql), params)).mappings().all() print(f"DBG /: total_faturas={len(ftrs)}", flush=True) # ===== KPIs e Séries para o dashboard ===== from collections import defaultdict total_faturas = len(ftrs) qtd_icms_na_base = 0 soma_corrigida = 0.0 hoje = date.today() selic_map = await _carregar_selic_map(session) # Séries e somatórios comerciais serie_mensal = defaultdict(float) # {(ano, mes): valor_corrigido} sum_por_dist = defaultdict(float) # {"distribuidora": valor_corrigido} somatorio_v_total = 0.0 contagem_com_icms = 0 for f in ftrs: ctx = dict(f) # PIS/COFINS sobre ICMS v_pis = _avaliar_formula(formula_pis, ctx) v_cof = _avaliar_formula(formula_cofins, ctx) v_total = max(0.0, float(v_pis or 0) + float(v_cof or 0)) # % de faturas com ICMS na base if (v_pis or 0) > 0: qtd_icms_na_base += 1 contagem_com_icms += 1 # referência -> (ano,mes) try: ano, mes = _parse_referencia(f.get("referencia")) except Exception: ano, mes = hoje.year, hoje.month # SELIC fator = _fator_selic_from_map(selic_map, ano, mes, hoje) valor_corrigido = v_total * fator soma_corrigida += valor_corrigido somatorio_v_total += v_total # séries serie_mensal[(ano, mes)] += valor_corrigido dist = (f.get("distribuidora") or "").strip() or "Não informado" sum_por_dist[dist] += valor_corrigido percentual_icms_base = (qtd_icms_na_base / total_faturas * 100.0) if total_faturas else 0.0 valor_restituicao_corrigida = soma_corrigida valor_medio_com_icms = (somatorio_v_total / contagem_com_icms) if contagem_com_icms else 0.0 # total de clientes (distinct já carregado) total_clientes = len(clientes) # Série mensal – últimos 12 meses ultimos = [] a, m = hoje.year, hoje.month for _ in range(12): ultimos.append((a, m)) m -= 1 if m == 0: m = 12; a -= 1 ultimos.reverse() serie_mensal_labels = [f"{mes:02d}/{ano}" for (ano, mes) in ultimos] serie_mensal_valores = [round(serie_mensal.get((ano, mes), 0.0), 2) for (ano, mes) in ultimos] # Top 5 distribuidoras top5 = sorted(sum_por_dist.items(), key=lambda kv: kv[1], reverse=True)[:5] top5_labels = [k for k, _ in top5] top5_valores = [round(v, 2) for _, v in top5] print("DBG /: calculos OK", flush=True) print("DBG /: render template", flush=True) return templates.TemplateResponse("dashboard.html", { "request": request, "clientes": clientes, "cliente_atual": cliente or "", "total_faturas": total_faturas, "valor_restituicao_corrigida": valor_restituicao_corrigida, "percentual_icms_base": percentual_icms_base, # Novos dados para o template "total_clientes": total_clientes, "valor_medio_com_icms": valor_medio_com_icms, "situacao_atual_percent": percentual_icms_base, # para gráfico de alerta "serie_mensal_labels": serie_mensal_labels, "serie_mensal_valores": serie_mensal_valores, "top5_labels": top5_labels, "top5_valores": top5_valores, }) except Exception as e: import traceback print("ERR /:", e, flush=True) traceback.print_exc() # Página de erro amigável (sem derrubar servidor) return HTMLResponse( f"
Falha no dashboard:\n{e}",
status_code=500
)
@app.get("/upload", response_class=HTMLResponse)
def upload_page(request: Request):
app_env = os.getenv("APP_ENV", "dev") # Captura variável de ambiente
return templates.TemplateResponse("upload.html", {
"request": request,
"app_env": app_env # Passa para o template
})
@app.get("/relatorios", response_class=HTMLResponse)
async def relatorios_page(request: Request, cliente: str | None = Query(None)):
async with AsyncSessionLocal() as session:
# Carregar clientes ativos para o combo
r_cli = await session.execute(text("""
SELECT id, nome_fantasia
FROM faturas.clientes
WHERE ativo = TRUE
ORDER BY nome_fantasia
"""))
clientes = [{"id": str(row.id), "nome": row.nome_fantasia} for row in r_cli]
# Carregar faturas (todas ou filtradas por cliente)
if cliente:
r_fat = await session.execute(text("""
SELECT *
FROM faturas.faturas
WHERE cliente_id = :cid
ORDER BY data_processamento DESC
"""), {"cid": cliente})
else:
r_fat = await session.execute(text("""
SELECT *
FROM faturas.faturas
ORDER BY data_processamento DESC
"""))
faturas = r_fat.mappings().all()
return templates.TemplateResponse("relatorios.html", {
"request": request,
"clientes": clientes,
"cliente_selecionado": cliente or "",
"faturas": faturas
})
@app.post("/upload-files")
async def upload_files(
cliente_id: str = Form(...),
files: list[UploadFile] = File(...)
):
for file in files:
temp_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}_{file.filename}")
with open(temp_path, "wb") as f:
shutil.copyfileobj(file.file, f)
await fila_processamento.put({
"caminho_pdf": temp_path,
"nome_original": file.filename,
"cliente_id": cliente_id
})
return {"message": "Arquivos enviados para fila"}
@app.post("/process-queue")
async def process_queue():
resultados = await processar_em_lote()
return {"message": "Processamento concluído", "resultados": resultados}
@app.get("/get-status")
async def get_status():
files = []
for nome, status in status_arquivos.items():
if isinstance(status, dict):
files.append({
"nome": nome,
"status": status.get("status", "Erro"),
"mensagem": status.get("mensagem", "---"),
"tempo": status.get("tempo", "---"),
"tamanho": f"{status.get('tamanho', 0)} KB",
"data": status.get("data", "")
})
else:
files.append({
"nome": nome,
"status": status,
"mensagem": "---" if status == "Concluído" else status,
"tempo": "---" # ✅ AQUI também
})
is_processing = not fila_processamento.empty()
return JSONResponse(content={"is_processing": is_processing, "files": files})
@app.post("/clear-all")
async def clear_all():
limpar_arquivos_processados()
for f in os.listdir(UPLOAD_DIR):
os.remove(os.path.join(UPLOAD_DIR, f))
return {"message": "Fila e arquivos limpos"}
@app.get("/export-excel")
async def export_excel(
tipo: str = Query("geral", pattern="^(geral|exclusao_icms|aliquota_icms)$"),
cliente: str | None = Query(None)
):
async with AsyncSessionLocal() as session:
# 1) Faturas
stmt = select(Fatura)
if cliente:
stmt = stmt.where(Fatura.cliente_id == cliente)
faturas = (await session.execute(stmt)).scalars().all()
# 2) Mapa de alíquotas cadastradas (UF/ano)
aliq_rows = (await session.execute(select(AliquotaUF))).scalars().all()
aliq_map = {(r.uf.upper(), int(r.exercicio)): float(r.aliq_icms) for r in aliq_rows}
dados = []
if tipo == "aliquota_icms":
for f in faturas:
uf = (f.estado or "").strip().upper()
ano, _ = _parse_referencia(f.referencia or "")
aliq_nf = float(f.icms_aliq or 0.0)
aliq_cad = aliq_map.get((uf, ano))
diff_pp = (aliq_nf - aliq_cad) if aliq_cad is not None else None
confere = (abs(diff_pp) < 1e-6) if diff_pp is not None else None
dados.append({
"Cliente": f.nome,
"UF (fatura)": uf,
"Exercício (ref)": ano,
"Referência": f.referencia,
"Nota Fiscal": f.nota_fiscal,
"ICMS (%) NF": aliq_nf,
# novas colunas padronizadas
"ICMS (%) (UF/Ref)": aliq_cad,
"Dif. ICMS (pp)": diff_pp,
"ICMS confere?": "SIM" if confere else ("N/D" if confere is None else "NÃO"),
"Valor Total": f.valor_total,
"Distribuidora": f.distribuidora,
"Data Processamento": f.data_processamento,
"Arquivo PDF": f.arquivo_pdf,
})
filename = "relatorio_aliquota_icms.xlsx"
elif tipo == "exclusao_icms":
for f in faturas:
uf = (f.estado or "").strip().upper()
ano, _ = _parse_referencia(f.referencia or "")
aliq_nf = float(f.icms_aliq or 0.0)
aliq_cad = aliq_map.get((uf, ano))
diff_pp = (aliq_nf - aliq_cad) if aliq_cad is not None else None
confere = (abs(diff_pp) < 1e-6) if diff_pp is not None else None
dados.append({
"Cliente": f.nome,
"UC": f.unidade_consumidora,
"Referência": f.referencia,
"Valor Total": f.valor_total,
"PIS (%)": f.pis_aliq,
"ICMS (%)": f.icms_aliq,
"COFINS (%)": f.cofins_aliq,
"PIS (R$)": f.pis_valor,
"ICMS (R$)": f.icms_valor,
"COFINS (R$)": f.cofins_valor,
"Base PIS (R$)": f.pis_base,
"Base ICMS (R$)": f.icms_base,
"Base COFINS (R$)": f.cofins_base,
# novas colunas
"ICMS (%) (UF/Ref)": aliq_cad,
"Dif. ICMS (pp)": diff_pp,
"ICMS confere?": "SIM" if confere else ("N/D" if confere is None else "NÃO"),
"Consumo (kWh)": f.consumo,
"Tarifa": f.tarifa,
"Nota Fiscal": f.nota_fiscal,
"Arquivo PDF": f.arquivo_pdf,
})
filename = "relatorio_exclusao_icms.xlsx"
else: # geral
for f in faturas:
uf = (f.estado or "").strip().upper()
ano, _ = _parse_referencia(f.referencia or "")
aliq_nf = float(f.icms_aliq or 0.0)
aliq_cad = aliq_map.get((uf, ano))
diff_pp = (aliq_nf - aliq_cad) if aliq_cad is not None else None
confere = (abs(diff_pp) < 1e-6) if diff_pp is not None else None
dados.append({
"Cliente": f.nome,
"UC": f.unidade_consumidora,
"Referência": f.referencia,
"Nota Fiscal": f.nota_fiscal,
"Valor Total": f.valor_total,
"ICMS (%)": f.icms_aliq,
"ICMS (R$)": f.icms_valor,
# novas colunas
"ICMS (%) (UF/Ref)": aliq_cad,
"Dif. ICMS (pp)": diff_pp,
"ICMS confere?": "SIM" if confere else ("N/D" if confere is None else "NÃO"),
"Base ICMS (R$)": f.icms_base,
"PIS (%)": f.pis_aliq,
"PIS (R$)": f.pis_valor,
"Base PIS (R$)": f.pis_base,
"COFINS (%)": f.cofins_aliq,
"COFINS (R$)": f.cofins_valor,
"Base COFINS (R$)": f.cofins_base,
"Consumo (kWh)": f.consumo,
"Tarifa": f.tarifa,
"Distribuidora": f.distribuidora,
"Data Processamento": f.data_processamento,
"Arquivo PDF": f.arquivo_pdf,
})
filename = "relatorio_geral.xlsx"
# 3) Excel em memória
output = BytesIO()
df = pd.DataFrame(dados)
# força "Arquivo PDF" a ser a última coluna
if "Arquivo PDF" in df.columns:
cols = [c for c in df.columns if c != "Arquivo PDF"] + ["Arquivo PDF"]
df = df[cols]
# garante que colunas percentuais estejam numéricas (se existirem)
for col in ["ICMS (%)", "ICMS (%) (UF/Ref)", "Dif. ICMS (pp)", "PIS (%)", "COFINS (%)"]:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce")
with pd.ExcelWriter(output, engine="xlsxwriter") as writer:
df.to_excel(writer, index=False, sheet_name="Relatório")
# aplica formatação 4 casas decimais
wb = writer.book
ws = writer.sheets["Relatório"]
fmt_4dec = wb.add_format({"num_format": "0.0000"})
for col in ["ICMS (%)", "ICMS (%) (UF/Ref)", "Dif. ICMS (pp)", "PIS (%)", "COFINS (%)"]:
if col in df.columns:
i = df.columns.get_loc(col)
# largura automática básica + formato; ajuste a largura se quiser (ex.: 12)
ws.set_column(i, i, None, fmt_4dec)
output.seek(0)
return StreamingResponse(
output,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f'attachment; filename="{filename}"'}
)
from app.parametros import router as parametros_router
app.include_router(parametros_router)
app.include_router(clientes.router)
def is_homolog():
return os.getenv("APP_ENV", "dev") == "homolog"
@app.post("/limpar-faturas")
async def limpar_faturas():
app_env = os.getenv("APP_ENV", "dev")
if app_env not in ["homolog", "dev", "local"]:
return JSONResponse(status_code=403, content={"message": "Operação não permitida neste ambiente."})
async with AsyncSessionLocal() as session:
print("🧪 Limpando faturas do banco...")
await session.execute(text("DELETE FROM faturas.faturas"))
await session.commit()
upload_path = os.path.join("app", "uploads")
for nome in os.listdir(upload_path):
caminho = os.path.join(upload_path, nome)
if os.path.isfile(caminho):
os.remove(caminho)
return {"message": "Faturas e arquivos apagados com sucesso."}
@app.get("/erros/download")
async def download_erros():
zip_path = os.path.join("app", "uploads", "erros", "faturas_erro.zip")
if os.path.exists(zip_path):
response = FileResponse(zip_path, filename="faturas_erro.zip", media_type="application/zip")
# ⚠️ Agendar exclusão após resposta
asyncio.create_task(limpar_erros())
return response
else:
raise HTTPException(status_code=404, detail="Arquivo de erro não encontrado.")
@app.get("/erros/log")
async def download_log_erros():
txt_path = os.path.join("app", "uploads", "erros", "erros.txt")
if os.path.exists(txt_path):
response = FileResponse(txt_path, filename="erros.txt", media_type="text/plain")
# ⚠️ Agendar exclusão após resposta
asyncio.create_task(limpar_erros())
return response
else:
raise HTTPException(status_code=404, detail="Log de erro não encontrado.")
async def limpar_erros():
await asyncio.sleep(5) # Aguarda 5 segundos para garantir que o download inicie
pasta = os.path.join("app", "uploads", "erros")
for nome in ["faturas_erro.zip", "erros.txt"]:
caminho = os.path.join(pasta, nome)
if os.path.exists(caminho):
os.remove(caminho)
@app.get("/api/clientes")
async def listar_clientes(db: AsyncSession = Depends(get_session)):
sql = text("""
SELECT id, nome_fantasia, cnpj, ativo
FROM faturas.clientes
WHERE ativo = TRUE
ORDER BY nome_fantasia
""")
res = await db.execute(sql)
rows = res.mappings().all()
return [
{
"id": str(r["id"]),
"nome_fantasia": r["nome_fantasia"],
"cnpj": r["cnpj"],
"ativo": bool(r["ativo"]),
}
for r in rows
]
@app.get("/api/relatorios")
async def api_relatorios(
cliente: str | None = Query(None),
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=5, le=200),
db: AsyncSession = Depends(get_session),
):
offset = (page - 1) * page_size
where = "WHERE cliente_id = :cliente" if cliente else ""
params = {"limit": page_size, "offset": offset}
if cliente:
params["cliente"] = cliente
# ❗ Inclua 'estado' no SELECT
sql = text(f"""
SELECT id, nome, unidade_consumidora, referencia, nota_fiscal,
valor_total, icms_aliq, icms_valor, pis_aliq, pis_valor,
cofins_aliq, cofins_valor, distribuidora, data_processamento,
estado
FROM faturas.faturas
{where}
ORDER BY data_processamento DESC
LIMIT :limit OFFSET :offset
""")
count_sql = text(f"SELECT COUNT(*) AS total FROM faturas.faturas {where}")
rows = (await db.execute(sql, params)).mappings().all()
total = (await db.execute(count_sql, params)).scalar_one()
# 🔹 Carrega mapa de alíquotas UF/ano
aliq_rows = (await db.execute(select(AliquotaUF))).scalars().all()
aliq_map = {(r.uf.upper(), int(r.exercicio)): float(r.aliq_icms) for r in aliq_rows}
items = []
for r in rows:
uf = (r["estado"] or "").strip().upper()
ano, _mes = _parse_referencia(r["referencia"] or "")
aliq_nf = float(r["icms_aliq"] or 0.0)
aliq_cad = aliq_map.get((uf, ano))
diff_pp = (aliq_nf - aliq_cad) if aliq_cad is not None else None
ok = (abs(diff_pp) < 1e-6) if diff_pp is not None else None
items.append({
"id": str(r["id"]),
"nome": r["nome"],
"unidade_consumidora": r["unidade_consumidora"],
"referencia": r["referencia"],
"nota_fiscal": r["nota_fiscal"],
"valor_total": float(r["valor_total"]) if r["valor_total"] is not None else None,
"icms_aliq": aliq_nf,
"icms_valor": r["icms_valor"],
"pis_aliq": r["pis_aliq"],
"pis_valor": r["pis_valor"],
"cofins_aliq": r["cofins_aliq"],
"cofins_valor": r["cofins_valor"],
"distribuidora": r["distribuidora"],
"data_processamento": r["data_processamento"].isoformat() if r["data_processamento"] else None,
# novos
"estado": uf,
"exercicio": ano,
"aliq_cadastral": aliq_cad,
"aliq_diff_pp": round(diff_pp, 4) if diff_pp is not None else None,
"aliq_ok": ok,
})
return {"items": items, "total": total, "page": page, "page_size": page_size}
async def _carregar_aliquota_map(session):
rows = (await session.execute(
text("SELECT uf, exercicio, aliq_icms FROM faturas.aliquotas_uf")
)).mappings().all()
# (UF, ANO) -> float
return {(r["uf"].upper(), int(r["exercicio"])): float(r["aliq_icms"]) for r in rows}