Module prefeitura_rio.pipelines_utils.bd
# -*- coding: utf-8 -*-
from pathlib import Path
from typing import Union
try:
import basedosdados as bd
from basedosdados.upload.base import Base
except ImportError:
from prefeitura_rio.utils import base_assert_dependencies
base_assert_dependencies(["basedosdados"], extras=["pipelines"])
from prefeitura_rio.pipelines_utils.io import dump_header_to_file
from prefeitura_rio.pipelines_utils.logging import log
from prefeitura_rio.pipelines_utils.prefect import get_flow_run_mode
def create_table_and_upload_to_gcs(
    data_path: Union[str, Path],
    dataset_id: str,
    table_id: str,
    dump_mode: str,
    biglake_table: bool = True,
) -> Union[str, Path]:
    """
    Create a table using BD+ and upload data to GCS, returning the `data_path` it received.

    Args:
        data_path: Path to the file or folder with the data to upload.
        dataset_id: Dataset ID on BigQuery.
        table_id: Table ID on BigQuery.
        dump_mode: Either "append" or "overwrite".
        biglake_table: Whether to create the table as a BigLake table.
    """
bd_version = bd.__version__
log(f"USING BASEDOSDADOS {bd_version}")
# pylint: disable=C0103
tb = bd.Table(dataset_id=dataset_id, table_id=table_id)
table_staging = f"{tb.table_full_name['staging']}"
# pylint: disable=C0103
st = bd.Storage(dataset_id=dataset_id, table_id=table_id)
storage_path = f"{st.bucket_name}.staging.{dataset_id}.{table_id}"
storage_path_link = (
f"https://console.cloud.google.com/storage/browser/{st.bucket_name}"
f"/staging/{dataset_id}/{table_id}"
)
    # prod datasets are public if the project is datario; staging datasets are private in both projects
dataset_is_public = tb.client["bigquery_prod"].project == "datario"
#####################################
#
# MANAGEMENT OF TABLE CREATION
#
#####################################
log("STARTING TABLE CREATION MANAGEMENT")
if dump_mode == "append":
if tb.table_exists(mode="staging"):
log(f"MODE APPEND: Table ALREADY EXISTS:" f"\n{table_staging}" f"\n{storage_path_link}")
else:
            # the header file is needed to create the table when it doesn't exist yet
            log("MODE APPEND: Table DOESN'T EXIST\nStarting to CREATE HEADER file")
            header_path = dump_header_to_file(data_path=data_path)
            log(f"MODE APPEND: Created HEADER file:\n{header_path}")
tb.create(
path=header_path,
if_storage_data_exists="replace",
if_table_exists="replace",
biglake_table=biglake_table,
dataset_is_public=dataset_is_public,
)
log(
"MODE APPEND: Sucessfully CREATED A NEW TABLE:\n"
f"{table_staging}\n"
f"{storage_path_link}"
) # pylint: disable=C0301
st.delete_table(mode="staging", bucket_name=st.bucket_name, not_found_ok=True)
log(
"MODE APPEND: Sucessfully REMOVED HEADER DATA from Storage:\n"
f"{storage_path}\n"
f"{storage_path_link}"
) # pylint: disable=C0301
elif dump_mode == "overwrite":
if tb.table_exists(mode="staging"):
log(
"MODE OVERWRITE: Table ALREADY EXISTS, DELETING OLD DATA!\n"
f"{storage_path}\n"
f"{storage_path_link}"
) # pylint: disable=C0301
st.delete_table(mode="staging", bucket_name=st.bucket_name, not_found_ok=True)
log(
"MODE OVERWRITE: Sucessfully DELETED OLD DATA from Storage:\n"
f"{storage_path}\n"
f"{storage_path_link}"
) # pylint: disable=C0301
tb.delete(mode="all")
log(
"MODE OVERWRITE: Sucessfully DELETED TABLE:\n"
f"{table_staging}\n"
f"{tb.table_full_name['prod']}"
) # pylint: disable=C0301
        # the header file is needed to create the table when it doesn't exist;
        # in overwrite mode the header is always created
        log("MODE OVERWRITE: Table DOESN'T EXIST\nStarting to CREATE HEADER file")
        header_path = dump_header_to_file(data_path=data_path)
        log(f"MODE OVERWRITE: Created HEADER file:\n{header_path}")
tb.create(
path=header_path,
if_storage_data_exists="replace",
if_table_exists="replace",
biglake_table=biglake_table,
dataset_is_public=dataset_is_public,
)
log(
"MODE OVERWRITE: Sucessfully CREATED TABLE\n"
f"{table_staging}\n"
f"{storage_path_link}"
)
st.delete_table(mode="staging", bucket_name=st.bucket_name, not_found_ok=True)
log(
f"MODE OVERWRITE: Sucessfully REMOVED HEADER DATA from Storage\n:"
f"{storage_path}\n"
f"{storage_path_link}"
) # pylint: disable=C0301
#####################################
#
    # Upload the data files to GCS using BD+
#
#####################################
log("STARTING UPLOAD TO GCS")
if tb.table_exists(mode="staging"):
        # the file names must stay the same, otherwise the data doesn't get overwritten
tb.append(filepath=data_path, if_exists="replace")
log(
f"STEP UPLOAD: Successfully uploaded {data_path} to Storage:\n"
f"{storage_path}\n"
f"{storage_path_link}"
)
else:
# pylint: disable=C0301
log("STEP UPLOAD: Table does not exist in STAGING, need to create first")
return data_path
def get_project_id(mode: str = None) -> str:
"""
Get the project ID from the environment.
Args:
mode (str): The mode to filter by (prod or staging).
"""
if not mode:
mode = get_flow_run_mode()
if mode not in ["prod", "staging"]:
raise ValueError("Mode must be 'prod' or 'staging'")
base = Base()
return base.config["gcloud-projects"][mode]["name"]
Functions
def create_table_and_upload_to_gcs(data_path: Union[str, pathlib.Path], dataset_id: str, table_id: str, dump_mode: str, biglake_table: bool = True) ‑> Union[str, pathlib.Path]
Create a table using BD+ and upload data to GCS, returning the data_path it received.
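A minimal usage sketch; the dataset ID, table ID, and path below are placeholders, not values taken from this module:

from prefeitura_rio.pipelines_utils.bd import create_table_and_upload_to_gcs

# Hypothetical IDs and path, shown only to illustrate the call.
create_table_and_upload_to_gcs(
    data_path="/tmp/dump/my_table/",  # file or folder with the data to upload
    dataset_id="my_dataset",
    table_id="my_table",
    dump_mode="append",  # or "overwrite" to drop and recreate the staging table
    biglake_table=True,
)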
def get_project_id(mode: str = None) ‑> str
Get the GCP project ID from the basedosdados configuration.
Args
    mode (str): The mode to filter by ("prod" or "staging"); defaults to the current flow run mode.
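A short sketch of how it might be called; it assumes basedosdados is configured and, when mode is omitted, that the call happens inside a flow run so get_flow_run_mode() can resolve it:

from prefeitura_rio.pipelines_utils.bd import get_project_id

project_id = get_project_id(mode="staging")  # explicit mode: "prod" or "staging"
# project_id = get_project_id()  # inside a flow run, the mode is inferred via get_flow_run_mode()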