94 lines
3.3 KiB
Python
94 lines
3.3 KiB
Python
import logging
|
|
import os
|
|
import shutil
|
|
import asyncio
|
|
from sqlalchemy.future import select
|
|
from utils import extrair_dados_pdf
|
|
from database import AsyncSessionLocal
|
|
from models import Fatura, LogProcessamento
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
UPLOADS_DIR = os.path.join(os.getcwd(), "uploads")
|
|
TEMP_DIR = os.path.join(UPLOADS_DIR, "temp")
|
|
|
|
fila_processamento = asyncio.Queue()
|
|
status_arquivos = {}
|
|
|
|
def remover_arquivo_temp(caminho_pdf):
|
|
try:
|
|
if os.path.exists(caminho_pdf) and TEMP_DIR in caminho_pdf:
|
|
os.remove(caminho_pdf)
|
|
logger.info(f"Arquivo temporário removido: {os.path.basename(caminho_pdf)}")
|
|
except Exception as e:
|
|
logger.warning(f"Falha ao remover arquivo temporário: {e}")
|
|
|
|
def salvar_em_uploads(caminho_pdf_temp, nome_original, nota_fiscal):
|
|
try:
|
|
extensao = os.path.splitext(nome_original)[1].lower()
|
|
nome_destino = f"{nota_fiscal}{extensao}"
|
|
destino_final = os.path.join(UPLOADS_DIR, nome_destino)
|
|
shutil.copy2(caminho_pdf_temp, destino_final)
|
|
return destino_final
|
|
except Exception as e:
|
|
logger.error(f"Erro ao salvar em uploads: {e}")
|
|
return caminho_pdf_temp
|
|
|
|
async def process_single_file(caminho_pdf_temp: str, nome_original: str):
|
|
async with AsyncSessionLocal() as session:
|
|
try:
|
|
dados = extrair_dados_pdf(caminho_pdf_temp)
|
|
dados['arquivo_pdf'] = nome_original
|
|
|
|
existente_result = await session.execute(
|
|
select(Fatura).filter_by(nota_fiscal=dados['nota_fiscal'], unidade_consumidora=dados['unidade_consumidora'])
|
|
)
|
|
if existente_result.scalar_one_or_none():
|
|
remover_arquivo_temp(caminho_pdf_temp)
|
|
return {"status": "Duplicado", "dados": dados}
|
|
|
|
caminho_final = salvar_em_uploads(caminho_pdf_temp, nome_original, dados['nota_fiscal'])
|
|
dados['link_arquivo'] = caminho_final
|
|
|
|
|
|
fatura = Fatura(**dados)
|
|
session.add(fatura)
|
|
|
|
session.add(LogProcessamento(
|
|
status="Sucesso",
|
|
mensagem="Fatura processada com sucesso",
|
|
nome_arquivo=nome_original,
|
|
acao="PROCESSAMENTO"
|
|
))
|
|
|
|
await session.commit()
|
|
remover_arquivo_temp(caminho_pdf_temp)
|
|
return {"status": "Concluído", "dados": dados}
|
|
|
|
except Exception as e:
|
|
await session.rollback()
|
|
session.add(LogProcessamento(
|
|
status="Erro",
|
|
mensagem=str(e),
|
|
nome_arquivo=nome_original,
|
|
acao="PROCESSAMENTO"
|
|
))
|
|
await session.commit()
|
|
logger.error(f"Erro ao processar fatura: {e}", exc_info=True)
|
|
remover_arquivo_temp(caminho_pdf_temp)
|
|
return {"status": "Erro", "mensagem": str(e)}
|
|
|
|
async def processar_em_lote():
|
|
resultados = []
|
|
while not fila_processamento.empty():
|
|
item = await fila_processamento.get()
|
|
resultado = await process_single_file(item['caminho_pdf'], item['nome_original'])
|
|
status_arquivos[item['nome_original']] = resultado.get("status", "Erro")
|
|
resultados.append(resultado)
|
|
return resultados
|
|
|
|
def limpar_arquivos_processados():
|
|
status_arquivos.clear()
|
|
while not fila_processamento.empty():
|
|
fila_processamento.get_nowait()
|