Module pipelines.rj_escritorio.notify_flooding.tasks
Tasks for the flooding notification pipeline.
Functions
def compare_flooding_occurences(from_api: List[Dict[str, Union[str, int, float]]], from_cache: List[Dict[str, Union[str, int, float]]]) ‑> Tuple[List[Dict[str, Union[str, int, float]]], List[Dict[str, Union[str, int, float]]]]
-
Compare flooding occurrences from the API with the ones in the cache.
Args
from_api
- List of flooding occurrences from the API.
from_cache
- List of flooding occurrences from the cache.
Returns
Tuple with the new flooding occurrences, the closed flooding occurrences and the current flooding occurrences.
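A minimal sketch of how such a comparison could work, assuming each occurrence carries a unique identifier under an `id` key (a hypothetical field name; the actual key is not documented here). The signature above annotates a two-element tuple while the description lists three values; this sketch follows the description. The helper name `compare_by_id` is illustrative and is not part of the module.

```python
from typing import Dict, List, Tuple, Union

Occurrence = Dict[str, Union[str, int, float]]


def compare_by_id(
    from_api: List[Occurrence],
    from_cache: List[Occurrence],
    id_key: str = "id",  # assumption: the real identifier field may differ
) -> Tuple[List[Occurrence], List[Occurrence], List[Occurrence]]:
    """Split occurrences into new, closed and current by comparing IDs."""
    api_ids = {occ[id_key] for occ in from_api}
    cached_ids = {occ[id_key] for occ in from_cache}
    # New: reported by the API but not yet cached.
    new = [occ for occ in from_api if occ[id_key] not in cached_ids]
    # Closed: still cached but no longer reported by the API.
    closed = [occ for occ in from_cache if occ[id_key] not in api_ids]
    # Current: everything the API reports right now.
    current = list(from_api)
    return new, closed, current
```

Under this assumption, an occurrence present on both sides is neither new nor closed and only appears in the current list.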
def filter_flooding_occurences(open_occurrences: List[Dict[str, Union[str, int, float]]], flooding_pop_id: Union[int, List[int]]) ‑> List[Dict[str, Union[str, int, float]]]
-
Filter flooding occurrences from the API response.
Args
open_occurrences
- List of open occurrences from the API.
flooding_pop_id
- ID or list of IDs of the flooding POPs.
Returns
List of flooding occurrences.
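The filtering step might look like the sketch below. It assumes each occurrence stores its POP identifier under a `pop_id` key, which is a hypothetical field name, and the helper `filter_by_pop_id` is illustrative only. It accepts either a single ID or a list of IDs, as the signature above allows.

```python
from typing import Dict, List, Union

Occurrence = Dict[str, Union[str, int, float]]


def filter_by_pop_id(
    open_occurrences: List[Occurrence],
    flooding_pop_id: Union[int, List[int]],
    pop_key: str = "pop_id",  # assumption: the real field name may differ
) -> List[Occurrence]:
    """Keep only the occurrences whose POP ID is one of the flooding POP IDs."""
    # Normalize a single ID into a set so both input shapes share one code path.
    if isinstance(flooding_pop_id, int):
        wanted = {flooding_pop_id}
    else:
        wanted = set(flooding_pop_id)
    return [occ for occ in open_occurrences if occ.get(pop_key) in wanted]
```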
def get_cached_flooding_occurences(redis_key: str, host: str = 'redis.redis.svc.cluster.local', port: int = 6379, db: int = 0, password: str = None) ‑> List[Dict[str, Union[str, int, float]]]
-
Get flooding occurrences from Redis.
Args
redis_key
- Key to the flooding occurrences in Redis.
host
- Redis host.
port
- Redis port.
db
- Redis database.
password
- Redis password.
Returns
List of flooding occurrences.
def get_open_occurrences(api_url: str) ‑> List[Dict[str, Union[str, int, float]]]
-
Get open occurrences from the API.
Args
api_url
- URL of the COR-Comando API (open occurrences endpoint).
Returns
List of open occurrences.
def parse_comma_separated_string_to_list(input_text: str, output_type: type = builtins.int) ‑> List[Any]
-
Parse a comma separated string to a list.
Args
input_text
- Input string.
output_type
- Type to which each element of the output list is converted.
Returns
List of the input string elements.
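A possible implementation of this parsing, as a sketch only: it splits on commas, strips surrounding whitespace and converts each item with `output_type`. Skipping empty items is an assumption of this example, and the name `parse_csv_string` is illustrative rather than the module's own.

```python
from typing import Any, List


def parse_csv_string(input_text: str, output_type: type = int) -> List[Any]:
    """Split a comma-separated string and convert each item to output_type."""
    items = (item.strip() for item in input_text.split(","))
    # Assumption: empty items (e.g. from a trailing comma) are ignored.
    return [output_type(item) for item in items if item]
```

For example, `parse_csv_string("6, 31,32")` yields a list of integers, while passing `output_type=str` keeps the items as strings.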
def send_email_for_flooding_occurence(occurence: Dict[str, Union[str, int, float]], mode: str, to_email: Union[str, List[str]], email_configuration_secret_path: str, radius: int = 10)
-
Send an email for a flooding occurrence.
Args
occurence
- Flooding occurrence.
mode
- Must be "new" or "closed".
to_email
- Recipient email address (or list of addresses).
email_configuration_secret_path
- Vault path to the sender email configuration, which provides the username, password and SMTP server.
def update_flooding_occurences_cache(flooding_occurrences: List[Dict[str, Union[str, int, float]]], redis_key: str, host: str = 'redis.redis.svc.cluster.local', port: int = 6379, db: int = 0, password: str = None)
-
Update the flooding occurrences cache.
Args
flooding_occurrences
- List of flooding occurrences.
redis_key
- Key to the flooding occurrences in Redis.
host
- Redis host.
port
- Redis port.
db
- Redis database.
password
- Redis password.
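The cache read and write described for `get_cached_flooding_occurences` and `update_flooding_occurences_cache` can be sketched as a round trip. This example assumes the occurrences are stored as a JSON-encoded list under `redis_key`, which is not confirmed by the documentation above. `InMemoryStore` is a hypothetical stand-in that offers only `get` and `set`, so the sketch runs without a Redis server; a real Redis client with the same two methods could be passed instead.

```python
import json
from typing import Dict, List, Optional, Union

Occurrence = Dict[str, Union[str, int, float]]


class InMemoryStore:
    """Hypothetical stand-in exposing get/set, used here instead of Redis."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def set(self, key: str, value: str) -> None:
        self._data[key] = value


def write_cache(store, redis_key: str, flooding_occurrences: List[Occurrence]) -> None:
    """Overwrite the cached list (assumption: stored as one JSON string)."""
    store.set(redis_key, json.dumps(flooding_occurrences))


def read_cache(store, redis_key: str) -> List[Occurrence]:
    """Return the cached list, or an empty list when the key is missing."""
    raw = store.get(redis_key)
    return json.loads(raw) if raw is not None else []
```

Returning an empty list for a missing key means a first run treats every flooding occurrence from the API as new, which is a design choice of this sketch.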