Module pipelines.utils.execute_dbt_model.tasks
Tasks related to DBT flows.
def get_k8s_dbt_client(mode: str = 'dev', wait=None) ‑> dbt_client.dbt_client.DbtClient
Expand source code
@task(
    checkpoint=False,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_k8s_dbt_client(
    mode: str = "dev",
    wait=None,
) -> DbtClient:
    """
    Build a DBT client for the Kubernetes cluster.

    Args:
        mode (str, optional): Target environment; must be "dev" or "prod".
            It selects the host name ``dbt-rpc-<mode>``. Defaults to "dev".
        wait (optional): Not read by this task's body.
            NOTE(review): presumably only exists so callers can order this
            task after another one in the flow -- confirm.

    Returns:
        DbtClient: Whatever ``get_dbt_client`` returns for that host.

    Raises:
        ValueError: If ``mode`` is neither "dev" nor "prod".
    """
    allowed_modes = ("dev", "prod")
    if mode in allowed_modes:
        return get_dbt_client(host=f"dbt-rpc-{mode}")
    raise ValueError(f"Invalid mode: {mode}")
Get a DBT client for the Kubernetes cluster.
def is_running_at_datario(current_flow_labels: List[str]) ‑> bool
Expand source code
@task(checkpoint=False)
def is_running_at_datario(current_flow_labels: List[str]) -> bool:
    """
    Tell whether the current flow is running at the datario agent.

    Args:
        current_flow_labels (List[str]): Labels attached to the current flow.

    Returns:
        bool: True when "datario" is one of the given labels.
    """
    has_datario_label = "datario" in current_flow_labels
    return has_datario_label
Check if the current flow is running at datario agent.
def is_valid_dictionary(dictionary: Dict[Any, Any]) ‑> Dict[str, Any]
Expand source code
@task
def is_valid_dictionary(dictionary: Dict[Any, Any]) -> Dict[str, Any]:
    """
    Validate that the input is a dict whose keys are all strings.

    Only the keys are inspected; values may be of any type.

    Args:
        dictionary (Dict[Any, Any]): Object to validate.

    Returns:
        Dict[str, Any]: The very same object, unchanged, once validated.

    Raises:
        ValueError: If the input is not a ``dict`` or if any key is not a
            ``str``.
    """
    if not isinstance(dictionary, dict):
        raise ValueError("The dictionary is not a dictionary.")
    for key in dictionary.keys():
        if not isinstance(key, str):
            raise ValueError("The dictionary keys are not strings.")
    return dictionary
Checks whether the input is a dictionary whose keys are all strings (values may be of any type).
def merge_dictionaries(dict1: Dict[str, Any], dict2: Dict[str, Any]) ‑> Dict[str, Any]
Expand source code
@task
def merge_dictionaries(dict1: Dict[str, Any], dict2: Dict[str, Any]) -> Dict[str, Any]:
    """
    Merge two dictionaries into a new one.

    Neither input is modified. When a key is present in both, the value
    from ``dict2`` wins.

    Args:
        dict1 (Dict[str, Any]): Base dictionary.
        dict2 (Dict[str, Any]): Dictionary whose entries take precedence.

    Returns:
        Dict[str, Any]: A new dictionary holding the entries of both inputs.
    """
    merged = {**dict1}
    merged.update({**dict2})
    return merged
Merge dictionaries.
def model_parameters_from_secrets(dictionary: Dict[str, str]) ‑> Dict[str, Any]
Expand source code
@task
def model_parameters_from_secrets(dictionary: Dict[str, str]) -> Dict[str, Any]:
    """
    Resolve model parameters from Vault secrets.

    Args:
        dictionary (Dict[str, str]): Maps each parameter name to the path of
            the secret that stores it. The value is read from
            ``secret["data"]["value"]``, so every secret must expose a
            "value" key under "data".

    Returns:
        Dict[str, Any]: Maps each parameter name to the value read from its
        secret.
    """
    parameters = {}
    for name, secret_path in dictionary.items():
        secret = get_vault_secret(secret_path)
        parameters[name] = secret["data"]["value"]
    return parameters
Gets model parameters from Vault secrets.
Args:
dictionary (Dict[str, str]): Dictionary with the parameter name as key and the secret path as value. The secret must contain a single key called "value" with the parameter value.
Returns:
Dict[str, Any]: Dictionary with the parameter name as key and the secret value as value.
def run_dbt_model(dbt_client: dbt_client.dbt_client.DbtClient,
dataset_id: str,
table_id: str = None,
dbt_alias: bool = False,
upstream: bool = None,
downstream: bool = None,
exclude: str = None,
flags: str = None,
_vars: Union[dict, List[Dict]] = None,
sync: bool = True,
wait=None)
Expand source code
@task(
    checkpoint=False,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
# pylint: disable=too-many-arguments, too-many-locals
def run_dbt_model(
    dbt_client: DbtClient,
    dataset_id: str,
    table_id: str = None,
    dbt_alias: bool = False,
    upstream: bool = None,
    downstream: bool = None,
    exclude: str = None,
    flags: str = None,
    _vars: Union[dict, List[Dict]] = None,
    sync: bool = True,
    wait=None,
):
    """
    Build a ``dbt run`` command for a model (or whole dataset) and execute it.

    Args:
        dbt_client (DbtClient): DBT client used to submit the command.
        dataset_id (str): Dataset ID of the dbt model.
        table_id (str, optional): Table ID of the dbt model. When empty or
            None, the whole dataset is selected.
        dbt_alias (bool, optional): If True, the model is selected by its
            alias, i.e. ``<dataset_id>.<dataset_id>__<table_id>``.
            Defaults to False.
        upstream (bool, optional): If True, a "+" is placed before the
            selector so upstream models are run too.
        downstream (bool, optional): If True, a "+" is placed after the
            selector so downstream models are run too.
        exclude (str, optional): Models to exclude from the run.
        flags (str, optional): Extra flags appended verbatim to the end of
            the command.
        _vars (Union[dict, List[Dict]], optional): Variables to pass to dbt.
            A list of dicts is merged into one dict, later entries
            overriding earlier ones. Defaults to None.
        sync (bool, optional): Whether to run the command synchronously.
            Defaults to True.
        wait (optional): Not read by this task's body.
            NOTE(review): presumably only exists so callers can order this
            task after another one in the flow -- confirm.

    Returns:
        The result of the final ``log`` call.
    """
    # Resolve what goes after --select: the dataset alone, or a model in it.
    if not table_id:
        selector = dataset_id
    elif dbt_alias:
        selector = f"{dataset_id}.{dataset_id}__{table_id}"
    else:
        selector = f"{dataset_id}.{table_id}"

    leading = "+" if upstream else ""
    trailing = "+" if downstream else ""
    run_command = f"dbt run --select {leading}{selector}{trailing}"

    if exclude:
        run_command += f" --exclude {exclude}"

    if _vars:
        if isinstance(_vars, list):
            # Collapse the list into a single mapping; later dicts win.
            merged_vars = {}
            for entry in _vars:
                merged_vars.update(entry)
        else:
            merged_vars = _vars
        # The mapping is passed as its Python string form in double quotes.
        vars_str = f'"{merged_vars}"'
        run_command += f" --vars {vars_str}"

    if flags:
        run_command += f" {flags}"

    log(f"Will run the following command:\n{run_command}")
    logs_dict = dbt_client.cli(run_command, sync=sync, logs=True)
    parse_dbt_logs(logs_dict, log_queries=True)
    return log("Finished running dbt model")
Run a DBT model.
Args:
dbt_client (DbtClient): DBT client.
dataset_id (str): Dataset ID of the dbt model.
table_id (str, optional): Table ID of the dbt model. If None, the whole dataset will be run.
dbt_alias (bool, optional): If True, the model will be run by its alias. Defaults to False.
upstream (bool, optional): If True, the upstream models will be run.
downstream (bool, optional): If True, the downstream models will be run.
exclude (str, optional): Models to exclude from the run.
flags (str, optional): Flags to pass to the dbt run command.
_vars (Union[dict, List[Dict]], optional): Variables to pass to dbt. Defaults to None.
sync (bool, optional): Whether to run the command synchronously.