Module pipelines.utils.predict_flow.tasks

Tasks for the predict flow.

Functions

def generate_dataframe_from_predictions(predictions: List[Any],
output_column_name: str,
include_timestamp: bool = False,
timestamp: str = None,
save_path: str | pathlib.Path = None) ‑> pandas.core.frame.DataFrame
Expand source code
@task(checkpoint=False)
def generate_dataframe_from_predictions(
    predictions: List[Any],
    output_column_name: str,
    include_timestamp: bool = False,
    timestamp: str = None,
    save_path: Union[str, Path] = None,
) -> pd.DataFrame:
    """
    Build a single-column dataframe out of the predictions.

    Args:
        predictions: Values used as the data of the dataframe.
        output_column_name: Name of the column that holds the predictions.
        include_timestamp: When true, a "timestamp" column is added carrying
            the same value on every row.
        timestamp: Value for the "timestamp" column. When falsy, the current
            datetime string in the default timezone is used instead.
        save_path: Directory that receives the dataframe as "data.csv"
            (through ``dataframe_to_csv``). Nothing is saved when falsy.

    Returns:
        The generated dataframe.
    """
    result = pd.DataFrame(data=predictions, columns=[output_column_name])

    if include_timestamp:
        # Fall back to "now" only when the caller supplied no timestamp.
        result["timestamp"] = timestamp or pendulum.now(
            tz=constants.DEFAULT_TIMEZONE.value
        ).to_datetime_string()

    if save_path:
        # Accept plain strings as well as Path objects.
        target_dir = save_path if isinstance(save_path, Path) else Path(save_path)
        dataframe_to_csv(dataframe=result, path=target_dir / "data.csv")

    return result

Generate a dataframe from the predictions.

def get_model(model_name: str, model_version_or_stage: str, tracking_server_uri: str = None)
Expand source code
@task(checkpoint=False)
def get_model(
    model_name: str,
    model_version_or_stage: str,
    tracking_server_uri: str = None,
):
    """
    Load a model from the MLflow model registry.

    Args:
        model_name: Name of the registered model.
        model_version_or_stage: Version or stage placed after the model name
            in the ``models:/<name>/<version_or_stage>`` URI.
        tracking_server_uri: MLflow tracking server to use. When falsy, the
            value of ``constants.MLFLOW_TRACKING_URI`` is used.

    Returns:
        The model returned by ``mlflow.pyfunc.load_model``.
    """
    # Prefer the caller's tracking server, otherwise the project default.
    server_uri = tracking_server_uri or constants.MLFLOW_TRACKING_URI.value
    mlflow.set_tracking_uri(server_uri)

    registry_uri = f"models:/{model_name}/{model_version_or_stage}"
    log(f"Tracking server URI: {server_uri}")
    log(f"Model URI: {registry_uri}")

    return mlflow.pyfunc.load_model(model_uri=registry_uri)

Get model from MLflow model registry.

def predict(data: Dict[str, List[Any]], model: mlflow.pyfunc.PyFuncModel) ‑> numpy.ndarray
Expand source code
@task(checkpoint=False)
def predict(data: Dict[str, List[Any]], model: mlflow.pyfunc.PyFuncModel) -> List[Any]:
    """
    Run an MLflow model on data given in the pandas "split" format.

    Args:
        data: Dictionary with a "data" entry (the rows) and a "columns" entry
            (the column labels), such as the output of
            ``DataFrame.to_dict(orient="split")``.
        model: Loaded MLflow pyfunc model used to compute the predictions.

    Returns:
        The model's predictions converted to a list.
    """
    # Rebuild the dataframe from its pandas "split" representation.
    dataframe = pd.DataFrame(data=data["data"], columns=data["columns"])
    predictions = model.predict(dataframe)
    # The result is materialized as a list (not an ndarray), hence the
    # List[Any] return annotation.
    # NOTE(review): list() iterates the model output; if a model returns a
    # DataFrame this yields column labels rather than rows -- confirm the
    # output type of the models used with this task.
    return list(predictions)

Uses an MLflow model to predict from data supplied in the pandas "split" format.

def prepare_dataframe_for_prediction(dataframe: pandas.core.frame.DataFrame) ‑> Dict[str, List[Any]]
Expand source code
@task(checkpoint=False)
def prepare_dataframe_for_prediction(dataframe: pd.DataFrame) -> Dict[str, List[Any]]:
    """
    Convert a dataframe into the pandas "split" dictionary layout.

    Args:
        dataframe: Dataframe to convert.

    Returns:
        The result of ``dataframe.to_dict(orient="split")``.
    """
    split_payload = dataframe.to_dict(orient="split")
    return split_payload

Converts the dataframe to a dictionary using the pandas "split" orientation.