Module pipelines.rj_escritorio.flooding_detection.tasks
Functions
def get_last_update(rain_api_update_url: str) ‑> datetime.datetime
-
Gets the last update datetime from the rain API.
Args
rain_api_update_url
- The rain API update url.
Returns
The last update datetime.
def get_openai_api_key(secret_path: str) ‑> str
-
Gets the OpenAI API key.
Args
secret_path
- The secret path.
Returns
The OpenAI API key.
def get_prediction(camera_with_image: Dict[str, Union[str, float]], flooding_prompt: str, openai_api_key: str, openai_api_model: str, openai_api_max_tokens: int = 300, openai_api_url: str = 'https://api.openai.com/v1/chat/completions') ‑> Dict[str, Union[str, float, bool]]
-
Gets the flooding detection prediction from OpenAI API.
Args
camera_with_image
- The camera with image in the following format: { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "image_base64": "base64…", "attempt_classification": True, }
flooding_prompt
- The flooding prompt.
openai_api_key
- The OpenAI API key.
openai_api_model
- The OpenAI API model.
openai_api_max_tokens
- The OpenAI API max tokens.
openai_api_url
- The OpenAI API URL.
Returns: The camera with image and classification in the following format: { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "image_base64": "base64…", "ai_classification": [ { "object": "alagamento", "label": True, "confidence": 0.7, } ], }
def get_snapshot(camera: Dict[str, Union[str, float]]) ‑> Dict[str, Union[str, float]]
-
Gets a snapshot from a camera.
Args
camera
- The camera in the following format: { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "attempt_classification": True, }
Returns
The camera with image in the following format: { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "attempt_classification": True, "image_base64": "base64…", }
def pick_cameras(rain_api_data_url: str, cameras_data_url: str, last_update: datetime.datetime, predictions_buffer_key: str, number_mock_rain_cameras: int = 0) ‑> List[Dict[str, Union[str, float]]]
-
Picks cameras based on the raining hexagons and last update.
Args
rain_api_data_url
- The rain API data url.
last_update
- The last update datetime.
predictions_buffer_key
- The Redis key for the predictions buffer.
Returns
A list of cameras in the following format: [ { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "attempt_classification": True, }, … ]
def update_flooding_api_data(cameras_with_image_and_classification: List[Dict[str, Union[str, float, bool]]], data_key: str, last_update_key: str, predictions_buffer_key: str) ‑> None
-
Updates Redis keys with flooding detection data and last update datetime (now).
Args
cameras_with_image_and_classification
- The cameras with image and classification in the following format: [ { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "image_base64": "base64…", "ai_classification": [ { "object": "alagamento", "label": True, "confidence": 0.7, } ], }, … ]
data_key
- The Redis key for the flooding detection data.
last_update_key
- The Redis key for the last update datetime.
predictions_buffer_key
- The Redis key for the predictions buffer.