Module pipelines.utils.custom

Customizations for the pipelines package.

Functions

def wait_for_flow_run_with_timeout(timeout: Union[int, datetime.timedelta])

Builds the wait_for_flow_run task with a timeout.

Example: passing timeout=120 is equivalent to:

@task(timeout=120)
def wait_for_flow_run(...):
    ...

Classes

class CustomFlow (name: str, schedule: prefect.schedules.schedules.Schedule = None, executor: prefect.executors.base.Executor = None, environment: prefect.environments.execution.base.Environment = None, run_config: prefect.run_configs.base.RunConfig = None, storage: prefect.storage.base.Storage = None, tasks: Iterable[prefect.core.task.Task] = None, edges: Iterable[prefect.core.edge.Edge] = None, reference_tasks: Iterable[prefect.core.task.Task] = None, state_handlers: List[Callable] = None, validate: bool = None, result: Optional[prefect.engine.result.base.Result] = None, terminal_state_handler: Optional[Callable[[ForwardRef('Flow'), prefect.engine.state.State, Set[prefect.engine.state.State]], Optional[prefect.engine.state.State]]] = None, code_owners: Optional[List[str]] = None, skip_if_running: bool = False)

A custom Flow class that implements code ownership in order to make it easier to notify people when a FlowRun fails.

Expand source code
class CustomFlow(Flow):
    """
    A custom Flow class that implements code ownership in order to make it easier to
    notify people when a FlowRun fails.

    On top of the regular ``Flow`` arguments, it accepts ``code_owners`` and
    ``skip_if_running``. An ``on_failure`` callback built from
    ``notify_discord_on_failure`` is always passed to the parent constructor.
    """

    def __init__(  # pylint: disable=too-many-arguments, too-many-locals
        self,
        name: str,
        schedule: Schedule = None,
        executor: Executor = None,
        environment: Environment = None,
        run_config: RunConfig = None,
        storage: Storage = None,
        tasks: Iterable[Task] = None,
        edges: Iterable[Edge] = None,
        reference_tasks: Iterable[Task] = None,
        state_handlers: List[Callable] = None,
        validate: bool = None,
        result: Optional[Result] = None,
        terminal_state_handler: Optional[
            Callable[["Flow", State, Set[State]], Optional[State]]
        ] = None,
        code_owners: Optional[List[str]] = None,
        skip_if_running: bool = False,
    ):
        """
        Build the flow, wiring in the failure notification and, optionally, the
        skip-if-running state handler.

        Args:
            name: Forwarded unchanged to ``Flow.__init__``.
            schedule: Forwarded unchanged to ``Flow.__init__``.
            executor: Forwarded unchanged to ``Flow.__init__``.
            environment: Forwarded unchanged to ``Flow.__init__``.
            run_config: Forwarded unchanged to ``Flow.__init__``.
            storage: Forwarded unchanged to ``Flow.__init__``.
            tasks: Forwarded unchanged to ``Flow.__init__``.
            edges: Forwarded unchanged to ``Flow.__init__``.
            reference_tasks: Forwarded unchanged to ``Flow.__init__``.
            state_handlers: State handlers forwarded to ``Flow.__init__``. When
                ``skip_if_running`` is true, a copy with
                ``skip_if_running_handler`` added at the end is forwarded
                instead; the object passed in is never modified.
            validate: Forwarded unchanged to ``Flow.__init__``.
            result: Forwarded unchanged to ``Flow.__init__``.
            terminal_state_handler: Forwarded unchanged to ``Flow.__init__``.
            code_owners: Passed as the ``code_owners`` keyword of
                ``notify_discord_on_failure``.
            skip_if_running: When true, ``skip_if_running_handler`` is registered
                as the last state handler.
                NOTE(review): presumably this skips a run while another run of
                the same flow is in progress; confirm against the handler.
        """
        if skip_if_running:
            # Build a new list instead of appending in place: the caller may
            # reuse the same list for several flows (which would accumulate
            # duplicate handlers) or pass a non-list iterable such as a tuple.
            state_handlers = [*(state_handlers or []), skip_if_running_handler]
        super().__init__(
            name=name,
            schedule=schedule,
            executor=executor,
            environment=environment,
            run_config=run_config,
            storage=storage,
            tasks=tasks,
            edges=edges,
            reference_tasks=reference_tasks,
            state_handlers=state_handlers,
            # Pre-bind the webhook secret path and the owners; the remaining
            # arguments of the callback are supplied by whoever invokes it.
            on_failure=partial(
                notify_discord_on_failure,
                secret_path=constants.EMD_DISCORD_WEBHOOK_SECRET_PATH.value,
                code_owners=code_owners,
            ),
            validate=validate,
            result=result,
            terminal_state_handler=terminal_state_handler,
        )

Ancestors

  • prefect.core.flow.Flow