Module pipelines.rj_escritorio.cleanup.tasks

Task definitions for the cleanup pipeline.

Functions

def cancel_flow_run(flow_run_dict: Dict[str, str], client: prefect.client.client.Client = None) ‑> None
Expand source code
@task
def cancel_flow_run(flow_run_dict: Dict[str, str], client: Client = None) -> None:
    """
    Request the cancellation of a single flow run through the GraphQL API.

    Args:
        flow_run_dict (Dict[str, str]): Description of the flow run. Only its
            "id" key is read.
        client (Client, optional): The prefect client. When it is falsy a new
            `Client()` is instantiated. Defaults to None.
    """
    flow_run_id = flow_run_dict["id"]
    log(f">>>>>>>>>> Cancelling flow run {flow_run_id}")
    # Fall back to a fresh client only after logging, as the caller may not
    # have provided one.
    client = client or Client()
    mutation = """
        mutation($flow_run_id: UUID!) {
            cancel_flow_run (
                input: {
                    flow_run_id: $flow_run_id
                }
            ) {
                state
            }
        }
    """
    result = client.graphql(query=mutation, variables={"flow_run_id": flow_run_id})
    # The mutation answers with the state the flow run ended up in.
    state: str = result["data"]["cancel_flow_run"]["state"]
    log(f">>>>>>>>>> Flow run {flow_run_id} is now {state}")

Cancels a flow run from the API.

def delete_flow_run(flow_run_dict: Dict[str, str], client: prefect.client.client.Client = None) ‑> None
Expand source code
@task
def delete_flow_run(flow_run_dict: Dict[str, str], client: Client = None) -> None:
    """
    Delete a single flow run through the GraphQL API.

    Args:
        flow_run_dict (Dict[str, str]): Description of the flow run. Only its
            "id" key is read.
        client (Client, optional): The prefect client. When it is falsy a new
            `Client()` is instantiated. Defaults to None.

    Raises:
        Exception: If the API reports a falsy `success` for the deletion.
    """
    flow_run_id = flow_run_dict["id"]
    log(f">>>>>>>>>> Deleting flow run {flow_run_id}")
    client = client or Client()
    mutation = """
        mutation($flow_run_id: UUID!) {
            delete_flow_run (
                input: {
                    flow_run_id: $flow_run_id
                }
            ) {
                success
            }
        }
    """
    result = client.graphql(query=mutation, variables={"flow_run_id": flow_run_id})
    success: bool = result["data"]["delete_flow_run"]["success"]
    if success:
        return
    # The API answered but did not confirm the deletion.
    raise Exception(f"Could not delete flow run {flow_run_id}")

Deletes a flow run from the API.

def get_old_flow_runs(days_old: int,
client: prefect.client.client.Client = None,
skip_running: bool = True) ‑> List[Dict[str, str]]
Expand source code
@task
def get_old_flow_runs(
    days_old: int, client: Client = None, skip_running: bool = True
) -> List[Dict[str, str]]:
    """
    Query the API for flow runs that started at least `days_old` days ago.

    The cutoff is computed from the current time in the "America/Sao_Paulo"
    timezone and sent to the API as an ISO 8601 string.

    Args:
        days_old (int): Minimum age (in days) of the flow runs to fetch.
        client (Client, optional): The prefect client. When it is falsy a new
            `Client()` is instantiated. Defaults to None.
        skip_running (bool, optional): When True, flow runs whose state is
            "Running" are excluded from the result. Defaults to True.

    Returns:
        The `flow_run` list from the API response, with one dictionary per
        flow run. Each dictionary carries the fields requested by the query:
    ```py
    {
        "id": "some-uuid4",
        "state": "the-state-for-this-flow-run",
        "start_time": "2022-01-01T00:00:00.000000+00:00"
    }
    ```
    """
    cutoff = pendulum.now(tz="America/Sao_Paulo").subtract(days=days_old)
    maximum_start_time = cutoff.to_iso8601_string()
    client = client or Client()
    # The query is assembled from three fragments so that the state filter
    # can be left out when running flow runs must be included.
    query_head = """
        query($maximum_start_time: timestamptz) {
            flow_run (
                where: {
                    _and: [
                        {start_time: {_lte: $maximum_start_time}},
    """
    not_running_filter = """
                        {state: {_neq: "Running"}},
        """
    query_tail = """
                    ]
                }
            ) {
                id
                state
                start_time
            }
        }
    """
    query = query_head + (not_running_filter if skip_running else "") + query_tail
    result = client.graphql(
        query=query, variables={"maximum_start_time": maximum_start_time}
    )
    return result["data"]["flow_run"]

Fetches old flow runs from the API.

Args

days_old : int
The age of the flow runs (in days) to fetch.
client : Client, optional
The prefect client. Defaults to None.
skip_running : bool, optional
Whether to skip running flow runs. Defaults to True.

Returns

A list containing one dictionary for every flow we got. The format for the dictionary is the following:

{
    "id": "some-uuid4",
    "state": "the-final-state-for-this-flow",
    "start_time": "2022-01-01T00:00:00.000000+00:00"
}
def get_old_running_flow_runs(older_than_days: int, client: prefect.client.client.Client = None)
Expand source code
@task
def get_old_running_flow_runs(older_than_days: int, client: Client = None):
    """
    Query the API for flow runs still in the "Running" state that started at
    least `older_than_days` days ago.

    The cutoff is computed from the current time in the "America/Sao_Paulo"
    timezone and sent to the API as an ISO 8601 string.

    Args:
        older_than_days (int): Minimum age (in days) of the flow runs to fetch.
        client (Client, optional): The prefect client. When it is falsy a new
            `Client()` is instantiated. Defaults to None.

    Returns:
        The `flow_run` list from the API response, with one dictionary per
        flow run. Each dictionary carries the fields requested by the query
        (the exact shape of `labels` is defined by the API):
    ```py
    {
        "id": "some-uuid4",
        "state": "Running",
        "start_time": "2022-01-01T00:00:00.000000+00:00",
        "labels": ...
    }
    ```
    """
    cutoff = pendulum.now(tz="America/Sao_Paulo").subtract(days=older_than_days)
    maximum_start_time = cutoff.to_iso8601_string()
    client = client or Client()
    query = """
        query($maximum_start_time: timestamptz) {
            flow_run (
                where: {
                    _and: [
                        {start_time: {_lte: $maximum_start_time}},
                        {state: {_eq: "Running"}},
                    ]
                }
            ) {
                id
                state
                start_time
                labels
            }
        }
    """
    result = client.graphql(
        query=query, variables={"maximum_start_time": maximum_start_time}
    )
    return result["data"]["flow_run"]

Fetches old running flow runs from the API.

Args

older_than_days : int
The age of the flow runs (in days) to fetch.
client : Client, optional
The prefect client. Defaults to None.

Returns

A list containing one dictionary for every flow we got. The format for the dictionary is the following:

{
    "id": "some-uuid4",
    "state": "the-final-state-for-this-flow",
    "start_time": "2022-01-01T00:00:00.000000+00:00"
}
def get_prefect_client() ‑> prefect.client.client.Client
Expand source code
@task(checkpoint=False)
def get_prefect_client() -> Client:
    """
    Build a new prefect `Client` with default arguments and return it.

    The task is declared with `checkpoint=False`.
    """
    prefect_client = Client()
    return prefect_client

Returns the prefect client.