Module pipelines.rj_cor.meteorologia.precipitacao_inea.tasks

Tasks for precipitacao_inea

Functions

def check_for_new_stations(dataframe: pandas.core.frame.DataFrame, wait=None) ‑> None
@task
def check_for_new_stations(
    dataframe: pd.DataFrame,
    wait=None,  # pylint: disable=unused-argument
) -> None:
    """
    Check if the updated stations are the same as before.
    If not, consider flow as failed and call attention to
    add this new station on estacoes_cemaden.
    I can't automatically update this new station, because
    I couldn't find a url that gives me the lat and lon for
    all the stations.
    """

    stations_before = [
        "1",
        "2",
        "3",
        "4",
        "5",
    ]
    new_stations = [
        i for i in dataframe.id_estacao.unique() if str(i) not in stations_before
    ]
    if len(new_stations) != 0:
        message = f"New station identified. You need to update INEA\
              estacoes_inea adding station(s) {new_stations}: \
              {dataframe[dataframe.id_estacao.isin(new_stations)]}  "
        log(message)
        raise ENDRUN(state=Failed(message))

Check if the updated stations are the same as before. If not, mark the flow as failed and call attention to add the new station(s) to estacoes_inea. The new station cannot be added automatically because there is no known URL that provides the latitude and longitude for all stations.
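
A minimal sketch of the detection logic on a toy dataframe (pure pandas, outside Prefect; the hard-coded station list mirrors the one in the task):

import pandas as pd

# Toy data: station "6" is not in the known list, so it would be flagged.
stations_before = ["1", "2", "3", "4", "5"]
dataframe = pd.DataFrame({"id_estacao": ["1", "3", "6"], "altura_agua": [0.2, 0.5, 0.1]})

new_stations = [i for i in dataframe.id_estacao.unique() if str(i) not in stations_before]
print(new_stations)  # ['6'] -> the task would log a message and raise ENDRUN(state=Failed(...))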

def check_new_data(dfr_pluviometric: pandas.core.frame.DataFrame,
dfr_fluviometric: pandas.core.frame.DataFrame) ‑> Tuple[bool, bool]
@task(
    nout=2,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def check_new_data(
    dfr_pluviometric: pd.DataFrame,
    dfr_fluviometric: pd.DataFrame,
) -> Tuple[bool, bool]:
    """
    Check if the dataframes are empty
    """

    new_pluviometric_data = True
    new_fluviometric_data = True

    if dfr_pluviometric.shape[0] == 0:
        log("No new pluviometric data available on API")
        new_pluviometric_data = False
    if dfr_fluviometric.shape[0] == 0:
        log("No new fluviometric data available on API")
        new_fluviometric_data = False
    return new_pluviometric_data, new_fluviometric_data

Check whether the pluviometric and fluviometric dataframes contain new data (i.e. are not empty).
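
A hedged sketch of how the two flags behave, using toy dataframes:

import pandas as pd

dfr_pluviometric = pd.DataFrame({"id_estacao": ["1"], "acumulado_chuva_1_h": [1.2]})
dfr_fluviometric = pd.DataFrame(columns=["id_estacao", "data_medicao", "altura_agua"])  # empty

# Same check the task performs: an empty dataframe turns its flag off so the
# corresponding downstream save/upload branch can be skipped.
new_pluviometric_data = dfr_pluviometric.shape[0] != 0  # True
new_fluviometric_data = dfr_fluviometric.shape[0] != 0  # False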

def download_data() ‑> pandas.core.frame.DataFrame
@task(
    nout=2,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def download_data() -> pd.DataFrame:
    """
    Download data from API
    """

    estacoes = {
        "1": "225543320",  # Campo Grande
        "2": "BE70E166",  # Capela Mayrink
        "3": "225543250",  # Eletrobras
        "4": "2243088",  # Realengo
        "5": "225443130",  # Sao Cristovao
    }

    dataframe = pd.DataFrame()
    for key, value in estacoes.items():
        url = f"http://200.20.53.8/alertadecheias/{value}.xlsx"
        dataframe_temp = pd.read_excel(url)
        dataframe_temp["id_estacao"] = key
        dataframe = pd.concat([dataframe, dataframe_temp])
    return dataframe

Download data from the INEA alertadecheias API, one spreadsheet per station.
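
The task can be exercised outside a flow; this is a hedged usage sketch that hits the live INEA endpoint, so it needs network access and the openpyxl engine for reading .xlsx files:

from pipelines.rj_cor.meteorologia.precipitacao_inea.tasks import download_data

# .run() executes the task function directly (Prefect 1.x) without building a flow.
dataframe = download_data.run()
print(dataframe["id_estacao"].value_counts())  # number of rows per station ("1" to "5")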

def save_data(dataframe: pandas.core.frame.DataFrame, folder_name: str = None) ‑> str | pathlib.Path
@task
def save_data(dataframe: pd.DataFrame, folder_name: str = None) -> Union[str, Path]:
    """
    Save data on a csv file to be uploaded to GCP
    """

    prepath = Path("/tmp/precipitacao")
    if folder_name:
        prepath = Path("/tmp/precipitacao") / folder_name
    prepath.mkdir(parents=True, exist_ok=True)

    log(f"Start saving data on {prepath}")
    log(f"Data to be saved {dataframe.head()}")

    partition_column = "data_medicao"
    dataframe, partitions = parse_date_columns(dataframe, partition_column)
    current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")

    to_partitions(
        data=dataframe,
        partition_columns=partitions,
        savepath=prepath,
        data_type="csv",
        suffix=current_time,
    )
    log(f"[DEBUG] Files saved on {prepath}")
    return prepath

Save data to partitioned CSV files to be uploaded to GCP.
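
The actual partitioning is done by the project helpers parse_date_columns and to_partitions; as a hedged illustration of the intent (the partition column names below are assumptions, not taken from this module), the output is roughly a Hive-style date-partitioned tree of CSVs:

import pandas as pd
from pathlib import Path

# Hedged sketch only: mimics the intended layout, not the real helpers.
dataframe = pd.DataFrame(
    {"id_estacao": ["1"], "data_medicao": ["2023-01-15 10:00:00"], "altura_agua": [1.3]}
)
dates = pd.to_datetime(dataframe["data_medicao"])
partition_dir = (
    Path("/tmp/precipitacao/example")
    / f"ano_particao={dates.dt.year[0]}"
    / f"mes_particao={dates.dt.strftime('%m')[0]}"
    / f"data_particao={dates.dt.date[0]}"
)
partition_dir.mkdir(parents=True, exist_ok=True)
dataframe.to_csv(partition_dir / "dados.csv", index=False)  # one CSV per partition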

def treat_data(dataframe: pandas.core.frame.DataFrame,
dataset_id: str,
table_id: str,
mode: str = 'dev') ‑> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
@task(
    nout=2,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def treat_data(
    dataframe: pd.DataFrame, dataset_id: str, table_id: str, mode: str = "dev"
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Rename cols and filter data using Redis
    """

    dataframe["data_medicao"] = (
        pd.to_datetime(dataframe.Data, format="%d/%m/%Y").dt.strftime("%Y-%m-%d")
        + " "
        + dataframe["Hora"]
        + ":00"
    )

    rename_cols = {
        "Chuva Último dado": "acumulado_chuva_15_min",
        " Chuva Acumulada 1H": "acumulado_chuva_1_h",
        " Chuva Acumulada 4H": "acumulado_chuva_4_h",
        " Chuva Acumulada 24H": "acumulado_chuva_24_h",
        " Chuva Acumulada 96H": "acumulado_chuva_96_h",
        " Chuva Acumulada 30D": "acumulado_chuva_30_d",
        " Último Nível": "altura_agua",
    }
    dataframe.rename(columns=rename_cols, inplace=True)

    # replace all "Dado Nulo" to nan
    dataframe.replace({"Dado Nulo": np.nan}, inplace=True)

    # Keep only the first (smallest) record for each id_estacao / data_medicao pair
    dataframe.sort_values(
        ["id_estacao", "data_medicao"] + list(rename_cols.values()), inplace=True
    )
    dataframe.drop_duplicates(
        subset=["id_estacao", "data_medicao"], keep="first", inplace=True
    )

    date_format = "%Y-%m-%d %H:%M:%S"
    # dataframe["data_medicao"] = dataframe["data_medicao"].dt.strftime(date_format)

    log(f"Dataframe before comparing with last data saved on redis {dataframe.head()}")
    log(f"Dataframe before comparing {dataframe[dataframe['id_estacao']=='1']}")

    dataframe = save_updated_rows_on_redis(
        dataframe,
        dataset_id,
        table_id,
        unique_id="id_estacao",
        date_column="data_medicao",
        date_format=date_format,
        mode=mode,
    )

    log(f"Dataframe after comparing with last data saved on redis {dataframe.head()}")
    log(f"Dataframe after comparing {dataframe[dataframe['id_estacao']=='1']}")

    # If df is empty stop flow
    if dataframe.shape[0] == 0:
        skip_text = "No new data available on API"
        log(skip_text)
        raise ENDRUN(state=Skipped(skip_text))

    pluviometric_cols = [
        "id_estacao",
        "data_medicao",
        "acumulado_chuva_15_min",
        "acumulado_chuva_1_h",
        "acumulado_chuva_4_h",
        "acumulado_chuva_24_h",
        "acumulado_chuva_96_h",
        "acumulado_chuva_30_d",
    ]
    fluviometric_cols = ["id_estacao", "data_medicao", "altura_agua"]

    dfr_pluviometric = dataframe[pluviometric_cols].copy()
    dfr_fluviometric = dataframe.loc[
        dataframe["altura_agua"] != "Estação pluviométrica", fluviometric_cols
    ].copy()

    # Replace all values greater than 10000 in "altura_agua" with nan
    dfr_fluviometric.loc[
        dfr_fluviometric["altura_agua"] > 10000, "altura_agua"
    ] = np.nan

    fluviometric_cols_order = [
        "id_estacao",
        "data_medicao",
        "altura_agua",
    ]
    dfr_fluviometric = dfr_fluviometric[fluviometric_cols_order].copy()

    return dfr_pluviometric, dfr_fluviometric

Rename columns and keep only the rows not yet saved on Redis.
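
A hedged sketch of the cleaning steps on a toy frame that mirrors the raw spreadsheet headers (note the leading spaces in the originals), leaving out the Redis comparison:

import numpy as np
import pandas as pd

raw = pd.DataFrame(
    {
        "Data": ["15/01/2023"],
        "Hora": ["10:00"],
        "Chuva Último dado": ["Dado Nulo"],
        " Último Nível": [1.3],
    }
)
# Build the timestamp and apply the same rename/replace steps as treat_data.
raw["data_medicao"] = (
    pd.to_datetime(raw.Data, format="%d/%m/%Y").dt.strftime("%Y-%m-%d") + " " + raw["Hora"] + ":00"
)
raw.rename(
    columns={"Chuva Último dado": "acumulado_chuva_15_min", " Último Nível": "altura_agua"},
    inplace=True,
)
raw.replace({"Dado Nulo": np.nan}, inplace=True)
print(raw[["data_medicao", "acumulado_chuva_15_min", "altura_agua"]])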

def wait_task() ‑> None
@task(skip_on_upstream_skip=False)
def wait_task() -> None:
    """Task create because prefect was messing up paths to be saved on each table"""
    log("End waiting pluviometric task to end.")

Task created because Prefect was mixing up the paths used to save each table.