Module pipelines.rj_escritorio.flooding_detection.utils

Data in: https://drive.google.com/drive/folders/1C-W_MMFAAJy5Lq_rHDzXUesEUyzke5gw

Functions

def build_rtsp(row: pandas.core.series.Series) ‑> str

Builds a complete RTSP URL from the given row data.

Parameters: - row (pd.Series): A pandas Series containing 'username', 'password', 'path', and 'ip'.

Returns: - str: The complete RTSP URL.

def clean_and_padronize_cameras() ‑> geopandas.geodataframe.GeoDataFrame

Cleans and standardizes camera data from a CSV file, then merges it with geographical data.

Returns: - gpd.GeoDataFrame: A GeoDataFrame containing the cleaned, standardized, and geographically enriched camera data.

def download_file(url: str, output_path: Union[str, pathlib.Path]) ‑> bool

Downloads a file from a URL.

Args

url
The URL.
output_path
The output path.

Returns

Whether the file was downloaded successfully.

def extract_data(row: Dict[str, Any]) ‑> pandas.core.series.Series

Extracts username, password, and path from a given row with camera data.

Parameters: - row (Dict[str, Any]): A dictionary representing a row of camera data. Expected keys are 'rtsp' and 'ip'.

Returns: - pd.Series: A pandas Series containing extracted 'username', 'password', and 'path' information.

def get_cameras_h3(df: pandas.core.frame.DataFrame) ‑> geopandas.geodataframe.GeoDataFrame

Enhances camera data with geographical information and joins it with rainfall data.

Parameters: - df (pd.DataFrame): A DataFrame containing camera data.

Returns: - gpd.GeoDataFrame: A GeoDataFrame containing the joined camera and rainfall data.

def get_cameras_h3_bolsao(cameras_h3: geopandas.geodataframe.GeoDataFrame, buffer: int = 0.002)

Enhances camera data with geographical information and joins it with flood pocket data.

Parameters: - cameras_h3 (gpd.GeoDataFrame): A GeoDataFrame containing camera and h3 data. - buffer (int): A radius buffer around the flood pocket point.

Returns: - gpd.GeoDataFrame: A GeoDataFrame containing the joined camera, rainfall and flood pocket data.

def get_rain_dataframe() ‑> pandas.core.frame.DataFrame

Fetches and returns rainfall data from a specified API.

Returns: - pd.DataFrame: A pandas DataFrame containing the rainfall data.

def h3_id_to_polygon(h3_id: str)

Converts an H3 ID to a Polygon.

Args

h3_id
The H3 ID.

Returns

The Polygon.

def redis_add_to_prediction_buffer(key: str, value: bool, len_: int = 3) ‑> List[bool]

Adds a value to the prediction buffer in Redis.

Args

key
The Redis key.
value
The value to be added.
len
The length of the buffer.
def redis_get_prediction_buffer(key: str, len_: int = 3) ‑> List[bool]

Gets the prediction buffer from Redis.

Args

key
The Redis key.
len
The length of the buffer.

Returns

The prediction buffer.