Files
app_faturas/app/main.py
ewerton.almeida bcf9861e97
All checks were successful
continuous-integration/drone/push Build is passing
Criação da tela de clientes.
2025-08-09 19:51:14 -03:00

477 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import uuid
from fastapi import FastAPI, HTTPException, Request, UploadFile, File
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
import os, shutil
from sqlalchemy import text
from datetime import date
import re
from fastapi.responses import StreamingResponse
from io import BytesIO
from app.models import ParametrosFormula
from sqlalchemy.future import select
from app.database import AsyncSessionLocal
from app.models import Fatura
from app.processor import (
fila_processamento,
processar_em_lote,
status_arquivos,
limpar_arquivos_processados
)
from fastapi.responses import FileResponse
from app.models import Fatura, SelicMensal, ParametrosFormula
from datetime import date
from app.utils import avaliar_formula
from app.routes import clientes
app = FastAPI()
templates = Jinja2Templates(directory="app/templates")
app.state.templates = templates
app.mount("/static", StaticFiles(directory="app/static"), name="static")
UPLOAD_DIR = "uploads/temp"
os.makedirs(UPLOAD_DIR, exist_ok=True)
def _parse_referencia(ref: str):
"""Aceita 'JAN/2024', 'JAN/24', '01/2024', '01/24', '202401'. Retorna (ano, mes)."""
meses = {'JAN':1,'FEV':2,'MAR':3,'ABR':4,'MAI':5,'JUN':6,'JUL':7,'AGO':8,'SET':9,'OUT':10,'NOV':11,'DEZ':12}
ref = (ref or "").strip().upper()
if "/" in ref:
a, b = [p.strip() for p in ref.split("/", 1)]
# mês pode vir 'JAN' ou '01'
mes = meses.get(a, None)
if mes is None:
mes = int(re.sub(r"\D", "", a) or 1)
ano = int(re.sub(r"\D", "", b) or 0)
# ano 2 dígitos -> 2000+
if ano < 100:
ano += 2000
else:
# '202401' ou '2024-01'
num = re.sub(r"\D", "", ref)
if len(num) >= 6:
ano, mes = int(num[:4]), int(num[4:6])
elif len(num) == 4: # '2024'
ano, mes = int(num), 1
else:
ano, mes = date.today().year, 1
return ano, mes
async def _carregar_selic_map(session):
res = await session.execute(text("SELECT ano, mes, percentual FROM faturas.selic_mensal"))
rows = res.mappings().all()
return {(int(r["ano"]), int(r["mes"])): float(r["percentual"]) for r in rows}
def _fator_selic_from_map(selic_map: dict, ano_inicio: int, mes_inicio: int, hoje: date) -> float:
try:
ano, mes = int(ano_inicio), int(mes_inicio)
except Exception:
return 1.0
if ano > hoje.year or (ano == hoje.year and mes > hoje.month):
return 1.0
fator = 1.0
while (ano < hoje.year) or (ano == hoje.year and mes <= hoje.month):
perc = selic_map.get((ano, mes))
if perc is not None:
fator *= (1 + (perc / 100.0))
mes += 1
if mes > 12:
mes = 1
ano += 1
return fator
def _avaliar_formula(texto_formula: str | None, contexto: dict) -> float:
if not texto_formula:
return 0.0
expr = str(texto_formula)
# Substitui nomes de campos por valores numéricos (None -> 0)
for campo, valor in contexto.items():
v = valor
if v is None or v == "":
v = 0
# aceita vírgula como decimal vindo do banco
if isinstance(v, str):
v = v.replace(".", "").replace(",", ".") if re.search(r"[0-9],[0-9]", v) else v
expr = re.sub(rf'\b{re.escape(str(campo))}\b', str(v), expr)
try:
return float(eval(expr, {"__builtins__": {}}, {}))
except Exception:
return 0.0
@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, cliente: str | None = None):
print("DBG /: inicio", flush=True)
try:
async with AsyncSessionLocal() as session:
print("DBG /: abrindo sessão", flush=True)
r = await session.execute(text(
"SELECT DISTINCT nome FROM faturas.faturas ORDER BY nome"
))
clientes = [c for c, in r.fetchall()]
print(f"DBG /: clientes={len(clientes)}", flush=True)
# Fórmulas
fp = await session.execute(text("""
SELECT formula FROM faturas.parametros_formula
WHERE nome = 'Cálculo PIS sobre ICMS' AND ativo = TRUE LIMIT 1
"""))
formula_pis = fp.scalar_one_or_none()
fc = await session.execute(text("""
SELECT formula FROM faturas.parametros_formula
WHERE nome = 'Cálculo COFINS sobre ICMS' AND ativo = TRUE LIMIT 1
"""))
formula_cofins = fc.scalar_one_or_none()
print(f"DBG /: tem_formulas pis={bool(formula_pis)} cofins={bool(formula_cofins)}", flush=True)
sql = "SELECT * FROM faturas.faturas"
params = {}
if cliente:
sql += " WHERE nome = :cliente"
params["cliente"] = cliente
print("DBG /: SQL faturas ->", sql, params, flush=True)
ftrs = (await session.execute(text(sql), params)).mappings().all()
print(f"DBG /: total_faturas={len(ftrs)}", flush=True)
# ===== KPIs e Séries para o dashboard =====
from collections import defaultdict
total_faturas = len(ftrs)
qtd_icms_na_base = 0
soma_corrigida = 0.0
hoje = date.today()
selic_map = await _carregar_selic_map(session)
# Séries e somatórios comerciais
serie_mensal = defaultdict(float) # {(ano, mes): valor_corrigido}
sum_por_dist = defaultdict(float) # {"distribuidora": valor_corrigido}
somatorio_v_total = 0.0
contagem_com_icms = 0
for f in ftrs:
ctx = dict(f)
# PIS/COFINS sobre ICMS
v_pis = _avaliar_formula(formula_pis, ctx)
v_cof = _avaliar_formula(formula_cofins, ctx)
v_total = max(0.0, float(v_pis or 0) + float(v_cof or 0))
# % de faturas com ICMS na base
if (v_pis or 0) > 0:
qtd_icms_na_base += 1
contagem_com_icms += 1
# referência -> (ano,mes)
try:
ano, mes = _parse_referencia(f.get("referencia"))
except Exception:
ano, mes = hoje.year, hoje.month
# SELIC
fator = _fator_selic_from_map(selic_map, ano, mes, hoje)
valor_corrigido = v_total * fator
soma_corrigida += valor_corrigido
somatorio_v_total += v_total
# séries
serie_mensal[(ano, mes)] += valor_corrigido
dist = (f.get("distribuidora") or "").strip() or "Não informado"
sum_por_dist[dist] += valor_corrigido
percentual_icms_base = (qtd_icms_na_base / total_faturas * 100.0) if total_faturas else 0.0
valor_restituicao_corrigida = soma_corrigida
valor_medio_com_icms = (somatorio_v_total / contagem_com_icms) if contagem_com_icms else 0.0
# total de clientes (distinct já carregado)
total_clientes = len(clientes)
# Série mensal últimos 12 meses
ultimos = []
a, m = hoje.year, hoje.month
for _ in range(12):
ultimos.append((a, m))
m -= 1
if m == 0:
m = 12; a -= 1
ultimos.reverse()
serie_mensal_labels = [f"{mes:02d}/{ano}" for (ano, mes) in ultimos]
serie_mensal_valores = [round(serie_mensal.get((ano, mes), 0.0), 2) for (ano, mes) in ultimos]
# Top 5 distribuidoras
top5 = sorted(sum_por_dist.items(), key=lambda kv: kv[1], reverse=True)[:5]
top5_labels = [k for k, _ in top5]
top5_valores = [round(v, 2) for _, v in top5]
print("DBG /: calculos OK", flush=True)
print("DBG /: render template", flush=True)
return templates.TemplateResponse("dashboard.html", {
"request": request,
"clientes": clientes,
"cliente_atual": cliente or "",
"total_faturas": total_faturas,
"valor_restituicao_corrigida": valor_restituicao_corrigida,
"percentual_icms_base": percentual_icms_base,
# Novos dados para o template
"total_clientes": total_clientes,
"valor_medio_com_icms": valor_medio_com_icms,
"situacao_atual_percent": percentual_icms_base, # para gráfico de alerta
"serie_mensal_labels": serie_mensal_labels,
"serie_mensal_valores": serie_mensal_valores,
"top5_labels": top5_labels,
"top5_valores": top5_valores,
})
except Exception as e:
import traceback
print("ERR /:", e, flush=True)
traceback.print_exc()
# Página de erro amigável (sem derrubar servidor)
return HTMLResponse(
f"<pre style='padding:16px;color:#b91c1c;background:#fff1f2'>Falha no dashboard:\n{e}</pre>",
status_code=500
)
@app.get("/upload", response_class=HTMLResponse)
def upload_page(request: Request):
app_env = os.getenv("APP_ENV", "dev") # Captura variável de ambiente
return templates.TemplateResponse("upload.html", {
"request": request,
"app_env": app_env # Passa para o template
})
@app.get("/relatorios", response_class=HTMLResponse)
def relatorios_page(request: Request):
return templates.TemplateResponse("relatorios.html", {"request": request})
@app.post("/upload-files")
async def upload_files(files: list[UploadFile] = File(...)):
for file in files:
temp_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}_{file.filename}")
with open(temp_path, "wb") as f:
shutil.copyfileobj(file.file, f)
await fila_processamento.put({
"caminho_pdf": temp_path,
"nome_original": file.filename
})
return {"message": "Arquivos enviados para fila"}
@app.post("/process-queue")
async def process_queue():
resultados = await processar_em_lote()
return {"message": "Processamento concluído", "resultados": resultados}
@app.get("/get-status")
async def get_status():
files = []
for nome, status in status_arquivos.items():
if isinstance(status, dict):
files.append({
"nome": nome,
"status": status.get("status", "Erro"),
"mensagem": status.get("mensagem", "---"),
"tempo": status.get("tempo", "---"),
"tamanho": f"{status.get('tamanho', 0)} KB",
"data": status.get("data", "")
})
else:
files.append({
"nome": nome,
"status": status,
"mensagem": "---" if status == "Concluído" else status,
"tempo": "---" # ✅ AQUI também
})
is_processing = not fila_processamento.empty()
return JSONResponse(content={"is_processing": is_processing, "files": files})
@app.post("/clear-all")
async def clear_all():
limpar_arquivos_processados()
for f in os.listdir(UPLOAD_DIR):
os.remove(os.path.join(UPLOAD_DIR, f))
return {"message": "Fila e arquivos limpos"}
@app.get("/export-excel")
async def export_excel():
import pandas as pd
async with AsyncSessionLocal() as session:
# 1. Coletar faturas e tabela SELIC
faturas_result = await session.execute(select(Fatura))
faturas = faturas_result.scalars().all()
selic_result = await session.execute(select(SelicMensal))
selic_tabela = selic_result.scalars().all()
# 2. Criar mapa {(ano, mes): percentual}
selic_map = {(s.ano, s.mes): float(s.percentual) for s in selic_tabela}
hoje = date.today()
def calcular_fator_selic(ano_inicio, mes_inicio):
fator = 1.0
ano, mes = ano_inicio, mes_inicio
while (ano < hoje.year) or (ano == hoje.year and mes <= hoje.month):
percentual = selic_map.get((ano, mes))
if percentual:
fator *= (1 + percentual / 100)
mes += 1
if mes > 12:
mes = 1
ano += 1
return fator
# 3. Buscar fórmulas exatas por nome
formula_pis_result = await session.execute(
select(ParametrosFormula.formula).where(
ParametrosFormula.nome == "Cálculo PIS sobre ICMS",
ParametrosFormula.ativo == True
).limit(1)
)
formula_cofins_result = await session.execute(
select(ParametrosFormula.formula).where(
ParametrosFormula.nome == "Cálculo COFINS sobre ICMS",
ParametrosFormula.ativo == True
).limit(1)
)
formula_pis = formula_pis_result.scalar_one_or_none()
formula_cofins = formula_cofins_result.scalar_one_or_none()
# 4. Montar dados
mes_map = {
'JAN': 1, 'FEV': 2, 'MAR': 3, 'ABR': 4, 'MAI': 5, 'JUN': 6,
'JUL': 7, 'AGO': 8, 'SET': 9, 'OUT': 10, 'NOV': 11, 'DEZ': 12
}
dados = []
for f in faturas:
try:
if "/" in f.referencia:
mes_str, ano_str = f.referencia.split("/")
mes = mes_map.get(mes_str.strip().upper())
ano = int(ano_str)
if not mes or not ano:
raise ValueError("Mês ou ano inválido")
else:
ano = int(f.referencia[:4])
mes = int(f.referencia[4:])
fator = calcular_fator_selic(ano, mes)
periodo = f"{mes:02d}/{ano} à {hoje.month:02d}/{hoje.year}"
contexto = f.__dict__
valor_pis_icms = avaliar_formula(formula_pis, contexto) if formula_pis else None
valor_cofins_icms = avaliar_formula(formula_cofins, contexto) if formula_cofins else None
dados.append({
"Nome": f.nome,
"UC": f.unidade_consumidora,
"Referência": f.referencia,
"Nota Fiscal": f.nota_fiscal,
"Valor Total": f.valor_total,
"ICMS (%)": f.icms_aliq,
"ICMS (R$)": f.icms_valor,
"Base ICMS": f.icms_base,
"PIS (%)": f.pis_aliq,
"PIS (R$)": f.pis_valor,
"Base PIS": f.pis_base,
"COFINS (%)": f.cofins_aliq,
"COFINS (R$)": f.cofins_valor,
"Base COFINS": f.cofins_base,
"Consumo (kWh)": f.consumo,
"Tarifa": f.tarifa,
"Cidade": f.cidade,
"Estado": f.estado,
"Distribuidora": f.distribuidora,
"Data Processamento": f.data_processamento,
"Fator SELIC acumulado": fator,
"Período SELIC usado": periodo,
"PIS sobre ICMS": valor_pis_icms,
"Valor Corrigido PIS (ICMS)": valor_pis_icms * fator if valor_pis_icms else None,
"COFINS sobre ICMS": valor_cofins_icms,
"Valor Corrigido COFINS (ICMS)": valor_cofins_icms * fator if valor_cofins_icms else None,
})
except Exception as e:
print(f"Erro ao processar fatura {f.nota_fiscal}: {e}")
df = pd.DataFrame(dados)
output = BytesIO()
with pd.ExcelWriter(output, engine="xlsxwriter") as writer:
df.to_excel(writer, index=False, sheet_name="Faturas Corrigidas")
output.seek(0)
return StreamingResponse(output, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={
"Content-Disposition": "attachment; filename=faturas_corrigidas.xlsx"
})
from app.parametros import router as parametros_router
app.include_router(parametros_router)
app.include_router(clientes.router)
def is_homolog():
return os.getenv("APP_ENV", "dev") == "homolog"
@app.post("/limpar-faturas")
async def limpar_faturas():
app_env = os.getenv("APP_ENV", "dev")
if app_env not in ["homolog", "dev", "local"]:
return JSONResponse(status_code=403, content={"message": "Operação não permitida neste ambiente."})
async with AsyncSessionLocal() as session:
print("🧪 Limpando faturas do banco...")
await session.execute(text("DELETE FROM faturas.faturas"))
await session.commit()
upload_path = os.path.join("app", "uploads")
for nome in os.listdir(upload_path):
caminho = os.path.join(upload_path, nome)
if os.path.isfile(caminho):
os.remove(caminho)
return {"message": "Faturas e arquivos apagados com sucesso."}
@app.get("/erros/download")
async def download_erros():
zip_path = os.path.join("app", "uploads", "erros", "faturas_erro.zip")
if os.path.exists(zip_path):
response = FileResponse(zip_path, filename="faturas_erro.zip", media_type="application/zip")
# ⚠️ Agendar exclusão após resposta
asyncio.create_task(limpar_erros())
return response
else:
raise HTTPException(status_code=404, detail="Arquivo de erro não encontrado.")
@app.get("/erros/log")
async def download_log_erros():
txt_path = os.path.join("app", "uploads", "erros", "erros.txt")
if os.path.exists(txt_path):
response = FileResponse(txt_path, filename="erros.txt", media_type="text/plain")
# ⚠️ Agendar exclusão após resposta
asyncio.create_task(limpar_erros())
return response
else:
raise HTTPException(status_code=404, detail="Log de erro não encontrado.")
async def limpar_erros():
await asyncio.sleep(5) # Aguarda 5 segundos para garantir que o download inicie
pasta = os.path.join("app", "uploads", "erros")
for nome in ["faturas_erro.zip", "erros.txt"]:
caminho = os.path.join(pasta, nome)
if os.path.exists(caminho):
os.remove(caminho)