Module pipelines.utils.whatsapp_bot.tasks
Tasks for sending messages through a Whatsapp Bot. This module assumes that the bot inherits from the template bot at https://github.com/prefeitura-rio/whatsapp-bot
Functions
def get_whatsapp_bot_api_url(secret_path: str) ‑> str
-
Expand source code
@task def get_whatsapp_bot_api_url(secret_path: str) -> str: """ Get the URL of the bot API. Args: secret_path (str): Path to the secret on Vault. Returns: str: The URL of the bot API. """ return get_vault_secret(secret_path)["data"]["url"]
Get the URL of the bot API.
Args
secret_path
:str
- Path to the secret on Vault.
Returns
str
- The URL of the bot API.
def send_whatsapp_message(bot_api_url: str, chat_id: str, message: str)
-
Expand source code
@task def send_whatsapp_message(bot_api_url: str, chat_id: str, message: str): """ Sends a message through a Whatsapp Bot. Args: bot_api_url (str): The URL of the bot. chat_id (str): The chat ID of the bot. message (str): The message to be sent. Returns: None Raises: """ data = { "chat_id": chat_id, "message": message, } try: response = requests.post(bot_api_url, json=data) response.raise_for_status() except requests.exceptions.HTTPError as exc: if response.status_code == 404: raise WhatsAppBotNotFoundError("Whatsapp Bot not found.") from exc raise WhatsAppSendMessageError( "Error sending message to Whatsapp Bot." ) from exc except requests.exceptions.RequestException as exc: raise WhatsAppSendMessageError( "Error sending message to Whatsapp Bot." ) from exc
Sends a message through a Whatsapp Bot.
Args
bot_api_url
:str
- The URL of the bot.
chat_id
:str
- The chat ID of the bot.
message
:str
- The message to be sent.
Returns
None Raises: