93 lines
3.2 KiB
Python
93 lines
3.2 KiB
Python
|
|
# calculos.py
|
||
|
|
|
||
|
|
from decimal import Decimal
|
||
|
|
from datetime import datetime
|
||
|
|
from models import ParametrosFormula, SelicMensal
|
||
|
|
from sqlalchemy.future import select
|
||
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
|
import re
|
||
|
|
|
||
|
|
def mes_para_numero(mes: str) -> int:
|
||
|
|
meses = {
|
||
|
|
'JAN': 1, 'FEV': 2, 'MAR': 3, 'ABR': 4, 'MAI': 5, 'JUN': 6,
|
||
|
|
'JUL': 7, 'AGO': 8, 'SET': 9, 'OUT': 10, 'NOV': 11, 'DEZ': 12
|
||
|
|
}
|
||
|
|
if mes.isdigit():
|
||
|
|
return int(mes)
|
||
|
|
return meses.get(mes.upper(), 1)
|
||
|
|
|
||
|
|
|
||
|
|
async def calcular_valor_corrigido(valor_original, competencia: str, session: AsyncSession) -> Decimal:
|
||
|
|
try:
|
||
|
|
mes, ano = competencia.split('/')
|
||
|
|
data_inicio = datetime(int(ano), mes_para_numero(mes), 1)
|
||
|
|
|
||
|
|
query = select(SelicMensal).where(
|
||
|
|
(SelicMensal.ano > data_inicio.year) |
|
||
|
|
((SelicMensal.ano == data_inicio.year) & (SelicMensal.mes >= data_inicio.month))
|
||
|
|
)
|
||
|
|
resultados = await session.execute(query)
|
||
|
|
fatores = resultados.scalars().all()
|
||
|
|
|
||
|
|
fator = Decimal('1.00')
|
||
|
|
for row in fatores:
|
||
|
|
fator *= Decimal(row.fator)
|
||
|
|
|
||
|
|
return Decimal(valor_original) * fator
|
||
|
|
except Exception:
|
||
|
|
return Decimal(valor_original)
|
||
|
|
|
||
|
|
|
||
|
|
async def aplicar_formula(nome: str, contexto: dict, session: AsyncSession) -> Decimal:
|
||
|
|
try:
|
||
|
|
result = await session.execute(
|
||
|
|
select(ParametrosFormula).where(ParametrosFormula.nome == nome)
|
||
|
|
)
|
||
|
|
formula = result.scalar_one_or_none()
|
||
|
|
if not formula:
|
||
|
|
return Decimal('0.00')
|
||
|
|
|
||
|
|
texto_formula = formula.formula # nome correto do campo
|
||
|
|
|
||
|
|
for campo, valor in contexto.items():
|
||
|
|
texto_formula = re.sub(rf'\b{campo}\b', str(valor).replace(',', '.'), texto_formula)
|
||
|
|
|
||
|
|
resultado = eval(texto_formula, {"__builtins__": {}}, {})
|
||
|
|
return Decimal(str(resultado))
|
||
|
|
except Exception:
|
||
|
|
return Decimal('0.00')
|
||
|
|
|
||
|
|
|
||
|
|
async def calcular_campos_dinamicos(fatura: dict, session: AsyncSession) -> dict:
|
||
|
|
try:
|
||
|
|
result = await session.execute(select(ParametrosFormula))
|
||
|
|
parametros = result.scalars().all()
|
||
|
|
|
||
|
|
for param in parametros:
|
||
|
|
try:
|
||
|
|
texto_formula = param.formula
|
||
|
|
for campo, valor in fatura.items():
|
||
|
|
texto_formula = re.sub(rf'\b{campo}\b', str(valor).replace(',', '.'), texto_formula)
|
||
|
|
valor_resultado = eval(texto_formula, {"__builtins__": {}}, {})
|
||
|
|
fatura[param.nome] = round(Decimal(valor_resultado), 2)
|
||
|
|
except:
|
||
|
|
fatura[param.nome] = None
|
||
|
|
|
||
|
|
return fatura
|
||
|
|
except Exception as e:
|
||
|
|
raise RuntimeError(f"Erro ao calcular campos dinâmicos: {str(e)}")
|
||
|
|
|
||
|
|
|
||
|
|
def calcular_pis_sobre_icms(base_pis, valor_icms, aliq_pis):
|
||
|
|
try:
|
||
|
|
return (Decimal(base_pis) - (Decimal(base_pis) - Decimal(valor_icms))) * Decimal(aliq_pis)
|
||
|
|
except:
|
||
|
|
return Decimal('0.00')
|
||
|
|
|
||
|
|
|
||
|
|
def calcular_cofins_sobre_icms(base_cofins, valor_icms, aliq_cofins):
|
||
|
|
try:
|
||
|
|
return (Decimal(base_cofins) - (Decimal(base_cofins) - Decimal(valor_icms))) * Decimal(aliq_cofins)
|
||
|
|
except:
|
||
|
|
return Decimal('0.00')
|