Module pipelines.rj_cor.meteorologia.precipitacao_websirene.tasks
Tasks for precipitacao_alertario
Functions
def download_dados() ‑> pandas.core.frame.DataFrame
-
Expand source code
@task( max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) def download_dados() -> pd.DataFrame: """ Faz o request e salva em um dataframe """ # Acessar url websirene url = "http://websirene.rio.rj.gov.br/xml/chuvas.xml" dfr = pdx.read_xml(url, ["estacoes"]) return dfr
Faz o request e salva em um dataframe
def salvar_dados(dfr: pandas.core.frame.DataFrame) ‑> str | pathlib.Path
-
Expand source code
@task def salvar_dados(dfr: pd.DataFrame) -> Union[str, Path]: """ Salvar dados tratados em csv para conseguir subir pro GCP """ # Ordenação de variáveis cols_order = [ "id_estacao", "data_medicao", "acumulado_chuva_15_min", "acumulado_chuva_1_h", "acumulado_chuva_4_h", "acumulado_chuva_24_h", "acumulado_chuva_96_h", "acumulado_chuva_mes", ] dfr = dfr[cols_order] prepath = Path("/tmp/precipitacao_websirene/") prepath.mkdir(parents=True, exist_ok=True) partition_column = "data_medicao" dataframe, partitions = parse_date_columns(dfr, partition_column) current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M") # Cria partições a partir da data to_partitions( data=dataframe, partition_columns=partitions, savepath=prepath, data_type="csv", suffix=current_time, ) log(f"[DEBUG] Files saved on {prepath}") return prepath
Salvar dados tratados em csv para conseguir subir pro GCP
def tratar_dados(dfr: pandas.core.frame.DataFrame,
dataset_id: str,
table_id: str,
mode: str = 'dev') ‑> Tuple[pandas.core.frame.DataFrame, bool]-
Expand source code
@task( nout=2, max_retries=constants.TASK_MAX_RETRIES.value, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) def tratar_dados( dfr: pd.DataFrame, dataset_id: str, table_id: str, mode: str = "dev", ) -> Tuple[pd.DataFrame, bool]: """ Faz o request e salva dados localmente """ drop_cols = [ "@hora", "estacao|@nome", "estacao|@type", "estacao|localizacao|@bacia", "estacao|localizacao|@latitude", "estacao|localizacao|@longitude", ] rename_cols = { "estacao|@id": "id_estacao", "estacao|chuvas|@h01": "acumulado_chuva_1_h", "estacao|chuvas|@h04": "acumulado_chuva_4_h", "estacao|chuvas|@h24": "acumulado_chuva_24_h", "estacao|chuvas|@h96": "acumulado_chuva_96_h", "estacao|chuvas|@hora": "data_medicao_utc", "estacao|chuvas|@m15": "acumulado_chuva_15_min", "estacao|chuvas|@mes": "acumulado_chuva_mes", } dfr = pdx.fully_flatten(dfr).drop(drop_cols, axis=1).rename(rename_cols, axis=1) # Converte de UTC para horário São Paulo date_format = "%Y-%m-%d %H:%M:%S" dfr["data_medicao_utc"] = pd.to_datetime(dfr["data_medicao_utc"]) dfr["data_medicao"] = ( dfr["data_medicao_utc"] .dt.tz_convert("America/Sao_Paulo") .dt.strftime(date_format) ) dfr["data_medicao"] = pd.to_datetime(dfr["data_medicao"]) dfr = dfr.drop(["data_medicao_utc"], axis=1) # Converte variáveis que deveriam ser float para float float_cols = [ "acumulado_chuva_15_min", "acumulado_chuva_1_h", "acumulado_chuva_4_h", "acumulado_chuva_24_h", "acumulado_chuva_96_h", "acumulado_chuva_mes", ] dfr[float_cols] = dfr[float_cols].apply(pd.to_numeric, errors="coerce") dfr = save_updated_rows_on_redis( dfr, dataset_id, table_id, unique_id="id_estacao", date_column="data_medicao", date_format=date_format, mode=mode, ) # If df is empty stop flow empty_data = dfr.shape[0] == 0 log(f"[DEBUG]: dataframe is empty: {empty_data}") return dfr, empty_data
Faz o request e salva dados localmente