Module pipelines.utils.custom
Customizations for the pipelines package.
Functions
def wait_for_flow_run_with_timeout(timeout: Union[int, datetime.timedelta])
-
Builds the `wait_for_flow_run` task with a timeout.
Example: if you provide `timeout=120`, it would be equivalent to:
@task(timeout=120)
def wait_for_flow_run(...): ...
Classes
class CustomFlow (name: str, schedule: prefect.schedules.schedules.Schedule = None, executor: prefect.executors.base.Executor = None, environment: prefect.environments.execution.base.Environment = None, run_config: prefect.run_configs.base.RunConfig = None, storage: prefect.storage.base.Storage = None, tasks: Iterable[prefect.core.task.Task] = None, edges: Iterable[prefect.core.edge.Edge] = None, reference_tasks: Iterable[prefect.core.task.Task] = None, state_handlers: List[Callable] = None, validate: bool = None, result: Optional[prefect.engine.result.base.Result] = None, terminal_state_handler: Optional[Callable[[ForwardRef('Flow'), prefect.engine.state.State, Set[prefect.engine.state.State]], Optional[prefect.engine.state.State]]] = None, code_owners: Optional[List[str]] = None, skip_if_running: bool = False)
-
A custom Flow class that implements code ownership in order to make it easier to notify people when a FlowRun fails.
Expand source code
class CustomFlow(Flow):
    """
    A custom Flow class that implements code ownership in order to make it
    easier to notify people when a FlowRun fails.

    Every argument other than ``code_owners`` and ``skip_if_running`` is
    forwarded unchanged to ``Flow.__init__``. In addition, the flow is always
    created with an ``on_failure`` callback: ``notify_discord_on_failure``
    bound to the Discord webhook secret path taken from ``constants`` and to
    the given ``code_owners``.

    Args:
        code_owners: Passed through to ``notify_discord_on_failure`` as its
            ``code_owners`` keyword argument.
        skip_if_running: When true, ``skip_if_running_handler`` is added to
            the state handlers given to the underlying ``Flow``.
    """

    def __init__(  # pylint: disable=too-many-arguments, too-many-locals
        self,
        name: str,
        schedule: Schedule = None,
        executor: Executor = None,
        environment: Environment = None,
        run_config: RunConfig = None,
        storage: Storage = None,
        tasks: Iterable[Task] = None,
        edges: Iterable[Edge] = None,
        reference_tasks: Iterable[Task] = None,
        state_handlers: List[Callable] = None,
        validate: bool = None,
        result: Optional[Result] = None,
        terminal_state_handler: Optional[
            Callable[["Flow", State, Set[State]], Optional[State]]
        ] = None,
        code_owners: Optional[List[str]] = None,
        skip_if_running: bool = False,
    ):
        if skip_if_running:
            # Build a new list instead of appending in place: the caller may
            # reuse the same ``state_handlers`` object for several flows, and
            # mutating it would leak ``skip_if_running_handler`` into them.
            state_handlers = [
                *(state_handlers if state_handlers is not None else ()),
                skip_if_running_handler,
            ]

        super().__init__(
            name=name,
            schedule=schedule,
            executor=executor,
            environment=environment,
            run_config=run_config,
            storage=storage,
            tasks=tasks,
            edges=edges,
            reference_tasks=reference_tasks,
            state_handlers=state_handlers,
            on_failure=partial(
                notify_discord_on_failure,
                secret_path=constants.EMD_DISCORD_WEBHOOK_SECRET_PATH.value,
                code_owners=code_owners,
            ),
            validate=validate,
            result=result,
            terminal_state_handler=terminal_state_handler,
        )
Ancestors
- prefect.core.flow.Flow