Module pipelines.rj_rioaguas.saneamento_drenagem.nivel_lamina_agua.tasks
Tasks para pipeline de dados de nível de lâmina de água em via.
Functions
def download_file() ‑> pandas.core.frame.DataFrame
-
Expand source code
@task def download_file() -> pd.DataFrame: """ Função para download de tabela com os dados. Args: download_url (str): URL onde a tabela está localizada. """ # Acessar página web dicionario = get_vault_secret("rioaguas_lamina_agua") url = dicionario["data"]["url"] session = login(url) page = session.get(url) # Faz o parse do htm e seleciona apenas dados que estão em tabela soup = BeautifulSoup(page.text, "html.parser") table = soup.find_all("table") # Converte dados para dataframe dados = pd.read_html(str(table), flavor="bs4")[0] return dados
Função para download de tabela com os dados.
Args: download_url (str): URL onde a tabela está localizada.
def salvar_dados(dados: pandas.core.frame.DataFrame) ‑> str | pathlib.Path
-
Expand source code
@task def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]: """ Salvar dados em csv. """ save_cols = [ "data_medicao", "id_estacao", "altura_agua", ] dataframe = dados[save_cols].copy() prepath = Path("/tmp/altura_agua/") prepath.mkdir(parents=True, exist_ok=True) partition_column = "data_medicao" dataframe, partitions = parse_date_columns(dataframe, partition_column) current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M") to_partitions( data=dataframe, partition_columns=partitions, savepath=prepath, data_type="csv", suffix=current_time, ) log(f"[DEBUG] Files saved on {prepath}") # filename = prepath / f"nivel_{current_time}.csv" # log(f"Saving {filename}") # dados[save_cols].to_csv(filename, index=False) return prepath
Salvar dados em csv.
def tratar_dados(dados: pandas.core.frame.DataFrame,
dataset_id: str,
table_id: str,
mode: str = 'prod') ‑> pandas.core.frame.DataFrame-
Expand source code
@task def tratar_dados( dados: pd.DataFrame, dataset_id: str, table_id: str, mode: str = "prod" ) -> pd.DataFrame: """Tratar dados para o padrão estabelecido e filtrar linhas para salvarmos apenas as medições que foram contratadas pela prefeitura. """ # Cria id das estações estacao_2_id_estacao = { "Catete": "1", "Bangu - Rua da Feira": "2", "Bangu - Rua do Açudes": "3", "Rio Maracanã - Visc Itamarati": "4", "Itanhangá": "5", "Bangu - Av Santa Cruz": "6", "Lagoa": "7", "Rio Maracanã - R: Uruguai": "8", } dados["id_estacao"] = dados["Endereço"].map(estacao_2_id_estacao) rename_cols = { "Endereço": "endereco", "Último envio": "data_medicao", "Temperatura": "temperatura", "Umidade": "umidade", "Precipitação": "precipitacao", "Lâmina": "altura_agua", } # Substitui valores que aparecem nas linhas dados = dados.rename(rename_cols, axis=1).replace( { " ºC": "", " %": "", " mm": "", " cm": "", ",": ".", "R:": "rua", }, regex=True, ) dados["endereco"] = dados["endereco"].str.capitalize() dados["endereco"] = dados["endereco"].apply(lambda x: unidecode.unidecode(x)) date_format = "%d/%m/%Y %H:%M" dados["data_medicao"] = pd.to_datetime(dados["data_medicao"], format=date_format) # Fixa ordem das colunas cols_order = [ "data_medicao", "id_estacao", "endereco", "altura_agua", "precipitacao", "umidade", "temperatura", ] log(f"[DEBUG]: dados coletados\n{dados.head()}") dados = save_updated_rows_on_redis( dados, dataset_id, table_id, unique_id="id_estacao", date_column="data_medicao", date_format="%d/%m/%Y %H:%M", mode=mode, ) log(f"[DEBUG]: dados que serão salvos\n{dados.head()}") return dados[cols_order]
Tratar dados para o padrão estabelecido e filtrar linhas para salvarmos apenas as medições que foram contratadas pela prefeitura.