Module pipelines.rj_cor.tasks

Common Tasks for rj-cor

Functions

def get_on_redis(dataset_id: str, table_id: str, mode: str = 'prod', wait=None) ‑> list
Expand source code
@task
def get_on_redis(
    dataset_id: str,
    table_id: str,
    mode: str = "prod",
    wait=None,
) -> list:
    """
    Fetch the filenames stored on Redis for a given table.

    The Redis key is built from `dataset_id`, `table_id`, the literal
    "files" and `mode`. The stored value is de-duplicated and returned
    sorted in ascending order; an empty list is returned when the key
    holds no value.

    `wait` is not used in the body. NOTE(review): presumably it only
    exists so a flow can pass an upstream task and force execution
    order -- confirm against the callers.
    """
    client = get_redis_client()
    redis_key = build_redis_key(dataset_id, table_id, "files", mode)
    stored = client.get(redis_key)
    if stored is None:
        # Nothing has been saved under this key yet.
        return []
    return sorted(set(stored))

Get filenames saved on Redis.

def save_on_redis(dataset_id: str,
table_id: str,
mode: str = 'prod',
files: list = [],
keep_last: int = 50,
wait=None) ‑> None
Expand source code
@task(trigger=all_successful)
def save_on_redis(
    dataset_id: str,
    table_id: str,
    mode: str = "prod",
    files: list = None,
    keep_last: int = 50,
    wait=None,
) -> None:
    """
    Save filenames on Redis, keeping only the last `keep_last` of them.

    The Redis key is built from `dataset_id`, `table_id`, the literal
    "files" and `mode`. `files` is de-duplicated, sorted in ascending
    order and truncated to its last `keep_last` entries before being
    written with `redis_client.set`.

    `files` defaults to None, which is treated as an empty list; this
    avoids a mutable default argument shared across calls.

    `wait` is not used in the body. NOTE(review): presumably it only
    exists so a flow can pass an upstream task and force execution
    order -- confirm against the callers.
    """
    redis_client = get_redis_client()
    key = build_redis_key(dataset_id, table_id, "files", mode)
    files = [] if files is None else list(set(files))
    print(">>>> save on redis files ", files)
    files.sort()
    # NOTE: with keep_last=0 the slice is files[-0:], i.e. the whole list,
    # so every file is kept rather than none.
    files = files[-keep_last:]
    redis_client.set(key, files)

Save filenames on Redis, keeping only the last `keep_last` entries.