Module pipelines.rj_cor.meteorologia.precipitacao_alertario.tasks
Tasks for precipitacao_alertario
Functions
def check_to_run_dbt(dataset_id: str, table_id: str, mode: str = 'dev') ‑> bool
Source code:
@task(skip_on_upstream_skip=False)
def check_to_run_dbt(
    dataset_id: str,
    table_id: str,
    mode: str = "dev",
) -> bool:
    """
    Check whether dbt should run by comparing dates stored on Redis.
    This task runs even if its upstream tasks skip.
    """
    key_table_1 = build_redis_key(
        dataset_id, table_id, name="dbt_last_update", mode=mode
    )
    key_table_2 = build_redis_key(dataset_id, table_id, name="last_update", mode=mode)
    format_date_table_1 = "YYYY-MM-DD HH:mm:SS"
    format_date_table_2 = "YYYY-MM-DD HH:mm:SS"
    # Returns True if the date saved on table_2 (alertario) is more recent
    # than the date saved on table_1 (dbt).
    run_dbt = compare_dates_between_tables_redis(
        key_table_1, format_date_table_1, key_table_2, format_date_table_2
    )
    log(f">>>> debug data alertario > data dbt: {run_dbt}")
    return run_dbt
Check whether dbt should run by comparing dates stored on Redis. This task runs even if its upstream tasks skip.
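For intuition, here is a minimal standalone sketch of the comparison that compare_dates_between_tables_redis performs, assuming the helpers store dates as strings in a "date" field of a Redis hash. The function name, key layout, and Redis client setup below are hypothetical stand-ins, not the project's actual implementation:

import pendulum
import redis

client = redis.Redis()  # assumed local Redis instance

def newer_data_than_dbt(dbt_key: str, data_key: str) -> bool:
    """Return True when the raw-data date is newer than the last dbt date."""
    fmt = "YYYY-MM-DD HH:mm:ss"  # pendulum-style tokens, as in the task above
    dbt_date = client.hget(dbt_key, "date")
    data_date = client.hget(data_key, "date")
    if dbt_date is None:
        return True  # dbt never ran: run it
    if data_date is None:
        return False  # no new data recorded yet
    return pendulum.from_format(data_date.decode(), fmt) > pendulum.from_format(
        dbt_date.decode(), fmt
    )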
def download_data() ‑> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
Source code:
@task(
    nout=2,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def download_data() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Request data from the API and return each table in a separate dataframe.
    """
    dicionario = get_vault_secret("alertario_api")
    url = dicionario["data"]["url"]
    try:
        response = requests.get(url)
        if response.status_code != 200:
            log(
                f"Erro ao fazer a solicitação. Código de status: {response.status_code}"
            )
        # Raise on HTTP errors so the task's retry policy kicks in instead of
        # failing later with an undefined `dfr`.
        response.raise_for_status()
    except requests.RequestException as error:
        log(f"Erro durante a solicitação: {error}")
        raise

    soup = BeautifulSoup(response.text, "html.parser")
    # Get all tables from the HTML structure
    tables = soup.find_all("table")
    # Data comes in Brazilian number format and has some extra newlines
    tables = [str(table).replace(",", ".").replace("\n", "") for table in tables]
    # Convert the HTML tables to pandas dataframes
    dfr = pd.read_html(str(tables), decimal=",")

    dfr_pluviometric = dfr[0]
    dfr_meteorological = dfr[1]
    # dfr_rain_conditions = dfr[2]
    # dfr_landslide_probability = dfr[3]
    log(f"\nPluviometric df {dfr_pluviometric.iloc[0]}")
    log(f"\nMeteorological df {dfr_meteorological.iloc[0]}")
    return (
        dfr_pluviometric,
        dfr_meteorological,
    )  # , dfr_rain_conditions, dfr_landslide_probability
Request data from the API and return each table in a separate dataframe.
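Because the endpoint serves plain HTML rather than JSON, the parsing step is just BeautifulSoup plus pandas.read_html. A self-contained illustration with a made-up HTML snippet (the real URL comes from Vault; read_html needs lxml or html5lib installed):

import pandas as pd
from bs4 import BeautifulSoup

html = """
<table>
  <tr><th>N°</th><th>Hora Leitura</th><th>15 min</th></tr>
  <tr><td>1</td><td>01/01/2024 - 12:00:00</td><td>0,2</td></tr>
</table>
"""
soup = BeautifulSoup(html, "html.parser")
# Same normalization as the task: decimal comma -> dot, strip newlines
tables = [str(t).replace(",", ".").replace("\n", "") for t in soup.find_all("table")]
frames = pd.read_html(str(tables), decimal=",")
print(frames[0])  # one dataframe per <table> element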
def save_data(dfr: pandas.core.frame.DataFrame, data_name: str = 'temp', wait=None) ‑> str | pathlib.Path
Source code:
@task
def save_data(
    dfr: pd.DataFrame,
    data_name: str = "temp",
    wait=None,  # pylint: disable=unused-argument
) -> Union[str, Path]:
    """
    Save the treated dataframe as partitioned CSV files so they can be
    uploaded to GCP.
    """
    prepath = Path(f"/tmp/precipitacao_alertario/{data_name}")
    prepath.mkdir(parents=True, exist_ok=True)

    partition_column = "data_medicao"
    log(f"Dataframe before partitions {dfr.iloc[0]}")
    log(f"Dataframe before partitions {dfr.dtypes}")
    dataframe, partitions = parse_date_columns(dfr, partition_column)
    current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")
    log(f"Dataframe after partitions {dataframe.iloc[0]}")
    log(f"Dataframe after partitions {dataframe.dtypes}")

    to_partitions(
        data=dataframe,
        partition_columns=partitions,
        savepath=prepath,
        data_type="csv",
        suffix=current_time,
    )
    log(f"Files saved on {prepath}")
    return prepath
Save the treated dataframe as partitioned CSV files so they can be uploaded to GCP.
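parse_date_columns and to_partitions are project helpers; for intuition, here is a minimal sketch of the Hive-style partitioned layout they are assumed to produce. The function below and the ano_particao/mes_particao partition names are hypothetical, not the helpers' real code:

from pathlib import Path
import pandas as pd

def to_partitions_sketch(data: pd.DataFrame, partition_columns, savepath: Path, suffix: str):
    # Write one CSV per unique combination of partition values,
    # e.g. savepath/ano_particao=2024/mes_particao=01/data_202401011200.csv
    for keys, group in data.groupby(partition_columns):
        keys = keys if isinstance(keys, tuple) else (keys,)
        subdir = savepath.joinpath(
            *[f"{col}={val}" for col, val in zip(partition_columns, keys)]
        )
        subdir.mkdir(parents=True, exist_ok=True)
        group.drop(columns=partition_columns).to_csv(
            subdir / f"data_{suffix}.csv", index=False
        )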
def save_data_old(dfr: pandas.core.frame.DataFrame, data_name: str = 'temp', wait=None) ‑> str | pathlib.Path
Source code:
@task
def save_data_old(
    dfr: pd.DataFrame,
    data_name: str = "temp",
    wait=None,  # pylint: disable=unused-argument
) -> Union[str, Path]:
    """
    Save the treated dataframe as partitioned CSV files so they can be
    uploaded to GCP (old API layout).
    """
    prepath = Path(f"/tmp/precipitacao_alertario/{data_name}")
    prepath.mkdir(parents=True, exist_ok=True)

    log(f"Dataframe before partitions old api {dfr.iloc[0]}")
    partition_column = "data_medicao"
    dataframe, partitions = parse_date_columns_old_api(dfr, partition_column)
    current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")
    log(f"Dataframe after partitions old api {dataframe.iloc[0]}")

    to_partitions(
        data=dataframe,
        partition_columns=partitions,
        savepath=prepath,
        data_type="csv",
        suffix=current_time,
    )
    log(f"{data_name} files saved on {prepath}")
    return prepath
Save the treated dataframe as partitioned CSV files so they can be uploaded to GCP (old API layout).
def save_last_dbt_update(dataset_id: str, table_id: str, mode: str = 'dev', wait=None) ‑> None
Source code:
@task
def save_last_dbt_update(
    dataset_id: str,
    table_id: str,
    mode: str = "dev",
    wait=None,  # pylint: disable=unused-argument
) -> None:
    """
    Save on Redis the last data timestamp that dbt was updated with.
    """
    last_update_key = build_redis_key(
        dataset_id, table_id, name="last_update", mode=mode
    )
    last_update = get_redis_output(last_update_key)
    redis_key = build_redis_key(
        dataset_id, table_id, name="dbt_last_update", mode=mode
    )
    log(f"Saving {last_update} as last time dbt was updated")
    save_str_on_redis(redis_key, "date", last_update["date"])
Save on Redis the last data timestamp that dbt was updated with.
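A rough sketch of the Redis bookkeeping involved, assuming the helpers read and write string dates in a hash field named "date". The key layout below is invented for illustration; build_redis_key's real format may differ:

import redis

client = redis.Redis()  # assumed local Redis instance

def build_key_sketch(dataset_id: str, table_id: str, name: str, mode: str) -> str:
    # Hypothetical key layout, e.g. "dev.my_dataset.my_table.dbt_last_update"
    return ".".join([mode, dataset_id, table_id, name])

# Copy the raw-data date over to the dbt bookkeeping key
last_update = client.hget(build_key_sketch("my_dataset", "my_table", "last_update", "dev"), "date")
if last_update is not None:
    client.hset(build_key_sketch("my_dataset", "my_table", "dbt_last_update", "dev"), "date", last_update)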
def treat_old_pluviometer(dfr: pandas.core.frame.DataFrame, wait=None) ‑> pandas.core.frame.DataFrame
Source code:
@task(
    nout=2,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def treat_old_pluviometer(
    dfr: pd.DataFrame,
    wait=None,  # pylint: disable=unused-argument
) -> pd.DataFrame:
    """
    Rename columns to the old flow's style.
    """
    log(
        "Starting treating pluviometer data to match old API. "
        f"Pluviometer table enters as\n{dfr.iloc[0]}"
    )
    rename_cols = {
        "acumulado_chuva_15min": "acumulado_chuva_15_min",
        "acumulado_chuva_1h": "acumulado_chuva_1_h",
        "acumulado_chuva_4h": "acumulado_chuva_4_h",
        "acumulado_chuva_24h": "acumulado_chuva_24_h",
        "acumulado_chuva_96h": "acumulado_chuva_96_h",
    }
    dfr.rename(columns=rename_cols, inplace=True)

    # date_format = "%Y-%m-%d %H:%M:%S"
    # dfr["data_medicao"] = dfr["data_medicao"].dt.strftime(date_format)
    dfr.data_medicao = dfr.data_medicao.apply(treat_date_col)

    # Convert columns that should be float to numeric
    float_cols = [
        "acumulado_chuva_15_min",
        "acumulado_chuva_1_h",
        "acumulado_chuva_4_h",
        "acumulado_chuva_24_h",
        "acumulado_chuva_96_h",
    ]
    dfr[float_cols] = dfr[float_cols].apply(pd.to_numeric, errors="coerce")

    # Replace negative values with None
    dfr[float_cols] = np.where(dfr[float_cols] < 0, None, dfr[float_cols])

    # Drop rows with a duplicated id_estacao, keeping the one with the
    # smallest values in the float columns
    dfr.sort_values(["id_estacao", "data_medicao"] + float_cols, inplace=True)
    dfr.drop_duplicates(
        subset=["id_estacao", "data_medicao"], keep="first", inplace=True
    )

    # Fix columns order
    dfr = dfr[
        [
            "data_medicao",
            "id_estacao",
            "acumulado_chuva_15_min",
            "acumulado_chuva_1_h",
            "acumulado_chuva_4_h",
            "acumulado_chuva_24_h",
            "acumulado_chuva_96_h",
        ]
    ]
    log(
        "Ending treating pluviometer data to match old API. "
        f"Pluviometer table finished as\n{dfr.iloc[0]}"
    )
    return dfr
Rename columns to the old flow's style.
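A standalone demo of the cleaning rules applied above: coerce readings to numeric, null out negative sentinel values, and keep the smallest reading per station and timestamp. The sample values are made up:

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "id_estacao": ["1", "1", "2"],
    "data_medicao": ["2024-01-01 12:00:00"] * 3,
    "acumulado_chuva_15_min": ["0.2", "0.4", "-99"],
})
float_cols = ["acumulado_chuva_15_min"]
df[float_cols] = df[float_cols].apply(pd.to_numeric, errors="coerce")
df[float_cols] = np.where(df[float_cols] < 0, None, df[float_cols])
df.sort_values(["id_estacao", "data_medicao"] + float_cols, inplace=True)
df = df.drop_duplicates(subset=["id_estacao", "data_medicao"], keep="first")
print(df)  # station 1 keeps 0.2; station 2's -99 became None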
def treat_pluviometer_and_meteorological_data(dfr: pandas.core.frame.DataFrame, dataset_id: str, table_id: str, mode: str = 'dev') ‑> Tuple[pandas.core.frame.DataFrame, bool]
Source code:
@task(
    nout=2,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def treat_pluviometer_and_meteorological_data(
    dfr: pd.DataFrame, dataset_id: str, table_id: str, mode: str = "dev"
) -> Tuple[pd.DataFrame, bool]:
    """
    Rename columns and keep only measurements that are newer than the
    last ones saved on Redis.
    """
    # Treat dfr if it comes from the pluviometers
    if isinstance(dfr.columns, pd.MultiIndex):
        # Keep only the bottom level of the MultiIndex columns
        dfr.columns = dfr.columns.droplevel(level=0)
        rename_cols = {
            "N°": "id_estacao",
            "Hora Leitura": "data_medicao",
            "05 min": "acumulado_chuva_5min",
            "10 min": "acumulado_chuva_10min",
            "15 min": "acumulado_chuva_15min",
            "30 min": "acumulado_chuva_30min",
            "1h": "acumulado_chuva_1h",
            "2h": "acumulado_chuva_2h",
            "3h": "acumulado_chuva_3h",
            "4h": "acumulado_chuva_4h",
            "6h": "acumulado_chuva_6h",
            "12h": "acumulado_chuva_12h",
            "24h": "acumulado_chuva_24h",
            "96h": "acumulado_chuva_96h",
            "No Mês": "acumulado_chuva_mes",
        }
    else:
        rename_cols = {
            "N°": "id_estacao",
            "Hora Leitura": "data_medicao",
            "Temp. (°C)": "temperatura",
            "Umi. do Ar (%)": "umidade_ar",
            "Sen. Térmica (°C)": "sensacao_termica",
            "P. Atm. (hPa)": "pressao_atmosferica",
            "P. de Orvalho (°C)": "temperatura_orvalho",
            "Vel. do Vento (Km/h)": "velocidade_vento",
            "Dir. do Vento (°)": "direcao_vento",
        }  # confirm the column names against INMET

    dfr.rename(columns=rename_cols, inplace=True)
    keep_cols = list(rename_cols.values())

    # Drop rows with a duplicated id_estacao, keeping the one with the
    # smallest values in the float columns
    # dfr.sort_values(["id_estacao", "data_medicao"] + float_cols, inplace=True)
    dfr.drop_duplicates(
        subset=["id_estacao", "data_medicao"], keep="first", inplace=True
    )

    dfr["data_medicao"] = pd.to_datetime(
        dfr["data_medicao"], format="%d/%m/%Y - %H:%M:%S"
    )

    log(f"Dataframe before comparing with last data saved on redis {dfr.head()}")
    log(f"Dataframe before comparing with last data saved on redis {dfr.iloc[0]}")
    dfr = save_updated_rows_on_redis(
        dfr,
        dataset_id,
        table_id,
        unique_id="id_estacao",
        date_column="data_medicao",
        date_format="%Y-%m-%d %H:%M:%S",
        mode=mode,
    )
    empty_data = dfr.shape[0] == 0

    if not empty_data:
        see_cols = ["id_estacao", "data_medicao", "last_update"]
        log(
            f"Dataframe after comparing with last data saved on redis {dfr[see_cols].head()}"
        )
        log(f"Dataframe first row after comparing {dfr.iloc[0]}")
        dfr["data_medicao"] = dfr["data_medicao"].dt.strftime("%Y-%m-%d %H:%M:%S")
        log(f"Dataframe after converting to string {dfr[see_cols].head()}")

        # Save the max date on Redis to compare it with the last dbt run
        max_date = str(dfr["data_medicao"].max())
        redis_key = build_redis_key(dataset_id, table_id, name="last_update", mode=mode)
        log(f"Dataframe is not empty. Redis key: {redis_key} and new date: {max_date}")
        save_str_on_redis(redis_key, "date", max_date)

        # Change "ND" and "-" values to None
        dfr.replace(["ND", "-"], [None, None], inplace=True)

        # Fix columns order
        dfr = dfr[keep_cols]
    else:
        # If the df is empty, the flow is stopped in flows.py
        log("Dataframe is empty. Skipping update flow.")

    return dfr, empty_data
Rename columns and keep only measurements that are newer than the last ones saved on Redis.
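A quick demo of the MultiIndex handling: pandas.read_html returns two-level headers for the pluviometric table, and only the bottom level is kept before renaming. The top-level labels below are invented for illustration:

import pandas as pd

cols = pd.MultiIndex.from_tuples([
    ("Estações", "N°"),
    ("Estações", "Hora Leitura"),
    ("Chuva (mm)", "15 min"),
])
df = pd.DataFrame([[1, "01/01/2024 - 12:00:00", 0.2]], columns=cols)
df.columns = df.columns.droplevel(level=0)  # keep the bottom header row
df = df.rename(columns={
    "N°": "id_estacao",
    "Hora Leitura": "data_medicao",
    "15 min": "acumulado_chuva_15min",
})
print(df.columns.tolist())  # ['id_estacao', 'data_medicao', 'acumulado_chuva_15min']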