Module pipelines.rj_rioaguas.saneamento_drenagem.nivel_lagoa.tasks

Tasks para pipeline de dados de nível da Lagoa Rodrigo de Freitas. Fonte: Squitter.

Functions

def download_file(download_url)
Expand source code
@task
def download_file(download_url):
    """
    Função para download de tabela com os dados.

    Args:
    download_url (str): URL onde a tabela está localizada.
    """
    # Acessar username e password
    dicionario = get_vault_secret("rioaguas_nivel_lagoa_squitter")
    url = dicionario["data"]["url"]
    user = dicionario["data"]["user"]
    password = dicionario["data"]["password"]
    session = login(url, user, password)
    page = session.get(download_url)
    soup = BeautifulSoup(page.text, "html.parser")
    table = soup.find_all("table")
    dfr = pd.read_html(str(table))[0]
    return dfr

Função para download de tabela com os dados.

Args: download_url (str): URL onde a tabela está localizada.

def salvar_dados(dados: pandas.core.frame.DataFrame) ‑> str | pathlib.Path
Expand source code
@task
def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]:
    """
    Salvar dados em csv.
    """
    prepath = Path("/tmp/nivel_lagoa/")
    prepath.mkdir(parents=True, exist_ok=True)

    partition_column = "data_medicao"
    dataframe, partitions = parse_date_columns(dados, partition_column)
    current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")

    # Cria partições a partir da data
    to_partitions(
        data=dataframe,
        partition_columns=partitions,
        savepath=prepath,
        data_type="csv",
        suffix=current_time,
    )
    log(f"[DEBUG] Files saved on {prepath}")
    return prepath

Salvar dados em csv.

def tratar_dados(dfr: pandas.core.frame.DataFrame,
dataset_id: str,
table_id: str,
mode: str = 'prod') ‑> Tuple[pandas.core.frame.DataFrame, bool]
Expand source code
@task(
    nout=2,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def tratar_dados(
    dfr: pd.DataFrame, dataset_id: str, table_id: str, mode: str = "prod"
) -> Tuple[pd.DataFrame, bool]:
    """
    Tratar dados para o padrão estabelecido.
    """

    # Renomeia colunas
    dfr = dfr.rename(
        columns={"Hora Leitura": "data_medicao", "Nível [m]": "nivel_agua"}
    )
    # Adiciona coluna para id e nome da lagoa
    dfr["id_estacao"] = "1"
    dfr["nome_estacao"] = "Lagoa rodrigo de freitas"
    # Remove duplicados
    dfr = dfr.drop_duplicates(subset=["id_estacao", "data_medicao"], keep="first")
    # Acessa o redis e mantem apenas linhas que ainda não foram salvas
    log(f"[DEBUG]: dados coletados\n{dfr.tail()}")
    dfr = save_updated_rows_on_redis(
        dfr,
        dataset_id,
        table_id,
        unique_id="id_estacao",
        date_column="data_medicao",
        date_format="%d/%m/%Y %H:%M",
        mode=mode,
    )
    log(f"[DEBUG]: dados que serão salvos no df\n{dfr.tail()}")

    # If df is empty stop flow
    empty_data = dfr.shape[0] == 0
    log(f"[DEBUG]: dataframe is empty: {empty_data}")

    return (
        dfr[["data_medicao", "id_estacao", "nome_estacao", "nivel_agua"]],
        empty_data,
    )

Tratar dados para o padrão estabelecido.