Module pipelines.rj_cor.meteorologia.radar.precipitacao.src.predict_rain

Functions

def run_model_prediction(base_path: str, verbose: bool = True, n_jobs: int = 1, output_hdf: bool = False)
def run_model_prediction(
    base_path: str, verbose: bool = True, n_jobs: int = 1, output_hdf: bool = False
):
    """
    Run model predictions and save results
    """
    REF_LATLON = (-22.9932804107666, -43.58795928955078)

    prediction_specs_filepath = pathlib.Path(f"{base_path}/src/predict_rain_specs.json")

    with open(prediction_specs_filepath, "r") as json_file:
        prediction_specs_dict = json.load(json_file)
    log(f"[DEBUG] read json ok: {prediction_specs_dict}")
    input_hdfs = prediction_specs_dict["radar_ppi_hdfs"]
    coordinates_filepath = prediction_specs_dict["coordinates"]
    radar_cal_specs_dicts_filepath = prediction_specs_dict["radar_cal_specs"]
    indices_folder = prediction_specs_dict["indices_folder"]
    model_filepath = prediction_specs_dict["model_filepath"]
    output_path = prediction_specs_dict["output_path"]

    radar_cal_specs_dicts_filepath = pathlib.Path(radar_cal_specs_dicts_filepath)
    model_filepath = pathlib.Path(model_filepath)

    coordinates = []
    id_hex = []
    bairro = []

    # Each line of the coordinates file is expected as: lat,lon,id_hex,bairro
    with open(coordinates_filepath, "r") as f:
        for line in f:
            lat, lon, hex_id, neighborhood = line.split(",")[:4]
            coordinates.append((float(lat), float(lon)))
            id_hex.append(hex_id)
            bairro.append(neighborhood)
    log("[DEBUG] read coordinates filepath ok")
    # The radar calibration specs file is JSON Lines: one spec dict per line.
    with open(radar_cal_specs_dicts_filepath, "r") as json_file:
        dicts_list = [json.loads(line) for line in json_file]
    log("[DEBUG] read radar cal specs ok")

    radar_data_hdfs = []

    # The prediction timestamp is derived from the most recent radar scan.
    last_hdf = h5py.File(input_hdfs[-1], "r")
    log("[DEBUG] read last hdf ok")
    hdf_date = last_hdf["what"].attrs["date"].decode("UTF-8")
    hdf_time = last_hdf["what"].attrs["time"].decode("UTF-8")

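    # Drop the seconds from HHMMSS, parse the scan time and predict 5 minutes ahead.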
    dt = datetime.datetime.strptime(f"{hdf_date} {hdf_time[:-2]}", "%Y%m%d %H%M")
    prediction_dt = dt + datetime.timedelta(minutes=5)
    prediction_dt_str = prediction_dt.strftime("%Y%m%d %H%M%S")

    prediction_date, prediction_time = prediction_dt_str.split(" ")

    assert len(input_hdfs) == 3, "Number of passed radar hdf files must be 3."

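    # Process each hdf into a dict of processed radar data keyed by "<process_type>-<feature>".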
    for hdf in input_hdfs:
        check_integrity(hdf)
        radar_data_dict = {}
        for specs_dict in dicts_list:
            # Chained process types ("<process>_<feature>") reuse the index
            # matrix of the already-processed "<process>-<feature>" entry.
            if "_" in specs_dict["process_type"]:
                process, by_feature = specs_dict["process_type"].split("_")
                origin_radar_data = f"{process}-{by_feature}"
                index_matrix = radar_data_dict[origin_radar_data].indices
                new_radar_data = process_ppi(
                    verbose,
                    specs_dict["feature"],
                    specs_dict["process_type"],
                    hdf,
                    index_matrix,
                )
            else:
                new_radar_data = process_ppi(
                    verbose, specs_dict["feature"], specs_dict["process_type"], hdf
                )

            assert new_radar_data.nrays == NRAYS, f"nrays should be {NRAYS}."
            radar_data_dict[
                f"{specs_dict['process_type']}-{specs_dict['feature']}"
            ] = new_radar_data

        radar_data_hdfs.append(radar_data_dict)

    log("Radar data processed successfully.")

    coordinates = np.array(coordinates)

    # Get samplings to calculate indices

    sampling_set = set()
    for specs_dict in dicts_list:
        sampling_set = sampling_set.union(set(specs_dict["sampling"]))

    # Load pre-existing information (if it exists)

    indices_paths = pathlib.Path(indices_folder).rglob("*.npy")

    if any(indices_paths):
        p2f = PointsToFeatures.load_indices(indices_folder)
        assert np.all(
            np.isclose(p2f.coordinates, coordinates)
        ), "Given coordinates are not the same as loaded ones."

        log("Indices loaded successfully.")
    else:
        p2f = PointsToFeatures(coordinates)
        log("No indices saved: calculating...")

    samplings_to_remove = set(p2f.indices)

    # Calculate remaining information

    sampling_set = sampling_set.difference(samplings_to_remove)

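    # Supported samplings are "<k>_nearest" and "beam".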
    for sampling in sampling_set:
        if "_nearest" in sampling:
            k, _ = sampling.split("_")
            k = int(k)
            p2f.calc_k_nearest(k, n_jobs)
        elif sampling == "beam":
            p2f.calc_beam(n_jobs)
        else:
            raise NotImplementedError(f"{sampling} sampling not implemented.")

    if len(sampling_set):
        p2f.save_indices(indices_folder)

        log("Indices saved successfully.")

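    # Build the feature matrix: p2f.get_dist() first, then one block per (spec, hdf, operation).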
    X = [p2f.get_dist()]

    for specs_dict in dicts_list:
        for rd_dict in radar_data_hdfs:
            process_key = specs_dict["process_type"] + "-" + specs_dict["feature"]
            rd = rd_dict[process_key]
            for operation in specs_dict["operations"]:
                feature = parse_operation(p2f, rd.unwrap_data(), operation)
                X.append(feature)

    X = np.hstack(X)
    # Replace +inf entries with zero before predicting (-inf would pass through).
    X = np.where(X != np.inf, X, 0)

    # Make predictions

    log("Opening pipeline with joblib")
    pipe = joblib.load(open(model_filepath, "rb"))

    y_pred = pipe.predict(X)

    log("Predictions made successfully.")

    output_path = pathlib.Path(output_path)
    output_path.mkdir(parents=True, exist_ok=True)

    # Identify the model by the last two parent folders of its path.
    model_parent_folders = str(model_filepath.parents[0]).split("/")[-2:]
    model = "/".join(model_parent_folders)

    predictions = PredictedData(
        y_pred,
        coordinates,
        radar_cal_specs_dicts_filepath.stem,
        model,
        prediction_date,
        prediction_time,
    )

    if output_hdf:
        predictions.save_hdf(output_path / "predictions.hdf")
    else:
        # np.savetxt(output_filepath, predictions.predictions, delimiter=",")
        log(f"[DEBUG] predictions.predictions: {predictions.predictions}")
        log(f"[DEBUG] predictions.coordinates: {predictions.coordinates}")
        log(f"[DEBUG] id_hex: {id_hex[:10]}")
        dfr = pd.DataFrame(
            predictions.coordinates, columns=["latitude", "longitude"]
        ).astype(float)
        dfr["predictions"] = predictions.predictions
        dfr["id_h3"] = id_hex
        dfr["bairro"] = bairro
        dfr["bairro"] = dfr["bairro"].str.replace("\n", "")
        dfr["data_medicao"] = pd.to_datetime(
            prediction_dt_str, format="%Y%m%d %H%M%S", utc=True
        )
        dfr["data_medicao"] = (
            dfr["data_medicao"]
            .dt.tz_convert("America/Sao_Paulo")
            .dt.strftime("%Y-%m-%d %H:%M:%S")
        )
        data_medicao = dfr["data_medicao"].max()
        log(f"DEBUUUUUUGGG data_medicao: {data_medicao}")
        log(f"DEBUUUUUUGGG {dfr.head()}")
        # dfr.to_csv(output_path / f"predictions_{data_medicao}.csv", index=False)
        return dfr

Run model predictions and save the results.

Reads `{base_path}/src/predict_rain_specs.json`, processes the radar PPI files listed there, predicts precipitation for each coordinate and either writes an HDF file (`output_hdf=True`) or returns a DataFrame.
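The specs file is produced upstream in the pipeline; the sketch below only shows the six keys this function reads, with hypothetical paths:

# A minimal sketch of predict_rain_specs.json, assuming hypothetical paths.
# "radar_ppi_hdfs" must list exactly three PPI files; the last one is
# treated as the most recent scan and anchors the prediction timestamp.
import json

specs = {
    "radar_ppi_hdfs": [
        "/data/radar/scan_t-10min.hdf",
        "/data/radar/scan_t-5min.hdf",
        "/data/radar/scan_t.hdf",
    ],
    "coordinates": "/data/coordinates.csv",  # lines of "lat,lon,id_hex,bairro"
    "radar_cal_specs": "/data/radar_cal_specs.jsonl",  # JSON Lines, one spec dict per line
    "indices_folder": "/data/indices",
    "model_filepath": "/models/family/version/model.joblib",
    "output_path": "/data/predictions",
}

# The function looks for the file at {base_path}/src/predict_rain_specs.json.
with open("src/predict_rain_specs.json", "w") as f:
    json.dump(specs, f)

A minimal call, assuming the specs file above is in place and `base_path` is the folder that contains `src/`:

from pipelines.rj_cor.meteorologia.radar.precipitacao.src.predict_rain import (
    run_model_prediction,
)

# With output_hdf=False, returns a DataFrame with latitude, longitude,
# predictions, id_h3, bairro and data_medicao columns.
dfr = run_model_prediction(base_path=".", verbose=True, n_jobs=4, output_hdf=False)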