Module pipelines.rj_smtr.br_rj_riodejaneiro_rdo.utils

General-purpose functions for the br_rj_riodejaneiro_rdo project.

Functions

def build_table_id(mode: str, report_type: str)
def build_table_id(mode: str, report_type: str):
    """Build the table_id for the table targeted by the current flow run.

    Args:
        mode (str): SPPO or STPL
        report_type (str): RDO or RHO

    Returns:
        str: table_id
    """
    if mode == "SPPO":
        if report_type == "RDO":
            table_id = constants.SPPO_RDO_TABLE_ID.value
        else:
            table_id = constants.SPPO_RHO_TABLE_ID.value
    elif mode == "STPL":
        # STPL report filenames end with an extra "V",
        # so compare only the first three characters
        if report_type[:3] == "RDO":
            table_id = constants.STPL_RDO_TABLE_ID.value
        else:
            table_id = constants.STPL_RHO_TABLE_ID.value
    else:
        # Fail loudly on an unexpected mode instead of leaving table_id unbound
        raise ValueError(f"Unknown mode: {mode!r}. Expected 'SPPO' or 'STPL'.")
    return table_id

Build the table_id for the table targeted by the current flow run.

Args

mode : str
SPPO or STPL
report_type : str
RHO or RDO

Returns

str
table_id
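The branching in build_table_id amounts to a lookup keyed on (mode, report type). The sketch below expresses it that way, as an illustration only: the table names in TABLE_IDS are hypothetical placeholders, because the real values come from the project's constants module, which is not shown on this page. It keeps the same quirks as the source: only STPL report types are truncated to three characters, and any report type other than "RDO" falls back to RHO.

```python
# Hypothetical stand-ins for the constants.*_TABLE_ID values; the real
# table names live in the project's constants module and are not shown here.
TABLE_IDS = {
    ("SPPO", "RDO"): "sppo_rdo_table",
    ("SPPO", "RHO"): "sppo_rho_table",
    ("STPL", "RDO"): "stpl_rdo_table",
    ("STPL", "RHO"): "stpl_rho_table",
}


def lookup_table_id(mode: str, report_type: str) -> str:
    """Map (mode, report_type) to a table id, mirroring build_table_id."""
    if mode not in ("SPPO", "STPL"):
        raise ValueError(f"Unknown mode: {mode!r}")
    # STPL report types may end with an extra "V"; keep only the prefix.
    prefix = report_type[:3] if mode == "STPL" else report_type
    # As in build_table_id, anything other than "RDO" falls back to RHO.
    report = "RDO" if prefix == "RDO" else "RHO"
    return TABLE_IDS[(mode, report)]


print(lookup_table_id("STPL", "RDOV"))  # stpl_rdo_table
print(lookup_table_id("SPPO", "RHO"))   # sppo_rho_table
```

A dict keeps all four combinations visible in one place, which makes it harder for a new mode or report type to slip through without a matching table.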
def generate_ftp_schedules(interval_minutes: int, label: str = 'rj-smtr')
def generate_ftp_schedules(
    interval_minutes: int, label: str = emd_constants.RJ_SMTR_AGENT_LABEL.value
):
    """Generates one IntervalClock per (mode, report) pair, each carrying
    the parameters needed to capture that report.

    Args:
        interval_minutes (int): interval, in minutes, at which this flow runs.
        label (str, optional): Prefect label that defines which agent launches
            the flow run. Defaults to emd_constants.RJ_SMTR_AGENT_LABEL.value.

    Returns:
        list[IntervalClock]: the clocks used to schedule the runs
    """
    modes = ["SPPO", "STPL"]
    reports = ["RDO", "RHO"]
    clocks = []
    for mode in modes:
        for report in reports:
            clocks.append(
                IntervalClock(
                    interval=timedelta(minutes=interval_minutes),
                    start_date=datetime(
                        2022, 12, 16, 5, 0, tzinfo=timezone(constants.TIMEZONE.value)
                    ),
                    parameter_defaults={
                        "transport_mode": mode,
                        "report_type": report,
                        "table_id": build_table_id(mode=mode, report_type=report),
                    },
                    labels=[label],
                )
            )
    return clocks

Generates one IntervalClock per (mode, report) pair, each carrying the parameters needed to capture that report.

Args

interval_minutes : int
Interval, in minutes, at which this flow runs.
label : str, optional
Prefect label that defines which agent launches the flow run. Defaults to emd_constants.RJ_SMTR_AGENT_LABEL.value.

Returns

list of IntervalClock
Clocks used to schedule the runs.
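All four clocks share the same start date and interval, so at every scheduled time the flow is launched once per (mode, report) pair. The stdlib-only sketch below reproduces that fan-out without Prefect: parameter_sets builds the same four parameter dicts as the nested loops (table_id omitted), and first_runs lists the first run times of a clock anchored at the start date. The fixed UTC-3 offset is an assumption standing in for constants.TIMEZONE.value, whose actual value is not shown here, and the 10-minute interval is just an example.

```python
from datetime import datetime, timedelta, timezone
from itertools import product

# Assumption: a fixed UTC-3 offset stands in for constants.TIMEZONE.value.
TZ = timezone(timedelta(hours=-3))
START = datetime(2022, 12, 16, 5, 0, tzinfo=TZ)


def parameter_sets(modes=("SPPO", "STPL"), reports=("RDO", "RHO")):
    """One dict per (mode, report) pair, in the same order as the nested
    loops in generate_ftp_schedules (table_id omitted here)."""
    return [
        {"transport_mode": mode, "report_type": report}
        for mode, report in product(modes, reports)
    ]


def first_runs(interval_minutes: int, n: int, start: datetime = START):
    """First n run times of an interval clock anchored at start."""
    step = timedelta(minutes=interval_minutes)
    return [start + k * step for k in range(n)]


for params in parameter_sets():
    print(params)
for run in first_runs(interval_minutes=10, n=3):
    print(run.isoformat())
```

itertools.product iterates the last argument fastest, so the pairs come out as SPPO/RDO, SPPO/RHO, STPL/RDO, STPL/RHO, matching the order in which the source appends its clocks.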

def merge_file_info_and_errors(files: list, errors: list)
def merge_file_info_and_errors(files: list, errors: list):
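    """Attach each error to the file dict at the same position.

    Args:
        files (list): list of dicts describing files; updated in place
        errors (list): list of errors, aligned by index with files

    Returns:
        list: the same dicts, each with its "error" key set
    """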
    for i, file in enumerate(files):
        file["error"] = errors[i]
    return files

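Attach each error to the file dict at the same position.

Args

files : list
List of dicts describing files; updated in place.
errors : list
List of errors, aligned by index with files.

Returns

list
The same dicts, each with its "error" key set.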
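Because merge_file_info_and_errors pairs items by index, it silently ignores extra errors and raises IndexError when there are fewer errors than files. It also mutates the input dicts and returns the same list object. The sketch below copies the function to show that behaviour on hypothetical file dicts, and adds merge_strict, a hypothetical variant (not part of this module) that rejects mismatched lengths before touching any dict.

```python
def merge_file_info_and_errors(files: list, errors: list) -> list:
    # Same logic as above: write errors[i] into files[i]["error"].
    for i, file in enumerate(files):
        file["error"] = errors[i]
    return files


def merge_strict(files: list, errors: list) -> list:
    """Hypothetical variant that rejects length mismatches up front."""
    if len(files) != len(errors):
        raise ValueError(f"{len(files)} files but {len(errors)} errors")
    for file, error in zip(files, errors):
        file["error"] = error
    return files


files = [{"filename": "a.txt"}, {"filename": "b.txt"}]  # hypothetical file dicts
merged = merge_file_info_and_errors(files, [None, "timeout"])
print(merged)           # [{'filename': 'a.txt', 'error': None}, {'filename': 'b.txt', 'error': 'timeout'}]
print(merged is files)  # True: the input list is mutated and returned
```

Checking lengths first means a mismatch never leaves the list half-updated, which the index-based loop can do when it fails partway through.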