Module pipelines.rj_escritorio.bot_sistemas.tasks

Tasks for the Systems Bot flow.

Functions

def build_message(data: list) ‑> str
Expand source code
@task(checkpoint=False)
def build_message(data: list) -> str:
    """
    Renders the systems status report as a single text message.

    The Jinja2 template below emits one section per phase (with its deadline
    in the header), and inside each phase one sub-section per status. Under
    every status it lists, as ``- <texto>`` bullets, the entries of ``data``
    whose ``status`` and ``fase`` match that sub-section.

    Args:
        data: Entries exposing ``status``, ``fase`` and ``texto``.

    Returns:
        The rendered message text.
    """
    # Deadline shown in each phase header; insertion order is the report order.
    deadlines_by_phase = {
        "Fase 1": "05/09",
        "Fase 2": "26/09",
        "Fase 3": "10/10",
        "Fase 4": "07/11",
    }
    phase_order = list(deadlines_by_phase)
    status_order = [
        "Não Iniciado",
        "Em Andamento",
        "Funcionamento Parcial",
        "Funcionamento Pleno",
    ]
    template_source = """
    {% macro printconds(data, status, fase) -%}

        {% for d in data -%}
        {% if (d.status == status) and (d.fase == fase) -%}
        - {{d.texto}}
        {% endif %}
        {%- endfor %}
    {% endmacro -%}


    Status dos Sistemas

    {% for f in  fases-%}
    *{{f}} - até {{prazos[f]}}*

    {% for s in  statuses-%}
    *{{s}}*
        {{ printconds(data, s, f) }}
    {%- endfor %}
    {%- endfor %}
    """
    template = Environment().from_string(template_source)
    return template.render(
        data=data,
        statuses=status_order,
        fases=phase_order,
        prazos=deadlines_by_phase,
    )

Builds the message

def get_data(sheet_id: str, sheet_name: str) ‑> pandas.core.frame.DataFrame
Expand source code
@task(checkpoint=False)
def get_data(sheet_id: str, sheet_name: str) -> pd.DataFrame:
    """
    Fetches data from Google Drive

    Builds the CSV export URL of one tab of a Google Sheets spreadsheet and
    loads it with ``pd.read_csv``.

    Args:
        sheet_id: Identifier of the spreadsheet, inserted into the URL path.
        sheet_name: Name of the tab to export. It is percent-encoded before
            being placed in the query string, so names containing spaces,
            accented characters, ``&`` or ``#`` produce a valid URL.

    Returns:
        The tab contents as a DataFrame, exactly as parsed by ``pd.read_csv``.
    """
    # Imported locally so the block does not depend on the module import list.
    from urllib.parse import quote

    # "%" is kept as a safe character so a name that the caller already
    # percent-encoded is not encoded a second time.
    encoded_sheet_name = quote(sheet_name, safe="%")
    url = (
        "https://docs.google.com/spreadsheets/d/"
        f"{sheet_id}/gviz/tq?tqx=out:csv&sheet={encoded_sheet_name}"
    )
    return pd.read_csv(url)

Fetches data from Google Drive

def preprocess_data(dataframe: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame
Expand source code
@task(checkpoint=False)
def preprocess_data(dataframe: pd.DataFrame) -> list:
    """
    Preprocesses data

    Parses the ``dt_critica`` and ``dt_iplan`` columns (``dd/mm/YYYY``
    strings), builds a display text per row, and returns the rows ordered by
    the parsed ``dt_critica`` date.

    Args:
        dataframe: Input table. Must contain the columns ``dt_critica``,
            ``dt_iplan``, ``nome``, ``nome_fantasia``, ``fase`` and
            ``status``. It is not modified.

    Returns:
        A list with one dict per row, each holding the keys ``fase``,
        ``status`` and ``texto``, where ``texto`` is
        ``"<dt_critica>, <nome> | <nome_fantasia>"``.

    Raises:
        ValueError: If a string date does not match ``%d/%m/%Y``.
    """

    def _parse_date(value):
        # Anything that is not a string (e.g. a missing cell) maps to None.
        if isinstance(value, str):
            return datetime.strptime(value, "%d/%m/%Y")
        return None

    # Work on a copy so the caller's DataFrame does not gain extra columns.
    frame = dataframe.copy()
    frame["dt_critica_dt"] = frame["dt_critica"].apply(_parse_date)
    # Not used in the output; still parsed so that malformed ``dt_iplan``
    # values keep raising as they did before.
    frame["dt_iplan_dt"] = frame["dt_iplan"].apply(_parse_date)
    frame["texto"] = frame.apply(
        lambda row: f"{row['dt_critica']}, {row['nome']} | {row['nome_fantasia']}",
        axis=1,
    )
    ordered = frame.sort_values(by="dt_critica_dt")
    return ordered[["fase", "status", "texto"]].to_dict("records")

Preprocesses data

def split_and_send_messages(token: str, group_id: str, message: str) ‑> None
Expand source code
@task(checkpoint=False)
def split_and_send_messages(token: str, group_id: str, message: str) -> None:
    """
    Splits the message and sends each part to the Telegram group.

    The text is cut on newline boundaries into parts no longer than
    ``constants.TELEGRAM_MAX_MESSAGE_LENGTH``. Each non-empty part is escaped
    for MarkdownV2 and sent as its own Telegram message.

    Args:
        token: Token passed to ``send_telegram_message``.
        group_id: Chat id of the destination group.
        message: Full text to deliver.
    """
    max_length = constants.TELEGRAM_MAX_MESSAGE_LENGTH.value
    chunks = smart_split(text=message, max_length=max_length, separator="\n")

    for chunk in chunks:
        # Empty parts are skipped rather than sent.
        if chunk == "":
            continue
        send_telegram_message(
            message=escape_markdown(chunk, version=2),
            token=token,
            chat_id=group_id,
            parse_mode=telegram.ParseMode.MARKDOWN_V2,
        )

Sends the alerts to the Telegram group.