Module pipelines.rj_escritorio.notify_flooding.tasks

Tasks for the flooding notification pipeline.

Functions

def compare_flooding_occurences(from_api: List[Dict[str, Union[str, int, float]]], from_cache: List[Dict[str, Union[str, int, float]]]) ‑> Tuple[List[Dict[str, Union[str, int, float]]], List[Dict[str, Union[str, int, float]]]]

Compare flooding occurrences from the API with the ones in the cache.

Args

from_api
List of flooding occurrences from the API.
from_cache
List of flooding occurrences from the cache.

Returns

Tuple with the new flooding occurrences, the closed flooding occurrences and the current flooding occurrences.

def filter_flooding_occurences(open_occurrences: List[Dict[str, Union[str, int, float]]], flooding_pop_id: Union[int, List[int]]) ‑> List[Dict[str, Union[str, int, float]]]

Filter flooding occurrences from the API response.

Args

open_occurrences
List of open occurrences from the API.
flooding_pop_id
ID or list of IDs of the flooding POPs.

Returns

List of flooding occurrences.

def get_cached_flooding_occurences(redis_key: str, host: str = 'redis.redis.svc.cluster.local', port: int = 6379, db: int = 0, password: str = None) ‑> List[Dict[str, Union[str, int, float]]]

Get flooding occurrences from Redis.

Args

redis_key
Key to the flooding occurrences in Redis.
host
Redis host.
port
Redis port.
db
Redis database.
password
Redis password.

Returns

List of flooding occurrences.

def get_open_occurrences(api_url: str) ‑> List[Dict[str, Union[str, int, float]]]

Get open occurrences from the API.

Args

api_url
URL to the COR-Comando API (open occurences endpoint)

Returns

List of open occurrences.

def parse_comma_separated_string_to_list(input_text: str, output_type: type = builtins.int) ‑> List[Any]

Parse a comma separated string to a list.

Args

input
Input string.
output_type
Type of the output list.

Returns

List of the input string elements.

def send_email_for_flooding_occurence(occurence: Dict[str, Union[str, int, float]], mode: str, to_email: Union[str, List[str]], email_configuration_secret_path: str, radius: int = 10)

Send an email for a flooding occurrence.

Args

occurence
Flooding occurrence.
mode
Must be "new" or "closed".
to_email
Email (or list of emails) to send the email to.
email_configuration_secret_path
Path to the from email in Vault. This provides username, password and SMTP server.
def update_flooding_occurences_cache(flooding_occurrences: List[Dict[str, Union[str, int, float]]], redis_key: str, host: str = 'redis.redis.svc.cluster.local', port: int = 6379, db: int = 0, password: str = None)

Update the flooding occurrences cache.

Args

flooding_occurrences
List of flooding occurrences.
redis_key
Key to the flooding occurrences in Redis.
host
Redis host.
port
Redis port.
db
Redis database.
password
Redis password.