Module pipelines.rj_smtr.projeto_subsidio_sppo.tasks

Tasks for projeto_subsidio_sppo

Functions

def check_param(param: str) -> bool
Expand source code
@task
def check_param(param: str) -> bool:
    """
    Tell whether the given parameter is None.

    Args:
        param (str): Value to inspect

    Returns:
        bool: True when param is None, False otherwise
    """
    is_missing = param is None
    return is_missing

Check if param is None

def subsidio_data_quality_check(mode: str, params: dict, code_owners: list = None, check_params: dict = None) -> bool
Expand source code
@task
def subsidio_data_quality_check(
    mode: str, params: dict, code_owners: list = None, check_params: dict = None
) -> bool:
    """
    Run the data quality checks for the subsidy calculation process and
    report the results to Discord.

    Args:
        mode (str): Execution mode, either "pre" or "pos"
        params (dict): Parameters for the checks; "start_date" and "end_date"
            are read, and "end_date" is parsed with the format %Y-%m-%d
        code_owners (list): Code owners mentioned when a check fails. Falls
            back to smtr_constants.SUBSIDIO_SPPO_CODE_OWNERS when None
        check_params (dict): Queries and order columns for the checks. Falls
            back to smtr_constants.SUBSIDIO_SPPO_DATA_CHECKS_PARAMS when None

    Returns:
        test_check (bool): True if all checks passed, False otherwise

    Raises:
        ValueError: If mode is neither "pre" nor "pos"
    """

    if mode not in ("pre", "pos"):
        raise ValueError("Mode must be 'pre' or 'pos'")

    if check_params is None:
        check_params = smtr_constants.SUBSIDIO_SPPO_DATA_CHECKS_PARAMS.value

    if code_owners is None:
        code_owners = smtr_constants.SUBSIDIO_SPPO_CODE_OWNERS.value

    first_day = params["start_date"]
    last_day = params["end_date"]

    # The end date is parsed in both modes, so a malformed value fails here
    # even when the "pos" branch below overrides the resulting timestamp.
    window_end = datetime.strptime(last_day, "%Y-%m-%d") + timedelta(hours=27)
    request_params = {
        "start_timestamp": f"{first_day} 00:00:00",
        "end_timestamp": window_end.strftime("%Y-%m-%d %H:%M:%S"),
    }

    if mode == "pos":
        # "pos" checks end at midnight of the end date and also carry the
        # dashboard dataset id.
        request_params["end_timestamp"] = f"{last_day} 00:00:00"
        request_params[
            "dataset_id"
        ] = smtr_constants.SUBSIDIO_SPPO_DASHBOARD_DATASET_ID.value

    if mode == "pre":
        checks_list = smtr_constants.SUBSIDIO_SPPO_DATA_CHECKS_PRE_LIST.value
    else:
        checks_list = smtr_constants.SUBSIDIO_SPPO_DATA_CHECKS_POS_LIST.value

    # One list of check results per table id.
    checks = {
        table_id: perform_checks_for_table(
            table_id, request_params, test_check_list, check_params
        )
        for table_id, test_check_list in checks_list.items()
    }

    log(checks)

    if first_day == last_day:
        date_range = first_day
    else:
        date_range = f"{first_day} a {last_day}"

    vault_secret = get_vault_secret(
        secret_path=smtr_constants.SUBSIDIO_SPPO_SECRET_PATH.value
    )
    webhook_url = vault_secret["data"]["discord_data_check_webhook"]

    # The run passes only when every check of every table has a truthy status.
    test_check = all(
        result["status"]
        for table_results in checks.values()
        for result in table_results
    )

    def _status_line(check):
        # Render one check result as "<icon> <description>".
        icon = ":white_check_mark:" if check["status"] else ":x:"
        return f'{icon} {check["desc"]}'

    # First message: overall status, title and the "general" checks, if any.
    if test_check:
        headline_icon = ":green_circle: "
    else:
        headline_icon = ":red_circle: "

    formatted_messages = [
        headline_icon,
        f"**{mode.capitalize()}-Data Quality Checks - Apuração de Subsídio - {date_range}**\n\n",
    ]

    if "general" in checks:
        for check in checks["general"]:
            formatted_messages.append(_status_line(check) + "\n")

    format_send_discord_message(formatted_messages, webhook_url)

    # One message per table, skipping "general" which was already reported.
    for table_id, table_checks in checks.items():
        if table_id == "general":
            continue
        table_lines = "\n".join(_status_line(check) for check in table_checks)
        format_send_discord_message([f"*{table_id}:*\n" + table_lines], webhook_url)

    # Closing message: final status plus code-owner mentions on failure.
    formatted_messages = ["\n\n"]

    if mode == "pre":
        if test_check:
            formatted_messages.append("")
        else:
            formatted_messages.append(
                """:warning: **Status:** Necessidade de revisão dos dados de entrada!\n"""
            )

    if mode == "pos":
        if test_check:
            formatted_messages.append(":tada: **Status:** Sucesso")
        else:
            formatted_messages.append(
                ":warning: **Status:** Testes falharam. Necessidade de revisão dos dados finais!\n"
            )

    if not test_check:
        at_code_owners = []
        for code_owner in code_owners:
            mention = constants.OWNERS_DISCORD_MENTIONS.value[code_owner]
            mention_type = mention["type"]
            # Pick the Discord mention prefix by owner type; any type other
            # than the three listed gets the "@&" prefix.
            if mention_type == "user":
                prefix = "@"
            elif mention_type == "user_nickname":
                prefix = "@!"
            elif mention_type == "channel":
                prefix = "#"
            else:
                prefix = "@&"
            at_code_owners.append(f'    - <{prefix}{mention["user_id"]}>\n')

        formatted_messages.extend(at_code_owners)

    format_send_discord_message(formatted_messages, webhook_url)

    return test_check

Verifica qualidade de dados para o processo de apuração de subsídio

Args

mode : str
Modo de execução (pre ou pos)
params : dict
Parameters for the checks
code_owners : list
Code owners to be notified
check_params : dict
Queries and order columns for the checks

Returns

test_check (bool): True if all checks passed, False otherwise