Module pipelines.utils.dump_datario.utils

General utilities for interacting with datario-dump

Functions

def generate_dump_datario_schedules(interval: datetime.timedelta,
start_date: datetime.datetime,
labels: List[str],
table_parameters: dict,
runs_interval_minutes: int = 15) ‑> List[prefect.schedules.clocks.IntervalClock]
Expand source code
def generate_dump_datario_schedules(  # pylint: disable=too-many-arguments,too-many-locals
    interval: timedelta,
    start_date: datetime,
    labels: List[str],
    table_parameters: dict,
    runs_interval_minutes: int = 15,
) -> List[IntervalClock]:
    """
    Generates multiple schedules for dumping datario tables.
    """
    clocks = []
    for count, (table_id, parameters) in enumerate(table_parameters.items()):
        parameter_defaults = {
            "url": parameters["url"],
            "dataset_id": parameters["dataset_id"],
            "dump_mode": parameters["dump_mode"],
            "table_id": table_id,
        }
        if "materialize_after_dump" in parameters:
            parameter_defaults["materialize_after_dump"] = parameters[
                "materialize_after_dump"
            ]
        if "materialization_mode" in parameters:
            parameter_defaults["materialization_mode"] = parameters[
                "materialization_mode"
            ]
        if "geometry_column" in parameters:
            parameter_defaults["geometry_column"] = parameters["geometry_column"]
        if "convert_to_crs_4326" in parameters:
            parameter_defaults["convert_to_crs_4326"] = parameters[
                "convert_to_crs_4326"
            ]
        if "geometry_3d_to_2d" in parameters:
            parameter_defaults["geometry_3d_to_2d"] = parameters["geometry_3d_to_2d"]

        new_interval = parameters["interval"] if "interval" in parameters else interval
        clocks.append(
            IntervalClock(
                interval=new_interval,
                start_date=start_date
                + timedelta(minutes=runs_interval_minutes * count),
                labels=labels,
                parameter_defaults=parameter_defaults,
            )
        )
    return clocks

Generates multiple schedules for dumping datario tables.

def load_wkt(x)
Expand source code
def load_wkt(x):
    """
    Fromt text to geometry
    """
    try:
        return wkt.loads(x)
    except Exception:
        return None

Fromt text to geometry

def remove_third_dimension(geom)
Expand source code
def remove_third_dimension(geom):
    """
    Remove third dimension from geometry
    """
    if geom is None:
        return None

    if geom.is_empty:
        return geom

    if isinstance(geom, Polygon):
        exterior = geom.exterior
        new_exterior = remove_third_dimension(exterior)

        interiors = geom.interiors
        new_interiors = [remove_third_dimension(int) for int in interiors]
        return Polygon(new_exterior, new_interiors)

    elif isinstance(geom, LinearRing):
        return LinearRing([xy[:2] for xy in list(geom.coords)])

    elif isinstance(geom, LineString):
        return LineString([xy[:2] for xy in list(geom.coords)])

    elif isinstance(geom, Point):
        return Point([xy[:2] for xy in list(geom.coords)])

    elif isinstance(geom, MultiPoint):
        points = list(geom.geoms)
        new_points = [remove_third_dimension(point) for point in points]
        return MultiPoint(new_points)

    elif isinstance(geom, MultiLineString):
        lines = list(geom.geoms)
        new_lines = [remove_third_dimension(line) for line in lines]
        return MultiLineString(new_lines)

    elif isinstance(geom, MultiPolygon):
        pols = list(geom.geoms)

        new_pols = [remove_third_dimension(pol) for pol in pols]
        return MultiPolygon(new_pols)

    elif isinstance(geom, GeometryCollection):
        geoms = list(geom.geoms)

        new_geoms = [remove_third_dimension(geom) for geom in geoms]
        return GeometryCollection(new_geoms)

    else:
        raise RuntimeError(
            f"Currently this type of geometry is not supported: {type(geom)}"
        )

Remove third dimension from geometry