Module pipelines.utils.utils
General utilities for all pipelines.
Functions
def batch_to_dataframe(batch: Tuple[Tuple], columns: List[str]) ‑> pandas.core.frame.DataFrame
-
Converts a batch of rows to a dataframe.
def build_redis_key(dataset_id: str, table_id: str, name: str = None, mode: str = 'prod')
-
Helper function for building a key to redis
def clean_dataframe(dataframe: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame
-
Cleans a dataframe.
def compare_dates_between_tables_redis(key_table_1: str, format_date_table_1: str, key_table_2: str, format_date_table_2: str)
-
Function that checks whether the date saved in the second table is more recent than the date in the first table
def dataframe_to_csv(dataframe: pandas.core.frame.DataFrame, path: Union[str, pathlib.Path], build_json_dataframe: bool = False, dataframe_key_column: str = None) ‑> None
-
Writes a dataframe to CSV file.
def dataframe_to_parquet(dataframe: pandas.core.frame.DataFrame, path: Union[str, pathlib.Path], build_json_dataframe: bool = False, dataframe_key_column: str = None)
-
Writes a dataframe to Parquet file with Schema as STRING.
def delete_blobs_list(bucket_name: str, blobs: List[google.cloud.storage.blob.Blob], mode: str = 'prod') ‑> None
-
Deletes all blobs in the bucket that are in the blobs list. Mode needs to be "prod" or "staging"
def determine_whether_to_execute_or_not(cron_expression: str, datetime_now: datetime.datetime, datetime_last_execution: datetime.datetime) ‑> bool
-
Determines whether the cron expression is currently valid.
Args
cron_expression
- The cron expression to check.
datetime_now
- The current datetime.
datetime_last_execution
- The last datetime the cron expression was executed.
Returns
True if the cron expression should trigger, False otherwise.
def dump_header_to_file(data_path: Union[str, pathlib.Path], data_type: str = 'csv')
-
Writes a header to a CSV file.
def final_column_treatment(column: str) ‑> str
-
Adds an underscore before the column name if it contains only numbers, or removes all non-alphanumeric characters except underscores ("_").
def get_credentials_from_env(mode: str = 'prod', scopes: List[str] = None) ‑> google.oauth2.service_account.Credentials
-
Gets credentials from env vars
def get_redis_client(host: str = 'redis.redis.svc.cluster.local', port: int = 6379, db: int = 0, password: str = None) ‑> redis_pal.RedisPal.RedisPal
-
Returns a Redis client.
def get_redis_output(redis_key)
-
Get Redis output Example: {b'date': b'2023-02-27 07:29:04'}
def get_storage_blobs(dataset_id: str, table_id: str, mode: str = 'staging') ‑> list
-
Get all blobs from a table in a dataset.
Args
dataset_id
:str
- dataset id
table_id
:str
- table id
mode
:str, optional
- mode to use. Defaults to "staging".
Returns
list
- list of blobs
def get_username_and_password_from_secret(secret_path: str, client: hvac.v1.Client = None) ‑> Tuple[str, str]
-
Returns a username and password from a secret in Vault.
def get_vault_client() ‑> hvac.v1.Client
-
Returns a Vault client.
def get_vault_secret(secret_path: str, client: hvac.v1.Client = None) ‑> dict
-
Returns a secret from Vault.
def human_readable(value: Union[int, float], unit: str = '', unit_prefixes: List[str] = None, unit_divider: int = 1000, decimal_places: int = 2)
-
Formats a value in a human readable way.
def is_date(date_string: str, date_format: str = '%Y-%m-%d') ‑> Union[datetime.datetime, bool]
-
Checks whether a string is a valid date.
def list_blobs_with_prefix(bucket_name: str, prefix: str, mode: str = 'prod') ‑> List[google.cloud.storage.blob.Blob]
-
Lists all the blobs in the bucket that begin with the prefix. This can be used to list all blobs in a "folder", e.g. "public/". Mode needs to be "prod" or "staging"
def log(msg: Any, level: str = 'info') ‑> None
-
Logs a message to prefect's logger.
def log_mod(msg: Any, level: str = 'info', index: int = 0, mod: int = 1)
-
Only logs a message if the index is a multiple of mod.
def notify_discord_on_failure(flow: prefect.core.flow.Flow, state: prefect.engine.state.State, secret_path: str, code_owners: Optional[List[str]] = None)
-
Notifies a Discord channel when a flow fails.
def parse_date_columns(dataframe: pandas.core.frame.DataFrame, partition_date_column: str) ‑> Tuple[pandas.core.frame.DataFrame, List[str]]
-
Parses the date columns to the partition format.
def parser_blobs_to_partition_dict(blobs: list) ‑> dict
-
Extracts the partition information from the blobs.
def query_to_line(query: str) ‑> str
-
Converts a query to a line.
def remove_columns_accents(dataframe: pandas.core.frame.DataFrame) ‑> list
-
Remove accents from dataframe columns.
def remove_tabs_from_query(query: str) ‑> str
-
Removes tabs from a query.
def run_cloud(flow: prefect.core.flow.Flow, labels: List[str], parameters: Dict[str, Any] = None, run_description: str = '')
-
Runs a flow on Prefect Server (must have VPN configured).
def run_local(flow: prefect.core.flow.Flow, parameters: Dict[str, Any] = None)
-
Runs a flow locally.
def run_registered(flow_name: str, flow_project: str, labels: List[str], parameters: Dict[str, Any] = None, run_description: str = '') ‑> str
-
Runs an already registered flow on Prefect Server (must have credentials configured).
def save_str_on_redis(redis_key: str, key: str, value: str)
-
Function to save a string on redis
def save_updated_rows_on_redis(dataframe: pandas.core.frame.DataFrame, dataset_id: str, table_id: str, unique_id: str = 'id_estacao', date_column: str = 'data_medicao', date_format: str = '%Y-%m-%d %H:%M:%S', mode: str = 'prod') ‑> pandas.core.frame.DataFrame
-
Accesses Redis to get the last time each unique_id was updated, returns the updated unique_ids as a DataFrame, and saves the new dates to Redis
def send_discord_message(message: str, webhook_url: str) ‑> None
-
Sends a message to a Discord channel.
def send_telegram_message(message: str, token: str, chat_id: int, parse_mode: str = 'HTML')
-
Sends a message to a Telegram chat.
def set_default_parameters(flow: prefect.core.flow.Flow, default_parameters: dict) ‑> prefect.core.flow.Flow
-
Sets default parameters for a flow.
def skip_if_running_handler(obj, old_state: prefect.engine.state.State, new_state: prefect.engine.state.State) ‑> prefect.engine.state.State
-
State handler that will skip a flow run if another instance of the flow is already running.
Adapted from Prefect Discourse: https://tinyurl.com/4hn5uz2w
def smart_split(text: str, max_length: int, separator: str = ' ') ‑> List[str]
-
Splits a string into a list of strings.
def to_json_dataframe(dataframe: pandas.core.frame.DataFrame = None, csv_path: Union[str, pathlib.Path] = None, key_column: str = None, read_csv_kwargs: dict = None, save_to: Union[str, pathlib.Path] = None) ‑> pandas.core.frame.DataFrame
-
Manipulates a dataframe by keeping key_column and moving every other column data to a "content" column in JSON format. Example:
- Input dataframe: pd.DataFrame({"key": ["a", "b", "c"], "col1": [1, 2, 3], "col2": [4, 5, 6]})
- Output dataframe: pd.DataFrame({ "key": ["a", "b", "c"], "content": [{"col1": 1, "col2": 4}, {"col1": 2, "col2": 5}, {"col1": 3, "col2": 6}] })
def to_partitions(data: pandas.core.frame.DataFrame, partition_columns: List[str], savepath: str, data_type: str = 'csv', suffix: str = None, build_json_dataframe: bool = False, dataframe_key_column: str = None) ‑> List[pathlib.Path]
-
Saves data following the Hive partitions schema, given a dataframe and a list of partition columns.
Args
data
:pandas.core.frame.DataFrame
- Dataframe to be partitioned.
partition_columns
:list
- List of columns to be used as partitions.
savepath
:str, pathlib.PosixPath
- folder path to save the partitions
Example
data = { "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021,2025], "mes": [1, 2, 3, 4, 5, 6, 6,9], "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR","PR"], "dado": ["a", "b", "c", "d", "e", "f", "g",'h'], } to_partitions( data=pd.DataFrame(data), partition_columns=['ano','mes','sigla_uf'], savepath='partitions/' )
def treat_redis_output(text)
-
Redis returns a dict where both key and value are byte string Example: {b'date': b'2023-02-27 07:29:04'}
def untuple_clocks(clocks)
-
Converts a list of tuples to a list of clocks.
def upload_files_to_storage(bucket_name: str, prefix: str, local_path: Union[str, pathlib.Path] = None, files_list: List[str] = None, mode: str = 'prod')
-
Uploads all files from local_path to bucket_name with prefix. Mode needs to be "prod" or "staging"