Module prefeitura_rio.pipelines_utils.state_handlers

Expand source code
# -*- coding: utf-8 -*-
try:
    import prefect
    import sentry_sdk
    from prefect.client import Client
    from prefect.engine.state import Skipped, State
except ImportError:
    from prefeitura_rio.utils import base_assert_dependencies

    base_assert_dependencies(["prefect", "sentry_sdk"], extras=["pipelines"])

from prefeitura_rio.pipelines_utils.infisical import get_secret, inject_bd_credentials
from prefeitura_rio.pipelines_utils.prefect import get_flow_run_mode


def handler_initialize_sentry(obj, old_state: State, new_state: State) -> State:
    """
    State handler that will set up Sentry.
    """
    if new_state.is_running():
        sentry_dsn = get_secret("SENTRY_DSN")
        environment = get_flow_run_mode()
        sentry_sdk.init(
            dsn=sentry_dsn,
            traces_sample_rate=0,
            environment=environment,
        )
    return new_state


def handler_inject_bd_credentials(obj, old_state: State, new_state: State) -> State:
    """
    State handler that will inject BD credentials into the environment.
    """
    if new_state.is_running():
        inject_bd_credentials()
    return new_state


def handler_skip_if_running(obj, old_state: State, new_state: State) -> State:
    """
    State handler that will skip a flow run if another instance of the flow is already running.

    Adapted from Prefect Discourse:
    https://tinyurl.com/4hn5uz2w
    """
    if new_state.is_running():
        client = Client()
        query = """
            query($flow_id: uuid) {
                flow_run(
                    where: {
                        _and: [
                            {state: {_eq: "Running"}},
                            {flow_id: {_eq: $flow_id}}
                        ]
                    }
                ) {
                    id
                }
            }
        """
        # pylint: disable=no-member
        response = client.graphql(
            query=query,
            variables=dict(flow_id=prefect.context.flow_id),
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            logger.info(message)
            return Skipped(message)
    return new_state

Functions

def handler_initialize_sentry(obj, old_state: prefect.engine.state.State, new_state: prefect.engine.state.State) ‑> prefect.engine.state.State

State handler that will set up Sentry.

Expand source code
def handler_initialize_sentry(obj, old_state: State, new_state: State) -> State:
    """
    State handler that will set up Sentry.
    """
    if new_state.is_running():
        sentry_dsn = get_secret("SENTRY_DSN")
        environment = get_flow_run_mode()
        sentry_sdk.init(
            dsn=sentry_dsn,
            traces_sample_rate=0,
            environment=environment,
        )
    return new_state
def handler_inject_bd_credentials(obj, old_state: prefect.engine.state.State, new_state: prefect.engine.state.State) ‑> prefect.engine.state.State

State handler that will inject BD credentials into the environment.

Expand source code
def handler_inject_bd_credentials(obj, old_state: State, new_state: State) -> State:
    """
    State handler that will inject BD credentials into the environment.
    """
    if new_state.is_running():
        inject_bd_credentials()
    return new_state
def handler_skip_if_running(obj, old_state: prefect.engine.state.State, new_state: prefect.engine.state.State) ‑> prefect.engine.state.State

State handler that will skip a flow run if another instance of the flow is already running.

Adapted from Prefect Discourse: https://tinyurl.com/4hn5uz2w

Expand source code
def handler_skip_if_running(obj, old_state: State, new_state: State) -> State:
    """
    State handler that will skip a flow run if another instance of the flow is already running.

    Adapted from Prefect Discourse:
    https://tinyurl.com/4hn5uz2w
    """
    if new_state.is_running():
        client = Client()
        query = """
            query($flow_id: uuid) {
                flow_run(
                    where: {
                        _and: [
                            {state: {_eq: "Running"}},
                            {flow_id: {_eq: $flow_id}}
                        ]
                    }
                ) {
                    id
                }
            }
        """
        # pylint: disable=no-member
        response = client.graphql(
            query=query,
            variables=dict(flow_id=prefect.context.flow_id),
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            logger.info(message)
            return Skipped(message)
    return new_state