Module pipelines.utils.tasks
Helper tasks that could fit any pipeline.
Functions
def check_table_exists(dataset_id: str, table_id: str, wait=None) ‑> bool
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def check_table_exists(
    dataset_id: str,
    table_id: str,
    wait=None,  # pylint: disable=unused-argument
) -> bool:
    """
    Check if table exists in staging on GCP
    """
    # pylint: disable=C0103
    tb = bd.Table(dataset_id=dataset_id, table_id=table_id)
    exists = tb.table_exists(mode="staging")
    log(f"Table {dataset_id}.{table_id} exists in staging: {exists}")
    return exists
Check if table exists in staging on GCP
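The trailing wait argument is unused inside the task; it appears to exist only so an upstream result can be passed in to enforce execution order. A minimal usage sketch, assuming a Prefect 1.x flow (the dataset and table IDs are placeholders):

from prefect import Flow, case

from pipelines.utils.tasks import check_table_exists, log_task

with Flow("example_check_table") as flow:
    # "my_dataset" and "my_table" are hypothetical IDs for illustration
    exists = check_table_exists(dataset_id="my_dataset", table_id="my_table")
    # branch on the boolean task result with prefect.case
    with case(exists, False):
        log_task("Table is missing in staging", level="warning")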
def create_table_and_upload_to_gcs(data_path: str | pathlib.Path,
dataset_id: str,
table_id: str,
dump_mode: str,
biglake_table: bool = True,
wait=None) ‑> str | pathlib.Path
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def create_table_and_upload_to_gcs(
    data_path: Union[str, Path],
    dataset_id: str,
    table_id: str,
    dump_mode: str,
    biglake_table: bool = True,
    wait=None,  # pylint: disable=unused-argument
) -> Union[str, Path]:
    """
    Create table using BD+ and upload to GCS.
    """
    bd_version = bd.__version__
    log(f"USING BASEDOSDADOS {bd_version}")
    # pylint: disable=C0103
    tb = bd.Table(dataset_id=dataset_id, table_id=table_id)
    table_staging = f"{tb.table_full_name['staging']}"
    # pylint: disable=C0103
    st = bd.Storage(dataset_id=dataset_id, table_id=table_id)
    storage_path = f"{st.bucket_name}.staging.{dataset_id}.{table_id}"
    storage_path_link = (
        f"https://console.cloud.google.com/storage/browser/{st.bucket_name}"
        f"/staging/{dataset_id}/{table_id}"
    )
    # prod datasets are public if the project is datario; staging is private in both projects
    dataset_is_public = tb.client["bigquery_prod"].project == "datario"

    #####################################
    #
    # MANAGEMENT OF TABLE CREATION
    #
    #####################################
    log("STARTING TABLE CREATION MANAGEMENT")
    if dump_mode == "append":
        if tb.table_exists(mode="staging"):
            log(
                f"MODE APPEND: Table ALREADY EXISTS:"
                f"\n{table_staging}"
                f"\n{storage_path_link}"
            )
        else:
            # a header file is needed to create the table when it doesn't exist
            log("MODE APPEND: Table DOESN'T EXIST\nStart to CREATE HEADER file")
            header_path = dump_header_to_file(data_path=data_path)
            log("MODE APPEND: Created HEADER file:\n" f"{header_path}")
            tb.create(
                path=header_path,
                if_storage_data_exists="replace",
                if_table_exists="replace",
                biglake_table=biglake_table,
                dataset_is_public=dataset_is_public,
            )
            log(
                "MODE APPEND: Successfully CREATED A NEW TABLE:\n"
                f"{table_staging}\n"
                f"{storage_path_link}"
            )
            st.delete_table(
                mode="staging", bucket_name=st.bucket_name, not_found_ok=True
            )
            log(
                "MODE APPEND: Successfully REMOVED HEADER DATA from Storage:\n"
                f"{storage_path}\n"
                f"{storage_path_link}"
            )
    elif dump_mode == "overwrite":
        if tb.table_exists(mode="staging"):
            log(
                "MODE OVERWRITE: Table ALREADY EXISTS, DELETING OLD DATA!\n"
                f"{storage_path}\n"
                f"{storage_path_link}"
            )
            st.delete_table(
                mode="staging", bucket_name=st.bucket_name, not_found_ok=True
            )
            log(
                "MODE OVERWRITE: Successfully DELETED OLD DATA from Storage:\n"
                f"{storage_path}\n"
                f"{storage_path_link}"
            )
            # delete only the staging table and let DBT overwrite the prod table
            tb.delete(mode="staging")
            log(
                "MODE OVERWRITE: Successfully DELETED TABLE:\n"
                f"{table_staging}\n"
            )

        # a header file is needed to create the table when it doesn't exist;
        # in overwrite mode the header is always created
        log("MODE OVERWRITE: Table DOESN'T EXIST\nStart to CREATE HEADER file")
        header_path = dump_header_to_file(data_path=data_path)
        log("MODE OVERWRITE: Created HEADER file:\n" f"{header_path}")
        tb.create(
            path=header_path,
            if_storage_data_exists="replace",
            if_table_exists="replace",
            biglake_table=biglake_table,
            dataset_is_public=dataset_is_public,
        )
        log(
            "MODE OVERWRITE: Successfully CREATED TABLE\n"
            f"{table_staging}\n"
            f"{storage_path_link}"
        )
        st.delete_table(mode="staging", bucket_name=st.bucket_name, not_found_ok=True)
        log(
            "MODE OVERWRITE: Successfully REMOVED HEADER DATA from Storage:\n"
            f"{storage_path}\n"
            f"{storage_path_link}"
        )

    #####################################
    #
    # Uploads a bunch of files using BD+
    #
    #####################################
    log("STARTING UPLOAD TO GCS")
    if tb.table_exists(mode="staging"):
        # the file names need to be the same, or the data doesn't get overwritten
        tb.append(filepath=data_path, if_exists="replace")
        log(
            f"STEP UPLOAD: Successfully uploaded {data_path} to Storage:\n"
            f"{storage_path}\n"
            f"{storage_path_link}"
        )
    else:
        log("STEP UPLOAD: Table does not exist in STAGING, need to create first")

    return data_path
Create table using BD+ and upload to GCS.
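As the source shows, dump_mode="append" creates the staging table from a header file only when it is missing, while dump_mode="overwrite" deletes the staging data and table before recreating them; in both modes the header data is removed from Storage and the actual files are then uploaded with tb.append. A minimal sketch of wiring the task into a flow (the path and IDs are placeholders):

from prefect import Flow

from pipelines.utils.tasks import check_table_exists, create_table_and_upload_to_gcs

with Flow("example_dump_flow") as flow:
    # "/tmp/data/output.csv" and the IDs below are placeholders for illustration
    uploaded_path = create_table_and_upload_to_gcs(
        data_path="/tmp/data/output.csv",
        dataset_id="my_dataset",
        table_id="my_table",
        dump_mode="append",
    )
    # passing the result to wait forces this check to run after the upload
    exists = check_table_exists(
        dataset_id="my_dataset", table_id="my_table", wait=uploaded_path
    )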
def get_current_flow_labels() ‑> List[str]
@task
def get_current_flow_labels() -> List[str]:
    """
    Get the labels of the current flow.
    """
    flow_run_id = prefect.context.get("flow_run_id")
    flow_run_view = FlowRunView.from_flow_run_id(flow_run_id)
    return flow_run_view.labels
Get the labels of the current flow.
def get_current_flow_mode(labels: List[str]) ‑> str
@task
def get_current_flow_mode(labels: List[str]) -> str:
    """
    Get the mode (prod/dev/staging) of the current flow.
    """
    if labels[0].endswith("-dev"):
        return "dev"
    if labels[0].endswith("-staging"):
        return "staging"
    return "prod"
Get the mode (prod/dev/staging) of the current flow.
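The mode is inferred from the suffix of the first agent label, so it reflects how the flow was deployed. A minimal sketch chaining the two tasks:

from prefect import Flow

from pipelines.utils.tasks import get_current_flow_labels, get_current_flow_mode

with Flow("example_mode_flow") as flow:
    labels = get_current_flow_labels()
    # e.g. a first label ending in "-staging" yields "staging"
    mode = get_current_flow_mode(labels)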
def get_now_date()
@prefect.task(checkpoint=False)
def get_now_date():
    """
    Returns the current date in YYYY-MM-DD.
    """
    now = pendulum.now(pendulum.timezone("America/Sao_Paulo"))
    return now.to_date_string()
Returns the current date in YYYY-MM-DD.
def get_now_time()
@prefect.task(checkpoint=False)
def get_now_time():
    """
    Returns the current time as HH:MM.
    """
    now = pendulum.now(pendulum.timezone("America/Sao_Paulo"))
    # minute is zero-padded; hour is not
    return f"{now.hour}:{now.minute:02d}"
Returns the current time as HH:MM.
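Both time helpers are declared with checkpoint=False, so their results are never cached and each run gets a fresh timestamp in the America/Sao_Paulo timezone. A minimal sketch, combined with the rename task documented below (the prefix is a placeholder):

from prefect import Flow

from pipelines.utils.tasks import get_now_time, rename_current_flow_run_now_time

with Flow("example_time_flow") as flow:
    now_time = get_now_time()
    # renames the run to e.g. "dump: 14:05"
    rename_current_flow_run_now_time(prefix="dump: ", now_time=now_time)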
def get_user_and_password(secret_path: str, wait=None)
@task(checkpoint=False, nout=2)
def get_user_and_password(secret_path: str, wait=None):
    """
    Returns the user and password for the given secret path.
    """
    log(f"Getting user and password for secret path: {secret_path}")
    return get_username_and_password_from_secret(secret_path)
Returns the user and password for the given secret path.
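Because the task is declared with nout=2, Prefect treats the returned pair as two separate task results, which allows tuple unpacking at flow-definition time. A minimal sketch (the secret path is a placeholder):

from prefect import Flow

from pipelines.utils.tasks import get_user_and_password

with Flow("example_secret_flow") as flow:
    # "my_database" is a hypothetical secret path for illustration
    user, password = get_user_and_password(secret_path="my_database")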
def greater_than(value, compare_to) ‑> bool
@task
def greater_than(value, compare_to) -> bool:
    """
    Returns True if value is greater than compare_to.
    """
    return value > compare_to
Returns True if value is greater than compare_to.
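Plain Python comparisons between task results do not work at flow-definition time (they would compare the task objects, not their eventual results), so the comparison is wrapped in a task whose boolean output can drive prefect.case. A minimal sketch with placeholder values:

from prefect import Flow, case

from pipelines.utils.tasks import greater_than, log_task

with Flow("example_branch_flow") as flow:
    # 120 and 100 are placeholder values for illustration
    too_many = greater_than(value=120, compare_to=100)
    with case(too_many, True):
        log_task("Value above threshold", level="warning")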
def log_task(msg: Any, level: str = 'info', wait=None)
@prefect.task(checkpoint=False)
def log_task(msg: Any, level: str = "info", wait=None):
    """
    Logs a message to prefect's logger.
    """
    log(msg, level)
Logs a message to prefect's logger.
def rename_current_flow_run_dataset_table(prefix: str, dataset_id, table_id, wait=None) ‑> None
@task
def rename_current_flow_run_dataset_table(
    prefix: str, dataset_id, table_id, wait=None
) -> None:
    """
    Rename the current flow run.
    """
    flow_run_id = prefect.context.get("flow_run_id")
    client = Client()
    return client.set_flow_run_name(flow_run_id, f"{prefix}{dataset_id}.{table_id}")
Rename the current flow run.
def rename_current_flow_run_msg(msg: str, wait=None) ‑> None
@task
def rename_current_flow_run_msg(msg: str, wait=None) -> None:
    """
    Rename the current flow run.
    """
    flow_run_id = prefect.context.get("flow_run_id")
    client = Client()
    return client.set_flow_run_name(flow_run_id, msg)
Rename the current flow run.
def rename_current_flow_run_now_time(prefix: str, now_time=None, wait=None) ‑> None
@task
def rename_current_flow_run_now_time(prefix: str, now_time=None, wait=None) -> None:
    """
    Rename the current flow run.
    """
    flow_run_id = prefect.context.get("flow_run_id")
    client = Client()
    return client.set_flow_run_name(flow_run_id, f"{prefix}{now_time}")
Rename the current flow run.
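All three rename tasks read flow_run_id from prefect.context and call the Prefect Client, so they only take effect inside a registered flow run (calling task.run() locally will find no run to rename). A minimal sketch with placeholder names:

from prefect import Flow

from pipelines.utils.tasks import rename_current_flow_run_dataset_table

with Flow("example_rename_flow") as flow:
    # "Dump: " and the IDs are placeholders; the run becomes "Dump: my_dataset.my_table"
    rename_current_flow_run_dataset_table(
        prefix="Dump: ", dataset_id="my_dataset", table_id="my_table"
    )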