Files
app_faturas/app/main.py

653 lines
24 KiB
Python
Raw Normal View History

import asyncio
2025-07-28 13:29:45 -03:00
import uuid
from fastapi import FastAPI, HTTPException, Request, UploadFile, File, Depends, Form
2025-07-28 13:29:45 -03:00
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
import os, shutil
from sqlalchemy import text
from datetime import date
import re
2025-07-28 13:29:45 -03:00
from fastapi.responses import StreamingResponse
from io import BytesIO
from app.models import ParametrosFormula
2025-07-28 13:29:45 -03:00
from sqlalchemy.future import select
from app.database import AsyncSessionLocal
from app.models import Fatura
from app.processor import (
2025-07-28 13:29:45 -03:00
fila_processamento,
processar_em_lote,
status_arquivos,
limpar_arquivos_processados
)
from fastapi.responses import FileResponse
from app.models import Fatura, SelicMensal, ParametrosFormula
from datetime import date
from app.utils import avaliar_formula
2025-08-09 19:51:14 -03:00
from app.routes import clientes
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_session
from fastapi import Query
from sqlalchemy import select as sqla_select
from app.models import AliquotaUF
import pandas as pd
2025-07-28 13:29:45 -03:00
app = FastAPI()
templates = Jinja2Templates(directory="app/templates")
2025-08-09 19:51:14 -03:00
app.state.templates = templates
app.mount("/static", StaticFiles(directory="app/static"), name="static")
2025-07-28 13:29:45 -03:00
UPLOAD_DIR = os.path.join("app", "uploads", "temp")
2025-07-28 13:29:45 -03:00
os.makedirs(UPLOAD_DIR, exist_ok=True)
def _parse_referencia(ref: str):
"""Aceita 'JAN/2024', '01/2024', '202401' etc. Retorna (ano, mes)."""
meses = {'JAN':1,'FEV':2,'MAR':3,'ABR':4,'MAI':5,'JUN':6,'JUL':7,'AGO':8,'SET':9,'OUT':10,'NOV':11,'DEZ':12}
ref = (ref or "").strip().upper()
if "/" in ref:
a, b = [p.strip() for p in ref.split("/", 1)]
mes = meses.get(a, None)
if mes is None:
mes = int(re.sub(r"\D", "", a) or 1)
ano = int(re.sub(r"\D", "", b) or 0)
if ano < 100:
ano += 2000
else:
num = re.sub(r"\D", "", ref)
if len(num) >= 6:
ano, mes = int(num[:4]), int(num[4:6])
elif len(num) == 4:
ano, mes = int(num), 1
else:
ano, mes = 0, 0
return ano, mes
async def _carregar_selic_map(session):
res = await session.execute(text("SELECT ano, mes, percentual FROM faturas.selic_mensal"))
rows = res.mappings().all()
return {(int(r["ano"]), int(r["mes"])): float(r["percentual"]) for r in rows}
def _fator_selic_from_map(selic_map: dict, ano_inicio: int, mes_inicio: int, hoje: date) -> float:
try:
ano, mes = int(ano_inicio), int(mes_inicio)
except Exception:
return 1.0
if ano > hoje.year or (ano == hoje.year and mes > hoje.month):
return 1.0
fator = 1.0
while (ano < hoje.year) or (ano == hoje.year and mes <= hoje.month):
perc = selic_map.get((ano, mes))
if perc is not None:
fator *= (1 + (perc / 100.0))
mes += 1
if mes > 12:
mes = 1
ano += 1
return fator
def _avaliar_formula(texto_formula: str | None, contexto: dict) -> float:
if not texto_formula:
return 0.0
expr = str(texto_formula)
# Substitui nomes de campos por valores numéricos (None -> 0)
for campo, valor in contexto.items():
v = valor
if v is None or v == "":
v = 0
# aceita vírgula como decimal vindo do banco
if isinstance(v, str):
v = v.replace(".", "").replace(",", ".") if re.search(r"[0-9],[0-9]", v) else v
# nome do campo escapado na regex
pat = rf'\b{re.escape(str(campo))}\b'
# normaliza o valor para número; se não der, vira 0
val = v
if val is None or val == "":
num = 0.0
else:
if isinstance(val, str):
# troca vírgula decimal e remove separador de milhar simples
val_norm = val.replace(".", "").replace(",", ".")
else:
val_norm = val
try:
num = float(val_norm)
except Exception:
num = 0.0
# usa lambda para evitar interpretação de backslashes no replacement
expr = re.sub(pat, lambda m: str(num), expr)
try:
return float(eval(expr, {"__builtins__": {}}, {}))
except Exception:
return 0.0
2025-07-28 13:29:45 -03:00
@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, cliente: str | None = None):
print("DBG /: inicio", flush=True)
try:
async with AsyncSessionLocal() as session:
print("DBG /: abrindo sessão", flush=True)
r = await session.execute(text("""
SELECT id, nome_fantasia
FROM faturas.clientes
WHERE ativo = TRUE
ORDER BY nome_fantasia
"""))
clientes = [{"id": id_, "nome": nome} for id_, nome in r.fetchall()]
print(f"DBG /: clientes={len(clientes)}", flush=True)
# Fórmulas
fp = await session.execute(text("""
SELECT formula FROM faturas.parametros_formula
WHERE nome = 'Cálculo PIS sobre ICMS' AND ativo = TRUE LIMIT 1
"""))
formula_pis = fp.scalar_one_or_none()
fc = await session.execute(text("""
SELECT formula FROM faturas.parametros_formula
WHERE nome = 'Cálculo COFINS sobre ICMS' AND ativo = TRUE LIMIT 1
"""))
formula_cofins = fc.scalar_one_or_none()
print(f"DBG /: tem_formulas pis={bool(formula_pis)} cofins={bool(formula_cofins)}", flush=True)
sql = "SELECT * FROM faturas.faturas"
params = {}
if cliente:
sql += " WHERE cliente_id = :cliente"
params["cliente"] = cliente
print("DBG /: SQL faturas ->", sql, params, flush=True)
ftrs = (await session.execute(text(sql), params)).mappings().all()
print(f"DBG /: total_faturas={len(ftrs)}", flush=True)
# ===== KPIs e Séries para o dashboard =====
from collections import defaultdict
total_faturas = len(ftrs)
qtd_icms_na_base = 0
soma_corrigida = 0.0
hoje = date.today()
selic_map = await _carregar_selic_map(session)
# Séries e somatórios comerciais
serie_mensal = defaultdict(float) # {(ano, mes): valor_corrigido}
sum_por_dist = defaultdict(float) # {"distribuidora": valor_corrigido}
somatorio_v_total = 0.0
contagem_com_icms = 0
for f in ftrs:
ctx = dict(f)
# PIS/COFINS sobre ICMS
v_pis = _avaliar_formula(formula_pis, ctx)
v_cof = _avaliar_formula(formula_cofins, ctx)
v_total = max(0.0, float(v_pis or 0) + float(v_cof or 0))
# % de faturas com ICMS na base
if (v_pis or 0) > 0:
qtd_icms_na_base += 1
contagem_com_icms += 1
# referência -> (ano,mes)
try:
ano, mes = _parse_referencia(f.get("referencia"))
except Exception:
ano, mes = hoje.year, hoje.month
# SELIC
fator = _fator_selic_from_map(selic_map, ano, mes, hoje)
valor_corrigido = v_total * fator
soma_corrigida += valor_corrigido
somatorio_v_total += v_total
# séries
serie_mensal[(ano, mes)] += valor_corrigido
dist = (f.get("distribuidora") or "").strip() or "Não informado"
sum_por_dist[dist] += valor_corrigido
percentual_icms_base = (qtd_icms_na_base / total_faturas * 100.0) if total_faturas else 0.0
valor_restituicao_corrigida = soma_corrigida
valor_medio_com_icms = (somatorio_v_total / contagem_com_icms) if contagem_com_icms else 0.0
# total de clientes (distinct já carregado)
total_clientes = len(clientes)
# Série mensal últimos 12 meses
ultimos = []
a, m = hoje.year, hoje.month
for _ in range(12):
ultimos.append((a, m))
m -= 1
if m == 0:
m = 12; a -= 1
ultimos.reverse()
serie_mensal_labels = [f"{mes:02d}/{ano}" for (ano, mes) in ultimos]
serie_mensal_valores = [round(serie_mensal.get((ano, mes), 0.0), 2) for (ano, mes) in ultimos]
# Top 5 distribuidoras
top5 = sorted(sum_por_dist.items(), key=lambda kv: kv[1], reverse=True)[:5]
top5_labels = [k for k, _ in top5]
top5_valores = [round(v, 2) for _, v in top5]
print("DBG /: calculos OK", flush=True)
print("DBG /: render template", flush=True)
return templates.TemplateResponse("dashboard.html", {
"request": request,
"clientes": clientes,
"cliente_atual": cliente or "",
"total_faturas": total_faturas,
"valor_restituicao_corrigida": valor_restituicao_corrigida,
"percentual_icms_base": percentual_icms_base,
# Novos dados para o template
"total_clientes": total_clientes,
"valor_medio_com_icms": valor_medio_com_icms,
"situacao_atual_percent": percentual_icms_base, # para gráfico de alerta
"serie_mensal_labels": serie_mensal_labels,
"serie_mensal_valores": serie_mensal_valores,
"top5_labels": top5_labels,
"top5_valores": top5_valores,
})
except Exception as e:
import traceback
print("ERR /:", e, flush=True)
traceback.print_exc()
# Página de erro amigável (sem derrubar servidor)
return HTMLResponse(
f"<pre style='padding:16px;color:#b91c1c;background:#fff1f2'>Falha no dashboard:\n{e}</pre>",
status_code=500
)
2025-07-28 13:29:45 -03:00
@app.get("/upload", response_class=HTMLResponse)
def upload_page(request: Request):
app_env = os.getenv("APP_ENV", "dev") # Captura variável de ambiente
return templates.TemplateResponse("upload.html", {
"request": request,
"app_env": app_env # Passa para o template
})
2025-07-28 13:29:45 -03:00
@app.get("/relatorios", response_class=HTMLResponse)
async def relatorios_page(request: Request, cliente: str | None = Query(None)):
async with AsyncSessionLocal() as session:
# Carregar clientes ativos para o combo
r_cli = await session.execute(text("""
SELECT id, nome_fantasia
FROM faturas.clientes
WHERE ativo = TRUE
ORDER BY nome_fantasia
"""))
clientes = [{"id": str(row.id), "nome": row.nome_fantasia} for row in r_cli]
# Carregar faturas (todas ou filtradas por cliente)
if cliente:
r_fat = await session.execute(text("""
SELECT *
FROM faturas.faturas
WHERE cliente_id = :cid
ORDER BY data_processamento DESC
"""), {"cid": cliente})
else:
r_fat = await session.execute(text("""
SELECT *
FROM faturas.faturas
ORDER BY data_processamento DESC
"""))
faturas = r_fat.mappings().all()
return templates.TemplateResponse("relatorios.html", {
"request": request,
"clientes": clientes,
"cliente_selecionado": cliente or "",
"faturas": faturas
})
2025-07-28 13:29:45 -03:00
@app.post("/upload-files")
async def upload_files(
cliente_id: str = Form(...),
files: list[UploadFile] = File(...)
):
2025-07-28 13:29:45 -03:00
for file in files:
temp_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}_{file.filename}")
with open(temp_path, "wb") as f:
shutil.copyfileobj(file.file, f)
await fila_processamento.put({
"caminho_pdf": temp_path,
"nome_original": file.filename,
"cliente_id": cliente_id
2025-07-28 13:29:45 -03:00
})
return {"message": "Arquivos enviados para fila"}
2025-07-28 13:29:45 -03:00
@app.post("/process-queue")
async def process_queue():
resultados = await processar_em_lote()
return {"message": "Processamento concluído", "resultados": resultados}
@app.get("/get-status")
async def get_status():
files = []
for nome, status in status_arquivos.items():
if isinstance(status, dict):
files.append({
"nome": nome,
"status": status.get("status", "Erro"),
"mensagem": status.get("mensagem", "---"),
"tempo": status.get("tempo", "---"),
"tamanho": f"{status.get('tamanho', 0)} KB",
"data": status.get("data", "")
})
else:
files.append({
"nome": nome,
"status": status,
"mensagem": "---" if status == "Concluído" else status,
"tempo": "---" # ✅ AQUI também
})
2025-07-28 13:29:45 -03:00
is_processing = not fila_processamento.empty()
return JSONResponse(content={"is_processing": is_processing, "files": files})
2025-07-28 13:29:45 -03:00
@app.post("/clear-all")
async def clear_all():
limpar_arquivos_processados()
for f in os.listdir(UPLOAD_DIR):
os.remove(os.path.join(UPLOAD_DIR, f))
return {"message": "Fila e arquivos limpos"}
@app.get("/export-excel")
async def export_excel(
tipo: str = Query("geral", pattern="^(geral|exclusao_icms|aliquota_icms)$"),
cliente: str | None = Query(None)
):
2025-07-28 13:29:45 -03:00
async with AsyncSessionLocal() as session:
# 1) Faturas
stmt = select(Fatura)
if cliente:
stmt = stmt.where(Fatura.cliente_id == cliente)
faturas = (await session.execute(stmt)).scalars().all()
# 2) Mapa de alíquotas cadastradas (UF/ano)
aliq_rows = (await session.execute(select(AliquotaUF))).scalars().all()
aliq_map = {(r.uf.upper(), int(r.exercicio)): float(r.aliq_icms) for r in aliq_rows}
dados = []
if tipo == "aliquota_icms":
for f in faturas:
uf = (f.estado or "").strip().upper()
ano, _ = _parse_referencia(f.referencia or "")
aliq_nf = float(f.icms_aliq or 0.0)
aliq_cad = aliq_map.get((uf, ano))
diff_pp = (aliq_nf - aliq_cad) if aliq_cad is not None else None
confere = (abs(diff_pp) < 1e-6) if diff_pp is not None else None
dados.append({
"Cliente": f.nome,
"UF (fatura)": uf,
"Exercício (ref)": ano,
"Referência": f.referencia,
"Nota Fiscal": f.nota_fiscal,
"ICMS (%) NF": aliq_nf,
# novas colunas padronizadas
"ICMS (%) (UF/Ref)": aliq_cad,
"Dif. ICMS (pp)": diff_pp,
"ICMS confere?": "SIM" if confere else ("N/D" if confere is None else "NÃO"),
"Valor Total": f.valor_total,
"Distribuidora": f.distribuidora,
"Data Processamento": f.data_processamento,
})
filename = "relatorio_aliquota_icms.xlsx"
elif tipo == "exclusao_icms":
for f in faturas:
uf = (f.estado or "").strip().upper()
ano, _ = _parse_referencia(f.referencia or "")
aliq_nf = float(f.icms_aliq or 0.0)
aliq_cad = aliq_map.get((uf, ano))
diff_pp = (aliq_nf - aliq_cad) if aliq_cad is not None else None
confere = (abs(diff_pp) < 1e-6) if diff_pp is not None else None
dados.append({
"Cliente": f.nome,
"UC": f.unidade_consumidora,
"Referência": f.referencia,
"Valor Total": f.valor_total,
"PIS (%)": f.pis_aliq,
"ICMS (%)": f.icms_aliq,
"COFINS (%)": f.cofins_aliq,
"PIS (R$)": f.pis_valor,
"ICMS (R$)": f.icms_valor,
"COFINS (R$)": f.cofins_valor,
"Base PIS (R$)": f.pis_base,
"Base ICMS (R$)": f.icms_base,
"Base COFINS (R$)": f.cofins_base,
# novas colunas
"ICMS (%) (UF/Ref)": aliq_cad,
"Dif. ICMS (pp)": diff_pp,
"ICMS confere?": "SIM" if confere else ("N/D" if confere is None else "NÃO"),
"Consumo (kWh)": f.consumo,
"Tarifa": f.tarifa,
"Nota Fiscal": f.nota_fiscal,
})
filename = "relatorio_exclusao_icms.xlsx"
2025-07-28 13:29:45 -03:00
else: # geral
for f in faturas:
uf = (f.estado or "").strip().upper()
ano, _ = _parse_referencia(f.referencia or "")
aliq_nf = float(f.icms_aliq or 0.0)
aliq_cad = aliq_map.get((uf, ano))
diff_pp = (aliq_nf - aliq_cad) if aliq_cad is not None else None
confere = (abs(diff_pp) < 1e-6) if diff_pp is not None else None
dados.append({
"Cliente": f.nome,
"UC": f.unidade_consumidora,
"Referência": f.referencia,
"Nota Fiscal": f.nota_fiscal,
"Valor Total": f.valor_total,
"ICMS (%)": f.icms_aliq,
"ICMS (R$)": f.icms_valor,
# novas colunas
"ICMS (%) (UF/Ref)": aliq_cad,
"Dif. ICMS (pp)": diff_pp,
"ICMS confere?": "SIM" if confere else ("N/D" if confere is None else "NÃO"),
"Base ICMS (R$)": f.icms_base,
"PIS (%)": f.pis_aliq,
"PIS (R$)": f.pis_valor,
"Base PIS (R$)": f.pis_base,
"COFINS (%)": f.cofins_aliq,
"COFINS (R$)": f.cofins_valor,
"Base COFINS (R$)": f.cofins_base,
"Consumo (kWh)": f.consumo,
"Tarifa": f.tarifa,
"Distribuidora": f.distribuidora,
"Data Processamento": f.data_processamento,
})
filename = "relatorio_geral.xlsx"
2025-07-28 13:29:45 -03:00
# 3) Excel em memória
output = BytesIO()
df = pd.DataFrame(dados)
with pd.ExcelWriter(output, engine="xlsxwriter") as writer:
df.to_excel(writer, index=False, sheet_name="Relatório")
output.seek(0)
return StreamingResponse(
output,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f'attachment; filename="{filename}"'}
)
from app.parametros import router as parametros_router
app.include_router(parametros_router)
2025-08-09 19:51:14 -03:00
app.include_router(clientes.router)
def is_homolog():
return os.getenv("APP_ENV", "dev") == "homolog"
@app.post("/limpar-faturas")
async def limpar_faturas():
app_env = os.getenv("APP_ENV", "dev")
if app_env not in ["homolog", "dev", "local"]:
return JSONResponse(status_code=403, content={"message": "Operação não permitida neste ambiente."})
async with AsyncSessionLocal() as session:
print("🧪 Limpando faturas do banco...")
await session.execute(text("DELETE FROM faturas.faturas"))
await session.commit()
upload_path = os.path.join("app", "uploads")
for nome in os.listdir(upload_path):
caminho = os.path.join(upload_path, nome)
if os.path.isfile(caminho):
os.remove(caminho)
return {"message": "Faturas e arquivos apagados com sucesso."}
@app.get("/erros/download")
async def download_erros():
zip_path = os.path.join("app", "uploads", "erros", "faturas_erro.zip")
if os.path.exists(zip_path):
response = FileResponse(zip_path, filename="faturas_erro.zip", media_type="application/zip")
# ⚠️ Agendar exclusão após resposta
asyncio.create_task(limpar_erros())
return response
else:
raise HTTPException(status_code=404, detail="Arquivo de erro não encontrado.")
@app.get("/erros/log")
async def download_log_erros():
txt_path = os.path.join("app", "uploads", "erros", "erros.txt")
if os.path.exists(txt_path):
response = FileResponse(txt_path, filename="erros.txt", media_type="text/plain")
# ⚠️ Agendar exclusão após resposta
asyncio.create_task(limpar_erros())
return response
else:
raise HTTPException(status_code=404, detail="Log de erro não encontrado.")
async def limpar_erros():
await asyncio.sleep(5) # Aguarda 5 segundos para garantir que o download inicie
pasta = os.path.join("app", "uploads", "erros")
for nome in ["faturas_erro.zip", "erros.txt"]:
caminho = os.path.join(pasta, nome)
if os.path.exists(caminho):
os.remove(caminho)
@app.get("/api/clientes")
async def listar_clientes(db: AsyncSession = Depends(get_session)):
sql = text("""
SELECT id, nome_fantasia, cnpj, ativo
FROM faturas.clientes
WHERE ativo = TRUE
ORDER BY nome_fantasia
""")
res = await db.execute(sql)
rows = res.mappings().all()
return [
{
"id": str(r["id"]),
"nome_fantasia": r["nome_fantasia"],
"cnpj": r["cnpj"],
"ativo": bool(r["ativo"]),
}
for r in rows
]
@app.get("/api/relatorios")
async def api_relatorios(
cliente: str | None = Query(None),
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=5, le=200),
db: AsyncSession = Depends(get_session),
):
offset = (page - 1) * page_size
where = "WHERE cliente_id = :cliente" if cliente else ""
params = {"limit": page_size, "offset": offset}
if cliente:
params["cliente"] = cliente
# ❗ Inclua 'estado' no SELECT
sql = text(f"""
SELECT id, nome, unidade_consumidora, referencia, nota_fiscal,
valor_total, icms_aliq, icms_valor, pis_aliq, pis_valor,
cofins_aliq, cofins_valor, distribuidora, data_processamento,
estado
FROM faturas.faturas
{where}
ORDER BY data_processamento DESC
LIMIT :limit OFFSET :offset
""")
count_sql = text(f"SELECT COUNT(*) AS total FROM faturas.faturas {where}")
rows = (await db.execute(sql, params)).mappings().all()
total = (await db.execute(count_sql, params)).scalar_one()
# 🔹 Carrega mapa de alíquotas UF/ano
aliq_rows = (await db.execute(select(AliquotaUF))).scalars().all()
aliq_map = {(r.uf.upper(), int(r.exercicio)): float(r.aliq_icms) for r in aliq_rows}
items = []
for r in rows:
uf = (r["estado"] or "").strip().upper()
ano, _mes = _parse_referencia(r["referencia"] or "")
aliq_nf = float(r["icms_aliq"] or 0.0)
aliq_cad = aliq_map.get((uf, ano))
diff_pp = (aliq_nf - aliq_cad) if aliq_cad is not None else None
ok = (abs(diff_pp) < 1e-6) if diff_pp is not None else None
items.append({
"id": str(r["id"]),
"nome": r["nome"],
"unidade_consumidora": r["unidade_consumidora"],
"referencia": r["referencia"],
"nota_fiscal": r["nota_fiscal"],
"valor_total": float(r["valor_total"]) if r["valor_total"] is not None else None,
"icms_aliq": aliq_nf,
"icms_valor": r["icms_valor"],
"pis_aliq": r["pis_aliq"],
"pis_valor": r["pis_valor"],
"cofins_aliq": r["cofins_aliq"],
"cofins_valor": r["cofins_valor"],
"distribuidora": r["distribuidora"],
"data_processamento": r["data_processamento"].isoformat() if r["data_processamento"] else None,
# novos
"estado": uf,
"exercicio": ano,
"aliq_cadastral": aliq_cad,
"aliq_diff_pp": round(diff_pp, 4) if diff_pp is not None else None,
"aliq_ok": ok,
})
return {"items": items, "total": total, "page": page, "page_size": page_size}
async def _carregar_aliquota_map(session):
rows = (await session.execute(
text("SELECT uf, exercicio, aliq_icms FROM faturas.aliquotas_uf")
)).mappings().all()
# (UF, ANO) -> float
return {(r["uf"].upper(), int(r["exercicio"])): float(r["aliq_icms"]) for r in rows}