Module pipelines.rj_escritorio.flooding_detection.utils

Data in: https://drive.google.com/drive/folders/1C-W_MMFAAJy5Lq_rHDzXUesEUyzke5gw

Functions

def build_rtsp(row: pandas.core.series.Series) ‑> str
Expand source code
def build_rtsp(row: pd.Series) -> str:
    """
    Builds a complete RTSP URL from the given row data.

    Parameters:
    - row (pd.Series): A pandas Series containing 'username', 'password', 'path', and 'ip'.

    Returns:
    - str: The complete RTSP URL.
    """
    username = row["username"]
    password = row["password"]
    path = row["path"]
    ip = row["ip"]
    # If we have username and password, add them to the URL
    if username and password:
        return f"rtsp://{username}:{password}@{ip}/{path}"
    else:
        return f"rtsp://{ip}/{path}"

Builds a complete RTSP URL from the given row data.

Parameters: - row (pd.Series): A pandas Series containing 'username', 'password', 'path', and 'ip'.

Returns: - str: The complete RTSP URL.

def clean_and_padronize_cameras() ‑> geopandas.geodataframe.GeoDataFrame
Expand source code
def clean_and_padronize_cameras() -> gpd.GeoDataFrame:
    """
    Cleans and standardizes camera data from a CSV file, then merges it with geographical data.

    Returns:
    - gpd.GeoDataFrame: A GeoDataFrame containing the cleaned, standardized, and geographically
      enriched camera data.
    """
    df = pd.read_csv(
        "./data/Cameras_em_2023-11-13.csv", delimiter=";", encoding="latin1"
    )
    df.columns = remove_columns_accents(df)
    df["codigo"] = df["codigo"].str.replace("'", "")
    df = df[df["status"] == "Online"]
    df = df[df["rtsp"].str.startswith("rtsp")]

    df["ip_in_rtsp"] = df.apply(lambda row: row["ip"] in row["rtsp"], axis=1)
    df[~df["ip_in_rtsp"]]

    df["ip"] = df["ip"].replace("10.151.48.04", "10.151.48.4")
    df["ip_in_rtsp"] = df.apply(lambda row: row["ip"] in row["rtsp"], axis=1)
    df[~df["ip_in_rtsp"]]

    df[["username", "password", "path"]] = df.apply(extract_data, axis=1)
    df["rtsp"] = df.apply(build_rtsp, axis=1)
    # Filter out by subnet
    df = df[
        df["ip"].str.startswith("10.10")
        | df["ip"].str.startswith("10.151")
        | df["ip"].str.startswith("10.152")
        | df["ip"].str.startswith("10.153")
        | df["ip"].str.startswith("10.50")
        | df["ip"].str.startswith("10.52")
    ]
    log("cameras: ", df.shape)
    cameras_h3 = get_cameras_h3(df=df)

    cols = [
        "codigo",
        "nome_da_camera",
        "rtsp",
        "latitude",
        "longitude",
        "geometry",
        "id_h3",
    ]
    cameras_h3 = cameras_h3[cols]
    cameras_h3 = cameras_h3.rename(
        columns={"codigo": "id_camera", "nome_da_camera": "nome"}
    )

    cameras_h3 = cameras_h3.reset_index(drop=True)
    log("cameras_h3: ", cameras_h3.shape)

    cameras_h3_bolsao = get_cameras_h3_bolsao(cameras_h3, buffer=0.002)
    # remove duplicate bolsoes
    cameras_h3_bolsao = cameras_h3_bolsao.drop_duplicates(subset="id_camera")
    log("cameras_h3_bolsao: ", cameras_h3_bolsao.shape)
    log("is_bolsao: ", cameras_h3_bolsao["is_bolsao"].sum())
    return cameras_h3_bolsao.reset_index(drop=True)

Cleans and standardizes camera data from a CSV file, then merges it with geographical data.

Returns: - gpd.GeoDataFrame: A GeoDataFrame containing the cleaned, standardized, and geographically enriched camera data.

def download_file(url: str, output_path: str | pathlib.Path) ‑> bool
Expand source code
def download_file(url: str, output_path: Union[str, Path]) -> bool:
    """
    Downloads a file from a URL.

    Args:
        url: The URL.
        output_path: The output path.

    Returns:
        Whether the file was downloaded successfully.
    """
    response = requests.get(url)
    if response.status_code == 200:
        with open(output_path, "wb") as f:
            f.write(response.content)
        return True
    return False

Downloads a file from a URL.

Args

url
The URL.
output_path
The output path.

Returns

Whether the file was downloaded successfully.

def extract_data(row: Dict[str, Any]) ‑> pandas.core.series.Series
Expand source code
def extract_data(row: Dict[str, Any]) -> pd.Series:
    """
    Extracts username, password, and path from a given row with camera data.

    Parameters:
    - row (Dict[str, Any]): A dictionary representing a row of camera data.
      Expected keys are 'rtsp' and 'ip'.

    Returns:
    - pd.Series: A pandas Series containing extracted 'username', 'password',
      and 'path' information.
    """

    try:
        rtsp = row["rtsp"]
        # Remove protocol
        rtsp = rtsp.replace("rtsp://", "").replace("rtsp:/", "")
        # If we have an "@" in the URL, we have username and password
        if "@" in rtsp:
            # Extract username and password
            username_password = rtsp.split("@")[0].split(":")
            if len(username_password) == 2:
                username = username_password[0]
                password = username_password[1]
            else:
                print(username_password)
                raise Exception("Why???")
            # Remove username and password from rtsp
            rtsp = rtsp.split("@")[1]
        else:
            username = None
            password = None
        # Extract path
        path = "/".join(rtsp.split("/")[1:])
        # Return the data
        return pd.Series(
            {
                "username": username,
                "password": password,
                "path": path,
            }
        )
    except Exception as exc:
        print(row["rtsp"])
        raise exc

Extracts username, password, and path from a given row with camera data.

Parameters: - row (Dict[str, Any]): A dictionary representing a row of camera data. Expected keys are 'rtsp' and 'ip'.

Returns: - pd.Series: A pandas Series containing extracted 'username', 'password', and 'path' information.

def get_cameras_h3(df: pandas.core.frame.DataFrame) ‑> geopandas.geodataframe.GeoDataFrame
Expand source code
def get_cameras_h3(df: pd.DataFrame) -> gpd.GeoDataFrame:
    """
    Enhances camera data with geographical information and joins it with rainfall data.

    Parameters:
    - df (pd.DataFrame): A DataFrame containing camera data.

    Returns:
    - gpd.GeoDataFrame: A GeoDataFrame containing the joined camera and rainfall data.
    """
    cameras = df.copy()
    geometry = [Point(xy) for xy in zip(cameras["longitude"], cameras["latitude"])]
    cameras_geo = gpd.GeoDataFrame(cameras, geometry=geometry)
    cameras_geo.crs = {"init": "epsg:4326"}

    pluviometro = get_rain_dataframe()
    pluviometro = pluviometro.rename(columns={"status": "status_chuva"})
    geometry = pluviometro["id_h3"].apply(lambda h3_id: h3_id_to_polygon(h3_id))
    pluviometro_geo = gpd.GeoDataFrame(pluviometro, geometry=geometry)
    pluviometro_geo.crs = {"init": "epsg:4326"}
    print("pluviometro_geo:", pluviometro_geo.shape)

    cameras_h3 = gpd.sjoin(cameras_geo, pluviometro_geo, how="left", op="within")
    cameras_h3 = cameras_h3.drop(columns=["index_right"])
    cameras_h3 = cameras_h3[cameras_h3["id_h3"].notnull()]

    return cameras_h3

Enhances camera data with geographical information and joins it with rainfall data.

Parameters: - df (pd.DataFrame): A DataFrame containing camera data.

Returns: - gpd.GeoDataFrame: A GeoDataFrame containing the joined camera and rainfall data.

def get_cameras_h3_bolsao(cameras_h3: geopandas.geodataframe.GeoDataFrame, buffer: int = 0.002)
Expand source code
def get_cameras_h3_bolsao(cameras_h3: gpd.GeoDataFrame, buffer: int = 0.002):
    """
    Enhances camera data with geographical information and joins it with flood pocket data.

    Parameters:
    - cameras_h3 (gpd.GeoDataFrame): A GeoDataFrame containing camera and h3 data.
    - buffer (int): A radius buffer around the flood pocket point.

    Returns:
    - gpd.GeoDataFrame: A GeoDataFrame containing the joined camera, rainfall and flood pocket data.
    """

    bolsao = pd.read_excel("./data/PLANILHAO_PDS_alimentaBI.xlsx")
    bolsao.columns = remove_columns_accents(bolsao)
    cols = ["codigo", "lat", "long", "classe_atual", "bacia", "sub_bacia"]
    bolsao = bolsao[cols]

    geometry = [Point(xy) for xy in zip(bolsao["long"], bolsao["lat"])]
    bolsao_geo = gpd.GeoDataFrame(bolsao, geometry=geometry)
    bolsao_geo.crs = {"init": "epsg:4326"}
    bolsao_geo["geometry"] = bolsao_geo["geometry"].buffer(buffer)
    bolsao_geo.insert(0, "is_bolsao", True)

    cameras_bolsao_h3 = gpd.sjoin(cameras_h3, bolsao_geo, how="left", op="within")

    cameras_bolsao_h3["geometry_bolsao_buffer_0.002"] = [
        Point(xy).buffer(buffer)
        for xy in zip(cameras_bolsao_h3["long"], cameras_bolsao_h3["lat"])
    ]
    cameras_bolsao_h3["geometry_bolsao_buffer_0.002"] = cameras_bolsao_h3[
        f"geometry_bolsao_buffer_{buffer}"
    ].apply(lambda x: np.nan if x.is_empty else x)
    cameras_bolsao_h3 = cameras_bolsao_h3.drop(columns=["index_right"])

    rename_bolsao_cols = {
        "codigo": "id_bolsao",
        "lat": "bolsao_latitude",
        "long": "bolsao_longitude",
        "classe_atual": "bolsao_classe_atual",
    }

    cameras_bolsao_h3 = cameras_bolsao_h3.rename(columns=rename_bolsao_cols)

    return cameras_bolsao_h3

Enhances camera data with geographical information and joins it with flood pocket data.

Parameters: - cameras_h3 (gpd.GeoDataFrame): A GeoDataFrame containing camera and h3 data. - buffer (int): A radius buffer around the flood pocket point.

Returns: - gpd.GeoDataFrame: A GeoDataFrame containing the joined camera, rainfall and flood pocket data.

def get_rain_dataframe() ‑> pandas.core.frame.DataFrame
Expand source code
def get_rain_dataframe() -> pd.DataFrame:
    """
    Fetches and returns rainfall data from a specified API.

    Returns:
    - pd.DataFrame: A pandas DataFrame containing the rainfall data.
    """
    api_url = "https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/"
    data = requests.get(api_url).json()
    df_rain = pd.DataFrame(data)

    last_update_url = "https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/"  # noqa
    last_update = requests.get(last_update_url).json()
    df_rain["last_update"] = last_update
    df_rain["last_update"] = pd.to_datetime(df_rain["last_update"])

    return df_rain

Fetches and returns rainfall data from a specified API.

Returns: - pd.DataFrame: A pandas DataFrame containing the rainfall data.

def h3_id_to_polygon(h3_id: str)
Expand source code
def h3_id_to_polygon(h3_id: str):
    """
    Converts an H3 ID to a Polygon.

    Args:
        h3_id: The H3 ID.

    Returns:
        The Polygon.
    """
    return Polygon(h3.h3_to_geo_boundary(h3_id, geo_json=True))

Converts an H3 ID to a Polygon.

Args

h3_id
The H3 ID.

Returns

The Polygon.

def redis_add_to_prediction_buffer(key: str, value: bool, len_: int = 3) ‑> List[bool]
Expand source code
def redis_add_to_prediction_buffer(key: str, value: bool, len_: int = 3) -> List[bool]:
    """
    Adds a value to the prediction buffer in Redis.

    Args:
        key: The Redis key.
        value: The value to be added.
        len: The length of the buffer.
    """
    prediction_buffer = redis_get_prediction_buffer(key, len_)
    prediction_buffer.append(value)
    prediction_buffer = prediction_buffer[-len_:]
    redis_client: RedisPal = get_redis_client(db=1)
    redis_client.set(key, prediction_buffer)
    return prediction_buffer

Adds a value to the prediction buffer in Redis.

Args

key
The Redis key.
value
The value to be added.
len
The length of the buffer.
def redis_get_prediction_buffer(key: str, len_: int = 3) ‑> List[bool]
Expand source code
def redis_get_prediction_buffer(key: str, len_: int = 3) -> List[bool]:
    """
    Gets the prediction buffer from Redis.

    Args:
        key: The Redis key.
        len: The length of the buffer.

    Returns:
        The prediction buffer.
    """
    redis_client: RedisPal = get_redis_client(db=1)
    prediction_buffer = redis_client.get(key)
    if prediction_buffer is None:
        return [False] * len_
    elif not isinstance(prediction_buffer, list):
        return [False] * len_
    elif len(prediction_buffer) < len_:
        diff = len_ - len(prediction_buffer)
        return [False] * diff + prediction_buffer
    return prediction_buffer

Gets the prediction buffer from Redis.

Args

key
The Redis key.
len
The length of the buffer.

Returns

The prediction buffer.