Module pipelines.rj_smtr.utils
General-purpose functions for rj_smtr
Functions
def bq_project(kind: str = 'bigquery_prod')
-
Get the set BigQuery project_id
Args
kind
:str, optional
- Which client to get the project name from.
Options are 'bigquery_staging', 'bigquery_prod' and 'storage_staging'. Defaults to 'bigquery_prod'.
Returns
str
- the requested project_id
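A minimal usage sketch, assuming the module is importable as pipelines.rj_smtr.utils:
    from pipelines.rj_smtr.utils import bq_project

    # Default: project_id configured for the production BigQuery client
    prod_project = bq_project()

    # project_id configured for the staging storage client
    staging_project = bq_project(kind="storage_staging")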
def check_not_null(data: pandas.core.frame.DataFrame, columns: list, subset_query: str = None)
-
Check if there are null values in columns.
Args
columns
:list
- list of columns to check
subset_query
:str
- query to check whether important data is being removed
Returns
None
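An illustrative call with a made-up DataFrame; the function only reports problems, so nothing is returned:
    import pandas as pd

    from pipelines.rj_smtr.utils import check_not_null

    df = pd.DataFrame({"id": [1, 2, None], "status": ["ok", None, "ok"]})

    # Reports null values found in the listed columns
    check_not_null(df, columns=["id", "status"])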
def check_relation(data: pandas.core.frame.DataFrame, columns: list)
-
Check relation between columns.
Args
data
:pd.DataFrame
- dataframe to be checked
columns
:list
- list of lists of columns to be checked
Returns
None
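A sketch of the expected shape of columns (a list of column groups); the column names below are purely illustrative:
    import pandas as pd

    from pipelines.rj_smtr.utils import check_relation

    df = pd.DataFrame(
        {
            "servico": ["A", "A", "B"],
            "vista": ["x", "x", "y"],
            "consorcio": ["c1", "c1", "c2"],
        }
    )

    # Each inner list is a group of columns whose relation is checked together
    check_relation(df, columns=[["servico", "vista"], ["servico", "consorcio"]])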
def close_db_connection(connection, engine: str)
-
Safely close a database connection
Args
connection
- the database connection
engine
:str
- The database management system
def connect_ftp(secret_path: str = None, secure: bool = True)
-
Connect to FTP
Returns
ImplicitFTP_TLS
- ftp client
def create_bq_external_table(table_obj: basedosdados.upload.table.Table, path: str, bucket_name: str)
-
Creates a BigQuery external table based on sample data
Args
table_obj
:Table
- BD Table object
path
:str
- Table data local path
bucket_name
:str, Optional
- The bucket name where the data is located
def create_bq_table_schema(data_sample_path: Union[str, pathlib.Path]) ‑> list[google.cloud.bigquery.schema.SchemaField]
-
Create the bq schema based on the structure of data_sample_path.
Args
data_sample_path
:str, Path
- Data sample path used to auto-complete column names
Returns
list[bigquery.SchemaField]
- The table schema
def create_or_append_table(dataset_id: str, table_id: str, path: str, partitions: str = None, bucket_name: str = None)
-
Conditionally create table or append data to its relative GCS folder.
Args
dataset_id
:str
- target dataset_id on BigQuery
table_id
:str
- target table_id on BigQuery
path
:str
- Path to .csv data file
partitions
:str
- partition string.
bucket_name
:str, Optional
- The bucket name to save the data.
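A hedged usage sketch; the dataset, table, local path and Hive-style partition string below are hypothetical:
    from pipelines.rj_smtr.utils import create_or_append_table

    create_or_append_table(
        dataset_id="br_rj_riodejaneiro_onibus_gps",  # hypothetical dataset
        table_id="registros",                        # hypothetical table
        path="/tmp/registros.csv",
        partitions="data=2024-01-01/hora=10",        # assumed partition string format
    )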
def custom_serialization(obj: Any) ‑> Any
-
Function to serialize not JSON serializable objects
Args
obj
:Any
- Object to serialize
Returns
Any
- Serialized object
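One plausible way to use it, assuming it is intended as the default= hook of json.dumps for objects such as datetimes:
    import json
    from datetime import datetime

    from pipelines.rj_smtr.utils import custom_serialization

    payload = {"timestamp_captura": datetime(2024, 1, 1, 10, 0)}

    # Non-JSON-serializable objects are delegated to custom_serialization
    serialized = json.dumps(payload, default=custom_serialization)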
def data_info_str(data: pandas.core.frame.DataFrame)
-
Return dataframe info as a str to log
Args
data
:pd.DataFrame
- dataframe
Returns
data.info() as a string
def dict_contains_keys(input_dict: dict, keys: list[str]) ‑> bool
-
Test if the input dict has all keys present in the list
Args
input_dict
:dict
- the dict to test if has the keys
keys
:list[str]
- the list containing the keys to check
Returns
bool
- True if input_dict has all the keys, otherwise False
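Expected behaviour, following the description above:
    from pipelines.rj_smtr.utils import dict_contains_keys

    params = {"dataset_id": "gtfs", "table_id": "stops"}

    dict_contains_keys(params, ["dataset_id", "table_id"])  # True
    dict_contains_keys(params, ["dataset_id", "mode"])      # False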
def execute_db_query(engine: str, query: str, connection, connector, connection_info: dict) ‑> list[dict]
-
Execute a query with retries
Args
query
:str
- the SQL Query to execute
engine
:str
- The database management system
connection
- The database connection
connector
- The database connector (used for reconnections)
connection_info
:dict
- The database connector params (used for reconnections)
Returns
list[dict]
- The query results
def filter_data(data: pandas.core.frame.DataFrame, filters: list, subset_query: str = None)
-
Filter data from a dataframe
Args
data
:pd.DataFrame
- data DataFrame
filters
:list
- list of queries to filter data
Returns
pandas.DataFrame
- data with the filtered rows removed
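A sketch, assuming each filter is a pandas-style query string describing rows to drop; the column names are illustrative:
    import pandas as pd

    from pipelines.rj_smtr.utils import filter_data

    df = pd.DataFrame({"velocidade": [10, -5, 30], "status": ["ok", "ok", "erro"]})

    # Rows matching any of the filter queries are removed from the result
    filtered = filter_data(df, filters=["velocidade < 0", "status == 'erro'"])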
def filter_null(data: pandas.core.frame.DataFrame, columns: list, subset_query: str = None)
-
Filter null values in columns.
Args
columns
:list
- list of columns to check
subset_query
:str
- query to check whether important data is being removed
Returns
pandas.DataFrame
- data without null values
def format_send_discord_message(formatted_messages: list, webhook_url: str)
-
Format and send a message to Discord
Args
formatted_messages
:list
- The formatted messages
webhook_url
:str
- The webhook url
Returns
None
def generate_df_and_save(data: dict, fname: pathlib.Path)
-
Save DataFrame as csv
Args
data
:dict
- dict with the data from which to build the DataFrame
fname
:Path
- path where the CSV file will be saved
def generate_execute_schedules(clock_interval: datetime.timedelta, labels: List[str], table_parameters: Union[list[dict], dict], runs_interval_minutes: int = 15, start_date: datetime.datetime = datetime.datetime(2020, 1, 1, 0, 0, tzinfo=<DstTzInfo 'America/Sao_Paulo' LMT-1 day, 20:54:00 STD>), **general_flow_params) ‑> List[prefect.schedules.clocks.IntervalClock]
-
Generates multiple schedules
Args
clock_interval
:timedelta
- The interval to run the schedule
labels
:List[str]
- The labels to be added to the schedule
table_parameters
:list
- The table parameters to iterate over
runs_interval_minutes
:int, optional
- The interval between each schedule, in minutes. Defaults to 15.
start_date
:datetime, optional
- The start date of the schedule. Defaults to datetime(2020, 1, 1, tzinfo=pytz.timezone(emd_constants.DEFAULT_TIMEZONE.value)).
general_flow_params
- Any param that you want to pass to the flow
Returns
List[IntervalClock]
- The list of schedules
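A hedged sketch of generating the clocks; the label, table parameters and extra dataset_id flow parameter are hypothetical:
    from datetime import datetime, timedelta

    import pytz

    from pipelines.rj_smtr.utils import generate_execute_schedules

    clocks = generate_execute_schedules(
        clock_interval=timedelta(hours=1),
        labels=["rj-smtr"],  # hypothetical agent label
        table_parameters=[{"table_id": "stops"}, {"table_id": "trips"}],
        runs_interval_minutes=15,
        start_date=datetime(2022, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")),
        dataset_id="gtfs",  # forwarded to every run via general_flow_params
    )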
def get_datetime_range(timestamp: datetime.datetime, interval: datetime.timedelta) ‑> dict
-
Task to get datetime range in UTC
Args
timestamp
:datetime
- timestamp to get datetime range
interval
:timedelta
- interval to get datetime range
Returns
dict
- datetime range
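A minimal usage sketch:
    from datetime import datetime, timedelta

    from pipelines.rj_smtr.utils import get_datetime_range

    # dict describing the datetime range (in UTC) for the given timestamp and interval
    datetime_range = get_datetime_range(
        timestamp=datetime(2024, 1, 1, 12, 0),
        interval=timedelta(hours=1),
    )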
def get_last_run_timestamp(dataset_id: str, table_id: str, mode: str = 'prod') ‑> str
-
Query Redis to retrieve the time when the last materialization ran.
Args
dataset_id
:str
- dataset_id on BigQuery
table_id
:str
- model filename on the queries repo
(e.g. if you have a model defined in the file <model>.sql, the table_id should be <model>)
mode
:str
- Defaults to 'prod'.
Returns
Union[str, None]
- the timestamp of the last materialization run, or None if not found
def get_raw_data_api(url: str, secret_path: str = None, api_params: dict = None, filetype: str = None) ‑> tuple[str, str, str]
-
Request data from URL API
Args
url
:str
- URL to request data
secret_path
:str, optional
- Secret path to get headers. Defaults to None.
api_params
:dict, optional
- Parameters to pass to API. Defaults to None.
filetype
:str, optional
- Filetype to save raw file. Defaults to None.
Returns
tuple[str, str, str]
- Error, data and filetype
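A hedged usage sketch; the URL, secret path and parameters are hypothetical:
    from pipelines.rj_smtr.utils import get_raw_data_api

    error, data, filetype = get_raw_data_api(
        url="https://api.example.com/gps",         # hypothetical endpoint
        secret_path="example_api_secret",          # hypothetical secret path
        api_params={"data_inicial": "2024-01-01"},
        filetype="json",
    )

    if error is not None:
        raise Exception(error)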
def get_raw_data_db(query: str, engine: str, host: str, secret_path: str, database: str, page_size: int = None, max_pages: int = None) ‑> tuple[str, str, str]
-
Get data from Databases
Args
query
:str
- the SQL Query to execute
engine
:str
- The database management system
host
:str
- The database host
secret_path
:str
- Secret path to get credentials
database
:str
- The database to connect
page_size
:int, Optional
- The maximum number of rows returned by each paginated query. If you set a value for this argument, LIMIT and OFFSET clauses will be appended to the query.
max_pages
:int, Optional
- The maximum number of paginated queries to execute
Returns
tuple[str, str, str]
- Error, data and filetype
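A hedged sketch of a paginated extraction; the engine identifier, host, secret path and database are hypothetical:
    from pipelines.rj_smtr.utils import get_raw_data_db

    error, data, filetype = get_raw_data_db(
        query="SELECT * FROM viagens WHERE data = '2024-01-01'",
        engine="postgresql",             # assumed engine identifier
        host="db.example.com",           # hypothetical host
        secret_path="example_db_secret",
        database="transporte",
        page_size=50000,  # appends LIMIT/OFFSET to the query
        max_pages=10,
    )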
def get_raw_data_gcs(dataset_id: str, table_id: str, zip_filename: str = None, bucket_name: str = None) ‑> tuple[str, str, str]
-
Get raw data from GCS
Args
dataset_id
:str
- The dataset id on BigQuery.
table_id
:str
- The table id on BigQuery.
zip_filename
:str, optional
- The zip file name. Defaults to None.
bucket_name
:str, Optional
- The bucket name to get the data.
Returns
tuple[str, str, str]
- Error, data and filetype
def get_raw_recursos(request_url: str, request_params: dict) ‑> tuple[str, str, str]
-
Returns a dataframe with recursos data from the Movidesk API.
def get_table_min_max_value(query_project_id: str, dataset_id: str, table_id: str, field_name: str, kind: str, wait=None)
-
Query a table to get the minimum or maximum value of the chosen field. Useful to incrementally materialize tables via DBT.
Args
dataset_id
:str
- dataset_id on BigQuery
table_id
:str
- table_id on BigQuery
field_name
:str
- column name to query
kind
:str
- which value to get. Accepts min and max
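A hedged sketch of using the maximum value as the lower bound of an incremental materialization; project, dataset, table and field names are illustrative:
    from pipelines.rj_smtr.utils import get_table_min_max_value

    last_captured = get_table_min_max_value(
        query_project_id="rj-smtr",                  # illustrative project
        dataset_id="br_rj_riodejaneiro_onibus_gps",  # illustrative dataset
        table_id="registros",
        field_name="timestamp_captura",
        kind="max",
    )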
def get_upload_storage_blob(dataset_id: str, filename: str, bucket_name: str = None) ‑> google.cloud.storage.blob.Blob
-
Get a blob from the upload zone in storage
Args
dataset_id
:str
- The dataset id on BigQuery.
filename
:str
- The filename in GCS.
bucket_name
:str, Optional
- The bucket name to get the data.
Returns
Blob
- blob object
def log_critical(message: str, secret_path: str = 'critical_webhook')
-
Logs a message to the specified critical Discord channel
Args
message
:str
- Message to post on the channel
secret_path
:str, optional
- Secret path storing the webhook to the critical channel.
Defaults to constants.CRITICAL_SECRETPATH.value.
def map_dict_keys(data: dict, mapping: dict) ‑> None
-
Map old keys to new keys in a dict.
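Since the function returns None, the mapping is presumably applied in place; the keys below are illustrative:
    from pipelines.rj_smtr.utils import map_dict_keys

    record = {"cod_linha": "100", "vel": 42}

    # mapping is assumed to be {old_key: new_key}
    map_dict_keys(record, mapping={"cod_linha": "servico", "vel": "velocidade"})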
def perform_check(desc: str, check_params: dict, request_params: dict) ‑> dict
-
Perform a check on a query
Args
desc
:str
- The check description
check_params
:dict
- The check parameters:
* query (str): SQL query to be executed
* order_columns (list): order columns for query log results, in case of failure (optional)
request_params
:dict
- The request parameters
Returns
dict
- The check status
def perform_checks_for_table(table_id: str, request_params: dict, test_check_list: dict, check_params: dict) ‑> dict
-
Perform checks for a table
Args
table_id
:str
- The table id
request_params
:dict
- The request parameters
test_check_list
:dict
- The test check list
check_params
:dict
- The check parameters
Returns
dict
- The checks
def read_raw_data(filepath: str, reader_args: dict = None) ‑> tuple[str, pandas.core.frame.DataFrame]
-
Read raw data from file
Args
filepath
:str
- filepath to read
reader_args
:dict
- arguments to pass to pandas.read_csv or read_json
Returns
tuple[str, pd.DataFrame]
- error and data
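A minimal sketch; reader_args are forwarded to the pandas reader:
    from pipelines.rj_smtr.utils import read_raw_data

    error, data = read_raw_data(
        "/tmp/registros.csv",  # hypothetical local file
        reader_args={"sep": ";", "dtype": "object"},
    )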
def safe_cast(val, to_type, default=None)
-
Safe cast value.
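Expected behaviour, assuming the default is returned when the cast fails:
    from pipelines.rj_smtr.utils import safe_cast

    safe_cast("12", int, default=0)   # 12
    safe_cast("abc", int, default=0)  # 0 (falls back to the default)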
def save_raw_local_func(data: Union[dict, str], filepath: str, mode: str = 'raw', filetype: str = 'json') ‑> str
-
Saves the JSON response from the API to a .json file.
Args
data
:Union[dict, str]
- Raw data to save
filepath
:str
- Path to save the raw file
mode
:str, optional
- Folder to save locally; also the folder to upload to on GCS.
filetype
:str, optional
- The file format
Returns
str
- Path to the saved file
def save_treated_local_func(filepath: str, data: pandas.core.frame.DataFrame, error: str, mode: str = 'staging') ‑> str
-
Save treated file to CSV.
Args
filepath
:str
- Path to save file
data
:pd.DataFrame
- Dataframe to save
error
:str
- Error caught during execution
mode
:str, optional
- Folder to save locally; also the folder to upload to on GCS.
Returns
str
- Path to the saved file
def set_redis_rdo_files(redis_client, dataset_id: str, table_id: str)
-
Register downloaded files to Redis
Args
redis_client
- Redis client instance
dataset_id
:str
- dataset_id on BigQuery
table_id
:str
- table_id on BigQuery
Returns
bool
- True if the key was properly set
def upload_run_logs_to_bq(dataset_id: str, parent_table_id: str, timestamp: str, error: str = None, previous_error: str = None, recapture: bool = False, mode: str = 'raw', bucket_name: str = None)
-
Upload execution status table to BigQuery. Table is uploaded to the same dataset, named {parent_table_id}_logs.
Args
dataset_id
:str
- dataset_id on BigQuery
parent_table_id
:str
- table_id on BigQuery
timestamp
:str
- timestamp to get datetime range
error
:str
- error caught during execution
previous_error
:str
- previous error caught during execution
recapture
:bool
- whether the execution was a recapture
mode
:str
- folder to save locally; also the folder to upload to on GCS
bucket_name
:str, Optional
- The bucket name to save the data.
Returns
None