Module pipelines.utils.backfill_flow.tasks

Tasks for the backfill flow

Functions

def create_timestamp_parameters(start: pendulum.datetime.DateTime,
end: pendulum.datetime.DateTime,
interval: pendulum.duration.Duration,
datetime_format: str = 'YYYY-MM-DD',
reverse: bool = False)
Expand source code
@task
def create_timestamp_parameters(
    start: pendulum.DateTime,
    end: pendulum.DateTime,
    interval: pendulum.Duration,
    datetime_format: str = "YYYY-MM-DD",
    reverse: bool = False,
):
    """
    Split the range from ``start`` to ``end`` into consecutive windows.

    Windows are generated from ``start`` in steps of ``interval`` for as long
    as the window start is strictly before ``end``. The last window is not
    clamped, so its end may fall after ``end`` when the range is not an exact
    multiple of ``interval``.

    Args:
        start: The start of the range of timestamps to generate
        end: The end of the range of timestamps to generate
        interval: The interval between timestamps; must move time forward
        datetime_format: The format to use for the timestamps
        reverse: Whether to reverse the order of the timestamps

    Returns:
        A list of dictionaries, one per window, each holding the formatted
        window bounds under the keys ``"start"`` and ``"end"``. The list is
        empty when ``start`` is not before ``end``.

    Raises:
        ValueError: If adding ``interval`` does not advance the window start
            (zero or negative interval), which would otherwise loop forever.
    """
    parameters = []
    window_start = start
    while window_start < end:
        window_end = window_start + interval
        # A zero/negative interval never reaches `end`; fail fast instead of
        # looping forever and growing the list without bound.
        if window_end <= window_start:
            raise ValueError(
                f"interval must be positive to generate windows, got {interval!r}"
            )
        parameters.append(
            {
                "start": window_start.format(datetime_format),
                "end": window_end.format(datetime_format),
            }
        )
        window_start = window_end
    if reverse:
        parameters.reverse()
    return parameters

Create a list of parameters for a flow that takes timestamp parameters.

Args

start
The start of the range of timestamps to generate
end
The end of the range of timestamps to generate
interval
The interval between timestamps
datetime_format
The format to use for the timestamps
reverse
Whether to reverse the order of the timestamps

Returns

A list of dictionaries, one per generated window, each with "start" and "end" keys holding the window bounds formatted with datetime_format

def launch_flow(flow_name: str,
parameter: Dict[str, Any],
agent_label: str,
flow_project: str = 'main',
parameter_defaults: Dict[str, Any] = None,
help_name: str = None,
datetime_start_param: str = None,
datetime_end_param: str = None,
fetch_flow_run_info_sleep_time: int = 30,
prefect_client: prefect.client.client.Client = None)
Expand source code
@task
# pylint: disable=too-many-arguments
def launch_flow(
    flow_name: str,
    parameter: Dict[str, Any],
    agent_label: str,
    flow_project: str = "main",
    parameter_defaults: Dict[str, Any] = None,
    help_name: str = None,
    datetime_start_param: str = None,
    datetime_end_param: str = None,
    fetch_flow_run_info_sleep_time: int = 30,
    prefect_client: Client = None,
):
    """
    Launch a flow run for one backfill window and wait until it finishes.

    The run is started through ``run_registered`` and its state is then
    polled with ``prefect_client.get_flow_run_info`` until it is finished.

    Args:
        flow_name: The name of the flow to launch
        parameter: The backfill window; must contain the keys ``"start"``
            and ``"end"``
        agent_label: The agent label to use when launching the flow
        flow_project: The project to use when launching the flow
        parameter_defaults: Base parameters for the flow run. The mapping is
            copied, so the caller's object is never modified.
        help_name: Name shown in the flow run description; defaults to
            ``flow_name``
        datetime_start_param: Name of the flow parameter that receives
            ``parameter["start"]``; skipped when empty
        datetime_end_param: Name of the flow parameter that receives
            ``parameter["end"]``; skipped when empty
        fetch_flow_run_info_sleep_time: Seconds to sleep between two state
            polls
        prefect_client: The Prefect client to use; a new ``Client()`` is
            created when omitted

    Returns:
        None

    Raises:
        Exception: If the launched flow run finishes in a non-successful state
    """
    # Work on a copy: writing the window bounds into the caller's dict would
    # leak them into every other use of that same object.
    run_parameters = dict(parameter_defaults) if parameter_defaults else {}

    if help_name is None:
        help_name = flow_name

    if prefect_client is None:
        prefect_client = Client()

    if datetime_start_param:
        run_parameters[datetime_start_param] = parameter["start"]

    if datetime_end_param:
        run_parameters[datetime_end_param] = parameter["end"]

    window = f"{parameter['start']} to {parameter['end']}"

    log(f"Launching run for window {window}")

    flow_run_id = run_registered(
        flow_name=flow_name,
        flow_project=flow_project,
        labels=[agent_label],
        parameters=run_parameters,
        run_description=f"Backfill - {help_name} - {window}",
    )

    state: State = prefect_client.get_flow_run_info(flow_run_id=flow_run_id).state

    # Poll until the run is finished, whether it succeeded or not.
    while not state.is_finished():
        sleep(fetch_flow_run_info_sleep_time)
        state = prefect_client.get_flow_run_info(flow_run_id=flow_run_id).state

    if state.is_successful():
        log(f"Run for window {window} succeeded")
    else:
        failure_message = f"Run for window {window} failed"
        log(failure_message, level="error")
        raise Exception(failure_message)

Launch a flow with the given parameters.

Args

flow_name
The name of the flow to launch
parameter
The backfill parameters to use when launching the flow
agent_label
The agent label to use when launching the flow
flow_project
The project to use when launching the flow
parameter_defaults
The default parameters to use when launching the flow
help_name
A display name used in the flow run description; defaults to flow_name when not given
datetime_start_param
The name of the start datetime parameter
datetime_end_param
The name of the end datetime parameter
fetch_flow_run_info_sleep_time
The time to sleep between fetching flow run info
prefect_client
The Prefect client to use

Returns

None

def parse_datetime(datetime_string: str, timezone: str = 'America/Sao_Paulo') ‑> pendulum.datetime.DateTime
Expand source code
@task
def parse_datetime(
    datetime_string: str,
    timezone: str = "America/Sao_Paulo",
) -> pendulum.DateTime:
    """
    Turn a datetime string into a pendulum datetime.

    The string is handed to ``pendulum.parse`` with ``strict=False`` and the
    given timezone passed as ``tz``.

    Args:
        datetime_string: The text to parse
        timezone: Timezone name forwarded to ``pendulum.parse`` as ``tz``

    Returns:
        The value produced by ``pendulum.parse``
    """
    parsed = pendulum.parse(
        datetime_string,
        tz=timezone,
        strict=False,
    )
    return parsed

Parse a datetime string.

Args

datetime_string
The datetime string to parse
timezone
The timezone to use

Returns

The parsed datetime

def parse_duration(duration_dict: Dict[str, int]) ‑> pendulum.duration.Duration
Expand source code
@task
def parse_duration(
    duration_dict: Dict[str, int],
) -> pendulum.Duration:
    """
    Build a pendulum duration from a dictionary of keyword arguments.

    Args:
        duration_dict: Mapping whose items are passed as keyword arguments
            to ``pendulum.duration``

    Returns:
        The duration built by ``pendulum.duration``
    """
    duration = pendulum.duration(**duration_dict)
    return duration

Parse a duration dictionary.

Args

duration_dict
The duration dictionary to parse

Returns

The parsed duration