Module pipelines.rj_escritorio.birthday_notification.tasks
Tasks for the daily birthday flow.
Functions
def get_birthdays_by_date(date: str) ‑> List[str]
-
Expand source code
@task def get_birthdays_by_date(date: str) -> List[str]: """ Get birthdays by date. """ birthdays_url = f"https://bot.dados.rio/users/?birthday={date}" secret = get_vault_secret("bot-rio-api-token") token = secret["data"]["token"] response = requests.get(birthdays_url, headers={"Authorization": f"Token {token}"}) response.raise_for_status() return [item["name"] for item in response.json()["results"]]
Get birthdays by date.
def get_todays_date() ‑> str
-
Expand source code
@task def get_todays_date() -> str: """ Get today's date in format dd-mm """ return ( pendulum.now() .replace(tzinfo=pytz.timezone("America/Sao_Paulo")) .strftime("%d-%m") )
Get today's date in format dd-mm
def send_birthday_message(names: List[str], secret_path: str) ‑> None
-
Expand source code
@task def send_birthday_message(names: List[str], secret_path: str) -> None: """ Send birthday message. """ secret = get_vault_secret(secret_path) webhook_url = secret["data"]["url"] for name in names: message = f"Tem aniversário hoje!!! Parabéns {name}! 🥳🥳🥳" send_discord_message(message, webhook_url)
Send birthday message.