Module pipelines.rj_escritorio.geolocator.tasks

Tasks for geolocator

Functions

def cria_csv(base_enderecos_novos: pandas.core.frame.DataFrame) ‑> str | pathlib.Path
Expand source code
@task
def cria_csv(base_enderecos_novos: pd.DataFrame) -> Union[str, Path]:
    """
    Une os endereços previamente catalogados com os novos e cria um csv.
    """

    # Fixa ordem das colunas
    cols_order = [
        "endereco_completo",
        "pais",
        "estado",
        "municipio",
        "bairro",
        "id_logradouro",
        "logradouro",
        "numero_porta",
        "latitude",
        "longitude",
    ]
    base_enderecos_novos[cols_order] = base_enderecos_novos[cols_order]

    # Hora atual no formato YYYY-MM-DD para criar partições
    current_day = pendulum.now("America/Sao_Paulo").strftime("%Y-%m-%d")

    ano = current_day[:4]
    mes = str(int(current_day[5:7]))
    partitions = os.path.join(
        f"ano_particao={ano}", f"mes_particao={mes}", f"data_particao={current_day}"
    )

    base_path = os.path.join(os.getcwd(), "tmp", "geolocator")
    partition_path = os.path.join(base_path, partitions)

    if not os.path.exists(partition_path):
        os.makedirs(partition_path)

    filename = os.path.join(partition_path, "base_enderecos.csv")
    log(f"File is saved on: {filename}")

    base_enderecos_novos.to_csv(
        filename,
        index=False,
    )
    return base_path

Une os endereços previamente catalogados com os novos e cria um csv.

def geolocaliza_enderecos(base_enderecos_novos: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame
Expand source code
@task
def geolocaliza_enderecos(base_enderecos_novos: pd.DataFrame) -> pd.DataFrame:
    """
    Geolocaliza todos os novos endereços que entraram no dia anterior
    e verifica se pontos pertencem a cidade do Rio de Janeiro.
    """
    start_time = time.time()
    coordenadas = base_enderecos_novos["endereco_completo"].apply(
        lambda x: pd.Series(geolocator(x), index=["lat", "long"])
    )

    coordenadas[["lat", "long"]] = coordenadas.apply(
        lambda x: checar_point_pertence_cidade(x.lat, x.long),
        axis=1,
        result_type="expand",
    )

    log(f"--- {(time.time() - start_time)} seconds ---")

    try:
        base_enderecos_novos["latitude"] = coordenadas["lat"]
        base_enderecos_novos["longitude"] = coordenadas["long"]
    except:  # pylint: disable=bare-except
        log("Não foram identificados chamados abertos no dia anterior.")

    return base_enderecos_novos

Geolocaliza todos os novos endereços que entraram no dia anterior e verifica se pontos pertencem a cidade do Rio de Janeiro.

def seleciona_enderecos_novos() ‑> Tuple[pandas.core.frame.DataFrame, bool]
Expand source code
@task(nout=2)
def seleciona_enderecos_novos() -> Tuple[pd.DataFrame, bool]:
    """
    Seleciona base de endereços chamados que entraram no dia anterior e não estão geolocalizados.
    """
    # pylint: disable=W1401
    query = """
    WITH
    end_conhecidos AS (
        SELECT endereco_completo
        FROM `rj-escritorio-dev.dados_mestres_staging.enderecos_geolocalizados`
    ),
    chamados_ontem AS (
        SELECT
        'Brasil' pais,
        'RJ' estado,
        'Rio de Janeiro' municipio,
        no_bairro bairro,
        LPAD(SAFE_CAST(REGEXP_REPLACE(id_logradouro, r'\.0$', '') AS STRING), 6, '0') id_logradouro,
        no_logradouro logradouro,
        SAFE_CAST(SAFE_CAST(ds_endereco_numero AS INT) AS STRING) numero_porta,
        CONCAT(no_logradouro, ' ', SAFE_CAST(SAFE_CAST(ds_endereco_numero AS INT) AS STRING), ', ', no_bairro,
            ', ', 'Rio de Janeiro, RJ, Brasil') endereco_completo
        FROM `rj-segovi.adm_central_atendimento_1746_staging.chamado`
        WHERE no_logradouro IS NOT NULL
            AND CAST(dt_inicio AS TIMESTAMP) BETWEEN DATE_ADD(CAST(CURRENT_DATE() AS TIMESTAMP), INTERVAL -1 DAY) AND CURRENT_TIMESTAMP()
        )

    SELECT DISTINCT
    ch.endereco_completo, pais, estado, municipio,
    bairro, id_logradouro, logradouro, ch.numero_porta
    FROM chamados_ontem ch
    LEFT JOIN end_conhecidos ec ON ec.endereco_completo = ch.endereco_completo
    WHERE ec.endereco_completo IS NULL AND ch.endereco_completo IS NOT NULL
    """
    base_enderecos_novos = bd.read_sql(
        query, billing_project_id="rj-escritorio-dev", from_file=True
    )
    possui_enderecos_novos = base_enderecos_novos.shape[0] > 0

    return base_enderecos_novos.drop_duplicates(), possui_enderecos_novos

Seleciona base de endereços chamados que entraram no dia anterior e não estão geolocalizados.