Module pipelines.utils.utils

General utilities for all pipelines.

Functions

def batch_to_dataframe(batch: Tuple[Tuple], columns: List[str]) ‑> pandas.core.frame.DataFrame
Expand source code
def batch_to_dataframe(batch: Tuple[Tuple], columns: List[str]) -> pd.DataFrame:
    """
    Converts a batch of rows to a dataframe.
    """
    return pd.DataFrame(batch, columns=columns)

Converts a batch of rows to a dataframe.
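
A minimal usage sketch (the sample rows and column names below are illustrative):

batch = ((1, "Alice"), (2, "Bob"))
dataframe = batch_to_dataframe(batch, columns=["id", "nome"])
# dataframe now has two rows and the columns "id" and "nome"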

def build_redis_key(dataset_id: str, table_id: str, name: str = None, mode: str = 'prod')
Expand source code
def build_redis_key(
    dataset_id: str, table_id: str, name: str = None, mode: str = "prod"
):
    """
    Helper function for building a Redis key.
    """
    key = dataset_id + "." + table_id
    if name:
        key = key + "." + name
    if mode == "dev":
        key = f"{mode}.{key}"
    return key

Helper function for building a Redis key.
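
An illustrative sketch of the keys this helper builds (dataset and table names are hypothetical):

build_redis_key("meu_dataset", "minha_tabela", name="last_update")
# -> "meu_dataset.minha_tabela.last_update"
build_redis_key("meu_dataset", "minha_tabela", name="last_update", mode="dev")
# -> "dev.meu_dataset.minha_tabela.last_update"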

def clean_dataframe(dataframe: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame
Expand source code
def clean_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame:
    """
    Cleans a dataframe.
    """
    for col in dataframe.columns.tolist():
        if dataframe[col].dtype == object:
            try:
                dataframe[col] = (
                    dataframe[col]
                    .astype(str)
                    .str.replace("\x00", "")
                    .replace("None", np.nan)
                )
            except Exception as exc:
                print(
                    "Column: ",
                    col,
                    "\nData: ",
                    dataframe[col].tolist(),
                    "\n",
                    exc,
                )
                raise
    return dataframe

Cleans a dataframe.

def compare_dates_between_tables_redis(key_table_1: str, format_date_table_1: str, key_table_2: str, format_date_table_2: str)
Expand source code
def compare_dates_between_tables_redis(
    key_table_1: str,
    format_date_table_1: str,
    key_table_2: str,
    format_date_table_2: str,
):
    """
    Checks whether the date saved for the second
    table is later than the one saved for the first table.
    """

    # get saved date on redis
    date_1 = get_redis_output(key_table_1)
    date_2 = get_redis_output(key_table_2)

    # Return true if there is no date_1 or date_2 saved on redis
    if (len(date_1) == 0) | (len(date_2) == 0):
        return True

    # Convert date to pendulum
    date_1 = pendulum.from_format(date_1["date"], format_date_table_1)
    date_2 = pendulum.from_format(date_2["date"], format_date_table_2)
    comparison = date_1 < date_2
    log(f"Is {date_2} bigger than {date_1}? {comparison}")
    return comparison

Checks whether the date saved for the second table is later than the one saved for the first table.

def dataframe_to_csv(dataframe: pandas.core.frame.DataFrame, path: str | pathlib.Path, build_json_dataframe: bool = False, dataframe_key_column: str = None) ‑> None
Expand source code
def dataframe_to_csv(
    dataframe: pd.DataFrame,
    path: Union[str, Path],
    build_json_dataframe: bool = False,
    dataframe_key_column: str = None,
) -> None:
    """
    Writes a dataframe to CSV file.
    """
    if build_json_dataframe:
        dataframe = to_json_dataframe(dataframe, key_column=dataframe_key_column)

    # Remove filename from path
    path = Path(path)
    # Create directory if it doesn't exist
    path.parent.mkdir(parents=True, exist_ok=True)

    # Write dataframe to CSV
    dataframe.to_csv(path, index=False, encoding="utf-8")

Writes a dataframe to CSV file.

def dataframe_to_parquet(dataframe: pandas.core.frame.DataFrame, path: str | pathlib.Path, build_json_dataframe: bool = False, dataframe_key_column: str = None)
Expand source code
def dataframe_to_parquet(
    dataframe: pd.DataFrame,
    path: Union[str, Path],
    build_json_dataframe: bool = False,
    dataframe_key_column: str = None,
):
    """
    Writes a dataframe to Parquet file with Schema as STRING.
    """
    # Code adapted from
    # https://stackoverflow.com/a/70817689/9944075

    if build_json_dataframe:
        dataframe = to_json_dataframe(dataframe, key_column=dataframe_key_column)

    # If the file already exists, we:
    # - Load it
    # - Merge the new dataframe with the existing one
    if Path(path).exists():
        # Load it
        original_df = pd.read_parquet(path)
        # Merge the new dataframe with the existing one
        dataframe = pd.concat([original_df, dataframe], sort=False)

    # Write dataframe to Parquet
    dataframe.to_parquet(path, engine="pyarrow")

Writes a dataframe to Parquet file with Schema as STRING.

def delete_blobs_list(bucket_name: str, blobs: List[google.cloud.storage.blob.Blob], mode: str = 'prod') ‑> None
Expand source code
def delete_blobs_list(bucket_name: str, blobs: List[Blob], mode: str = "prod") -> None:
    """
    Deletes all blobs in the bucket that are in the blobs list.
    Mode needs to be "prod" or "staging"
    """

    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)

    bucket = storage_client.bucket(bucket_name)
    bucket.delete_blobs(blobs)

Deletes all blobs in the bucket that are in the blobs list. Mode needs to be "prod" or "staging"

def determine_whether_to_execute_or_not(cron_expression: str, datetime_now: datetime.datetime, datetime_last_execution: datetime.datetime) ‑> bool
Expand source code
def determine_whether_to_execute_or_not(
    cron_expression: str, datetime_now: datetime, datetime_last_execution: datetime
) -> bool:
    """
    Determines whether the cron expression is currently valid.

    Args:
        cron_expression: The cron expression to check.
        datetime_now: The current datetime.
        datetime_last_execution: The last datetime the cron expression was executed.

    Returns:
        True if the cron expression should trigger, False otherwise.
    """
    cron_expression_iterator = croniter.croniter(
        cron_expression, datetime_last_execution
    )
    next_cron_expression_time = cron_expression_iterator.get_next(datetime)
    return next_cron_expression_time <= datetime_now

Determines whether the cron expression is currently valid.

Args

cron_expression
The cron expression to check.
datetime_now
The current datetime.
datetime_last_execution
The last datetime the cron expression was executed.

Returns

True if the cron expression should trigger, False otherwise.
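
A minimal sketch with an hourly cron expression (the datetimes are illustrative):

from datetime import datetime

# Last execution at 10:00; the next trigger (11:00) has already passed at 11:05
determine_whether_to_execute_or_not(
    "0 * * * *",
    datetime_now=datetime(2023, 2, 27, 11, 5),
    datetime_last_execution=datetime(2023, 2, 27, 10, 0),
)  # True

# At 10:30 the next trigger (11:00) has not arrived yet
determine_whether_to_execute_or_not(
    "0 * * * *",
    datetime_now=datetime(2023, 2, 27, 10, 30),
    datetime_last_execution=datetime(2023, 2, 27, 10, 0),
)  # False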

def dump_header_to_file(data_path: str | pathlib.Path, data_type: str = 'csv')
Expand source code
def dump_header_to_file(data_path: Union[str, Path], data_type: str = "csv"):
    """
    Writes a header file containing only the first row of the first CSV or Parquet
    file found under data_path, and returns the base path where it was saved.
    """
    try:
        assert data_type in ["csv", "parquet"]
    except AssertionError as exc:
        raise ValueError(f"Invalid data type: {data_type}") from exc
    # Remove filename from path
    path = Path(data_path)
    if not path.is_dir():
        path = path.parent
    # Grab first `data_type` file found
    found: bool = False
    file: str = None
    for subdir, _, filenames in walk(str(path)):
        for fname in filenames:
            if fname.endswith(f".{data_type}"):
                file = join(subdir, fname)
                log(f"Found {data_type.upper()} file: {file}")
                found = True
                break
        if found:
            break

    save_header_path = f"data/{uuid4()}"
    # discover if it's a partitioned table
    if partition_folders := [folder for folder in file.split("/") if "=" in folder]:
        partition_path = "/".join(partition_folders)
        save_header_file_path = Path(
            f"{save_header_path}/{partition_path}/header.{data_type}"
        )
        log(f"Found partition path: {save_header_file_path}")

    else:
        save_header_file_path = Path(f"{save_header_path}/header.{data_type}")
        log(f"Do not found partition path: {save_header_file_path}")

    # Create directory if it doesn't exist
    save_header_file_path.parent.mkdir(parents=True, exist_ok=True)

    # Read just first row and write dataframe to file
    if data_type == "csv":
        dataframe = pd.read_csv(file, nrows=1)
        dataframe.to_csv(save_header_file_path, index=False, encoding="utf-8")
    elif data_type == "parquet":
        dataframe = pd.read_parquet(file)[:1]
        dataframe_to_parquet(dataframe=dataframe, path=save_header_file_path)

    log(f"Wrote {data_type.upper()} header at {save_header_file_path}")

    return save_header_path

Writes a header file containing only the first row of the first CSV or Parquet file found under data_path, and returns the base path where it was saved.

def final_column_treatment(column: str) ‑> str
Expand source code
def final_column_treatment(column: str) -> str:
    """
    Adds an underscore before the column name if it contains only numbers; otherwise
    removes all non-alphanumeric characters except underscores ("_").
    """
    try:
        int(column)
        return f"_{column}"
    except ValueError:  # pylint: disable=bare-except
        non_alpha_removed = re.sub(r"[\W]+", "", column)
        return non_alpha_removed

Adds an underscore before the column name if it contains only numbers; otherwise removes all non-alphanumeric characters except underscores ("_").
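
Illustrative inputs and outputs:

final_column_treatment("2020")         # -> "_2020"
final_column_treatment("umidade (%)")  # -> "umidade"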

def get_credentials_from_env(mode: str = 'prod', scopes: List[str] = None) ‑> google.oauth2.service_account.Credentials
Expand source code
def get_credentials_from_env(
    mode: str = "prod", scopes: List[str] = None
) -> service_account.Credentials:
    """
    Gets Google Cloud credentials from environment variables.
    """
    if mode not in ["prod", "staging"]:
        raise ValueError("Mode must be 'prod' or 'staging'")
    env: str = getenv(f"BASEDOSDADOS_CREDENTIALS_{mode.upper()}", "")
    if env == "":
        raise ValueError(f"BASEDOSDADOS_CREDENTIALS_{mode.upper()} env var not set!")
    info: dict = json.loads(base64.b64decode(env))
    cred: service_account.Credentials = (
        service_account.Credentials.from_service_account_info(info)
    )
    if scopes:
        cred = cred.with_scopes(scopes)
    return cred

Gets Google Cloud credentials from environment variables.
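
The environment variable is expected to hold a base64-encoded service account JSON. A hedged sketch of preparing it (the file path is illustrative):

import base64

# Encode a service account key file and export the result as
# BASEDOSDADOS_CREDENTIALS_PROD (or _STAGING) before running the pipelines.
with open("service-account.json", "rb") as f:
    encoded = base64.b64encode(f.read()).decode("utf-8")
print(encoded)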

def get_redis_client(host: str = 'redis.redis.svc.cluster.local', port: int = 6379, db: int = 0, password: str = None) ‑> redis_pal.RedisPal.RedisPal
Expand source code
def get_redis_client(
    host: str = "redis.redis.svc.cluster.local",
    port: int = 6379,
    db: int = 0,  # pylint: disable=C0103
    password: str = None,
) -> RedisPal:
    """
    Returns a Redis client.
    """
    return RedisPal(
        host=host,
        port=port,
        db=db,
        password=password,
    )

Returns a Redis client.

def get_redis_output(redis_key)
Expand source code
def get_redis_output(redis_key):
    """
    Get Redis output
    Example: {b'date': b'2023-02-27 07:29:04'}
    """
    redis_client = get_redis_client()
    output = redis_client.hgetall(redis_key)
    if len(output) > 0:
        output = treat_redis_output(output)
    return output

Get Redis output Example: {b'date': b'2023-02-27 07:29:04'}

def get_storage_blobs(dataset_id: str, table_id: str, mode: str = 'staging') ‑> list
Expand source code
def get_storage_blobs(dataset_id: str, table_id: str, mode: str = "staging") -> list:
    """
    Get all blobs from a table in a dataset.

    Args:
        dataset_id (str): dataset id
        table_id (str): table id
        mode (str, optional): mode to use. Defaults to "staging".

    Returns:
        list: list of blobs
    """

    bd_storage = bd.Storage(dataset_id=dataset_id, table_id=table_id)
    return list(
        bd_storage.client["storage_staging"]
        .bucket(bd_storage.bucket_name)
        .list_blobs(prefix=f"{mode}/{bd_storage.dataset_id}/{bd_storage.table_id}/")
    )

Get all blobs from a table in a dataset.

Args

dataset_id : str
dataset id
table_id : str
table id
mode : str, optional
mode to use. Defaults to "staging".

Returns

list
list of blobs

def get_username_and_password_from_secret(secret_path: str, client: hvac.v1.Client = None) ‑> Tuple[str, str]
Expand source code
def get_username_and_password_from_secret(
    secret_path: str,
    client: hvac.Client = None,
) -> Tuple[str, str]:
    """
    Returns a username and password from a secret in Vault.
    """
    secret = get_vault_secret(secret_path, client)
    return (
        secret["data"]["username"],
        secret["data"]["password"],
    )

Returns a username and password from a secret in Vault.

def get_vault_client() ‑> hvac.v1.Client
Expand source code
def get_vault_client() -> hvac.Client:
    """
    Returns a Vault client.
    """
    return hvac.Client(
        url=getenv("VAULT_ADDRESS").strip(),
        token=getenv("VAULT_TOKEN").strip(),
    )

Returns a Vault client.

def get_vault_secret(secret_path: str, client: hvac.v1.Client = None) ‑> dict
Expand source code
def get_vault_secret(secret_path: str, client: hvac.Client = None) -> dict:
    """
    Returns a secret from Vault.
    """
    vault_client = client or get_vault_client()
    return vault_client.secrets.kv.read_secret_version(secret_path)["data"]

Returns a secret from Vault.

def human_readable(value: int | float, unit: str = '', unit_prefixes: List[str] = None, unit_divider: int = 1000, decimal_places: int = 2)
Expand source code
def human_readable(
    value: Union[int, float],
    unit: str = "",
    unit_prefixes: List[str] = None,
    unit_divider: int = 1000,
    decimal_places: int = 2,
):
    """
    Formats a value in a human readable way.
    """
    if unit_prefixes is None:
        unit_prefixes = ["", "k", "M", "G", "T", "P", "E", "Z", "Y"]
    if value == 0:
        return f"{value}{unit}"
    unit_prefix = unit_prefixes[0]
    for prefix in unit_prefixes[1:]:
        if value < unit_divider:
            break
        unit_prefix = prefix
        value /= unit_divider
    return f"{value:.{decimal_places}f}{unit_prefix}{unit}"

Formats a value in a human readable way.
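
Illustrative calls:

human_readable(1234567, unit="B")                  # -> "1.23MB"
human_readable(1536, unit="B", unit_divider=1024)  # -> "1.50kB"
human_readable(0, unit="B")                        # -> "0B"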

def is_date(date_string: str, date_format: str = '%Y-%m-%d') ‑> str | bool
Expand source code
def is_date(date_string: str, date_format: str = "%Y-%m-%d") -> Union[str, bool]:
    """
    Checks whether a string is a valid date.
    """
    try:
        return datetime.strptime(date_string, date_format).strftime(date_format)
    except ValueError:
        return False

Checks whether a string is a valid date.
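
Illustrative calls (note that a valid date is returned as a formatted string):

is_date("2023-02-27")                          # -> "2023-02-27"
is_date("27/02/2023")                          # -> False
is_date("27/02/2023", date_format="%d/%m/%Y")  # -> "27/02/2023"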

def list_blobs_with_prefix(bucket_name: str, prefix: str, mode: str = 'prod') ‑> List[google.cloud.storage.blob.Blob]
Expand source code
def list_blobs_with_prefix(
    bucket_name: str, prefix: str, mode: str = "prod"
) -> List[Blob]:
    """
    Lists all the blobs in the bucket that begin with the prefix.
    This can be used to list all blobs in a "folder", e.g. "public/".
    Mode needs to be "prod" or "staging"
    """

    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)

    # Note: Client.list_blobs requires at least package version 1.17.0.
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix)

    return list(blobs)

Lists all the blobs in the bucket that begin with the prefix. This can be used to list all blobs in a "folder", e.g. "public/". Mode needs to be "prod" or "staging"

def log(msg: Any, level: str = 'info') ‑> None
Expand source code
def log(msg: Any, level: str = "info") -> None:
    """
    Logs a message to prefect's logger.
    """
    levels = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }

    blank_spaces = 8 * " "
    msg = blank_spaces + "----\n" + str(msg)
    msg = "\n".join([blank_spaces + line for line in msg.split("\n")]) + "\n\n"

    if level not in levels:
        raise ValueError(f"Invalid log level: {level}")
    prefect.context.logger.log(levels[level], msg)  # pylint: disable=E1101

Logs a message to prefect's logger.

def log_mod(msg: Any, level: str = 'info', index: int = 0, mod: int = 1)
Expand source code
def log_mod(msg: Any, level: str = "info", index: int = 0, mod: int = 1):
    """
    Only logs a message if the index is a multiple of mod.
    """
    if index % mod == 0 or index == 0:
        log(msg=f"iteration {index}:\n {msg}", level=level)

Only logs a message if the index is a multiple of mod.
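
A sketch of throttled logging inside a loop (rows and process are hypothetical):

for index, row in enumerate(rows):
    process(row)
    # Logs only on iterations 0, 100, 200, ...
    log_mod(f"processed row {row}", index=index, mod=100)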

def notify_discord_on_failure(flow: prefect.core.flow.Flow, state: prefect.engine.state.State, secret_path: str, code_owners: List[str] | None = None)
Expand source code
def notify_discord_on_failure(
    flow: prefect.Flow,
    state: State,
    secret_path: str,
    code_owners: Optional[List[str]] = None,
):
    """
    Notifies a Discord channel when a flow fails.
    """
    url = get_vault_secret(secret_path)["data"]["url"]
    flow_run_id = prefect.context.get("flow_run_id")
    code_owners = code_owners or constants.DEFAULT_CODE_OWNERS.value
    code_owner_dict = constants.OWNERS_DISCORD_MENTIONS.value
    at_code_owners = []
    for code_owner in code_owners:
        code_owner_id = code_owner_dict[code_owner]["user_id"]
        code_owner_type = code_owner_dict[code_owner]["type"]

        if code_owner_type == "user":
            at_code_owners.append(f"    - <@{code_owner_id}>\n")
        elif code_owner_type == "user_nickname":
            at_code_owners.append(f"    - <@!{code_owner_id}>\n")
        elif code_owner_type == "channel":
            at_code_owners.append(f"    - <#{code_owner_id}>\n")
        elif code_owner_type == "role":
            at_code_owners.append(f"    - <@&{code_owner_id}>\n")

    message = (
        f":man_facepalming: Flow **{flow.name}** has failed."
        + f'\n  - State message: *"{state.message}"*'
        + "\n  - Link to the failed flow: "
        + f"https://prefect.dados.rio/flow-run/{flow_run_id}"
        + "\n  - Extra attention:\n"
        + "".join(at_code_owners)
    )
    send_discord_message(
        message=message,
        webhook_url=url,
    )

Notifies a Discord channel when a flow fails.

def parse_date_columns(dataframe: pandas.core.frame.DataFrame, partition_date_column: str) ‑> Tuple[pandas.core.frame.DataFrame, List[str]]
Expand source code
def parse_date_columns(
    dataframe: pd.DataFrame, partition_date_column: str
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Parses the date columns to the partition format.
    """
    ano_col = "ano_particao"
    mes_col = "mes_particao"
    data_col = "data_particao"
    cols = [ano_col, mes_col, data_col]
    for col in cols:
        if col in dataframe.columns:
            raise ValueError(f"Column {col} already exists, please review your model.")

    dataframe[partition_date_column] = dataframe[partition_date_column].astype(str)
    dataframe[data_col] = pd.to_datetime(
        dataframe[partition_date_column], errors="coerce"
    )

    dataframe[ano_col] = (
        dataframe[data_col]
        .dt.year.fillna(-1)
        .astype(int)
        .astype(str)
        .replace("-1", np.nan)
    )

    dataframe[mes_col] = (
        dataframe[data_col]
        .dt.month.fillna(-1)
        .astype(int)
        .astype(str)
        .replace("-1", np.nan)
    )

    dataframe[data_col] = dataframe[data_col].dt.date

    return dataframe, [ano_col, mes_col, data_col]

Parses the date columns to the partition format.
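
A minimal sketch of preparing partition columns before calling to_partitions (the sample data is illustrative):

import pandas as pd

dataframe = pd.DataFrame(
    {"data_medicao": ["2023-02-27 07:00:00", "2023-03-01 08:00:00"], "valor": [1, 2]}
)
dataframe, partition_cols = parse_date_columns(dataframe, "data_medicao")
# partition_cols == ["ano_particao", "mes_particao", "data_particao"]
to_partitions(dataframe, partition_columns=partition_cols, savepath="partitions/")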

def parser_blobs_to_partition_dict(blobs: list) ‑> dict
Expand source code
def parser_blobs_to_partition_dict(blobs: list) -> dict:
    """
    Extracts the partition information from the blobs.
    """

    partitions_dict = {}
    for blob in blobs:
        for folder in blob.name.split("/"):
            if "=" in folder:
                key = folder.split("=")[0]
                value = folder.split("=")[1]
                try:
                    partitions_dict[key].append(value)
                except KeyError:
                    partitions_dict[key] = [value]
    return partitions_dict

Extracts the partition information from the blobs.
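
The function only reads each blob's name attribute, so a simple stand-in object is enough to illustrate it (the blob names are hypothetical):

from collections import namedtuple

FakeBlob = namedtuple("FakeBlob", ["name"])  # stand-in for a storage Blob
blobs = [
    FakeBlob("staging/meu_dataset/minha_tabela/ano_particao=2023/mes_particao=2/data.csv"),
    FakeBlob("staging/meu_dataset/minha_tabela/ano_particao=2023/mes_particao=3/data.csv"),
]
parser_blobs_to_partition_dict(blobs)
# -> {"ano_particao": ["2023", "2023"], "mes_particao": ["2", "3"]}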

def query_to_line(query: str) ‑> str
Expand source code
def query_to_line(query: str) -> str:
    """
    Converts a query to a line.
    """
    query = textwrap.dedent(query)
    return " ".join([line.strip() for line in query.split("\n")])

Converts a query to a line.

def remove_columns_accents(dataframe: pandas.core.frame.DataFrame) ‑> list
Expand source code
def remove_columns_accents(dataframe: pd.DataFrame) -> list:
    """
    Remove accents from dataframe columns.
    """
    columns = [str(column) for column in dataframe.columns]
    dataframe.columns = columns
    return list(
        dataframe.columns.str.normalize("NFKD")
        .str.encode("ascii", errors="ignore")
        .str.decode("utf-8")
        .map(lambda x: x.strip())
        .str.replace(" ", "_")
        .str.replace("/", "_")
        .str.replace("-", "_")
        .str.replace("\a", "_")
        .str.replace("\b", "_")
        .str.replace("\n", "_")
        .str.replace("\t", "_")
        .str.replace("\v", "_")
        .str.replace("\f", "_")
        .str.replace("\r", "_")
        .str.lower()
        .map(final_column_treatment)
    )

Remove accents from dataframe columns.
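
An illustrative example; note that the function returns the cleaned names instead of assigning them:

import pandas as pd

dataframe = pd.DataFrame(columns=["Estação", "Data Medição", "2020"])
dataframe.columns = remove_columns_accents(dataframe)
# -> ["estacao", "data_medicao", "_2020"]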

def remove_tabs_from_query(query: str) ‑> str
Expand source code
def remove_tabs_from_query(query: str) -> str:
    """
    Removes tabs from a query.
    """
    query = query_to_line(query)
    return re.sub(r"\s+", " ", query).strip()

Removes tabs from a query.
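
Illustrative call on a multi-line query:

query = """
    SELECT id, nome
    FROM minha_tabela
    WHERE ano = 2023
"""
remove_tabs_from_query(query)
# -> "SELECT id, nome FROM minha_tabela WHERE ano = 2023"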

def run_cloud(flow: prefect.core.flow.Flow, labels: List[str], parameters: Dict[str, Any] = None, run_description: str = '')
Expand source code
def run_cloud(
    flow: prefect.Flow,
    labels: List[str],
    parameters: Dict[str, Any] = None,
    run_description: str = "",
):
    """
    Runs a flow on Prefect Server (must have VPN configured).
    """
    # Setup no schedule
    flow.schedule = None

    # Change flow name for development and register
    flow.name = f"{flow.name} (development)"
    flow.run_config = KubernetesRun(image="ghcr.io/prefeitura-rio/prefect-flows:latest")
    flow_id = flow.register(project_name="main", labels=[])

    # Get Prefect Client and submit flow run
    client = Client()
    flow_run_id = client.create_flow_run(
        flow_id=flow_id,
        run_name=f"TEST RUN - {run_description} - {flow.name}",
        labels=labels,
        parameters=parameters,
    )

    # Print flow run link so user can check it
    print(f"Run submitted: TEST RUN - {run_description} - {flow.name}")
    print(f"Please check at: https://prefect.dados.rio/flow-run/{flow_run_id}")

Runs a flow on Prefect Server (must have VPN configured).

def run_local(flow: prefect.core.flow.Flow, parameters: Dict[str, Any] = None)
Expand source code
def run_local(flow: prefect.Flow, parameters: Dict[str, Any] = None):
    """
    Runs a flow locally.
    """
    # Setup for local run
    flow.storage = None
    flow.run_config = None
    flow.schedule = None

    # Run flow
    return flow.run(parameters=parameters) if parameters else flow.run()

Runs a flow locally.
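
A hedged sketch of a local test run (my_flow is a hypothetical prefect.Flow defined elsewhere in the repo):

from pipelines.utils.utils import run_local

run_local(my_flow, parameters={"dataset_id": "meu_dataset", "table_id": "minha_tabela"})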

def run_registered(flow_name: str, flow_project: str, labels: List[str], parameters: Dict[str, Any] = None, run_description: str = '') ‑> str
Expand source code
def run_registered(
    flow_name: str,
    flow_project: str,
    labels: List[str],
    parameters: Dict[str, Any] = None,
    run_description: str = "",
) -> str:
    """
    Runs an already registered flow on Prefect Server (must have credentials configured).
    """
    # Get Prefect Client and submit flow run
    client = Client()
    flow_result = client.graphql(
        {
            "query": {
                with_args(
                    "flow",
                    {
                        "where": {
                            "_and": [
                                {"name": {"_eq": flow_name}},
                                {"archived": {"_eq": False}},
                                {"project": {"name": {"_eq": flow_project}}},
                            ]
                        }
                    },
                ): {
                    "id",
                }
            }
        }
    )
    flows_found = flow_result.data.flow
    if len(flows_found) == 0:
        raise ValueError(f"Flow {flow_name} not found.")
    if len(flows_found) > 1:
        raise ValueError(f"More than one flow found for {flow_name}.")
    flow_id = flow_result["data"]["flow"][0]["id"]
    flow_run_id = client.create_flow_run(
        flow_id=flow_id,
        run_name=f"SUBMITTED REMOTELY - {run_description}",
        labels=labels,
        parameters=parameters,
    )

    # Print flow run link so user can check it
    print(f"Run submitted: SUBMITTED REMOTELY - {run_description}")
    print(f"Please check at: https://prefect.dados.rio/flow-run/{flow_run_id}")

    return flow_run_id

Runs an already registered flow on Prefect Server (must have credentials configured).

def save_str_on_redis(redis_key: str, key: str, value: str)
Expand source code
def save_str_on_redis(
    redis_key: str,
    key: str,
    value: str,
):
    """
    Saves a string value under the given field of a Redis hash.
    """

    redis_client = get_redis_client()
    redis_client.hset(redis_key, key, value)

Saves a string value under the given field of a Redis hash.

def save_updated_rows_on_redis(dataframe: pandas.core.frame.DataFrame, dataset_id: str, table_id: str, unique_id: str = 'id_estacao', date_column: str = 'data_medicao', date_format: str = '%Y-%m-%d %H:%M:%S', mode: str = 'prod') ‑> pandas.core.frame.DataFrame
Expand source code
def save_updated_rows_on_redis(  # pylint: disable=R0914
    dataframe: pd.DataFrame,
    dataset_id: str,
    table_id: str,
    unique_id: str = "id_estacao",
    date_column: str = "data_medicao",
    date_format: str = "%Y-%m-%d %H:%M:%S",
    mode: str = "prod",
) -> pd.DataFrame:
    """
    Accesses Redis to get the last time each unique_id was updated, returns
    the updated rows as a DataFrame and saves the new dates on Redis.
    """

    redis_client = get_redis_client()

    key = dataset_id + "." + table_id
    if mode == "dev":
        key = f"{mode}.{key}"

    # Access all data saved on redis with this key
    last_updates = redis_client.hgetall(key)

    if len(last_updates) == 0:
        last_updates = pd.DataFrame(dataframe[unique_id].unique(), columns=[unique_id])
        last_updates["last_update"] = "1900-01-01 00:00:00"
        log(f"Redis key: {key}\nCreating Redis fake values:\n {last_updates}")
    else:
        # Convert data to a dictionary with unique_id as key and last update time as value
        # Example > {"12": "2022-06-06 14:45:00"}
        last_updates = {
            k.decode("utf-8"): v.decode("utf-8") for k, v in last_updates.items()
        }

        # Convert dictionary to dataframe
        last_updates = pd.DataFrame(
            last_updates.items(), columns=[unique_id, "last_update"]
        )

        log(f"Redis key: {key}\nRedis actual values:\n {last_updates}")

    # Guarantee that both columns are strings
    dataframe[unique_id] = dataframe[unique_id].astype(str)
    last_updates[unique_id] = last_updates[unique_id].astype(str)

    # dataframe and last_updates need to have the same index, in our case unique_id
    missing_in_dfr = [
        i
        for i in last_updates[unique_id].unique()
        if i not in dataframe[unique_id].unique()
    ]
    missing_in_updates = [
        i
        for i in dataframe[unique_id].unique()
        if i not in last_updates[unique_id].unique()
    ]

    # If a unique_id doesn't exist in last_updates, create a fake date for that station
    if len(missing_in_updates) > 0:
        for i, _id in enumerate(missing_in_updates):
            last_updates.loc[-i] = [_id, "1900-01-01 00:00:00"]

    # If a unique_id doesn't exist in the dataframe, remove that station from last_updates
    if len(missing_in_dfr) > 0:
        last_updates = last_updates[~last_updates[unique_id].isin(missing_in_dfr)]

    # Merge dfs using unique_id
    dataframe = dataframe.merge(last_updates, how="left", on=unique_id)
    log(f"Comparing times: {dataframe.sort_values(unique_id)}")

    # Keep in the dataframe only the stations whose time is later than the one saved on Redis
    dataframe[date_column] = dataframe[date_column].apply(
        pd.to_datetime, format=date_format
    ) + pd.DateOffset(hours=0)

    dataframe["last_update"] = dataframe["last_update"].apply(
        pd.to_datetime, format="%Y-%m-%d %H:%M:%S"
    ) + pd.DateOffset(hours=0)

    dataframe = dataframe[dataframe[date_column] > dataframe["last_update"]].dropna(
        subset=[unique_id]
    )
    log(f"Dataframe after comparison: {dataframe.sort_values(unique_id)}")
    # Keep only the last date for each unique_id
    keep_cols = [unique_id, date_column]
    new_updates = dataframe[keep_cols].sort_values(keep_cols)
    new_updates = new_updates.groupby(unique_id, as_index=False).tail(1)
    new_updates[date_column] = new_updates[date_column].dt.strftime("%Y-%m-%d %H:%M:%S")
    log(f">>> Updated df: {new_updates.head(10)}")

    # Convert stations with the new updates dates in a dictionary
    new_updates = dict(zip(new_updates[unique_id], new_updates[date_column]))
    log(f">>> data to save in redis as a dict: {new_updates}")

    # Save this new information on redis
    [redis_client.hset(key, k, v) for k, v in new_updates.items()]

    return dataframe.reset_index()

Accesses Redis to get the last time each unique_id was updated, returns the updated rows as a DataFrame and saves the new dates on Redis.

def send_discord_message(message: str, webhook_url: str) ‑> None
Expand source code
def send_discord_message(
    message: str,
    webhook_url: str,
) -> None:
    """
    Sends a message to a Discord channel.
    """
    requests.post(
        webhook_url,
        data={"content": message},
    )

Sends a message to a Discord channel.

def send_telegram_message(message: str, token: str, chat_id: int, parse_mode: str = 'HTML')
Expand source code
def send_telegram_message(
    message: str,
    token: str,
    chat_id: int,
    parse_mode: str = telegram.ParseMode.HTML,
):
    """
    Sends a message to a Telegram chat.
    """
    bot = telegram.Bot(token=token)
    bot.send_message(
        chat_id=chat_id,
        text=message,
        parse_mode=parse_mode,
    )

Sends a message to a Telegram chat.

def set_default_parameters(flow: prefect.core.flow.Flow, default_parameters: dict) ‑> prefect.core.flow.Flow
Expand source code
def set_default_parameters(
    flow: prefect.Flow, default_parameters: dict
) -> prefect.Flow:
    """
    Sets default parameters for a flow.
    """
    for parameter in flow.parameters():
        if parameter.name in default_parameters:
            parameter.default = default_parameters[parameter.name]
    return flow

Sets default parameters for a flow.
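
A hedged sketch (the flow and parameter names are illustrative):

flow_with_defaults = set_default_parameters(
    my_flow,  # hypothetical prefect.Flow exposing "dataset_id" and "mode" Parameters
    default_parameters={"dataset_id": "meu_dataset", "mode": "staging"},
)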

def skip_if_running_handler(obj, old_state: prefect.engine.state.State, new_state: prefect.engine.state.State) ‑> prefect.engine.state.State
Expand source code
def skip_if_running_handler(obj, old_state: State, new_state: State) -> State:
    """
    State handler that will skip a flow run if another instance of the flow is already running.

    Adapted from Prefect Discourse:
    https://tinyurl.com/4hn5uz2w
    """
    if new_state.is_running():
        client = Client()
        query = """
            query($flow_id: uuid) {
                flow_run(
                    where: {
                        _and: [
                            {state: {_eq: "Running"}},
                            {flow_id: {_eq: $flow_id}}
                        ]
                    }
                ) {
                    id
                }
            }
        """
        # pylint: disable=no-member
        response = client.graphql(
            query=query,
            variables=dict(flow_id=prefect.context.flow_id),
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            logger.info(message)
            return Skipped(message)
    return new_state

State handler that will skip a flow run if another instance of the flow is already running.

Adapted from Prefect Discourse: https://tinyurl.com/4hn5uz2w

def smart_split(text: str, max_length: int, separator: str = ' ') ‑> List[str]
Expand source code
def smart_split(
    text: str,
    max_length: int,
    separator: str = " ",
) -> List[str]:
    """
    Splits a string into a list of strings.
    """
    if len(text) <= max_length:
        return [text]

    separator_index = text.rfind(separator, 0, max_length)
    if (separator_index >= max_length) or (separator_index == -1):
        raise ValueError(
            f'Cannot split text "{text}" into {max_length}'
            f'characters using separator "{separator}"'
        )

    return [
        text[:separator_index],
        *smart_split(
            text[separator_index + len(separator) :],
            max_length,
            separator,
        ),
    ]

Splits a string into a list of strings.
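
Illustrative call:

smart_split("uma mensagem bem longa para o Discord", max_length=15)
# -> ["uma mensagem", "bem longa para", "o Discord"]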

def to_json_dataframe(dataframe: pandas.core.frame.DataFrame = None, csv_path: str | pathlib.Path = None, key_column: str = None, read_csv_kwargs: dict = None, save_to: str | pathlib.Path = None) ‑> pandas.core.frame.DataFrame
Expand source code
def to_json_dataframe(
    dataframe: pd.DataFrame = None,
    csv_path: Union[str, Path] = None,
    key_column: str = None,
    read_csv_kwargs: dict = None,
    save_to: Union[str, Path] = None,
) -> pd.DataFrame:
    """
    Manipulates a dataframe by keeping key_column and moving every other column
    data to a "content" column in JSON format. Example:

    - Input dataframe: pd.DataFrame({"key": ["a", "b", "c"], "col1": [1, 2, 3], "col2": [4, 5, 6]})
    - Output dataframe: pd.DataFrame({
        "key": ["a", "b", "c"],
        "content": [{"col1": 1, "col2": 4}, {"col1": 2, "col2": 5}, {"col1": 3, "col2": 6}]
    })
    """
    if dataframe is None and not csv_path:
        raise ValueError("dataframe or csv_path is required")
    if csv_path:
        # read_csv_kwargs defaults to None, so fall back to an empty dict
        dataframe = pd.read_csv(csv_path, **(read_csv_kwargs or {}))
    if key_column:
        dataframe["content"] = dataframe.drop(columns=[key_column]).to_dict(
            orient="records"
        )
        # keep the key column alongside the JSON content
        dataframe = dataframe[[key_column, "content"]]
    else:
        dataframe["content"] = dataframe.to_dict(orient="records")
        dataframe = dataframe[["content"]]
    if save_to:
        dataframe.to_csv(save_to, index=False)
    return dataframe

Manipulates a dataframe by keeping key_column and moving every other column data to a "content" column in JSON format. Example:

  • Input dataframe: pd.DataFrame({"key": ["a", "b", "c"], "col1": [1, 2, 3], "col2": [4, 5, 6]})
  • Output dataframe: pd.DataFrame({ "key": ["a", "b", "c"], "content": [{"col1": 1, "col2": 4}, {"col1": 2, "col2": 5}, {"col1": 3, "col2": 6}] })

def to_partitions(data: pandas.core.frame.DataFrame, partition_columns: List[str], savepath: str, data_type: str = 'csv', suffix: str = None, build_json_dataframe: bool = False, dataframe_key_column: str = None) ‑> List[pathlib.Path]
Expand source code
def to_partitions(
    data: pd.DataFrame,
    partition_columns: List[str],
    savepath: str,
    data_type: str = "csv",
    suffix: str = None,
    build_json_dataframe: bool = False,
    dataframe_key_column: str = None,
) -> List[Path]:  # sourcery skip: raise-specific-error
    """Save data in to hive patitions schema, given a dataframe and a list of partition columns.
    Args:
        data (pandas.core.frame.DataFrame): Dataframe to be partitioned.
        partition_columns (list): List of columns to be used as partitions.
        savepath (str, pathlib.PosixPath): folder path to save the partitions
    Example:
        data = {
            "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021,2025],
            "mes": [1, 2, 3, 4, 5, 6, 6,9],
            "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR","PR"],
            "dado": ["a", "b", "c", "d", "e", "f", "g",'h'],
        }
        to_partitions(
            data=pd.DataFrame(data),
            partition_columns=['ano','mes','sigla_uf'],
            savepath='partitions/'
        )
    """
    saved_files = []
    if isinstance(data, (pd.core.frame.DataFrame)):
        savepath = Path(savepath)

        # create unique combinations between partition columns
        unique_combinations = (
            data[partition_columns]
            .drop_duplicates(subset=partition_columns)
            .to_dict(orient="records")
        )

        for filter_combination in unique_combinations:
            partitions_values = [
                f"{partition}={value}"
                for partition, value in filter_combination.items()
            ]

            # get filtered data
            df_filter = data.loc[
                data[filter_combination.keys()]
                .isin(filter_combination.values())
                .all(axis=1),
                :,
            ]
            df_filter = df_filter.drop(columns=partition_columns).reset_index(drop=True)

            # create folder tree
            filter_save_path = Path(savepath / "/".join(partitions_values))
            filter_save_path.mkdir(parents=True, exist_ok=True)
            if suffix is not None:
                file_filter_save_path = (
                    Path(filter_save_path) / f"data_{suffix}.{data_type}"
                )
            else:
                file_filter_save_path = Path(filter_save_path) / f"data.{data_type}"

            if build_json_dataframe:
                df_filter = to_json_dataframe(
                    df_filter, key_column=dataframe_key_column
                )

            if data_type == "csv":
                # append data to csv
                df_filter.to_csv(
                    file_filter_save_path,
                    index=False,
                    mode="a",
                    header=not file_filter_save_path.exists(),
                )
                saved_files.append(file_filter_save_path)
            elif data_type == "parquet":
                dataframe_to_parquet(dataframe=df_filter, path=file_filter_save_path)
                saved_files.append(file_filter_save_path)
            else:
                raise ValueError(f"Invalid data type: {data_type}")
    else:
        raise BaseException("Data needs to be a pandas DataFrame")

    return saved_files

Save data to a Hive partitions schema, given a dataframe and a list of partition columns.

Args

data : pandas.core.frame.DataFrame
Dataframe to be partitioned.
partition_columns : list
List of columns to be used as partitions.
savepath : str, pathlib.PosixPath
folder path to save the partitions

Example

data = {
    "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021, 2025],
    "mes": [1, 2, 3, 4, 5, 6, 6, 9],
    "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR", "PR"],
    "dado": ["a", "b", "c", "d", "e", "f", "g", "h"],
}
to_partitions(
    data=pd.DataFrame(data),
    partition_columns=["ano", "mes", "sigla_uf"],
    savepath="partitions/",
)

def treat_redis_output(text)
Expand source code
def treat_redis_output(text):
    """
    Redis returns a dict where both keys and values are byte strings;
    this helper decodes them to str.
    Example: {b'date': b'2023-02-27 07:29:04'}
    """
    return {k.decode("utf-8"): v.decode("utf-8") for k, v in text.items()}

Redis returns a dict where both keys and values are byte strings; this helper decodes them to str. Example: {b'date': b'2023-02-27 07:29:04'}

def untuple_clocks(clocks)
Expand source code
def untuple_clocks(clocks):
    """
    Converts a list of tuples to a list of clocks.
    """
    return [clock[0] if isinstance(clock, tuple) else clock for clock in clocks]

Converts a list of tuples to a list of clocks.

def upload_files_to_storage(bucket_name: str, prefix: str, local_path: str | pathlib.Path = None, files_list: List[str] = None, mode: str = 'prod')
Expand source code
def upload_files_to_storage(
    bucket_name: str,
    prefix: str,
    local_path: Union[str, Path] = None,
    files_list: List[str] = None,
    mode: str = "prod",
):
    """
    Uploads all files from `local_path` to `bucket_name` with `prefix`.
    Mode needs to be "prod" or "staging"
    """
    # Either local_path or files_list must be provided
    if local_path is None and files_list is None:
        raise ValueError("Either local_path or files_list must be provided")

    # If local_path is provided, get all files from it
    if local_path is not None:
        files_list: List[Path] = list(Path(local_path).glob("**/*"))

    # Ensure all items in files_list are Path objects
    files_list: List[Path] = [Path(f) for f in files_list]

    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)

    bucket = storage_client.bucket(bucket_name)

    for file in files_list:
        if file.is_file():
            blob = bucket.blob(f"{prefix}/{file.name}")
            blob.upload_from_filename(file)

Uploads all files from local_path to bucket_name with prefix. Mode needs to be "prod" or "staging"
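
A hedged usage sketch (bucket name and paths are illustrative; credentials must be available through the environment variables described in get_credentials_from_env):

upload_files_to_storage(
    bucket_name="meu-bucket",
    prefix="staging/meu_dataset/minha_tabela",
    local_path="output/",
    mode="prod",
)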