Module pipelines.rj_escritorio.data_catalog.tasks
Tasks for generating a data catalog from BigQuery.
Functions
def generate_dataframe_from_list_of_tables(list_of_tables: list) -> pandas.core.frame.DataFrame
@task
def generate_dataframe_from_list_of_tables(list_of_tables: list) -> pd.DataFrame:
    """
    Generate a Pandas DataFrame from a list of tables.

    Args:
        list_of_tables: List of table metadata dictionaries.

    Returns:
        Pandas DataFrame.
    """
    dataframe = pd.DataFrame(list_of_tables)
    log(f"Generated DataFrame with shape {dataframe.shape}.")
    return dataframe
Generate a Pandas DataFrame from a list of tables.
Args
list_of_tables
- List of table metadata dictionaries, e.g. as produced by list_tables.
Returns
Pandas DataFrame.
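A minimal usage sketch, assuming Prefect 1.x, where calling .run() on a task executes the wrapped function directly; the table dictionaries below are hypothetical:

from pipelines.rj_escritorio.data_catalog.tasks import (
    generate_dataframe_from_list_of_tables,
)

# Hypothetical table metadata, shaped like the output of list_tables.
tables = [
    {"project_id": "my-project", "dataset_id": "sales", "table_id": "orders"},
    {"project_id": "my-project", "dataset_id": "sales", "table_id": "customers"},
]
dataframe = generate_dataframe_from_list_of_tables.run(tables)
print(dataframe.shape)  # (2, 3): one row per table, one column per dictionary key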
def list_projects(mode: str = 'prod', exclude_dev: bool = True) -> List[str]
@task
def list_projects(
    mode: str = "prod",
    exclude_dev: bool = True,
) -> List[str]:
    """
    Lists all GCP projects that we have access to.

    Args:
        mode: Credentials mode.
        exclude_dev: Exclude projects whose IDs end with "-dev".

    Returns:
        List of project IDs.
    """
    credentials = get_credentials_from_env(mode=mode)
    service = discovery.build("cloudresourcemanager", "v1", credentials=credentials)
    request = service.projects().list()
    projects = []
    while request is not None:
        response = request.execute()
        for project in response.get("projects", []):
            project_id = project["projectId"]
            if exclude_dev and project_id.endswith("-dev"):
                log(f"Excluding dev project {project_id}.")
                continue
            log(f"Found project {project_id}.")
            projects.append(project_id)
        # list_next returns None once every page has been consumed.
        request = service.projects().list_next(
            previous_request=request, previous_response=response
        )
    log(f"Found {len(projects)} projects.")
    return projects
Lists all GCP projects that we have access to.
Args
mode
- Credentials mode.
exclude_dev
- Exclude projects whose IDs end with "-dev".
Returns
List of project IDs.
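An ad-hoc sketch (same Prefect 1.x .run() assumption; requires GCP credentials to be available in the environment):

from pipelines.rj_escritorio.data_catalog.tasks import list_projects

# Lists every accessible project, skipping IDs that end with "-dev".
project_ids = list_projects.run(mode="prod", exclude_dev=True)
print(f"Found {len(project_ids)} projects.")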
def list_tables(project_id: str,
client: google.cloud.bigquery.client.Client = None,
mode: str = 'prod',
exclude_staging: bool = True,
exclude_test: bool = True,
exclude_logs: bool = True)
@task
def list_tables(  # pylint: disable=too-many-arguments
    project_id: str,
    client: bigquery.Client = None,
    mode: str = "prod",
    exclude_staging: bool = True,
    exclude_test: bool = True,
    exclude_logs: bool = True,
):
    """
    List all datasets and tables in a project.

    Args:
        project_id: Project ID.
        client: BigQuery client. If omitted, one is created using ``mode``.
        mode: BigQuery client mode.
        exclude_staging: Exclude staging datasets.
        exclude_test: Exclude anything that contains the word "test".
        exclude_logs: Exclude log datasets.

    Returns:
        List of dictionaries in the format:
        {
            "project_id": "project_id",
            "dataset_id": "dataset_id",
            "table_id": "table_id",
            "description": "table description",
            "url": "https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table",
            "private": True/False,
        }
    """
    if client is None:
        log(f"Creating BigQuery client in mode {mode}.")
        client = get_bigquery_client(mode=mode)
    log(f"Listing tables in project {project_id}.")
    tables = []
    try:
        datasets = client.list_datasets(project=project_id)
        for dataset in datasets:
            dataset_id: str = dataset.dataset_id
            if exclude_staging and dataset_id.endswith("_staging"):
                log(f"Excluding staging dataset {dataset_id}.")
                continue
            if exclude_test and "test" in dataset_id:
                log(f"Excluding test dataset {dataset_id}.")
                continue
            if exclude_logs and (
                dataset_id.startswith("logs_") or dataset_id.endswith("_logs")
            ):
                log(f"Excluding logs dataset {dataset_id}.")
                continue
            for table in client.list_tables(dataset):
                table_id = table.table_id
                table_object = client.get_table(table.reference)
                if exclude_test and "test" in table_id:
                    log(f"Excluding test table {table_id}.")
                    continue
                table_description = table_object.description
                table_info = {
                    "project_id": project_id,
                    "dataset_id": dataset_id,
                    "table_id": table_id,
                    "description": table_description,
                    "url": f"https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table",
                    # Only the public "datario" project is considered non-private.
                    "private": project_id != "datario",
                }
                tables.append(table_info)
    except (BadRequest, NotFound):
        # BadRequest happens when the BigQuery API is not enabled for the project;
        # NotFound when the project or one of its datasets cannot be found.
        # In both cases, return whatever was collected so far (possibly empty).
        return tables
    log(f"Found {len(tables)} tables in project {project_id}.")
    return tables
List all datasets and tables in a project.
Args
project_id
- Project ID.
client
- BigQuery client. If omitted, one is created using mode.
mode
- BigQuery client mode.
exclude_staging
- Exclude staging datasets.
exclude_test
- Exclude anything that contains the word "test".
exclude_logs
- Exclude log datasets.
Returns
List of dictionaries in the format:
{
    "project_id": "project_id",
    "dataset_id": "dataset_id",
    "table_id": "table_id",
    "description": "table description",
    "url": "https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table",
    "private": True/False,
}
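For a single project, the task can be exercised directly (Prefect 1.x .run() assumed; a BigQuery client is created from environment credentials when none is passed):

from pipelines.rj_escritorio.data_catalog.tasks import list_tables

tables = list_tables.run(project_id="datario", mode="prod")
for table in tables[:5]:
    print(table["dataset_id"], table["table_id"], table["url"])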
def merge_list_of_list_of_tables(list_of_list_of_tables: list) -> list
@task
def merge_list_of_list_of_tables(list_of_list_of_tables: list) -> list:
    """
    Merge a list of lists of tables into a single list of tables.

    Args:
        list_of_list_of_tables: List of lists of tables.

    Returns:
        List of tables.
    """
    list_of_tables = [
        table for list_of_tables in list_of_list_of_tables for table in list_of_tables
    ]
    log(f"Merged {len(list_of_tables)} tables.")
    return list_of_tables
Merge a list of lists of tables into a single list of tables.
Args
list_of_list_of_tables
- List of lists of tables, e.g. one list per project.
Returns
List of tables.
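Sketch of the flattening behavior (hypothetical per-project results; Prefect 1.x .run() assumed):

from pipelines.rj_escritorio.data_catalog.tasks import merge_list_of_list_of_tables

per_project = [
    [{"table_id": "a"}, {"table_id": "b"}],  # tables from one project
    [{"table_id": "c"}],                     # tables from another project
]
merged = merge_list_of_list_of_tables.run(per_project)
assert len(merged) == 3  # a flat list with one entry per table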
def update_gsheets_data_catalog(dataframe: pandas.core.frame.DataFrame, spreadsheet_url: str, sheet_name: str) -> None
@task
def update_gsheets_data_catalog(
    dataframe: pd.DataFrame, spreadsheet_url: str, sheet_name: str
) -> None:
    """
    Update a Google Sheets spreadsheet with a DataFrame.

    Args:
        dataframe: Pandas DataFrame.
        spreadsheet_url: Google Sheets spreadsheet URL.
        sheet_name: Google Sheets sheet name.
    """
    # Get gspread client
    credentials = get_credentials_from_env(
        scopes=[
            "https://www.googleapis.com/auth/spreadsheets",
            "https://www.googleapis.com/auth/drive",
        ]
    )
    gspread_client = gspread.authorize(credentials)
    # Open spreadsheet
    log(f"Opening Google Sheets spreadsheet {spreadsheet_url} with sheet {sheet_name}.")
    sheet = gspread_client.open_by_url(spreadsheet_url)
    worksheet = sheet.worksheet(sheet_name)
    # Update spreadsheet: clear everything, then rewrite headers and rows
    log("Deleting old data.")
    worksheet.clear()
    log("Rewriting headers.")
    write_data_to_gsheets(
        worksheet=worksheet,
        data=[dataframe.columns.tolist()],
    )
    log("Updating new data.")
    write_data_to_gsheets(
        worksheet=worksheet,
        data=dataframe.values.tolist(),
        start_cell="A2",
    )
    # Add filters
    log("Adding filters.")
    first_col = "A"
    # Note: this assumes the DataFrame has at most 26 columns (A-Z).
    last_col = chr(ord(first_col) + len(dataframe.columns) - 1)
    worksheet.set_basic_filter(f"{first_col}:{last_col}")
    # Resize columns
    log("Resizing columns.")
    worksheet.columns_auto_resize(0, len(dataframe.columns) - 1)
    log("Done.")
Update a Google Sheets spreadsheet with a DataFrame.
Args
dataframe
- Pandas DataFrame.
spreadsheet_url
- Google Sheets spreadsheet URL.
sheet_name
- Google Sheets sheet name.
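How these tasks might compose into a flow. This is a sketch in Prefect 1.x style, not the module's actual flow definition; the spreadsheet URL and sheet name are placeholders:

from prefect import Flow

from pipelines.rj_escritorio.data_catalog.tasks import (
    generate_dataframe_from_list_of_tables,
    list_projects,
    list_tables,
    merge_list_of_list_of_tables,
    update_gsheets_data_catalog,
)

with Flow("data_catalog") as flow:
    projects = list_projects(mode="prod")
    # Map list_tables over every project, then flatten the per-project results.
    tables_per_project = list_tables.map(project_id=projects)
    tables = merge_list_of_list_of_tables(tables_per_project)
    dataframe = generate_dataframe_from_list_of_tables(tables)
    update_gsheets_data_catalog(
        dataframe=dataframe,
        spreadsheet_url="https://docs.google.com/spreadsheets/d/<SPREADSHEET_ID>",
        sheet_name="catalog",
    )

flow.run()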