Module pipelines.rj_escritorio.flooding_detection.tasks

Functions

def get_last_update(rain_api_update_url: str) ‑> datetime.datetime
Expand source code
@task
def get_last_update(
    rain_api_update_url: str,
) -> datetime:
    """
    Gets the last update datetime from the rain API.

    Args:
        rain_api_update_url: The rain API update url.

    Returns:
        The last update datetime.
    """
    data = requests.get(rain_api_update_url).text
    data = data.strip('"')
    log(f"Last update: {data}")
    return datetime.strptime(data, "%d/%m/%Y %H:%M:%S")

Gets the last update datetime from the rain API.

Args

rain_api_update_url
The rain API update url.

Returns

The last update datetime.

def get_openai_api_key(secret_path: str) ‑> str
Expand source code
@task
def get_openai_api_key(secret_path: str) -> str:
    """
    Gets the OpenAI API key.

    Args:
        secret_path: The secret path.

    Returns:
        The OpenAI API key.
    """
    secret = get_vault_secret(secret_path)["data"]
    return secret["api_key"]

Gets the OpenAI API key.

Args

secret_path
The secret path.

Returns

The OpenAI API key.

def get_prediction(camera_with_image: Dict[str, str | float],
flooding_prompt: str,
openai_api_key: str,
openai_api_model: str,
openai_api_max_tokens: int = 300,
openai_api_url: str = 'https://api.openai.com/v1/chat/completions') ‑> Dict[str, str | float | bool]
Expand source code
@task
def get_prediction(
    camera_with_image: Dict[str, Union[str, float]],
    flooding_prompt: str,
    openai_api_key: str,
    openai_api_model: str,
    openai_api_max_tokens: int = 300,
    openai_api_url: str = "https://api.openai.com/v1/chat/completions",
) -> Dict[str, Union[str, float, bool]]:
    """
    Gets the flooding detection prediction from OpenAI API.

    Args:
        camera_with_image: The camera with image in the following format:
            {
                "id_camera": "1",
                "url_camera": "rtsp://...",
                "latitude": -22.912,
                "longitude": -43.230,
                "image_base64": "base64...",
                "attempt_classification": True,
            }
        flooding_prompt: The flooding prompt.
        openai_api_key: The OpenAI API key.
        openai_api_model: The OpenAI API model.
        openai_api_max_tokens: The OpenAI API max tokens.
        openai_api_url: The OpenAI API URL.

    Returns: The camera with image and classification in the following format:
        {
            "id_camera": "1",
            "url_camera": "rtsp://...",
            "latitude": -22.912,
            "longitude": -43.230,
            "image_base64": "base64...",
            "ai_classification": [
                {
                    "object": "alagamento",
                    "label": True,
                    "confidence": 0.7,
                }
            ],
        }
    """
    # TODO:
    # - Add confidence value
    # Setup the request
    if not camera_with_image["attempt_classification"]:
        camera_with_image["ai_classification"] = [
            {
                "object": "alagamento",
                "label": False,
                "confidence": 0.7,
            }
        ]
        return camera_with_image
    if not camera_with_image["image_base64"]:
        camera_with_image["ai_classification"] = [
            {
                "object": "alagamento",
                "label": None,
                "confidence": 0.7,
            }
        ]
        return camera_with_image
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {openai_api_key}",
    }
    payload = {
        "model": openai_api_model,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": flooding_prompt,
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{camera_with_image['image_base64']}"
                        },
                    },
                ],
            }
        ],
        "max_tokens": openai_api_max_tokens,
    }
    response = requests.post(openai_api_url, headers=headers, json=payload)
    data: dict = response.json()
    if data.get("error"):
        flooding_detected = None
        log(f"Failed to get prediction: {data['error']}")
    else:
        content: str = data["choices"][0]["message"]["content"]
        json_string = content.replace("```json\n", "").replace("\n```", "")
        json_object = json.loads(json_string)
        flooding_detected = json_object["flooding_detected"]
        log(f"Successfully got prediction: {flooding_detected}")
    camera_with_image["ai_classification"] = [
        {
            "object": "alagamento",
            "label": flooding_detected,
            "confidence": 0.7,
        }
    ]
    return camera_with_image

Gets the flooding detection prediction from OpenAI API.

Args

camera_with_image
The camera with image in the following format: { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "image_base64": "base64…", "attempt_classification": True, }
flooding_prompt
The flooding prompt.
openai_api_key
The OpenAI API key.
openai_api_model
The OpenAI API model.
openai_api_max_tokens
The OpenAI API max tokens.
openai_api_url
The OpenAI API URL.

Returns: The camera with image and classification in the following format: { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "image_base64": "base64…", "ai_classification": [ { "object": "alagamento", "label": True, "confidence": 0.7, } ], }

def get_snapshot(camera: Dict[str, str | float]) ‑> Dict[str, str | float]
Expand source code
@task(
    max_retries=2,
    retry_delay=timedelta(seconds=1),
)
def get_snapshot(
    camera: Dict[str, Union[str, float]],
) -> Dict[str, Union[str, float]]:
    """
    Gets a snapshot from a camera.

    Args:
        camera: The camera in the following format:
            {
                "id_camera": "1",
                "url_camera": "rtsp://...",
                "latitude": -22.912,
                "longitude": -43.230,
                "attempt_classification": True,
            }

    Returns:
        The camera with image in the following format:
            {
                "id_camera": "1",
                "url_camera": "rtsp://...",
                "latitude": -22.912,
                "longitude": -43.230,
                "attempt_classification": True,
                "image_base64": "base64...",
            }
    """
    try:
        rtsp_url = camera["url_camera"]
        cap = cv2.VideoCapture(rtsp_url)
        ret, frame = cap.read()
        if not ret:
            raise RuntimeError(f"Failed to get snapshot from URL {rtsp_url}.")
        cap.release()
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = Image.fromarray(frame)
        buffer = io.BytesIO()
        img.save(buffer, format="JPEG")
        img_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
        log(f"Successfully got snapshot from URL {rtsp_url}.")
        camera["image_base64"] = img_b64
    except Exception:
        log(f"Failed to get snapshot from URL {rtsp_url}.")
        camera["image_base64"] = None
    return camera

Gets a snapshot from a camera.

Args

camera
The camera in the following format: { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "attempt_classification": True, }

Returns

The camera with image in the following format: { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "attempt_classification": True, "image_base64": "base64…", }

def pick_cameras(rain_api_data_url: str,
cameras_data_url: str,
last_update: datetime.datetime,
predictions_buffer_key: str,
number_mock_rain_cameras: int = 0) ‑> List[Dict[str, str | float]]
Expand source code
@task
def pick_cameras(
    rain_api_data_url: str,
    cameras_data_url: str,
    last_update: datetime,
    predictions_buffer_key: str,
    number_mock_rain_cameras: int = 0,
) -> List[Dict[str, Union[str, float]]]:
    """
    Picks cameras based on the raining hexagons and last update.

    Args:
        rain_api_data_url: The rain API data url.
        last_update: The last update datetime.
        predictions_buffer_key: The Redis key for the predictions buffer.

    Returns:
        A list of cameras in the following format:
            [
                {
                    "id_camera": "1",
                    "url_camera": "rtsp://...",
                    "latitude": -22.912,
                    "longitude": -43.230,
                    "attempt_classification": True,
                },
                ...
            ]
    """
    # Download the cameras data
    cameras_data_path = Path("/tmp") / "cameras_geo_min.csv"
    if not download_file(url=cameras_data_url, output_path=cameras_data_path):
        raise RuntimeError("Failed to download the cameras data.")
    cameras = pd.read_csv(cameras_data_path)
    cameras = cameras.drop(columns=["geometry"])
    geometry = [Point(xy) for xy in zip(cameras["longitude"], cameras["latitude"])]
    df_cameras = gpd.GeoDataFrame(cameras, geometry=geometry)
    df_cameras.crs = {"init": "epsg:4326"}
    log("Successfully downloaded cameras data.")
    log(f"Cameras shape: {df_cameras.shape}")

    # Get rain data
    rain_data = requests.get(rain_api_data_url).json()
    df_rain = pd.DataFrame(rain_data)
    df_rain["last_update"] = last_update
    log("Successfully downloaded rain data.")
    log(f"Rain data shape: {df_rain.shape}")

    # Join the dataframes
    df_cameras_h3 = pd.merge(df_cameras, df_rain, how="left", on="id_h3")
    log("Successfully joined the dataframes.")
    log(f"Cameras H3 shape: {df_cameras_h3.shape}")

    # Modify status based on buffers
    for _, row in df_cameras_h3.iterrows():
        predictions_buffer_camera_key = f"{predictions_buffer_key}_{row['id_camera']}"
        predictions_buffer = redis_get_prediction_buffer(predictions_buffer_camera_key)
        # Get most common prediction
        most_common_prediction = max(
            set(predictions_buffer), key=predictions_buffer.count
        )
        # Get last prediction
        last_prediction = predictions_buffer[-1]
        # Add classifications
        if most_common_prediction or last_prediction:
            row["status"] = "chuva moderada"

    # Mock a few cameras when argument is set
    if number_mock_rain_cameras > 0:
        df_len = len(df_cameras_h3)
        for _ in range(number_mock_rain_cameras):
            mocked_index = random.randint(0, df_len)
            df_cameras_h3.loc[mocked_index, "status"] = "chuva moderada"
            log(f'Mocked camera ID: {df_cameras_h3.loc[mocked_index]["id_camera"]}')

    # Set output
    output = []
    for _, row in df_cameras_h3.iterrows():
        output.append(
            {
                "id_camera": row["id_camera"],
                "nome_camera": row["nome"],
                "url_camera": row["rtsp"],
                "latitude": row["geometry"].y,
                "longitude": row["geometry"].x,
                # "attempt_classification": (
                #     row["status"] not in ["sem chuva", "chuva fraca"]
                # ),
                "attempt_classification": True,
            }
        )
    log(f"Picked cameras: {output}")
    return output

Picks cameras based on the raining hexagons and last update.

Args

rain_api_data_url
The rain API data url.
last_update
The last update datetime.
predictions_buffer_key
The Redis key for the predictions buffer.

Returns

A list of cameras in the following format: [ { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "attempt_classification": True, }, … ]

def update_flooding_api_data(cameras_with_image_and_classification: List[Dict[str, str | float | bool]],
data_key: str,
last_update_key: str,
predictions_buffer_key: str) ‑> None
Expand source code
@task
def update_flooding_api_data(
    cameras_with_image_and_classification: List[Dict[str, Union[str, float, bool]]],
    data_key: str,
    last_update_key: str,
    predictions_buffer_key: str,
) -> None:
    """
    Updates Redis keys with flooding detection data and last update datetime (now).

    Args:
        cameras_with_image_and_classification: The cameras with image and classification
            in the following format:
                [
                    {
                        "id_camera": "1",
                        "url_camera": "rtsp://...",
                        "latitude": -22.912,
                        "longitude": -43.230,
                        "image_base64": "base64...",
                        "ai_classification": [
                            {
                                "object": "alagamento",
                                "label": True,
                                "confidence": 0.7,
                            }
                        ],
                    },
                    ...
                ]
        data_key: The Redis key for the flooding detection data.
        last_update_key: The Redis key for the last update datetime.
        predictions_buffer_key: The Redis key for the predictions buffer.
    """
    # Build API data
    last_update = pendulum.now(tz="America/Sao_Paulo")
    api_data = []
    for camera_with_image_and_classification in cameras_with_image_and_classification:
        # Get AI classifications
        ai_classification = []
        current_prediction = camera_with_image_and_classification["ai_classification"][
            0
        ]["label"]
        if current_prediction is None:
            api_data.append(
                {
                    "datetime": last_update.to_datetime_string(),
                    "id_camera": camera_with_image_and_classification["id_camera"],
                    "url_camera": camera_with_image_and_classification["url_camera"],
                    "latitude": camera_with_image_and_classification["latitude"],
                    "longitude": camera_with_image_and_classification["longitude"],
                    "image_base64": camera_with_image_and_classification[
                        "image_base64"
                    ],
                    "ai_classification": ai_classification,
                }
            )
            continue
        predictions_buffer_camera_key = f"{predictions_buffer_key}_{camera_with_image_and_classification['id_camera']}"  # noqa
        predictions_buffer = redis_add_to_prediction_buffer(
            predictions_buffer_camera_key, current_prediction
        )
        # Get most common prediction
        most_common_prediction = max(
            set(predictions_buffer), key=predictions_buffer.count
        )
        # Add classifications
        ai_classification.append(
            {
                "object": "alagamento",
                "label": most_common_prediction,
                "confidence": 0.7,
            }
        )
        api_data.append(
            {
                "datetime": last_update.to_datetime_string(),
                "id_camera": camera_with_image_and_classification["id_camera"],
                "url_camera": camera_with_image_and_classification["url_camera"],
                "latitude": camera_with_image_and_classification["latitude"],
                "longitude": camera_with_image_and_classification["longitude"],
                "image_base64": camera_with_image_and_classification["image_base64"],
                "ai_classification": ai_classification,
            }
        )

    # Update API data
    redis_client = get_redis_client(db=1)
    redis_client.set(data_key, api_data)
    redis_client.set(last_update_key, last_update.to_datetime_string())
    log("Successfully updated flooding detection data.")

Updates Redis keys with flooding detection data and last update datetime (now).

Args

cameras_with_image_and_classification
The cameras with image and classification in the following format: [ { "id_camera": "1", "url_camera": "rtsp://…", "latitude": -22.912, "longitude": -43.230, "image_base64": "base64…", "ai_classification": [ { "object": "alagamento", "label": True, "confidence": 0.7, } ], }, … ]
data_key
The Redis key for the flooding detection data.
last_update_key
The Redis key for the last update datetime.
predictions_buffer_key
The Redis key for the predictions buffer.