Module pipelines.rj_escritorio.flooding_detection.utils
Data in: https://drive.google.com/drive/folders/1C-W_MMFAAJy5Lq_rHDzXUesEUyzke5gw
Functions
def build_rtsp(row: pandas.core.series.Series) ‑> str
-
Builds a complete RTSP URL from the given row data.
Parameters: - row (pd.Series): A pandas Series containing 'username', 'password', 'path', and 'ip'.
Returns: - str: The complete RTSP URL.
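The module's source is not shown here, so the exact URL layout it produces is unknown. As a minimal sketch, assuming the common `rtsp://username:password@ip/path` form, the four fields could be assembled as follows. The name `build_rtsp_sketch` and the percent-encoding of the credentials are illustrative assumptions, not the module's confirmed behaviour.

```python
from typing import Mapping
from urllib.parse import quote


def build_rtsp_sketch(row: Mapping[str, str]) -> str:
    """Hypothetical stand-in for build_rtsp; the URL layout is an assumption."""
    # Percent-encode credentials so characters such as '@' or ':' cannot
    # be mistaken for URL delimiters.
    user = quote(str(row["username"]), safe="")
    password = quote(str(row["password"]), safe="")
    # Drop any leading slash so the URL does not end up with '//' before the path.
    path = str(row["path"]).lstrip("/")
    return f"rtsp://{user}:{password}@{row['ip']}/{path}"


# A plain dict is used here; a pandas Series supports the same row["key"] lookups.
example_row = {"username": "admin", "password": "p@ss", "path": "stream1", "ip": "10.0.0.5"}
url = build_rtsp_sketch(example_row)
print(url)
```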
def clean_and_padronize_cameras() ‑> geopandas.geodataframe.GeoDataFrame
-
Cleans and standardizes camera data from a CSV file, then merges it with geographical data.
Returns: - gpd.GeoDataFrame: A GeoDataFrame containing the cleaned, standardized, and geographically enriched camera data.
def download_file(url: str, output_path: Union[str, pathlib.Path]) ‑> bool
-
Downloads a file from a URL.
Args
url
- The URL.
output_path
- The output path.
Returns
Whether the file was downloaded successfully.
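A function with this signature can be sketched using only the standard library. The version below is an assumption about the behaviour, not the module's implementation: the name `download_file_sketch`, the 30-second timeout, and the choice to report every I/O or URL error as `False` are all illustrative.

```python
import shutil
from pathlib import Path
from typing import Union
from urllib.request import urlopen


def download_file_sketch(url: str, output_path: Union[str, Path]) -> bool:
    """Hypothetical stand-in for download_file: True on success, False on failure."""
    output_path = Path(output_path)
    try:
        # Create the destination directory if it does not exist yet.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # Stream the response to disk instead of loading it all into memory.
        with urlopen(url, timeout=30) as response, open(output_path, "wb") as fh:
            shutil.copyfileobj(response, fh)
    except (OSError, ValueError):
        # URLError and HTTPError are subclasses of OSError; ValueError covers
        # malformed URLs such as a missing scheme.
        return False
    return True
```

Returning a boolean keeps the caller's control flow simple, at the cost of hiding why a download failed. A transfer that is interrupted midway may also leave a partial file behind.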
def extract_data(row: Dict[str, Any]) ‑> pandas.core.series.Series
-
Extracts username, password, and path from a given row with camera data.
Parameters: - row (Dict[str, Any]): A dictionary representing a row of camera data. Expected keys are 'rtsp' and 'ip'.
Returns: - pd.Series: A pandas Series containing extracted 'username', 'password', and 'path' information.
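The extraction described above can be illustrated with the standard library's URL parser. This is a sketch that assumes the 'rtsp' value is a full URL of the form `rtsp://username:password@host:port/path`; the name `extract_data_sketch` is hypothetical, and a plain dict is returned in place of the pandas Series the real function produces.

```python
from typing import Any, Dict, Optional
from urllib.parse import urlsplit


def extract_data_sketch(row: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Hypothetical stand-in for extract_data; returns a dict, not a pd.Series."""
    parts = urlsplit(str(row["rtsp"]))
    return {
        # username and password are None when the URL carries no credentials.
        "username": parts.username,
        "password": parts.password,
        # Everything after the host and port, without the leading slash.
        # Any query string is not included in the path.
        "path": parts.path.lstrip("/"),
    }


parsed = extract_data_sketch(
    {"rtsp": "rtsp://admin:secret@10.0.0.5:554/cam/stream1", "ip": "10.0.0.5"}
)
print(parsed)
```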
def get_cameras_h3(df: pandas.core.frame.DataFrame) ‑> geopandas.geodataframe.GeoDataFrame
-
Enhances camera data with geographical information and joins it with rainfall data.
Parameters: - df (pd.DataFrame): A DataFrame containing camera data.
Returns: - gpd.GeoDataFrame: A GeoDataFrame containing the joined camera and rainfall data.
def get_cameras_h3_bolsao(cameras_h3: geopandas.geodataframe.GeoDataFrame, buffer: int = 0.002)
-
Enhances camera data with geographical information and joins it with flood pocket data.
Parameters: - cameras_h3 (gpd.GeoDataFrame): A GeoDataFrame containing camera and H3 data. - buffer (int in the signature, although the default 0.002 is a float): The radius of the buffer drawn around each flood pocket point.
Returns: - gpd.GeoDataFrame: A GeoDataFrame containing the joined camera, rainfall and flood pocket data.
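To make the role of `buffer` concrete, the sketch below pairs cameras with flood pocket points using a plain distance check. It approximates the idea of a circular buffer around each point and is not the spatial join the module performs. The function name, the dict-based inputs, and the sample coordinates are all hypothetical, and the radius is assumed to be in the same units as the coordinates.

```python
import math
from typing import Dict, List, Tuple

Point = Tuple[float, float]  # (x, y), e.g. (longitude, latitude)


def cameras_near_pockets_sketch(
    cameras: Dict[str, Point],
    pockets: Dict[str, Point],
    buffer: float = 0.002,
) -> List[Tuple[str, str]]:
    """Return (camera_id, pocket_id) pairs whose points lie within `buffer`."""
    matches = []
    for cam_id, (cx, cy) in cameras.items():
        for pocket_id, (px, py) in pockets.items():
            # A point falls inside a circular buffer of radius `buffer`
            # exactly when its distance to the centre is at most `buffer`.
            if math.hypot(cx - px, cy - py) <= buffer:
                matches.append((cam_id, pocket_id))
    return matches


cameras = {"cam_a": (-43.2000, -22.9000), "cam_b": (-43.2100, -22.9000)}
pockets = {"pocket_1": (-43.2010, -22.9000)}
matches = cameras_near_pockets_sketch(cameras, pockets)
print(matches)
```

If the coordinates are in degrees, a radius of 0.002 corresponds to roughly 220 m in the north-south direction, since one degree of latitude is about 111 km.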
def get_rain_dataframe() ‑> pandas.core.frame.DataFrame
-
Fetches and returns rainfall data from a specified API.
Returns: - pd.DataFrame: A pandas DataFrame containing the rainfall data.
def h3_id_to_polygon(h3_id: str)
-
Converts an H3 ID to a Polygon.
Args
h3_id
- The H3 ID.
Returns
The Polygon.
def redis_add_to_prediction_buffer(key: str, value: bool, len_: int = 3) ‑> List[bool]
-
Adds a value to the prediction buffer in Redis.
Args
key
- The Redis key.
value
- The value to be added.
len_
- The length of the buffer.
Returns
The prediction buffer after the value is added.
def redis_get_prediction_buffer(key: str, len_: int = 3) ‑> List[bool]
-
Gets the prediction buffer from Redis.
Args
key
- The Redis key.
len_
- The length of the buffer.
Returns
The prediction buffer.
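The two buffer functions suggest a fixed-length window of recent predictions per key. The sketch below shows that idea with an in-memory dict standing in for Redis, so it runs without a server. Everything here is an assumption for illustration: the function names, the choice to pad a short buffer with False, and the dict-based storage are not taken from the module, and `len_` is assumed to be at least 1.

```python
from typing import Dict, List

# In-memory stand-in for Redis (assumption; the real functions talk to a server).
_FAKE_REDIS: Dict[str, List[bool]] = {}


def get_prediction_buffer_sketch(key: str, len_: int = 3) -> List[bool]:
    """Return the last `len_` predictions for `key`, oldest first."""
    stored = _FAKE_REDIS.get(key, [])
    stored = stored[max(len(stored) - len_, 0):]
    # Assumption: slots with no prediction yet are padded with False.
    return [False] * (len_ - len(stored)) + stored


def add_to_prediction_buffer_sketch(key: str, value: bool, len_: int = 3) -> List[bool]:
    """Append `value`, drop the oldest entry, and return the new buffer."""
    combined = get_prediction_buffer_sketch(key, len_) + [value]
    buffer = combined[max(len(combined) - len_, 0):]
    _FAKE_REDIS[key] = buffer
    return buffer


# Three consecutive positive predictions fill the window.
history = [add_to_prediction_buffer_sketch("camera_1", flag) for flag in (True, True, True)]
flooding_confirmed = all(history[-1])
print(history, flooding_confirmed)
```

A window like this can be used to debounce a noisy classifier: flooding is reported only once every entry in the buffer is True, so a single spurious prediction does not raise an alert.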