Module pipelines.rj_cor.bot_semaforo.tasks

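Tasks for the `rj_cor` traffic-light (semáforo) alert bot: fetch broken-traffic-light alerts, format them and send them to a Telegram group.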

Functions

def format_message(dataframe: pandas.core.frame.DataFrame) -> pandas.core.series.Series
Expand source code
@task(checkpoint=False)
def format_message(dataframe: pd.DataFrame) -> pd.Series:
    """
    Formats the message before sending it.

    Keeps only the alerts whose ``initial_ts`` lies in the last hour, i.e. in
    ``(now - 1h, now]`` using naive America/Sao_Paulo wall-clock time. It then
    renders one line per alert (time, traffic-light name, Google Maps link,
    thumbs-up count) below a header. When no alert is in the window, the
    header is followed by a "no alerts" line instead.

    Args:
        dataframe: alerts to format. The columns read here are
            ``initial_ts``, ``name``, ``description``, ``sum_thumbs_up``,
            ``semaforo_latitude`` and ``semaforo_longitude``, which are the
            columns produced by ``get_data``.

    Returns:
        The final message split by ``smart_split`` on newlines into pieces of
        at most ``constants.TELEGRAM_MAX_MESSAGE_LENGTH`` characters.
        NOTE(review): presumably this is the list of strings consumed by
        ``send_messages``; the ``pd.Series`` annotation is not verified here.
    """

    def map_link(alerts: pd.DataFrame) -> pd.Series:
        """Wrap each alert's description in an HTML link to its traffic light on Google Maps."""
        url = (
            "https://www.google.com/maps/search/?api=1&query="
            + alerts["semaforo_latitude"].astype(str)
            + ","
            + alerts["semaforo_longitude"].astype(str)
            + "&zoom=21"
        )
        # NOTE(review): name/description are not HTML-escaped. If the Telegram
        # sender uses HTML parse mode, a '<' or '&' in them could break the
        # message. Confirm before escaping.
        return '<a href="' + url + '">' + alerts["description"] + "</a>"

    def current_date_time():
        """
        Return ``(current_minus_1h, current)``.

        Both values are naive datetimes in America/Sao_Paulo local time,
        truncated to whole seconds.
        """
        # tzinfo is dropped so the values compare with the naive ``initial_ts``
        # (get_data converts it with DATETIME(..., 'America/Sao_Paulo')).
        current = datetime.now(pytz.timezone("America/Sao_Paulo")).replace(
            tzinfo=None, microsecond=0
        )
        return current - timedelta(hours=1), current

    thumbs_up_emoji = "\U0001F44D"
    traffic_light_emoji = "\U0001F6A6"
    current_minus_1h, current = current_date_time()

    # Builds all alert messages
    in_window = (dataframe["initial_ts"] > current_minus_1h) & (
        dataframe["initial_ts"] <= current
    )
    # .copy() makes the column assignments below modify an independent frame
    # rather than a slice of ``dataframe`` (avoids SettingWithCopyWarning).
    filtered_alerts = dataframe[in_window].copy()

    alert = None
    if not filtered_alerts.empty:
        # A NULL name/description would turn the whole concatenated line into
        # NaN and make the final "".join raise TypeError.
        for column in ("name", "description"):
            filtered_alerts[column] = filtered_alerts[column].fillna("")
        filtered_alerts["url"] = map_link(filtered_alerts)
        filtered_alerts["alert"] = (
            # Characters 11:16 of "YYYY-MM-DD HH:MM:SS" are "HH:MM".
            filtered_alerts["initial_ts"].apply(lambda x: str(x)[11:16])
            + " - "
            + filtered_alerts["name"]
            + " - "
            + filtered_alerts["url"]
            + " - "
            + filtered_alerts["sum_thumbs_up"].astype(str)
            + thumbs_up_emoji
            + "\n \n"
        )
        alert = "".join(filtered_alerts["alert"].tolist())

    msg_header = (
        traffic_light_emoji
        + " CETRIO"
        + "\n \nALERTA WAZE - Semáforo quebrado - atualizado em "
        + current.strftime("%Y-%m-%d %H:%M:%S")
        + "\n"
        + "Alertas no período de: "
        + current_minus_1h.strftime("%H:%M")
        + " -> "
        + current.strftime("%H:%M")
        + "\n \n"
    )

    # Builds final message
    if not alert:
        alert = "Não foram encontrados alertas no período" + "\n \n"
    msg = msg_header + alert

    # Split on newlines so each chunk fits in a single Telegram message.
    return smart_split(
        text=msg,
        max_length=constants.TELEGRAM_MAX_MESSAGE_LENGTH.value,
        separator="\n",
    )

Formats the message before sending it.

def get_data() -> pandas.core.frame.DataFrame
Expand source code
@task(checkpoint=False)
def get_data() -> pd.DataFrame:
    """
    Returns the dataframe with the alerts.

    Runs a BigQuery query, billed to the ``rj-cor`` project, that:

    1. Buffers every traffic light in
       ``rj-escritorio-dev.transporte_rodoviario_cet.semaforos`` by 100 m
       (``ST_BUFFER(geometry, 100)``).
    2. Selects the distinct ``(ts, uuid)`` Waze alerts of type ``HAZARD`` /
       ``HAZARD_ON_ROAD_TRAFFIC_LIGHT_FAULT`` in "Rio de Janeiro". Only alerts
       whose date is not earlier than ``CURRENT_DATE()`` are kept
       (``DATE_DIFF(...) < 1``).
    3. Joins them back to the raw alerts table to fetch street, geometry,
       thumbs-up count and reliability, dropping rows with no street.
    4. Clusters the alert geometries with ``ST_CLUSTERDBSCAN`` (epsilon 50 m,
       minimum 1 point). Each cluster is reduced to its centroid, and its
       thumbs-up counts are summed (NULL counted as 0).
    5. For every traffic light whose 100 m buffer covers a cluster centroid,
       keeps the earliest alert time (converted to America/Sao_Paulo) and the
       summed thumbs-up.
    6. Adds the traffic light's latitude/longitude and orders the rows by
       ``initial_ts``, newest first.

    Returns:
        DataFrame with the columns ``name``, ``initial_ts``, ``description``,
        ``sum_thumbs_up``, ``semaforo_latitude`` and ``semaforo_longitude``.
    """
    # NOTE(review): CURRENT_DATE() with no time-zone argument is evaluated in
    # UTC by BigQuery, so the "today" filter rolls over at 21:00 Rio time
    # (UTC-3). Confirm this is intended.
    # NOTE(review): intermediate_query re-joins the DISTINCT (ts, uuid) pairs
    # to the raw table, so duplicate source rows come back and are counted
    # again in number_thumbs_up / number_cluster_alerts. Confirm whether the
    # DISTINCT was meant to deduplicate them.
    # NOTE(review): the final LEFT JOIN matches traffic lights by name only,
    # so two lights sharing a name would produce duplicated rows.
    query = """
    WITH semaforos AS (
    SELECT
    *,
    ST_BUFFER(geometry, 100) raio
    FROM `rj-escritorio-dev.transporte_rodoviario_cet.semaforos`),

    distinct_selection AS (
    SELECT
        DISTINCT ts,
        uuid
    FROM
        `rj-escritorio-dev.transporte_rodoviario_waze.alertas`
    WHERE
        type = 'HAZARD'
        AND subtype='HAZARD_ON_ROAD_TRAFFIC_LIGHT_FAULT'
        AND city = 'Rio de Janeiro'
        AND DATE_DIFF(CURRENT_DATE(), CAST(ts as DATETIME), DAY) < 1 ),
    intermediate_query AS (
        SELECT
        distinct_selection.*,
        original.street,
        original.geometry,
        original.number_thumbs_up,
        original.reliability
        FROM
        distinct_selection
        LEFT JOIN
        `rj-escritorio-dev.transporte_rodoviario_waze.alertas` AS original
        ON
        distinct_selection.ts = original.ts
        AND distinct_selection.uuid = original.uuid
        WHERE
        original.street IS NOT NULL
        AND original.city = "Rio de Janeiro"
        ORDER BY
        ts),
    clusters AS (
        SELECT
            *,
            -- epsilon: The epsilon that specifies the radius, measured in meters,
            -- around a core value. 10 20 50
            ST_CLUSTERDBSCAN(geometry, 50, 1) OVER () AS cluster_num,
            SAFE_CAST(FORMAT_DATE('%s', ts) AS INT64) AS ts_epoch,
        FROM intermediate_query),

    pontos_de_alertas AS (
    SELECT
        cluster_num,
        MAX(ts) as ts,
        MAX(uuid) as uuid,
        MAX(street) as street,
        SUM(
            CASE WHEN number_thumbs_up is not null then CAST(number_thumbs_up as FLOAT64) ELSE 0 END
        ) as number_thumbs_up,
        MAX(reliability) as reliability,
        MAX(ts_epoch) as ts_epoch,
        ST_CENTROID_AGG(geometry) centroid,
        COUNT(*) as number_cluster_alerts
    FROM clusters
    GROUP BY cluster_num
    ORDER BY cluster_num),

    alertas_no_raio AS (
    SELECT
    s.name,
    DATETIME(MIN(ts), 'America/Sao_Paulo') initial_ts,
    s.description,
    SUM(a.number_thumbs_up) sum_thumbs_up
    FROM semaforos as s
    INNER JOIN pontos_de_alertas as a
        ON ST_COVERS(s.raio, a.centroid)
    GROUP BY s.name, s.description)

    SELECT
    alertas_no_raio.*,
    ST_Y(geometry) semaforo_latitude,
    ST_X(geometry) semaforo_longitude
    FROM alertas_no_raio
    LEFT JOIN semaforos
        ON alertas_no_raio.name = semaforos.name
    ORDER BY initial_ts DESC
    """
    return bd.read_sql(query=query, billing_project_id="rj-cor", from_file=True)

Returns the dataframe with the alerts.

def get_token_and_group_id(secret_path: str) -> Tuple[str, int]
Expand source code
@task(checkpoint=False, nout=2)
def get_token_and_group_id(secret_path: str) -> Tuple[str, int]:
    """
    Read the Telegram bot credentials from a Vault secret.

    Args:
        secret_path: path of the secret passed to ``get_vault_secret``.

    Returns:
        A ``(token, group_id)`` pair. Both values come from the secret's
        ``data`` mapping with surrounding whitespace stripped; ``group_id``
        is converted to ``int``.
    """
    secret_data = get_vault_secret(secret_path, client=None)["data"]
    bot_token = secret_data["token"].strip()
    chat_id = int(secret_data["group_id"].strip())
    return bot_token, chat_id

Returns Telegram token and group ID from a secret file.

def send_messages(token: str, group_id: str, messages: List[str]) -> None
Expand source code
@task(checkpoint=False)
# pylint: disable=too-many-arguments
def send_messages(token: str, group_id: str, messages: List[str]) -> None:
    """
    Send every non-empty message to the Telegram group, then a link to the general map.

    Args:
        token: Telegram bot token.
        group_id: chat to post to. It is annotated as ``str``, but
            ``get_token_and_group_id`` returns it as an ``int``; it is
            forwarded unchanged as ``chat_id``.
        messages: message chunks to send in order. Empty strings are skipped.
    """
    for text in messages:
        if text == "":
            continue
        send_telegram_message(message=text, token=token, chat_id=group_id)

    # Always sent last, even when every chunk was empty.
    general_map_link = (
        '<a href="https://datastudio.google.com/reporting/b2841cf6-dd1b-4700-b6a4-140495e93ff4">'
        "MAPA GERAL</a>"
    )
    send_telegram_message(message=general_map_link, token=token, chat_id=group_id)

Sends the alerts to the Telegram group.