Module pipelines.rj_cor.meteorologia.utils

General utilities for meteorologia.

Functions

def save_updated_rows_on_redis(dfr: pandas.core.frame.DataFrame,
                               dataset_id: str,
                               table_id: str,
                               unique_id: str = 'id_estacao',
                               date_column: str = 'data_medicao',
                               date_format: str = '%Y-%m-%d %H:%M:%S',
                               mode: str = 'prod') -> pandas.core.frame.DataFrame
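# Assumed module-level imports for this listing; the helper paths are an
# assumption based on the repository's shared utilities:
# import pandas as pd
# from pipelines.utils.utils import get_redis_client, log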
def save_updated_rows_on_redis(
    dfr: pd.DataFrame,
    dataset_id: str,
    table_id: str,
    unique_id: str = "id_estacao",
    date_column: str = "data_medicao",
    date_format: str = "%Y-%m-%d %H:%M:%S",
    mode: str = "prod",
) -> pd.DataFrame:
    """
    Acess redis to get the last time each unique_id was updated, return
    updated unique_id as a DataFrame and save new dates on redis
    """

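    # Reuse the pipeline's shared Redis connection helper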
    redis_client = get_redis_client()

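    # Hash key follows the "<mode>.<dataset_id>.<table_id>" pattern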
    key = mode + "." + dataset_id + "." + table_id

    # Access all data saved on redis with this key
    updates = redis_client.hgetall(key)

    # Decode the Redis bytes into a dict mapping unique_id to its last update time
    # Example > {"12": "2022-06-06 14:45:00"}
    updates = {k.decode("utf-8"): v.decode("utf-8") for k, v in updates.items()}

    # Convert the dictionary to a DataFrame
    updates = pd.DataFrame(updates.items(), columns=[unique_id, "last_update"])
    log(f">>> data saved in redis: {updates}")

    # dfr and updates must cover the same unique_id values; find what's missing on each side
    missing_in_dfr = [
        i for i in updates[unique_id].unique() if i not in dfr[unique_id].unique()
    ]
    missing_in_updates = [
        i for i in dfr[unique_id].unique() if i not in updates[unique_id].unique()
    ]

    # If a unique_id is missing from updates, seed it with a placeholder date
    if len(missing_in_updates) > 0:
        for i in missing_in_updates:
            updates = pd.concat(
                [
                    updates,
                    pd.DataFrame(
                        [{unique_id: i, "last_update": "1900-01-01 00:00:00"}]
                    ),
                ],
                ignore_index=True,
            )

    # If a unique_id is missing from dfr, drop those stations from updates
    if len(missing_in_dfr) > 0:
        updates = updates[~updates[unique_id].isin(missing_in_dfr)]

    # Merge dfs using unique_id
    dfr = dfr.merge(updates, how="left", on=unique_id)

    # Keep in dfr only the rows whose measurement time is newer than the one on Redis
    dfr[date_column] = pd.to_datetime(dfr[date_column], format=date_format)
    dfr["last_update"] = pd.to_datetime(dfr["last_update"], format="%Y-%m-%d %H:%M:%S")
    dfr = dfr[dfr[date_column] > dfr["last_update"]].dropna(subset=[unique_id])

    # Keep only the last date for each unique_id
    keep_cols = [unique_id, date_column]
    new_updates = dfr[keep_cols].sort_values(keep_cols)
    new_updates = new_updates.groupby(unique_id, as_index=False).tail(1)
    new_updates[date_column] = new_updates[date_column].astype(str)

    # Build a dict mapping each unique_id to its new update date
    new_updates = dict(zip(new_updates[unique_id], new_updates[date_column]))
    log(f">>> data to save in redis as a dict: {new_updates}")

    # Save this new information on redis
    for k, v in new_updates.items():
        redis_client.hset(key, k, v)

    return dfr.reset_index()

Access Redis to get the last time each unique_id was updated, return the rows updated since then as a DataFrame and save the new dates on Redis.
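A minimal usage sketch, assuming a reachable Redis instance behind get_redis_client() and a measurements DataFrame with the default id_estacao / data_medicao columns; the dataset_id, table_id, and station values below are illustrative only:

import pandas as pd

from pipelines.rj_cor.meteorologia.utils import save_updated_rows_on_redis

# Illustrative readings for two stations (ids and timestamps are made up)
dfr = pd.DataFrame(
    {
        "id_estacao": ["12", "34"],
        "data_medicao": ["2022-06-06 15:00:00", "2022-06-06 15:00:00"],
    }
)

# Returns only the rows newer than the timestamps stored under the
# "dev.meteorologia.estacoes" hash, and refreshes those timestamps on Redis
updated = save_updated_rows_on_redis(
    dfr,
    dataset_id="meteorologia",  # hypothetical ids, used only to build the key
    table_id="estacoes",
    mode="dev",
)
print(updated)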