Module pipelines.rj_cor.utils

Utils for rj-cor

Functions

def build_redis_key(dataset_id: str, table_id: str, name: str, mode: str = 'prod')
Expand source code
def build_redis_key(dataset_id: str, table_id: str, name: str, mode: str = "prod"):
    """
    Build the dotted key ``<mode>.<dataset_id>.<table_id>.<name>`` used to
    store the last updated time.

    Args:
        dataset_id: Dataset identifier, second segment of the key.
        table_id: Table identifier, third segment of the key.
        name: Final segment of the key.
        mode: Leading segment of the key. Defaults to ``"prod"``.

    Returns:
        The four segments joined with ``"."``.
    """
    segments = [mode, dataset_id, table_id, name]
    return ".".join(segments)

Helper function that builds the key under which the last updated time is stored.

def compare_actual_df_with_redis_df(dfr: pandas.core.frame.DataFrame,
dfr_redis: pandas.core.frame.DataFrame,
columns: list) ‑> pandas.core.frame.DataFrame
Expand source code
def compare_actual_df_with_redis_df(
    dfr: pd.DataFrame,
    dfr_redis: pd.DataFrame,
    columns: list,
) -> tuple:
    """
    Compare df from redis to actual df and return the rows from actual df
    that are not already saved on redis, together with the updated redis df.

    Args:
        dfr: Actual dataframe. Must contain every column listed in ``columns``.
        dfr_redis: Dataframe previously saved on redis. Columns from
            ``columns`` that are missing are created filled with ``None``.
            This argument is not modified; a copy is used internally.
        columns: Names of the columns used as the comparison key.

    Returns:
        A tuple ``(dfr_diff, updated_dfr_redis)`` where ``dfr_diff`` holds the
        rows of ``dfr`` whose ``columns`` values have no match in
        ``dfr_redis``, and ``updated_dfr_redis`` is the (type-converted)
        redis dataframe with the ``columns`` of those new rows appended.
    """
    # Work on a copy so the caller's dataframe is not altered by the column
    # creation and dtype conversion below.
    dfr_redis = dfr_redis.copy()

    # Align the key columns' dtypes with dfr so the merge compares like with like
    for col in columns:
        if col not in dfr_redis.columns:
            dfr_redis[col] = None
        dfr_redis[col] = dfr_redis[col].astype(dfr[col].dtype)
    log(f"\nEnded conversion types from dfr to dfr_redis: \n{dfr_redis.dtypes}")

    # The merge indicator marks rows found only in dfr as "left_only"
    dfr_diff = (
        pd.merge(dfr, dfr_redis, how="left", on=columns, indicator=True)
        .query('_merge == "left_only"')
        .drop("_merge", axis=1)
    )
    log(
        f"\nDf resulted from the difference between dft_redis and dfr: \n{dfr_diff.head()}"
    )

    updated_dfr_redis = pd.concat([dfr_redis, dfr_diff[columns]])

    return dfr_diff, updated_dfr_redis

Compare the df from redis to the actual df and return the rows from the actual df that are not already saved on redis, together with the updated redis df.

def get_redis_output(redis_key, is_df: bool = False)
Expand source code
def get_redis_output(redis_key, is_df: bool = False):
    """
    Read the value stored on Redis under ``redis_key``.

    Args:
        redis_key: Key to look up.
        is_df: When True, the value is fetched with ``get``, parsed as JSON
            and returned as a DataFrame (an empty DataFrame if nothing is
            stored). When False, the value is fetched with ``hgetall``.

    Returns:
        A ``pd.DataFrame`` when ``is_df`` is True. Otherwise the ``hgetall``
        result, passed through ``treat_redis_output`` when it is not empty.
    """
    client = get_redis_client()  # (host="127.0.0.1")

    # Key/value pair mode: read the whole hash
    if not is_df:
        hash_fields = client.hgetall(redis_key)
        if len(hash_fields) > 0:
            return treat_redis_output(hash_fields)
        return hash_fields

    # DataFrame mode: the value is a JSON string
    json_data = client.get(redis_key)
    log(f"[DEGUB] json_data {json_data}")
    if not json_data:
        return pd.DataFrame()

    # Parse the JSON string into a Python object and rebuild the DataFrame
    return pd.DataFrame(json.loads(json_data))

Get Redis output. Uses get to obtain a df from redis, or hgetall if the value is a key-value pair.