Module pipelines.rj_cor.meteorologia.precipitacao_cemaden.tasks

Tasks for precipitacao_cemaden

Functions

def check_for_new_stations(dataframe: pandas.core.frame.DataFrame, wait=None) ‑> None
@task
def check_for_new_stations(
    dataframe: pd.DataFrame,
    wait=None,  # pylint: disable=unused-argument
) -> None:
    """
    Check if the updated stations are the same as before.
    If not, consider flow as failed and call attention to
    add this new station on estacoes_cemaden.
    I can't automatically update this new station, because
    I couldn't find a url that gives me the lat and lon for
    all the stations.
    """

    stations_before = [
        "3043",
        "3044",
        "3045",
        "3114",
        "3215",
        "7593",
        "7594",
        "7595",
        "7596",
        "7597",
        "7599",
        "7600",
        "7601",
        "7602",
        "7603",
        "7606",
        "7609",
        "7610",
        "7611",
        "7612",
        "7613",
        "7614",
        "7615",
    ]
    new_stations = [
        i for i in dataframe.id_estacao.unique() if str(i) not in stations_before
    ]
    if len(new_stations) != 0:
        message = (
            "New station identified. You need to update CEMADEN "
            f"estacoes_cemaden adding station(s) {new_stations}: "
            f"{dataframe[dataframe.id_estacao.isin(new_stations)]}"
        )
        log(message)
        raise ENDRUN(state=Failed(message))

Check whether the updated stations are the same as before. If not, fail the flow and call attention to the need to add the new station(s) to estacoes_cemaden. The new stations cannot be added automatically because no known URL provides the latitude and longitude for every station.
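A minimal sketch of the check on a toy DataFrame (the "9999" station ID is made up for illustration, and stations_before is truncated):

import pandas as pd

# Toy payload: "3043" is a known station, "9999" is a hypothetical new one.
df = pd.DataFrame({"id_estacao": ["3043", "9999"], "acumulado_chuva_1_h": [0.2, 1.4]})

stations_before = ["3043", "3044", "3045"]  # truncated for the example
new_stations = [i for i in df.id_estacao.unique() if str(i) not in stations_before]

# new_stations == ["9999"], so the task would log the message and
# raise ENDRUN(state=Failed(message)) to stop the flow.
print(new_stations)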

def download_data() ‑> pandas.core.frame.DataFrame
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def download_data() -> pd.DataFrame:
    """
    Download data from API
    """

    url = "http://sjc.salvar.cemaden.gov.br/resources/graficos/interativo/getJson2.php?uf=RJ"
    dataframe = pd.read_json(url)
    return dataframe

Download data from the CEMADEN API
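pd.read_json fetches the URL in a single call. As a sketch, the equivalent request with an explicit timeout (requests and the 30-second value are assumptions; the task itself relies on Prefect retries):

from io import StringIO

import pandas as pd
import requests

url = "http://sjc.salvar.cemaden.gov.br/resources/graficos/interativo/getJson2.php?uf=RJ"
response = requests.get(url, timeout=30)  # fail fast instead of hanging
response.raise_for_status()
dataframe = pd.read_json(StringIO(response.text))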

def save_data(dataframe: pandas.core.frame.DataFrame) ‑> str | pathlib.Path
@task
def save_data(dataframe: pd.DataFrame) -> Union[str, Path]:
    """
    Save data on a csv file to be uploaded to GCP
    """

    prepath = Path("/tmp/precipitacao_cemaden/")
    prepath.mkdir(parents=True, exist_ok=True)

    partition_column = "data_medicao"
    dataframe, partitions = parse_date_columns(dataframe, partition_column)
    current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")

    to_partitions(
        data=dataframe,
        partition_columns=partitions,
        savepath=prepath,
        data_type="csv",
        suffix=current_time,
    )
    log(f"[DEBUG] Files saved on {prepath}")
    return prepath

Save data to partitioned CSV files to be uploaded to GCP
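parse_date_columns and to_partitions are project helpers. A rough sketch of the Hive-style layout they are expected to produce, with hypothetical partition column names (ano_particao, mes_particao) and file naming:

from pathlib import Path

import pandas as pd
import pendulum

df = pd.DataFrame({"id_estacao": ["3043"], "data_medicao": ["2023-01-15 12:30:00"]})

# Derive partition columns from the date column (hypothetical names).
dates = pd.to_datetime(df["data_medicao"])
df["ano_particao"], df["mes_particao"] = dates.dt.year, dates.dt.month

prepath = Path("/tmp/precipitacao_cemaden/")
suffix = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")
for (ano, mes), group in df.groupby(["ano_particao", "mes_particao"]):
    # One directory per partition, one timestamped CSV per run.
    partition_dir = prepath / f"ano_particao={ano}" / f"mes_particao={mes}"
    partition_dir.mkdir(parents=True, exist_ok=True)
    group.to_csv(partition_dir / f"data_{suffix}.csv", index=False)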

def treat_data(dataframe: pandas.core.frame.DataFrame,
dataset_id: str,
table_id: str,
mode: str = 'dev') ‑> pandas.core.frame.DataFrame
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def treat_data(
    dataframe: pd.DataFrame, dataset_id: str, table_id: str, mode: str = "dev"
) -> pd.DataFrame:
    """
    Rename cols and filter data using hour and minute from the nearest current timestamp
    """

    drop_cols = [
        "uf",
        "codibge",
        "cidade",
        "nomeestacao",
        "tipoestacao",
        "status",
    ]
    rename_cols = {
        "idestacao": "id_estacao",
        "ultimovalor": "acumulado_chuva_10_min",
        "datahoraUltimovalor": "data_medicao_utc",
        "acc1hr": "acumulado_chuva_1_h",
        "acc3hr": "acumulado_chuva_3_h",
        "acc6hr": "acumulado_chuva_6_h",
        "acc12hr": "acumulado_chuva_12_h",
        "acc24hr": "acumulado_chuva_24_h",
        "acc48hr": "acumulado_chuva_48_h",
        "acc72hr": "acumulado_chuva_72_h",
        "acc96hr": "acumulado_chuva_96_h",
    }

    dataframe = (
        dataframe[(dataframe["codibge"] == 3304557) & (dataframe["tipoestacao"] == 1)]
        .drop(drop_cols, axis=1)
        .rename(rename_cols, axis=1)
    )
    log(f"\n[DEBUG]: df.head() {dataframe.head()}")

    # Parse timestamps (day-first), then convert from UTC to São Paulo time
    dataframe["data_medicao_utc"] = pd.to_datetime(
        dataframe["data_medicao_utc"], dayfirst=True
    )
    dataframe["data_medicao"] = (
        dataframe["data_medicao_utc"]
        .dt.tz_localize("UTC")
        .dt.tz_convert("America/Sao_Paulo")
    )
    see_cols = ["data_medicao_utc", "data_medicao", "id_estacao", "acumulado_chuva_1_h"]
    log(f"DEBUG: data utc -> GMT-3 {dataframe[see_cols]}")

    date_format = "%Y-%m-%d %H:%M:%S"
    dataframe["data_medicao"] = dataframe["data_medicao"].dt.strftime(date_format)

    log(f"DEBUG: data {dataframe[see_cols]}")

    # Replace '-' with 0 and np.nan with None
    dataframe.replace(["-", np.nan], [0, None], inplace=True)

    # Change negative values to None
    float_cols = [
        "acumulado_chuva_10_min",
        "acumulado_chuva_1_h",
        "acumulado_chuva_3_h",
        "acumulado_chuva_6_h",
        "acumulado_chuva_12_h",
        "acumulado_chuva_24_h",
        "acumulado_chuva_48_h",
        "acumulado_chuva_72_h",
        "acumulado_chuva_96_h",
    ]
    dataframe[float_cols] = np.where(
        dataframe[float_cols] < 0, None, dataframe[float_cols]
    )

    # Drop duplicated (id_estacao, data_medicao) rows, keeping the smallest values
    dataframe.sort_values(["id_estacao", "data_medicao"] + float_cols, inplace=True)
    dataframe.drop_duplicates(
        subset=["id_estacao", "data_medicao"], keep="first", inplace=True
    )

    log(f"Dataframe before comparing with last data saved on redis {dataframe.head()}")

    dataframe = save_updated_rows_on_redis(
        dataframe,
        dataset_id,
        table_id,
        unique_id="id_estacao",
        date_column="data_medicao",
        date_format=date_format,
        mode=mode,
    )

    log(f"Dataframe after comparing with last data saved on redis {dataframe.head()}")

    # If the dataframe is empty, skip the flow run
    if dataframe.shape[0] == 0:
        skip_text = "No new data available on API"
        log(skip_text)
        raise ENDRUN(state=Skipped(skip_text))

    # Fix column order
    dataframe = dataframe[
        [
            "id_estacao",
            "data_medicao",
            "acumulado_chuva_10_min",
            "acumulado_chuva_1_h",
            "acumulado_chuva_3_h",
            "acumulado_chuva_6_h",
            "acumulado_chuva_12_h",
            "acumulado_chuva_24_h",
            "acumulado_chuva_48_h",
            "acumulado_chuva_72_h",
            "acumulado_chuva_96_h",
        ]
    ]

    return dataframe

Rename columns, keep only Rio de Janeiro pluviometric stations, convert timestamps from UTC to America/Sao_Paulo, clean invalid values and drop rows already saved on Redis.
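A compact sketch of the value-cleaning steps on a toy frame (the numbers are made up), showing the '-'/NaN replacement, the negative-value masking and the duplicate drop:

import numpy as np
import pandas as pd

df = pd.DataFrame(
    {
        "id_estacao": ["3043", "3043", "3044"],
        "data_medicao": ["2023-01-15 12:30:00"] * 3,
        "acumulado_chuva_1_h": ["-", -1.0, 2.5],
    }
)

df.replace(["-", np.nan], [0, None], inplace=True)  # '-' -> 0, NaN -> None
df["acumulado_chuva_1_h"] = np.where(
    pd.to_numeric(df["acumulado_chuva_1_h"]) < 0, None, df["acumulado_chuva_1_h"]
)

# Keep the smallest accumulation per (id_estacao, data_medicao) pair.
df = df.sort_values(["id_estacao", "data_medicao", "acumulado_chuva_1_h"])
df = df.drop_duplicates(subset=["id_estacao", "data_medicao"], keep="first")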