Module pipelines.rj_smtr.utils

General purpose functions for rj_smtr

Functions

def bq_project(kind: str = 'bigquery_prod')

Get the currently configured BigQuery project_id

Args

kind : str, optional
Which client to get the project name from.

Options are 'bigquery_staging', 'bigquery_prod' and 'storage_staging'. Defaults to 'bigquery_prod'.

Returns

str
the requested project_id
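
A minimal usage sketch, assuming the module is importable as pipelines.rj_smtr.utils and the basedosdados clients are already configured:

    from pipelines.rj_smtr.utils import bq_project

    # project_id of the production BigQuery client (the default kind)
    prod_project = bq_project()

    # project_id of the staging BigQuery client
    staging_project = bq_project(kind="bigquery_staging")
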
def check_not_null(data: pandas.core.frame.DataFrame, columns: list, subset_query: str = None)

Check if there are null values in columns.

Args

data : pd.DataFrame
dataframe to check
columns : list
list of columns to check
subset_query : str
query to check whether important data is being removed

Returns

None
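
A hedged usage sketch; the dataframe, column names and subset query below are illustrative:

    import pandas as pd

    from pipelines.rj_smtr.utils import check_not_null

    data = pd.DataFrame({"id": [1, 2, None], "status": ["ok", "ok", "error"]})

    # Check the listed columns for nulls; the subset query narrows the check
    # to the rows considered relevant (illustrative condition)
    check_not_null(data, columns=["id"], subset_query="status == 'ok'")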

def check_relation(data: pandas.core.frame.DataFrame, columns: list)

Check the relation between columns.

Args

data : pd.DataFrame
dataframe to be modified
columns : list
list of lists of columns to be checked

Returns

None

def close_db_connection(connection, engine: str)

Safely close a database connection

Args

connection
the database connection
engine : str
The database management system
def connect_ftp(secret_path: str = None, secure: bool = True)

Connect to FTP

Returns

ImplicitFTP_TLS
ftp client
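
A usage sketch; the secret path is a placeholder, and the returned client is assumed to follow the standard ftplib FTP_TLS interface (per the ImplicitFTP_TLS return type):

    from pipelines.rj_smtr.utils import connect_ftp

    # Connect using credentials stored under a placeholder secret path
    ftp_client = connect_ftp(secret_path="example_ftp_secret", secure=True)
    ftp_client.cwd("/")       # standard ftplib navigation
    print(ftp_client.nlst())  # list files in the current directory
    ftp_client.quit()
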
def create_bq_external_table(table_obj: basedosdados.upload.table.Table, path: str, bucket_name: str)

Creates a BigQuery external table based on sample data

Args

table_obj : Table
BD Table object
path : str
Table data local path
bucket_name : str, Optional
The bucket name where the data is located
def create_bq_table_schema(data_sample_path: Union[str, pathlib.Path]) ‑> list[google.cloud.bigquery.schema.SchemaField]

Create the bq schema based on the structure of data_sample_path.

Args

data_sample_path : str, Path
Data sample path used to auto-complete column names

Returns

list[bigquery.SchemaField]
The table schema
def create_or_append_table(dataset_id: str, table_id: str, path: str, partitions: str = None, bucket_name: str = None)

Conditionally create the table or append data to its corresponding GCS folder.

Args

dataset_id : str
target dataset_id on BigQuery
table_id : str
target table_id on BigQuery
path : str
Path to .csv data file
partitions : str
partition string.
bucket_name : str, Optional
The bucket name to save the data.
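
A sketch of a typical call; the dataset, table, local path and Hive-style partition string are illustrative values:

    from pipelines.rj_smtr.utils import create_or_append_table

    create_or_append_table(
        dataset_id="example_dataset",
        table_id="example_table",
        path="/tmp/example_table/data.csv",
        partitions="data=2024-01-01/hora=10",  # illustrative partition string
        bucket_name=None,  # assumption: None falls back to the default bucket
    )
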
def custom_serialization(obj: Any) ‑> Any

Function to serialize objects that are not JSON serializable

Args

obj : Any
Object to serialize

Returns

Any
Serialized object
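
A sketch of using it as the default= hook of json.dumps (an assumption consistent with its stated purpose):

    import json
    from datetime import datetime

    from pipelines.rj_smtr.utils import custom_serialization

    payload = {"captured_at": datetime(2024, 1, 1, 10, 30)}

    # Objects json cannot encode natively (e.g. datetimes) are routed
    # through custom_serialization before encoding
    serialized = json.dumps(payload, default=custom_serialization)
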
def data_info_str(data: pandas.core.frame.DataFrame)

Return dataframe info as a str to log

Args

data : pd.DataFrame
dataframe

Returns

str
data.info() output as a string

def dict_contains_keys(input_dict: dict, keys: list[str]) ‑> bool

Test if the input dict has all keys present in the list

Args

input_dict : dict
the dict to test for the keys
keys : list[str]
the list containing the keys to check

Returns

bool
True if the input_dict has all the keys, otherwise False
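
A small usage sketch with an illustrative dict:

    from pipelines.rj_smtr.utils import dict_contains_keys

    record = {"id": 1, "timestamp": "2024-01-01T10:00:00", "value": 42}

    dict_contains_keys(record, ["id", "timestamp"])    # True: both keys present
    dict_contains_keys(record, ["id", "missing_key"])  # False: one key missing
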
def execute_db_query(engine: str, query: str, connection, connector, connection_info: dict) ‑> list[dict]

Execute a query with retries

Args

query : str
the SQL Query to execute
engine : str
The database management system
connection
The database connection
connector
The database connector (to do reconnections)
connection_info : dict
The database connector params (to do reconnections)

Returns

list[dict]
The query results
def filter_data(data: pandas.core.frame.DataFrame, filters: list, subset_query: str = None)

Filter data from a dataframe

Args

data : pd.DataFrame
data DataFrame
filters : list
list of queries to filter data

Returns

pandas.DataFrame
data with the filtered rows removed
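
A hedged sketch; the filters are pandas query strings and, per the docstring, matching rows are dropped from the result:

    import pandas as pd

    from pipelines.rj_smtr.utils import filter_data

    data = pd.DataFrame({"velocidade": [0, 45, 130], "linha": ["100", "100", "200"]})

    # Rows matching the filter queries are removed (illustrative filter)
    filtered = filter_data(data, filters=["velocidade > 120"])
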
def filter_null(data: pandas.core.frame.DataFrame, columns: list, subset_query: str = None)

Filter null values in columns.

Args

data : pd.DataFrame
dataframe to filter
columns : list
list of columns to check
subset_query : str
query to check whether important data is being removed

Returns

pandas.DataFrame
data without null values
def format_send_discord_message(formatted_messages: list, webhook_url: str)

Format and send a message to Discord

Args

formatted_messages : list
The formatted messages
webhook_url : str
The webhook URL

Returns

None

def generate_df_and_save(data: dict, fname: pathlib.Path)

Save a DataFrame as CSV

Args

data : dict
dict with the data from which to build the DataFrame
fname : Path
path to save the CSV file
def generate_execute_schedules(clock_interval: datetime.timedelta, labels: List[str], table_parameters: Union[list[dict], dict], runs_interval_minutes: int = 15, start_date: datetime.datetime = datetime.datetime(2020, 1, 1, 0, 0, tzinfo=<DstTzInfo 'America/Sao_Paulo' LMT-1 day, 20:54:00 STD>), **general_flow_params) ‑> List[prefect.schedules.clocks.IntervalClock]

Generates multiple schedules

Args

clock_interval : timedelta
The interval to run the schedule
labels : List[str]
The labels to be added to the schedule
table_parameters : list
The table parameters to iterate over
runs_interval_minutes : int, optional
The interval between each schedule. Defaults to 15.
start_date : datetime, optional
The start date of the schedule. Defaults to datetime(2020, 1, 1, tzinfo=pytz.timezone(emd_constants.DEFAULT_TIMEZONE.value)).
general_flow_params
Any param that you want to pass to the flow

Returns

List[IntervalClock]
The list of schedules
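
A sketch of building a Prefect (1.x) schedule from the generated clocks; the label, table parameters and extra flow parameter are placeholders:

    from datetime import timedelta

    from prefect.schedules import Schedule

    from pipelines.rj_smtr.utils import generate_execute_schedules

    clocks = generate_execute_schedules(
        clock_interval=timedelta(hours=1),
        labels=["example-agent-label"],
        table_parameters=[{"table_id": "example_table"}],
        runs_interval_minutes=10,
        dataset_id="example_dataset",  # forwarded to each run via **general_flow_params
    )

    schedule = Schedule(clocks=clocks)
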
def get_datetime_range(timestamp: datetime.datetime, interval: datetime.timedelta) ‑> dict

Task to get datetime range in UTC

Args

timestamp : datetime
timestamp to get datetime range
interval : timedelta
interval to get datetime range

Returns

dict
datetime range
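
A small sketch of the call with illustrative values:

    from datetime import datetime, timedelta

    from pipelines.rj_smtr.utils import get_datetime_range

    datetime_range = get_datetime_range(
        timestamp=datetime(2024, 1, 1, 12, 0),
        interval=timedelta(hours=1),
    )
    # datetime_range is a dict describing the UTC range covered by the capture
    # (the exact keys are not documented here)
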
def get_last_run_timestamp(dataset_id: str, table_id: str, mode: str = 'prod') ‑> str

Query Redis to retrieve the time when the last materialization ran.

Args

dataset_id : str
dataset_id on BigQuery
table_id : str
model filename on the queries repo, e.g. if you have a model defined in the file <model_name>.sql, the table_id should be <model_name>
mode : str, optional
Defaults to 'prod'.

Returns

Union[str, None]
the timestamp of the last materialization run, or None if it is not set
def get_raw_data_api(url: str, secret_path: str = None, api_params: dict = None, filetype: str = None) ‑> tuple[str, str, str]

Request data from URL API

Args

url : str
URL to request data
secret_path : str, optional
Secret path to get headers. Defaults to None.
api_params : dict, optional
Parameters to pass to API. Defaults to None.
filetype : str, optional
Filetype to save raw file. Defaults to None.

Returns

tuple[str, str, str]
Error, data and filetype
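
A usage sketch; the URL and parameters are placeholders:

    from pipelines.rj_smtr.utils import get_raw_data_api

    error, data, filetype = get_raw_data_api(
        url="https://api.example.com/v1/records",
        secret_path=None,  # no authentication headers
        api_params={"date": "2024-01-01"},
        filetype="json",
    )

    if error is None:  # assumption: error is None when the request succeeds
        print(f"captured a {filetype} payload")
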
def get_raw_data_db(query: str, engine: str, host: str, secret_path: str, database: str, page_size: int = None, max_pages: int = None) ‑> tuple[str, str, str]

Get data from Databases

Args

query : str
the SQL Query to execute
engine : str
The database management system
host : str
The database host
secret_path : str
Secret path to get credentials
database : str
The database to connect
page_size : int, Optional
The maximum number of rows returned by the paginated query. If you set a value for this argument, the query will have LIMIT and OFFSET appended to it.
max_pages : int, Optional
The maximum number of paginated queries to execute

Returns

tuple[str, str, str]
Error, data and filetype
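
A paginated-extraction sketch; the host, secret path, database and engine value are placeholders:

    from pipelines.rj_smtr.utils import get_raw_data_db

    error, data, filetype = get_raw_data_db(
        query="SELECT * FROM example_schema.example_table",
        engine="postgresql",
        host="db.example.com",
        secret_path="example_db_secret",
        database="example_database",
        page_size=50_000,  # appends LIMIT/OFFSET to the query
        max_pages=10,      # stops after 10 paginated queries
    )
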
def get_raw_data_gcs(dataset_id: str, table_id: str, zip_filename: str = None, bucket_name: str = None) ‑> tuple[str, str, str]

Get raw data from GCS

Args

dataset_id : str
The dataset id on BigQuery.
table_id : str
The table id on BigQuery.
zip_filename : str, optional
The zip file name. Defaults to None.
bucket_name : str, Optional
The bucket name to get the data.

Returns

tuple[str, str, str]
Error, data and filetype
def get_raw_recursos(request_url: str, request_params: dict) ‑> tuple[str, str, str]

Returns a dataframe with recursos data from the Movidesk API.

def get_table_min_max_value(query_project_id: str, dataset_id: str, table_id: str, field_name: str, kind: str, wait=None)

Query a table to get the minimum or maximum value for the chosen field. Useful to incrementally materialize tables via DBT

Args

dataset_id : str
dataset_id on BigQuery
table_id : str
table_id on BigQuery
field_name : str
column name to query
kind : str
which value to get. Accepts min and max
def get_upload_storage_blob(dataset_id: str, filename: str, bucket_name: str = None) ‑> google.cloud.storage.blob.Blob

Get a blob from the upload zone in storage

Args

dataset_id : str
The dataset id on BigQuery.
filename : str
The filename in GCS.
bucket_name : str, Optional
The bucket name to get the data.

Returns

Blob
blob object
def log_critical(message: str, secret_path: str = 'critical_webhook')

Logs a message to the specified critical Discord channel

Args

message : str
Message to post on the channel
secret_path : str, optional
Secret path storing the webhook to the critical channel. Defaults to constants.CRITICAL_SECRETPATH.value.
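
A one-line usage sketch (the message is illustrative):

    from pipelines.rj_smtr.utils import log_critical

    # Post an alert to the critical Discord channel configured in the secret
    log_critical("Capture failed for example_dataset.example_table at 2024-01-01T10:00:00")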

def map_dict_keys(data: dict, mapping: dict) ‑> None

Map old keys to new keys in a dict.
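
A hedged sketch, assuming the mapping goes from old key to new key and the dict is updated in place (consistent with the None return type):

    from pipelines.rj_smtr.utils import map_dict_keys

    record = {"idVeiculo": 1234, "dataHora": "2024-01-01T10:00:00"}

    # Rename raw field names to treated column names (illustrative mapping)
    map_dict_keys(record, mapping={"idVeiculo": "vehicle_id", "dataHora": "timestamp"})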

def perform_check(desc: str, check_params: dict, request_params: dict) ‑> dict

Perform a check on a query

Args

desc : str
The check description
check_params : dict
The check parameters:
* query (str): SQL query to be executed
* order_columns (list): order columns for query log results, in case of failure (optional)
request_params : dict
The request parameters

Returns

dict
The check status
def perform_checks_for_table(table_id: str, request_params: dict, test_check_list: dict, check_params: dict) ‑> dict

Perform checks for a table

Args

table_id : str
The table id
request_params : dict
The request parameters
test_check_list : dict
The test check list
check_params : dict
The check parameters

Returns

dict
The checks
def read_raw_data(filepath: str, reader_args: dict = None) ‑> tuple[str, pandas.core.frame.DataFrame]

Read raw data from file

Args

filepath : str
filepath to read
reader_args : dict
arguments to pass to pandas.read_csv or read_json

Returns

tuple[str, pd.DataFrame]
error and data
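
A small sketch; the file path and reader arguments are illustrative:

    from pipelines.rj_smtr.utils import read_raw_data

    # reader_args is forwarded to pandas.read_csv (or read_json, depending on the file)
    error, data = read_raw_data(
        filepath="/tmp/raw/example.csv",
        reader_args={"dtype": "object", "sep": ","},
    )
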
def safe_cast(val, to_type, default=None)

Safely cast a value.
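
A sketch of the expected behaviour, assuming the usual safe-cast pattern of returning the default when the conversion fails:

    from pipelines.rj_smtr.utils import safe_cast

    safe_cast("42", int)                       # 42 (cast succeeds)
    safe_cast("not a number", int, default=0)  # 0 (falls back to the default)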

def save_raw_local_func(data: Union[dict, str], filepath: str, mode: str = 'raw', filetype: str = 'json') ‑> str

Saves json response from API to .json file.

Args

data : Union[dict, str]
Raw data to save
filepath : str
Path which to save raw file
mode : str, optional
Local folder to save to, and later the folder to upload to on GCS.
filetype : str, optional
The file format

Returns

str
Path to the saved file
def save_treated_local_func(filepath: str, data: pandas.core.frame.DataFrame, error: str, mode: str = 'staging') ‑> str

Save treated file to CSV.

Args

filepath : str
Path to save file
data : pd.DataFrame
Dataframe to save
error : str
Error caught during execution
mode : str, optional
Local folder to save to, and later the folder to upload to on GCS.

Returns

str
Path to the saved file
def set_redis_rdo_files(redis_client, dataset_id: str, table_id: str)

Register downloaded files to Redis

Args

redis_client
the Redis client
dataset_id : str
dataset_id on BigQuery
table_id : str
table_id on BigQuery

Returns

bool
if the key was properly set
def upload_run_logs_to_bq(dataset_id: str, parent_table_id: str, timestamp: str, error: str = None, previous_error: str = None, recapture: bool = False, mode: str = 'raw', bucket_name: str = None)

Upload an execution status table to BigQuery. The table is uploaded to the same dataset, named {parent_table_id}_logs. If passing status_dict, do not pass timestamp and error.

Args

dataset_id : str
dataset_id on BigQuery
parent_table_id : str
table_id on BigQuery
timestamp : str
timestamp to get datetime range
error : str
error caught during execution
previous_error : str
previous error caught during execution
recapture : bool
if the execution was a recapture
mode : str
local folder to save to, and later the folder to upload to on GCS
bucket_name : str, Optional
The bucket name to save the data.

Returns

None