Module pipelines.utils.utils

General utilities for all pipelines.

Functions

def batch_to_dataframe(batch: Tuple[Tuple], columns: List[str]) ‑> pandas.core.frame.DataFrame

Converts a batch of rows to a dataframe.
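
A usage sketch, assuming the batch is a tuple of row tuples such as a database cursor returns (the sample rows are hypothetical):

    import pandas as pd
    from pipelines.utils.utils import batch_to_dataframe

    batch = (("a", 1), ("b", 2))  # hypothetical fetched rows
    dataframe = batch_to_dataframe(batch, columns=["letter", "number"])
    # Expected to be equivalent to pd.DataFrame(batch, columns=["letter", "number"])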

def build_redis_key(dataset_id: str, table_id: str, name: str = None, mode: str = 'prod')

Helper function for building a Redis key.
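
A usage sketch with hypothetical arguments; the exact layout of the returned key is defined by the implementation:

    from pipelines.utils.utils import build_redis_key

    key = build_redis_key("my_dataset", "my_table", name="last_update", mode="prod")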

def clean_dataframe(dataframe: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame

Cleans a dataframe.

def compare_dates_between_tables_redis(key_table_1: str, format_date_table_1: str, key_table_2: str, format_date_table_2: str)

Checks whether the date saved for the second table is later than the one saved for the first.

def dataframe_to_csv(dataframe: pandas.core.frame.DataFrame, path: Union[str, pathlib.Path], build_json_dataframe: bool = False, dataframe_key_column: str = None) ‑> None

Writes a dataframe to a CSV file.

def dataframe_to_parquet(dataframe: pandas.core.frame.DataFrame, path: Union[str, pathlib.Path], build_json_dataframe: bool = False, dataframe_key_column: str = None)

Writes a dataframe to a Parquet file with every column typed as STRING in the schema.
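
A minimal sketch of the all-STRING schema idea, assuming every column is cast to string before writing (the helper name is illustrative):

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    def _write_parquet_all_strings(dataframe: pd.DataFrame, path: str) -> None:
        # Casting to str makes every field STRING in the Parquet schema.
        table = pa.Table.from_pandas(dataframe.astype(str))
        pq.write_table(table, path)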

def delete_blobs_list(bucket_name: str, blobs: List[google.cloud.storage.blob.Blob], mode: str = 'prod') ‑> None

Deletes all blobs in the bucket that are in the blobs list. Mode must be "prod" or "staging".

def determine_whether_to_execute_or_not(cron_expression: str, datetime_now: datetime.datetime, datetime_last_execution: datetime.datetime) ‑> bool

Determines whether the cron expression is currently valid.

Args

cron_expression
The cron expression to check.
datetime_now
The current datetime.
datetime_last_execution
The last datetime the cron expression was executed.

Returns

True if the cron expression should trigger, False otherwise.
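
A plausible sketch of this check built on croniter (whether the module actually uses croniter is an assumption): find the first run scheduled after the last execution and test whether it has already passed.

    from datetime import datetime
    from croniter import croniter

    def _should_execute_sketch(cron_expression: str, datetime_now: datetime,
                               datetime_last_execution: datetime) -> bool:
        # First run scheduled strictly after the last execution.
        next_run = croniter(cron_expression, datetime_last_execution).get_next(datetime)
        return next_run <= datetime_now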

def dump_header_to_file(data_path: Union[str, pathlib.Path], data_type: str = 'csv')

Writes a header to a CSV file.

def final_column_treatment(column: str) ‑> str

Prepends an underscore to the column name if it contains only numbers; otherwise removes all non-alphanumeric characters except underscores ("_").
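
A regex sketch of the rule described above (helper name illustrative):

    import re

    def _final_column_treatment_sketch(column: str) -> str:
        # Column names that are all digits get a leading underscore.
        if column.isdigit():
            return f"_{column}"
        # Otherwise drop everything except letters, digits, and underscores.
        return re.sub(r"[^a-zA-Z0-9_]", "", column)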

def get_credentials_from_env(mode: str = 'prod', scopes: List[str] = None) ‑> google.oauth2.service_account.Credentials

Gets credentials from environment variables.

def get_redis_client(host: str = 'redis.redis.svc.cluster.local', port: int = 6379, db: int = 0, password: str = None) ‑> redis_pal.RedisPal.RedisPal

Returns a Redis client.
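
A usage sketch, assuming network access to the cluster-internal host and that RedisPal exposes Redis-style set/get:

    from pipelines.utils.utils import get_redis_client

    client = get_redis_client(password="***")  # hypothetical credentials
    client.set("my_key", {"hello": "world"})
    print(client.get("my_key"))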

def get_redis_output(redis_key)

Gets the output stored under a Redis key. Example: {b'date': b'2023-02-27 07:29:04'}

def get_storage_blobs(dataset_id: str, table_id: str, mode: str = 'staging') ‑> list

Get all blobs from a table in a dataset.

Args

dataset_id : str
dataset id
table_id : str
table id
mode : str, optional
mode to use. Defaults to "staging".

Returns

list
list of blobs

def get_username_and_password_from_secret(secret_path: str, client: hvac.v1.Client = None) ‑> Tuple[str, str]

Returns a username and password from a secret in Vault.

def get_vault_client() ‑> hvac.v1.Client

Returns a Vault client.

def get_vault_secret(secret_path: str, client: hvac.v1.Client = None) ‑> dict

Returns a secret from Vault.

def human_readable(value: Union[int, float], unit: str = '', unit_prefixes: List[str] = None, unit_divider: int = 1000, decimal_places: int = 2)

Formats a value in a human-readable way.
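
A sketch of the idea (not necessarily the module's exact prefixes, rounding, or spacing):

    def _human_readable_sketch(value, unit="", unit_prefixes=("", "K", "M", "G", "T"),
                               unit_divider=1000, decimal_places=2):
        # Divide by unit_divider until the value fits the current prefix.
        for prefix in unit_prefixes[:-1]:
            if abs(value) < unit_divider:
                return f"{value:.{decimal_places}f}{prefix}{unit}"
            value /= unit_divider
        return f"{value:.{decimal_places}f}{unit_prefixes[-1]}{unit}"

    print(_human_readable_sketch(1_234_567, unit="B"))  # -> "1.23MB"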

def is_date(date_string: str, date_format: str = '%Y-%m-%d') ‑> Union[datetime.datetime, bool]

Checks whether a string is a valid date.
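
A compact sketch of the likely logic: strptime that returns the parsed datetime on success and False on failure.

    from datetime import datetime
    from typing import Union

    def _is_date_sketch(date_string: str, date_format: str = "%Y-%m-%d") -> Union[datetime, bool]:
        try:
            return datetime.strptime(date_string, date_format)
        except ValueError:
            return False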

def list_blobs_with_prefix(bucket_name: str, prefix: str, mode: str = 'prod') ‑> List[google.cloud.storage.blob.Blob]

Lists all the blobs in the bucket that begin with the prefix. This can be used to list all blobs in a "folder", e.g. "public/". Mode must be "prod" or "staging".

def log(msg: Any, level: str = 'info') ‑> None

Logs a message to Prefect's logger.

def log_mod(msg: Any, level: str = 'info', index: int = 0, mod: int = 1)

Only logs a message if the index is a multiple of mod.
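
A sketch of the modular check, delegating to the log function documented above:

    def _log_mod_sketch(msg, level="info", index=0, mod=1):
        # Log only when the caller-supplied index is a multiple of mod.
        if index % mod == 0:
            log(msg, level=level)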

def notify_discord_on_failure(flow: prefect.core.flow.Flow, state: prefect.engine.state.State, secret_path: str, code_owners: Optional[List[str]] = None)

Notifies a Discord channel when a flow fails.

def parse_date_columns(dataframe: pandas.core.frame.DataFrame, partition_date_column: str) ‑> Tuple[pandas.core.frame.DataFrame, List[str]]

Parses the date columns to the partition format.
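
A sketch under a loud assumption: that "the partition format" means Hive-style ano_particao/mes_particao/data_particao columns, as is common in these pipelines. Verify the real column names against the implementation.

    import pandas as pd

    def _parse_date_columns_sketch(dataframe: pd.DataFrame, partition_date_column: str):
        dates = pd.to_datetime(dataframe[partition_date_column])
        # Assumed partition column names:
        dataframe["ano_particao"] = dates.dt.year
        dataframe["mes_particao"] = dates.dt.month
        dataframe["data_particao"] = dates.dt.date
        return dataframe, ["ano_particao", "mes_particao", "data_particao"]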

def parser_blobs_to_partition_dict(blobs: list) ‑> dict

Extracts the partition information from the blobs.
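
A sketch, assuming blob names carry Hive-style key=value path segments:

    # e.g. ".../ano=2021/mes=6/file.csv" -> {"ano": ["2021"], "mes": ["6"]}
    def _parse_partitions_sketch(blobs: list) -> dict:
        partitions = {}
        for blob in blobs:
            for segment in blob.name.split("/"):
                if "=" in segment:
                    key, value = segment.split("=", 1)
                    partitions.setdefault(key, []).append(value)
        return partitions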

def query_to_line(query: str) ‑> str

Converts a query to a line.
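
Likely equivalent to collapsing all whitespace runs (newlines, tabs, spaces) into single spaces; a one-line sketch:

    def _query_to_line_sketch(query: str) -> str:
        return " ".join(query.split())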

def remove_columns_accents(dataframe: pandas.core.frame.DataFrame) ‑> list

Removes accents from dataframe column names.
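
A standard-library sketch of accent stripping; the module may use a different normalization helper:

    import unicodedata
    import pandas as pd

    def _strip_accents(name: str) -> str:
        # Decompose characters, then drop the combining marks.
        nfkd = unicodedata.normalize("NFKD", name)
        return "".join(ch for ch in nfkd if not unicodedata.combining(ch))

    dataframe = pd.DataFrame(columns=["estação", "município"])
    print([_strip_accents(col) for col in dataframe.columns])  # ['estacao', 'municipio']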

def remove_tabs_from_query(query: str) ‑> str

Removes tabs from a query.

def run_cloud(flow: prefect.core.flow.Flow, labels: List[str], parameters: Dict[str, Any] = None, run_description: str = '')

Runs a flow on Prefect Server (must have VPN configured).

def run_local(flow: prefect.core.flow.Flow, parameters: Dict[str, Any] = None)

Runs a flow locally.

def run_registered(flow_name: str, flow_project: str, labels: List[str], parameters: Dict[str, Any] = None, run_description: str = '') ‑> str

Runs an already registered flow on Prefect Server (must have credentials configured).

def save_str_on_redis(redis_key: str, key: str, value: str)

Saves a string value on Redis.

def save_updated_rows_on_redis(dataframe: pandas.core.frame.DataFrame, dataset_id: str, table_id: str, unique_id: str = 'id_estacao', date_column: str = 'data_medicao', date_format: str = '%Y-%m-%d %H:%M:%S', mode: str = 'prod') ‑> pandas.core.frame.DataFrame

Accesses Redis to get the last time each unique_id was updated, returns the updated unique_ids as a DataFrame, and saves the new dates on Redis.

def send_discord_message(message: str, webhook_url: str) ‑> None

Sends a message to a Discord channel.
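
A minimal sketch using the standard Discord webhook payload:

    import requests

    def _send_discord_message_sketch(message: str, webhook_url: str) -> None:
        # Discord webhooks accept a JSON body with a "content" field.
        requests.post(webhook_url, json={"content": message}, timeout=10)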

def send_telegram_message(message: str, token: str, chat_id: int, parse_mode: str = 'HTML')

Sends a message to a Telegram chat.
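
A minimal sketch against the Telegram Bot API's sendMessage endpoint:

    import requests

    def _send_telegram_message_sketch(message: str, token: str, chat_id: int,
                                      parse_mode: str = "HTML") -> None:
        url = f"https://api.telegram.org/bot{token}/sendMessage"
        payload = {"chat_id": chat_id, "text": message, "parse_mode": parse_mode}
        requests.post(url, json=payload, timeout=10)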

def set_default_parameters(flow: prefect.core.flow.Flow, default_parameters: dict) ‑> prefect.core.flow.Flow

Sets default parameters for a flow.
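
A sketch against Prefect 1.x's Parameter API, which the prefect.core.flow.Flow types above suggest this module targets:

    # Overwrite the default of each matching Parameter task (sketch).
    def _set_defaults_sketch(flow, default_parameters: dict):
        for parameter in flow.parameters():
            if parameter.name in default_parameters:
                parameter.default = default_parameters[parameter.name]
        return flow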

def skip_if_running_handler(obj, old_state: prefect.engine.state.State, new_state: prefect.engine.state.State) ‑> prefect.engine.state.State

State handler that will skip a flow run if another instance of the flow is already running.

Adapted from Prefect Discourse: https://tinyurl.com/4hn5uz2w

def smart_split(text: str, max_length: int, separator: str = ' ') ‑> List[str]

Splits a string into a list of strings, each at most max_length characters long, breaking at the separator.
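
A sketch of separator-aware chunking, useful for message length limits such as Telegram's; a single word longer than max_length is emitted as-is here:

    from typing import List

    def _smart_split_sketch(text: str, max_length: int, separator: str = " ") -> List[str]:
        parts, current = [], ""
        for word in text.split(separator):
            candidate = word if not current else current + separator + word
            if len(candidate) <= max_length:
                current = candidate
            else:
                if current:
                    parts.append(current)
                current = word  # may exceed max_length for a single long word
        if current:
            parts.append(current)
        return parts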

def to_json_dataframe(dataframe: pandas.core.frame.DataFrame = None, csv_path: Union[str, pathlib.Path] = None, key_column: str = None, read_csv_kwargs: dict = None, save_to: Union[str, pathlib.Path] = None) ‑> pandas.core.frame.DataFrame

Manipulates a dataframe by keeping key_column and moving every other column's data to a "content" column in JSON format. Example:

  • Input dataframe: pd.DataFrame({"key": ["a", "b", "c"], "col1": [1, 2, 3], "col2": [4, 5, 6]})
  • Output dataframe: pd.DataFrame({ "key": ["a", "b", "c"], "content": [{"col1": 1, "col2": 4}, {"col1": 2, "col2": 5}, {"col1": 3, "col2": 6}] })
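
A sketch of the core transformation, ignoring the CSV-reading and saving options (helper name illustrative):

    import pandas as pd

    def _pack_content_sketch(dataframe: pd.DataFrame, key_column: str) -> pd.DataFrame:
        # Keep key_column; pack every other column into one dict per row.
        other = [col for col in dataframe.columns if col != key_column]
        return pd.DataFrame({
            key_column: dataframe[key_column],
            "content": dataframe[other].to_dict(orient="records"),
        })
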
def to_partitions(data: pandas.core.frame.DataFrame, partition_columns: List[str], savepath: str, data_type: str = 'csv', suffix: str = None, build_json_dataframe: bool = False, dataframe_key_column: str = None) ‑> List[pathlib.Path]

Saves data to a Hive partitions schema, given a dataframe and a list of partition columns.

Args

data : pandas.core.frame.DataFrame
Dataframe to be partitioned.
partition_columns : list
List of columns to be used as partitions.
savepath : str, pathlib.PosixPath
Folder path to save the partitions to.

Example

    data = {
        "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021, 2025],
        "mes": [1, 2, 3, 4, 5, 6, 6, 9],
        "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR", "PR"],
        "dado": ["a", "b", "c", "d", "e", "f", "g", "h"],
    }
    to_partitions(
        data=pd.DataFrame(data),
        partition_columns=["ano", "mes", "sigla_uf"],
        savepath="partitions/",
    )

def treat_redis_output(text)

Redis returns a dict where both keys and values are byte strings. Example: {b'date': b'2023-02-27 07:29:04'}
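
A sketch of the decoding step:

    def _treat_redis_output_sketch(text: dict) -> dict:
        # {b'date': b'2023-02-27 07:29:04'} -> {'date': '2023-02-27 07:29:04'}
        return {
            (k.decode("utf-8") if isinstance(k, bytes) else k):
            (v.decode("utf-8") if isinstance(v, bytes) else v)
            for k, v in text.items()
        }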

def untuple_clocks(clocks)

Converts a list of tuples to a list of clocks.

def upload_files_to_storage(bucket_name: str, prefix: str, local_path: Union[str, pathlib.Path] = None, files_list: List[str] = None, mode: str = 'prod')

Uploads all files from local_path to bucket_name with prefix. Mode must be "prod" or "staging".