Module prefeitura_rio.pipelines_utils.gcs

Expand source code
# -*- coding: utf-8 -*-
from typing import List

try:
    from google.cloud import storage
    from google.cloud.storage.blob import Blob
except ImportError:
    from prefeitura_rio.utils import base_assert_dependencies

    base_assert_dependencies(["google.cloud.storage"], extras=["pipelines"])

from prefeitura_rio.pipelines_utils.env import get_bd_credentials_from_env
from prefeitura_rio.pipelines_utils.prefect import get_flow_run_mode


def get_gcs_client(mode: str = None) -> storage.Client:
    """
    Get a GCS client with the credentials from the environment.
    Mode needs to be "prod" or "staging"

    Args:
        mode (str): The mode to filter by (prod or staging).

    Returns:
        storage.Client: The GCS client.
    """
    if not mode:
        mode = get_flow_run_mode()
    credentials = get_bd_credentials_from_env(mode=mode)
    return storage.Client(credentials=credentials)


def list_blobs_with_prefix(bucket_name: str, prefix: str, mode: str = None) -> List[Blob]:
    """
    Lists all the blobs in the bucket that begin with the prefix.
    This can be used to list all blobs in a "folder", e.g. "public/".
    Mode needs to be "prod" or "staging"

    Args:
        bucket_name (str): The name of the bucket.
        prefix (str): The prefix to filter by.
        mode (str): The mode to filter by (prod or staging).

    Returns:
        List[Blob]: The list of blobs.
    """
    if not mode:
        mode = get_flow_run_mode()
    storage_client = get_gcs_client(mode=mode)
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix)
    return list(blobs)


def parse_blobs_to_partition_list(blobs: List[Blob]) -> List[str]:
    """
    Extracts the partition information from the blobs.
    """
    partitions = []
    for blob in blobs:
        for folder in blob.name.split("/"):
            if "=" in folder:
                key = folder.split("=")[0]
                value = folder.split("=")[1]
                if key == "data_particao":
                    partitions.append(value)
    return partitions

Functions

def get_gcs_client(mode: str = None) ‑> google.cloud.storage.client.Client

Get a GCS client with the credentials from the environment. Mode needs to be "prod" or "staging"

Args

mode : str
The mode to filter by (prod or staging).

Returns

storage.Client
The GCS client.
Expand source code
def get_gcs_client(mode: str = None) -> storage.Client:
    """
    Get a GCS client with the credentials from the environment.
    Mode needs to be "prod" or "staging"

    Args:
        mode (str): The mode to filter by (prod or staging).

    Returns:
        storage.Client: The GCS client.
    """
    if not mode:
        mode = get_flow_run_mode()
    credentials = get_bd_credentials_from_env(mode=mode)
    return storage.Client(credentials=credentials)
def list_blobs_with_prefix(bucket_name: str, prefix: str, mode: str = None) ‑> List[google.cloud.storage.blob.Blob]

Lists all the blobs in the bucket that begin with the prefix. This can be used to list all blobs in a "folder", e.g. "public/". Mode needs to be "prod" or "staging"

Args

bucket_name : str
The name of the bucket.
prefix : str
The prefix to filter by.
mode : str
The mode to filter by (prod or staging).

Returns

List[Blob]
The list of blobs.
Expand source code
def list_blobs_with_prefix(bucket_name: str, prefix: str, mode: str = None) -> List[Blob]:
    """
    Lists all the blobs in the bucket that begin with the prefix.
    This can be used to list all blobs in a "folder", e.g. "public/".
    Mode needs to be "prod" or "staging"

    Args:
        bucket_name (str): The name of the bucket.
        prefix (str): The prefix to filter by.
        mode (str): The mode to filter by (prod or staging).

    Returns:
        List[Blob]: The list of blobs.
    """
    if not mode:
        mode = get_flow_run_mode()
    storage_client = get_gcs_client(mode=mode)
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix)
    return list(blobs)
def parse_blobs_to_partition_list(blobs: List[google.cloud.storage.blob.Blob]) ‑> List[str]

Extracts the partition information from the blobs.

Expand source code
def parse_blobs_to_partition_list(blobs: List[Blob]) -> List[str]:
    """
    Extracts the partition information from the blobs.
    """
    partitions = []
    for blob in blobs:
        for folder in blob.name.split("/"):
            if "=" in folder:
                key = folder.split("=")[0]
                value = folder.split("=")[1]
                if key == "data_particao":
                    partitions.append(value)
    return partitions