The mlserver
package comes with inference runtime implementations for scikit-learn
and xgboost
models.
However, some times we may also need to roll out our own inference server, with custom logic to perform inference.
To support this scenario, MLServer makes it really easy to create your own extensions, which can then be containerised and deployed in a production environment.
In this example, we will train a numpyro
model.
The numpyro
library streamlines the implementation of probabilistic models, abstracting away advanced inference and training algorithms.
Out of the box, mlserver
doesn't provide an inference runtime for numpyro
.
However, through this example we will see how easy is to develop our own.
The first step will be to train our model.
This will be a very simple bayesian regression model, based on an example provided in the numpyro
docs.
Since this is a probabilistic model, during training we will compute an approximation to the posterior distribution of our model using MCMC.
# Original source code and more details can be found in:
# https://nbviewer.jupyter.org/github/pyro-ppl/numpyro/blob/master/notebooks/source/bayesian_regression.ipynb
import numpyro
import numpy as np
import pandas as pd
from numpyro import distributions as dist
from jax import random
from numpyro.infer import MCMC, NUTS
DATASET_URL = 'https://raw.githubusercontent.com/rmcelreath/rethinking/master/data/WaffleDivorce.csv'
dset = pd.read_csv(DATASET_URL, sep=';')
standardize = lambda x: (x - x.mean()) / x.std()
dset['AgeScaled'] = dset.MedianAgeMarriage.pipe(standardize)
dset['MarriageScaled'] = dset.Marriage.pipe(standardize)
dset['DivorceScaled'] = dset.Divorce.pipe(standardize)
def model(marriage=None, age=None, divorce=None):
a = numpyro.sample('a', dist.Normal(0., 0.2))
M, A = 0., 0.
if marriage is not None:
bM = numpyro.sample('bM', dist.Normal(0., 0.5))
M = bM * marriage
if age is not None:
bA = numpyro.sample('bA', dist.Normal(0., 0.5))
A = bA * age
sigma = numpyro.sample('sigma', dist.Exponential(1.))
mu = a + M + A
numpyro.sample('obs', dist.Normal(mu, sigma), obs=divorce)
# Start from this source of randomness. We will split keys for subsequent operations.
rng_key = random.PRNGKey(0)
rng_key, rng_key_ = random.split(rng_key)
num_warmup, num_samples = 1000, 2000
# Run NUTS.
kernel = NUTS(model)
mcmc = MCMC(kernel, num_warmup, num_samples)
mcmc.run(rng_key_, marriage=dset.MarriageScaled.values, divorce=dset.DivorceScaled.values)
mcmc.print_summary()
Now that we have trained our model, the next step will be to save it so that it can be loaded afterwards at serving-time. Note that, since this is a probabilistic model, we will only need to save the traces that approximate the posterior distribution over latent parameters.
This will get saved in a numpyro-divorce.json
file.
import json
samples = mcmc.get_samples()
serialisable = {}
for k, v in samples.items():
serialisable[k] = np.asarray(v).tolist()
model_file_name = "numpyro-divorce.json"
with open(model_file_name, 'w') as model_file:
json.dump(serialisable, model_file)
The next step will be to serve our model using mlserver
.
For that, we will first implement an extension which serve as the runtime to perform inference using our custom numpyro
model.
Our custom inference wrapper should be responsible of:
- Loading the model from the set samples we saved previously.
- Running inference using our model structure, and the posterior approximated from the samples.
%%writefile models.py
import json
import numpyro
import numpy as np
from typing import Dict
from jax import random
from mlserver import MLModel, types
from numpyro.infer import Predictive
from numpyro import distributions as dist
class NumpyroModel(MLModel):
async def load(self) -> bool:
model_uri = self._settings.parameters.uri
with open(model_uri) as model_file:
raw_samples = json.load(model_file)
self._samples = {}
for k, v in raw_samples.items():
self._samples[k] = np.array(v)
self._predictive = Predictive(self._model, self._samples)
self.ready = True
return self.ready
async def predict(self, payload: types.InferenceRequest) -> types.InferenceResponse:
inputs = self._extract_inputs(payload)
predictions = self._predictive(rng_key=random.PRNGKey(0), **inputs)
obs = predictions["obs"]
obs_mean = obs.mean()
return types.InferenceResponse(
id=payload.id,
model_name=self.name,
model_version=self.version,
outputs=[
types.ResponseOutput(
name="obs_mean",
shape=obs_mean.shape,
datatype="FP32",
data=np.asarray(obs_mean).tolist(),
)
],
)
def _extract_inputs(self, payload: types.InferenceRequest) -> Dict[str, np.ndarray]:
inputs = {}
for inp in payload.inputs:
inputs[inp.name] = np.array(inp.data)
return inputs
def _model(self, marriage=None, age=None, divorce=None):
a = numpyro.sample("a", dist.Normal(0.0, 0.2))
M, A = 0.0, 0.0
if marriage is not None:
bM = numpyro.sample("bM", dist.Normal(0.0, 0.5))
M = bM * marriage
if age is not None:
bA = numpyro.sample("bA", dist.Normal(0.0, 0.5))
A = bA * age
sigma = numpyro.sample("sigma", dist.Exponential(1.0))
mu = a + M + A
numpyro.sample("obs", dist.Normal(mu, sigma), obs=divorce)
The next step will be to create 2 configuration files:
settings.json
: holds the configuration of our server (e.g. ports, log level, etc.).model-settings.json
: holds the configuration of our model (e.g. input type, runtime to use, etc.).
%%writefile settings.json
{
"debug": "true"
}
%%writefile model-settings.json
{
"name": "numpyro-divorce",
"version": "v0.1.0",
"implementation": "models.NumpyroModel",
"parameters": {
"uri": "./numpyro-divorce.json"
}
}
Now that we have our config in-place, we can start the server by running mlserver start .
. This needs to either be ran from the same directory where our config files are or pointing to the folder where they are.
mlserver start .
Since this command will start the server and block the terminal, waiting for requests, this will need to be ran in the background on a separate terminal.
We now have our model being served by mlserver
.
To make sure that everything is working as expected, let's send a request from our test set.
For that, we can use the Python types that mlserver
provides out of box, or we can build our request manually.
import requests
x_0 = [28.0]
inference_request = {
"inputs": [
{
"name": "marriage",
"shape": [1],
"datatype": "FP32",
"data": x_0
}
]
}
endpoint = "http://localhost:8080/v2/models/numpyro-divorce/versions/v0.1.0/infer"
response = requests.post(endpoint, json=inference_request)
response.json()