Module pipelines.rj_escritorio.notify_flooding.tasks
Tasks for the flooding notification pipeline.
Functions
def compare_flooding_occurences(from_api: List[Dict[str, str | int | float]],
from_cache: List[Dict[str, str | int | float]]) ‑> Tuple[List[Dict[str, str | int | float]], List[Dict[str, str | int | float]]]-
Expand source code
@task(nout=3) def compare_flooding_occurences( from_api: List[Dict[str, Union[str, int, float]]], from_cache: List[Dict[str, Union[str, int, float]]], ) -> Tuple[ List[Dict[str, Union[str, int, float]]], List[Dict[str, Union[str, int, float]]], ]: """ Compare flooding occurrences from the API with the ones in the cache. Args: from_api: List of flooding occurrences from the API. from_cache: List of flooding occurrences from the cache. Returns: Tuple with the new flooding occurrences, the closed flooding occurrences and the current flooding occurrences. """ ids_from_api = [occurrence["id"] for occurrence in from_api] log(f"IDs from API: {ids_from_api}") ids_from_cache = [occurrence["id"] for occurrence in from_cache] log(f"IDs from cache: {ids_from_cache}") new_occurrences = [ occurrence for occurrence in from_api if occurrence["id"] not in ids_from_cache ] log(f"New occurrences: {[occurrence['id'] for occurrence in new_occurrences]}") closed_occurrences = [ occurrence for occurrence in from_cache if occurrence["id"] not in ids_from_api ] log( f"Closed occurrences: {[occurrence['id'] for occurrence in closed_occurrences]}" ) current_occurences = [ occurence for occurence in from_cache if occurence["id"] in ids_from_api ] log( f"Current occurrences: {[occurrence['id'] for occurrence in current_occurences]}" ) current_occurences += new_occurrences return new_occurrences, closed_occurrences, current_occurences
Compare flooding occurrences from the API with the ones in the cache.
Args
from_api
- List of flooding occurrences from the API.
from_cache
- List of flooding occurrences from the cache.
Returns
Tuple with the new flooding occurrences, the closed flooding occurrences and the current flooding occurrences.
def filter_flooding_occurences(open_occurrences: List[Dict[str, str | int | float]],
flooding_pop_id: int | List[int]) ‑> List[Dict[str, str | int | float]]-
Expand source code
@task def filter_flooding_occurences( open_occurrences: List[Dict[str, Union[str, int, float]]], flooding_pop_id: Union[int, List[int]], ) -> List[Dict[str, Union[str, int, float]]]: """ Filter flooding occurrences from the API response. Args: open_occurrences: List of open occurrences from the API. flooding_pop_id: ID or list of IDs of the flooding POPs. Returns: List of flooding occurrences. """ if isinstance(flooding_pop_id, int): flooding_pop_id = [flooding_pop_id] flooding_occurrences = [ occurrence for occurrence in open_occurrences if occurrence["pop_id"] in flooding_pop_id ] return flooding_occurrences
Filter flooding occurrences from the API response.
Args
open_occurrences
- List of open occurrences from the API.
flooding_pop_id
- ID or list of IDs of the flooding POPs.
Returns
List of flooding occurrences.
def get_cached_flooding_occurences(redis_key: str,
host: str = 'redis.redis.svc.cluster.local',
port: int = 6379,
db: int = 0,
password: str = None) ‑> List[Dict[str, str | int | float]]-
Expand source code
@task def get_cached_flooding_occurences( redis_key: str, host: str = "redis.redis.svc.cluster.local", port: int = 6379, db: int = 0, # pylint: disable=C0103 password: str = None, ) -> List[Dict[str, Union[str, int, float]]]: """ Get flooding occurrences from Redis. Args: redis_key: Key to the flooding occurrences in Redis. host: Redis host. port: Redis port. db: Redis database. password: Redis password. Returns: List of flooding occurrences. """ redis_client = get_redis_client(host=host, port=port, db=db, password=password) flooding_occurrences = redis_client.get(redis_key) if flooding_occurrences is None: flooding_occurrences = [] return flooding_occurrences
Get flooding occurrences from Redis.
Args
redis_key
- Key to the flooding occurrences in Redis.
host
- Redis host.
port
- Redis port.
db
- Redis database.
password
- Redis password.
Returns
List of flooding occurrences.
def get_open_occurrences(api_url: str) ‑> List[Dict[str, str | int | float]]
-
Expand source code
@task def get_open_occurrences(api_url: str) -> List[Dict[str, Union[str, int, float]]]: """ Get open occurrences from the API. Args: api_url: URL to the COR-Comando API (open occurences endpoint) Returns: List of open occurrences. """ try: response = requests.get(api_url) response.raise_for_status() except Exception as exc: raise Exception(f"Error getting open occurrences from API: {exc}") from exc try: data = response.json() except Exception as exc: raise Exception(f"Error parsing response from API: {exc}") from exc try: occurences = data["eventos"] except KeyError as exc: raise Exception(f"Error parsing response from API: {exc}") from exc return occurences
Get open occurrences from the API.
Args
api_url
- URL to the COR-Comando API (open occurences endpoint)
Returns
List of open occurrences.
def parse_comma_separated_string_to_list(input_text: str, output_type: type = builtins.int) ‑> List[Any]
-
Expand source code
@task def parse_comma_separated_string_to_list( input_text: str, output_type: type = int, ) -> List[Any]: """ Parse a comma separated string to a list. Args: input: Input string. output_type: Type of the output list. Returns: List of the input string elements. """ if input_text == "": return [] return [output_type(element) for element in input_text.split(",")]
Parse a comma separated string to a list.
Args
input
- Input string.
output_type
- Type of the output list.
Returns
List of the input string elements.
def send_email_for_flooding_occurence(occurence: Dict[str, str | int | float],
mode: str,
to_email: str | List[str],
email_configuration_secret_path: str,
radius: int = 10)-
Expand source code
@task def send_email_for_flooding_occurence( occurence: Dict[str, Union[str, int, float]], mode: str, to_email: Union[str, List[str]], email_configuration_secret_path: str, radius: int = 10, ): """ Send an email for a flooding occurrence. Args: occurence: Flooding occurrence. mode: Must be "new" or "closed". to_email: Email (or list of emails) to send the email to. email_configuration_secret_path: Path to the from email in Vault. This provides username, password and SMTP server. """ try: radius = int(radius) except ValueError as exc: raise ValueError(f"Invalid radius: {radius}") from exc if mode not in ["new", "closed"]: raise ValueError(f"Invalid mode: {mode}") secret = get_vault_secret(email_configuration_secret_path)["data"] if mode == "new": subject = f"NEW FLOOD OCCURENCE - ID {occurence['id']}" body = subject circle_fname = f"{uuid4()}.kml" get_circle( latitude=occurence["latitude"], longitude=occurence["longitude"], radius=radius, fname=circle_fname, ) attachment = circle_fname else: subject = f"CLOSED FLOOD OCCURENCE - ID {occurence['id']}" body = subject circle_fname = f"{uuid4()}.kml" get_circle( latitude=occurence["latitude"], longitude=occurence["longitude"], radius=radius, fname=circle_fname, ) attachment = circle_fname send_email( from_address=secret["smtp_username"], to_address=to_email, subject=subject, body=body, smtp_server=secret["smtp_server"], smtp_port=int(secret["smtp_port"]), smtp_username=secret["smtp_username"], smtp_password=secret["smtp_password"], tls=True, attachment=attachment, )
Send an email for a flooding occurrence.
Args
occurence
- Flooding occurrence.
mode
- Must be "new" or "closed".
to_email
- Email (or list of emails) to send the email to.
email_configuration_secret_path
- Path to the from email in Vault. This provides username, password and SMTP server.
def update_flooding_occurences_cache(flooding_occurrences: List[Dict[str, str | int | float]],
redis_key: str,
host: str = 'redis.redis.svc.cluster.local',
port: int = 6379,
db: int = 0,
password: str = None)-
Expand source code
@task def update_flooding_occurences_cache( # pylint: disable=R0913 flooding_occurrences: List[Dict[str, Union[str, int, float]]], redis_key: str, host: str = "redis.redis.svc.cluster.local", port: int = 6379, db: int = 0, # pylint: disable=C0103 password: str = None, ): """ Update the flooding occurrences cache. Args: flooding_occurrences: List of flooding occurrences. redis_key: Key to the flooding occurrences in Redis. host: Redis host. port: Redis port. db: Redis database. password: Redis password. """ redis_client = get_redis_client(host=host, port=port, db=db, password=password) redis_client.set(redis_key, flooding_occurrences)
Update the flooding occurrences cache.
Args
flooding_occurrences
- List of flooding occurrences.
redis_key
- Key to the flooding occurrences in Redis.
host
- Redis host.
port
- Redis port.
db
- Redis database.
password
- Redis password.