Module pipelines.utils.dump_db.utils
Utilities for the Database Dump flows.
Functions
def build_query_new_columns(table_columns: List[str]) ‑> List[str]
-
Expand source code
def build_query_new_columns(table_columns: List[str]) -> str:
    """
    Build the ``<old> AS <new>,`` lines that rename columns in a query.

    The new names are whatever iterating the result of
    ``remove_columns_accents`` yields for an empty DataFrame whose columns are
    ``table_columns`` (presumably the same names with accents stripped --
    confirm against that helper). Old and new names are paired positionally.

    Args:
        table_columns: Original column names, in the order they should appear.

    Returns:
        A single string with one ``<old> AS <new>,`` entry per line. Every
        entry, including the last one, ends with a comma; an empty input
        yields an empty string.
    """
    new_cols = remove_columns_accents(pd.DataFrame(columns=table_columns))
    return "\n".join(
        f"{old_col} AS {new_col},"
        for old_col, new_col in zip(table_columns, new_cols)
    )
Creates the query without accents.
def extract_last_partition_date(partitions_dict: dict, date_format: str)
-
Expand source code
def extract_last_partition_date(partitions_dict: dict, date_format: str):
    """
    Extract the most recent date found among the partition folder values.

    Args:
        partitions_dict: Mapping of partition name to an iterable of candidate
            value strings.
        date_format: ``strptime``/``strftime`` format used both to recognise
            date values and to format the result.

    Returns:
        The latest date, formatted with ``date_format``, of the last partition
        (in iteration order) that contained at least one valid date, or
        ``None`` when no partition contained one.
    """
    last_partition_date = None
    for partition, values in partitions_dict.items():
        new_values = [
            date
            for date in values
            if is_date(date_string=date, date_format=date_format)
        ]
        try:
            # Compare parsed datetimes rather than raw strings: string order
            # only matches chronological order for most-significant-first
            # formats such as %Y-%m-%d. An empty sequence makes max() raise
            # ValueError, which is reported below like an unparsable value.
            latest = max(
                datetime.strptime(date, date_format) for date in new_values
            )
        except ValueError:
            log(
                f"partition {partition} is not a date or not in correct format {date_format}"
            )
        else:
            last_partition_date = latest.strftime(date_format)
            log(
                f"last partition from {partition} is in date format "
                f"{date_format}: {last_partition_date}"
            )
    return last_partition_date
Extract last date from partitions folders
def generate_dump_db_schedules(interval: datetime.timedelta,
start_date: datetime.datetime,
labels: List[str],
db_database: str,
db_host: str,
db_port: str | int,
db_type: str,
dataset_id: str,
vault_secret_path: str,
table_parameters: dict,
batch_size: int = 50000,
runs_interval_minutes: int = 15) ‑> List[prefect.schedules.clocks.IntervalClock]
-
Expand source code
def generate_dump_db_schedules(  # pylint: disable=too-many-arguments,too-many-locals
    interval: timedelta,
    start_date: datetime,
    labels: List[str],
    db_database: str,
    db_host: str,
    db_port: Union[str, int],
    db_type: str,
    dataset_id: str,
    vault_secret_path: str,
    table_parameters: dict,
    batch_size: int = 50000,
    runs_interval_minutes: int = 15,
) -> List[IntervalClock]:
    """
    Build one ``IntervalClock`` per entry of ``table_parameters``.

    Args:
        interval: Clock interval used for tables that do not define their own
            ``"interval"`` entry.
        start_date: Start date of the first clock; the clock at position ``i``
            starts ``runs_interval_minutes * i`` minutes later.
        labels: Labels passed unchanged to every clock.
        db_database: Value stored under ``"db_database"`` in the defaults.
        db_host: Value stored under ``"db_host"`` in the defaults.
        db_port: Stored under ``"db_port"`` after conversion to ``str``.
        db_type: Value stored under ``"db_type"`` in the defaults.
        dataset_id: Default ``"dataset_id"``; a table entry may replace it.
        vault_secret_path: Value stored under ``"vault_secret_path"``.
        table_parameters: Mapping of table id to that table's parameter dict.
            Each dict must contain ``"dump_mode"`` and ``"execute_query"``.
        batch_size: Default ``"batch_size"`` for every table.
        runs_interval_minutes: Minutes between the start dates of
            consecutive clocks.

    Returns:
        The clocks, in the iteration order of ``table_parameters``.
    """
    port_as_text = str(db_port)
    schedule_clocks = []

    for position, (table_id, table_config) in enumerate(table_parameters.items()):
        defaults = {
            "batch_size": batch_size,
            "vault_secret_path": vault_secret_path,
            "db_database": db_database,
            "db_host": db_host,
            "db_port": port_as_text,
            "db_type": db_type,
            "dataset_id": dataset_id,
            "table_id": table_id,
            "dump_mode": table_config["dump_mode"],
            "execute_query": query_to_line(table_config["execute_query"]),
        }

        # Copy every non-None table setting except the clock interval.
        # NOTE(review): this also replaces "execute_query" with the raw value
        # from the table settings whenever it is not None, discarding the
        # query_to_line() result above -- confirm whether that is intended.
        defaults.update(
            {
                name: setting
                for name, setting in table_config.items()
                if setting is not None and name != "interval"
            }
        )

        # These two keys are copied whenever present, even when set to None.
        for forced_key in ("dbt_alias", "dataset_id"):
            if forced_key in table_config:
                defaults[forced_key] = table_config[forced_key]

        start_offset = timedelta(minutes=runs_interval_minutes * position)
        schedule_clocks.append(
            IntervalClock(
                interval=table_config.get("interval", interval),
                start_date=start_date + start_offset,
                labels=labels,
                parameter_defaults=defaults,
            )
        )

    return schedule_clocks
Generates multiple schedules for database dumping.