Module pipelines.rj_escritorio.notify_flooding.utils
Utilities for the flooding notification pipeline.
Functions
def get_circle(latitude: float, longitude: float, radius: float, fname: str = None) ‑> geopandas.geodataframe.GeoDataFrame
-
Expand source code
def get_circle( latitude: float, longitude: float, radius: float, fname: str = None, ) -> gpd.GeoDataFrame: """ Get a circle geometry from a point and a radius. Args: latitude: Latitude of the point. longitude: Longitude of the point. radius: Radius of the circle in meters. fname: File name to save the circle geometry in KML (if None, the geometry is not saved). Returns: GeoDataFrame with the circle geometry. """ center = Point(longitude, latitude) dataframe = gpd.GeoDataFrame(geometry=gpd.GeoSeries(center).buffer(radius / 111139)) if fname is not None: fname = Path(fname).with_suffix(".kml") dataframe.to_file(fname, driver="KML") return dataframe
Get a circle geometry from a point and a radius.
Args
latitude
- Latitude of the point.
longitude
- Longitude of the point.
radius
- Radius of the circle in meters.
fname
- File name to save the circle geometry in KML (if None, the geometry is not saved).
Returns
GeoDataFrame with the circle geometry.
def send_email(from_address: str,
to_address: str | List[str],
subject: str,
body: str,
smtp_server: str,
smtp_port: int,
smtp_username: str = None,
smtp_password: str = None,
tls: bool = True,
attachment: str = None)-
Expand source code
def send_email( from_address: str, to_address: Union[str, List[str]], subject: str, body: str, smtp_server: str, smtp_port: int, smtp_username: str = None, smtp_password: str = None, tls: bool = True, attachment: str = None, ): """ Sends an email, optionally with an attachment. """ message = MIMEMultipart() message["From"] = from_address if isinstance(to_address, str): to_address = [to_address] message["To"] = ", ".join(to_address) message["Subject"] = subject message.attach(MIMEText(body, "plain")) if attachment is not None: with open(attachment, "rb") as file: payload = MIMEBase("application", "octate-stream") payload.set_payload(file.read()) encoders.encode_base64(payload) payload.add_header("Content-Disposition", f"attachment; filename={attachment}") message.attach(payload) session = smtplib.SMTP(smtp_server, smtp_port) if tls: session.starttls() if smtp_username is None: smtp_username = from_address if smtp_password is not None: session.login(smtp_username, smtp_password) text = message.as_string() session.sendmail(from_address, to_address, text) session.quit()
Sends an email, optionally with an attachment.