Module pipelines.rj_cor.meteorologia.radar.precipitacao.src.utils.data_utils
Functions
def distance(latlon1: tuple, latlon2: tuple)
-
Expand source code
def distance(latlon1: tuple, latlon2: tuple): return GEODESIC.inv(latlon1[1], latlon1[0], latlon2[1], latlon2[0])[2]
def get_close_azimuthal_grid_points(ref_latlon: tuple,
point_latlon: tuple,
epsilon: float,
ranges: list = range(125, 250000, 250),
azimuths: list = range(0, 360))-
Expand source code
def get_close_azimuthal_grid_points( ref_latlon: tuple, point_latlon: tuple, epsilon: float, ranges: list = range(125, 250000, 250), azimuths: list = range(NRAYS), ): fwd_azimuth, back_azimuth, distance = GEODESIC.inv( ref_latlon[1], ref_latlon[0], point_latlon[1], point_latlon[0] ) if distance > epsilon: delta_theta = np.arcsin(epsilon / distance) * 180 / np.pi fwd_azimuth = (fwd_azimuth + 360) % 360 first_range = int((distance - epsilon - ranges[0]) / (ranges[1] - ranges[0])) last_range = int((distance + epsilon - ranges[0]) / (ranges[1] - ranges[0])) + 1 first_angle = ( int((fwd_azimuth - delta_theta - azimuths[0]) / (azimuths[1] - azimuths[0])) % 360 ) last_angle = ( int((fwd_azimuth + delta_theta - azimuths[0]) / (azimuths[1] - azimuths[0])) + 1 ) % 360 else: first_range = 0 last_range = int((distance + epsilon - ranges[0]) / (ranges[1] - ranges[0])) + 1 first_angle = 0 last_angle = len(azimuths) - 1 close_points = [] azimuth_enumeration = [] if first_angle <= last_angle: azimuth_enumeration = list(enumerate(azimuths[first_angle : last_angle + 1])) else: azimuth_enumeration = list( enumerate(list(azimuths[first_angle:]) + list(azimuths[: last_angle + 1])) ) range_enumeration = list(enumerate(list(ranges[first_range : last_range + 1]))) for i, azimuth in azimuth_enumeration: for j, range in range_enumeration: endlon, endlat = GEODESIC.fwd(ref_latlon[1], ref_latlon[0], azimuth, range)[ :2 ] distance = GEODESIC.inv(point_latlon[1], point_latlon[0], endlon, endlat)[2] if distance < epsilon: close_points.append(((i + first_angle) % NRAYS, j + first_range)) return close_points
def get_containing_square(ref_latlon: tuple,
point_latlon: tuple,
range_start: float = 125,
range_step: float = 250,
azimuth_start: float = 0,
azimuth_step: float = 1)-
Expand source code
def get_containing_square( ref_latlon: tuple, point_latlon: tuple, range_start: float = 125, range_step: float = 250, azimuth_start: float = 0, azimuth_step: float = 1, ): assert ( azimuth_start == 0 and azimuth_step == 1 ), "Function only implemented for az start 0 and az step 1." fwd_azimuth, back_azimuth, distance = GEODESIC.inv( ref_latlon[1], ref_latlon[0], point_latlon[1], point_latlon[0] ) fwd_azimuth = (360 + fwd_azimuth) % 360 azimuth_index = int((fwd_azimuth - azimuth_start) / azimuth_step) % 360 range_index = int((distance - range_start) / range_step) return [ (i, j) for i in [azimuth_index, azimuth_index + 1] for j in [range_index, range_index + 1] ]
def get_files_in_range(first_time_str: str, last_time_str: str, folder_path: pathlib.Path) ‑> list
-
Expand source code
def get_files_in_range( first_time_str: str, last_time_str: str, folder_path: pathlib.Path ) -> list: first_time = datetime.datetime.strptime(first_time_str, "%H%M%S") last_time = datetime.datetime.strptime(last_time_str, "%H%M%S") difference = int((last_time - first_time).total_seconds() / (5 * 60)) patterns = [ (first_time + datetime.timedelta(minutes=5 * d)).strftime("*%H%M%S*") for d in range(difference + 1) ] filepaths = [f for f in folder_path.iterdir() if any(f.match(p) for p in patterns)] filepaths.sort() return filepaths
def get_k_nearest_directed_grid_neighbors(ref_latlon: tuple,
point_latlon: tuple,
k: int,
ranges: list = range(125, 250000, 250),
azimuths: list = range(0, 360))-
Expand source code
def get_k_nearest_directed_grid_neighbors( ref_latlon: tuple, point_latlon: tuple, k: int, ranges: list = range(125, 250000, 250), azimuths: list = range(360), ): assert list(azimuths) == list( range(360) ), "Function only implemented for azimuths from 0 to 359, step 1." delta = ranges[1] - ranges[0] distance = delta grid_points = [] while len(grid_points) < k: distance = distance + delta grid_points = get_close_azimuthal_grid_points( ref_latlon, point_latlon, distance, ranges, azimuths ) distances = [] angles = [] for point in grid_points: angle, distance = GEODESIC.inv( point_latlon[1], point_latlon[0], *( GEODESIC.fwd( ref_latlon[1], ref_latlon[0], azimuths[point[0]], ranges[point[1]], )[0:2] ), )[::2] angle_bin = int(((angle + 180 / k) % 360) * k / 360) distances.append(distance) angles.append(angle_bin) grid_points = list(zip(grid_points, distances, angles)) missing_angle_bins = list(range(k)) grid_points.sort( key=lambda x: x[1], ) new_list = [] for point in grid_points: try: missing_angle_bins.remove(point[2]) new_list.append(point) except ValueError: continue grid_points = new_list grid_points.sort(key=lambda x: x[2]) grid_points = [point[0] for point in grid_points] return grid_points
def get_k_nearest_grid_neighbors(ref_latlon: tuple,
point_latlon: tuple,
k: int,
ranges: list = range(125, 250000, 250),
azimuths: list = range(0, 360))-
Expand source code
def get_k_nearest_grid_neighbors( ref_latlon: tuple, point_latlon: tuple, k: int, ranges: list = range(125, 250000, 250), azimuths: list = range(360), ): delta = ranges[1] - ranges[0] distance = 2 * delta grid_points = get_close_azimuthal_grid_points( ref_latlon, point_latlon, distance, ranges, azimuths ) while len(grid_points) < k: distance = distance + delta grid_points = get_close_azimuthal_grid_points( ref_latlon, point_latlon, distance, ranges, azimuths ) grid_points.sort( key=lambda x: GEODESIC.inv( point_latlon[1], point_latlon[0], *( GEODESIC.fwd( ref_latlon[1], ref_latlon[0], azimuths[x[0]], ranges[x[1]] )[0:2] ), )[2], ) return grid_points[:k]
def get_latlon(ref_latlon, dist, azimuth)
-
Expand source code
def get_latlon(ref_latlon, dist, azimuth): lon, lat, _ = GEODESIC.fwd(ref_latlon[1], ref_latlon[0], azimuth, dist)[:2] return (lat, lon)
def get_repeated_values(df: pandas.core.frame.DataFrame, columns: list, strictly_repeating: bool = True) ‑> list
-
Expand source code
def get_repeated_values( df: pd.DataFrame, columns: list, strictly_repeating: bool = True ) -> list: df_copy = df.copy() column_values = [list(df_copy[col]) for col in columns] df_copy["zipped"] = list(zip(*column_values)) duplicates = [ g for _, g in df_copy.groupby("zipped") if len(g) > strictly_repeating ] dicts = [] for duplicate_df in duplicates: rows = duplicate_df.index.tolist() value = duplicate_df["zipped"][rows[0]] dicts.append({"value": value, "rows": rows, "columns": columns}) return dicts