Module pipelines.utils.utils

General utilities for all pipelines.


def batch_to_dataframe(batch: Tuple[Tuple], columns: List[str]) ‑> pandas.core.frame.DataFrame
Expand source code
def batch_to_dataframe(batch: Tuple[Tuple], columns: List[str]) -> pd.DataFrame:
    """
    Converts a batch of rows to a dataframe.

    Args:
        batch: Rows to convert; each row is a tuple of values ordered like
            ``columns``.
        columns: Column names of the resulting dataframe.

    Returns:
        A dataframe with one row per element of ``batch``.
    """
    return pd.DataFrame(batch, columns=columns)

Converts a batch of rows to a dataframe.

def build_redis_key(dataset_id: str, table_id: str, name: str = None, mode: str = 'prod')
Expand source code
def build_redis_key(
    dataset_id: str, table_id: str, name: str = None, mode: str = "prod"
):
    """
    Helper function for building a Redis key.

    The key has the form ``<dataset_id>.<table_id>`` with ``.<name>`` appended
    when ``name`` is truthy. When ``mode`` is ``"dev"`` the whole key is
    prefixed with ``"dev."``.

    Args:
        dataset_id: Dataset identifier (first key component).
        table_id: Table identifier (second key component).
        name: Optional extra component appended to the key.
        mode: Execution mode; only ``"dev"`` changes the key.

    Returns:
        The Redis key as a string.
    """
    key = f"{dataset_id}.{table_id}"
    if name:
        key = f"{key}.{name}"
    if mode == "dev":
        key = f"{mode}.{key}"
    return key

Helper function for building a Redis key of the form <dataset_id>.<table_id>[.<name>], prefixed with "dev." when mode is "dev".

def clean_dataframe(dataframe: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame
Expand source code
# Cleans the object-dtype columns of `dataframe` and returns it.
# For each such column the visible chain removes NUL characters ("\x00") and
# replaces the literal string "None" with np.nan. The result is assigned back
# into `dataframe`, so the caller's dataframe is modified as well.
# NOTE(review): this rendered snippet is missing lines: the `try:` that pairs
# with `except`, the expression the `.str.replace(...)` chain starts from, the
# closing parenthesis, and most of the `except` handler (only the strings
# "Column: " and "\nData: " survive). Check the original module before reuse.
def clean_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame:
    Cleans a dataframe.
    # Only columns whose dtype is `object` are touched
    for col in dataframe.columns.tolist():
        if dataframe[col].dtype == object:
                dataframe[col] = (
                    .str.replace("\x00", "")
                    .replace("None", np.nan)
            # presumably reports the failing column and its data -- confirm
            except Exception as exc:
                    "Column: ",
                    "\nData: ",
    return dataframe

Cleans a dataframe.

def compare_dates_between_tables_redis(key_table_1: str,
format_date_table_1: str,
key_table_2: str,
format_date_table_2: str)
Expand source code
def compare_dates_between_tables_redis(
    key_table_1: str,
    format_date_table_1: str,
    key_table_2: str,
    format_date_table_2: str,
) -> bool:
    """
    Checks whether the date saved on Redis for the second table is later than
    the date saved for the first table.

    Both keys are read with ``get_redis_output`` and must hold a ``"date"``
    field when present.

    Args:
        key_table_1: Redis key holding the first table's date.
        format_date_table_1: pendulum format string of the first table's date.
        key_table_2: Redis key holding the second table's date.
        format_date_table_2: pendulum format string of the second table's date.

    Returns:
        True if either key has nothing saved on Redis, otherwise whether the
        first date is strictly earlier than the second one.
    """
    # get saved dates on redis
    date_1 = get_redis_output(key_table_1)
    date_2 = get_redis_output(key_table_2)

    # Return True if there is no date_1 or date_2 saved on redis
    if not date_1 or not date_2:
        return True

    # Convert dates to pendulum so they can be compared
    date_1 = pendulum.from_format(date_1["date"], format_date_table_1)
    date_2 = pendulum.from_format(date_2["date"], format_date_table_2)
    comparison = date_1 < date_2
    log(f"Is {date_2} bigger than {date_1}? {comparison}")
    return comparison

Checks whether the date saved on Redis for the second table is later than the date saved for the first table. Returns True when either date is missing.

def dataframe_to_csv(dataframe: pandas.core.frame.DataFrame,
path: str | pathlib.Path,
build_json_dataframe: bool = False,
dataframe_key_column: str = None) ‑> None
Expand source code
def dataframe_to_csv(
    dataframe: pd.DataFrame,
    path: Union[str, Path],
    build_json_dataframe: bool = False,
    dataframe_key_column: str = None,
) -> None:
    """
    Writes a dataframe to a CSV file, creating parent directories as needed.

    Args:
        dataframe: Dataframe to write.
        path: Destination file path.
        build_json_dataframe: If True, the dataframe is first converted with
            ``to_json_dataframe`` (key column plus a "content" column).
        dataframe_key_column: Key column passed to ``to_json_dataframe``.
    """
    if build_json_dataframe:
        dataframe = to_json_dataframe(dataframe, key_column=dataframe_key_column)

    path = Path(path)
    # Make sure the destination folder exists before writing
    path.parent.mkdir(parents=True, exist_ok=True)

    # Write dataframe to CSV (the index is not persisted)
    dataframe.to_csv(path, index=False, encoding="utf-8")

Writes a dataframe to a CSV file, creating parent directories if needed.

def dataframe_to_parquet(dataframe: pandas.core.frame.DataFrame,
path: str | pathlib.Path,
build_json_dataframe: bool = False,
dataframe_key_column: str = None)
Expand source code
def dataframe_to_parquet(
    dataframe: pd.DataFrame,
    path: Union[str, Path],
    build_json_dataframe: bool = False,
    dataframe_key_column: str = None,
):
    """
    Writes a dataframe to a Parquet file using the pyarrow engine.

    If a file already exists at ``path``, its rows are loaded and the new rows
    are appended (``pd.concat``) before the file is rewritten. Parent
    directories are created when missing.

    NOTE: column types are whatever pandas/pyarrow infer; this function does
    not cast the schema to STRING.

    Args:
        dataframe: Dataframe to write.
        path: Destination Parquet file path.
        build_json_dataframe: If True, the dataframe is first converted with
            ``to_json_dataframe`` (key column plus a "content" column).
        dataframe_key_column: Key column passed to ``to_json_dataframe``.
    """
    # NOTE(review): the original attributes this code to an external source
    # whose link is not available here.

    if build_json_dataframe:
        dataframe = to_json_dataframe(dataframe, key_column=dataframe_key_column)

    path = Path(path)
    # Create directory if it doesn't exist (mirrors dataframe_to_csv)
    path.parent.mkdir(parents=True, exist_ok=True)

    # If the file already exists, merge the new rows with the existing ones
    if path.exists():
        original_df = pd.read_parquet(path)
        dataframe = pd.concat([original_df, dataframe], sort=False)

    # Write dataframe to Parquet
    dataframe.to_parquet(path, engine="pyarrow")

Writes a dataframe to a Parquet file (pyarrow engine). If the file already exists, the new rows are appended to the existing ones.

def delete_blobs_list(bucket_name: str,
blobs: List[Blob],
mode: str = 'prod') ‑> None
Expand source code
# Deletes the given blobs from the storage bucket `bucket_name`.
# Credentials come from get_credentials_from_env(mode=mode), which only
# accepts "prod" or "staging" and raises ValueError otherwise.
# NOTE(review): the snippet ends right after `bucket` is obtained; the
# statement that actually deletes `blobs` is missing from this rendering.
def delete_blobs_list(bucket_name: str, blobs: List[Blob], mode: str = "prod") -> None:
    Deletes all blobs in the bucket that are in the blobs list.
    Mode needs to be "prod" or "staging"

    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)

    bucket = storage_client.bucket(bucket_name)

Deletes all blobs in the bucket that are in the blobs list. Mode needs to be "prod" or "staging"

def determine_whether_to_execute_or_not(cron_expression: str,
datetime_now: datetime.datetime,
datetime_last_execution: datetime.datetime) ‑> bool
Expand source code
def determine_whether_to_execute_or_not(
    cron_expression: str, datetime_now: datetime, datetime_last_execution: datetime
) -> bool:
    """
    Determines whether the cron expression should trigger now.

    The next scheduled time after ``datetime_last_execution`` is computed with
    croniter. The schedule should trigger once that time is at or before
    ``datetime_now``.

    Args:
        cron_expression: The cron expression to check.
        datetime_now: The current datetime.
        datetime_last_execution: The last datetime the cron expression was executed.

    Returns:
        True if the cron expression should trigger, False otherwise.
    """
    cron_expression_iterator = croniter.croniter(
        cron_expression, datetime_last_execution
    )
    # Ask croniter for a datetime object (not a timestamp) so it can be
    # compared with datetime_now
    next_cron_expression_time = cron_expression_iterator.get_next(datetime)
    return next_cron_expression_time <= datetime_now

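Determines whether the cron expression should trigger, i.e. whether its next scheduled time after the last execution has already been reached.

Args
cron_expression
The cron expression to check.
datetime_now
The current datetime.
datetime_last_execution
The last datetime the cron expression was executed.

Returns
True if the cron expression should trigger, False otherwise.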

def dump_header_to_file(data_path: str | pathlib.Path, data_type: str = 'csv')
Expand source code
# Writes a "header" file for a CSV/Parquet file found under `data_path` and
# returns the folder it was written to (f"data/{uuid4()}").
#
# Steps visible here:
#   1. Validate `data_type` ("csv" or "parquet"); invalid values -> ValueError.
#   2. If `data_path` is not a directory, search its parent directory instead.
#   3. Walk the tree and pick a file ending in `.{data_type}`.
#   4. Collect the `key=value` folders of that file's path (partition folders).
#   5. Read only the first row (column names plus one data row) and write it
#      to `header.{data_type}` under the new folder.
#
# NOTE(review): lines are missing from this rendering: the `try:` before the
# assert, the statement under `if found:` (presumably `break`), the middle of
# the partitioned `Path(` expression, and the `else:` before the
# non-partitioned branch.
# NOTE(review): if no matching file exists, `file` stays None and
# `file.split("/")` raises AttributeError instead of a clear error.
# NOTE(review): validation relies on `assert`, which is skipped under
# `python -O`.
def dump_header_to_file(data_path: Union[str, Path], data_type: str = "csv"):
    Writes a header to a CSV file.
        assert data_type in ["csv", "parquet"]
    except AssertionError as exc:
        raise ValueError(f"Invalid data type: {data_type}") from exc
    # Use the folder of `data_path` (its parent when a file path is given)
    path = Path(data_path)
    if not path.is_dir():
        path = path.parent
    # Grab first `data_type` file found
    found: bool = False
    file: str = None
    for subdir, _, filenames in walk(str(path)):
        for fname in filenames:
            if fname.endswith(f".{data_type}"):
                file = join(subdir, fname)
                log(f"Found {data_type.upper()} file: {file}")
                found = True
        if found:

    # Each call writes to a fresh, uniquely named folder
    save_header_path = f"data/{uuid4()}"
    # discover if it's a partitioned table (path segments like "key=value")
    if partition_folders := [folder for folder in file.split("/") if "=" in folder]:
        partition_path = "/".join(partition_folders)
        save_header_file_path = Path(
        log(f"Found partition path: {save_header_file_path}")

        save_header_file_path = Path(f"{save_header_path}/header.{data_type}")
        log(f"Do not found partition path: {save_header_file_path}")

    # Create directory if it doesn't exist
    save_header_file_path.parent.mkdir(parents=True, exist_ok=True)

    # Read just first row and write dataframe to file
    if data_type == "csv":
        dataframe = pd.read_csv(file, nrows=1)
        dataframe.to_csv(save_header_file_path, index=False, encoding="utf-8")
    elif data_type == "parquet":
        # read_parquet loads the whole file before slicing the first row
        dataframe = pd.read_parquet(file)[:1]
        dataframe_to_parquet(dataframe=dataframe, path=save_header_file_path)

    log(f"Wrote {data_type.upper()} header at {save_header_file_path}")

    # Returns the folder, not the header file path
    return save_header_path

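Writes a header file (column names plus the first data row) for a CSV or Parquet file found under data_path. The file is saved under a new data/<uuid> folder, whose path is returned.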

def final_column_treatment(column: str) ‑> str
Expand source code
# Final normalization of a column name.
# - Names recognized as numeric-only get a leading underscore: f"_{column}".
# - Otherwise every run of non-word characters (regex `\W`, i.e. anything
#   other than letters, digits and "_") is removed.
# NOTE(review): the `try:` and the numeric check that raises ValueError
# (presumably a conversion such as int(column)) are missing from this
# rendering. The pylint comment mentions bare-except, but only ValueError is
# caught.
# NOTE(review): with a str pattern `\W` is Unicode-aware, so accented letters
# are kept, not removed.
def final_column_treatment(column: str) -> str:
    Adds an underline before column name if it only has numbers or remove all non alpha numeric
    characters besides underlines ("_").
        return f"_{column}"
    except ValueError:  # pylint: disable=bare-except
        non_alpha_removed = re.sub(r"[\W]+", "", column)
        return non_alpha_removed

Adds an underscore before the column name if it contains only digits; otherwise removes all non-alphanumeric characters except underscores ("_").

def get_credentials_from_env(mode: str = 'prod', scopes: List[str] = None) ‑> google.oauth2.service_account.Credentials
Expand source code
# Builds service-account credentials from an environment variable.
# Reads BASEDOSDADOS_CREDENTIALS_PROD or BASEDOSDADOS_CREDENTIALS_STAGING
# (chosen by `mode`), base64-decodes it and parses the result as JSON. When
# `scopes` is given, the credentials are re-scoped via `cred.with_scopes`.
# Raises ValueError for a mode other than "prod"/"staging" or for an unset or
# empty variable.
# NOTE(review): the right-hand side of `cred: ... = (` (how `info` becomes a
# Credentials object) is missing from this rendering.
def get_credentials_from_env(
    mode: str = "prod", scopes: List[str] = None
) -> service_account.Credentials:
    Gets credentials from env vars
    if mode not in ["prod", "staging"]:
        raise ValueError("Mode must be 'prod' or 'staging'")
    env: str = getenv(f"BASEDOSDADOS_CREDENTIALS_{mode.upper()}", "")
    if env == "":
        raise ValueError(f"BASEDOSDADOS_CREDENTIALS_{mode.upper()} env var not set!")
    # The variable holds base64-encoded JSON
    info: dict = json.loads(base64.b64decode(env))
    cred: service_account.Credentials = (
    if scopes:
        cred = cred.with_scopes(scopes)
    return cred

Gets credentials from env vars

def get_redis_client(host: str = 'redis.redis.svc.cluster.local',
port: int = 6379,
db: int = 0,
password: str = None) ‑> redis_pal.RedisPal.RedisPal
Expand source code
# Returns a RedisPal client.
# Defaults: host "redis.redis.svc.cluster.local", port 6379, db 0,
# password None.
# NOTE(review): the arguments passed to RedisPal( and its closing
# parenthesis are missing from this rendering; presumably they forward the
# four parameters -- confirm against the original module.
def get_redis_client(
    host: str = "redis.redis.svc.cluster.local",
    port: int = 6379,
    db: int = 0,  # pylint: disable=C0103
    password: str = None,
) -> RedisPal:
    Returns a Redis client.
    return RedisPal(

Returns a Redis client.

def get_redis_output(redis_key)
Expand source code
def get_redis_output(redis_key):
    """
    Gets all fields stored in a Redis hash.

    Example of a raw hash: ``{b'date': b'2023-02-27 07:29:04'}``. A non-empty
    result from ``hgetall`` is passed through ``treat_redis_output`` before
    being returned.

    Args:
        redis_key: Key of the Redis hash to read.

    Returns:
        The treated hash contents, or the empty ``hgetall`` result when nothing
        is stored under ``redis_key``.
    """
    redis_client = get_redis_client()
    output = redis_client.hgetall(redis_key)
    if output:
        output = treat_redis_output(output)
    return output

Gets the contents of a Redis hash. Example of a raw hash: {b'date': b'2023-02-27 07:29:04'}

def get_storage_blobs(dataset_id: str, table_id: str, mode: str = 'staging') ‑> list
Expand source code
# Lists the storage blobs of `dataset_id`.`table_id` using a `bd.Storage`
# object (presumably basedosdados -- confirm).
# NOTE(review): the argument of `return list(` is missing from this
# rendering, and `mode` is not used by any visible line.
def get_storage_blobs(dataset_id: str, table_id: str, mode: str = "staging") -> list:
    Get all blobs from a table in a dataset.

        dataset_id (str): dataset id
        table_id (str): table id
        mode (str, optional): mode to use. Defaults to "staging".

        list: list of blobs

    bd_storage = bd.Storage(dataset_id=dataset_id, table_id=table_id)
    return list(

Get all blobs from a table in a dataset.

Args
dataset_id : str
dataset id
table_id : str
table id
mode : str, optional
mode to use. Defaults to "staging".

Returns
list
list of blobs
def get_username_and_password_from_secret(secret_path: str, client: hvac.v1.Client = None) ‑> Tuple[str, str]
Expand source code
# Returns a (username, password) tuple read from the Vault secret at
# `secret_path`, using `client` when given (forwarded to get_vault_secret).
# NOTE(review): the tuple contents after `return (` are missing from this
# rendering, so the field names read from `secret` cannot be confirmed here.
def get_username_and_password_from_secret(
    secret_path: str,
    client: hvac.Client = None,
) -> Tuple[str, str]:
    Returns a username and password from a secret in Vault.
    secret = get_vault_secret(secret_path, client)
    return (

Returns a username and password from a secret in Vault.

def get_vault_client() ‑> hvac.v1.Client
Expand source code
# Returns an hvac Vault client.
# NOTE(review): the constructor arguments and closing parenthesis are missing
# from this rendering (presumably the Vault address and token -- confirm).
def get_vault_client() -> hvac.Client:
    Returns a Vault client.
    return hvac.Client(

Returns a Vault client.

def get_vault_secret(secret_path: str, client: hvac.v1.Client = None) ‑> dict
Expand source code
def get_vault_secret(secret_path: str, client: hvac.Client = None) -> dict:
    """
    Returns a secret from Vault.

    Reads the KV secret at ``secret_path`` and returns the ``"data"`` entry of
    the ``read_secret_version`` response.

    Args:
        secret_path: Path of the secret in Vault's KV engine.
        client: Vault client to use; when omitted, ``get_vault_client()`` is
            called to create one.

    Returns:
        The ``"data"`` entry of the Vault response.
    """
    vault_client = client or get_vault_client()
    return vault_client.secrets.kv.read_secret_version(secret_path)["data"]

Returns a secret from Vault.

def human_readable(value: int | float,
unit: str = '',
unit_prefixes: List[str] = None,
unit_divider: int = 1000,
decimal_places: int = 2)
Expand source code
def human_readable(
    value: Union[int, float],
    unit: str = "",
    unit_prefixes: List[str] = None,
    unit_divider: int = 1000,
    decimal_places: int = 2,
) -> str:
    """
    Formats a value in a human readable way, e.g. ``1500`` -> ``"1.50k"``.

    The value is divided by ``unit_divider`` until its magnitude is below it
    (or the prefixes run out), and the matching prefix is placed before
    ``unit``.

    Args:
        value: Number to format; negative numbers are scaled by magnitude.
        unit: Unit suffix appended after the prefix (e.g. ``"B"``).
        unit_prefixes: Prefixes for successive powers of ``unit_divider``;
            defaults to ``["", "k", "M", "G", "T", "P", "E", "Z", "Y"]``.
        unit_divider: Factor between consecutive prefixes (e.g. 1024 for
            binary units).
        decimal_places: Number of decimal places in the output.

    Returns:
        The formatted string. Zero is returned as ``f"{value}{unit}"``,
        without decimal formatting.
    """
    if unit_prefixes is None:
        unit_prefixes = ["", "k", "M", "G", "T", "P", "E", "Z", "Y"]
    if value == 0:
        return f"{value}{unit}"
    unit_prefix = unit_prefixes[0]
    for prefix in unit_prefixes[1:]:
        # Compare the magnitude so negative values are scaled too
        if abs(value) < unit_divider:
            break
        unit_prefix = prefix
        value /= unit_divider
    return f"{value:.{decimal_places}f}{unit_prefix}{unit}"

Formats a value in a human readable way.

def is_date(date_string: str, date_format: str = '%Y-%m-%d') ‑> datetime.datetime | bool
Expand source code
def is_date(date_string: str, date_format: str = "%Y-%m-%d") -> Union[str, bool]:
    """
    Checks whether a string is a valid date.

    Args:
        date_string: The string to validate.
        date_format: ``strptime`` format the string must match.

    Returns:
        The parsed date re-formatted with ``date_format`` (a ``str``, e.g.
        ``"2023-2-7"`` -> ``"2023-02-07"``) when the string is a valid date,
        or ``False`` otherwise.
    """
    try:
        return datetime.strptime(date_string, date_format).strftime(date_format)
    except ValueError:
        return False

Checks whether a string is a valid date.

def list_blobs_with_prefix(bucket_name: str, prefix: str, mode: str = 'prod') ‑> List[Blob]
Expand source code
def list_blobs_with_prefix(
    bucket_name: str, prefix: str, mode: str = "prod"
) -> List[Blob]:
    """
    Lists all the blobs in the bucket that begin with the prefix.

    This can be used to list all blobs in a "folder", e.g. "public/".

    Args:
        bucket_name: Name of the storage bucket.
        prefix: Only blobs whose names start with this prefix are returned.
        mode: Credentials to use; must be "prod" or "staging"
            (``get_credentials_from_env`` raises ValueError otherwise).

    Returns:
        The matching blobs, fully materialized into a list.
    """
    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)

    # Note: Client.list_blobs requires at least package version 1.17.0.
    return list(storage_client.list_blobs(bucket_name, prefix=prefix))

Lists all the blobs in the bucket that begin with the prefix. This can be used to list all blobs in a "folder", e.g. "public/". Mode needs to be "prod" or "staging"

def log(msg: Any, level: str = 'info') ‑> None
Expand source code
def log(msg: Any, level: str = "info") -> None:
    """
    Logs a message to prefect's logger.

    The message is converted with ``str()``, prefixed with a ``"----"`` line,
    every line is indented by 8 spaces, and two newlines are appended.

    Args:
        msg: Object to log.
        level: One of "debug", "info", "warning", "error" or "critical".

    Raises:
        ValueError: If ``level`` is not one of the supported names.
    """
    levels = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }
    # Validate first so no formatting work is done for a bad level
    if level not in levels:
        raise ValueError(f"Invalid log level: {level}")

    blank_spaces = 8 * " "
    msg = blank_spaces + "----\n" + str(msg)
    msg = "\n".join([blank_spaces + line for line in msg.split("\n")]) + "\n\n"

    prefect.context.logger.log(levels[level], msg)  # pylint: disable=E1101

Logs a message to prefect's logger.

def log_mod(msg: Any, level: str = 'info', index: int = 0, mod: int = 1)
Expand source code
def log_mod(msg: Any, level: str = "info", index: int = 0, mod: int = 1):
    """
    Only logs a message if the index is a multiple of mod.

    The message is prefixed with ``"iteration {index}:"`` and sent through
    ``log``.

    Args:
        msg: Message to log.
        level: Log level name forwarded to ``log``.
        index: Current iteration index.
        mod: Log only when ``index`` is a multiple of this value; must be
            non-zero.

    Raises:
        ZeroDivisionError: If ``mod`` is 0.
    """
    # 0 % mod == 0 for any non-zero mod, so index 0 is always logged
    if index % mod == 0:
        log(msg=f"iteration {index}:\n {msg}", level=level)

Only logs a message if the index is a multiple of mod.

def notify_discord_on_failure(flow: prefect.core.flow.Flow,
state: prefect.engine.state.State,
secret_path: str,
code_owners: List[str] | None = None)
Expand source code
# Builds a Discord notification for a failed flow run and mentions the code
# owners.
# - The webhook URL is read from Vault:
#   get_vault_secret(secret_path)["data"]["url"].
# - `code_owners` falls back to constants.DEFAULT_CODE_OWNERS.value. Each
#   owner is looked up in constants.OWNERS_DISCORD_MENTIONS.value and rendered
#   with Discord mention syntax according to its "type":
#   user -> <@id>, user_nickname -> <@!id>, channel -> <#id>, role -> <@&id>.
#   Owners with any other type are silently skipped; owner names missing from
#   the mapping raise KeyError.
# NOTE(review): this rendering is missing the `):` closing the signature, the
# expression inside `**{}**` (presumably the flow name), the start of the
# flow-run link, the closing parenthesis of `message`, and the call that
# sends the message to `url`.
def notify_discord_on_failure(
    flow: prefect.Flow,
    state: State,
    secret_path: str,
    code_owners: Optional[List[str]] = None,
    Notifies a Discord channel when a flow fails.
    url = get_vault_secret(secret_path)["data"]["url"]
    flow_run_id = prefect.context.get("flow_run_id")
    code_owners = code_owners or constants.DEFAULT_CODE_OWNERS.value
    code_owner_dict = constants.OWNERS_DISCORD_MENTIONS.value
    at_code_owners = []
    for code_owner in code_owners:
        code_owner_id = code_owner_dict[code_owner]["user_id"]
        code_owner_type = code_owner_dict[code_owner]["type"]

        # Pick the Discord mention syntax matching the owner type
        if code_owner_type == "user":
            at_code_owners.append(f"    - <@{code_owner_id}>\n")
        elif code_owner_type == "user_nickname":
            at_code_owners.append(f"    - <@!{code_owner_id}>\n")
        elif code_owner_type == "channel":
            at_code_owners.append(f"    - <#{code_owner_id}>\n")
        elif code_owner_type == "role":
            at_code_owners.append(f"    - <@&{code_owner_id}>\n")

    message = (
        f":man_facepalming: Flow **{}** has failed."
        + f'\n  - State message: *"{state.message}"*'
        + "\n  - Link to the failed flow: "
        + f"{flow_run_id}"
        + "\n  - Extra attention:\n"
        + "".join(at_code_owners)

Notifies a Discord channel when a flow fails.

def parse_date_columns(dataframe: pandas.core.frame.DataFrame, partition_date_column: str) ‑> Tuple[pandas.core.frame.DataFrame, List[str]]
Expand source code
# Adds partition columns derived from `partition_date_column` and returns the
# dataframe together with their names:
#   "ano_particao", "mes_particao", "data_particao"
#   (Portuguese for partition year / month / date).
# Raises ValueError if any of these columns already exists.
# The date column is cast to str and parsed with
# pd.to_datetime(..., errors="coerce"), so unparseable values become NaT.
# The input dataframe is modified in place (columns are assigned on it).
# NOTE(review): the expressions computing the year and month columns, and
# several closing parentheses, are missing from this rendering; only the
# trailing `.replace("-1", np.nan)` survives.
def parse_date_columns(
    dataframe: pd.DataFrame, partition_date_column: str
) -> Tuple[pd.DataFrame, List[str]]:
    Parses the date columns to the partition format.
    ano_col = "ano_particao"
    mes_col = "mes_particao"
    data_col = "data_particao"
    cols = [ano_col, mes_col, data_col]
    # Refuse to overwrite existing columns with the same names
    for col in cols:
        if col in dataframe.columns:
            raise ValueError(f"Column {col} already exists, please review your model.")

    dataframe[partition_date_column] = dataframe[partition_date_column].astype(str)
    dataframe[data_col] = pd.to_datetime(
        dataframe[partition_date_column], errors="coerce"

    dataframe[ano_col] = (
        .replace("-1", np.nan)

    dataframe[mes_col] = (
        .replace("-1", np.nan)

    dataframe[data_col] = dataframe[data_col]

    return dataframe, [ano_col, mes_col, data_col]

Parses the date columns to the partition format.

def parser_blobs_to_partition_dict(blobs: list) ‑> dict
Expand source code
# Collects partition values from blob paths: for every path segment of the
# form "key=value", `value` is gathered under `key`.
# Returns a dict mapping each partition key to a list of values.
# NOTE(review): missing from this rendering: the iterable after `in`
# (presumably the blob's name split on "/"), the `try:` paired with
# `except KeyError`, and the statement in its body (presumably appending
# `value` to the existing list). Values are not de-duplicated by any visible
# line, and a value containing "=" would be truncated at the second "=".
def parser_blobs_to_partition_dict(blobs: list) -> dict:
    Extracts the partition information from the blobs.

    partitions_dict = {}
    for blob in blobs:
        for folder in"/"):
            if "=" in folder:
                key = folder.split("=")[0]
                value = folder.split("=")[1]
                # First value seen for this key starts a new list
                except KeyError:
                    partitions_dict[key] = [value]
    return partitions_dict

Extracts the partition information from the blobs.

def query_to_line(query: str) ‑> str
Expand source code
def query_to_line(query: str) -> str:
    """
    Converts a multi-line query into a single line.

    The query is dedented, each line is stripped of surrounding whitespace and
    the lines are joined with single spaces. Blank lines therefore produce
    consecutive spaces.

    Args:
        query: Query text, possibly spanning several indented lines.

    Returns:
        The query on one line.
    """
    query = textwrap.dedent(query)
    return " ".join(line.strip() for line in query.split("\n"))

Converts a query to a line.

def remove_columns_accents(dataframe: pandas.core.frame.DataFrame) ‑> list
Expand source code
# Returns cleaned versions of the dataframe's column names as a list.
# The dataframe itself is only changed by converting its column labels to
# str; the cleaned names are returned, not assigned.
# Visible cleaning: encode to ASCII dropping non-ASCII characters, strip
# whitespace, and replace spaces, "/", "-" and the control characters
# \a \b \n \t \v \f \r with "_".
# NOTE(review): the beginning of the chain (before `.str.encode`) and the
# closing parenthesis are missing from this rendering. Encoding with
# errors="ignore" alone drops accented letters rather than stripping their
# accents; presumably a Unicode normalization step (and a decode back to str)
# is among the missing lines -- confirm.
def remove_columns_accents(dataframe: pd.DataFrame) -> list:
    Remove accents from dataframe columns.
    columns = [str(column) for column in dataframe.columns]
    dataframe.columns = columns
    return list(
        .str.encode("ascii", errors="ignore")
        .map(lambda x: x.strip())
        .str.replace(" ", "_")
        .str.replace("/", "_")
        .str.replace("-", "_")
        .str.replace("\a", "_")
        .str.replace("\b", "_")
        .str.replace("\n", "_")
        .str.replace("\t", "_")
        .str.replace("\v", "_")
        .str.replace("\f", "_")
        .str.replace("\r", "_")
Remove accents from dataframe columns.

def remove_tabs_from_query(query: str) ‑> str
Expand source code
def remove_tabs_from_query(query: str) -> str:
    """
    Removes tabs (and any other runs of whitespace) from a query.

    The query is first flattened with ``query_to_line``. Every run of
    whitespace is then collapsed into a single space, and leading/trailing
    whitespace is removed.

    Args:
        query: Query text.

    Returns:
        The normalized single-line query.
    """
    query = query_to_line(query)
    return re.sub(r"\s+", " ", query).strip()

Removes tabs from a query.

def run_cloud(flow: prefect.core.flow.Flow,
labels: List[str],
parameters: Dict[str, Any] = None,
run_description: str = '')
Expand source code
# Submits a development test run of `flow` to Prefect Server:
# - clears the schedule,
# - sets a KubernetesRun run config,
# - registers the flow in project "main",
# - creates a flow run named "TEST RUN - <run_description> - ..." via Client.
# It then prints the run name and a link to check it.
# NOTE(review): this rendering is missing the `):` closing the signature, the
# statement that renames the flow (merged into the comment below), the image
# name, the remaining create_flow_run arguments, and parts of the f-strings
# and URLs. The `labels=[]` passed to register may also have lost its
# original value -- confirm.
def run_cloud(
    flow: prefect.Flow,
    labels: List[str],
    parameters: Dict[str, Any] = None,
    run_description: str = "",
    Runs a flow on Prefect Server (must have VPN configured).
    # Setup no schedule
    flow.schedule = None

    # Change flow name for development and register = f"{} (development)"
    flow.run_config = KubernetesRun(image="")
    flow_id = flow.register(project_name="main", labels=[])

    # Get Prefect Client and submit flow run
    client = Client()
    flow_run_id = client.create_flow_run(
        run_name=f"TEST RUN - {run_description} - {}",

    # Print flow run link so user can check it
    print(f"Run submitted: TEST RUN - {run_description} - {}")
    print(f"Please check at:{flow_run_id}")

Runs a flow on Prefect Server (must have VPN configured).

def run_local(flow: prefect.core.flow.Flow, parameters: Dict[str, Any] = None)
Expand source code
# Runs `flow` locally after setting its run_config and schedule to None. The
# flow is run with `parameters` when they are given.
# NOTE(review): the rendering lost text after `# Setup for local run`
# (presumably another attribute set to None) and the call expressions in the
# final `return ... if parameters else ...` line (presumably flow.run(...)).
def run_local(flow: prefect.Flow, parameters: Dict[str, Any] = None):
    Runs a flow locally.
    # Setup for local run = None
    flow.run_config = None
    flow.schedule = None

    # Run flow
    return if parameters else

Runs a flow locally.

def run_registered(flow_name: str,
flow_project: str,
labels: List[str],
parameters: Dict[str, Any] = None,
run_description: str = '') ‑> str
Expand source code
# Submits a run of an already-registered Prefect flow and returns the new
# flow run id.
# The flow is looked up through Client.graphql by flow name, project name and
# archived == False. ValueError is raised if no flow or more than one flow
# matches.
# NOTE(review): parts of the GraphQL query, the assignment to `flows_found`,
# and the remaining create_flow_run arguments are missing from this
# rendering; `labels` and `parameters` are not used by any visible line.
def run_registered(
    flow_name: str,
    flow_project: str,
    labels: List[str],
    parameters: Dict[str, Any] = None,
    run_description: str = "",
) -> str:
    Runs an already registered flow on Prefect Server (must have credentials configured).
    # Get Prefect Client and submit flow run
    client = Client()
    flow_result = client.graphql(
            "query": {
                        "where": {
                            "_and": [
                                {"name": {"_eq": flow_name}},
                                {"archived": {"_eq": False}},
                                {"project": {"name": {"_eq": flow_project}}},
                ): {
    flows_found =
    # Exactly one non-archived flow must match
    if len(flows_found) == 0:
        raise ValueError(f"Flow {flow_name} not found.")
    if len(flows_found) > 1:
        raise ValueError(f"More than one flow found for {flow_name}.")
    flow_id = flow_result["data"]["flow"][0]["id"]
    flow_run_id = client.create_flow_run(
        run_name=f"SUBMITTED REMOTELY - {run_description}",

    # Print flow run link so user can check it
    print(f"Run submitted: SUBMITTED REMOTELY - {run_description}")
    print(f"Please check at:{flow_run_id}")

    return flow_run_id

Runs an already registered flow on Prefect Server (must have credentials configured).

def save_str_on_redis(redis_key: str, key: str, value: str)
Expand source code
def save_str_on_redis(
    redis_key: str,
    key: str,
    value: str,
):
    """
    Function to save a string on Redis.

    Stores ``value`` under field ``key`` of the Redis hash ``redis_key``
    (``HSET``).

    Args:
        redis_key: Name of the Redis hash.
        key: Field inside the hash.
        value: String value to store.
    """
    redis_client = get_redis_client()
    redis_client.hset(redis_key, key, value)

Function to save a string on redis

def save_updated_rows_on_redis(dataframe: pandas.core.frame.DataFrame,
dataset_id: str,
table_id: str,
unique_id: str = 'id_estacao',
date_column: str = 'data_medicao',
date_format: str = '%Y-%m-%d %H:%M:%S',
mode: str = 'prod') ‑> pandas.core.frame.DataFrame
Expand source code
# Incremental-update filter backed by Redis.
# The Redis hash at "<dataset_id>.<table_id>" (prefixed with "dev." in dev
# mode) maps each `unique_id` value to the last `date_column` value already
# saved. This function:
#   1. loads that hash, or fakes "1900-01-01 00:00:00" for every id when the
#      hash is empty,
#   2. keeps only rows whose `date_column` is later than the stored date,
#   3. writes the newest date per id back to the hash, and
#   4. returns the filtered rows (reset_index() adds an "index" column).
# The caller's `dataframe` has its `unique_id` column cast to str in place.
# NOTE(review): lines are missing from this rendering: the `else:` before the
# bytes-decoding branch, closing braces/parentheses, the element expression
# of both list comprehensions, and the arguments of `.dropna(`.
# NOTE(review): `last_updates.loc[-i]` uses -0 == 0 for the first missing id,
# which overwrites the existing row labelled 0 instead of appending. The id
# previously in that row then has no last_update after the merge; comparing
# a date with NaT is False, so its rows are dropped.
# NOTE(review): the key is built by hand; build_redis_key(dataset_id,
# table_id, mode=mode) produces the same string.
def save_updated_rows_on_redis(  # pylint: disable=R0914
    dataframe: pd.DataFrame,
    dataset_id: str,
    table_id: str,
    unique_id: str = "id_estacao",
    date_column: str = "data_medicao",
    date_format: str = "%Y-%m-%d %H:%M:%S",
    mode: str = "prod",
) -> pd.DataFrame:
    Acess redis to get the last time each unique_id was updated, return
    updated unique_id as a DataFrame and save new dates on redis

    redis_client = get_redis_client()

    key = dataset_id + "." + table_id
    if mode == "dev":
        key = f"{mode}.{key}"

    # Access all data saved on redis with this key
    last_updates = redis_client.hgetall(key)

    if len(last_updates) == 0:
        last_updates = pd.DataFrame(dataframe[unique_id].unique(), columns=[unique_id])
        last_updates["last_update"] = "1900-01-01 00:00:00"
        log(f"Redis key: {key}\nCreating Redis fake values:\n {last_updates}")
        # Convert data in dictionary in format with unique_id in key and last updated time as value
        # Example > {"12": "2022-06-06 14:45:00"}
        last_updates = {
            k.decode("utf-8"): v.decode("utf-8") for k, v in last_updates.items()

        # Convert dictionary to dataframe
        last_updates = pd.DataFrame(
            last_updates.items(), columns=[unique_id, "last_update"]

        log(f"Redis key: {key}\nRedis actual values:\n {last_updates}")

    # Guarantee that both id columns are strings so they compare and merge consistently
    dataframe[unique_id] = dataframe[unique_id].astype(str)
    last_updates[unique_id] = last_updates[unique_id].astype(str)

    # dataframe and last_updates need to contain the same set of unique_id values
    missing_in_dfr = [
        for i in last_updates[unique_id].unique()
        if i not in dataframe[unique_id].unique()
    missing_in_updates = [
        for i in dataframe[unique_id].unique()
        if i not in last_updates[unique_id].unique()

    # If unique_id doesn't exist on updates we create a fake date for this station on updates
    if len(missing_in_updates) > 0:
        for i, _id in enumerate(missing_in_updates):
            last_updates.loc[-i] = [_id, "1900-01-01 00:00:00"]

    # If unique_id doesn't exist on dataframe we remove these stations from last_updates
    if len(missing_in_dfr) > 0:
        last_updates = last_updates[~last_updates[unique_id].isin(missing_in_dfr)]

    # Merge dfs using unique_id
    dataframe = dataframe.merge(last_updates, how="left", on=unique_id)
    log(f"Comparing times: {dataframe.sort_values(unique_id)}")

    # Keep on dataframe only the stations that have a time after the one saved on redis
    # (adding DateOffset(hours=0) does not shift the values)
    dataframe[date_column] = dataframe[date_column].apply(
        pd.to_datetime, format=date_format
    ) + pd.DateOffset(hours=0)

    dataframe["last_update"] = dataframe["last_update"].apply(
        pd.to_datetime, format="%Y-%m-%d %H:%M:%S"
    ) + pd.DateOffset(hours=0)

    dataframe = dataframe[dataframe[date_column] > dataframe["last_update"]].dropna(
    log(f"Dataframe after comparison: {dataframe.sort_values(unique_id)}")
    # Keep only the last (latest) date for each unique_id
    keep_cols = [unique_id, date_column]
    new_updates = dataframe[keep_cols].sort_values(keep_cols)
    new_updates = new_updates.groupby(unique_id, as_index=False).tail(1)
    new_updates[date_column] = new_updates[date_column].dt.strftime("%Y-%m-%d %H:%M:%S")
    log(f">>> Updated df: {new_updates.head(10)}")

    # Convert stations with the new update dates into a dictionary
    new_updates = dict(zip(new_updates[unique_id], new_updates[date_column]))
    log(f">>> data to save in redis as a dict: {new_updates}")

    # Save this new information on redis (one HSET per id)
    [redis_client.hset(key, k, v) for k, v in new_updates.items()]

    return dataframe.reset_index()

Accesses Redis to get the last time each unique_id was updated, returns only the rows newer than that as a DataFrame, and saves the new dates on Redis.

def send_discord_message(message: str, webhook_url: str) ‑> None
Expand source code
# Sends `message` to a Discord channel through the webhook at `webhook_url`;
# the visible payload is {"content": message}.
# NOTE(review): the HTTP call wrapping `data=...` (presumably a POST to
# webhook_url) and its closing parenthesis are missing from this rendering.
def send_discord_message(
    message: str,
    webhook_url: str,
) -> None:
    Sends a message to a Discord channel.
        data={"content": message},

Sends a message to a Discord channel.

def send_telegram_message(message: str, token: str, chat_id: int, parse_mode: str = 'HTML')
Expand source code
# Sends `message` to the Telegram chat `chat_id` using a bot authenticated by
# `token`; `parse_mode` defaults to telegram.ParseMode.HTML.
# NOTE(review): the `):` closing the signature and the send call are missing
# from this rendering; only the telegram.Bot construction is visible.
def send_telegram_message(
    message: str,
    token: str,
    chat_id: int,
    parse_mode: str = telegram.ParseMode.HTML,
    Sends a message to a Telegram chat.
    bot = telegram.Bot(token=token)

Sends a message to a Telegram chat.

def set_default_parameters(flow: prefect.core.flow.Flow, default_parameters: dict) ‑> prefect.core.flow.Flow
Expand source code
# Overrides the default value of each flow parameter that has an entry in
# `default_parameters`. The parameter objects of `flow` are mutated, and the
# same flow object is returned.
# NOTE(review): the lookup key (presumably the parameter's name) is missing
# from `if ... in default_parameters` and `default_parameters[...]`.
def set_default_parameters(
    flow: prefect.Flow, default_parameters: dict
) -> prefect.Flow:
    Sets default parameters for a flow.
    for parameter in flow.parameters():
        if in default_parameters:
            parameter.default = default_parameters[]
    return flow

Sets default parameters for a flow.

def skip_if_running_handler(obj,
old_state: prefect.engine.state.State,
new_state: prefect.engine.state.State) ‑> prefect.engine.state.State
Expand source code
# Prefect state handler. When `new_state` is Running, it queries Prefect
# Server (Client.graphql) for flow runs in state "Running" filtered by
# `$flow_id` (presumably the current flow's id). If any exist it returns
# Skipped(...); otherwise it returns `new_state` unchanged.
# `obj` and `old_state` are not used by any visible line.
# NOTE(review): this rendering is missing the Discourse link, parts of the
# GraphQL query (including its closing triple quote), the graphql() call
# arguments, and whatever is done with `logger`.
def skip_if_running_handler(obj, old_state: State, new_state: State) -> State:
    State handler that will skip a flow run if another instance of the flow is already running.

    Adapted from Prefect Discourse:
    # Only check for concurrent runs when this run is starting
    if new_state.is_running():
        client = Client()
        query = """
            query($flow_id: uuid) {
                    where: {
                        _and: [
                            {state: {_eq: "Running"}},
                            {flow_id: {_eq: $flow_id}}
                ) {
        # pylint: disable=no-member
        response = client.graphql(
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            return Skipped(message)
    return new_state

State handler that will skip a flow run if another instance of the flow is already running.

Adapted from Prefect Discourse:

def smart_split(text: str, max_length: int, separator: str = ' ') ‑> List[str]
Expand source code
# Splits `text` at the last `separator` that fits within the first
# `max_length` characters. Text that already fits is returned as a
# single-element list. ValueError is raised when no separator occurs inside
# that window.
# NOTE(review): the elements of the returned list and the closing
# parentheses/brackets are missing from this rendering; only the slice after
# the separator is visible (presumably split recursively).
# NOTE(review): the two f-strings of the error message are concatenated
# without a space, producing e.g. "into 10characters".
# NOTE(review): for a non-empty separator, `separator_index >= max_length`
# cannot be true, because rfind(..., 0, max_length) only matches separators
# that end before max_length. It is reachable only with an empty separator.
def smart_split(
    text: str,
    max_length: int,
    separator: str = " ",
) -> List[str]:
    Splits a string into a list of strings.
    if len(text) <= max_length:
        return [text]

    # Last separator occurrence inside text[:max_length]
    separator_index = text.rfind(separator, 0, max_length)
    if (separator_index >= max_length) or (separator_index == -1):
        raise ValueError(
            f'Cannot split text "{text}" into {max_length}'
            f'characters using separator "{separator}"'

    return [
            text[separator_index + len(separator) :],

Splits a string into a list of strings.

def to_json_dataframe(dataframe: pandas.core.frame.DataFrame = None,
csv_path: str | pathlib.Path = None,
key_column: str = None,
read_csv_kwargs: dict = None,
save_to: str | pathlib.Path = None) ‑> pandas.core.frame.DataFrame
Expand source code
def to_json_dataframe(
    dataframe: pd.DataFrame = None,
    csv_path: Union[str, Path] = None,
    key_column: str = None,
    read_csv_kwargs: dict = None,
    save_to: Union[str, Path] = None,
) -> pd.DataFrame:
    """
    Manipulates a dataframe by keeping key_column and moving every other column
    data to a "content" column holding one dict per row. Example:

    - Input dataframe: pd.DataFrame({"key": ["a", "b", "c"], "col1": [1, 2, 3], "col2": [4, 5, 6]})
    - Output dataframe: pd.DataFrame({
        "key": ["a", "b", "c"],
        "content": [{"col1": 1, "col2": 4}, {"col1": 2, "col2": 5}, {"col1": 3, "col2": 6}]
      })

    Without ``key_column`` every column is moved into "content".

    NOTE: "content" cells are Python dicts. When written with ``to_csv`` they
    are rendered with ``str()``, which is not strict JSON.

    Args:
        dataframe: Input dataframe; replaced by the CSV when ``csv_path`` is
            given.
        csv_path: Path of a CSV file to read instead of ``dataframe``.
        key_column: Column kept as-is next to "content". When omitted, the
            output only has the "content" column.
        read_csv_kwargs: Extra keyword arguments for ``pd.read_csv``.
        save_to: If given, the result is also written to this CSV path
            (without index).

    Returns:
        A new dataframe with columns ``[key_column, "content"]`` or
        ``["content"]``, keeping the input index. The input dataframe is not
        modified.

    Raises:
        ValueError: If neither ``dataframe`` nor ``csv_path`` is provided.
    """
    if dataframe is None and not csv_path:
        raise ValueError("dataframe or csv_path is required")
    if csv_path:
        # read_csv_kwargs defaults to None, which cannot be unpacked with **
        dataframe = pd.read_csv(csv_path, **(read_csv_kwargs or {}))
    if key_column:
        records = dataframe.drop(columns=[key_column]).to_dict(orient="records")
        # Build a new frame instead of adding "content" to the caller's dataframe
        result = dataframe[[key_column]].copy()
        result["content"] = records
    else:
        records = dataframe.to_dict(orient="records")
        result = pd.DataFrame({"content": records}, index=dataframe.index)
    if save_to:
        result.to_csv(save_to, index=False)
    return result

Manipulates a dataframe by keeping key_column and moving every other column data to a "content" column in JSON format. Example:

  • Input dataframe: pd.DataFrame({"key": ["a", "b", "c"], "col1": [1, 2, 3], "col2": [4, 5, 6]})
  • Output dataframe: pd.DataFrame({ "key": ["a", "b", "c"], "content": [{"col1": 1, "col2": 4}, {"col1": 2, "col2": 5}, {"col1": 3, "col2": 6}] })
def to_partitions(data: pandas.core.frame.DataFrame,
partition_columns: List[str],
savepath: str,
data_type: str = 'csv',
suffix: str = None,
build_json_dataframe: bool = False,
dataframe_key_column: str = None) ‑> List[pathlib.Path]
Expand source code
def to_partitions(
    data: pd.DataFrame,
    partition_columns: List[str],
    savepath: str,
    data_type: str = "csv",
    suffix: str = None,
    build_json_dataframe: bool = False,
    dataframe_key_column: str = None,
) -> List[Path]:  # sourcery skip: raise-specific-error
    """Save data into a Hive-style partition tree, given a dataframe and a list of partition columns.

    One file is written per unique combination of values in ``partition_columns``,
    inside a nested folder under ``savepath``. The partition columns themselves are
    dropped from the written data.

    Args:
        data (pandas.core.frame.DataFrame): Dataframe to be partitioned.
        partition_columns (list): List of columns to be used as partitions.
        savepath (str, pathlib.PosixPath): Folder path to save the partitions.
        data_type (str): Output format, "csv" (appended to the file if it already
            exists) or "parquet". Any other value raises ValueError.
        suffix (str, optional): If given, files are named ``data_{suffix}.{data_type}``
            instead of ``data.{data_type}``.
        build_json_dataframe (bool): If True, each partition's data is passed
            through ``to_json_dataframe`` before being written.
        dataframe_key_column (str, optional): ``key_column`` forwarded to
            ``to_json_dataframe`` when ``build_json_dataframe`` is True.

    Returns:
        List[Path]: the ``saved_files`` list (see the review note in the body).

    Raises:
        ValueError: If ``data_type`` is neither "csv" nor "parquet".
        BaseException: If ``data`` is not a pandas DataFrame.

    Example:
        data = {
            "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021,2025],
            "mes": [1, 2, 3, 4, 5, 6, 6,9],
            "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR","PR"],
            "dado": ["a", "b", "c", "d", "e", "f", "g",'h'],
        }
        to_partitions(
            data=pd.DataFrame(data),
            partition_columns=['ano','mes','sigla_uf'],
            savepath='partitions/'
        )
    """
    # NOTE(review): this rendered listing has dropped lines (closing brackets,
    # `else:` branches and several expressions) and is not runnable as shown;
    # consult the real source before relying on the details below.
    # NOTE(review): no append to saved_files is visible in this listing — confirm
    # in the real source whether the written file paths are collected here.
    saved_files = []
    if isinstance(data, (pd.core.frame.DataFrame)):
        savepath = Path(savepath)

        # create unique combinations between partition columns
        unique_combinations = (
            # NOTE(review): the expression building the combinations is missing here;
            # the loop below expects an iterable of {column: value} mappings.

        for filter_combination in unique_combinations:
            patitions_values = [
                # NOTE(review): element expression missing; presumably one
                # "column=value" folder name per partition (Hive style) — confirm.
                for partition, value in filter_combination.items()

            # get filtered data
            # NOTE(review): the row selector inside .loc[...] is missing from this listing.
            df_filter = data.loc[
            # Partition values are encoded in the folder path, so drop them from the rows.
            df_filter = df_filter.drop(columns=partition_columns).reset_index(drop=True)

            # create folder tree
            filter_save_path = Path(savepath / "/".join(patitions_values))
            filter_save_path.mkdir(parents=True, exist_ok=True)
            if suffix is not None:
                file_filter_save_path = (
                    Path(filter_save_path) / f"data_{suffix}.{data_type}"
                file_filter_save_path = Path(filter_save_path) / f"data.{data_type}"

            if build_json_dataframe:
                df_filter = to_json_dataframe(
                    df_filter, key_column=dataframe_key_column

            if data_type == "csv":
                # append data to csv
                # Write the header only when the file is being created, so repeated
                # appends do not repeat it. (The start of the to_csv call is missing
                # from this listing.)
                    header=not file_filter_save_path.exists(),
            elif data_type == "parquet":
                dataframe_to_parquet(dataframe=df_filter, path=file_filter_save_path)
                raise ValueError(f"Invalid data type: {data_type}")
        raise BaseException("Data need to be a pandas DataFrame")

    return saved_files

Save data into a Hive-style partition tree, given a dataframe and a list of partition columns.


data : pandas.core.frame.DataFrame
Dataframe to be partitioned.
partition_columns : list
List of columns to be used as partitions.
savepath : str, pathlib.PosixPath
folder path to save the partitions


data = { "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021,2025], "mes": [1, 2, 3, 4, 5, 6, 6,9], "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR","PR"], "dado": ["a", "b", "c", "d", "e", "f", "g",'h'], } to_partitions( data=pd.DataFrame(data), partition_columns=['ano','mes','sigla_uf'], savepath='partitions/' )

def treat_redis_output(text)
Expand source code
def treat_redis_output(text):
    """
    Decode a Redis hash reply into a ``str -> str`` dict.

    Redis returns a dict where both keys and values are byte strings.
    Example: {b'date': b'2023-02-27 07:29:04'} -> {'date': '2023-02-27 07:29:04'}

    Keys and values are decoded as UTF-8. Items that are already ``str`` are
    passed through unchanged, so an already-decoded reply is also accepted.

    Args:
        text: Mapping returned by Redis (bytes or str keys/values).

    Returns:
        dict: A new dict with ``str`` keys and values.
    """

    def _decode(value):
        # Only bytes need decoding; anything else (e.g. str) is kept as-is.
        return value.decode("utf-8") if isinstance(value, bytes) else value

    return {_decode(key): _decode(value) for key, value in text.items()}

Redis returns a dict where both keys and values are byte strings; this function decodes them to UTF-8 str. Example: {b'date': b'2023-02-27 07:29:04'}

def untuple_clocks(clocks)
Expand source code
def untuple_clocks(clocks):
    """
    Flatten a list of clocks in which some items may be wrapped in tuples.

    Each tuple item is replaced by its first element; every other item is kept
    unchanged.

    Args:
        clocks: Iterable of clocks and/or tuples whose first element is a clock.

    Returns:
        list: A new list with the same length as ``clocks``.

    Raises:
        IndexError: If an item is an empty tuple.
    """
    return [clock[0] if isinstance(clock, tuple) else clock for clock in clocks]

Flattens a list of clocks: each tuple item is replaced by its first element, and every other item is kept unchanged.

def upload_files_to_storage(bucket_name: str,
prefix: str,
local_path: str | pathlib.Path = None,
files_list: List[str] = None,
mode: str = 'prod')
Expand source code
def upload_files_to_storage(
    bucket_name: str,
    prefix: str,
    local_path: Union[str, Path] = None,
    files_list: List[str] = None,
    mode: str = "prod",
    # NOTE(review): this rendered listing is missing the closing "):" of the
    # signature and the docstring quotes around the two lines below.
    Uploads all files from `local_path` to `bucket_name` with `prefix`.
    Mode needs to be "prod" or "staging"
    # Uploads files to `bucket_name` under `prefix`. Files are taken from
    # `local_path` (searched recursively) when it is given, otherwise from
    # `files_list`. Raises ValueError if neither is provided.
    # Either local_path or files_list must be provided
    if local_path is None and files_list is None:
        raise ValueError("Either local_path or files_list must be provided")

    # If local_path is provided, get all files from it. This replaces any
    # files_list passed in; "**/*" matches recursively and also yields
    # directories, which are skipped by the is_file() check below.
    if local_path is not None:
        files_list: List[Path] = list(Path(local_path).glob("**/*"))

    # Normalize every entry to a Path (a conversion, not an assertion) so plain
    # string entries from files_list work too
    files_list: List[Path] = [Path(f) for f in files_list]

    # NOTE(review): get_credentials_from_env is defined elsewhere; presumably it
    # picks credentials according to `mode` — confirm.
    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)

    bucket = storage_client.bucket(bucket_name)

    for file in files_list:
        # Skip directories and paths that do not exist
        if file.is_file():
            # NOTE(review): the empty "{}" placeholder is not valid f-string syntax;
            # the blob-name expression (and the upload call that presumably follows)
            # are missing from this rendered listing — confirm against the real source.
            blob = bucket.blob(f"{prefix}/{}")

Uploads files to bucket_name under prefix, taking them recursively from local_path when it is given, or otherwise from files_list. Mode needs to be "prod" or "staging".