Module pipelines.utils.execute_dbt_model.utils
General utilities for interacting with dbt-rpc
Functions
def generate_execute_dbt_model_schedules(interval: datetime.timedelta,
start_date: datetime.datetime,
labels: List[str],
table_parameters: dict,
runs_interval_minutes: int = 15) ‑> List[prefect.schedules.clocks.IntervalClock]-
Expand source code
def generate_execute_dbt_model_schedules( # pylint: disable=too-many-arguments,too-many-locals interval: timedelta, start_date: datetime, labels: List[str], table_parameters: dict, runs_interval_minutes: int = 15, ) -> List[IntervalClock]: """ Generates multiple schedules for execute dbt model. """ clocks = [] for count, (table_id, parameters) in enumerate(table_parameters.items()): parameter_defaults = { "dataset_id": parameters["dataset_id"], "table_id": table_id, "mode": parameters["mode"], } clocks.append( IntervalClock( interval=interval, start_date=start_date + timedelta(minutes=runs_interval_minutes * count), labels=labels, parameter_defaults=parameter_defaults, ) ) return clocks
Generates multiple schedules for execute dbt model.
def get_dbt_client(host: str = 'dbt-rpc', port: int = 8580, jsonrpc_version: str = '2.0') ‑> dbt_client.dbt_client.DbtClient
-
Expand source code
def get_dbt_client( host: str = "dbt-rpc", port: int = 8580, jsonrpc_version: str = "2.0", ) -> DbtClient: """ Returns a DBT RPC client. Args: host: The hostname of the DBT RPC server. port: The port of the DBT RPC server. jsonrpc_version: The JSON-RPC version to use. Returns: A DBT RPC client. """ return DbtClient( host=host, port=port, jsonrpc_version=jsonrpc_version, )
Returns a DBT RPC client.
Args
host
- The hostname of the DBT RPC server.
port
- The port of the DBT RPC server.
jsonrpc_version
- The JSON-RPC version to use.
Returns
A DBT RPC client.
def parse_dbt_logs(logs_dict: dict, log_queries: bool = False)
-
Expand source code
def parse_dbt_logs(logs_dict: dict, log_queries: bool = False): """Parse dbt returned logs, to print only needed pieces. Args: logs_dict (dict): logs dict returned when running a DBT command via DbtClient.cli() with argument logs = True """ for event in logs_dict["result"]["logs"]: if event["levelname"] == "INFO" or event["levelname"] == "ERROR": log(f"#####{event['levelname']}#####") log(event["message"]) if event["levelname"] == "DEBUG" and log_queries: if "On model" in event["message"]: log(event["message"])
Parse dbt returned logs, to print only needed pieces.
Args
logs_dict
:dict
- logs dict returned when running a DBT
command via DbtClient.cli() with argument logs = True