Module pipelines.utils.elasticsearch_metrics.tasks
Tasks for handling Elasticsearch metrics
Functions
def format_metrics(**args)
-
Expand source code
@task(checkpoint=False)
def format_metrics(**args):
    """
    Bundle the supplied keyword arguments into a metrics dictionary.

    Args:
        **args: Arbitrary keyword arguments; each name/value pair becomes
            one entry of the result.

    Returns:
        A new dict holding every keyword argument, intended to be indexed
        in Elasticsearch.
    """
    metrics = {**args}
    return metrics
Formats metrics to be indexed in Elasticsearch by collecting the given keyword arguments into a dictionary
def post_metrics(flow_name: str = None,
labels: List[str] = None,
event_type: str = None,
dataset_id: str = None,
table_id: str = None,
metrics: Dict[str, Any] = None)-
Expand source code
@task(checkpoint=False)
def post_metrics(  # pylint: disable=too-many-arguments
    flow_name: str = None,
    labels: List[str] = None,
    event_type: str = None,
    dataset_id: str = None,
    table_id: str = None,
    metrics: Dict[str, Any] = None,
):
    """
    Build a document from the given fields and index it in Elasticsearch.

    All parameters are forwarded unchanged, by keyword, to
    ``format_document``; the document it returns is then handed to
    ``index_document``.

    Args:
        flow_name: Forwarded to ``format_document`` as ``flow_name``.
        labels: Forwarded to ``format_document`` as ``labels``.
        event_type: Forwarded to ``format_document`` as ``event_type``.
        dataset_id: Forwarded to ``format_document`` as ``dataset_id``.
        table_id: Forwarded to ``format_document`` as ``table_id``.
        metrics: Forwarded to ``format_document`` as ``metrics``.
    """
    # Gather the fields once so the document builder receives them by name.
    fields = {
        "flow_name": flow_name,
        "labels": labels,
        "event_type": event_type,
        "dataset_id": dataset_id,
        "table_id": table_id,
        "metrics": metrics,
    }
    index_document(format_document(**fields))
Posts metrics to Elasticsearch: builds a document from the given fields and indexes it
def start_timer() ‑> float
-
Expand source code
@task(checkpoint=False)
def start_timer() -> float:
    """
    Start a timer.

    Returns:
        The current reading of ``time()``, to be passed later to
        ``stop_timer`` as ``start_time``.
    """
    started_at = time()
    return started_at
Starts a timer by returning the current timestamp
def stop_timer(start_time: float) ‑> float
-
Expand source code
@task(checkpoint=False)
def stop_timer(start_time: float) -> float:
    """
    Stop a timer.

    Args:
        start_time: The value previously returned by ``start_timer``.

    Returns:
        The difference between the current reading of ``time()`` and
        ``start_time``.
    """
    elapsed = time() - start_time
    return elapsed
Stops a timer and returns the time elapsed since `start_time`