Files
app_faturas/processor.py

94 lines
3.3 KiB
Python
Raw Normal View History

import logging
import os
import shutil
import asyncio
from sqlalchemy.future import select
from utils import extrair_dados_pdf
from database import AsyncSessionLocal
from models import Fatura, LogProcessamento
logger = logging.getLogger(__name__)
UPLOADS_DIR = os.path.join("app", "uploads")
TEMP_DIR = os.path.join(UPLOADS_DIR, "temp")
fila_processamento = asyncio.Queue()
status_arquivos = {}
def remover_arquivo_temp(caminho_pdf):
try:
if os.path.exists(caminho_pdf) and TEMP_DIR in caminho_pdf:
os.remove(caminho_pdf)
logger.info(f"Arquivo temporário removido: {os.path.basename(caminho_pdf)}")
except Exception as e:
logger.warning(f"Falha ao remover arquivo temporário: {e}")
def salvar_em_uploads(caminho_pdf_temp, nome_original, nota_fiscal):
try:
extensao = os.path.splitext(nome_original)[1].lower()
nome_destino = f"{nota_fiscal}{extensao}"
destino_final = os.path.join(UPLOADS_DIR, nome_destino)
shutil.copy2(caminho_pdf_temp, destino_final)
return destino_final
except Exception as e:
logger.error(f"Erro ao salvar em uploads: {e}")
return caminho_pdf_temp
async def process_single_file(caminho_pdf_temp: str, nome_original: str):
async with AsyncSessionLocal() as session:
try:
dados = extrair_dados_pdf(caminho_pdf_temp)
dados['arquivo_pdf'] = nome_original
existente_result = await session.execute(
select(Fatura).filter_by(nota_fiscal=dados['nota_fiscal'], unidade_consumidora=dados['unidade_consumidora'])
)
if existente_result.scalar_one_or_none():
remover_arquivo_temp(caminho_pdf_temp)
return {"status": "Duplicado", "dados": dados}
caminho_final = salvar_em_uploads(caminho_pdf_temp, nome_original, dados['nota_fiscal'])
dados['link_arquivo'] = caminho_final
fatura = Fatura(**dados)
session.add(fatura)
session.add(LogProcessamento(
status="Sucesso",
mensagem="Fatura processada com sucesso",
nome_arquivo=nome_original,
acao="PROCESSAMENTO"
))
await session.commit()
remover_arquivo_temp(caminho_pdf_temp)
return {"status": "Concluído", "dados": dados}
except Exception as e:
await session.rollback()
session.add(LogProcessamento(
status="Erro",
mensagem=str(e),
nome_arquivo=nome_original,
acao="PROCESSAMENTO"
))
await session.commit()
logger.error(f"Erro ao processar fatura: {e}", exc_info=True)
remover_arquivo_temp(caminho_pdf_temp)
return {"status": "Erro", "mensagem": str(e)}
async def processar_em_lote():
resultados = []
while not fila_processamento.empty():
item = await fila_processamento.get()
resultado = await process_single_file(item['caminho_pdf'], item['nome_original'])
status_arquivos[item['nome_original']] = resultado.get("status", "Erro")
resultados.append(resultado)
return resultados
def limpar_arquivos_processados():
status_arquivos.clear()
while not fila_processamento.empty():
fila_processamento.get_nowait()