Files
app_faturas/app/processor.py

134 lines
4.5 KiB
Python
Raw Normal View History

2025-07-28 13:29:45 -03:00
import logging
import os
import shutil
import asyncio
from sqlalchemy.future import select
from app.utils import extrair_dados_pdf
from app.database import AsyncSessionLocal
from app.models import Fatura, LogProcessamento
import time
import traceback
import uuid
2025-07-28 13:29:45 -03:00
logger = logging.getLogger(__name__)
UPLOADS_DIR = os.path.join("app", "uploads")
2025-07-28 13:29:45 -03:00
TEMP_DIR = os.path.join(UPLOADS_DIR, "temp")
fila_processamento = asyncio.Queue()
status_arquivos = {}
def remover_arquivo_temp(caminho_pdf):
try:
if os.path.exists(caminho_pdf) and TEMP_DIR in caminho_pdf:
os.remove(caminho_pdf)
logger.info(f"Arquivo temporário removido: {os.path.basename(caminho_pdf)}")
except Exception as e:
logger.warning(f"Falha ao remover arquivo temporário: {e}")
def salvar_em_uploads(caminho_pdf_temp, nome_original, nota_fiscal):
try:
extensao = os.path.splitext(nome_original)[1].lower()
nome_destino = f"{nota_fiscal}_{uuid.uuid4().hex[:6]}{extensao}"
2025-07-28 13:29:45 -03:00
destino_final = os.path.join(UPLOADS_DIR, nome_destino)
shutil.copy2(caminho_pdf_temp, destino_final)
return destino_final
except Exception as e:
logger.error(f"Erro ao salvar em uploads: {e}")
return caminho_pdf_temp
async def process_single_file(caminho_pdf_temp: str, nome_original: str):
inicio = time.perf_counter()
2025-07-28 13:29:45 -03:00
async with AsyncSessionLocal() as session:
try:
dados = extrair_dados_pdf(caminho_pdf_temp)
dados['arquivo_pdf'] = nome_original
# Verifica se a fatura já existe
2025-07-28 13:29:45 -03:00
existente_result = await session.execute(
select(Fatura).filter_by(
nota_fiscal=dados['nota_fiscal'],
unidade_consumidora=dados['unidade_consumidora']
)
2025-07-28 13:29:45 -03:00
)
if existente_result.scalar_one_or_none():
duracao = round(time.perf_counter() - inicio, 2)
2025-07-28 13:29:45 -03:00
remover_arquivo_temp(caminho_pdf_temp)
return {
"status": "Duplicado",
"dados": dados,
"tempo": f"{duracao}s"
}
2025-07-28 13:29:45 -03:00
# Salva arquivo final
2025-07-28 13:29:45 -03:00
caminho_final = salvar_em_uploads(caminho_pdf_temp, nome_original, dados['nota_fiscal'])
dados['link_arquivo'] = caminho_final
# Salva fatura
2025-07-28 13:29:45 -03:00
fatura = Fatura(**dados)
session.add(fatura)
await session.commit()
remover_arquivo_temp(caminho_pdf_temp)
duracao = round(time.perf_counter() - inicio, 2)
return {
"status": "Concluído",
"dados": dados,
"tempo": f"{duracao}s"
}
2025-07-28 13:29:45 -03:00
except Exception as e:
erro_str = traceback.format_exc()
duracao = round(time.perf_counter() - inicio, 2)
2025-07-28 13:29:45 -03:00
await session.rollback()
remover_arquivo_temp(caminho_pdf_temp)
print(f"\n📄 ERRO no arquivo: {nome_original}")
print(f"⏱ Tempo até erro: {duracao}s")
print(f"❌ Erro detalhado:\n{erro_str}")
return {
"status": "Erro",
"mensagem": str(e),
"tempo": f"{duracao}s",
"trace": erro_str
}
2025-07-28 13:29:45 -03:00
async def processar_em_lote():
import traceback # para exibir erros
2025-07-28 13:29:45 -03:00
resultados = []
while not fila_processamento.empty():
item = await fila_processamento.get()
try:
resultado = await process_single_file(item['caminho_pdf'], item['nome_original'])
status_arquivos[item['nome_original']] = {
"status": resultado.get("status"),
"mensagem": resultado.get("mensagem", ""),
"tempo": resultado.get("tempo", "---")
}
resultados.append(status_arquivos[item['nome_original']])
except Exception as e:
status_arquivos[item['nome_original']] = {
"status": "Erro",
"mensagem": str(e),
"tempo": "---"
}
resultados.append({
"nome": item['nome_original'],
"status": "Erro",
"mensagem": str(e)
})
print(f"Erro ao processar {item['nome_original']}: {e}")
print(traceback.format_exc())
2025-07-28 13:29:45 -03:00
return resultados
def limpar_arquivos_processados():
status_arquivos.clear()
while not fila_processamento.empty():
fila_processamento.get_nowait()