Module prefeitura_rio.pipelines_utils.prefect
Expand source code
# -*- coding: utf-8 -*-
from typing import List
try:
import prefect
from prefect import task
from prefect.client import Client
except ImportError:
from prefeitura_rio.utils import base_assert_dependencies
base_assert_dependencies(["prefect"], extras=["pipelines"])
def get_flow_run_mode() -> str:
    """
    Translate the current Prefect project name into a run mode.

    The project name is read from ``prefect.context`` under the
    ``project_name`` key.

    Returns:
        str: "prod" when the project name is "production", "staging" when it
            is "staging".

    Raises:
        ValueError: If the project name is neither "production" nor "staging".
    """
    current_project = prefect.context.get("project_name")
    if current_project == "production":
        return "prod"
    if current_project == "staging":
        return "staging"
    # Anything else is rejected explicitly rather than silently defaulting to a mode.
    raise ValueError(f"Invalid project name: {current_project}")
def get_flow_run_url(id: str, prefix: str = "https://prefect-dev.dados.rio") -> str:
    """
    Build the URL of a flow run.

    The tenant id is read from ``prefect.context`` (config -> cloud ->
    tenant_id) and resolved to a slug through ``get_tenant_slug``.

    Args:
        id (str): Flow run id.
        prefix (str): Base URL; trailing slashes are stripped before use.

    Returns:
        str: Flow run URL in the format <prefix>/<slug>/flow-run/<id>.
    """
    # Normalise the base first so the f-string below never yields "//".
    base_url = prefix.rstrip("/")
    cloud_config = prefect.context.get("config").get("cloud")
    slug = get_tenant_slug(cloud_config.get("tenant_id"))
    return f"{base_url}/{slug}/flow-run/{id}"
def get_tenant_slug(tenant_id: str) -> str:
    """
    Look up the slug of a tenant through the Prefect GraphQL API.

    Args:
        tenant_id (str): Tenant id.

    Returns:
        str: Slug of the first tenant row returned for ``tenant_id``.
    """
    tenant_query = """
        query ($tenant_id: uuid!) {
            tenant (where: {id: {_eq: $tenant_id}}) {
                slug
            }
        }
    """
    result = Client().graphql(query=tenant_query, variables={"tenant_id": tenant_id})
    # Only the first row is used; an empty result is not handled here.
    matching_tenants = result["data"]["tenant"]
    return matching_tenants[0]["slug"]
@task
def task_get_current_flow_run_labels() -> List[str]:
    """
    Prefect task that reads the agent labels of the current flow run.

    Returns:
        List[str]: Value stored in ``prefect.context`` under
            config -> cloud -> agent -> labels.
    """
    agent_config = prefect.context.get("config").get("cloud").get("agent")
    return agent_config.get("labels")
@task
def task_get_flow_group_id(flow_name: str) -> str:
    """
    Prefect task that resolves a flow name to its flow group id.

    Args:
        flow_name (str): Name of the flow to look up.

    Returns:
        str: ``flow_group.id`` of the first flow row returned for ``flow_name``.
    """
    flow_group_query = """
        query ($flow_name: String!) {
            flow (where: {name: {_eq: $flow_name}}) {
                flow_group {
                    id
                }
            }
        }
    """
    result = Client().graphql(query=flow_group_query, variables={"flow_name": flow_name})
    # Only the first row is used; an empty result is not handled here.
    first_flow = result["data"]["flow"][0]
    return first_flow["flow_group"]["id"]
@task
def task_rename_current_flow_run_dataset_table(prefix: str, dataset_id: str, table_id: str) -> None:
    """
    Prefect task that renames the current flow run to ``<prefix><dataset_id>.<table_id>``.

    The flow run id is read from ``prefect.context`` under ``flow_run_id``.

    Args:
        prefix (str): Text placed directly before the dataset id (no separator is added).
        dataset_id (str): Dataset id used in the new name.
        table_id (str): Table id used in the new name.
    """
    current_run_id = prefect.context.get("flow_run_id")
    api_client = Client()
    new_name = f"{prefix}{dataset_id}.{table_id}"
    # NOTE(review): the signature is annotated ``-> None``, but whatever
    # ``Client.set_flow_run_name`` returns is passed through to the caller.
    return api_client.set_flow_run_name(current_run_id, new_name)
Functions
def get_flow_run_mode() ‑> str
-
Returns the mode of the current flow run (either "prod" or "staging").
Expand source code
def get_flow_run_mode() -> str:
    """
    Translate the current Prefect project name into a run mode.

    The project name is read from ``prefect.context`` under the
    ``project_name`` key.

    Returns:
        str: "prod" when the project name is "production", "staging" when it
            is "staging".

    Raises:
        ValueError: If the project name is neither "production" nor "staging".
    """
    current_project = prefect.context.get("project_name")
    if current_project == "production":
        return "prod"
    if current_project == "staging":
        return "staging"
    # Anything else is rejected explicitly rather than silently defaulting to a mode.
    raise ValueError(f"Invalid project name: {current_project}")
def get_flow_run_url(id: str, prefix: str = 'https://prefect-dev.dados.rio') ‑> str
-
Returns the URL of a flow run.
Args
id
:str
- Flow run id.
Returns
str
- Flow run URL in the format <prefix>/<slug>/flow-run/<id>.
Expand source code
def get_flow_run_url(id: str, prefix: str = "https://prefect-dev.dados.rio") -> str:
    """
    Build the URL of a flow run.

    The tenant id is read from ``prefect.context`` (config -> cloud ->
    tenant_id) and resolved to a slug through ``get_tenant_slug``.

    Args:
        id (str): Flow run id.
        prefix (str): Base URL; trailing slashes are stripped before use.

    Returns:
        str: Flow run URL in the format <prefix>/<slug>/flow-run/<id>.
    """
    # Normalise the base first so the f-string below never yields "//".
    base_url = prefix.rstrip("/")
    cloud_config = prefect.context.get("config").get("cloud")
    slug = get_tenant_slug(cloud_config.get("tenant_id"))
    return f"{base_url}/{slug}/flow-run/{id}"
def get_tenant_slug(tenant_id: str) ‑> str
-
Returns the slug of a tenant.
Args
tenant_id
:str
- Tenant id.
Returns
str
- Tenant slug.
Expand source code
def get_tenant_slug(tenant_id: str) -> str:
    """
    Look up the slug of a tenant through the Prefect GraphQL API.

    Args:
        tenant_id (str): Tenant id.

    Returns:
        str: Slug of the first tenant row returned for ``tenant_id``.
    """
    tenant_query = """
        query ($tenant_id: uuid!) {
            tenant (where: {id: {_eq: $tenant_id}}) {
                slug
            }
        }
    """
    result = Client().graphql(query=tenant_query, variables={"tenant_id": tenant_id})
    # Only the first row is used; an empty result is not handled here.
    matching_tenants = result["data"]["tenant"]
    return matching_tenants[0]["slug"]
def task_get_current_flow_run_labels() ‑> List[str]
-
Returns the labels of the current flow run.
Expand source code
@task
def task_get_current_flow_run_labels() -> List[str]:
    """
    Prefect task that reads the agent labels of the current flow run.

    Returns:
        List[str]: Value stored in ``prefect.context`` under
            config -> cloud -> agent -> labels.
    """
    agent_config = prefect.context.get("config").get("cloud").get("agent")
    return agent_config.get("labels")
def task_get_flow_group_id(flow_name: str) ‑> str
-
Returns the flow group id for the given flow name.
Expand source code
@task
def task_get_flow_group_id(flow_name: str) -> str:
    """
    Prefect task that resolves a flow name to its flow group id.

    Args:
        flow_name (str): Name of the flow to look up.

    Returns:
        str: ``flow_group.id`` of the first flow row returned for ``flow_name``.
    """
    flow_group_query = """
        query ($flow_name: String!) {
            flow (where: {name: {_eq: $flow_name}}) {
                flow_group {
                    id
                }
            }
        }
    """
    result = Client().graphql(query=flow_group_query, variables={"flow_name": flow_name})
    # Only the first row is used; an empty result is not handled here.
    first_flow = result["data"]["flow"][0]
    return first_flow["flow_group"]["id"]
def task_rename_current_flow_run_dataset_table(prefix: str, dataset_id: str, table_id: str) ‑> None
-
Rename the current flow run.
Expand source code
@task
def task_rename_current_flow_run_dataset_table(prefix: str, dataset_id: str, table_id: str) -> None:
    """
    Prefect task that renames the current flow run to ``<prefix><dataset_id>.<table_id>``.

    The flow run id is read from ``prefect.context`` under ``flow_run_id``.

    Args:
        prefix (str): Text placed directly before the dataset id (no separator is added).
        dataset_id (str): Dataset id used in the new name.
        table_id (str): Table id used in the new name.
    """
    current_run_id = prefect.context.get("flow_run_id")
    api_client = Client()
    new_name = f"{prefix}{dataset_id}.{table_id}"
    # NOTE(review): the signature is annotated ``-> None``, but whatever
    # ``Client.set_flow_run_name`` returns is passed through to the caller.
    return api_client.set_flow_run_name(current_run_id, new_name)