Module pipelines.utils.dump_db.tasks
General-purpose tasks for dumping database data.
Functions
def database_execute(database: Database, query: str, wait=None, flow_name: str = None, labels: List[str] = None, dataset_id: str = None, table_id: str = None) ‑> None
Executes a query on the database.
Args
database - The database object.
query - The query to execute.
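Example
An illustrative sketch, assuming the task can be called directly as a plain callable (if it is a Prefect task it would normally be invoked inside a flow); connection values and the query are placeholders.
    from pipelines.utils.dump_db.tasks import database_execute, database_get

    # Placeholder connection details; the database_type value is assumed.
    db = database_get("mysql", "db.example.com", 3306, "reader", "secret", "analytics")

    # Issue a query against the database handle.
    database_execute(database=db, query="SELECT id, name FROM customers")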
def database_fetch(database: Database, batch_size: str, wait=None, flow_name: str = None, labels: List[str] = None, dataset_id: str = None, table_id: str = None)
Fetches the results of a query on the database.
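Example
A sketch of the execute-then-fetch pattern; note that the documented signature takes batch_size as a string. Connection values and the query are illustrative.
    from pipelines.utils.dump_db.tasks import (
        database_execute,
        database_fetch,
        database_get,
    )

    db = database_get("mysql", "db.example.com", 3306, "reader", "secret", "analytics")
    database_execute(database=db, query="SELECT id, name FROM customers")
    database_fetch(database=db, batch_size="1000")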
def database_get(database_type: str, hostname: str, port: int, user: str, password: str, database: str, wait=None) ‑> Database
Returns a database object.
Args
database_type - The type of the database.
hostname - The hostname of the database.
port - The port of the database.
user - The username for connecting to the database.
password - The password for the database user.
database - The database name.
Returns
A database object.
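Example
A sketch showing how a connection handle might be built; every value here is a placeholder, and the accepted database_type strings depend on the Database implementations available in the package.
    from pipelines.utils.dump_db.tasks import database_get

    db = database_get(
        database_type="mysql",      # assumed value
        hostname="db.example.com",
        port=3306,
        user="reader",
        password="secret",
        database="analytics",
    )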
def dump_batches_to_file(database: Database, batch_size: int, prepath: Union[str, pathlib.Path], partition_columns: List[str] = None, batch_data_type: str = 'csv', wait=None, flow_name: str = None, labels: List[str] = None, dataset_id: str = None, table_id: str = None) ‑> pathlib.Path
Dumps batches of data to files under the given prepath.
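Example
An illustrative sketch, assuming the query has already been executed on the same handle and that partition_columns refers to columns of the result set; paths and column names are made up.
    from pathlib import Path

    from pipelines.utils.dump_db.tasks import (
        database_execute,
        database_get,
        dump_batches_to_file,
    )

    db = database_get("mysql", "db.example.com", 3306, "reader", "secret", "analytics")
    database_execute(database=db, query="SELECT id, city, updated_at FROM customers")
    output_path = dump_batches_to_file(
        database=db,
        batch_size=50000,
        prepath=Path("/tmp/dump/customers"),
        partition_columns=["updated_at"],  # hypothetical partition column
        batch_data_type="csv",
    )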
def dump_upload_batch(database: Database, batch_size: int, dataset_id: str, table_id: str, dump_mode: str, partition_columns: List[str] = None, batch_data_type: str = 'csv', biglake_table: bool = True, log_number_of_batches: int = 100)
Dumps and uploads batches of data sequentially.
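Example
A sketch of dumping and uploading in one task; dataset_id, table_id and the dump_mode value are assumptions, not values taken from the source.
    from pipelines.utils.dump_db.tasks import (
        database_execute,
        database_get,
        dump_upload_batch,
    )

    db = database_get("mysql", "db.example.com", 3306, "reader", "secret", "analytics")
    database_execute(database=db, query="SELECT id, city, updated_at FROM customers")
    dump_upload_batch(
        database=db,
        batch_size=50000,
        dataset_id="example_dataset",      # hypothetical target dataset
        table_id="customers",              # hypothetical target table
        dump_mode="append",                # assumed mode value
        partition_columns=["updated_at"],
        batch_data_type="csv",
        biglake_table=True,
    )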
def format_partitioned_query(query: str, dataset_id: str, table_id: str, database_type: str, partition_columns: List[str] = None, lower_bound_date: str = None, date_format: str = None, wait=None)
Formats a query for fetching partitioned data.
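Example
A sketch of building an incremental query, assuming the task returns the formatted query string; the identifiers, date value and format are illustrative, and the exact rewriting applied to the query is defined by the task itself.
    from pipelines.utils.dump_db.tasks import format_partitioned_query

    query = format_partitioned_query(
        query="SELECT id, city, updated_at FROM customers",
        dataset_id="example_dataset",   # hypothetical
        table_id="customers",           # hypothetical
        database_type="mysql",          # assumed value
        partition_columns=["updated_at"],
        lower_bound_date="2024-01-01",  # illustrative lower bound
        date_format="%Y-%m-%d",         # illustrative format
    )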
def parse_comma_separated_string_to_list(text: str) ‑> List[str]
Parses a comma-separated string into a list.
Args
text - The text to parse.
Returns
A list of strings.
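Example
A small sketch of the parser; the expected output assumes a plain split on commas.
    from pipelines.utils.dump_db.tasks import parse_comma_separated_string_to_list

    columns = parse_comma_separated_string_to_list("id,city,updated_at")
    # columns == ["id", "city", "updated_at"]  (assuming a plain comma split)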