Criação da tela de clientes e relatórios
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-08-11 13:14:54 -03:00
parent bcf9861e97
commit 950eb2a826
7 changed files with 595 additions and 181 deletions

View File

@@ -1,6 +1,6 @@
import asyncio
import uuid
from fastapi import FastAPI, HTTPException, Request, UploadFile, File
from fastapi import FastAPI, HTTPException, Request, UploadFile, File, Depends, Form
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
@@ -25,6 +25,10 @@ from app.models import Fatura, SelicMensal, ParametrosFormula
from datetime import date
from app.utils import avaliar_formula
from app.routes import clientes
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_session
from fastapi import Query
from sqlalchemy import select as sqla_select
app = FastAPI()
@@ -32,7 +36,7 @@ templates = Jinja2Templates(directory="app/templates")
app.state.templates = templates
app.mount("/static", StaticFiles(directory="app/static"), name="static")
UPLOAD_DIR = "uploads/temp"
UPLOAD_DIR = os.path.join("app", "uploads", "temp")
os.makedirs(UPLOAD_DIR, exist_ok=True)
def _parse_referencia(ref: str):
@@ -99,7 +103,27 @@ def _avaliar_formula(texto_formula: str | None, contexto: dict) -> float:
# aceita vírgula como decimal vindo do banco
if isinstance(v, str):
v = v.replace(".", "").replace(",", ".") if re.search(r"[0-9],[0-9]", v) else v
expr = re.sub(rf'\b{re.escape(str(campo))}\b', str(v), expr)
# nome do campo escapado na regex
pat = rf'\b{re.escape(str(campo))}\b'
# normaliza o valor para número; se não der, vira 0
val = v
if val is None or val == "":
num = 0.0
else:
if isinstance(val, str):
# troca vírgula decimal e remove separador de milhar simples
val_norm = val.replace(".", "").replace(",", ".")
else:
val_norm = val
try:
num = float(val_norm)
except Exception:
num = 0.0
# usa lambda para evitar interpretação de backslashes no replacement
expr = re.sub(pat, lambda m: str(num), expr)
try:
return float(eval(expr, {"__builtins__": {}}, {}))
@@ -113,10 +137,14 @@ async def dashboard(request: Request, cliente: str | None = None):
async with AsyncSessionLocal() as session:
print("DBG /: abrindo sessão", flush=True)
r = await session.execute(text(
"SELECT DISTINCT nome FROM faturas.faturas ORDER BY nome"
))
clientes = [c for c, in r.fetchall()]
r = await session.execute(text("""
SELECT id, nome_fantasia
FROM faturas.clientes
WHERE ativo = TRUE
ORDER BY nome_fantasia
"""))
clientes = [{"id": id_, "nome": nome} for id_, nome in r.fetchall()]
print(f"DBG /: clientes={len(clientes)}", flush=True)
# Fórmulas
@@ -135,7 +163,7 @@ async def dashboard(request: Request, cliente: str | None = None):
sql = "SELECT * FROM faturas.faturas"
params = {}
if cliente:
sql += " WHERE nome = :cliente"
sql += " WHERE cliente_id = :cliente"
params["cliente"] = cliente
print("DBG /: SQL faturas ->", sql, params, flush=True)
@@ -254,21 +282,58 @@ def upload_page(request: Request):
})
@app.get("/relatorios", response_class=HTMLResponse)
def relatorios_page(request: Request):
return templates.TemplateResponse("relatorios.html", {"request": request})
async def relatorios_page(request: Request, cliente: str | None = Query(None)):
async with AsyncSessionLocal() as session:
# Carregar clientes ativos para o combo
r_cli = await session.execute(text("""
SELECT id, nome_fantasia
FROM faturas.clientes
WHERE ativo = TRUE
ORDER BY nome_fantasia
"""))
clientes = [{"id": str(row.id), "nome": row.nome_fantasia} for row in r_cli]
# Carregar faturas (todas ou filtradas por cliente)
if cliente:
r_fat = await session.execute(text("""
SELECT *
FROM faturas.faturas
WHERE cliente_id = :cid
ORDER BY data_processamento DESC
"""), {"cid": cliente})
else:
r_fat = await session.execute(text("""
SELECT *
FROM faturas.faturas
ORDER BY data_processamento DESC
"""))
faturas = r_fat.mappings().all()
return templates.TemplateResponse("relatorios.html", {
"request": request,
"clientes": clientes,
"cliente_selecionado": cliente or "",
"faturas": faturas
})
@app.post("/upload-files")
async def upload_files(files: list[UploadFile] = File(...)):
async def upload_files(
cliente_id: str = Form(...),
files: list[UploadFile] = File(...)
):
for file in files:
temp_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}_{file.filename}")
with open(temp_path, "wb") as f:
shutil.copyfileobj(file.file, f)
await fila_processamento.put({
"caminho_pdf": temp_path,
"nome_original": file.filename
"nome_original": file.filename,
"cliente_id": cliente_id
})
return {"message": "Arquivos enviados para fila"}
@app.post("/process-queue")
async def process_queue():
resultados = await processar_em_lote()
@@ -307,117 +372,109 @@ async def clear_all():
return {"message": "Fila e arquivos limpos"}
@app.get("/export-excel")
async def export_excel():
async def export_excel(
tipo: str = Query("geral", regex="^(geral|exclusao_icms|aliquota_icms)$"),
cliente: str | None = Query(None)
):
import pandas as pd
# 1) Carregar faturas (com filtro por cliente, se houver)
async with AsyncSessionLocal() as session:
# 1. Coletar faturas e tabela SELIC
faturas_result = await session.execute(select(Fatura))
stmt = select(Fatura)
if cliente:
# filtra por cliente_id (UUID em string)
stmt = stmt.where(Fatura.cliente_id == cliente)
faturas_result = await session.execute(stmt)
faturas = faturas_result.scalars().all()
selic_result = await session.execute(select(SelicMensal))
selic_tabela = selic_result.scalars().all()
# 2. Criar mapa {(ano, mes): percentual}
selic_map = {(s.ano, s.mes): float(s.percentual) for s in selic_tabela}
hoje = date.today()
def calcular_fator_selic(ano_inicio, mes_inicio):
fator = 1.0
ano, mes = ano_inicio, mes_inicio
while (ano < hoje.year) or (ano == hoje.year and mes <= hoje.month):
percentual = selic_map.get((ano, mes))
if percentual:
fator *= (1 + percentual / 100)
mes += 1
if mes > 12:
mes = 1
ano += 1
return fator
# 3. Buscar fórmulas exatas por nome
formula_pis_result = await session.execute(
select(ParametrosFormula.formula).where(
ParametrosFormula.nome == "Cálculo PIS sobre ICMS",
ParametrosFormula.ativo == True
).limit(1)
)
formula_cofins_result = await session.execute(
select(ParametrosFormula.formula).where(
ParametrosFormula.nome == "Cálculo COFINS sobre ICMS",
ParametrosFormula.ativo == True
).limit(1)
)
formula_pis = formula_pis_result.scalar_one_or_none()
formula_cofins = formula_cofins_result.scalar_one_or_none()
# 4. Montar dados
mes_map = {
'JAN': 1, 'FEV': 2, 'MAR': 3, 'ABR': 4, 'MAI': 5, 'JUN': 6,
'JUL': 7, 'AGO': 8, 'SET': 9, 'OUT': 10, 'NOV': 11, 'DEZ': 12
}
# 2) Montar dados conforme 'tipo'
dados = []
for f in faturas:
try:
if "/" in f.referencia:
mes_str, ano_str = f.referencia.split("/")
mes = mes_map.get(mes_str.strip().upper())
ano = int(ano_str)
if not mes or not ano:
raise ValueError("Mês ou ano inválido")
else:
ano = int(f.referencia[:4])
mes = int(f.referencia[4:])
fator = calcular_fator_selic(ano, mes)
periodo = f"{mes:02d}/{ano} à {hoje.month:02d}/{hoje.year}"
contexto = f.__dict__
valor_pis_icms = avaliar_formula(formula_pis, contexto) if formula_pis else None
valor_cofins_icms = avaliar_formula(formula_cofins, contexto) if formula_cofins else None
if tipo == "aliquota_icms":
# Campos: (Cliente, UC, Referência, Valor Total, ICMS (%), ICMS (R$),
# Base ICMS (R$), Consumo (kWh), Tarifa, Nota Fiscal)
for f in faturas:
dados.append({
"Nome": f.nome,
"Cliente": f.nome,
"UC": f.unidade_consumidora,
"Referência": f.referencia,
"Valor Total": f.valor_total,
"ICMS (%)": f.icms_aliq,
"ICMS (R$)": f.icms_valor,
"Base ICMS (R$)": f.icms_base,
"Consumo (kWh)": f.consumo,
"Tarifa": f.tarifa,
"Nota Fiscal": f.nota_fiscal,
})
filename = "relatorio_aliquota_icms.xlsx"
elif tipo == "exclusao_icms":
# Campos: (Cliente, UC, Referência, Valor Total, PIS (%), ICMS (%), COFINS (%),
# PIS (R$), ICMS (R$), COFINS (R$), Base PIS (R$), Base ICMS (R$),
# Base COFINS (R$), Consumo (kWh), Tarifa, Nota Fiscal)
for f in faturas:
dados.append({
"Cliente": f.nome,
"UC": f.unidade_consumidora,
"Referência": f.referencia,
"Valor Total": f.valor_total,
"PIS (%)": f.pis_aliq,
"ICMS (%)": f.icms_aliq,
"COFINS (%)": f.cofins_aliq,
"PIS (R$)": f.pis_valor,
"ICMS (R$)": f.icms_valor,
"COFINS (R$)": f.cofins_valor,
"Base PIS (R$)": f.pis_base,
"Base ICMS (R$)": f.icms_base,
"Base COFINS (R$)": f.cofins_base,
"Consumo (kWh)": f.consumo,
"Tarifa": f.tarifa,
"Nota Fiscal": f.nota_fiscal,
})
filename = "relatorio_exclusao_icms.xlsx"
else: # "geral" (mantém seu relatório atual — sem fórmulas SELIC)
# Se quiser manter exatamente o que já tinha com SELIC e fórmulas,
# você pode copiar sua lógica anterior aqui. Abaixo deixo um "geral"
# simplificado com as colunas principais.
for f in faturas:
dados.append({
"Cliente": f.nome,
"UC": f.unidade_consumidora,
"Referência": f.referencia,
"Nota Fiscal": f.nota_fiscal,
"Valor Total": f.valor_total,
"ICMS (%)": f.icms_aliq,
"ICMS (R$)": f.icms_valor,
"Base ICMS": f.icms_base,
"Base ICMS (R$)": f.icms_base,
"PIS (%)": f.pis_aliq,
"PIS (R$)": f.pis_valor,
"Base PIS": f.pis_base,
"Base PIS (R$)": f.pis_base,
"COFINS (%)": f.cofins_aliq,
"COFINS (R$)": f.cofins_valor,
"Base COFINS": f.cofins_base,
"Base COFINS (R$)": f.cofins_base,
"Consumo (kWh)": f.consumo,
"Tarifa": f.tarifa,
"Cidade": f.cidade,
"Estado": f.estado,
"Distribuidora": f.distribuidora,
"Data Processamento": f.data_processamento,
"Fator SELIC acumulado": fator,
"Período SELIC usado": periodo,
"PIS sobre ICMS": valor_pis_icms,
"Valor Corrigido PIS (ICMS)": valor_pis_icms * fator if valor_pis_icms else None,
"COFINS sobre ICMS": valor_cofins_icms,
"Valor Corrigido COFINS (ICMS)": valor_cofins_icms * fator if valor_cofins_icms else None,
})
except Exception as e:
print(f"Erro ao processar fatura {f.nota_fiscal}: {e}")
df = pd.DataFrame(dados)
filename = "relatorio_geral.xlsx"
# 3) Gerar excel em memória
from io import BytesIO
output = BytesIO()
df = pd.DataFrame(dados)
with pd.ExcelWriter(output, engine="xlsxwriter") as writer:
df.to_excel(writer, index=False, sheet_name="Faturas Corrigidas")
df.to_excel(writer, index=False, sheet_name="Relatório")
output.seek(0)
return StreamingResponse(output, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={
"Content-Disposition": "attachment; filename=faturas_corrigidas.xlsx"
})
# 4) Responder
return StreamingResponse(
output,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f'attachment; filename="{filename}"'}
)
from app.parametros import router as parametros_router
app.include_router(parametros_router)
@@ -474,3 +531,70 @@ async def limpar_erros():
caminho = os.path.join(pasta, nome)
if os.path.exists(caminho):
os.remove(caminho)
@app.get("/api/clientes")
async def listar_clientes(db: AsyncSession = Depends(get_session)):
sql = text("""
SELECT id, nome_fantasia, cnpj, ativo
FROM faturas.clientes
WHERE ativo = TRUE
ORDER BY nome_fantasia
""")
res = await db.execute(sql)
rows = res.mappings().all()
return [
{
"id": str(r["id"]),
"nome_fantasia": r["nome_fantasia"],
"cnpj": r["cnpj"],
"ativo": bool(r["ativo"]),
}
for r in rows
]
@app.get("/api/relatorios")
async def api_relatorios(
cliente: str | None = Query(None),
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=5, le=200),
db: AsyncSession = Depends(get_session),
):
offset = (page - 1) * page_size
where = "WHERE cliente_id = :cliente" if cliente else ""
params = {"limit": page_size, "offset": offset}
if cliente:
params["cliente"] = cliente
sql = text(f"""
SELECT id, nome, unidade_consumidora, referencia, nota_fiscal,
valor_total, icms_aliq, icms_valor, pis_aliq, pis_valor,
cofins_aliq, cofins_valor, distribuidora, data_processamento
FROM faturas.faturas
{where}
ORDER BY data_processamento DESC
LIMIT :limit OFFSET :offset
""")
count_sql = text(f"SELECT COUNT(*) AS total FROM faturas.faturas {where}")
rows = (await db.execute(sql, params)).mappings().all()
total = (await db.execute(count_sql, params)).scalar_one()
items = [{
"id": str(r["id"]),
"nome": r["nome"],
"unidade_consumidora": r["unidade_consumidora"],
"referencia": r["referencia"],
"nota_fiscal": r["nota_fiscal"],
"valor_total": float(r["valor_total"]) if r["valor_total"] is not None else None,
"icms_aliq": r["icms_aliq"],
"icms_valor": r["icms_valor"],
"pis_aliq": r["pis_aliq"],
"pis_valor": r["pis_valor"],
"cofins_aliq": r["cofins_aliq"],
"cofins_valor": r["cofins_valor"],
"distribuidora": r["distribuidora"],
"data_processamento": r["data_processamento"].isoformat() if r["data_processamento"] else None,
} for r in rows]
return {"items": items, "total": total, "page": page, "page_size": page_size}