All checks were successful
continuous-integration/drone/push Build is passing
138 lines
5.5 KiB
Python
Executable File
138 lines
5.5 KiB
Python
Executable File
# app/layouts/equatorial_go.py
|
||
import re
|
||
from datetime import datetime
|
||
import fitz
|
||
import logging
|
||
|
||
def converter_valor(valor_str):
|
||
try:
|
||
if not valor_str:
|
||
return 0.0
|
||
valor_limpo = str(valor_str).replace('.', '').replace(',', '.')
|
||
valor_limpo = re.sub(r'[^\d.]', '', valor_limpo)
|
||
return float(valor_limpo) if valor_limpo else 0.0
|
||
except (ValueError, TypeError):
|
||
return 0.0
|
||
|
||
def extrair_dados(texto_final):
|
||
import logging
|
||
logging.debug("\n========== INÍCIO DO TEXTO EXTRAÍDO ==========\n" + texto_final + "\n========== FIM ==========")
|
||
|
||
def extrair_seguro(patterns, texto_busca, flags=re.IGNORECASE | re.MULTILINE):
|
||
if not isinstance(patterns, list): patterns = [patterns]
|
||
for pattern in patterns:
|
||
match = re.search(pattern, texto_busca, flags)
|
||
if match:
|
||
for group in match.groups():
|
||
if group: return group.strip()
|
||
return ""
|
||
|
||
nota_fiscal = extrair_seguro(r'NOTA FISCAL Nº\s*(\d+)', texto_final)
|
||
|
||
# --- Unidade Consumidora (UC): 8–12 dígitos, SEM hífen ---
|
||
uc = extrair_seguro([
|
||
r'UNIDADE\s*CONSUMIDORA\D*?(\d{8,12})',
|
||
r'\bUC\D*?(\d{8,12})',
|
||
r'INSTALA[ÇC][ÃA]O\D*?(\d{8,12})',
|
||
], texto_final)
|
||
|
||
# fallback: maior sequência "solta" de 8–10 dígitos sem hífen
|
||
if not uc:
|
||
seqs = re.findall(r'(?<!\d)(\d{8,10})(?![\d-])', texto_final)
|
||
if seqs:
|
||
uc = max(seqs, key=len)
|
||
|
||
logging.debug("TEXTO PDF:\n" + texto_final)
|
||
|
||
referencia = extrair_seguro([
|
||
r'\b([A-Z]{3}/\d{4})\b',
|
||
r'\b([A-Z]{3}\s*/\s*\d{4})\b',
|
||
r'\b([A-Z]{3}\d{4})\b',
|
||
r'(\d{2}/\d{4})'
|
||
], texto_final.upper())
|
||
|
||
# Limpeza prévia para evitar falhas por quebra de linha ou espaços
|
||
texto_limpo = texto_final.replace('\n', '').replace('\r', '').replace(' ', '')
|
||
|
||
if any(padrao in texto_limpo for padrao in ['R$***********0,00', 'R$*********0,00', 'R$*0,00']):
|
||
valor_total = 0.0
|
||
else:
|
||
match_valor_total = re.search(r'R\$[*\s]*([\d\.\s]*,\d{2})', texto_final)
|
||
valor_total = converter_valor(match_valor_total.group(1)) if match_valor_total else None
|
||
|
||
match_nome = re.search(r'(?<=\n)([A-Z\s]{10,})(?=\s+CNPJ/CPF:)', texto_final)
|
||
nome = match_nome.group(1).replace('\n', ' ').strip() if match_nome else "NÃO IDENTIFICADO"
|
||
|
||
# Remove qualquer excesso após o nome verdadeiro
|
||
nome = re.split(r'\b(FAZENDA|RUA|AVENIDA|SETOR|CEP|CNPJ|CPF)\b', nome, maxsplit=1, flags=re.IGNORECASE)[0].strip()
|
||
|
||
match_cidade_estado = re.search(r'CEP:\s*\d{8}\s+(.*?)\s+([A-Z]{2})\s+BRASIL', texto_final)
|
||
cidade = match_cidade_estado.group(1).strip() if match_cidade_estado else "NÃO IDENTIFICADA"
|
||
estado = match_cidade_estado.group(2).strip() if match_cidade_estado else "NÃO IDENTIFICADO"
|
||
|
||
match_class = re.search(r'Classificação:\s*(.*)', texto_final, re.IGNORECASE)
|
||
classificacao = match_class.group(1).strip() if match_class else "NÃO IDENTIFICADA"
|
||
|
||
def extrair_tributo_linhas_separadas(nome_tributo):
|
||
linhas = texto_final.split('\n')
|
||
for i, linha in enumerate(linhas):
|
||
if nome_tributo in linha.upper():
|
||
aliq = base = valor = 0.0
|
||
if i + 1 < len(linhas):
|
||
aliq_match = re.search(r'([\d,]+)%', linhas[i + 1])
|
||
if aliq_match:
|
||
aliq = converter_valor(aliq_match.group(1)) / 100
|
||
if i + 2 < len(linhas):
|
||
base = converter_valor(linhas[i + 2].strip())
|
||
if i + 3 < len(linhas):
|
||
valor = converter_valor(linhas[i + 3].strip())
|
||
return base, aliq, valor
|
||
return 0.0, 0.0, 0.0
|
||
|
||
pis_base, pis_aliq, pis_valor = extrair_tributo_linhas_separadas('PIS/PASEP')
|
||
icms_base, icms_aliq, icms_valor = extrair_tributo_linhas_separadas('ICMS')
|
||
cofins_base, cofins_aliq, cofins_valor = extrair_tributo_linhas_separadas('COFINS')
|
||
|
||
match_consumo = re.search(r'CONSUMO\s+\d+\s+([\d.,]+)', texto_final)
|
||
consumo = converter_valor(match_consumo.group(1)) if match_consumo else 0.0
|
||
|
||
match_tarifa = re.search(r'CONSUMO KWH \+ ICMS/PIS/COFINS\s+([\d.,]+)', texto_final) \
|
||
or re.search(r'CUSTO DISP\s+([\d.,]+)', texto_final)
|
||
tarifa = converter_valor(match_tarifa.group(1)) if match_tarifa else 0.0
|
||
|
||
dados = {
|
||
'classificacao_tarifaria': classificacao,
|
||
'nome': nome,
|
||
'unidade_consumidora': uc,
|
||
'cidade': cidade,
|
||
'estado': estado,
|
||
'referencia': referencia,
|
||
'valor_total': valor_total,
|
||
'pis_aliq': pis_aliq,
|
||
'icms_aliq': icms_aliq,
|
||
'cofins_aliq': cofins_aliq,
|
||
'pis_valor': pis_valor,
|
||
'icms_valor': icms_valor,
|
||
'cofins_valor': cofins_valor,
|
||
'pis_base': pis_base,
|
||
'icms_base': icms_base,
|
||
'cofins_base': cofins_base,
|
||
'consumo': consumo,
|
||
'tarifa': tarifa,
|
||
'nota_fiscal': nota_fiscal,
|
||
'data_processamento': datetime.now(),
|
||
'distribuidora': 'Equatorial Goiás'
|
||
}
|
||
|
||
campos_obrigatorios = ['nome', 'unidade_consumidora', 'referencia', 'nota_fiscal']
|
||
faltantes = [campo for campo in campos_obrigatorios if not dados.get(campo)]
|
||
if valor_total is None and not any(p in texto_limpo for p in ['R$***********0,00', 'R$*********0,00', 'R$*0,00']):
|
||
faltantes.append('valor_total')
|
||
|
||
if faltantes:
|
||
raise ValueError(f"Campos obrigatórios faltantes: {', '.join(faltantes)}")
|
||
|
||
return dados
|
||
|
||
|