Module pipelines.utils.elasticsearch_metrics.tasks

Tasks for handling Elasticsearch metrics

Functions

def format_metrics(**args)
Expand source code
@task(checkpoint=False)
def format_metrics(
    **args,
):
    """
    Formats metrics to be indexed in Elasticsearch
    """
    return dict(**args)

Formats metrics to be indexed in Elasticsearch

def post_metrics(flow_name: str = None,
labels: List[str] = None,
event_type: str = None,
dataset_id: str = None,
table_id: str = None,
metrics: Dict[str, Any] = None)
Expand source code
@task(checkpoint=False)
def post_metrics(  # pylint: disable=too-many-arguments
    flow_name: str = None,
    labels: List[str] = None,
    event_type: str = None,
    dataset_id: str = None,
    table_id: str = None,
    metrics: Dict[str, Any] = None,
):
    """
    Posts metrics to Elasticsearch
    """
    document = format_document(
        flow_name=flow_name,
        labels=labels,
        event_type=event_type,
        dataset_id=dataset_id,
        table_id=table_id,
        metrics=metrics,
    )
    index_document(document)

Posts metrics to Elasticsearch

def start_timer() ‑> float
Expand source code
@task(checkpoint=False)
def start_timer() -> float:
    """
    Starts a timer
    """
    return time()

Starts a timer

def stop_timer(start_time: float) ‑> float
Expand source code
@task(checkpoint=False)
def stop_timer(start_time: float) -> float:
    """
    Stops a timer
    """
    return time() - start_time

Stops a timer