Module prefeitura_rio.pipelines_utils.bd
# -*- coding: utf-8 -*-
from pathlib import Path
from typing import Optional, Union
try:
    import basedosdados as bd
    from basedosdados.upload.base import Base
except ImportError:
    from prefeitura_rio.utils import base_assert_dependencies
    base_assert_dependencies(["basedosdados"], extras=["pipelines"])
from prefeitura_rio.pipelines_utils.io import dump_header_to_file
from prefeitura_rio.pipelines_utils.logging import log
from prefeitura_rio.pipelines_utils.prefect import get_flow_run_mode


def create_table_and_upload_to_gcs(
    data_path: Union[str, Path],
    dataset_id: str,
    table_id: str,
    dump_mode: str,
    biglake_table: bool = True,
) -> Union[str, Path]:
    """
    Create a table using BD+ and upload data to GCS.

    Args:
        data_path: File (or directory of files) to upload.
        dataset_id: Dataset ID on BigQuery.
        table_id: Table ID on BigQuery.
        dump_mode: "append" keeps existing data; "overwrite" recreates the table.
        biglake_table: Whether to create the table as a BigLake table.

    Returns:
        The `data_path` that was uploaded.
    """
    bd_version = bd.__version__
    log(f"USING BASEDOSDADOS {bd_version}")
    # pylint: disable=C0103
    tb = bd.Table(dataset_id=dataset_id, table_id=table_id)
    table_staging = f"{tb.table_full_name['staging']}"
    # pylint: disable=C0103
    st = bd.Storage(dataset_id=dataset_id, table_id=table_id)
    storage_path = f"{st.bucket_name}.staging.{dataset_id}.{table_id}"
    storage_path_link = (
        f"https://console.cloud.google.com/storage/browser/{st.bucket_name}"
        f"/staging/{dataset_id}/{table_id}"
    )
    # prod datasets are public if the project is datario; staging datasets are private in both projects
    dataset_is_public = tb.client["bigquery_prod"].project == "datario"
    #####################################
    #
    # MANAGEMENT OF TABLE CREATION
    #
    #####################################
    log("STARTING TABLE CREATION MANAGEMENT")
    if dump_mode == "append":
        if tb.table_exists(mode="staging"):
            log(f"MODE APPEND: Table ALREADY EXISTS:" f"\n{table_staging}" f"\n{storage_path_link}")
        else:
            # a header file is needed to create the table when it doesn't exist yet
            log("MODE APPEND: Table DOESN'T EXIST\nStarting to CREATE HEADER file")
            header_path = dump_header_to_file(data_path=data_path)
            log(f"MODE APPEND: Created HEADER file:\n{header_path}")
            tb.create(
                path=header_path,
                if_storage_data_exists="replace",
                if_table_exists="replace",
                biglake_table=biglake_table,
                dataset_is_public=dataset_is_public,
            )
            log(
                "MODE APPEND: Successfully CREATED A NEW TABLE:\n"
                f"{table_staging}\n"
                f"{storage_path_link}"
            )  # pylint: disable=C0301
            st.delete_table(mode="staging", bucket_name=st.bucket_name, not_found_ok=True)
            log(
                "MODE APPEND: Successfully REMOVED HEADER DATA from Storage:\n"
                f"{storage_path}\n"
                f"{storage_path_link}"
            )  # pylint: disable=C0301
    elif dump_mode == "overwrite":
        if tb.table_exists(mode="staging"):
            log(
                "MODE OVERWRITE: Table ALREADY EXISTS, DELETING OLD DATA!\n"
                f"{storage_path}\n"
                f"{storage_path_link}"
            )  # pylint: disable=C0301
            st.delete_table(mode="staging", bucket_name=st.bucket_name, not_found_ok=True)
            log(
                "MODE OVERWRITE: Successfully DELETED OLD DATA from Storage:\n"
                f"{storage_path}\n"
                f"{storage_path_link}"
            )  # pylint: disable=C0301
            tb.delete(mode="all")
            log(
                "MODE OVERWRITE: Successfully DELETED TABLE:\n"
                f"{table_staging}\n"
                f"{tb.table_full_name['prod']}"
            )  # pylint: disable=C0301
        # a header file is needed to (re)create the table;
        # in overwrite mode the header is always created
        log("MODE OVERWRITE: Starting to CREATE HEADER file")
        header_path = dump_header_to_file(data_path=data_path)
        log(f"MODE OVERWRITE: Created HEADER file:\n{header_path}")
        tb.create(
            path=header_path,
            if_storage_data_exists="replace",
            if_table_exists="replace",
            biglake_table=biglake_table,
            dataset_is_public=dataset_is_public,
        )
        log(
            "MODE OVERWRITE: Successfully CREATED TABLE:\n"
            f"{table_staging}\n"
            f"{storage_path_link}"
        )
        st.delete_table(mode="staging", bucket_name=st.bucket_name, not_found_ok=True)
        log(
            "MODE OVERWRITE: Successfully REMOVED HEADER DATA from Storage:\n"
            f"{storage_path}\n"
            f"{storage_path_link}"
        )  # pylint: disable=C0301
    else:
        raise ValueError("dump_mode must be 'append' or 'overwrite'")
    #####################################
    #
    # Upload the files using BD+
    #
    #####################################
    log("STARTING UPLOAD TO GCS")
    if tb.table_exists(mode="staging"):
        # file names must stay the same, otherwise the data doesn't get overwritten
        tb.append(filepath=data_path, if_exists="replace")
        log(
            f"STEP UPLOAD: Successfully uploaded {data_path} to Storage:\n"
            f"{storage_path}\n"
            f"{storage_path_link}"
        )
    else:
        # pylint: disable=C0301
        log("STEP UPLOAD: Table does not exist in STAGING, need to create first")
    return data_path


def get_project_id(mode: Optional[str] = None) -> str:
    """
    Get the project ID from the environment.

    Args:
        mode (str): The mode to filter by ("prod" or "staging"). Defaults to
            the current flow run mode.

    Returns:
        The GCP project ID configured for the given mode.
    """
    if not mode:
        mode = get_flow_run_mode()
    if mode not in ["prod", "staging"]:
        raise ValueError("Mode must be 'prod' or 'staging'")
    base = Base()
    return base.config["gcloud-projects"][mode]["name"]
Functions
def create_table_and_upload_to_gcs(data_path: Union[str, pathlib.Path], dataset_id: str, table_id: str, dump_mode: str, biglake_table: bool = True) -> Union[str, pathlib.Path]
Create a table using BD+ and upload data to GCS.
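A minimal usage sketch; the dataset ID, table ID, and file path below are hypothetical placeholders:

from prefeitura_rio.pipelines_utils.bd import create_table_and_upload_to_gcs

# Hypothetical IDs and path, for illustration only.
create_table_and_upload_to_gcs(
    data_path="/tmp/data/example_table.csv",
    dataset_id="example_dataset",
    table_id="example_table",
    dump_mode="append",  # "append" keeps existing data; "overwrite" recreates the table
    biglake_table=True,
)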
def get_project_id(mode: Optional[str] = None) -> str
Get the project ID from the environment.
Args
mode (str): The mode to filter by ("prod" or "staging"). Defaults to the current flow run mode.
 
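A minimal call sketch; omitting mode falls back to the current flow run mode:

from prefeitura_rio.pipelines_utils.bd import get_project_id

project_id = get_project_id(mode="staging")  # must be "prod" or "staging"
print(project_id)  # the GCP project configured for staging in the BD+ config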