Files
app_faturas/app/parametros.py
2025-08-11 18:45:57 -03:00

347 lines
12 KiB
Python

# parametros.py
from fastapi import APIRouter, Request, Depends, Form, UploadFile, File
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_session
from app.models import AliquotaUF, ParametrosFormula, SelicMensal
from typing import List
from pydantic import BaseModel
import datetime
from fastapi.templating import Jinja2Templates
from sqlalchemy.future import select
from app.database import AsyncSessionLocal
from fastapi.responses import RedirectResponse, JSONResponse
from app.models import Fatura
from fastapi import Body
from app.database import engine
import httpx
from app.models import SelicMensal
from sqlalchemy.dialects.postgresql import insert as pg_insert
import io
import csv
from fastapi.responses import StreamingResponse
import pandas as pd
from io import BytesIO
from sqlalchemy import select
from decimal import Decimal
router = APIRouter()
# === Schemas ===
class AliquotaUFSchema(BaseModel):
uf: str
exercicio: int
aliq_icms: float
class Config:
from_attributes = True
class ParametrosFormulaSchema(BaseModel):
nome: str
formula: str
ativo: bool = True
class Config:
from_attributes = True
class SelicMensalSchema(BaseModel):
mes: str # 'YYYY-MM'
fator: float
class Config:
from_attributes = True
# === Rotas ===
templates = Jinja2Templates(directory="app/templates")
@router.get("/parametros")
async def parametros_page(request: Request):
async with AsyncSessionLocal() as session:
# Consulta das fórmulas
result = await session.execute(select(ParametrosFormula))
parametros = result.scalars().all()
# Consulta da tabela selic_mensal
selic_result = await session.execute(
select(SelicMensal).order_by(SelicMensal.ano.desc(), SelicMensal.mes.desc())
)
selic_dados = selic_result.scalars().all()
# Pega última data
ultima_data_selic = "-"
if selic_dados:
ultima = selic_dados[0]
ultima_data_selic = f"{ultima.mes:02d}/{ultima.ano}"
# Campos numéricos da fatura
campos = [
col.name for col in Fatura.__table__.columns
if col.type.__class__.__name__ in ['Integer', 'Float', 'Numeric']
]
return templates.TemplateResponse("parametros.html", {
"request": request,
"lista_parametros": parametros,
"parametros": {},
"campos_fatura": campos,
"selic_dados": selic_dados,
"ultima_data_selic": ultima_data_selic
})
@router.post("/parametros/editar/{param_id}")
async def editar_parametro(param_id: int, request: Request):
data = await request.json()
async with AsyncSessionLocal() as session:
param = await session.get(ParametrosFormula, param_id)
if param:
param.nome = data.get("nome", param.nome)
param.formula = data.get("formula", param.formula)
param.ativo = data.get("ativo", param.ativo)
await session.commit()
return {"success": True}
return {"success": False, "error": "Não encontrado"}
@router.post("/parametros/ativar/{param_id}")
async def ativar_parametro(param_id: int, request: Request):
data = await request.json()
ativo = bool(data.get("ativo", True))
async with AsyncSessionLocal() as session:
param = await session.get(ParametrosFormula, param_id)
if not param:
return JSONResponse(status_code=404, content={"error": "Parâmetro não encontrado"})
param.ativo = ativo
await session.commit()
return {"success": True}
@router.get("/parametros/delete/{param_id}")
async def deletar_parametro(param_id: int):
async with AsyncSessionLocal() as session:
param = await session.get(ParametrosFormula, param_id)
if not param:
return RedirectResponse("/parametros?erro=1&msg=Parâmetro não encontrado", status_code=303)
await session.delete(param)
await session.commit()
return RedirectResponse("/parametros?ok=1&msg=Parâmetro removido", status_code=303)
@router.post("/parametros/testar")
async def testar_formula(db: AsyncSession = Depends(get_session), data: dict = Body(...)):
formula = data.get("formula")
exemplo = await db.execute(select(Fatura).limit(1))
fatura = exemplo.scalar_one_or_none()
if not fatura:
return {"success": False, "error": "Sem dados para teste."}
try:
contexto = {col.name: getattr(fatura, col.name) for col in Fatura.__table__.columns}
resultado = eval(formula, {}, contexto)
return {"success": True, "resultado": resultado}
except Exception as e:
return {"success": False, "error": str(e)}
@router.get("/parametros/aliquotas")
async def listar_aliquotas(uf: str | None = None, db: AsyncSession = Depends(get_session)):
stmt = select(AliquotaUF).order_by(AliquotaUF.uf, AliquotaUF.exercicio.desc())
if uf:
stmt = stmt.where(AliquotaUF.uf == uf)
rows = (await db.execute(stmt)).scalars().all()
return [
{"uf": r.uf, "exercicio": int(r.exercicio), "aliquota": float(r.aliq_icms)}
for r in rows
]
@router.post("/parametros/aliquotas")
async def adicionar_aliquota(aliq: AliquotaUFSchema, db: AsyncSession = Depends(get_session)):
result = await db.execute(
select(AliquotaUF).filter_by(uf=aliq.uf, exercicio=aliq.exercicio)
)
existente = result.scalar_one_or_none()
if existente:
existente.aliq_icms = aliq.aliq_icms # atualizado
else:
novo = AliquotaUF(**aliq.dict())
db.add(novo)
await db.commit()
return RedirectResponse(url="/parametros?ok=true&msg=Alíquota salva com sucesso", status_code=303)
@router.get("/parametros/formulas", response_model=List[ParametrosFormulaSchema])
async def listar_formulas(db: AsyncSession = Depends(get_session)):
result = await db.execute(select(ParametrosFormula).order_by(ParametrosFormula.nome))
return result.scalars().all()
@router.post("/parametros/formulas")
async def salvar_formula(form: ParametrosFormulaSchema, db: AsyncSession = Depends(get_session)):
result = await db.execute(
select(ParametrosFormula).filter_by(nome=form.nome)
)
existente = result.scalar_one_or_none()
if existente:
existente.formula = form.formula
existente.ativo = form.ativo
else:
novo = ParametrosFormula(nome=form.nome, formula=form.formula, ativo=form.ativo)
db.add(novo)
await db.commit()
return RedirectResponse(url="/parametros?ok=true&msg=Parâmetro salvo com sucesso", status_code=303)
@router.get("/parametros/selic", response_model=List[SelicMensalSchema])
async def listar_selic(db: AsyncSession = Depends(get_session)):
result = await db.execute(select(SelicMensal).order_by(SelicMensal.mes.desc()))
return result.scalars().all()
@router.post("/parametros/selic/importar")
async def importar_selic(request: Request, data_maxima: str = Form(None)):
try:
hoje = datetime.date.today()
inicio = datetime.date(hoje.year - 5, 1, 1)
fim = datetime.datetime.strptime(data_maxima, "%Y-%m-%d").date() if data_maxima else hoje
url = (
f"https://api.bcb.gov.br/dados/serie/bcdata.sgs.4390/dados?"
f"formato=json&dataInicial={inicio.strftime('%d/%m/%Y')}&dataFinal={fim.strftime('%d/%m/%Y')}"
)
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
dados = response.json()
registros = []
for item in dados:
data = datetime.datetime.strptime(item['data'], "%d/%m/%Y")
ano, mes = data.year, data.month
percentual = float(item['valor'].replace(',', '.'))
registros.append({"ano": ano, "mes": mes, "percentual": percentual})
async with engine.begin() as conn:
stmt = pg_insert(SelicMensal.__table__).values(registros)
upsert_stmt = stmt.on_conflict_do_update(
index_elements=['ano', 'mes'],
set_={'percentual': stmt.excluded.percentual}
)
await conn.execute(upsert_stmt)
return RedirectResponse("/parametros?aba=selic", status_code=303)
except Exception as e:
return RedirectResponse(f"/parametros?erro=1&msg={str(e)}", status_code=303)
@router.get("/parametros/aliquotas/template")
def baixar_template_excel():
df = pd.DataFrame(columns=["UF", "Exercício", "Alíquota"])
df.loc[0] = ["SP", "2025", "18"] # exemplo opcional
df.loc[1] = ["MG", "2025", "12"] # exemplo opcional
output = BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
df.to_excel(writer, sheet_name='Template', index=False)
# Adiciona instrução como observação na célula A5 (linha 5)
sheet = writer.sheets['Template']
sheet.cell(row=5, column=1).value = (
"⚠️ Após preencher, salve como CSV (.csv separado por vírgulas) para importar no sistema."
)
output.seek(0)
return StreamingResponse(
output,
media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
headers={"Content-Disposition": "attachment; filename=template_aliquotas.xlsx"}
)
@router.post("/parametros/aliquotas/salvar")
async def salvar_aliquota(payload: dict, db: AsyncSession = Depends(get_session)):
uf = (payload.get("uf") or "").strip().upper()
exercicio = int(payload.get("exercicio") or 0)
aliquota = Decimal(str(payload.get("aliquota") or "0"))
orig_uf = (payload.get("original_uf") or "").strip().upper() or uf
orig_ex = int(payload.get("original_exercicio") or 0) or exercicio
if not uf or not exercicio or aliquota <= 0:
return JSONResponse(status_code=400, content={"error": "UF, exercício e alíquota são obrigatórios."})
# busca pelo registro original (antes da edição)
stmt = select(AliquotaUF).where(
AliquotaUF.uf == orig_uf,
AliquotaUF.exercicio == orig_ex
)
existente = (await db.execute(stmt)).scalar_one_or_none()
if existente:
# atualiza (inclusive a chave, se mudou)
existente.uf = uf
existente.exercicio = exercicio
existente.aliq_icms = aliquota
else:
# não existia o original -> upsert padrão
db.add(AliquotaUF(uf=uf, exercicio=exercicio, aliq_icms=aliquota))
await db.commit()
return {"success": True}
@router.post("/parametros/aliquotas/importar")
async def importar_aliquotas_csv(arquivo: UploadFile = File(...), db: AsyncSession = Depends(get_session)):
content = await arquivo.read()
text = content.decode("utf-8", errors="ignore")
# tenta ; depois ,
sniffer = csv.Sniffer()
dialect = sniffer.sniff(text.splitlines()[0] if text else "uf;exercicio;aliquota")
reader = csv.DictReader(io.StringIO(text), dialect=dialect)
count = 0
for row in reader:
uf = (row.get("uf") or row.get("UF") or "").strip().upper()
exercicio_str = (row.get("exercicio") or row.get("ano") or "").strip()
try:
exercicio = int(exercicio_str)
except Exception:
continue
aliquota_str = (row.get("aliquota") or row.get("aliq_icms") or "").replace(",", ".").strip()
if not uf or not exercicio or not aliquota_str:
continue
try:
aliquota = Decimal(aliquota_str)
except Exception:
continue
stmt = select(AliquotaUF).where(AliquotaUF.uf == uf, AliquotaUF.exercicio == exercicio)
existente = (await db.execute(stmt)).scalar_one_or_none()
if existente:
existente.aliq_icms = aliquota
else:
db.add(AliquotaUF(uf=uf, exercicio=exercicio, aliq_icms=aliquota))
count += 1
await db.commit()
return {"success": True, "qtd": count}
@router.delete("/parametros/aliquotas/{uf}/{exercicio}")
async def excluir_aliquota(uf: str, exercicio: int, db: AsyncSession = Depends(get_session)):
stmt = select(AliquotaUF).where(
AliquotaUF.uf == uf.upper(),
AliquotaUF.exercicio == exercicio
)
row = (await db.execute(stmt)).scalar_one_or_none()
if not row:
return JSONResponse(status_code=404, content={"error": "Registro não encontrado."})
await db.delete(row)
await db.commit()
return {"success": True}