import logging import os import shutil import asyncio from sqlalchemy.future import select from utils import extrair_dados_pdf from database import AsyncSessionLocal from models import Fatura, LogProcessamento logger = logging.getLogger(__name__) UPLOADS_DIR = os.path.join("app", "uploads") TEMP_DIR = os.path.join(UPLOADS_DIR, "temp") fila_processamento = asyncio.Queue() status_arquivos = {} def remover_arquivo_temp(caminho_pdf): try: if os.path.exists(caminho_pdf) and TEMP_DIR in caminho_pdf: os.remove(caminho_pdf) logger.info(f"Arquivo temporário removido: {os.path.basename(caminho_pdf)}") except Exception as e: logger.warning(f"Falha ao remover arquivo temporário: {e}") def salvar_em_uploads(caminho_pdf_temp, nome_original, nota_fiscal): try: extensao = os.path.splitext(nome_original)[1].lower() nome_destino = f"{nota_fiscal}{extensao}" destino_final = os.path.join(UPLOADS_DIR, nome_destino) shutil.copy2(caminho_pdf_temp, destino_final) return destino_final except Exception as e: logger.error(f"Erro ao salvar em uploads: {e}") return caminho_pdf_temp async def process_single_file(caminho_pdf_temp: str, nome_original: str): async with AsyncSessionLocal() as session: try: dados = extrair_dados_pdf(caminho_pdf_temp) dados['arquivo_pdf'] = nome_original existente_result = await session.execute( select(Fatura).filter_by(nota_fiscal=dados['nota_fiscal'], unidade_consumidora=dados['unidade_consumidora']) ) if existente_result.scalar_one_or_none(): remover_arquivo_temp(caminho_pdf_temp) return {"status": "Duplicado", "dados": dados} caminho_final = salvar_em_uploads(caminho_pdf_temp, nome_original, dados['nota_fiscal']) dados['link_arquivo'] = caminho_final fatura = Fatura(**dados) session.add(fatura) session.add(LogProcessamento( status="Sucesso", mensagem="Fatura processada com sucesso", nome_arquivo=nome_original, acao="PROCESSAMENTO" )) await session.commit() remover_arquivo_temp(caminho_pdf_temp) return {"status": "Concluído", "dados": dados} except Exception as e: await session.rollback() session.add(LogProcessamento( status="Erro", mensagem=str(e), nome_arquivo=nome_original, acao="PROCESSAMENTO" )) await session.commit() logger.error(f"Erro ao processar fatura: {e}", exc_info=True) remover_arquivo_temp(caminho_pdf_temp) return {"status": "Erro", "mensagem": str(e)} async def processar_em_lote(): resultados = [] while not fila_processamento.empty(): item = await fila_processamento.get() resultado = await process_single_file(item['caminho_pdf'], item['nome_original']) status_arquivos[item['nome_original']] = resultado.get("status", "Erro") resultados.append(resultado) return resultados def limpar_arquivos_processados(): status_arquivos.clear() while not fila_processamento.empty(): fila_processamento.get_nowait()