Module pipelines.utils.dump_earth_engine_asset.tasks

Tasks for dumping data directly from BigQuery to GCS.

Functions

def create_table_asset(service_account: str,
service_account_secret_path: str,
project_id: str,
gcs_file_asset_path: str,
ee_asset_path: str)
Expand source code
@task
def create_table_asset(
    service_account: str,
    service_account_secret_path: str,
    project_id: str,
    gcs_file_asset_path: str,
    ee_asset_path: str,
):
    """
    Create a table asset in Earth Engine.

    parameters:
        service_account
            Service account email in the format of earth-engine@<project-id>.iam.gserviceaccount.com
        service_account_secret_path
            Path to the .json file containing the service account secret.
        project_id:
            Earth Engine project ID.
        gcs_asset_path
            Path to the asset in Google Cloud Storage in the format of "gs://<project-id>/<some-folder>/file.csv"
        ee_asset_path
            Path that the asset will be created in Earth Engine in the format of
            projects/<project-id>/assets/<asset_name> or users/<user-id>/<asset_name>
    """

    credentials = ee.ServiceAccountCredentials(
        service_account, service_account_secret_path
    )
    ee.Initialize(credentials=credentials, project=project_id)

    params = {
        "name": ee_asset_path,
        "sources": [{"primaryPath": gcs_file_asset_path, "charset": "UTF-8"}],
    }

    request_id = ee.data.newTaskId(1)[0]
    task_status = ee.data.startTableIngestion(request_id=request_id, params=params)
    log(ee.data.getTaskStatus(task_status["id"]))

Create a table asset in Earth Engine.

parameters: service_account Service account email in the format of earth-engine@.iam.gserviceaccount.com service_account_secret_path Path to the .json file containing the service account secret. project_id: Earth Engine project ID. gcs_asset_path Path to the asset in Google Cloud Storage in the format of "gs:////file.csv" ee_asset_path Path that the asset will be created in Earth Engine in the format of projects//assets/ or users//

def download_data_to_gcs(project_id: str = None,
query: str = None,
gcs_asset_path: str = None,
bd_project_mode: str = 'prod',
billing_project_id: str = None,
location: str = 'US')
Expand source code
@task
def download_data_to_gcs(  # pylint: disable=R0912,R0913,R0914,R0915
    project_id: str = None,
    query: str = None,
    gcs_asset_path: str = None,
    bd_project_mode: str = "prod",
    billing_project_id: str = None,
    location: str = "US",
):
    """
    Get data from BigQuery.
    """
    # Try to get project_id from environment variable
    if not project_id:
        log("Project ID was not provided, trying to get it from environment variable")
        try:
            bd_base = Base()
            project_id = bd_base.config["gcloud-projects"][bd_project_mode]["name"]
        except KeyError:
            pass
        if not project_id:
            raise ValueError(
                "project_id must be either provided or inferred from environment variables"
            )
        log(f"Project ID was inferred from environment variables: {project_id}")

    # If query is not a string, raise an error
    if not isinstance(query, str):
        raise ValueError("query must be either a string or a Jinja2 template")
    log(f"Query was provided: {query}")

    # Get billing project ID
    if not billing_project_id:
        log(
            "Billing project ID was not provided, trying to get it from environment variable"
        )
        try:
            bd_base = Base()
            billing_project_id = bd_base.config["gcloud-projects"][bd_project_mode][
                "name"
            ]
        except KeyError:
            pass
        if not billing_project_id:
            raise ValueError(
                "billing_project_id must be either provided or inferred from environment variables"
            )
        log(
            f"Billing project ID was inferred from environment variables: {billing_project_id}"
        )

    # pylint: disable=E1124
    client = google_client(project_id, billing_project_id, from_file=True, reauth=False)
    job_config = bigquery.QueryJobConfig()
    job_config.dry_run = True
    job = client["bigquery"].query(query, job_config=job_config)
    while not job.done():
        sleep(1)
    # pylint: disable=E1101
    table_size = job.total_bytes_processed
    log(f'Table size: {human_readable(table_size, unit="B", unit_divider=1024)}')

    # Get data
    log("Querying data from BigQuery")
    job = client["bigquery"].query(query)
    while not job.done():
        sleep(1)
    # pylint: disable=protected-access
    dest_table = job._properties["configuration"]["query"]["destinationTable"]
    dest_project_id = dest_table["projectId"]
    dest_dataset_id = dest_table["datasetId"]
    dest_table_id = dest_table["tableId"]
    log(
        f"Query results were stored in {dest_project_id}.{dest_dataset_id}.{dest_table_id}"
    )

    blob_path = f"{gcs_asset_path}/data*.csv.gz"
    log(f"Loading data to {blob_path}")
    dataset_ref = bigquery.DatasetReference(dest_project_id, dest_dataset_id)
    table_ref = dataset_ref.table(dest_table_id)
    job_config = bigquery.job.ExtractJobConfig(compression="GZIP")
    extract_job = client["bigquery"].extract_table(
        table_ref,
        blob_path,
        location=location,
        job_config=job_config,
    )
    extract_job.result()
    log("Data was loaded successfully")

    # Get the BLOB we've just created and make it public
    blobs = list_blobs_with_prefix(project_id, blob_path)
    log(f"{blobs}")
    if not blobs:
        raise ValueError(f"No blob found at {blob_path}")

    return blob_path

Get data from BigQuery.

def get_earth_engine_key_from_vault(vault_path_earth_engine_key: str)
Expand source code
@task
def get_earth_engine_key_from_vault(
    vault_path_earth_engine_key: str,
):
    """
    Get earth engine service account key from vault.
    """
    log(
        f"Getting Earth Engine key from https://vault.dados.rio/ui/vault/secrets/secret/show/{vault_path_earth_engine_key}"
    )
    vault_client = get_vault_client()

    secret = vault_client.secrets.kv.read_secret_version(vault_path_earth_engine_key)[
        "data"
    ]["key"]

    service_account_secret_path = Path("/tmp/earth-engine/key.json")
    service_account_secret_path.parent.mkdir(parents=True, exist_ok=True)

    with open(service_account_secret_path, "w", encoding="utf-8") as f:
        json.dump(secret, f, ensure_ascii=False, indent=4)

    return service_account_secret_path

Get earth engine service account key from vault.

def get_project_id(project_id: str = None, bd_project_mode: str = 'prod')
Expand source code
@task
def get_project_id(
    project_id: str = None,
    bd_project_mode: str = "prod",
):
    """
    Get the project ID.
    """
    if project_id:
        return project_id
    log("Project ID was not provided, trying to get it from environment variable")
    try:
        bd_base = Base()
        project_id = bd_base.config["gcloud-projects"][bd_project_mode]["name"]
    except KeyError:
        pass
    if not project_id:
        raise ValueError(
            "project_id must be either provided or inferred from environment variables"
        )
    log(f"Project ID was inferred from environment variables: {project_id}")
    return project_id

Get the project ID.

def trigger_cron_job(project_id: str, ee_asset_path: str, cron_expression: str)
Expand source code
@task(nout=2)
def trigger_cron_job(
    project_id: str,
    ee_asset_path: str,
    cron_expression: str,
):
    """
    Tells whether to trigger a cron job.
    """
    redis_client = get_redis_client()
    key = f"{project_id}__{ee_asset_path}"
    log(f"Checking if cron job should be triggered for {key}")
    val = redis_client.get(key)
    current_datetime = datetime.now()
    if val and val is dict and "last_trigger" in val:
        last_trigger = val["last_trigger"]
        log(f"Last trigger: {last_trigger}")
        if last_trigger:
            return determine_whether_to_execute_or_not(
                cron_expression, current_datetime, last_trigger
            )
    log(f"No last trigger found for {key}")
    return True, current_datetime

Tells whether to trigger a cron job.

def update_last_trigger(project_id: str, ee_asset_path: str, execution_time: datetime.datetime)
Expand source code
@task
def update_last_trigger(
    project_id: str,
    ee_asset_path: str,
    execution_time: datetime,
):
    """
    Update the last trigger.
    """
    redis_client = get_redis_client()
    key = f"{project_id}__{ee_asset_path}"
    redis_client.set(key, {"last_trigger": execution_time})

Update the last trigger.