Module pipelines.utils.dump_db.tasks

General-purpose tasks for dumping database data.

Functions

def database_execute(database: Database, query: str, wait=None, flow_name: str = None, labels: List[str] = None, dataset_id: str = None, table_id: str = None) ‑> None

Executes a query on the database.

Args

database
The database object.
query
The query to execute.
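For illustration, a minimal sketch of calling this task with a connection built by database_get (documented below); the connection values and the query are hypothetical, and in a real pipeline these calls would normally be wired into a flow rather than run as plain functions:

```python
from pipelines.utils.dump_db.tasks import database_execute, database_get

# Hypothetical connection values; replace with real credentials.
database = database_get(
    database_type="postgresql",
    hostname="db.example.com",
    port=5432,
    user="reader",
    password="s3cr3t",
    database="analytics",
)

# Run an arbitrary query against the database object.
database_execute(
    database=database,
    query="SELECT id, created_at FROM public.events",
)
```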
def database_fetch(database: Database, batch_size: str, wait=None, flow_name: str = None, labels: List[str] = None, dataset_id: str = None, table_id: str = None)

Fetches the results of a query on the database.
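Under the same assumptions as the sketch above, database_fetch would typically follow database_execute; note that batch_size is typed as a string in the signature, and the value below is illustrative:

```python
from pipelines.utils.dump_db.tasks import database_execute, database_fetch, database_get

# Hypothetical connection; see database_get below for the full argument list.
database = database_get("postgresql", "db.example.com", 5432, "reader", "s3cr3t", "analytics")
database_execute(database=database, query="SELECT id, created_at FROM public.events")

# Fetch the results of the executed query; the return value is not described here.
database_fetch(database=database, batch_size="50000")
```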

def database_get(database_type: str, hostname: str, port: int, user: str, password: str, database: str, wait=None) ‑> Database

Returns a database object.

Args

database_type
The type of the database.
hostname
The hostname of the database.
port
The port of the database.
user
The username used to connect to the database.
password
The password used to connect to the database.
database
The name of the database.

Returns

A database object.
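A hedged sketch of building a connection object with each documented argument; every value is a placeholder, and the set of accepted database_type strings is not listed in this section:

```python
from pipelines.utils.dump_db.tasks import database_get

# Placeholder values only; the database_type strings accepted by the task
# are not documented here.
database = database_get(
    database_type="mysql",
    hostname="legacy-db.example.com",
    port=3306,
    user="etl_user",
    password="change-me",
    database="sales",
)
```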

def dump_batches_to_file(database: Database, batch_size: int, prepath: Union[str, pathlib.Path], partition_columns: List[str] = None, batch_data_type: str = 'csv', wait=None, flow_name: str = None, labels: List[str] = None, dataset_id: str = None, table_id: str = None) ‑> pathlib.Path

Dumps batches of data to files and returns the resulting path.
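A sketch of dumping query results to local files, again assuming a query has already been executed on the database object; the path, batch size, and partition column are hypothetical:

```python
from pathlib import Path

from pipelines.utils.dump_db.tasks import (
    database_execute,
    database_get,
    dump_batches_to_file,
)

# Hypothetical connection and query.
database = database_get("postgresql", "db.example.com", 5432, "reader", "s3cr3t", "analytics")
database_execute(database=database, query="SELECT * FROM public.events")

# Write the fetched batches under a local prefix; returns a pathlib.Path.
output_path = dump_batches_to_file(
    database=database,
    batch_size=50_000,
    prepath=Path("/tmp/dump/events"),
    partition_columns=["created_at"],
    batch_data_type="csv",
)
```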

def dump_upload_batch(database: Database, batch_size: int, dataset_id: str, table_id: str, dump_mode: str, partition_columns: List[str] = None, batch_data_type: str = 'csv', biglake_table: bool = True, log_number_of_batches: int = 100)

Dumps and uploads batches of data sequentially.
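A sketch of the combined dump-and-upload task; dataset_id, table_id, and the dump_mode value are placeholders, since the accepted dump_mode strings are not listed in this section:

```python
from pipelines.utils.dump_db.tasks import database_execute, database_get, dump_upload_batch

# Hypothetical connection and query.
database = database_get("postgresql", "db.example.com", 5432, "reader", "s3cr3t", "analytics")
database_execute(database=database, query="SELECT * FROM public.events")

# Dump batches and upload them sequentially; the dump_mode value is an assumption.
dump_upload_batch(
    database=database,
    batch_size=50_000,
    dataset_id="example_dataset",
    table_id="events",
    dump_mode="append",
    partition_columns=["created_at"],
    batch_data_type="csv",
    biglake_table=True,
)
```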

def format_partitioned_query(query: str, dataset_id: str, table_id: str, database_type: str, partition_columns: List[str] = None, lower_bound_date: str = None, date_format: str = None, wait=None)

Formats a query for fetching partitioned data.
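A sketch of preparing a partition-aware query before executing it; the query text, partition column, and date format are hypothetical, and the exact rewriting the task performs is not described in this section:

```python
from pipelines.utils.dump_db.tasks import format_partitioned_query

# Expected to return a query restricted to the partitions that still need
# to be fetched; all values here are illustrative.
query = format_partitioned_query(
    query="SELECT * FROM public.events",
    dataset_id="example_dataset",
    table_id="events",
    database_type="postgresql",
    partition_columns=["created_at"],
    lower_bound_date="2023-01-01",
    date_format="%Y-%m-%d",
)
```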

def parse_comma_separated_string_to_list(text: str) ‑> List[str]

Parses a comma-separated string into a list.

Args

text
The text to parse.

Returns

A list of strings.
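A quick illustration; whether surrounding whitespace is stripped is not stated here, so the exact output is an assumption:

```python
from pipelines.utils.dump_db.tasks import parse_comma_separated_string_to_list

columns = parse_comma_separated_string_to_list("data_particao,sigla_uf,id_municipio")
# Expected to yield ["data_particao", "sigla_uf", "id_municipio"].
```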