Module pipelines.rj_cor.tasks
Common Tasks for rj-cor
Functions
def get_on_redis(dataset_id: str, table_id: str, mode: str = 'prod', wait=None) ‑> list
@task
def get_on_redis(
    dataset_id: str,
    table_id: str,
    mode: str = "prod",
    wait=None,
) -> list:
    """
    Get filenames saved on Redis.
    """
    redis_client = get_redis_client()
    key = build_redis_key(dataset_id, table_id, "files", mode)
    files_on_redis = redis_client.get(key)
    files_on_redis = [] if files_on_redis is None else files_on_redis
    files_on_redis = list(set(files_on_redis))
    files_on_redis.sort()
    return files_on_redis
Get filenames saved on Redis.
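The post-processing that get_on_redis applies to the stored value (treat a missing key as an empty list, drop duplicates, sort) can be sketched without Redis or Prefect. This is only an illustration: the normalize_files helper and the dict standing in for the Redis client are hypothetical and are not part of pipelines.rj_cor.tasks.

```python
from typing import Optional


def normalize_files(stored: Optional[list]) -> list:
    """Mirror the get_on_redis post-processing: None -> [], dedupe, sort."""
    files = [] if stored is None else stored
    return sorted(set(files))


# Hypothetical stand-in for the Redis store: key -> list of filenames.
fake_store = {"some_key": ["b.csv", "a.csv", "b.csv"]}

# A key that was never written yields an empty list rather than None.
missing = normalize_files(fake_store.get("absent_key"))

# Duplicates are removed and the result is sorted lexicographically.
present = normalize_files(fake_store.get("some_key"))
```

Because a missing key comes back as an empty list, downstream tasks can compare against the result without a separate None check.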
def save_on_redis(dataset_id: str,
table_id: str,
mode: str = 'prod',
files: list = [],
keep_last: int = 50,
wait=None) ‑> None
@task(trigger=all_successful)
def save_on_redis(
    dataset_id: str,
    table_id: str,
    mode: str = "prod",
    files: list = [],
    keep_last: int = 50,
    wait=None,
) -> None:
    """
    Save filenames on Redis, keeping only the last `keep_last` entries.
    """
    redis_client = get_redis_client()
    key = build_redis_key(dataset_id, table_id, "files", mode)
    # Deduplicate and sort so that the slice below keeps the greatest names.
    files = list(set(files))
    print(">>>> save on redis files ", files)
    files.sort()
    files = files[-keep_last:]
    redis_client.set(key, files)
Save filenames on Redis. The list is deduplicated and sorted, and only the last keep_last entries are stored.
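The trimming step in save_on_redis (deduplicate, sort, keep the last keep_last names) can be tried in isolation. The sketch below assumes a hypothetical trim_files helper and made-up filenames; it does not touch Redis and is not part of the module.

```python
def trim_files(files: list, keep_last: int = 50) -> list:
    """Mirror the save_on_redis trimming: dedupe, sort, keep the tail."""
    files = sorted(set(files))
    return files[-keep_last:]


# Hypothetical input: 60 distinct names plus one duplicate.
names = [f"file_{i:03d}.csv" for i in range(60)] + ["file_059.csv"]

# With the default keep_last=50, the 10 lexicographically smallest are dropped.
kept = trim_files(names)

# Edge case: -0 equals 0, so files[-0:] is the whole list, not an empty one.
untrimmed = trim_files(["a.csv", "b.csv"], keep_last=0)
```

Two points follow from the slicing. First, "last" means last in lexicographic order, so the retained files are the most recent ones only if the filenames sort chronologically (for example, zero-padded timestamps). Second, passing keep_last=0 does not clear the list, because files[-0:] is the same as files[0:].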