import logging import os import shutil import asyncio from sqlalchemy.future import select from app.utils import extrair_dados_pdf from app.database import AsyncSessionLocal from app.models import Fatura, LogProcessamento import time import traceback import uuid logger = logging.getLogger(__name__) UPLOADS_DIR = os.path.join("app", "uploads") TEMP_DIR = os.path.join(UPLOADS_DIR, "temp") fila_processamento = asyncio.Queue() status_arquivos = {} def remover_arquivo_temp(caminho_pdf): try: if os.path.exists(caminho_pdf) and TEMP_DIR in caminho_pdf: os.remove(caminho_pdf) logger.info(f"Arquivo temporário removido: {os.path.basename(caminho_pdf)}") except Exception as e: logger.warning(f"Falha ao remover arquivo temporário: {e}") def salvar_em_uploads(caminho_pdf_temp, nome_original, nota_fiscal): try: extensao = os.path.splitext(nome_original)[1].lower() nome_destino = f"{nota_fiscal}_{uuid.uuid4().hex[:6]}{extensao}" destino_final = os.path.join(UPLOADS_DIR, nome_destino) shutil.copy2(caminho_pdf_temp, destino_final) return destino_final except Exception as e: logger.error(f"Erro ao salvar em uploads: {e}") return caminho_pdf_temp async def process_single_file(caminho_pdf_temp: str, nome_original: str): inicio = time.perf_counter() async with AsyncSessionLocal() as session: try: dados = extrair_dados_pdf(caminho_pdf_temp) dados['arquivo_pdf'] = nome_original # Verifica se a fatura já existe existente_result = await session.execute( select(Fatura).filter_by( nota_fiscal=dados['nota_fiscal'], unidade_consumidora=dados['unidade_consumidora'] ) ) if existente_result.scalar_one_or_none(): duracao = round(time.perf_counter() - inicio, 2) remover_arquivo_temp(caminho_pdf_temp) return { "status": "Duplicado", "dados": dados, "tempo": f"{duracao}s" } # Salva arquivo final caminho_final = salvar_em_uploads(caminho_pdf_temp, nome_original, dados['nota_fiscal']) dados['link_arquivo'] = caminho_final # Salva fatura fatura = Fatura(**dados) session.add(fatura) await session.commit() remover_arquivo_temp(caminho_pdf_temp) duracao = round(time.perf_counter() - inicio, 2) return { "status": "Concluído", "dados": dados, "tempo": f"{duracao}s" } except Exception as e: erro_str = traceback.format_exc() duracao = round(time.perf_counter() - inicio, 2) await session.rollback() remover_arquivo_temp(caminho_pdf_temp) print(f"\n📄 ERRO no arquivo: {nome_original}") print(f"⏱ Tempo até erro: {duracao}s") print(f"❌ Erro detalhado:\n{erro_str}") return { "status": "Erro", "mensagem": str(e), "tempo": f"{duracao}s", "trace": erro_str } async def processar_em_lote(): import traceback # para exibir erros resultados = [] while not fila_processamento.empty(): item = await fila_processamento.get() try: resultado = await process_single_file(item['caminho_pdf'], item['nome_original']) status_arquivos[item['nome_original']] = { "status": resultado.get("status"), "mensagem": resultado.get("mensagem", ""), "tempo": resultado.get("tempo", "---") } resultados.append(status_arquivos[item['nome_original']]) except Exception as e: status_arquivos[item['nome_original']] = { "status": "Erro", "mensagem": str(e), "tempo": "---" } resultados.append({ "nome": item['nome_original'], "status": "Erro", "mensagem": str(e) }) print(f"Erro ao processar {item['nome_original']}: {e}") print(traceback.format_exc()) return resultados def limpar_arquivos_processados(): status_arquivos.clear() while not fila_processamento.empty(): fila_processamento.get_nowait()