Module pipelines.utils.predict_flow.tasks
Tasks for the predict flow.
Functions
def generate_dataframe_from_predictions(predictions: List[Any],
output_column_name: str,
include_timestamp: bool = False,
timestamp: str = None,
save_path: str | pathlib.Path = None) ‑> pandas.core.frame.DataFrame-
Expand source code
@task(checkpoint=False)
def generate_dataframe_from_predictions(
    predictions: List[Any],
    output_column_name: str,
    include_timestamp: bool = False,
    timestamp: str = None,
    save_path: Union[str, Path] = None,
) -> pd.DataFrame:
    """
    Build a single-column dataframe out of a list of predictions.

    Args:
        predictions: Values placed in the output column, one per row.
        output_column_name: Name of the column that holds the predictions.
        include_timestamp: When true, a "timestamp" column is added carrying
            the same value on every row.
        timestamp: Value for the "timestamp" column. When it is falsy and
            ``include_timestamp`` is set, the current time in
            ``constants.DEFAULT_TIMEZONE`` is used, rendered with pendulum's
            ``to_datetime_string``.
        save_path: Optional directory. When given, the dataframe is also
            written to ``<save_path>/data.csv`` via ``dataframe_to_csv``.

    Returns:
        The generated dataframe.
    """
    result = pd.DataFrame(data=predictions, columns=[output_column_name])

    if include_timestamp:
        # Fall back to "now" only when the caller did not supply a timestamp.
        stamp = (
            timestamp
            or pendulum.now(tz=constants.DEFAULT_TIMEZONE.value).to_datetime_string()
        )
        result["timestamp"] = stamp

    if save_path:
        # Normalise plain strings so the "/" join below works.
        target_dir = save_path if isinstance(save_path, Path) else Path(save_path)
        dataframe_to_csv(dataframe=result, path=target_dir / "data.csv")

    return result
Generate a dataframe from the predictions.
def get_model(model_name: str, model_version_or_stage: str, tracking_server_uri: str = None)
-
Expand source code
@task(checkpoint=False)
def get_model(
    model_name: str,
    model_version_or_stage: str,
    tracking_server_uri: str = None,
):
    """
    Load a model from the MLflow model registry.

    Args:
        model_name: Registered model name.
        model_version_or_stage: Version or stage placed after the model name
            in the ``models:/<name>/<version_or_stage>`` URI.
        tracking_server_uri: MLflow tracking server to use. When falsy,
            ``constants.MLFLOW_TRACKING_URI`` is used instead.

    Returns:
        The object returned by ``mlflow.pyfunc.load_model`` for that URI.
    """
    server_uri = tracking_server_uri or constants.MLFLOW_TRACKING_URI.value
    mlflow.set_tracking_uri(server_uri)

    registry_uri = f"models:/{model_name}/{model_version_or_stage}"
    log(f"Tracking server URI: {server_uri}")
    log(f"Model URI: {registry_uri}")

    return mlflow.pyfunc.load_model(model_uri=registry_uri)
Get model from MLflow model registry.
def predict(data: Dict[str, List[Any]], model: mlflow.pyfunc.PyFuncModel) ‑> numpy.ndarray
-
Expand source code
@task(checkpoint=False)
def predict(
    data: Dict[str, List[Any]],
    model: mlflow.pyfunc.PyFuncModel,
) -> List[Any]:
    """
    Run an MLflow model on data supplied in pandas "split" format.

    Args:
        data: Dictionary with a "columns" entry (column labels) and a "data"
            entry (row values), from which the input dataframe is rebuilt.
        model: Loaded MLflow pyfunc model whose ``predict`` method is called
            with the rebuilt dataframe.

    Returns:
        The model output converted to a list.
    """
    # From pandas-split to dataframe
    dataframe = pd.DataFrame(data=data["data"], columns=data["columns"])

    # Predict
    predictions = model.predict(dataframe)

    # NOTE(review): list() iterates the model output; if a model returns a
    # DataFrame this yields its column labels rather than rows -- confirm the
    # models used here return an array-like of predictions.
    return list(predictions)
Uses an MLflow model to generate predictions from data supplied in pandas split format (a dictionary with "columns" and "data" entries).
def prepare_dataframe_for_prediction(dataframe: pandas.core.frame.DataFrame) ‑> Dict[str, List[Any]]
-
Expand source code
@task(checkpoint=False)
def prepare_dataframe_for_prediction(dataframe: pd.DataFrame) -> Dict[str, List[Any]]:
    """
    Convert a dataframe to a dictionary in pandas "split" orientation.

    Args:
        dataframe: Dataframe to convert.

    Returns:
        The result of ``dataframe.to_dict(orient="split")``.
    """
    split_payload = dataframe.to_dict(orient="split")
    return split_payload
Convert the dataframe to a dictionary using the pandas "split" orientation.