Module pipelines.utils.elasticsearch_metrics.utils

Utilities for handling metrics with Elasticsearch

Functions

def format_document(*,
flow_name: str = None,
labels: List[str] = None,
event_type: str = None,
dataset_id: str = None,
table_id: str = None,
metrics: Dict[str, Any] = None) ‑> dict
Expand source code
def format_document(
    *,
    flow_name: str = None,
    labels: List[str] = None,
    event_type: str = None,
    dataset_id: str = None,
    table_id: str = None,
    metrics: Dict[str, Any] = None,
) -> dict:
    """
    Formats a document in a well-defined format for our Elasticsearch index
    """
    return {
        "timestamp": pendulum.now(tz="America/Sao_Paulo"),
        "flow_name": flow_name,
        "labels": labels,
        "event_type": event_type,
        "dataset_id": dataset_id,
        "table_id": table_id,
        "metrics": metrics,
    }

Formats a document in a well-defined format for our Elasticsearch index

def get_elasticsearch_client(es_config_secret_path: str = 'elasticsearch-config') ‑> elasticsearch.Elasticsearch
Expand source code
def get_elasticsearch_client(
    es_config_secret_path: str = "elasticsearch-config",
) -> Elasticsearch:
    """
    Get an Elasticsearch client with configuration from Vault
    """
    try:
        es_config: str = get_vault_secret(es_config_secret_path)["data"]["config"]
    except Exception as exc:  # pylint: disable=broad-except
        log(f"Failed to get Elasticsearch config: {exc}", "error")
        return None
    try:
        es_config_dict: dict = json.loads(base64.b64decode(es_config.encode()).decode())
    except Exception as exc:  # pylint: disable=broad-except
        log(f"Failed to decode Elasticsearch config: {exc}", "error")
        return None
    return Elasticsearch(**es_config_dict)

Get an Elasticsearch client with configuration from Vault

def index_document(document: dict,
es_client: elasticsearch.Elasticsearch = None,
index: str = 'prefect-dados-rio') ‑> dict
Expand source code
def index_document(
    document: dict, es_client: Elasticsearch = None, index: str = "prefect-dados-rio"
) -> dict:
    """
    Indexes a document in Elasticsearch
    """
    if not es_client:
        es_client = get_elasticsearch_client()
    if not es_client:
        log("Impossible to index document, no Elasticsearch client available", "error")
        return None
    try:
        es_client.index(index=index, document=document)
    except Exception as exc:  # pylint: disable=broad-except
        log(f"Failed to index document: {exc}", "error")
        return None
    return document

Indexes a document in Elasticsearch