Module pipelines.rj_smtr.tasks

Tasks for rj_smtr

Functions

def bq_upload(dataset_id: str, table_id: str, filepath: str, raw_filepath: str = None, partitions: str = None, status: dict = None)

Upload raw and treated data to GCS and BigQuery.

Args

dataset_id : str
dataset_id on BigQuery
table_id : str
table_id on BigQuery
filepath : str
Path to the saved treated .csv file
raw_filepath : str, optional
Path to raw .json file. Defaults to None.
partitions : str, optional
Partitioned directory structure, i.e. "ano=2022/mes=03/data=01".
Defaults to None.
status : dict, optional
Dict containing error key from upstream tasks.

Returns

None
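
A minimal usage sketch for bq_upload, assuming it is wired into a Prefect
flow; the dataset, table and file paths below are illustrative placeholders,
not values taken from this repository:

    # Sketch only: assumes bq_upload is registered as a Prefect task and that
    # the treated .csv has already been produced by upstream tasks.
    from prefect import Flow

    from pipelines.rj_smtr.tasks import bq_upload

    with Flow("bq_upload_example") as flow:
        bq_upload(
            dataset_id="example_dataset",  # illustrative dataset_id
            table_id="example_table",      # illustrative table_id
            filepath="/tmp/staging/example_table/data=2022-03-01/example.csv",
            raw_filepath="/tmp/raw/example_table/data=2022-03-01/example.json",
            partitions="ano=2022/mes=03/data=01",
        )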

def bq_upload_from_dict(paths: dict, dataset_id: str, partition_levels: int = 1)

Upload multiple tables from a dict structured as {table_id: csv_path}. The present use case assumes the table is partitioned once. Adjust the 'partition_levels' parameter to suit new uses, i.e. if your csv is saved under a single date=<...> directory it has 1 level of partition; if it is saved under date=<...>/hour=<...> directories it has 2 levels of partition. A sketch of this convention follows the Args/Returns block below.

Args

paths : dict
Dict mapping table_id to the csv path to upload
dataset_id : str
dataset_id on BigQuery

Returns

_type_
description
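
To make the partition_levels convention for bq_upload_from_dict concrete, the
sketch below counts key=value directories in a saved csv path; the paths and
table names are illustrative assumptions, not the task's implementation:

    # Each "key=value" directory in the csv path adds one partition level.
    from pathlib import PurePosixPath

    def count_partition_levels(csv_path: str) -> int:
        """Count 'key=value' directories in a csv path."""
        return sum("=" in part for part in PurePosixPath(csv_path).parent.parts)

    paths = {  # illustrative {table_id: csv_path} mapping
        "example_table": "/tmp/example_table/date=2022-03-01/data.csv",
    }
    print(count_partition_levels(paths["example_table"]))  # 1 -> partition_levels=1
    print(count_partition_levels(
        "/tmp/example_table/date=2022-03-01/hour=10/data.csv"))  # 2 -> partition_levels=2
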
def build_incremental_model(dbt_client: dbt_client.dbt_client.DbtClient, dataset_id: str, base_table_id: str, mat_table_id: str, field_name: str = 'data_versao', refresh: bool = False, wait=None)

Utility task for backfilling table in predetermined steps. Assumes the step sizes will be defined on the .sql file.

Args

dbt_client : DbtClient
DBT interface object
dataset_id : str
Dataset id on BigQuery
base_table_id : str
Base table from which to materialize (usually, an external table)
mat_table_id : str
Target table id for materialization
field_name : str, optional
Key field (column) for dbt incremental filters.
Defaults to "data_versao".
refresh : bool, optional
If True, rebuild the table from scratch. Defaults to False.
wait : NoneType, optional
Placeholder parameter, used to wait for previous tasks to finish.
Defaults to None.

Returns

bool
whether the table was fully built or not.
def check_mapped_query_logs_output(query_logs_output: list[tuple]) ‑> bool

Task to check if there are recaptures pending

Args

query_logs_output : list[tuple]
the return from a mapped query_logs execution

Returns

bool
True if there are recaptures to do, otherwise False
def coalesce_task(value_list: Iterable)

Task to get the first non-None value of a list

Args

value_list : Iterable
an iterable object with the values

Returns

any
value_list's first non-None item
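
The behaviour described for coalesce_task amounts to a first-non-None scan; a
standalone plain-Python illustration (not the task itself):

    from typing import Any, Iterable, Optional

    def coalesce(value_list: Iterable) -> Optional[Any]:
        """Return the first item that is not None, or None if all are None."""
        return next((value for value in value_list if value is not None), None)

    print(coalesce([None, None, "2022-03-01", "fallback"]))  # 2022-03-01
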
def create_date_hour_partition(timestamp: datetime.datetime, partition_date_name: str = 'data', partition_date_only: bool = False) ‑> str

Create a date (and hour) Hive partition structure from timestamp.

Args

timestamp : datetime
timestamp to be used as reference
partition_date_name : str, optional
partition name. Defaults to "data".
partition_date_only : bool, optional
whether to add hour partition or not

Returns

str
partition string
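
The partition string returned by create_date_hour_partition follows the Hive
key=value convention; a standalone sketch of that format (the hour key name
"hora" is an assumption for the example, not taken from the source):

    from datetime import datetime

    def date_hour_partition(timestamp: datetime,
                            partition_date_name: str = "data",
                            partition_date_only: bool = False) -> str:
        partition = f"{partition_date_name}={timestamp.date().isoformat()}"
        if not partition_date_only:
            partition += f"/hora={timestamp.hour}"  # assumed hour key name
        return partition

    print(date_hour_partition(datetime(2022, 3, 1, 10, 30)))
    # data=2022-03-01/hora=10
    print(date_hour_partition(datetime(2022, 3, 1, 10, 30), partition_date_only=True))
    # data=2022-03-01
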
def create_dbt_run_vars(dataset_id: str, dbt_vars: dict, table_id: str, raw_dataset_id: str, raw_table_id: str, mode: str, timestamp: datetime.datetime) ‑> tuple[list[dict], typing.Union[list[dict], dict, NoneType], bool]

Create the variables to be used in dbt materialization based on a dict

Args

dataset_id : str
the dataset_id to get the variables
dbt_vars : dict
dict containing the parameters
table_id : str
the table_id to get the date_range variable
raw_dataset_id : str
the raw_dataset_id to get the date_range variable
raw_table_id : str
the raw_table_id to get the date_range variable
mode : str
the mode to get the date_range variable
timestamp : datetime
timestamp for flow run

Returns

list[dict]
the variables to be used in DBT
Union[list[dict], dict, None]
the date variable (date_range or run_date)
bool
a flag that indicates if the date_range variable came from Redis
def create_local_partition_path(dataset_id: str, table_id: str, filename: str, partitions: str = None) ‑> str

Create the full path structure in which to save data locally before upload.

Args

dataset_id : str
dataset_id on BigQuery
table_id : str
table_id on BigQuery
filename : str
Single csv name
partitions : str, optional
Partitioned directory structure, i.e. "ano=2022/mes=03/data=01"

Returns

str
String path having mode and filetype to be replaced afterwards,
either to save raw or staging files.
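
A sketch of what such a template could look like; the directory layout below
is an assumption for illustration, not the path produced by this task:

    def local_partition_path(dataset_id: str, table_id: str,
                             filename: str, partitions: str = None) -> str:
        # "{mode}" and "{filetype}" are left as placeholders to be filled later.
        base = f"/tmp/data/{{mode}}/{dataset_id}/{table_id}"
        if partitions:
            base = f"{base}/{partitions}"
        return f"{base}/{filename}.{{filetype}}"

    template = local_partition_path("example_dataset", "example_table",
                                    "example", "ano=2022/mes=03/data=01")
    print(template.format(mode="raw", filetype="json"))
    print(template.format(mode="staging", filetype="csv"))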

def create_request_params(extract_params: dict, table_id: str, dataset_id: str, timestamp: datetime.datetime, interval_minutes: int) ‑> tuple[str, str]

Task to create request params

Args

extract_params : dict
extract parameters
table_id : str
table_id on BigQuery
dataset_id : str
dataset_id on BigQuery
timestamp : datetime
timestamp for flow run
interval_minutes : int
interval in minutes between each capture

Returns

request_params
host, database and query to request data
request_url
url to request data
def delay_now_time(timestamp: str, delay_minutes=6)

Return timestamp string delayed by delay_minutes.

Args

timestamp : str
Isoformat timestamp string
delay_minutes : int, optional
Minutes to delay timestamp by. Defaults to 6.

Returns

str
timestamp string formatted as "%Y-%m-%dT%H-%M-%S"
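
A standalone illustration of delaying an ISO timestamp and re-formatting it as
described for delay_now_time; shifting the timestamp backwards is an assumption
for this sketch:

    from datetime import datetime, timedelta

    def delay_timestamp(timestamp: str, delay_minutes: int = 6) -> str:
        # Assumed direction: shift the reference timestamp back by delay_minutes.
        delayed = datetime.fromisoformat(timestamp) - timedelta(minutes=delay_minutes)
        return delayed.strftime("%Y-%m-%dT%H-%M-%S")

    print(delay_timestamp("2022-03-01T10:30:00"))  # 2022-03-01T10-24-00
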
def fetch_dataset_sha(dataset_id: str)

Fetches the SHA of a branch from GitHub

def get_current_timestamp(timestamp=None, truncate_minute: bool = True, return_str: bool = False) ‑> Union[datetime.datetime, str]

Get current timestamp for flow run.

Args

timestamp
timestamp to be used as reference (optionally, it can be a string)
truncate_minute
whether to truncate the timestamp to the minute or not
return_str
if True, the return will be an ISO-formatted datetime string; otherwise, a datetime object is returned

Returns

Union[datetime, str]
timestamp for flow run
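
A standalone sketch of the truncate-to-minute and string-return behaviour
described for get_current_timestamp (an illustration, not the implementation):

    from datetime import datetime
    from typing import Union

    def current_timestamp(timestamp: datetime = None,
                          truncate_minute: bool = True,
                          return_str: bool = False) -> Union[datetime, str]:
        ts = timestamp or datetime.now()
        if truncate_minute:
            ts = ts.replace(second=0, microsecond=0)
        return ts.isoformat() if return_str else ts

    print(current_timestamp(datetime(2022, 3, 1, 10, 30, 45), return_str=True))
    # 2022-03-01T10:30:00
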
def get_join_dict(dict_list: list, new_dict: dict) ‑> List

Updates a list of dictionaries with a new dictionary.
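
A plain-Python illustration of merging a new dictionary into every dictionary
of a list (the keys below are illustrative):

    def join_dicts(dict_list: list, new_dict: dict) -> list:
        return [{**d, **new_dict} for d in dict_list]

    print(join_dicts([{"run_date": "2022-03-01"}, {"run_date": "2022-03-02"}],
                     {"version": "v1"}))
    # [{'run_date': '2022-03-01', 'version': 'v1'},
    #  {'run_date': '2022-03-02', 'version': 'v1'}]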

def get_local_dbt_client(host: str, port: int)

Set a DBT client for running CLI commands. Requires building the container image for your queries repository.

Args

host : str
hostname. When running locally, usually 'localhost'
port : int
the port number in which the DBT rpc is running

Returns

DbtClient
object used to run DBT commands.
def get_materialization_date_range(dataset_id: str, table_id: str, raw_dataset_id: str, raw_table_id: str, table_run_datetime_column_name: str = None, mode: str = 'prod', delay_hours: int = 0, end_ts: datetime.datetime = None)

Task for generating dict with variables to be passed to the --vars argument on DBT.

Args

dataset_id : str
dataset_id on BigQuery
table_id : str
model filename on the queries repo, e.g. if you have a model defined
in the file <model_name>.sql, the table_id should be <model_name>
table_run_datetime_column_name : str, optional
if it's the first time this is run, will query the table for the
maximum value on this field. If rebuild is true, will query the table
for the minimum value on this field.
rebuild : bool, optional
if true, queries the minimum date value on the table and returns a
date range from that value to the datetime.now() time
delay_hours : int, optional
hours delayed from now time for materialization range
end_ts : datetime, optional
date range's final date

Returns

dict
containing date_range_start and date_range_end
def get_previous_date(days)

Returns the date of {days} days ago in YYYY-MM-DD format.
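
A standalone sketch of the described behaviour:

    from datetime import date, timedelta

    def previous_date(days: int) -> str:
        # Date of {days} days ago, formatted as YYYY-MM-DD.
        return (date.today() - timedelta(days=days)).isoformat()

    print(previous_date(1))  # yesterday's date, e.g. "2022-03-01"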

def get_raw(url: str, headers: str = None, filetype: str = 'json', csv_args: dict = None, params: dict = None) ‑> Dict

Request data from URL API

Args

url : str
URL to send request
headers : str, optional
Path to headers guarded on Vault, if needed.
filetype : str, optional
Filetype to be formatted (supported only: json, csv and txt)
csv_args : dict, optional
Arguments for read_csv, if needed
params : dict, optional
Params to be sent on request

Returns

dict
Containing keys * data (json): data result * error (str): caught error, if any, otherwise None
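
A sketch of how the returned status dict can be consumed downstream, assuming
only the documented data and error keys (values are illustrative):

    status = {"data": [{"id": 1}], "error": None}  # illustrative return value

    if status["error"] is not None:
        print(f"Request failed: {status['error']}")
    else:
        print(f"Fetched {len(status['data'])} records")
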
def get_raw_from_sources(source_type: str, local_filepath: str, source_path: str = None, dataset_id: str = None, table_id: str = None, secret_path: str = None, request_params: dict = None) ‑> tuple[str, str]

Task to get raw data from sources

Args

source_type : str
source type
local_filepath : str
local filepath
source_path : str, optional
source path. Defaults to None.
dataset_id : str, optional
dataset_id on BigQuery. Defaults to None.
table_id : str, optional
table_id on BigQuery. Defaults to None.
secret_path : str, optional
secret path. Defaults to None.
request_params : dict, optional
request parameters. Defaults to None.

Returns

error
error caught from upstream tasks
filepath
filepath to raw data
def get_rounded_timestamp(timestamp: Union[str, datetime.datetime, ForwardRef(None)] = None, interval_minutes: Optional[int] = None) ‑> datetime.datetime

Calculate rounded timestamp for flow run.

Args

timestamp : Union[str, datetime, None]
timestamp to be used as reference
interval_minutes : Union[int, None], optional
interval in minutes between each recapture

Returns

datetime
timestamp for flow run
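
A standalone sketch of rounding a timestamp to an interval_minutes boundary;
rounding down to the previous boundary is an assumption for this illustration:

    from datetime import datetime, timedelta

    def round_timestamp(timestamp: datetime, interval_minutes: int) -> datetime:
        # Assumed behaviour: truncate seconds and round down to the interval.
        ts = timestamp.replace(second=0, microsecond=0)
        return ts - timedelta(minutes=ts.minute % interval_minutes)

    print(round_timestamp(datetime(2022, 3, 1, 10, 37), interval_minutes=10))
    # 2022-03-01 10:30:00
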
def get_run_dates(date_range_start: str, date_range_end: str, day_datetime: datetime.datetime = None) ‑> List

Generates a list of dates between date_range_start and date_range_end.

Args

date_range_start : str
the start date to create the date range
date_range_end : str
the end date to create the date range
day_datetime : datetime, Optional
a timestamp to use as run_date if the range start or end is False

Returns

list
the list of run_dates
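
A standalone sketch of building a daily date range as described; returning
plain ISO date strings is an assumption for this illustration:

    from datetime import date, timedelta

    def run_dates(date_range_start: str, date_range_end: str) -> list:
        start = date.fromisoformat(date_range_start)
        end = date.fromisoformat(date_range_end)
        return [(start + timedelta(days=i)).isoformat()
                for i in range((end - start).days + 1)]

    print(run_dates("2022-03-01", "2022-03-03"))
    # ['2022-03-01', '2022-03-02', '2022-03-03']
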
def get_scheduled_start_times(timestamp: datetime.datetime, parameters: list, intervals: Optional[None] = None)

Task to get start times to schedule flows

Args

timestamp : datetime
initial flow run timestamp
parameters : list
parameters for the flow
intervals : Union[None, dict], optional
intervals between each flow run. Optionally, you can pass specific intervals
for some table_ids; it is suggested to base them on previously observed table
execution times. Defaults to None, in which case dict(default=timedelta(minutes=2)) is used.

Returns

list[datetime]
list of scheduled start times
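
A standalone sketch of spacing start times by a fixed interval, following the
two-minute default mentioned above; the parameter shape is an illustrative
assumption:

    from datetime import datetime, timedelta

    def scheduled_start_times(timestamp: datetime, parameters: list,
                              interval: timedelta = timedelta(minutes=2)) -> list:
        return [timestamp + i * interval for i in range(len(parameters))]

    params = [{"table_id": "a"}, {"table_id": "b"}, {"table_id": "c"}]  # illustrative
    print(scheduled_start_times(datetime(2022, 3, 1, 10, 0), params))
    # start times at 10:00, 10:02 and 10:04
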
def parse_timestamp_to_string(timestamp: datetime.datetime, pattern='%Y-%m-%d-%H-%M-%S') ‑> str

Parse timestamp to string pattern.

def query_logs(dataset_id: str, table_id: str, datetime_filter=None, max_recaptures: int = 90, interval_minutes: int = 1, recapture_window_days: int = 1)

Queries capture logs to check for errors

Args

dataset_id : str
dataset_id on BigQuery
table_id : str
table_id on BigQuery
datetime_filter : pendulum.datetime.DateTime, optional
filter passed to query. This task will query the logs table
for the last n (n = recapture_window_days) days before datetime_filter
max_recaptures : int, optional
maximum number of recaptures to be done
interval_minutes : int, optional
interval in minutes between each recapture
recapture_window_days : int, optional
Number of days to query for errors

Returns

lists
errors (bool), timestamps (list of pendulum.datetime.DateTime),
previous_errors (list of previous errors)

def save_raw_local(file_path: str, status: dict, mode: str = 'raw') ‑> str

Saves json response from API to .json file.

Args

file_path : str
Path which to save raw file
status : dict
Must contain keys * data: json returned from API * error: error caught from API request
mode : str, optional
Folder to save locally, later folder which to upload to GCS.

Returns

str
Path to the saved file
def save_treated_local(file_path: str, status: dict, mode: str = 'staging') ‑> str

Save treated file to CSV.

Args

file_path : str
Path which to save treated file
status : dict
Must contain keys * data: dataframe returned from treatment * error: error caught from data treatment
mode : str, optional
Folder to save locally, later folder which to upload to GCS.

Returns

str
Path to the saved file
def set_last_run_timestamp(dataset_id: str, table_id: str, timestamp: str, mode: str = 'prod', wait=None)

Set the last_run_timestamp key for the dataset_id/table_id pair to datetime.now() time. Used after running a materialization to set the stage for the next run.

Args

dataset_id : str
dataset_id on BigQuery
table_id : str
model filename on the queries repo.
timestamp
Last run timestamp end.
wait : Any, optional
Used for defining dependencies inside the flow; in general, pass the
output of the task which should be run immediately before this one.
Defaults to None.

Returns

_type_
description
def transform_raw_to_nested_structure(raw_filepath: str, filepath: str, error: str, timestamp: datetime.datetime, primary_key: list = None, flag_private_data: bool = False, reader_args: dict = None) ‑> tuple[str, str]

Task to transform raw data to nested structure

Args

raw_filepath : str
Path to the saved raw .json file
filepath : str
Path to the saved treated .csv file
error : str
Error caught from upstream tasks
timestamp : datetime
timestamp for flow run
primary_key : list, optional
Primary key to be used on nested structure
flag_private_data : bool, optional
Flag to indicate if the task should log the data
reader_args : dict
arguments to pass to pandas.read_csv or read_json

Returns

str
Error traceback
str
Path to the saved treated .csv file
def unpack_mapped_results_nout2(mapped_results: Iterable) ‑> tuple[list[typing.Any], list[typing.Any]]

Task to unpack the results from a mapped nout=2 task into 2 lists

Args

mapped_results : Iterable
The mapped task return

Returns

tuple[list[Any], list[Any]]
The task's original return split into 2 lists: the 1st list with all the first returns, the 2nd list with all the second returns
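
The splitting described above is equivalent to transposing a list of 2-tuples;
a plain-Python illustration with made-up mapped results:

    mapped_results = [("err_a", "/tmp/a.csv"), (None, "/tmp/b.csv")]  # illustrative

    firsts, seconds = (list(items) for items in zip(*mapped_results))
    print(firsts)   # ['err_a', None]
    print(seconds)  # ['/tmp/a.csv', '/tmp/b.csv']
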
def upload_logs_to_bq(dataset_id: str, parent_table_id: str, timestamp: str, error: str = None, previous_error: str = None, recapture: bool = False)

Upload execution status table to BigQuery. Table is uploaded to the same dataset, named {parent_table_id}_logs. If passing status_dict, should not pass timestamp and error.

Args

dataset_id : str
dataset_id on BigQuery
parent_table_id : str
Parent table id related to the status table
timestamp : str
ISO formatted timestamp string
error : str, optional
String associated with error caught during execution

Returns

None

def upload_raw_data_to_gcs(error: str, raw_filepath: str, table_id: str, dataset_id: str, partitions: list, bucket_name: str = None) ‑> Optional[str]

Upload raw data to GCS.

Args

error : str
Error caught from upstream tasks.
raw_filepath : str
Path to the saved raw .json file
table_id : str
table_id on BigQuery
dataset_id : str
dataset_id on BigQuery
partitions : list
list of partition strings

Returns

Union[str, None]
if there is an error, returns its traceback; otherwise returns None
def upload_staging_data_to_gcs(error: str, staging_filepath: str, timestamp: datetime.datetime, table_id: str, dataset_id: str, partitions: str, previous_error: str = None, recapture: bool = False, bucket_name: str = None) ‑> Optional[str]

Upload staging data to GCS.

Args

error : str
Error caught from upstream tasks.
staging_filepath : str
Path to the saved treated .csv file.
timestamp : datetime
timestamp for flow run.
table_id : str
table_id on BigQuery.
dataset_id : str
dataset_id on BigQuery.
partitions : str
partition string.
previous_error : str, optional
Previous error on recaptures.
recapture : bool, optional
Flag that indicates whether the run is a recapture or not.
bucket_name : str, optional
The bucket name to save the data.

Returns

Union[str, None]
if there is an error, returns its traceback; otherwise returns None