Files
app_faturas/app/processor.py

260 lines
9.3 KiB
Python
Raw Normal View History

2025-07-28 13:29:45 -03:00
import logging
import os
import shutil
import asyncio
import httpx
2025-07-28 13:29:45 -03:00
from sqlalchemy.future import select
from app.utils import extrair_dados_pdf
from app.database import AsyncSessionLocal
from app.models import Fatura, LogProcessamento
import time
import traceback
import uuid
from app.models import SelicMensal
from sqlalchemy import select
from zipfile import ZipFile
2025-07-28 13:29:45 -03:00
logger = logging.getLogger(__name__)
UPLOADS_DIR = os.path.join("app", "uploads")
2025-07-28 13:29:45 -03:00
TEMP_DIR = os.path.join(UPLOADS_DIR, "temp")
fila_processamento = asyncio.Queue()
status_arquivos = {}
def remover_arquivo_temp(caminho_pdf):
try:
if os.path.exists(caminho_pdf) and TEMP_DIR in caminho_pdf:
os.remove(caminho_pdf)
logger.info(f"Arquivo temporário removido: {os.path.basename(caminho_pdf)}")
except Exception as e:
logger.warning(f"Falha ao remover arquivo temporário: {e}")
def salvar_em_uploads(caminho_pdf_temp, nome_original, nota_fiscal):
ERROS_DIR = os.path.join("app", "uploads", "erros")
os.makedirs(ERROS_DIR, exist_ok=True)
erros_detectados = []
2025-07-28 13:29:45 -03:00
try:
extensao = os.path.splitext(nome_original)[1].lower()
nome_destino = f"{nota_fiscal}_{uuid.uuid4().hex[:6]}{extensao}"
2025-07-28 13:29:45 -03:00
destino_final = os.path.join(UPLOADS_DIR, nome_destino)
shutil.copy2(caminho_pdf_temp, destino_final)
return destino_final
except Exception as e:
# Copiar o arquivo com erro
extensao = os.path.splitext(nome_original)[1].lower()
nome_arquivo = f"{uuid.uuid4().hex[:6]}_erro{extensao}"
caminho_pdf = caminho_pdf_temp
shutil.copy2(caminho_pdf, os.path.join(ERROS_DIR, nome_arquivo))
mensagem = f"{nome_arquivo}: {str(e)}"
erros_detectados.append(mensagem)
2025-07-28 13:29:45 -03:00
logger.error(f"Erro ao salvar em uploads: {e}")
return caminho_pdf_temp
async def process_single_file(caminho_pdf_temp: str, nome_original: str, cliente_id: str | None = None):
inicio = time.perf_counter()
2025-07-28 13:29:45 -03:00
async with AsyncSessionLocal() as session:
try:
dados = extrair_dados_pdf(caminho_pdf_temp)
dados['arquivo_pdf'] = nome_original
from decimal import Decimal, ROUND_HALF_UP
_Q6 = Decimal("0.000000")
def _to_percent_6(x):
"""Converte para percent (se vier em fração) e quantiza em 6 casas."""
if x is None:
return None
try:
v = Decimal(str(x))
except Exception:
return None
# se vier em fração (ex.: 0.012872), vira 1.2872… (percentual)
if Decimal("0") < v <= Decimal("1"):
v = v * Decimal("100")
return v.quantize(_Q6, rounding=ROUND_HALF_UP)
def _to_dec6(x):
"""Apenas 6 casas, sem % (use para tarifa, bases, etc.)."""
if x is None:
return None
try:
v = Decimal(str(x))
except Exception:
return None
return v.quantize(_Q6, rounding=ROUND_HALF_UP)
dados['icms_aliq'] = _to_percent_6(dados.get('icms_aliq'))
dados['pis_aliq'] = _to_percent_6(dados.get('pis_aliq'))
dados['cofins_aliq'] = _to_percent_6(dados.get('cofins_aliq'))
# tarifa NÃO é percentual: apenas 6 casas
dados['tarifa'] = _to_dec6(dados.get('tarifa'))
# Verifica se a fatura já existe
2025-07-28 13:29:45 -03:00
existente_result = await session.execute(
select(Fatura).filter_by(
nota_fiscal=dados['nota_fiscal'],
unidade_consumidora=dados['unidade_consumidora']
)
2025-07-28 13:29:45 -03:00
)
if existente_result.scalar_one_or_none():
duracao = round(time.perf_counter() - inicio, 2)
2025-07-28 13:29:45 -03:00
remover_arquivo_temp(caminho_pdf_temp)
return {
"status": "Duplicado",
"dados": dados,
"tempo": f"{duracao}s"
}
data_comp = dados.get("competencia")
if data_comp:
await garantir_selic_para_competencia(session, data_comp.year, data_comp.month)
2025-07-28 13:29:45 -03:00
# Salva arquivo final
2025-07-28 13:29:45 -03:00
caminho_final = salvar_em_uploads(caminho_pdf_temp, nome_original, dados['nota_fiscal'])
dados['link_arquivo'] = caminho_final
# Salva fatura
dados['cliente_id'] = cliente_id
if cliente_id:
dados['cliente_id'] = cliente_id
fatura = Fatura(**dados)
2025-07-28 13:29:45 -03:00
session.add(fatura)
await session.commit()
remover_arquivo_temp(caminho_pdf_temp)
duracao = round(time.perf_counter() - inicio, 2)
return {
"status": "Concluído",
"dados": dados,
"tempo": f"{duracao}s"
}
2025-07-28 13:29:45 -03:00
except Exception as e:
erro_str = traceback.format_exc()
duracao = round(time.perf_counter() - inicio, 2)
2025-07-28 13:29:45 -03:00
await session.rollback()
remover_arquivo_temp(caminho_pdf_temp)
print(f"\n📄 ERRO no arquivo: {nome_original}")
print(f"⏱ Tempo até erro: {duracao}s")
print(f"❌ Erro detalhado:\n{erro_str}")
return {
"status": "Erro",
"mensagem": str(e),
"tempo": f"{duracao}s",
"trace": erro_str
}
2025-07-28 13:29:45 -03:00
async def processar_em_lote():
import traceback # para exibir erros
2025-07-28 13:29:45 -03:00
resultados = []
while not fila_processamento.empty():
item = await fila_processamento.get()
try:
resultado = await process_single_file(
item['caminho_pdf'],
item['nome_original'],
item.get('cliente_id')
)
# tentar tamanho/data do TEMP; se não existir mais, tenta do destino final; senão, 0/""
temp_path = item['caminho_pdf']
dest_path = (resultado.get("dados") or {}).get("link_arquivo", "")
def _safe_size(p):
try:
return os.path.getsize(p) // 1024
except Exception:
return 0
def _safe_mtime(p):
try:
return time.strftime("%d/%m/%Y", time.localtime(os.path.getmtime(p)))
except Exception:
return ""
status_arquivos[item['nome_original']] = {
"status": resultado.get("status"),
"mensagem": resultado.get("mensagem", ""),
"tempo": resultado.get("tempo", "---"),
"tamanho": _safe_size(temp_path) or _safe_size(dest_path),
"data": _safe_mtime(temp_path) or _safe_mtime(dest_path),
}
resultados.append(status_arquivos[item['nome_original']])
except Exception as e:
status_arquivos[item['nome_original']] = {
"status": "Erro",
"mensagem": str(e),
"tempo": "---"
}
resultados.append({
"nome": item['nome_original'],
"status": "Erro",
"mensagem": str(e)
})
print(f"Erro ao processar {item['nome_original']}: {e}")
print(traceback.format_exc())
# Após o loop, salvar TXT com erros
erros_txt = []
for nome, status in status_arquivos.items():
if status['status'] == 'Erro':
erros_txt.append(f"{nome} - {status.get('mensagem', 'Erro desconhecido')}")
if erros_txt:
erros_dir = os.path.join(UPLOADS_DIR, "erros")
os.makedirs(erros_dir, exist_ok=True) # <- GARANTE A PASTA
with open(os.path.join(erros_dir, "erros.txt"), "w", encoding="utf-8") as f:
f.write("\n".join(erros_txt))
# Compacta PDFs com erro
with ZipFile(os.path.join(erros_dir, "faturas_erro.zip"), "w") as zipf:
for nome in status_arquivos:
if status_arquivos[nome]['status'] == 'Erro':
caminho = os.path.join(UPLOADS_DIR, "temp", nome)
if os.path.exists(caminho):
zipf.write(caminho, arcname=nome)
return resultados
2025-07-28 13:29:45 -03:00
def limpar_arquivos_processados():
status_arquivos.clear()
while not fila_processamento.empty():
fila_processamento.get_nowait()
async def garantir_selic_para_competencia(session, ano, mes):
# Verifica se já existe
result = await session.execute(select(SelicMensal).filter_by(ano=ano, mes=mes))
existente = result.scalar_one_or_none()
if existente:
return # já tem
# Busca na API do Banco Central
url = (
f"https://api.bcb.gov.br/dados/serie/bcdata.sgs.4390/dados?"
f"formato=json&dataInicial=01/{mes:02d}/{ano}&dataFinal=30/{mes:02d}/{ano}"
)
async with httpx.AsyncClient() as client:
resp = await client.get(url)
resp.raise_for_status()
dados = resp.json()
if dados:
percentual = float(dados[0]["valor"].replace(",", "."))
novo = SelicMensal(ano=ano, mes=mes, percentual=percentual)
session.add(novo)
await session.commit()