Module pipelines.utils.georeference.tasks
Tasks for georeferencing tables
Functions
def dataframe_to_csv(dataframe: pandas.core.frame.DataFrame, filename: str = 'data.csv') ‑> None
-
Expand source code
@task
def dataframe_to_csv(dataframe: pd.DataFrame, filename: str = "data.csv") -> str:
    """
    Save a dataframe to a CSV file inside a newly created temporary directory.

    Args:
        dataframe: Dataframe to write. The index is not written to the file.
        filename: Name of the CSV file. The ``.csv`` suffix is appended when
            it is missing.

    Returns:
        The path, as a string, of the directory that contains the written
        file (the directory, not the file itself).
    """
    if not filename.endswith(".csv"):
        filename = f"{filename}.csv"
    # A uuid4-named directory gives every call its own output location.
    csv_path = Path(f"/tmp/{uuid4()}/{filename}")
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    dataframe.to_csv(csv_path, index=False)
    return str(csv_path.parent)
Save a dataframe to a CSV file inside a new temporary directory and return that directory's path
def georeference_dataframe(new_addresses: pandas.core.frame.DataFrame, log_divider: int = 60) ‑> pandas.core.frame.DataFrame
-
Expand source code
@task
def georeference_dataframe(
    new_addresses: pd.DataFrame, log_divider: int = 60
) -> pd.DataFrame:
    """
    Georeference all addresses in a dataframe.

    Every value of the ``address`` column is suffixed with ", Rio de Janeiro"
    and sent to Nominatim through a rate limiter configured with a minimum
    delay of one second between calls.

    Args:
        new_addresses: Dataframe with an ``address`` column.
        log_divider: A progress message is logged once every ``log_divider``
            addresses.

    Returns:
        Dataframe with ``latitude``, ``longitude`` and ``address`` columns,
        one row per input address, in input order. Addresses for which the
        geocoder returned nothing get ``None`` coordinates.
    """
    start_time = time()
    # Keep the raw addresses as a plain list so they can be attached to the
    # result by position, regardless of the input dataframe's index.
    original_addresses = new_addresses["address"].tolist()
    all_addresses = [f"{address}, Rio de Janeiro" for address in original_addresses]
    geolocator = Nominatim(user_agent="prefeitura-rio")
    geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1)

    log(f"There are {len(all_addresses)} addresses to georeference")
    # Entries are None for addresses the geocoder could not resolve.
    locations: List[Location] = []
    for i, address in enumerate(all_addresses):
        if i % log_divider == 0:
            log(f"Georeferencing address {i} of {len(all_addresses)}...")
        locations.append(geocode(address))

    geolocated_addresses = [
        {"latitude": location.latitude, "longitude": location.longitude}
        if location is not None
        else {"latitude": None, "longitude": None}
        for location in locations
    ]

    # Explicit columns keep the frame well-formed even when there are no rows.
    output = pd.DataFrame(geolocated_addresses, columns=["latitude", "longitude"])
    # Assign by position: assigning the original Series would align on index
    # labels and mismatch rows whenever `new_addresses` has a non-default
    # index (e.g. after filtering).
    output["address"] = original_addresses
    # NOTE(review): check_if_belongs_to_rio is defined elsewhere; presumably it
    # returns the (latitude, longitude) pair, blanked when outside Rio - confirm.
    output[["latitude", "longitude"]] = output.apply(
        lambda x: check_if_belongs_to_rio(x.latitude, x.longitude),
        axis=1,
        result_type="expand",
    )

    log(f"--- {(time() - start_time)} seconds ---")
    return output
Georeference all addresses in a dataframe
def get_new_addresses(source_dataset_id: str,
source_table_id: str,
source_table_address_column: str,
destination_dataset_id: str,
destination_table_id: str,
georef_mode: str,
current_flow_labels: List[str]) ‑> Tuple[pandas.core.frame.DataFrame, bool]-
Expand source code
@task(nout=2)
def get_new_addresses(  # pylint: disable=too-many-arguments, too-many-locals
    source_dataset_id: str,
    source_table_id: str,
    source_table_address_column: str,
    destination_dataset_id: str,
    destination_table_id: str,
    georef_mode: str,
    current_flow_labels: List[str],
) -> Tuple[pd.DataFrame, bool]:
    """
    Get the addresses of the source table that are missing from the destination.

    Args:
        source_dataset_id: Dataset of the table holding the raw addresses.
        source_table_id: Table holding the raw addresses.
        source_table_address_column: Column of the source table to read
            addresses from.
        destination_dataset_id: Dataset of the table holding the addresses
            that were already georeferenced.
        destination_table_id: Table holding the addresses that were already
            georeferenced; it is read through its ``address`` column.
        georef_mode: Georeferencing mode. Only ``"distinct"`` triggers any
            query; any other value yields an empty result.
        current_flow_labels: Flow labels; the first one is used as the
            billing project id.

    Returns:
        A tuple ``(new_addresses, exists_new_addresses)``: a dataframe with a
        single ``address`` column and a flag that is ``True`` when that
        dataframe is not empty.
    """
    new_addresses = pd.DataFrame(columns=["address"])
    exists_new_addresses = False

    source_table_ref = f"{source_dataset_id}.{source_table_id}"
    destination_table_ref = f"{destination_dataset_id}.{destination_table_id}"
    billing_project_id = current_flow_labels[0]

    if georef_mode == "distinct":
        query_source = f"""
        SELECT DISTINCT
            {source_table_address_column}
        FROM
            `{source_table_ref}`
        """
        query_destination = f"""
        SELECT DISTINCT
            address
        FROM
            `{destination_table_ref}`
        """

        source_addresses = bd.read_sql(
            query_source, billing_project_id=billing_project_id, from_file=True
        )
        source_addresses.columns = ["address"]

        # Best effort: if the destination cannot be read, treat every source
        # address as new.
        try:
            destination_addresses = bd.read_sql(
                query_destination,
                billing_project_id=billing_project_id,
                from_file=True,
            )
            destination_addresses.columns = ["address"]
        except Exception:  # pylint: disable=broad-except
            destination_addresses = pd.DataFrame(columns=["address"])

        # Compare by value on the column: DataFrame.isin(DataFrame) would only
        # match cells sharing the same index AND column labels, i.e. the same
        # row position, which is not a membership test.
        # pylint: disable=invalid-unary-operand-type
        already_georeferenced = source_addresses["address"].isin(
            destination_addresses["address"]
        )
        new_addresses = source_addresses[~already_georeferenced].dropna()
        exists_new_addresses = not new_addresses.empty

    return new_addresses, exists_new_addresses
Get new addresses from source table
def validate_georeference_mode(mode: str) ‑> None
-
Expand source code
@task
def validate_georeference_mode(mode: str) -> None:
    """
    Check that the requested georeference mode is supported.

    Args:
        mode: Name of the georeference mode to check.

    Raises:
        ValueError: If ``mode`` is not one of the supported modes.
    """
    supported_modes = (
        "distinct",
        # insert new modes here
    )
    if mode in supported_modes:
        return
    raise ValueError(f"Invalid georeference mode: {mode}. Valid modes are: distinct")
Validates georeference mode