Module pipelines.rj_escritorio.flooding_detection.tasks

Functions

def get_last_update(rain_api_update_url: str) ‑> datetime.datetime

Gets the last update datetime from the rain API.

Args

rain_api_update_url
The rain API update URL.

Returns

The last update datetime.

def get_openai_api_key(secret_path: str) ‑> str

Gets the OpenAI API key.

Args

secret_path
The secret path.

Returns

The OpenAI API key.

def get_prediction(camera_with_image: Dict[str, Union[str, float]], flooding_prompt: str, openai_api_key: str, openai_api_model: str, openai_api_max_tokens: int = 300, openai_api_url: str = 'https://api.openai.com/v1/chat/completions') ‑> Dict[str, Union[str, float, bool]]

Gets the flooding detection prediction from OpenAI API.

Args

camera_with_image
The camera with image in the following format:
    {
        "id_camera": "1",
        "url_camera": "rtsp://…",
        "latitude": -22.912,
        "longitude": -43.230,
        "image_base64": "base64…",
        "attempt_classification": True,
    }
flooding_prompt
The flooding prompt.
openai_api_key
The OpenAI API key.
openai_api_model
The OpenAI API model.
openai_api_max_tokens
The OpenAI API max tokens.
openai_api_url
The OpenAI API URL.

Returns

The camera with image and classification in the following format:
    {
        "id_camera": "1",
        "url_camera": "rtsp://…",
        "latitude": -22.912,
        "longitude": -43.230,
        "image_base64": "base64…",
        "ai_classification": [
            {
                "object": "alagamento",
                "label": True,
                "confidence": 0.7,
            }
        ],
    }
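The parameters above map naturally onto a chat completions request that carries one text part and one image part. The following is a minimal sketch of how such a request body could be assembled, not the module's actual implementation. The helper names `build_vision_payload` and `build_headers` are hypothetical, and the `image/jpeg` media type is an assumption, since the snapshot encoding is not documented here.

```python
import json
from typing import Any, Dict


def build_vision_payload(
    image_base64: str,
    flooding_prompt: str,
    openai_api_model: str,
    openai_api_max_tokens: int = 300,
) -> Dict[str, Any]:
    """Builds a request body with the prompt and the image as a data URL.

    Hypothetical helper: assumes the snapshot is JPEG-encoded.
    """
    return {
        "model": openai_api_model,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": flooding_prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{image_base64}"
                        },
                    },
                ],
            }
        ],
        "max_tokens": openai_api_max_tokens,
    }


def build_headers(openai_api_key: str) -> Dict[str, str]:
    """Builds the HTTP headers for a bearer-token authenticated JSON request."""
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {openai_api_key}",
    }


if __name__ == "__main__":
    payload = build_vision_payload("base64…", "Is the street flooded?", "some-model")
    # The body is plain JSON and can be POSTed to openai_api_url.
    print(json.dumps(payload, indent=2, ensure_ascii=False))
```

The body would then be sent with an HTTP POST to `openai_api_url`; how the response is parsed into `ai_classification` depends on the prompt and is not covered by this sketch.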

def get_snapshot(camera: Dict[str, Union[str, float]]) ‑> Dict[str, Union[str, float]]

Gets a snapshot from a camera.

Args

camera
The camera in the following format:
    {
        "id_camera": "1",
        "url_camera": "rtsp://…",
        "latitude": -22.912,
        "longitude": -43.230,
        "attempt_classification": True,
    }

Returns

The camera with image in the following format:
    {
        "id_camera": "1",
        "url_camera": "rtsp://…",
        "latitude": -22.912,
        "longitude": -43.230,
        "attempt_classification": True,
        "image_base64": "base64…",
    }
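The difference between the input and the returned record is the added `image_base64` key. As a rough illustration of that step only, the sketch below attaches already-captured image bytes to a camera record. The helper `attach_image` is hypothetical, and capturing a frame from the RTSP stream is deliberately left out because the source does not describe how it is done.

```python
import base64
from typing import Dict, Union

Camera = Dict[str, Union[str, float, bool]]


def attach_image(camera: Camera, image_bytes: bytes) -> Camera:
    """Returns a copy of the camera record with an added image_base64 key.

    Hypothetical helper: image_bytes is assumed to be an encoded image
    (for example JPEG) obtained elsewhere.
    """
    camera_with_image = dict(camera)  # shallow copy, input stays untouched
    camera_with_image["image_base64"] = base64.b64encode(image_bytes).decode("ascii")
    return camera_with_image


if __name__ == "__main__":
    camera = {
        "id_camera": "1",
        "url_camera": "rtsp://…",
        "latitude": -22.912,
        "longitude": -43.230,
        "attempt_classification": True,
    }
    print(sorted(attach_image(camera, b"fake image bytes").keys()))
```

Returning a copy rather than mutating the input keeps the original camera list reusable across retries.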

def pick_cameras(rain_api_data_url: str, cameras_data_url: str, last_update: datetime.datetime, predictions_buffer_key: str, number_mock_rain_cameras: int = 0) ‑> List[Dict[str, Union[str, float]]]

Picks cameras based on the raining hexagons and last update.

Args

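rain_api_data_url
The rain API data URL.
cameras_data_url
The cameras data URL.
last_update
The last update datetime.
predictions_buffer_key
The Redis key for the predictions buffer.
number_mock_rain_cameras
The number of mock rain cameras. Defaults to 0.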

Returns

A list of cameras in the following format:
    [
        {
            "id_camera": "1",
            "url_camera": "rtsp://…",
            "latitude": -22.912,
            "longitude": -43.230,
            "attempt_classification": True,
        },
        …
    ]

def update_flooding_api_data(cameras_with_image_and_classification: List[Dict[str, Union[str, float, bool]]], data_key: str, last_update_key: str, predictions_buffer_key: str) ‑> None

Updates the Redis keys with the flooding detection data and sets the last update datetime to the current time.

Args

cameras_with_image_and_classification
The cameras with image and classification in the following format:
    [
        {
            "id_camera": "1",
            "url_camera": "rtsp://…",
            "latitude": -22.912,
            "longitude": -43.230,
            "image_base64": "base64…",
            "ai_classification": [
                {
                    "object": "alagamento",
                    "label": True,
                    "confidence": 0.7,
                }
            ],
        },
        …
    ]
data_key
The Redis key for the flooding detection data.
last_update_key
The Redis key for the last update datetime.
predictions_buffer_key
The Redis key for the predictions buffer.
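To make the documented `ai_classification` structure concrete, here is a small sketch of how a consumer might read it. The helpers `is_flooded` and `flooded_camera_ids` and the `min_confidence` threshold are hypothetical and not part of this module; only the record layout is taken from the format described above.

```python
from typing import Any, Dict, List


def is_flooded(camera: Dict[str, Any], min_confidence: float = 0.5) -> bool:
    """Returns True if any "alagamento" entry is labeled True with enough confidence.

    Hypothetical helper: the threshold is an illustrative assumption.
    """
    for item in camera.get("ai_classification", []):
        if (
            item.get("object") == "alagamento"
            and item.get("label") is True
            and item.get("confidence", 0.0) >= min_confidence
        ):
            return True
    return False


def flooded_camera_ids(cameras: List[Dict[str, Any]]) -> List[str]:
    """Collects the id_camera of every record that is_flooded accepts."""
    return [camera["id_camera"] for camera in cameras if is_flooded(camera)]


if __name__ == "__main__":
    cameras = [
        {
            "id_camera": "1",
            "ai_classification": [
                {"object": "alagamento", "label": True, "confidence": 0.7}
            ],
        },
        {
            "id_camera": "2",
            "ai_classification": [
                {"object": "alagamento", "label": False, "confidence": 0.9}
            ],
        },
    ]
    print(flooded_camera_ids(cameras))
```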