Module pipelines.utils.utils

General utilities for all pipelines.


def batch_to_dataframe(batch: Tuple[Tuple], columns: List[str]) ‑> pandas.core.frame.DataFrame

Converts a batch of rows to a dataframe.

def build_redis_key(dataset_id: str, table_id: str, name: str = None, mode: str = 'prod')

Helper function for building a Redis key.

def clean_dataframe(dataframe: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame

Cleans a dataframe.

def compare_dates_between_tables_redis(key_table_1: str, format_date_table_1: str, key_table_2: str, format_date_table_2: str)

Checks whether the date saved for the second table is later than the date saved for the first one.

def dataframe_to_csv(dataframe: pandas.core.frame.DataFrame, path: Union[str, pathlib.Path], build_json_dataframe: bool = False, dataframe_key_column: str = None) ‑> None

Writes a dataframe to CSV file.

def dataframe_to_parquet(dataframe: pandas.core.frame.DataFrame, path: Union[str, pathlib.Path], build_json_dataframe: bool = False, dataframe_key_column: str = None)

Writes a dataframe to Parquet file with Schema as STRING.

def delete_blobs_list(bucket_name: str, blobs: List[], mode: str = 'prod') ‑> None

Deletes all blobs in the bucket that are in the blobs list. Mode needs to be "prod" or "staging"

def determine_whether_to_execute_or_not(cron_expression: str, datetime_now: datetime.datetime, datetime_last_execution: datetime.datetime) ‑> bool

Determines whether the cron expression is currently valid.

Args:
cron_expression: The cron expression to check.
datetime_now: The current datetime.
datetime_last_execution: The last datetime the cron expression was executed.

Returns:
True if the cron expression should trigger, False otherwise.

def dump_header_to_file(data_path: Union[str, pathlib.Path], data_type: str = 'csv')

Writes a header to a CSV file.

def final_column_treatment(column: str) ‑> str

Adds an underscore before the column name if it contains only digits, or removes all non-alphanumeric characters except underscores ("_").

def get_credentials_from_env(mode: str = 'prod', scopes: List[str] = None) ‑> google.oauth2.service_account.Credentials

Gets credentials from env vars

def get_redis_client(host: str = 'redis.redis.svc.cluster.local', port: int = 6379, db: int = 0, password: str = None) ‑> redis_pal.RedisPal.RedisPal

Returns a Redis client.

def get_redis_output(redis_key)

Gets the Redis output. Example: {b'date': b'2023-02-27 07:29:04'}

def get_storage_blobs(dataset_id: str, table_id: str, mode: str = 'staging') ‑> list

Gets all blobs from a table in a dataset.

Args:
dataset_id : str
dataset id
table_id : str
table id
mode : str, optional
mode to use. Defaults to "staging".

Returns:
list of blobs

def get_username_and_password_from_secret(secret_path: str, client: hvac.v1.Client = None) ‑> Tuple[str, str]

Returns a username and password from a secret in Vault.

def get_vault_client() ‑> hvac.v1.Client

Returns a Vault client.

def get_vault_secret(secret_path: str, client: hvac.v1.Client = None) ‑> dict

Returns a secret from Vault.

def human_readable(value: Union[int, float], unit: str = '', unit_prefixes: List[str] = None, unit_divider: int = 1000, decimal_places: int = 2)

Formats a value in a human readable way.

def is_date(date_string: str, date_format: str = '%Y-%m-%d') ‑> Union[datetime.datetime, bool]

Checks whether a string is a valid date.

def list_blobs_with_prefix(bucket_name: str, prefix: str, mode: str = 'prod') ‑> List[]

Lists all the blobs in the bucket that begin with the prefix. This can be used to list all blobs in a "folder", e.g. "public/". Mode needs to be "prod" or "staging"

def log(msg: Any, level: str = 'info') ‑> None

Logs a message to prefect's logger.

def log_mod(msg: Any, level: str = 'info', index: int = 0, mod: int = 1)

Only logs a message if the index is a multiple of mod.

def notify_discord_on_failure(flow: prefect.core.flow.Flow, state: prefect.engine.state.State, secret_path: str, code_owners: Optional[List[str]] = None)

Notifies a Discord channel when a flow fails.

def parse_date_columns(dataframe: pandas.core.frame.DataFrame, partition_date_column: str) ‑> Tuple[pandas.core.frame.DataFrame, List[str]]

Parses the date columns to the partition format.

def parser_blobs_to_partition_dict(blobs: list) ‑> dict

Extracts the partition information from the blobs.

def query_to_line(query: str) ‑> str

Converts a query to a line.

def remove_columns_accents(dataframe: pandas.core.frame.DataFrame) ‑> list

Removes accents from dataframe columns.

def remove_tabs_from_query(query: str) ‑> str

Removes tabs from a query.

def run_cloud(flow: prefect.core.flow.Flow, labels: List[str], parameters: Dict[str, Any] = None, run_description: str = '')

Runs a flow on Prefect Server (must have VPN configured).

def run_local(flow: prefect.core.flow.Flow, parameters: Dict[str, Any] = None)

Runs a flow locally.

def run_registered(flow_name: str, flow_project: str, labels: List[str], parameters: Dict[str, Any] = None, run_description: str = '') ‑> str

Runs an already registered flow on Prefect Server (must have credentials configured).

def save_str_on_redis(redis_key: str, key: str, value: str)

Saves a string on Redis.

def save_updated_rows_on_redis(dataframe: pandas.core.frame.DataFrame, dataset_id: str, table_id: str, unique_id: str = 'id_estacao', date_column: str = 'data_medicao', date_format: str = '%Y-%m-%d %H:%M:%S', mode: str = 'prod') ‑> pandas.core.frame.DataFrame

Accesses Redis to get the last time each unique_id was updated, returns the updated unique_id rows as a DataFrame, and saves the new dates on Redis.

def send_discord_message(message: str, webhook_url: str) ‑> None

Sends a message to a Discord channel.

def send_telegram_message(message: str, token: str, chat_id: int, parse_mode: str = 'HTML')

Sends a message to a Telegram chat.

def set_default_parameters(flow: prefect.core.flow.Flow, default_parameters: dict) ‑> prefect.core.flow.Flow

Sets default parameters for a flow.

def skip_if_running_handler(obj, old_state: prefect.engine.state.State, new_state: prefect.engine.state.State) ‑> prefect.engine.state.State

State handler that will skip a flow run if another instance of the flow is already running.

Adapted from Prefect Discourse:

def smart_split(text: str, max_length: int, separator: str = ' ') ‑> List[str]

Splits a string into a list of strings.

def to_json_dataframe(dataframe: pandas.core.frame.DataFrame = None, csv_path: Union[str, pathlib.Path] = None, key_column: str = None, read_csv_kwargs: dict = None, save_to: Union[str, pathlib.Path] = None) ‑> pandas.core.frame.DataFrame

Manipulates a dataframe by keeping key_column and moving every other column data to a "content" column in JSON format. Example:

  • Input dataframe: pd.DataFrame({"key": ["a", "b", "c"], "col1": [1, 2, 3], "col2": [4, 5, 6]})
  • Output dataframe: pd.DataFrame({ "key": ["a", "b", "c"], "content": [{"col1": 1, "col2": 4}, {"col1": 2, "col2": 5}, {"col1": 3, "col2": 6}] })

def to_partitions(data: pandas.core.frame.DataFrame, partition_columns: List[str], savepath: str, data_type: str = 'csv', suffix: str = None, build_json_dataframe: bool = False, dataframe_key_column: str = None) ‑> List[pathlib.Path]

Saves data into a Hive partition schema, given a dataframe and a list of partition columns.

Parameters:
data : pandas.core.frame.DataFrame
Dataframe to be partitioned.
partition_columns : list
List of columns to be used as partitions.
savepath : str, pathlib.PosixPath
Folder path in which to save the partitions.

Example:
data = { "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021,2025], "mes": [1, 2, 3, 4, 5, 6, 6,9], "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR","PR"], "dado": ["a", "b", "c", "d", "e", "f", "g",'h'], }
to_partitions( data=pd.DataFrame(data), partition_columns=['ano','mes','sigla_uf'], savepath='partitions/' )

def treat_redis_output(text)

Redis returns a dict where both key and value are byte strings. Example: {b'date': b'2023-02-27 07:29:04'}

def untuple_clocks(clocks)

Converts a list of tuples to a list of clocks.

def upload_files_to_storage(bucket_name: str, prefix: str, local_path: Union[str, pathlib.Path] = None, files_list: List[str] = None, mode: str = 'prod')

Uploads all files from local_path to bucket_name with prefix. Mode needs to be "prod" or "staging"