Module pipelines.rj_smtr.tasks
Tasks for rj_smtr
Functions
def bq_upload(dataset_id: str, table_id: str, filepath: str, raw_filepath: str = None, partitions: str = None, status: dict = None)
-
Upload raw and treated data to GCS and BigQuery.
Args
dataset_id
:str
- dataset_id on BigQuery
table_id
:str
- table_id on BigQuery
filepath
:str
- Path to the saved treated .csv file
raw_filepath
:str, optional
- Path to raw .json file. Defaults to None.
partitions
:str, optional
- Partitioned directory structure, i.e. "ano=2022/mes=03/data=01". Defaults to None.
status
:dict, optional
- Dict containing the error key from upstream tasks.
Returns
None
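A minimal usage sketch, assuming these are Prefect-style tasks called inside a flow definition; the ids, paths and partition string below are illustrative:

    from prefect import Flow
    from pipelines.rj_smtr.tasks import bq_upload

    with Flow("bq_upload_example") as flow:
        # Illustrative ids and paths; in a real flow these come from upstream tasks
        bq_upload(
            dataset_id="example_dataset",
            table_id="example_table",
            filepath="/tmp/staging/example_table/data.csv",
            raw_filepath="/tmp/raw/example_table/data.json",
            partitions="ano=2022/mes=03/data=01",
        )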
def bq_upload_from_dict(paths: dict, dataset_id: str, partition_levels: int = 1)
-
Upload multiple tables from a dict structured as {table_id: csv_path}, as sketched below. The present use case assumes the table is partitioned once. Adjust the 'partition_levels' parameter to suit new uses, i.e. if your csv is saved as .../date=<...>/<...>.csv it has 1 level of partition; if your csv file is saved as .../date=<...>/hour=<...>/<...>.csv it has 2 levels of partition.
Args
paths
:dict
- Dict mapping each table_id to its csv path
dataset_id
:str
- dataset_id on BigQuery
partition_levels
:int, optional
- Number of partition levels in the csv path. Defaults to 1.
Returns
_type_
- description
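A sketch of the expected paths argument and how partition_levels maps to the directory layout; the table name and paths are illustrative:

    # partition_levels=1: .../date=2022-03-01/registros.csv
    # partition_levels=2: .../date=2022-03-01/hour=10/registros.csv
    paths = {
        "registros": "data/staging/registros/date=2022-03-01/registros.csv",
    }
    # bq_upload_from_dict(paths=paths, dataset_id="example_dataset", partition_levels=1)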
def build_incremental_model(dbt_client: dbt_client.dbt_client.DbtClient, dataset_id: str, base_table_id: str, mat_table_id: str, field_name: str = 'data_versao', refresh: bool = False, wait=None)
-
Utility task for backfilling table in predetermined steps. Assumes the step sizes will be defined on the .sql file.
Args
dbt_client
:DbtClient
- DBT interface object
dataset_id
:str
- Dataset id on BigQuery
base_table_id
:str
- Base table from which to materialize (usually, an external table)
mat_table_id
:str
- Target table id for materialization
field_name
:str, optional
- Key field (column) for dbt incremental filters. Defaults to "data_versao".
refresh
:bool, optional
- If True, rebuild the table from scratch. Defaults to False.
wait
:NoneType, optional
- Placeholder parameter, used to wait for previous tasks to finish. Defaults to None.
Returns
bool
- whether the table was fully built or not.
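A hedged sketch of a backfill call inside a Prefect flow, assuming a DbtClient obtained via get_local_dbt_client and that the step size is defined in the target model's .sql file; all ids and the port are illustrative:

    from prefect import Flow
    from pipelines.rj_smtr.tasks import build_incremental_model, get_local_dbt_client

    with Flow("backfill_example") as flow:
        dbt_client = get_local_dbt_client(host="localhost", port=8580)  # illustrative port
        fully_built = build_incremental_model(
            dbt_client=dbt_client,
            dataset_id="example_dataset",
            base_table_id="example_external_table",
            mat_table_id="example_materialized_table",
            field_name="data_versao",
            refresh=False,
        )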
def check_mapped_query_logs_output(query_logs_output: list[tuple]) ‑> bool
-
Task to check if there are pending recaptures
Args
query_logs_output
:list[tuple]
- the return from a mapped query_logs execution
Returns
bool
- True if there are recaptures to do, otherwise False
def coalesce_task(value_list: Iterable)
-
Task to get the first non-None value of a list
Args
value_list
:Iterable
- an iterable object with the values
Returns
any
- value_list's first non-None item
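The behaviour documented above amounts to a "first non-None" selection; an equivalent plain-Python sketch:

    values = [None, None, "2022-03-01", "2022-03-02"]
    first = next((v for v in values if v is not None), None)
    # first == "2022-03-01"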
def create_date_hour_partition(timestamp: datetime.datetime, partition_date_name: str = 'data', partition_date_only: bool = False) ‑> str
-
Create a date (and hour) Hive partition structure from timestamp.
Args
timestamp
:datetime
- timestamp to be used as reference
partition_date_name
:str, optional
- partition name. Defaults to "data".
partition_date_only
:bool, optional
- if True, create only the date partition (skip the hour partition). Defaults to False.
Returns
str
- partition string
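An illustrative reconstruction of the kind of Hive partition string described above; the default date partition name "data" is documented, while the hour partition name used here ("hora") is an assumption:

    from datetime import datetime

    ts = datetime(2022, 3, 1, 10, 0)
    date_partition = f"data={ts.date().isoformat()}"              # "data=2022-03-01"
    date_hour_partition = f"{date_partition}/hora={ts.hour:02d}"  # "data=2022-03-01/hora=10" (assumed layout)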
def create_dbt_run_vars(dataset_id: str, dbt_vars: dict, table_id: str, raw_dataset_id: str, raw_table_id: str, mode: str, timestamp: datetime.datetime) ‑> tuple[list[dict], typing.Union[list[dict], dict, NoneType], bool]
-
Create the variables to be used in dbt materialization based on a dict
Args
dataset_id
:str
- the dataset_id to get the variables
dbt_vars
:dict
- dict containing the parameters
table_id
:str
- the table_id to get the date_range variable
raw_dataset_id
:str
- the raw_dataset_id to get the date_range variable
raw_table_id
:str
- the raw_table_id to get the date_range variable
mode
:str
- the mode to get the date_range variable
timestamp
:datetime
- timestamp for the flow run
Returns
list[dict]
- the variables to be used in DBT
Union[list[dict], dict, None]
- the date variable (date_range or run_date)
bool
- a flag that indicates if the date_range variable came from Redis
def create_local_partition_path(dataset_id: str, table_id: str, filename: str, partitions: str = None) ‑> str
-
Create the full path structure in which to save data locally before upload.
Args
dataset_id
:str
- dataset_id on BigQuery
table_id
:str
- table_id on BigQuery
filename
:str
- Single csv name
partitions
:str, optional
- Partitioned directory structure, i.e. "ano=2022/mes=03/data=01"
Returns
str
- String path with mode and filetype placeholders to be replaced afterwards, either to save raw or staging files.
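A sketch of the kind of path template this returns and of filling in the mode and filetype placeholders mentioned above; the directory layout and placeholder syntax are assumptions:

    # Hypothetical template returned for a dataset/table with the partitions above
    template = "data/{mode}/example_dataset/example_table/ano=2022/mes=03/data=01/data.{filetype}"
    raw_path = template.format(mode="raw", filetype="json")
    staging_path = template.format(mode="staging", filetype="csv")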
def create_request_params(extract_params: dict, table_id: str, dataset_id: str, timestamp: datetime.datetime, interval_minutes: int) ‑> tuple[str, str]
-
Task to create request params
Args
extract_params
:dict
- extract parameters
table_id
:str
- table_id on BigQuery
dataset_id
:str
- dataset_id on BigQuery
timestamp
:datetime
- timestamp for flow run
interval_minutes
:int
- interval in minutes between each capture
Returns
request_params
- host, database and query to request data
request_url
- url to request data
def delay_now_time(timestamp: str, delay_minutes=6)
-
Return timestamp string delayed by delay_minutes minutes.
Args
timestamp
:str
- Isoformat timestamp string
delay_minutes
:int, optional
- Minutes to delay the timestamp by. Defaults to 6.
Returns
str
- timestamp string formatted as "%Y-%m-%dT%H-%M-%S"
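An equivalent plain-Python sketch of shifting an ISO timestamp by delay_minutes and formatting it with the documented "%Y-%m-%dT%H-%M-%S" pattern; the direction of the shift (subtraction) is an assumption:

    from datetime import datetime, timedelta

    timestamp = "2022-03-01T10:06:00"
    delayed = datetime.fromisoformat(timestamp) - timedelta(minutes=6)  # assumed to shift backwards
    delayed_str = delayed.strftime("%Y-%m-%dT%H-%M-%S")
    # "2022-03-01T10-00-00"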
def fetch_dataset_sha(dataset_id: str)
-
Fetches the SHA of a branch from GitHub
def get_current_timestamp(timestamp=None, truncate_minute: bool = True, return_str: bool = False) ‑> Union[datetime.datetime, str]
-
Get current timestamp for flow run.
Args
timestamp
- timestamp to be used as reference (optionally, it can be a string)
truncate_minute
- whether to truncate the timestamp to the minute or not
return_str
- if True, the return will be an ISO-formatted datetime string; otherwise it returns a datetime object
Returns
Union[datetime, str]
- timestamp for flow run
def get_join_dict(dict_list: list, new_dict: dict) ‑> List
-
Updates a list of dictionaries with a new dictionary.
def get_local_dbt_client(host: str, port: int)
-
Set a DBT client for running CLI commands. Requires building container image for your queries repository.
Args
host
:str
- hostname. When running locally, usually 'localhost'
port
:int
- the port number in which the DBT rpc is running
Returns
DbtClient
- object used to run DBT commands.
def get_materialization_date_range(dataset_id: str, table_id: str, raw_dataset_id: str, raw_table_id: str, table_run_datetime_column_name: str = None, mode: str = 'prod', delay_hours: int = 0, end_ts: datetime.datetime = None)
-
Task for generating a dict with variables to be passed to the --vars argument on DBT.
Args
dataset_id
:str
- dataset_id on BigQuery
table_id
:str
- model filename on the queries repo, e.g. if you have a model defined in the file <filename>.sql, the table_id should be <filename>
table_date_column_name
:Optional, str
- if it's the first time this is run, will query the table for the maximum value on this field. If rebuild is true, will query the table for the minimum value on this field.
rebuild
:Optional, bool
- if true, queries the minimum date value on the table and returns a date range from that value to the datetime.now() time
delay
:Optional, int
- hours delayed from now time for the materialization range
end_ts
:Optional, datetime
- date range's final date
Returns
dict
- containing date_range_start and date_range_end
def get_previous_date(days)
-
Returns the date of {days} days ago in YYYY-MM-DD.
def get_raw(url: str, headers: str = None, filetype: str = 'json', csv_args: dict = None, params: dict = None) ‑> Dict
-
Request data from URL API
Args
url
:str
- URL to send request
headers
:str, optional
- Path to headers guarded on Vault, if needed.
filetype
:str, optional
- Filetype to be formatted (supported only: json, csv and txt)
csv_args
:dict, optional
- Arguments for read_csv, if needed
params
:dict, optional
- Params to be sent on request
Returns
dict
- Containing keys:
  * data (json): data result
  * error (str): caught error, if any; otherwise None
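A hedged sketch of the equivalent request logic for the json filetype, using the requests library directly and mirroring the documented data/error return keys:

    import requests

    def get_raw_sketch(url: str, params: dict = None) -> dict:
        # Illustrative only: request the URL and return the documented data/error structure
        data, error = None, None
        try:
            response = requests.get(url, params=params, timeout=60)
            response.raise_for_status()
            data = response.json()
        except Exception as exc:
            error = str(exc)
        return {"data": data, "error": error}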
def get_raw_from_sources(source_type: str, local_filepath: str, source_path: str = None, dataset_id: str = None, table_id: str = None, secret_path: str = None, request_params: dict = None) ‑> tuple[str, str]
-
Task to get raw data from sources
Args
source_type
:str
- source type
local_filepath
:str
- local filepath
source_path
:str, optional
- source path. Defaults to None.
dataset_id
:str, optional
- dataset_id on BigQuery. Defaults to None.
table_id
:str, optional
- table_id on BigQuery. Defaults to None.
secret_path
:str, optional
- secret path. Defaults to None.
request_params
:dict, optional
- request parameters. Defaults to None.
Returns
error
- error caught from upstream tasks
filepath
- filepath to raw data
def get_rounded_timestamp(timestamp: Union[str, datetime.datetime, ForwardRef(None)] = None, interval_minutes: Optional[int] = None) ‑> datetime.datetime
-
Calculate rounded timestamp for flow run.
Args
timestamp
:Union[str, datetime, None]
- timestamp to be used as reference
interval_minutes
:Union[int, None], optional
- interval in minutes between each recapture
Returns
datetime
- timestamp for flow run
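An illustrative sketch of snapping a timestamp down to the previous interval_minutes boundary; the rounding direction is an assumption:

    from datetime import datetime

    def round_down(ts: datetime, interval_minutes: int) -> datetime:
        # Truncate to the minute, then snap to the previous multiple of interval_minutes
        ts = ts.replace(second=0, microsecond=0)
        return ts.replace(minute=ts.minute - ts.minute % interval_minutes)

    round_down(datetime(2022, 3, 1, 10, 37), interval_minutes=10)
    # datetime(2022, 3, 1, 10, 30)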
def get_run_dates(date_range_start: str, date_range_end: str, day_datetime: datetime.datetime = None) ‑> List
-
Generates a list of dates between date_range_start and date_range_end.
Args
date_range_start
:str
- the start date to create the date range
date_range_end
:str
- the end date to create the date range
day_datetime
:datetime, Optional
- a timestamp to use as run_date if the range start or end is False
Returns
list
- the list of run_dates
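A plain-Python sketch of building the list of dates between the two bounds; inclusive bounds and the exact element shape are assumptions:

    from datetime import date, timedelta

    start, end = date(2022, 3, 1), date(2022, 3, 3)
    run_dates = [
        (start + timedelta(days=offset)).isoformat()
        for offset in range((end - start).days + 1)
    ]
    # ['2022-03-01', '2022-03-02', '2022-03-03']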
def get_scheduled_start_times(timestamp: datetime.datetime, parameters: list, intervals: Optional[None] = None)
-
Task to get start times to schedule flows
Args
timestamp
:datetime
- initial flow run timestamp
parameters
:list
- parameters for the flow
intervals
:Union[None, dict], optional
- intervals between each flow run. Optionally, you can pass specific intervals for some table_ids. It is suggested to pass intervals based on previously observed table execution times. Defaults to dict(default=timedelta(minutes=2)).
Returns
list[datetime]
- list of scheduled start times
def parse_timestamp_to_string(timestamp: datetime.datetime, pattern='%Y-%m-%d-%H-%M-%S') ‑> str
-
Parse timestamp to string pattern.
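With the default pattern shown in the signature, the conversion amounts to a strftime call:

    from datetime import datetime

    datetime(2022, 3, 1, 10, 30, 0).strftime("%Y-%m-%d-%H-%M-%S")
    # '2022-03-01-10-30-00'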
def query_logs(dataset_id: str, table_id: str, datetime_filter=None, max_recaptures: int = 90, interval_minutes: int = 1, recapture_window_days: int = 1)
-
Queries capture logs to check for errors
Args
dataset_id
:str
- dataset_id on BigQuery
table_id
:str
- table_id on BigQuery
datetime_filter
:pendulum.datetime.DateTime, optional
- filter passed to the query. This task will query the logs table for the last n (n = recapture_window_days) days before datetime_filter
max_recaptures
:int, optional
- maximum number of recaptures to be done
interval_minutes
:int, optional
- interval in minutes between each recapture
recapture_window_days
:int, optional
- Number of days to query for errors
Returns
lists
- errors (bool), timestamps (list of pendulum.datetime.DateTime) and previous_errors (list of previous errors)
def save_raw_local(file_path: str, status: dict, mode: str = 'raw') ‑> str
-
Saves json response from API to .json file.
Args
file_path
:str
- Path which to save raw file
status
:dict
- Must contain keys:
  * data: json returned from API
  * error: error caught from API request
mode
:str, optional
- Folder to save locally; later, the folder to upload to GCS. Defaults to "raw".
Returns
str
- Path to the saved file
def save_treated_local(file_path: str, status: dict, mode: str = 'staging') ‑> str
-
Save treated file to CSV.
Args
file_path
:str
- Path which to save treated file
status
:dict
- Must contain keys:
  * data: dataframe returned from treatment
  * error: error caught from data treatment
mode
:str, optional
- Folder to save locally; later, the folder to upload to GCS. Defaults to "staging".
Returns
str
- Path to the saved file
def set_last_run_timestamp(dataset_id: str, table_id: str, timestamp: str, mode: str = 'prod', wait=None)
-
Set the last_run_timestamp key for the dataset_id/table_id pair to datetime.now() time. Used after running a materialization to set the stage for the next to come.
Args
dataset_id
:str
- dataset_id on BigQuery
table_id
:str
- model filename on the queries repo.
timestamp
- Last run timestamp end.
wait
:Any, optional
- Used for defining dependencies inside the flow. In general, pass the output of the task which should run immediately before this one. Defaults to None.
Returns
_type_
- description
def transform_raw_to_nested_structure(raw_filepath: str, filepath: str, error: str, timestamp: datetime.datetime, primary_key: list = None, flag_private_data: bool = False, reader_args: dict = None) ‑> tuple[str, str]
-
Task to transform raw data to nested structure
Args
raw_filepath
:str
- Path to the saved raw .json file
filepath
:str
- Path to the saved treated .csv file
error
:str
- Error caught from upstream tasks
timestamp
:datetime
- timestamp for flow run
primary_key
:list, optional
- Primary key to be used on the nested structure
flag_private_data
:bool, optional
- Flag to indicate if the task should log the data
reader_args
:dict, optional
- arguments to pass to pandas.read_csv or read_json
Returns
str
- Error traceback
str
- Path to the saved treated .csv file
def unpack_mapped_results_nout2(mapped_results: Iterable) ‑> tuple[list[typing.Any], list[typing.Any]]
-
Task to unpack the results from nout=2 tasks into 2 lists when mapped
Args
mapped_results
:Iterable
- The mapped task return
Returns
tuple[list[Any], list[Any]]
- The task's original return split into 2 lists: the 1st list holding all the first returns, the 2nd list holding all the second returns
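Splitting an iterable of 2-tuples into two lists can be sketched with zip; the tuple contents below are illustrative:

    mapped_results = [("error_a", "path_a"), ("error_b", "path_b")]
    first_returns, second_returns = (list(items) for items in zip(*mapped_results))
    # first_returns == ['error_a', 'error_b']
    # second_returns == ['path_a', 'path_b']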
def upload_logs_to_bq(dataset_id: str, parent_table_id: str, timestamp: str, error: str = None, previous_error: str = None, recapture: bool = False)
-
Upload execution status table to BigQuery. Table is uploaded to the same dataset, named {parent_table_id}_logs. If passing status_dict, should not pass timestamp and error.
Args
dataset_id
:str
- dataset_id on BigQuery
parent_table_id
:str
- Parent table id related to the status table
timestamp
:str
- ISO formatted timestamp string
error
:str, optional
- String associated with error caught during execution
Returns
None
def upload_raw_data_to_gcs(error: str, raw_filepath: str, table_id: str, dataset_id: str, partitions: list, bucket_name: str = None) ‑> Optional[str]
-
Upload raw data to GCS.
Args
error
:str
- Error caught from upstream tasks.
raw_filepath
:str
- Path to the saved raw .json file
table_id
:str
- table_id on BigQuery
dataset_id
:str
- dataset_id on BigQuery
partitions
:list
- list of partition strings
Returns
Union[str, None]
- if there is an error, returns its traceback; otherwise returns None
def upload_staging_data_to_gcs(error: str, staging_filepath: str, timestamp: datetime.datetime, table_id: str, dataset_id: str, partitions: str, previous_error: str = None, recapture: bool = False, bucket_name: str = None) ‑> Optional[str]
-
Upload staging data to GCS.
Args
error
:str
- Error caught from upstream tasks.
staging_filepath
:str
- Path to the saved treated .csv file.
timestamp
:datetime
- timestamp for flow run.
table_id
:str
- table_id on BigQuery.
dataset_id
:str
- dataset_id on BigQuery.
partitions
:str
- partition string.
previous_error
:str, Optional
- Previous error on recaptures.
recapture
:bool, Optional
- Flag that indicates if the run is a recapture or not.
bucket_name
:str, Optional
- The bucket name to save the data.
Returns
Union[str, None]
- if there is an error, returns its traceback; otherwise returns None