Module pipelines.utils.dump_url.utils
General-purpose tasks for dumping data from URLs.
Functions
def generate_dump_url_schedules(interval: datetime.timedelta,
                                start_date: datetime.datetime,
                                labels: List[str],
                                dataset_id: str,
                                table_parameters: dict,
                                batch_data_type: str = 'csv',
                                runs_interval_minutes: int = 15) -> List[prefect.schedules.clocks.IntervalClock]
def generate_dump_url_schedules(  # pylint: disable=too-many-arguments,too-many-locals
    interval: timedelta,
    start_date: datetime,
    labels: List[str],
    # db_database: str,
    # db_host: str,
    # db_port: Union[str, int],
    # db_type: str,
    dataset_id: str,
    # vault_secret_path: str,
    table_parameters: dict,
    batch_data_type: str = "csv",
    runs_interval_minutes: int = 15,
) -> List[IntervalClock]:
    """
    Generates multiple schedules for url dumping.
    """
    # db_port = str(db_port)
    clocks = []
    for count, (table_id, parameters) in enumerate(table_parameters.items()):
        # Parameters forwarded to every scheduled run of this table's dump flow.
        parameter_defaults = {
            "batch_data_type": batch_data_type,
            "url": parameters["url"],
            "url_type": parameters["url_type"],
            "dataset_id": dataset_id if dataset_id != "" else parameters["dataset_id"],
            "table_id": table_id,
            "dump_mode": parameters["dump_mode"],
            # "vault_secret_path": vault_secret_path,
            # "db_database": db_database,
            # "db_host": db_host,
            # "db_port": db_port,
            # "db_type": db_type,
            # "execute_query": query_to_line(parameters["execute_query"]),
        }
        # Optional settings are forwarded only when present in the table config.
        if "gsheets_sheet_order" in parameters:
            parameter_defaults["gsheets_sheet_order"] = parameters["gsheets_sheet_order"]
        if "gsheets_sheet_name" in parameters:
            parameter_defaults["gsheets_sheet_name"] = parameters["gsheets_sheet_name"]
        if "gsheets_sheet_range" in parameters:
            parameter_defaults["gsheets_sheet_range"] = parameters["gsheets_sheet_range"]
        if "partition_columns" in parameters:
            parameter_defaults["partition_columns"] = parameters["partition_columns"]
        if "materialize_after_dump" in parameters:
            parameter_defaults["materialize_after_dump"] = parameters["materialize_after_dump"]
        if "materialization_mode" in parameters:
            parameter_defaults["materialization_mode"] = parameters["materialization_mode"]
        if "materialize_to_datario" in parameters:
            parameter_defaults["materialize_to_datario"] = parameters["materialize_to_datario"]
        if "encoding" in parameters:
            parameter_defaults["encoding"] = parameters["encoding"]
        if "on_bad_lines" in parameters:
            parameter_defaults["on_bad_lines"] = parameters["on_bad_lines"]
        if "separator" in parameters:
            parameter_defaults["separator"] = parameters["separator"]
        # if "dbt_model_secret_parameters" in parameters:
        #     parameter_defaults["dbt_model_secret_parameters"] = parameters[
        #         "dbt_model_secret_parameters"
        #     ]
        # if "partition_date_format" in parameters:
        #     parameter_defaults["partition_date_format"] = parameters[
        #         "partition_date_format"
        #     ]
        # if "lower_bound_date" in parameters:
        #     parameter_defaults["lower_bound_date"] = parameters["lower_bound_date"]
        # A per-table interval overrides the default one.
        new_interval = parameters["interval"] if "interval" in parameters else interval
        # Stagger each table's start time by runs_interval_minutes.
        clocks.append(
            IntervalClock(
                interval=new_interval,
                start_date=start_date + timedelta(minutes=runs_interval_minutes * count),
                labels=labels,
                parameter_defaults=parameter_defaults,
            )
        )
    return clocks
Generates multiple schedules for URL dumping: one IntervalClock per table in table_parameters, with start times staggered by runs_interval_minutes.
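For context, a minimal usage sketch follows, assuming Prefect 1.x and purely illustrative dataset, table, URL, and label values. The per-table keys "url", "url_type", and "dump_mode" mirror the lookups made inside the function; the accepted values for "url_type" depend on the dump_url flow and are assumed here.

from datetime import datetime, timedelta

import pytz
from prefect.schedules import Schedule

from pipelines.utils.dump_url.utils import generate_dump_url_schedules

# Illustrative table configuration: keys follow the accesses made by
# generate_dump_url_schedules; values are placeholders, not real endpoints.
example_table_parameters = {
    "example_table": {
        "url": "https://example.com/data.csv",
        "url_type": "direct",  # accepted values depend on the dump_url flow (assumed here)
        "dump_mode": "overwrite",
        "partition_columns": ["data_particao"],  # optional, forwarded only if present
    },
}

clocks = generate_dump_url_schedules(
    interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")),
    labels=["example-agent-label"],
    dataset_id="example_dataset",
    table_parameters=example_table_parameters,
    batch_data_type="csv",
    runs_interval_minutes=15,
)

# Each clock starts 15 minutes after the previous one; wrap them in a Schedule
# to attach to a Prefect flow.
example_schedule = Schedule(clocks=clocks)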
def handle_dataframe_chunk(dataframe: pandas.core.frame.DataFrame,
                           save_path: str,
                           partition_columns: List[str],
                           event_id: str,
                           idx: int,
                           build_json_dataframe: bool = False,
                           dataframe_key_column: str = None)
def handle_dataframe_chunk(
    dataframe: pd.DataFrame,
    save_path: str,
    partition_columns: List[str],
    event_id: str,
    idx: int,
    build_json_dataframe: bool = False,
    dataframe_key_column: str = None,
):
    """
    Handles a chunk of dataframe.
    """
    # Use the first partition column, if any, to decide between partitioned
    # and single-file output.
    if not partition_columns or partition_columns[0] == "":
        partition_column = None
    else:
        partition_column = partition_columns[0]

    # Strip accents from column names and keep a mapping from old to new names.
    old_columns = dataframe.columns.tolist()
    dataframe.columns = remove_columns_accents(dataframe)
    new_columns_dict = dict(zip(old_columns, dataframe.columns.tolist()))

    if idx == 0:
        if partition_column:
            log(
                f"Partition column: {partition_column} FOUND!! Write to partitioned files"
            )
        else:
            log("NO partition column specified! Writing unique files")
        log(f"New columns without accents: {new_columns_dict}")

    dataframe = clean_dataframe(dataframe)

    if partition_column:
        # Derive date-based partition columns from the renamed partition column
        # and write the chunk as partitioned CSV files.
        dataframe, date_partition_columns = parse_date_columns(
            dataframe, new_columns_dict[partition_column]
        )
        partitions = date_partition_columns + [
            new_columns_dict[col] for col in partition_columns[1:]
        ]
        to_partitions(
            data=dataframe,
            partition_columns=partitions,
            savepath=save_path,
            data_type="csv",
            build_json_dataframe=build_json_dataframe,
            dataframe_key_column=dataframe_key_column,
        )
    else:
        # No partition column: write one CSV file per chunk.
        dataframe_to_csv(
            dataframe=dataframe,
            path=Path(save_path) / f"{event_id}-{idx}.csv",
            build_json_dataframe=build_json_dataframe,
            dataframe_key_column=dataframe_key_column,
        )
Handles a chunk of a dataframe: cleans it, removes accents from column names, and writes it under save_path, either as partitioned CSV files (when partition columns are given) or as a single <event_id>-<idx>.csv file.
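A minimal sketch of how this might be driven from a chunked pandas read; the file name, save path, event id, and partition column are illustrative assumptions, not values taken from the pipeline.

from pathlib import Path

import pandas as pd

from pipelines.utils.dump_url.utils import handle_dataframe_chunk

save_path = "/tmp/dump_url_example"   # hypothetical output directory
Path(save_path).mkdir(parents=True, exist_ok=True)
event_id = "2023-01-01-000000"        # hypothetical identifier for this dump run

# Read a previously downloaded file in chunks and hand each chunk to the helper.
# "data_particao" is an assumed date column; pass [] (or [""]) to skip partitioning
# and write one <event_id>-<idx>.csv file per chunk instead.
for idx, chunk in enumerate(pd.read_csv("downloaded_data.csv", chunksize=100_000)):
    handle_dataframe_chunk(
        dataframe=chunk,
        save_path=save_path,
        partition_columns=["data_particao"],
        event_id=event_id,
        idx=idx,
        build_json_dataframe=False,
    )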