Module pipelines.utils.dump_datario.tasks

General purpose tasks for dumping database data.

Functions

def get_datario_geodataframe(url: str, path: str | pathlib.Path, wait=None)
Expand source code
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_datario_geodataframe(
    url: str,  # URL of the data.rio API
    path: Union[str, Path],
    wait=None,
):
    """
    Download a GeoJSON file from the data.rio API and save it locally.

    The response body is streamed to ``<path>/geo_data/data.geojson``;
    missing directories are created.

    Parameters:
        - url (str): URL of the data.rio API
        - path (Union[str, Path]): Local directory under which the file is saved
        - wait (Optional[Any]): Prefect task wait parameter (default: None).
          It is not used in the function body.

    Returns:
        Path: Path of the saved GeoJSON file.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status code.
    """
    # Set the file path to save the data. Creating its parent directory with
    # parents=True also creates `path` itself when it does not exist yet.
    file_path = Path(path) / "geo_data" / "data.geojson"
    file_path.parent.mkdir(parents=True, exist_ok=True)

    # Stream the download; the context manager releases the connection even
    # if writing the file fails.
    with requests.get(url, stream=True) as req:
        # Fail on HTTP errors instead of storing an error page as data.
        req.raise_for_status()

        # Save the data to the specified file path
        with open(file_path, "wb") as file:
            for chunk in req.iter_content(chunk_size=1024):
                if chunk:
                    file.write(chunk)

    log("Data saved")

    return file_path

Save a GeoJSON file from the data.rio API

Parameters

  • url (str): URL of the data.rio API
  • path (Union[str, Path]): Local path to save the file
  • wait (Optional[Any]): Prefect task wait parameter (default: None)
def transform_geodataframe(file_path: str | pathlib.Path,
batch_size: int = 50000,
geometry_column: str = 'geometry',
convert_to_crs_4326: bool = False,
geometry_3d_to_2d: bool = False,
wait=None)
Expand source code
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def transform_geodataframe(
    file_path: Union[str, Path],
    batch_size: int = 50000,
    geometry_column: str = "geometry",
    convert_to_crs_4326: bool = False,
    geometry_3d_to_2d: bool = False,
    wait=None,
):
    """
    Convert a GeoJSON file from the data.rio API into a CSV file, in batches.

    The GeoJSON is streamed ``batch_size`` features at a time. Each batch is
    loaded into a GeoDataFrame, optionally transformed, and appended to
    ``<file_path>/../../csv_data/<timestamp>.csv``.

    Parameters:
        - file_path (Union[str, Path]): Path to the geojson file to be transformed.
        - batch_size (int): Number of rows to process at once.
        - geometry_column (str): Column containing the geometry data.
        - convert_to_crs_4326 (bool): Convert the geometry data to the crs 4326 projection.
        - geometry_3d_to_2d (bool): Convert the geometry data from 3D to 2D.
        - wait (None): Prefect task wait parameter (default: None).
          It is not used in the function body.

    Returns:
        Path: Path of the CSV file the batches were appended to.
    """
    eventid = datetime.now().strftime("%Y%m%d-%H%M%S")

    # The annotation allows str, but `.parent` below needs a Path.
    file_path = Path(file_path)

    # move to path file since file_path is path / "geo_data" / "data.geojson"
    save_path = file_path.parent.parent / "csv_data" / f"{eventid}.csv"
    save_path.parent.mkdir(parents=True, exist_ok=True)

    geojson = geojsplit.GeoJSONBatchStreamer(file_path)

    # only print every print_mod batches
    mod = 1000
    # Number of batches fully processed; stays 0 when the stream is empty.
    batches_done = 0
    for count, feature_collection in enumerate(
        geojson.stream(batch=batch_size), start=1
    ):
        geodataframe = gpd.GeoDataFrame.from_features(feature_collection["features"])
        log_mod(
            msg=f"{count} x {batch_size} rows: geodataframe loaded",
            index=count,
            mod=mod,
        )

        # move geometry column to the end
        cols = geodataframe.columns.tolist()
        cols.remove(geometry_column)
        cols.append(geometry_column)
        geodataframe = geodataframe[cols]

        # remove accents from columns
        geodataframe.columns = remove_columns_accents(geodataframe)
        # Keep a copy of the geometry as it was before any conversion below.
        geodataframe["geometry_wkt"] = geodataframe[geometry_column].copy()

        # convert geometry to crs 4326
        if convert_to_crs_4326:
            try:
                # NOTE(review): the CRS is declared as EPSG:4326 and then
                # reprojected to EPSG:4326, which appears to leave the
                # coordinates unchanged - confirm the intended source CRS.
                geodataframe.crs = "epsg:4326"
                geodataframe[geometry_column] = geodataframe[geometry_column].to_crs(
                    "epsg:4326"
                )
            except Exception as err:
                log(f"{count}: error converting to crs 4326: {err}")
                raise

            log_mod(
                msg=f"{count}: geometry converted to crs 4326",
                index=count,
                mod=mod,
            )

        # convert geometry 3d to 2d
        if geometry_3d_to_2d:
            try:
                # Round-trip through the string form so every value is parsed
                # by load_wkt before the third dimension is dropped.
                geodataframe[geometry_column] = (
                    geodataframe[geometry_column].astype(str).apply(load_wkt)
                )

                geodataframe[geometry_column] = geodataframe[geometry_column].apply(
                    remove_third_dimension
                )
            except Exception as err:
                log(f"{count}: error converting 3d to 2d: {err}")
                raise

            log_mod(
                msg=f"{count}: geometry converted 3D to 2D",
                index=count,
                mod=mod,
            )

        log_mod(
            msg=f"{count}: new columns: {geodataframe.columns.tolist()}",
            index=count,
            mod=mod,
        )

        # save geodataframe to csv; only the first batch writes the header
        geodataframe.to_csv(
            save_path,
            index=False,
            encoding="utf-8",
            mode="a",
            header=not save_path.exists(),
        )

        # clear memory
        del geodataframe

        log_mod(
            msg=f"{count} x {batch_size} rows: Data saved",
            index=count,
            mod=mod,
        )
        batches_done = count

    # Report the batches actually processed (the old counter was one too high).
    log(f"{batches_done} x {batch_size} DATA TRANSFORMED!!!")
    return save_path

Transform a GeoJSON file from the data.rio API into a CSV file

Parameters

  • file_path (Union[str, Path]): Path to the geojson file to be transformed.
  • batch_size (int): Number of rows to process at once.
  • geometry_column (str): Column containing the geometry data.
  • convert_to_crs_4326 (bool): Convert the geometry data to the crs 4326 projection.
  • geometry_3d_to_2d (bool): Convert the geometry data from 3D to 2D.
  • wait (None): Prefect task wait parameter (default: None)