Module pipelines.rj_smtr.tasks
Tasks for rj_smtr
Functions
def bq_upload(dataset_id: str,
table_id: str,
filepath: str,
raw_filepath: str = None,
partitions: str = None,
status: dict = None)
Expand source code
@task
def bq_upload(
    dataset_id: str,
    table_id: str,
    filepath: str,
    raw_filepath: str = None,
    partitions: str = None,
    status: dict = None,
):  # pylint: disable=R0913
    """
    Upload raw and treated data to GCS and BigQuery.

    Args:
        dataset_id (str): dataset_id on BigQuery
        table_id (str): table_id on BigQuery
        filepath (str): Path to the saved treated .csv file
        raw_filepath (str, optional): Path to raw .json file. Defaults to None.
        partitions (str, optional): Partitioned directory structure, ie "ano=2022/mes=03/data=01".
            Defaults to None.
        status (dict, optional): Dict containing `error` key from upstream tasks.

    Returns:
        None
    """
    log(
        f"""
    Received inputs:
    raw_filepath = {raw_filepath}, type = {type(raw_filepath)}
    treated_filepath = {filepath}, type = {type(filepath)}
    dataset_id = {dataset_id}, type = {type(dataset_id)}
    table_id = {table_id}, type = {type(table_id)}
    partitions = {partitions}, type = {type(partitions)}
    """
    )
    if status["error"] is not None:
        return status["error"]

    error = None
    try:
        # Upload raw to staging
        if raw_filepath:
            st_obj = Storage(table_id=table_id, dataset_id=dataset_id)
            log(
                f"""Uploading raw file to bucket {st_obj.bucket_name} at
                {st_obj.bucket_name}/{dataset_id}/{table_id}"""
            )
            st_obj.upload(
                path=raw_filepath,
                partitions=partitions,
                mode="raw",
                if_exists="replace",
            )

        # Creates and publish table if it does not exist, append to it otherwise
        create_or_append_table(
            dataset_id=dataset_id,
            table_id=table_id,
            path=filepath,
            partitions=partitions,
        )
    except Exception:
        error = traceback.format_exc()
        log(f"[CATCHED] Task failed with error: \n{error}", level="error")
    return error
Upload raw and treated data to GCS and BigQuery.
Args
    dataset_id (str): dataset_id on BigQuery
    table_id (str): table_id on BigQuery
    filepath (str): Path to the saved treated .csv file
    raw_filepath (str, optional): Path to the raw .json file. Defaults to None.
    partitions (str, optional): Partitioned directory structure, e.g. "ano=2022/mes=03/data=01". Defaults to None.
    status (dict, optional): Dict containing the `error` key from upstream tasks.
Returns
    None
def bq_upload_from_dict(paths: dict, dataset_id: str, partition_levels: int = 1)
Expand source code
@task
def bq_upload_from_dict(paths: dict, dataset_id: str, partition_levels: int = 1):
    """Upload multiple tables from a dict structured as {table_id: csv_path}.

    Present use case assumes table partitioned once. Adjust the parameter
    'partition_levels' to best suit new uses.
    i.e. if your csv is saved as:
        <table_id>/date=<run_date>/<filename>.csv
    it has 1 level of partition.
    if your csv file is saved as:
        <table_id>/date=<run_date>/hour=<run_hour>/<filename>.csv
    it has 2 levels of partition

    Args:
        paths (dict): _description_
        dataset_id (str): _description_

    Returns:
        _type_: _description_
    """
    for key in paths.keys():
        log("#" * 80)
        log(f"KEY = {key}")
        tb_dir = paths[key].parent
        # climb up the partition directories to reach the table dir
        for i in range(partition_levels):  # pylint: disable=unused-variable
            tb_dir = tb_dir.parent
        log(f"tb_dir = {tb_dir}")
        create_or_append_table(dataset_id=dataset_id, table_id=key, path=tb_dir)
    log(f"Returning -> {tb_dir.parent}")
    return tb_dir.parent
Upload multiple tables from a dict structured as {table_id: csv_path}. The present use case assumes tables partitioned once; adjust the parameter 'partition_levels' to suit new uses. I.e., if your csv is saved as <table_id>/date=<run_date>/<filename>.csv it has 1 level of partition; if it is saved as <table_id>/date=<run_date>/hour=<run_hour>/<filename>.csv it has 2 levels of partition.
Args
    paths (dict): mapping of table_id to the path of its treated csv file
    dataset_id (str): dataset_id on BigQuery
Returns
    Path: the parent directory of the last table directory processed
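For illustration, a minimal sketch (with a hypothetical path and one partition level) of the directory climbing this task performs before calling create_or_append_table:

from pathlib import Path

csv_path = Path("data/veiculo_dados/date=2022-03-01/output.csv")  # hypothetical path
tb_dir = csv_path.parent           # data/veiculo_dados/date=2022-03-01
for _ in range(1):                 # partition_levels=1: climb one partition directory
    tb_dir = tb_dir.parent         # data/veiculo_dados -> the table dir to upload
print(tb_dir)                      # the task would then return tb_dir.parent ("data")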
def build_incremental_model(dbt_client: dbt_client.dbt_client.DbtClient,
dataset_id: str,
base_table_id: str,
mat_table_id: str,
field_name: str = 'data_versao',
refresh: bool = False,
wait=None)
Expand source code
@task(max_retries=3, retry_delay=timedelta(seconds=10))
def build_incremental_model(  # pylint: disable=too-many-arguments
    dbt_client: DbtClient,
    dataset_id: str,
    base_table_id: str,
    mat_table_id: str,
    field_name: str = "data_versao",
    refresh: bool = False,
    wait=None,  # pylint: disable=unused-argument
):
    """
    Utility task for backfilling table in predetermined steps.
    Assumes the step sizes will be defined on the .sql file.

    Args:
        dbt_client (DbtClient): DBT interface object
        dataset_id (str): Dataset id on BigQuery
        base_table_id (str): Base table from which to materialize (usually, an external table)
        mat_table_id (str): Target table id for materialization
        field_name (str, optional): Key field (column) for dbt incremental filters.
            Defaults to "data_versao".
        refresh (bool, optional): If True, rebuild the table from scratch. Defaults to False.
        wait (NoneType, optional): Placeholder parameter, used to wait previous tasks finish.
            Defaults to None.

    Returns:
        bool: whether the table was fully built or not.
    """
    query_project_id = bq_project()
    last_mat_date = get_table_min_max_value(
        query_project_id, dataset_id, mat_table_id, field_name, "max"
    )
    last_base_date = get_table_min_max_value(
        query_project_id, dataset_id, base_table_id, field_name, "max"
    )
    log(
        f"""
    Base table last version: {last_base_date}
    Materialized table last version: {last_mat_date}
    """
    )
    run_command = f"run --select models/{dataset_id}/{mat_table_id}.sql"

    if refresh:
        log("Running in full refresh mode")
        log(f"DBT will run the following command:\n{run_command + ' --full-refresh'}")
        dbt_client.cli(run_command + " --full-refresh", sync=True)
        last_mat_date = get_table_min_max_value(
            query_project_id, dataset_id, mat_table_id, field_name, "max"
        )

    if last_base_date > last_mat_date:
        log("Running interval step materialization")
        log(f"DBT will run the following command:\n{run_command}")
        while last_base_date > last_mat_date:
            running = dbt_client.cli(run_command, sync=True)
            last_mat_date = get_table_min_max_value(
                query_project_id,
                dataset_id,
                mat_table_id,
                field_name,
                "max",
                wait=running,
            )
            log(f"After this step, materialized table last version is: {last_mat_date}")
            if last_mat_date == last_base_date:
                log("Materialized table reached base table version!")
                return True
    log("Did not run interval step materialization...")
    return False
Utility task for backfilling a table in predetermined steps. Assumes the step sizes are defined in the .sql file.
Args
    dbt_client (DbtClient): DBT interface object
    dataset_id (str): Dataset id on BigQuery
    base_table_id (str): Base table from which to materialize (usually, an external table)
    mat_table_id (str): Target table id for materialization
    field_name (str, optional): Key field (column) for dbt incremental filters. Defaults to "data_versao".
    refresh (bool, optional): If True, rebuild the table from scratch. Defaults to False.
    wait (NoneType, optional): Placeholder parameter, used to wait for previous tasks to finish. Defaults to None.
Returns
    bool: whether the table was fully built or not.
def check_mapped_query_logs_output(query_logs_output: list[tuple]) ‑> bool
Expand source code
@task
def check_mapped_query_logs_output(query_logs_output: list[tuple]) -> bool:
    """
    Task to check if there is recaptures pending

    Args:
        query_logs_output (list[tuple]): the return from a mapped query_logs execution

    Returns:
        bool: True if there is recaptures to do, otherwise False
    """
    if len(query_logs_output) == 0:
        return False

    recapture_list = [i[0] for i in query_logs_output]
    return any(recapture_list)
Task to check whether there are recaptures pending.
Args
    query_logs_output (list[tuple]): the return from a mapped query_logs execution
Returns
    bool: True if there are recaptures to do, otherwise False
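A minimal sketch of the check, using made-up tuples shaped like query_logs's nout=3 return:

# Each element mirrors query_logs's return: (errors, timestamps, previous_errors)
mapped_output = [
    (False, [], []),
    (True, ["2022-03-01T10:00:00"], ["timeout"]),
]
has_recaptures = any(item[0] for item in mapped_output)  # same logic as the task
print(has_recaptures)  # True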
def coalesce_task(value_list: Iterable)
Expand source code
@task(checkpoint=False)
def coalesce_task(value_list: Iterable):
    """
    Task to get the first non None value of a list

    Args:
        value_list (Iterable): a iterable object with the values

    Returns:
        any: value_list's first non None item
    """
    try:
        return next(value for value in value_list if value is not None)
    except StopIteration:
        return None
Task to get the first non-None value of a list.
Args
    value_list (Iterable): an iterable object with the values
Returns
    any: value_list's first non-None item
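The underlying logic, outside Prefect:

values = [None, None, 3, 5]
first_non_none = next((v for v in values if v is not None), None)
print(first_non_none)  # 3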
def create_date_hour_partition(timestamp: datetime.datetime,
partition_date_name: str = 'data',
partition_date_only: bool = False) ‑> str
Expand source code
@task
def create_date_hour_partition(
    timestamp: datetime,
    partition_date_name: str = "data",
    partition_date_only: bool = False,
) -> str:
    """
    Create a date (and hour) Hive partition structure from timestamp.

    Args:
        timestamp (datetime): timestamp to be used as reference
        partition_date_name (str, optional): partition name. Defaults to "data".
        partition_date_only (bool, optional): whether to add hour partition or not

    Returns:
        str: partition string
    """
    partition = f"{partition_date_name}={timestamp.strftime('%Y-%m-%d')}"
    if not partition_date_only:
        partition += f"/hora={timestamp.strftime('%H')}"
    return partition
Create a date (and hour) Hive partition structure from timestamp.
Args
    timestamp (datetime): timestamp to be used as reference
    partition_date_name (str, optional): partition name. Defaults to "data".
    partition_date_only (bool, optional): whether to add the hour partition or not
Returns
    str: partition string
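For example, the partition string built for 2022-03-01 15:30 (a sketch of the task's own formatting):

from datetime import datetime

ts = datetime(2022, 3, 1, 15, 30)
partition = f"data={ts.strftime('%Y-%m-%d')}/hora={ts.strftime('%H')}"
print(partition)  # data=2022-03-01/hora=15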
def create_dbt_run_vars(dataset_id: str,
dbt_vars: dict,
table_id: str,
raw_dataset_id: str,
raw_table_id: str,
mode: str,
timestamp: datetime.datetime) ‑> tuple[list[dict], list[dict] | dict | None, bool]
Expand source code
@task(checkpoint=False, nout=3)
def create_dbt_run_vars(
    dataset_id: str,
    dbt_vars: dict,
    table_id: str,
    raw_dataset_id: str,
    raw_table_id: str,
    mode: str,
    timestamp: datetime,
) -> tuple[list[dict], Union[list[dict], dict, None], bool]:
    """
    Create the variables to be used in dbt materialization based on a dict

    Args:
        dataset_id (str): the dataset_id to get the variables
        dbt_vars (dict): dict containing the parameters
        table_id (str): the table_id get the date_range variable
        raw_dataset_id (str): the raw_dataset_id get the date_range variable
        raw_table_id (str): the raw_table_id get the date_range variable
        mode (str): the mode to get the date_range variable

    Returns:
        list[dict]: the variables to be used in DBT
        Union[list[dict], dict, None]: the date variable (date_range or run_date)
        bool: a flag that indicates if the date_range variable came from Redis
    """
    log(f"Creating DBT variables. Parameter received: {dbt_vars}")

    if not dbt_vars:
        log("dbt_vars are blank. Skiping task...")
        return [None], None, False

    final_vars = []
    date_var = None
    flag_date_range = False

    if "date_range" in dbt_vars.keys():
        log("Creating date_range variable")

        # Set date_range variable manually
        if dict_contains_keys(
            dbt_vars["date_range"], ["date_range_start", "date_range_end"]
        ):
            date_var = {
                "date_range_start": dbt_vars["date_range"]["date_range_start"],
                "date_range_end": dbt_vars["date_range"]["date_range_end"],
            }
        # Create date_range using Redis
        else:
            if not table_id:
                log("table_id is blank. Skiping task...")
                return [None], None, False

            raw_table_id = raw_table_id or table_id

            date_var = get_materialization_date_range.run(
                dataset_id=dataset_id,
                table_id=dbt_vars["date_range"].get("table_alias", table_id),
                raw_dataset_id=raw_dataset_id,
                raw_table_id=raw_table_id,
                table_run_datetime_column_name=dbt_vars["date_range"].get(
                    "table_run_datetime_column_name"
                ),
                mode=mode,
                delay_hours=dbt_vars["date_range"].get("delay_hours", 0),
                end_ts=timestamp,
            )

            flag_date_range = True

        final_vars.append(date_var.copy())

        log(f"date_range created: {date_var}")

    elif "run_date" in dbt_vars.keys():
        log("Creating run_date variable")

        date_var = get_run_dates.run(
            date_range_start=dbt_vars["run_date"].get("date_range_start", False),
            date_range_end=dbt_vars["run_date"].get("date_range_end", False),
            day_datetime=timestamp,
        )

        final_vars += [d.copy() for d in date_var]

        log(f"run_date created: {date_var}")

    elif "data_versao_gtfs" in dbt_vars.keys():
        log("Creating data_versao_gtfs variable")

        date_var = {"data_versao_gtfs": dbt_vars["data_versao_gtfs"]}

        final_vars.append(date_var.copy())

    if "version" in dbt_vars.keys():
        log("Creating version variable")
        dataset_sha = fetch_dataset_sha.run(dataset_id=dataset_id)

        # if there are other variables inside the list, update each item adding the version variable
        if final_vars:
            final_vars = get_join_dict.run(dict_list=final_vars, new_dict=dataset_sha)
        else:
            final_vars.append(dataset_sha)

        log(f"version created: {dataset_sha}")

    log(f"All variables was created, final value is: {final_vars}")

    return final_vars, date_var, flag_date_range
Create the variables to be used in dbt materialization based on a dict
Args
    dataset_id (str): the dataset_id to get the variables
    dbt_vars (dict): dict containing the parameters
    table_id (str): the table_id used to get the date_range variable
    raw_dataset_id (str): the raw_dataset_id used to get the date_range variable
    raw_table_id (str): the raw_table_id used to get the date_range variable
    mode (str): the mode used to get the date_range variable
Returns
    list[dict]: the variables to be used in DBT
    Union[list[dict], dict, None]: the date variable (date_range or run_date)
    bool: a flag that indicates if the date_range variable came from Redis
def create_local_partition_path(dataset_id: str, table_id: str, filename: str, partitions: str = None) ‑> str
Expand source code
@task
def create_local_partition_path(
    dataset_id: str, table_id: str, filename: str, partitions: str = None
) -> str:
    """
    Create the full path sctructure which to save data locally before upload.

    Args:
        dataset_id (str): dataset_id on BigQuery
        table_id (str): table_id on BigQuery
        filename (str, optional): Single csv name
        partitions (str, optional): Partitioned directory structure, ie "ano=2022/mes=03/data=01"

    Returns:
        str: String path having `mode` and `filetype` to be replaced afterwards,
    either to save raw or staging files.
    """
    data_folder = os.getenv("DATA_FOLDER", "data")
    file_path = f"{os.getcwd()}/{data_folder}/{{mode}}/{dataset_id}/{table_id}"
    file_path += f"/{partitions}/{filename}.{{filetype}}"
    log(f"Creating file path: {file_path}")
    return file_path
Create the full path structure in which to save data locally before upload.
Args
    dataset_id (str): dataset_id on BigQuery
    table_id (str): table_id on BigQuery
    filename (str, optional): Single csv name
    partitions (str, optional): Partitioned directory structure, e.g. "ano=2022/mes=03/data=01"
Returns
    str: String path with `mode` and `filetype` placeholders to be replaced afterwards, either to save raw or staging files.
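A sketch of the templated path this task builds; dataset_id, table_id, partitions and filename below are hypothetical, and {mode}/{filetype} remain as placeholders for later formatting (e.g. by save_raw_local and save_treated_local):

import os

dataset_id, table_id = "my_dataset", "my_table"                    # hypothetical
partitions, filename = "ano=2022/mes=03/data=01", "2022-03-01-10-00-00"
file_path = f"{os.getcwd()}/data/{{mode}}/{dataset_id}/{table_id}/{partitions}/{filename}.{{filetype}}"
print(file_path)  # .../data/{mode}/my_dataset/my_table/ano=2022/mes=03/data=01/2022-03-01-10-00-00.{filetype}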
def create_request_params(extract_params: dict,
table_id: str,
dataset_id: str,
timestamp: datetime.datetime,
interval_minutes: int) ‑> tuple[str, str]
Expand source code
@task(checkpoint=False, nout=2)
def create_request_params(
    extract_params: dict,
    table_id: str,
    dataset_id: str,
    timestamp: datetime,
    interval_minutes: int,
) -> tuple[str, str]:
    """
    Task to create request params

    Args:
        extract_params (dict): extract parameters
        table_id (str): table_id on BigQuery
        dataset_id (str): dataset_id on BigQuery
        timestamp (datetime): timestamp for flow run
        interval_minutes (int): interval in minutes between each capture

    Returns:
        request_params: host, database and query to request data
        request_url: url to request data
    """
    request_params = None
    request_url = None

    if dataset_id == constants.BILHETAGEM_DATASET_ID.value:
        database = constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["databases"][
            extract_params["database"]
        ]
        request_url = database["host"]

        request_params = {
            "database": extract_params["database"],
            "engine": database["engine"],
            "query": extract_params["query"],
        }

        if table_id == constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value["table_id"]:
            project = bq_project(kind="bigquery_staging")
            log(f"project = {project}")
            try:
                logs_query = f"""
                SELECT
                    timestamp_captura
                FROM
                    `{project}.{dataset_id}_staging.{table_id}_logs`
                WHERE
                    data <= '{timestamp.strftime("%Y-%m-%d")}'
                    AND sucesso = "True"
                ORDER BY
                    timestamp_captura DESC
                """
                last_success_dates = bd.read_sql(
                    query=logs_query, billing_project_id=project
                )
                last_success_dates = last_success_dates.iloc[:, 0].to_list()
                for success_ts in last_success_dates:
                    success_ts = datetime.fromisoformat(success_ts)
                    last_id_query = f"""
                    SELECT
                        MAX(id)
                    FROM
                        `{project}.{dataset_id}_staging.{table_id}`
                    WHERE
                        data = '{success_ts.strftime("%Y-%m-%d")}'
                        and hora = "{success_ts.strftime("%H")}";
                    """
                    last_captured_id = bd.read_sql(
                        query=last_id_query, billing_project_id=project
                    )
                    last_captured_id = last_captured_id.iloc[0][0]
                    if last_captured_id is None:
                        print("ID is None, trying next timestamp")
                    else:
                        log(f"last_captured_id = {last_captured_id}")
                        break
            except GenericGBQException as err:
                if "404 Not found" in str(err):
                    log("Table Not found, returning id = 0")
                    last_captured_id = 0

            request_params["query"] = request_params["query"].format(
                last_id=last_captured_id,
                max_id=int(last_captured_id)
                + extract_params["page_size"] * extract_params["max_pages"],
            )
            request_params["page_size"] = extract_params["page_size"]
            request_params["max_pages"] = extract_params["max_pages"]
        else:
            if "get_updates" in extract_params.keys():
                project = bq_project()
                log(f"project = {project}")
                columns_to_concat_bq = [
                    c.split(".")[-1] for c in extract_params["get_updates"]
                ]
                concat_arg = ",'_',"
                try:
                    query = f"""
                    SELECT
                        CONCAT("'", {concat_arg.join(columns_to_concat_bq)}, "'")
                    FROM
                        `{project}.{dataset_id}_staging.{table_id}`
                    """
                    log(query)
                    last_values = bd.read_sql(query=query, billing_project_id=project)
                    last_values = last_values.iloc[:, 0].to_list()
                    last_values = ", ".join(last_values)
                    update_condition = f"""CONCAT(
                        {concat_arg.join(extract_params['get_updates'])}
                    ) NOT IN ({last_values})
                    """
                except GenericGBQException as err:
                    if "404 Not found" in str(err):
                        log("table not found, setting updates to 1=1")
                        update_condition = "1=1"

                request_params["query"] = request_params["query"].format(
                    update=update_condition
                )

            datetime_range = get_datetime_range(
                timestamp=timestamp, interval=timedelta(minutes=interval_minutes)
            )

            request_params["query"] = request_params["query"].format(**datetime_range)

    elif dataset_id == constants.GTFS_DATASET_ID.value:
        request_params = {"zip_filename": extract_params["filename"]}

    elif dataset_id == constants.SUBSIDIO_SPPO_RECURSOS_DATASET_ID.value:
        request_params = {}
        data_recurso = extract_params.get("data_recurso", timestamp)
        if isinstance(data_recurso, str):
            data_recurso = datetime.fromisoformat(data_recurso)
        extract_params["token"] = get_vault_secret(
            constants.SUBSIDIO_SPPO_RECURSO_API_SECRET_PATH.value
        )["data"]["token"]
        start = datetime.strftime(
            data_recurso - timedelta(minutes=interval_minutes), "%Y-%m-%dT%H:%M:%S.%MZ"
        )
        end = datetime.strftime(data_recurso, "%Y-%m-%dT%H:%M:%S.%MZ")
        log(f" Start date {start}, end date {end}")
        service = constants.SUBSIDIO_SPPO_RECURSO_TABLE_CAPTURE_PARAMS.value[table_id]
        recurso_params = {
            "start": start,
            "end": end,
            "service": service,
        }
        extract_params["$filter"] = extract_params["$filter"].format(**recurso_params)
        request_params = extract_params
        request_url = constants.SUBSIDIO_SPPO_RECURSO_API_BASE_URL.value

    elif dataset_id == constants.STU_DATASET_ID.value:
        request_params = {"bucket_name": constants.STU_BUCKET_NAME.value}

    elif dataset_id == constants.VEICULO_DATASET_ID.value:
        request_url = get_vault_secret(extract_params["secret_path"])["data"][
            "request_url"
        ]

    elif dataset_id == constants.VIAGEM_ZIRIX_RAW_DATASET_ID.value:
        request_url = f"{constants.ZIRIX_BASE_URL.value}/EnvioViagensConsolidadas"
        delay_minutes = extract_params["delay_minutes"]
        token = get_vault_secret(constants.ZIRIX_API_SECRET_PATH.value)["data"]
        token_key = list(token)[0]
        request_params = {
            "data_inicial": (
                timestamp - timedelta(minutes=delay_minutes + interval_minutes)
            ).strftime("%Y-%m-%d %H:%M:%S"),
            "data_final": (timestamp - timedelta(minutes=delay_minutes)).strftime(
                "%Y-%m-%d %H:%M:%S"
            ),
            token_key: get_vault_secret(constants.ZIRIX_API_SECRET_PATH.value)["data"][
                token_key
            ],
        }
        log(
            f"""Params:
            data_inicial: {request_params['data_inicial']}
            data_final: {request_params['data_final']}"""
        )

    elif dataset_id == constants.CONTROLE_FINANCEIRO_DATASET_ID.value:
        request_url = extract_params["base_url"] + extract_params["sheet_id"]

    return request_params, request_url
Task to create request params
Args
    extract_params (dict): extract parameters
    table_id (str): table_id on BigQuery
    dataset_id (str): dataset_id on BigQuery
    timestamp (datetime): timestamp for flow run
    interval_minutes (int): interval in minutes between each capture
Returns
    request_params: host, database and query to request data
    request_url: url to request data
def delay_now_time(timestamp: str, delay_minutes=6)
Expand source code
@task
def delay_now_time(timestamp: str, delay_minutes=6):
    """Return timestamp string delayed by <delay_minutes>

    Args:
        timestamp (str): Isoformat timestamp string
        delay_minutes (int, optional): Minutes to delay timestamp by Defaults to 6.

    Returns:
        str : timestamp string formatted as "%Y-%m-%dT%H-%M-%S"
    """
    ts_obj = datetime.fromisoformat(timestamp)
    ts_obj = ts_obj - timedelta(minutes=delay_minutes)
    return ts_obj.strftime("%Y-%m-%dT%H-%M-%S")
Return a timestamp string delayed by `delay_minutes`.
Args
    timestamp (str): ISO-format timestamp string
    delay_minutes (int, optional): Minutes to delay the timestamp by. Defaults to 6.
Returns
    str: timestamp string formatted as "%Y-%m-%dT%H-%M-%S"
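For example (a sketch of the task's own logic):

from datetime import datetime, timedelta

ts = datetime.fromisoformat("2022-03-01T12:00:00") - timedelta(minutes=6)
print(ts.strftime("%Y-%m-%dT%H-%M-%S"))  # 2022-03-01T11-54-00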
def fetch_dataset_sha(dataset_id: str)
Expand source code
@task
def fetch_dataset_sha(dataset_id: str):
    """Fetches the SHA of a branch from Github"""
    url = "https://api.github.com/repos/prefeitura-rio/queries-rj-smtr"
    url += f"/commits?queries-rj-smtr/rj_smtr/{dataset_id}"
    response = requests.get(url)

    if response.status_code != 200:
        return None

    dataset_version = response.json()[0]["sha"]
    return {"version": dataset_version}
Fetches the SHA of a branch from GitHub.
def get_current_timestamp(timestamp=None, truncate_minute: bool = True, return_str: bool = False) ‑> datetime.datetime | str
Expand source code
@task
def get_current_timestamp(
    timestamp=None, truncate_minute: bool = True, return_str: bool = False
) -> Union[datetime, str]:
    """
    Get current timestamp for flow run.

    Args:
        timestamp: timestamp to be used as reference (optionally, it can be a string)
        truncate_minute: whether to truncate the timestamp to the minute or not
        return_str: if True, the return will be an isoformatted datetime string
            otherwise it returns a datetime object

    Returns:
        Union[datetime, str]: timestamp for flow run
    """
    if isinstance(timestamp, str):
        timestamp = datetime.fromisoformat(timestamp)
    if not timestamp:
        timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))
    if truncate_minute:
        timestamp = timestamp.replace(second=0, microsecond=0)
    if return_str:
        timestamp = timestamp.isoformat()
    return timestamp
Get current timestamp for flow run.
Args
    timestamp: timestamp to be used as reference (optionally, it can be a string)
    truncate_minute: whether to truncate the timestamp to the minute or not
    return_str: if True, the return will be an ISO-formatted datetime string; otherwise it returns a datetime object
Returns
    Union[datetime, str]: timestamp for flow run
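A sketch of the truncation behavior, using a fixed datetime in place of datetime.now(tz=...):

from datetime import datetime

now = datetime(2022, 3, 1, 12, 34, 56, 789000)  # stand-in for the current time
truncated = now.replace(second=0, microsecond=0)
print(truncated.isoformat())  # 2022-03-01T12:34:00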
def get_join_dict(dict_list: list, new_dict: dict) ‑> List
Expand source code
@task
def get_join_dict(dict_list: list, new_dict: dict) -> List:
    """
    Updates a list of dictionaries with a new dictionary.
    """
    for dict_temp in dict_list:
        dict_temp.update(new_dict)

    log(f"get_join_dict: {dict_list}")
    return dict_list
Updates a list of dictionaries with a new dictionary.
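A sketch of the update logic, with hypothetical dicts:

dict_list = [{"run_date": "2022-03-01"}, {"run_date": "2022-03-02"}]
new_dict = {"version": "abc123"}  # hypothetical version dict
for d in dict_list:
    d.update(new_dict)
print(dict_list)  # [{'run_date': '2022-03-01', 'version': 'abc123'}, {'run_date': '2022-03-02', 'version': 'abc123'}]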
def get_local_dbt_client(host: str, port: int)
Expand source code
@task
def get_local_dbt_client(host: str, port: int):
    """Set a DBT client for running CLI commands. Requires building container image
    for your queries repository.

    Args:
        host (str): hostname. When running locally, usually 'localhost'
        port (int): the port number in which the DBT rpc is running

    Returns:
        DbtClient: object used to run DBT commands.
    """
    return get_dbt_client(host=host, port=port)
Set a DBT client for running CLI commands. Requires building container image for your queries repository.
Args
    host (str): hostname. When running locally, usually 'localhost'
    port (int): the port number on which the DBT RPC is running
Returns
    DbtClient: object used to run DBT commands.
def get_materialization_date_range(dataset_id: str,
table_id: str,
raw_dataset_id: str,
raw_table_id: str,
table_run_datetime_column_name: str = None,
mode: str = 'prod',
delay_hours: int = 0,
end_ts: datetime.datetime = None)
Expand source code
@task(
    checkpoint=False,
    max_retries=constants.MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.RETRY_DELAY.value),
)
def get_materialization_date_range(  # pylint: disable=R0913
    dataset_id: str,
    table_id: str,
    raw_dataset_id: str,
    raw_table_id: str,
    table_run_datetime_column_name: str = None,
    mode: str = "prod",
    delay_hours: int = 0,
    end_ts: datetime = None,
):
    """
    Task for generating dict with variables to be passed to the
    --vars argument on DBT.

    Args:
        dataset_id (str): dataset_id on BigQuery
        table_id (str): model filename on the queries repo.
            eg: if you have a model defined in the file <filename>.sql,
            the table_id should be <filename>
        table_date_column_name (Optional, str): if it's the first time this
            is ran, will query the table for the maximum value on this field.
            If rebuild is true, will query the table for the minimum value
            on this field.
        rebuild (Optional, bool): if true, queries the minimum date value on the
            table and return a date range from that value to the datetime.now() time
        delay(Optional, int): hours delayed from now time for materialization range
        end_ts(Optional, datetime): date range's final date
    Returns:
        dict: containing date_range_start and date_range_end
    """
    timestr = "%Y-%m-%dT%H:%M:%S"
    # get start from redis
    last_run = get_last_run_timestamp(
        dataset_id=dataset_id, table_id=table_id, mode=mode
    )
    # if there's no timestamp set on redis, get max timestamp on source table
    if last_run is None:
        log("Failed to fetch key from Redis...\n Querying tables for last suceeded run")
        if Table(dataset_id=dataset_id, table_id=table_id).table_exists("prod"):
            last_run = get_table_min_max_value(
                query_project_id=bq_project(),
                dataset_id=dataset_id,
                table_id=table_id,
                field_name=table_run_datetime_column_name,
                kind="max",
            )
            log(
                f"""
            Queried last run from {dataset_id}.{table_id}
            Got: {last_run} as type {type(last_run)}
            """
            )
        else:
            last_run = get_table_min_max_value(
                query_project_id=bq_project(),
                dataset_id=raw_dataset_id,
                table_id=raw_table_id,
                field_name=table_run_datetime_column_name,
                kind="max",
            )
            log(
                f"""
            Queried last run from {raw_dataset_id}.{raw_table_id}
            Got: {last_run} as type {type(last_run)}
            """
            )
    else:
        last_run = datetime.strptime(last_run, timestr)

    if (not isinstance(last_run, datetime)) and (isinstance(last_run, date)):
        last_run = datetime(last_run.year, last_run.month, last_run.day)

    # set start to last run hour (H)
    start_ts = last_run.replace(minute=0, second=0, microsecond=0).strftime(timestr)

    # set end to now - delay
    if not end_ts:
        end_ts = pendulum.now(constants.TIMEZONE.value).replace(
            tzinfo=None, minute=0, second=0, microsecond=0
        )

    end_ts = (end_ts - timedelta(hours=delay_hours)).replace(
        minute=0, second=0, microsecond=0
    )

    end_ts = end_ts.strftime(timestr)

    date_range = {"date_range_start": start_ts, "date_range_end": end_ts}
    log(f"Got date_range as: {date_range}")
    return date_range
Task for generating a dict with variables to be passed to the --vars argument on DBT.
Args
    dataset_id (str): dataset_id on BigQuery
    table_id (str): model filename on the queries repo, e.g.: if you have a model defined in the file <filename>.sql, the table_id should be <filename>
    table_run_datetime_column_name (Optional, str): if it's the first time this is run, will query the table for the maximum value on this field. If rebuild is true, will query the table for the minimum value on this field.
    rebuild (Optional, bool): if true, queries the minimum date value on the table and returns a date range from that value to the datetime.now() time
    delay_hours (Optional, int): hours delayed from now time for the materialization range
    end_ts (Optional, datetime): date range's final date
Returns
    dict: containing date_range_start and date_range_end
def get_previous_date(days)
Expand source code
@task(checkpoint=False)
def get_previous_date(days):
    """
    Returns the date of {days} days ago in YYYY-MM-DD.
    """
    now = pendulum.now(pendulum.timezone("America/Sao_Paulo")).subtract(days=days)

    return now.to_date_string()
Returns the date of {days} days ago in YYYY-MM-DD.
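A sketch of the same computation (the printed value depends on the current date):

import pendulum

days = 3
previous = pendulum.now(pendulum.timezone("America/Sao_Paulo")).subtract(days=days)
print(previous.to_date_string())  # e.g. "2022-02-26" when run on 2022-03-01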
def get_raw(url: str,
headers: str = None,
filetype: str = 'json',
csv_args: dict = None,
params: dict = None) ‑> Dict
Expand source code
@task
def get_raw(  # pylint: disable=R0912
    url: str,
    headers: str = None,
    filetype: str = "json",
    csv_args: dict = None,
    params: dict = None,
) -> Dict:
    """
    Request data from URL API

    Args:
        url (str): URL to send request
        headers (str, optional): Path to headers guardeded on Vault, if needed.
        filetype (str, optional): Filetype to be formatted (supported only: json, csv and txt)
        csv_args (dict, optional): Arguments for read_csv, if needed
        params (dict, optional): Params to be sent on request

    Returns:
        dict: Containing keys
          * `data` (json): data result
          * `error` (str): catched error, if any. Otherwise, returns None
    """
    data = None
    error = None

    try:
        if headers is not None:
            headers = get_vault_secret(headers)["data"]

            # remove from headers, if present
            remove_headers = ["host", "databases"]
            for remove_header in remove_headers:
                if remove_header in list(headers.keys()):
                    del headers[remove_header]

        response = requests.get(
            url,
            headers=headers,
            timeout=constants.MAX_TIMEOUT_SECONDS.value,
            params=params,
        )

        if response.ok:  # status code is less than 400
            if filetype == "json":
                data = response.json()

                # todo: move to data check on specfic API
                # pylint: disable=W0102
                if isinstance(data, dict) and "DescricaoErro" in data.keys():
                    error = data["DescricaoErro"]

            elif filetype in ("txt", "csv"):
                if csv_args is None:
                    csv_args = {}
                data = pd.read_csv(io.StringIO(response.text), **csv_args).to_dict(
                    orient="records"
                )
            else:
                error = (
                    "Unsupported raw file extension. Supported only: json, csv and txt"
                )

    except Exception:
        error = traceback.format_exc()
        log(f"[CATCHED] Task failed with error: \n{error}", level="error")

    return {"data": data, "error": error}
Request data from a URL API.
Args
    url (str): URL to send the request
    headers (str, optional): Path to headers guarded on Vault, if needed.
    filetype (str, optional): Filetype to be formatted (supported only: json, csv and txt)
    csv_args (dict, optional): Arguments for read_csv, if needed
    params (dict, optional): Params to be sent on the request
Returns
    dict: Containing keys
        * `data` (json): data result
        * `error` (str): caught error, if any. Otherwise, returns None
def get_raw_from_sources(source_type: str,
local_filepath: str,
source_path: str = None,
dataset_id: str = None,
table_id: str = None,
secret_path: str = None,
request_params: dict = None) ‑> tuple[str, str]
Expand source code
@task(checkpoint=False, nout=2)
def get_raw_from_sources(
    source_type: str,
    local_filepath: str,
    source_path: str = None,
    dataset_id: str = None,
    table_id: str = None,
    secret_path: str = None,
    request_params: dict = None,
) -> tuple[str, str]:
    """
    Task to get raw data from sources

    Args:
        source_type (str): source type
        local_filepath (str): local filepath
        source_path (str, optional): source path. Defaults to None.
        dataset_id (str, optional): dataset_id on BigQuery. Defaults to None.
        table_id (str, optional): table_id on BigQuery. Defaults to None.
        secret_path (str, optional): secret path. Defaults to None.
        request_params (dict, optional): request parameters. Defaults to None.

    Returns:
        error: error catched from upstream tasks
        filepath: filepath to raw data
    """
    error = None
    filepath = None
    data = None

    source_values = source_type.split("-", 1)

    source_type, filetype = (
        source_values if len(source_values) == 2 else (source_values[0], None)
    )

    log(f"Getting raw data from source type: {source_type}")

    try:
        if source_type == "api":
            error, data, filetype = get_raw_data_api(
                url=source_path,
                secret_path=secret_path,
                api_params=request_params,
                filetype=filetype,
            )
        elif source_type == "gcs":
            error, data, filetype = get_raw_data_gcs(
                dataset_id=dataset_id, table_id=table_id, **request_params
            )
        elif source_type == "db":
            error, data, filetype = get_raw_data_db(
                host=source_path, secret_path=secret_path, **request_params
            )
        elif source_type == "movidesk":
            error, data, filetype = get_raw_recursos(
                request_url=source_path, request_params=request_params
            )
        else:
            raise NotImplementedError(f"{source_type} not supported")

        filepath = save_raw_local_func(
            data=data, filepath=local_filepath, filetype=filetype
        )

    except NotImplementedError:
        error = traceback.format_exc()
        log(f"[CATCHED] Task failed with error: \n{error}", level="error")

    log(f"Raw extraction ended returned values: {error}, {filepath}")

    return error, filepath
Task to get raw data from sources
Args
    source_type (str): source type
    local_filepath (str): local filepath
    source_path (str, optional): source path. Defaults to None.
    dataset_id (str, optional): dataset_id on BigQuery. Defaults to None.
    table_id (str, optional): table_id on BigQuery. Defaults to None.
    secret_path (str, optional): secret path. Defaults to None.
    request_params (dict, optional): request parameters. Defaults to None.
Returns
    error: error caught from upstream tasks
    filepath: filepath to raw data
def get_rounded_timestamp(timestamp: str | datetime.datetime | None = None,
interval_minutes: int | None = None) ‑> datetime.datetime
Expand source code
@task
def get_rounded_timestamp(
    timestamp: Union[str, datetime, None] = None,
    interval_minutes: Union[int, None] = None,
) -> datetime:
    """
    Calculate rounded timestamp for flow run.

    Args:
        timestamp (Union[str, datetime, None]): timestamp to be used as reference
        interval_minutes (Union[int, None], optional): interval in minutes between each recapture

    Returns:
        datetime: timestamp for flow run
    """
    if isinstance(timestamp, str):
        timestamp = datetime.fromisoformat(timestamp)

    if not timestamp:
        timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))

    timestamp = timestamp.replace(second=0, microsecond=0)

    if interval_minutes:
        if interval_minutes >= 60:
            hours = interval_minutes / 60
            interval_minutes = round(((hours) % 1) * 60)

        if interval_minutes == 0:
            rounded_minutes = interval_minutes
        else:
            rounded_minutes = (timestamp.minute // interval_minutes) * interval_minutes

        timestamp = timestamp.replace(minute=rounded_minutes)

    return timestamp
Calculate rounded timestamp for flow run.
Args
    timestamp (Union[str, datetime, None]): timestamp to be used as reference
    interval_minutes (Union[int, None], optional): interval in minutes between each recapture
Returns
    datetime: timestamp for flow run
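A sketch of the flooring logic for a sub-hour interval:

from datetime import datetime

ts = datetime(2022, 3, 1, 12, 34)
interval_minutes = 10
rounded = ts.replace(minute=(ts.minute // interval_minutes) * interval_minutes)
print(rounded)  # 2022-03-01 12:30:00
# For interval_minutes >= 60 the task keeps only the fractional-hour remainder,
# so interval_minutes=60 floors to minute 0.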
def get_run_dates(date_range_start: str,
date_range_end: str,
day_datetime: datetime.datetime = None) ‑> List
Expand source code
@task
def get_run_dates(
    date_range_start: str, date_range_end: str, day_datetime: datetime = None
) -> List:
    """
    Generates a list of dates between date_range_start and date_range_end.

    Args:
        date_range_start (str): the start date to create the date range
        date_range_end (str): the end date to create the date range
        day_datetime (datetime, Optional): a timestamp to use as run_date
            if the range start or end is False

    Returns:
        list: the list of run_dates
    """
    if (date_range_start is False) or (date_range_end is False):
        if day_datetime:
            run_date = day_datetime.strftime("%Y-%m-%d")
        else:
            run_date = get_now_date.run()
        dates = [{"run_date": run_date}]
    else:
        dates = [
            {"run_date": d.strftime("%Y-%m-%d")}
            for d in pd.date_range(start=date_range_start, end=date_range_end)
        ]
    log(f"Will run the following dates: {dates}")
    return dates
Generates a list of dates between date_range_start and date_range_end.
Args
    date_range_start (str): the start date to create the date range
    date_range_end (str): the end date to create the date range
    day_datetime (datetime, optional): a timestamp to use as run_date if the range start or end is False
Returns
    list: the list of run_dates
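A sketch of the range branch, using pandas directly:

import pandas as pd

dates = [
    {"run_date": d.strftime("%Y-%m-%d")}
    for d in pd.date_range(start="2022-03-01", end="2022-03-03")
]
print(dates)  # [{'run_date': '2022-03-01'}, {'run_date': '2022-03-02'}, {'run_date': '2022-03-03'}]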
def get_scheduled_start_times(timestamp: datetime.datetime, parameters: list, intervals: dict | None = None)
Expand source code
@task
def get_scheduled_start_times(
    timestamp: datetime, parameters: list, intervals: Union[None, dict] = None
):
    """
    Task to get start times to schedule flows

    Args:
        timestamp (datetime): initial flow run timestamp
        parameters (list): parameters for the flow
        intervals (Union[None, dict], optional): intervals between each flow run. Defaults to None.
            Optionally, you can pass specific intervals for some table_ids.
            Suggests to pass intervals based on previous table observed execution times.
            Defaults to dict(default=timedelta(minutes=2)).

    Returns:
        list[datetime]: list of scheduled start times
    """
    if intervals is None:
        intervals = dict()

    if "default" not in intervals.keys():
        intervals["default"] = timedelta(minutes=2)

    timestamps = [None]
    last_schedule = timestamp

    for param in parameters[1:]:
        last_schedule += intervals.get(
            param.get("table_id", "default"), intervals["default"]
        )
        timestamps.append(last_schedule)

    return timestamps
Task to get start times to schedule flows
Args
    timestamp (datetime): initial flow run timestamp
    parameters (list): parameters for the flow
    intervals (Union[None, dict], optional): intervals between each flow run. Optionally, you can pass specific intervals for some table_ids; it is suggested to base them on previously observed execution times for each table. Defaults to dict(default=timedelta(minutes=2)).
Returns
    list[datetime]: list of scheduled start times
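A sketch of the scheduling arithmetic, with hypothetical parameters and intervals:

from datetime import datetime, timedelta

timestamp = datetime(2022, 3, 1, 10, 0)
parameters = [{"table_id": "a"}, {"table_id": "b"}, {"table_id": "c"}]  # hypothetical
intervals = {"default": timedelta(minutes=2), "c": timedelta(minutes=5)}

timestamps, last_schedule = [None], timestamp  # the first run starts immediately
for param in parameters[1:]:
    last_schedule += intervals.get(param.get("table_id", "default"), intervals["default"])
    timestamps.append(last_schedule)
print(timestamps)  # [None, 2022-03-01 10:02, 2022-03-01 10:07]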
def parse_timestamp_to_string(timestamp: datetime.datetime, pattern='%Y-%m-%d-%H-%M-%S') ‑> str
Expand source code
@task
def parse_timestamp_to_string(timestamp: datetime, pattern="%Y-%m-%d-%H-%M-%S") -> str:
    """
    Parse timestamp to string pattern.
    """
    return timestamp.strftime(pattern)
Parse timestamp to string pattern.
def query_logs(dataset_id: str,
table_id: str,
datetime_filter=None,
max_recaptures: int = 90,
interval_minutes: int = 1,
recapture_window_days: int = 1)
Expand source code
@task(nout=3, max_retries=3, retry_delay=timedelta(seconds=5))
def query_logs(
    dataset_id: str,
    table_id: str,
    datetime_filter=None,
    max_recaptures: int = 90,
    interval_minutes: int = 1,
    recapture_window_days: int = 1,
):
    """
    Queries capture logs to check for errors

    Args:
        dataset_id (str): dataset_id on BigQuery
        table_id (str): table_id on BigQuery
        datetime_filter (pendulum.datetime.DateTime, optional):
            filter passed to query. This task will query the logs table
            for the last n (n = recapture_window_days) days before datetime_filter
        max_recaptures (int, optional): maximum number of recaptures to be done
        interval_minutes (int, optional): interval in minutes between each recapture
        recapture_window_days (int, optional): Number of days to query for erros

    Returns:
        lists: errors (bool),
        timestamps (list of pendulum.datetime.DateTime),
        previous_errors (list of previous errors)
    """
    if not datetime_filter:
        datetime_filter = pendulum.now(constants.TIMEZONE.value).replace(
            second=0, microsecond=0
        )
    elif isinstance(datetime_filter, str):
        datetime_filter = datetime.fromisoformat(datetime_filter).replace(
            second=0, microsecond=0
        )

    datetime_filter = datetime_filter.strftime("%Y-%m-%d %H:%M:%S")

    query = f"""
    WITH
        t AS (
            SELECT
                DATETIME(timestamp_array) AS timestamp_array
            FROM
                UNNEST(
                    GENERATE_TIMESTAMP_ARRAY(
                        TIMESTAMP_SUB('{datetime_filter}', INTERVAL {recapture_window_days} day),
                        TIMESTAMP('{datetime_filter}'),
                        INTERVAL {interval_minutes} minute)
                ) AS timestamp_array
            WHERE
                timestamp_array < '{datetime_filter}'
        ),
        logs_table AS (
            SELECT
                SAFE_CAST(DATETIME(TIMESTAMP(timestamp_captura),
                    "America/Sao_Paulo") AS DATETIME) timestamp_captura,
                SAFE_CAST(sucesso AS BOOLEAN) sucesso,
                SAFE_CAST(erro AS STRING) erro,
                SAFE_CAST(DATA AS DATE) DATA
            FROM
                {bq_project(kind="bigquery_staging")}.{dataset_id}_staging.{table_id}_logs AS t
        ),
        logs AS (
            SELECT
                *,
                TIMESTAMP_TRUNC(timestamp_captura, minute) AS timestamp_array
            FROM
                logs_table
            WHERE
                DATA BETWEEN DATE(DATETIME_SUB('{datetime_filter}',
                    INTERVAL {recapture_window_days} day))
                AND DATE('{datetime_filter}')
                AND timestamp_captura BETWEEN
                    DATETIME_SUB('{datetime_filter}', INTERVAL {recapture_window_days} day)
                    AND '{datetime_filter}'
        )
    SELECT
        CASE
            WHEN logs.timestamp_captura IS NOT NULL THEN logs.timestamp_captura
            ELSE t.timestamp_array
        END AS timestamp_captura,
        logs.erro
    FROM
        t
    LEFT JOIN
        logs
    ON
        logs.timestamp_array = t.timestamp_array
    WHERE
        logs.sucesso IS NOT TRUE
    """
    log(f"Run query to check logs:\n{query}")
    results = bd.read_sql(query=query, billing_project_id=bq_project())

    if len(results) > 0:
        results = results.sort_values(["timestamp_captura"])
        results["timestamp_captura"] = (
            pd.to_datetime(results["timestamp_captura"])
            .dt.tz_localize(constants.TIMEZONE.value)
            .to_list()
        )
        log(f"Recapture data for the following {len(results)} timestamps:\n{results}")
        if len(results) > max_recaptures:
            message = f"""
            [SPPO - Recaptures]
            Encontradas {len(results)} timestamps para serem recapturadas.
            Essa run processará as seguintes:
            #####
            {results[:max_recaptures]}
            #####
            Sobraram as seguintes para serem recapturadas na próxima run:
            #####
            {results[max_recaptures:]}
            #####
            """
            log_critical(message)
            results = results[:max_recaptures]
        return True, results["timestamp_captura"].to_list(), results["erro"].to_list()
    return False, [], []
Queries capture logs to check for errors
Args
    dataset_id (str): dataset_id on BigQuery
    table_id (str): table_id on BigQuery
    datetime_filter (pendulum.datetime.DateTime, optional): filter passed to the query. This task will query the logs table for the last n (n = recapture_window_days) days before datetime_filter
    max_recaptures (int, optional): maximum number of recaptures to be done
    interval_minutes (int, optional): interval in minutes between each recapture
    recapture_window_days (int, optional): number of days to query for errors
Returns
    lists: errors (bool), timestamps (list of pendulum.datetime.DateTime), previous_errors (list of previous errors)
def save_raw_local(file_path: str, status: dict, mode: str = 'raw') ‑> str
Expand source code
@task
def save_raw_local(file_path: str, status: dict, mode: str = "raw") -> str:
    """
    Saves json response from API to .json file.

    Args:
        file_path (str): Path which to save raw file
        status (dict): Must contain keys
          * data: json returned from API
          * error: error catched from API request
        mode (str, optional): Folder to save locally, later folder which to upload to GCS.

    Returns:
        str: Path to the saved file
    """
    _file_path = file_path.format(mode=mode, filetype="json")
    Path(_file_path).parent.mkdir(parents=True, exist_ok=True)
    if status["error"] is None:
        json.dump(status["data"], Path(_file_path).open("w", encoding="utf-8"))
        log(f"Raw data saved to: {_file_path}")
    return _file_path
Saves json response from API to .json file.
Args
    file_path (str): Path in which to save the raw file
    status (dict): Must contain keys
        * data: json returned from the API
        * error: error caught from the API request
    mode (str, optional): Folder to save locally, later the folder from which to upload to GCS.
Returns
    str: Path to the saved file
def save_treated_local(file_path: str, status: dict, mode: str = 'staging') ‑> str
Expand source code
@task
def save_treated_local(file_path: str, status: dict, mode: str = "staging") -> str:
    """
    Save treated file to CSV.

    Args:
        file_path (str): Path which to save treated file
        status (dict): Must contain keys
          * `data`: dataframe returned from treatement
          * `error`: error catched from data treatement
        mode (str, optional): Folder to save locally, later folder which to upload to GCS.

    Returns:
        str: Path to the saved file
    """
    log(f"Saving treated data to: {file_path}, {status}")

    _file_path = file_path.format(mode=mode, filetype="csv")
    Path(_file_path).parent.mkdir(parents=True, exist_ok=True)

    if status["error"] is None:
        status["data"].to_csv(_file_path, index=False)
        log(f"Treated data saved to: {_file_path}")

    return _file_path
Save treated file to CSV.
Args
    file_path (str): Path in which to save the treated file
    status (dict): Must contain keys
        * `data`: dataframe returned from treatment
        * `error`: error caught from data treatment
    mode (str, optional): Folder to save locally, later the folder from which to upload to GCS.
Returns
    str: Path to the saved file
def set_last_run_timestamp(dataset_id: str, table_id: str, timestamp: str, mode: str = 'prod', wait=None)
Expand source code
@task
def set_last_run_timestamp(
    dataset_id: str, table_id: str, timestamp: str, mode: str = "prod", wait=None
):  # pylint: disable=unused-argument
    """
    Set the `last_run_timestamp` key for the dataset_id/table_id pair
    to datetime.now() time. Used after running a materialization to set
    the stage for the next to come

    Args:
        dataset_id (str): dataset_id on BigQuery
        table_id (str): model filename on the queries repo.
        timestamp: Last run timestamp end.
        wait (Any, optional): Used for defining dependencies inside the flow,
            in general, pass the output of the task which should be run
            imediately before this. Defaults to None.

    Returns:
        _type_: _description_
    """
    log(f"Saving timestamp {timestamp} on Redis for {dataset_id}.{table_id}")
    redis_client = get_redis_client()
    key = dataset_id + "." + table_id
    if mode == "dev":
        key = f"{mode}.{key}"
    content = redis_client.get(key)
    if not content:
        content = {}
    content["last_run_timestamp"] = timestamp
    redis_client.set(key, content)
    return True
Set the `last_run_timestamp` key for the dataset_id/table_id pair to datetime.now() time. Used after running a materialization to set the stage for the next one to come.
Args
    dataset_id (str): dataset_id on BigQuery
    table_id (str): model filename on the queries repo.
    timestamp: Last run timestamp end.
    wait (Any, optional): Used for defining dependencies inside the flow; in general, pass the output of the task which should run immediately before this. Defaults to None.
Returns
    bool: True when the key is set.
def transform_raw_to_nested_structure(raw_filepath: str,
filepath: str,
error: str,
timestamp: datetime.datetime,
primary_key: list = None,
flag_private_data: bool = False,
reader_args: dict = None) ‑> tuple[str, str]
Expand source code
@task(nout=2)
def transform_raw_to_nested_structure(
    raw_filepath: str,
    filepath: str,
    error: str,
    timestamp: datetime,
    primary_key: list = None,
    flag_private_data: bool = False,
    reader_args: dict = None,
) -> tuple[str, str]:
    """
    Task to transform raw data to nested structure

    Args:
        raw_filepath (str): Path to the saved raw .json file
        filepath (str): Path to the saved treated .csv file
        error (str): Error catched from upstream tasks
        timestamp (datetime): timestamp for flow run
        primary_key (list, optional): Primary key to be used on nested structure
        flag_private_data (bool, optional): Flag to indicate if the task should log the data
        reader_args (dict): arguments to pass to pandas.read_csv or read_json

    Returns:
        str: Error traceback
        str: Path to the saved treated .csv file
    """
    if error is None:
        try:
            # read the raw data
            error, data = read_raw_data(filepath=raw_filepath, reader_args=reader_args)

            if primary_key is None:
                primary_key = []

            if not flag_private_data:
                log(
                    f"""
                    Received inputs:
                    - timestamp:\n{timestamp}
                    - data:\n{data.head()}"""
                )

            if error is None:
                # Check empty dataframe
                if data.empty:
                    log("Empty dataframe, skipping transformation...")
                else:
                    log(f"Raw data:\n{data_info_str(data)}", level="info")

                    log("Adding captured timestamp column...", level="info")
                    data["timestamp_captura"] = timestamp

                    if "customFieldValues" not in data:
                        log("Striping string columns...", level="info")
                        for col in data.columns[data.dtypes == "object"].to_list():
                            data[col] = data[col].str.strip()

                    if (
                        constants.GTFS_DATASET_ID.value in raw_filepath
                        and "ordem_servico" in raw_filepath
                        and "tipo_os" not in data.columns
                    ):
                        data["tipo_os"] = "Regular"

                    log(
                        f"Finished cleaning! Data:\n{data_info_str(data)}", level="info"
                    )

                    log("Creating nested structure...", level="info")
                    pk_cols = primary_key + ["timestamp_captura"]
                    data = (
                        data.groupby(pk_cols)
                        .apply(
                            lambda x: x[data.columns.difference(pk_cols)].to_json(
                                orient="records",
                                force_ascii=(
                                    constants.CONTROLE_FINANCEIRO_DATASET_ID.value
                                    not in raw_filepath
                                ),
                            )
                        )
                        .str.strip("[]")
                        .reset_index(name="content")[
                            primary_key + ["content", "timestamp_captura"]
                        ]
                    )

                    log(
                        f"Finished nested structure! Data:\n{data_info_str(data)}",
                        level="info",
                    )

            # save treated local
            filepath = save_treated_local_func(
                data=data, error=error, filepath=filepath
            )

        except Exception:  # pylint: disable=W0703
            error = traceback.format_exc()
            log(f"[CATCHED] Task failed with error: \n{error}", level="error")

    return error, filepath
Task to transform raw data to nested structure
Args
    raw_filepath (str): Path to the saved raw .json file
    filepath (str): Path to the saved treated .csv file
    error (str): Error caught from upstream tasks
    timestamp (datetime): timestamp for flow run
    primary_key (list, optional): Primary key to be used on the nested structure
    flag_private_data (bool, optional): Flag to indicate if the task should log the data
    reader_args (dict): arguments to pass to pandas.read_csv or read_json
Returns
    str: Error traceback
    str: Path to the saved treated .csv file
def unpack_mapped_results_nout2(mapped_results: Iterable) ‑> tuple[list[typing.Any], list[typing.Any]]
Expand source code
@task(checkpoint=False, nout=2)
def unpack_mapped_results_nout2(
    mapped_results: Iterable,
) -> tuple[list[Any], list[Any]]:
    """
    Task to unpack the results from an nout=2 tasks in 2 lists when it is mapped

    Args:
        mapped_results (Iterable): The mapped task return

    Returns:
        tuple[list[Any], list[Any]]: The task original return splited in 2 lists:
            - 1st list being all the first return
            - 2nd list being all the second return
    """
    return [r[0] for r in mapped_results], [r[1] for r in mapped_results]
Task to unpack the results from a mapped nout=2 task into two lists.
Args
    mapped_results (Iterable): The mapped task return
Returns
    tuple[list[Any], list[Any]]: The task's original return split into 2 lists: the 1st list holding all the first returns, the 2nd list holding all the second returns
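A sketch of the unpacking, with hypothetical (error, filepath) tuples:

mapped_results = [("error A", "/path/a.csv"), (None, "/path/b.csv")]  # hypothetical
errors, filepaths = [r[0] for r in mapped_results], [r[1] for r in mapped_results]
print(errors)     # ['error A', None]
print(filepaths)  # ['/path/a.csv', '/path/b.csv']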
def upload_logs_to_bq(dataset_id: str,
parent_table_id: str,
timestamp: str,
error: str = None,
previous_error: str = None,
recapture: bool = False)
Expand source code
@task
def upload_logs_to_bq(  # pylint: disable=R0913
    dataset_id: str,
    parent_table_id: str,
    timestamp: str,
    error: str = None,
    previous_error: str = None,
    recapture: bool = False,
):
    """
    Upload execution status table to BigQuery.
    Table is uploaded to the same dataset, named {parent_table_id}_logs.
    If passing status_dict, should not pass timestamp and error.

    Args:
        dataset_id (str): dataset_id on BigQuery
        parent_table_id (str): Parent table id related to the status table
        timestamp (str): ISO formatted timestamp string
        error (str, optional): String associated with error caught during execution

    Returns:
        None
    """
    table_id = parent_table_id + "_logs"
    # Create partition directory
    filename = f"{table_id}_{timestamp.isoformat()}"
    partition = f"data={timestamp.date()}"
    filepath = Path(
        f"""data/staging/{dataset_id}/{table_id}/{partition}/{filename}.csv"""
    )
    filepath.parent.mkdir(exist_ok=True, parents=True)
    # Create dataframe to be uploaded
    if not error and recapture is True:
        # if the recapture is succeeded, update the column erro
        dataframe = pd.DataFrame(
            {
                "timestamp_captura": [timestamp],
                "sucesso": [True],
                "erro": [f"[recapturado]{previous_error}"],
            }
        )
        log(f"Recapturing {timestamp} with previous error:\n{error}")
    else:
        # not recapturing or error during flow execution
        dataframe = pd.DataFrame(
            {
                "timestamp_captura": [timestamp],
                "sucesso": [error is None],
                "erro": [error],
            }
        )
    # Save data local
    dataframe.to_csv(filepath, index=False)
    # Upload to Storage
    create_or_append_table(
        dataset_id=dataset_id,
        table_id=table_id,
        path=filepath.as_posix(),
        partitions=partition,
    )
    if error is not None:
        raise Exception(f"Pipeline failed with error: {error}")
Upload execution status table to BigQuery. Table is uploaded to the same dataset, named {parent_table_id}_logs. If passing status_dict, should not pass timestamp and error.
Args
    dataset_id (str): dataset_id on BigQuery
    parent_table_id (str): Parent table id related to the status table
    timestamp (str): ISO formatted timestamp string
    error (str, optional): String associated with the error caught during execution
Returns
    None
def upload_raw_data_to_gcs(error: str,
raw_filepath: str,
table_id: str,
dataset_id: str,
partitions: list,
bucket_name: str = None) ‑> str | None
Expand source code
@task
def upload_raw_data_to_gcs(
    error: str,
    raw_filepath: str,
    table_id: str,
    dataset_id: str,
    partitions: list,
    bucket_name: str = None,
) -> Union[str, None]:
    """
    Upload raw data to GCS.

    Args:
        error (str): Error catched from upstream tasks.
        raw_filepath (str): Path to the saved raw .json file
        table_id (str): table_id on BigQuery
        dataset_id (str): dataset_id on BigQuery
        partitions (list): list of partition strings

    Returns:
        Union[str, None]: if there is an error returns it traceback, otherwise returns None
    """
    if error is None:
        try:
            st_obj = Storage(
                table_id=table_id, dataset_id=dataset_id, bucket_name=bucket_name
            )
            log(
                f"""Uploading raw file to bucket {st_obj.bucket_name} at
                {st_obj.bucket_name}/{dataset_id}/{table_id}"""
            )
            st_obj.upload(
                path=raw_filepath,
                partitions=partitions,
                mode="raw",
                if_exists="replace",
            )
        except Exception:
            error = traceback.format_exc()
            log(f"[CATCHED] Task failed with error: \n{error}", level="error")

    return error
Upload raw data to GCS.
Args
    error (str): Error caught from upstream tasks.
    raw_filepath (str): Path to the saved raw .json file
    table_id (str): table_id on BigQuery
    dataset_id (str): dataset_id on BigQuery
    partitions (list): list of partition strings
Returns
    Union[str, None]: if there is an error, returns its traceback; otherwise returns None
def upload_staging_data_to_gcs(error: str,
staging_filepath: str,
timestamp: datetime.datetime,
table_id: str,
dataset_id: str,
partitions: str,
previous_error: str = None,
recapture: bool = False,
bucket_name: str = None) ‑> str | None
Expand source code
@task
def upload_staging_data_to_gcs(
    error: str,
    staging_filepath: str,
    timestamp: datetime,
    table_id: str,
    dataset_id: str,
    partitions: str,
    previous_error: str = None,
    recapture: bool = False,
    bucket_name: str = None,
) -> Union[str, None]:
    """
    Upload staging data to GCS.

    Args:
        error (str): Error catched from upstream tasks.
        staging_filepath (str): Path to the saved treated .csv file.
        timestamp (datetime): timestamp for flow run.
        table_id (str): table_id on BigQuery.
        dataset_id (str): dataset_id on BigQuery.
        partitions (str): partition string.
        previous_error (str, Optional): Previous error on recaptures.
        recapture: (bool, Optional): Flag that indicates if the run is recapture or not.
        bucket_name (str, Optional): The bucket name to save the data.

    Returns:
        Union[str, None]: if there is an error returns it traceback, otherwise returns None
    """
    log(f"FILE PATH: {staging_filepath}")
    if error is None:
        try:
            # Creates and publish table if it does not exist, append to it otherwise
            create_or_append_table(
                dataset_id=dataset_id,
                table_id=table_id,
                path=staging_filepath,
                partitions=partitions,
                bucket_name=bucket_name,
            )
        except Exception:
            error = traceback.format_exc()
            log(f"[CATCHED] Task failed with error: \n{error}", level="error")

    upload_run_logs_to_bq(
        dataset_id=dataset_id,
        parent_table_id=table_id,
        error=error,
        timestamp=timestamp,
        mode="staging",
        previous_error=previous_error,
        recapture=recapture,
        bucket_name=bucket_name,
    )

    return error
Upload staging data to GCS.
Args
    error (str): Error caught from upstream tasks.
    staging_filepath (str): Path to the saved treated .csv file.
    timestamp (datetime): timestamp for flow run.
    table_id (str): table_id on BigQuery.
    dataset_id (str): dataset_id on BigQuery.
    partitions (str): partition string.
    previous_error (str, optional): Previous error on recaptures.
    recapture (bool, optional): Flag that indicates whether the run is a recapture or not.
    bucket_name (str, optional): The bucket name to save the data.
Returns
    Union[str, None]: if there is an error, returns its traceback; otherwise returns None