import logging import os import shutil import asyncio import httpx from sqlalchemy.future import select from app.utils import extrair_dados_pdf from app.database import AsyncSessionLocal from app.models import Fatura, LogProcessamento import time import traceback import uuid from app.models import SelicMensal from sqlalchemy import select from zipfile import ZipFile logger = logging.getLogger(__name__) UPLOADS_DIR = os.path.join("app", "uploads") TEMP_DIR = os.path.join(UPLOADS_DIR, "temp") fila_processamento = asyncio.Queue() status_arquivos = {} def remover_arquivo_temp(caminho_pdf): try: if os.path.exists(caminho_pdf) and TEMP_DIR in caminho_pdf: os.remove(caminho_pdf) logger.info(f"Arquivo temporário removido: {os.path.basename(caminho_pdf)}") except Exception as e: logger.warning(f"Falha ao remover arquivo temporário: {e}") def salvar_em_uploads(caminho_pdf_temp, nome_original, nota_fiscal): ERROS_DIR = os.path.join("app", "uploads", "erros") os.makedirs(ERROS_DIR, exist_ok=True) erros_detectados = [] try: extensao = os.path.splitext(nome_original)[1].lower() nome_destino = f"{nota_fiscal}_{uuid.uuid4().hex[:6]}{extensao}" destino_final = os.path.join(UPLOADS_DIR, nome_destino) shutil.copy2(caminho_pdf_temp, destino_final) return destino_final except Exception as e: # Copiar o arquivo com erro extensao = os.path.splitext(nome_original)[1].lower() nome_arquivo = f"{uuid.uuid4().hex[:6]}_erro{extensao}" caminho_pdf = caminho_pdf_temp shutil.copy2(caminho_pdf, os.path.join(ERROS_DIR, nome_arquivo)) mensagem = f"{nome_arquivo}: {str(e)}" erros_detectados.append(mensagem) logger.error(f"Erro ao salvar em uploads: {e}") return caminho_pdf_temp async def process_single_file(caminho_pdf_temp: str, nome_original: str): inicio = time.perf_counter() async with AsyncSessionLocal() as session: try: dados = extrair_dados_pdf(caminho_pdf_temp) dados['arquivo_pdf'] = nome_original # Verifica se a fatura já existe existente_result = await session.execute( select(Fatura).filter_by( nota_fiscal=dados['nota_fiscal'], unidade_consumidora=dados['unidade_consumidora'] ) ) if existente_result.scalar_one_or_none(): duracao = round(time.perf_counter() - inicio, 2) remover_arquivo_temp(caminho_pdf_temp) return { "status": "Duplicado", "dados": dados, "tempo": f"{duracao}s" } data_comp = dados.get("competencia") if data_comp: await garantir_selic_para_competencia(session, data_comp.year, data_comp.month) # Salva arquivo final caminho_final = salvar_em_uploads(caminho_pdf_temp, nome_original, dados['nota_fiscal']) dados['link_arquivo'] = caminho_final # Salva fatura fatura = Fatura(**dados) session.add(fatura) await session.commit() remover_arquivo_temp(caminho_pdf_temp) duracao = round(time.perf_counter() - inicio, 2) return { "status": "Concluído", "dados": dados, "tempo": f"{duracao}s" } except Exception as e: erro_str = traceback.format_exc() duracao = round(time.perf_counter() - inicio, 2) await session.rollback() remover_arquivo_temp(caminho_pdf_temp) print(f"\n📄 ERRO no arquivo: {nome_original}") print(f"⏱ Tempo até erro: {duracao}s") print(f"❌ Erro detalhado:\n{erro_str}") return { "status": "Erro", "mensagem": str(e), "tempo": f"{duracao}s", "trace": erro_str } async def processar_em_lote(): import traceback # para exibir erros resultados = [] while not fila_processamento.empty(): item = await fila_processamento.get() try: resultado = await process_single_file(item['caminho_pdf'], item['nome_original']) status_arquivos[item['nome_original']] = { "status": resultado.get("status"), "mensagem": resultado.get("mensagem", ""), "tempo": resultado.get("tempo", "---"), "tamanho": os.path.getsize(item['caminho_pdf']) // 1024, # tamanho em KB "data": time.strftime("%d/%m/%Y", time.localtime(os.path.getmtime(item['caminho_pdf']))) } resultados.append(status_arquivos[item['nome_original']]) except Exception as e: status_arquivos[item['nome_original']] = { "status": "Erro", "mensagem": str(e), "tempo": "---" } resultados.append({ "nome": item['nome_original'], "status": "Erro", "mensagem": str(e) }) print(f"Erro ao processar {item['nome_original']}: {e}") print(traceback.format_exc()) # Após o loop, salvar TXT com erros erros_txt = [] for nome, status in status_arquivos.items(): if status['status'] == 'Erro': erros_txt.append(f"{nome} - {status.get('mensagem', 'Erro desconhecido')}") if erros_txt: with open(os.path.join(UPLOADS_DIR, "erros", "erros.txt"), "w", encoding="utf-8") as f: f.write("\n".join(erros_txt)) # Compacta PDFs com erro with ZipFile(os.path.join(UPLOADS_DIR, "erros", "faturas_erro.zip"), "w") as zipf: for nome in status_arquivos: if status_arquivos[nome]['status'] == 'Erro': caminho = os.path.join(UPLOADS_DIR, "temp", nome) if os.path.exists(caminho): zipf.write(caminho, arcname=nome) return resultados def limpar_arquivos_processados(): status_arquivos.clear() while not fila_processamento.empty(): fila_processamento.get_nowait() async def garantir_selic_para_competencia(session, ano, mes): # Verifica se já existe result = await session.execute(select(SelicMensal).filter_by(ano=ano, mes=mes)) existente = result.scalar_one_or_none() if existente: return # já tem # Busca na API do Banco Central url = ( f"https://api.bcb.gov.br/dados/serie/bcdata.sgs.4390/dados?" f"formato=json&dataInicial=01/{mes:02d}/{ano}&dataFinal=30/{mes:02d}/{ano}" ) async with httpx.AsyncClient() as client: resp = await client.get(url) resp.raise_for_status() dados = resp.json() if dados: percentual = float(dados[0]["valor"].replace(",", ".")) novo = SelicMensal(ano=ano, mes=mes, fator=percentual) session.add(novo) await session.commit()