Desmitificando el training y serving de modelos predictivos on cloud: No necesitas mucha experiencia con Machine Learning para empezar

Desarrollador Big Data cinturón blanco

El mes pasado asistimos al workshop "Machine Learning in production" impartido por los amigos de @TheGurusTeam. Durante el mismo, entre otras cosas, levantamos un tracking server de MLflow, y lo usamos para desplegar varios modelos ML localmente (en la VM del servidor MLflow) y on cloud (AWS Sagemaker).

Desde el principio quedó patente que en el entorno Data Science es muy importante la cultura devops. No hay una manera estándar de desplegar modelos en producción. Con MLflow es fácil desacoplar los procesos de desarrollo (crear y entrenar) de modelos y el de usarlos para hacer predicciones en un entorno productivo.

Con MLflow podemos reducir drásticamente la brecha entre ambos mundos, dado que es fácil lanzar nuevos modelos en producción sin re-implementar ningún código, y con mínimo esfuerzo por parte del equipo de Data Engineering.

De acuerdo, pero este post no es (únicamente) acerca de MLflow.

Me gustaría ir más lejos, y aprovechar que Microsoft está apoyando completamente el software Open Source MLflow con Azure Machine Learning. Esto significa que nosotros, como desarrolladores, podemos hacer uso de la API MLFlow Traking (incluso sin levantar un tracking server) para el seguimiento de ejecuciones y el despliegue de modelos on cloud (servicio de Azure Machine Learning).

PREREQUISITOS

Para replicar los contenidos de este post necesitas una cuenta de Azure, un entorno de desarrollo Python (preferentemente Linux) (nosotros hemos usado Pycharm, VirtualBox, MobaXterm, Git, etc.), y sería deseable algo de conocimiento acerca de bash, Docker, Kubernetes, lenguaje Python, ML y Azure Cloud :-)

Paso 0

Lo primero de todo, debemos crear un espacio de trabajo (workspace) de Azure ML. En adelante, podemos obtener una referencia al mismo con el siguiente script:

# workspace.py
def get_workspace():  
    ws = Workspace.get(name=WORKSPACE_NAME, subscription_id=SUBSCRIPTION_ID, resource_group=RESOURCE_GROUP)
    return ws

Rompiendo el hielo (Azure ML "a pelo")

Supongamos que tienes un modelo entrenado (o imaginemos que tu Data Scientist te lo dió) serializado. En nuestro repositorio, puedes encontrar uno llamado worst.pickle ubicado en la carpeta artifacts (basado en un modelo de regresión Ridge de scikit-learn, pre-entrenado). Podemos registrar el modelo en el workspace de Azure ML usando el siguiente script:

# model.py
from azuremite.workspace import get_workspace

def model_register():  
    ws = get_workspace()
    model = Model.register(workspace=ws, model_path="../artifacts/worst.pickle", model_name="worst-model")
    return model

Una vez registrado el modelo, podemos obtener su path fácilmente:

# model.py
def get_model_path():  
    ws = get_workspace()
    model_path = Model.get_model_path('worst-model', _workspace=ws)
    return model_path 

Después, para recuperar el modelo, también hay que deserializarlo:

# score.py
from azuremite.model import get_model_path  
from sklearn.externals import joblib

def get_model():  
    model_path = get_model_path()
    _model = joblib.load(model_path)
    return _model 

Ya estamos listos para elaborar nuestro primer script de scoring del modelo:

# score.py

from sklearn.externals import joblib  
from azuremite.model import get_model_path  
import pandas as pd

def get_model():  
    model_path = get_model_path()
    _model = joblib.load(model_path)
    return _model

def init():  
    global model
    # retrieve the path to the model file using the model name
    model = get_model()

def run(raw_data):  
    df = pd.DataFrame(data)
    jsonContent = pd.DataFrame.to_json(df, orient='split')
    print(f"invocaremos con {jsonContent}")
    # make prediction
    y_hat = model.predict(df)
    # you can return any data type as long as it is JSON-serializable
    return y_hat.tolist()

if __name__ == '__main__':  
    init()
    data = [[2000]]
    prediction = run(data)
    print(prediction)

En este momento te imaginarás que exponer el método
def run(raw_data) como una API REST API sería el siguiente paso. Si, sería una operación francamente sencilla, y una vez hecho, el modelo estaría listo para hacer predicciones, accedido de diversas formas (API REST, cualquier tipo de "pipelines", etc).

Pero supongo que esto ya lo sabías. Espero que desde aquí, el contenido se vuelva ciertamente interesante.

Trabajando con Azure ML (al estilo MLflow)

Vamos a empezar a dar cierto apoyo al Data Scientist durante la fase de entrenamiento del modelo.

Para una primera impresión, puedes echar un vistazo al siguiente notebook (Jupyter notebook):

import mlflow  
from azureml.core import Workspace  
from azuremite.configuration import WORKSPACE_NAME, SUBSCRIPTION_ID, RESOURCE_GROUP, EXPERIMENT_NAME, MODEL_PATH  
from sklearn.linear_model import Ridge  
import mlflow.sklearn  
def get_workspace():  
    ws = Workspace.get(name=WORKSPACE_NAME, subscription_id=SUBSCRIPTION_ID, resource_group=RESOURCE_GROUP)
    return ws

Esta es la clave: con las dos siguientes lineas recuperamos la referencia al workspace de Azure ML, e indicamos que lo vamos a usar como "tracking server" de MLflow.

ws = get_workspace()  
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())  

Este método carga algunos datos para el entrenamiento (en este caso, de un archivo .csv)

def get_training_data():  
  sample_files_path = os.environ.get('SAMPLE_FILES_PATH','.')
  trainingDataFilePath = sample_files_path + '/articulos_ml.csv'
  data = pd.read_csv(trainingDataFilePath)
  # veamos cuantas dimensiones y registros contiene (son 161 registros con 8 columnas)
  # print(data.shape)
  # Vamos a RECORTAR los datos en la zona donde se concentran más los puntos
  filtered_data = data[(data['Word count'] <= 3500) & (data['# Shares'] <= 80000)]
  return filtered_data

Este método sería un ejemplo simple de entrenamiento del modelo:

def train(alpha_arg, training_data):  
  dataX = training_data[["Word count"]]
  X_train = np.array(dataX)
  y_train = training_data['# Shares'].values
  # Creamos el objeto de Regresión Ridge
  regr = Ridge(alpha=alpha_arg)
  # Entrenamos nuestro modelo (worst model ever)
  regr.fit(X_train, y_train)
  return regr  

Y el código a continuación puede ser considerado "la joya de la corona" de este notebook. ;-) Empezamos registrando un nuevo experimento en Azure ML (únicamente usando API de MLflow). A continuación declaramos una ejecución (run) (si, tal cual, tanto MLflow como Azure ML comparten los mismos conceptos tales como "experimentos", "ejecuciones", "log metrics", "log artifacts", "log models", etc), entrenamos el modelo, registramos algunas métricas, y finalmente registramos (guardamos) el modelo entrenado. Recuerda, estamos trabajando contra AzureML con API MLflow.

experiment_name = EXPERIMENT_NAME  
mlflow.set_experiment(experiment_name)

with mlflow.start_run():  
  alpha = 0.01 
  regression_model = train(alpha)    
  mlflow.log_metric('alpha', alpha)
  # Save the model to the outputs directory for capture
  model_save_path = MODEL_PATH
  mlflow.sklearn.log_model(regression_model, model_save_path)

Puedes ver los resultados (visibles) en las siguientes capturas:

Nuevo experimento registrado Nuevo experimento registrado Podemos hacer seguimiento de las métricas desde el Azure Dashboard.
Seguimiento de ejecuciones Seguimiento de ejecuciones Modelo registrado Modelo registrado Detalles del modelo Detalles del modelo

Una vez que el proceso de entrenamiento y registro (almacenamiento) del modelo se ha completado, es tiempo de recuperarlo y hacer el scoring.

El siguiente fragmento de código te muestra como recuperar el modelo de la última ejecución (del experimento que tenemos entre manos):

# model2.py

from azureml.core.model import Model  
from azuremite.workspace import get_workspace  
from azuremite.configuration import EXPERIMENT_NAME, MODEL_PATH  
import mlflow.sklearn

def get_experiment():  
    ws = get_workspace()
    experiment_name = EXPERIMENT_NAME
    exp = ws.experiments[experiment_name]
    return exp

def get_last_run_id():  
    exp = get_experiment()
    runs = list(exp.get_runs())
    runId = runs[0].id
    print(f"last run ID={runId}")
    return runId

def get_model_uri(runId=get_last_run_id(), model_save_path=MODEL_PATH):  
    ws = get_workspace()
    mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())
    model_uri = 'runs:/{}/{}'.format(runId, model_save_path)
    return model_uri

def get_model(runId=get_last_run_id(), model_save_path=MODEL_PATH):  
    ws = get_workspace()
    mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())
    model_uri = get_model_uri(runId, model_save_path)
    model = mlflow.sklearn.load_model(model_uri)
    return model

Espero que los nombres de los métodos sean auto-explicativos. Con los métodos mostrados anteriormente, obtenemos la referencia a un experimento, encontramos el id de la última ejecución, y recuperamos el modelo asociado a la misma.

Ahora el modelo está listo para hacer el scoring, con el siguiente script (tan simple como el último):

# score2.py

from sklearn.externals import joblib

from azuremite.model2  import get_model

import pandas as pd

from azureml.core.model import Model

def init():  
    global model
    # retrieve the model
    model = get_model()

def run(raw_data):  
    df = pd.DataFrame(data)
    # make prediction
    y_hat = model.predict(df)
    # you can return any data type as long as it is JSON-serializable
    return y_hat.tolist()

if __name__ == '__main__':  
    init()
    data = [[2000]]
    prediction = run(data)
    print(prediction)

Construyendo la imagen

Es el momento de empaquetar el modelo y desplegarlo como una imagen Docker, lo que finalmente permitirá exponer el modelo como un servicio WEB.

Es realmente simple, vamos a mostrarte como hacerlo con un breve fragmento de código (recuerda, API MLflow):

# image.py
def build_image():  
    ws = get_workspace() 
    m_uri = get_model_uri()
    azure_image, azure_model = mlflow.azureml.build_image(model_uri=m_uri,
                                                          workspace=ws,
                                                          model_name='worst-model',
                                                          image_name=IMAGE_NAME,
                                                          synchronous=True)

Creando la imagen de Docker

Con esto conseguimos la creación de la imagen de Docker, la cual es mostrada en la sección "Images" del workspace:
Imagen creada

Desplegando el modelo

Ultimo paso para poner la guinda al pastel. La imagen recientemente construida puede ser desplegada en Azure Container Instances (ACI) o en un Azure Kubernetes Service (AKS) para el serving, con unas pocas líneas de código (con API MLflow, por supuesto).

# deploy_aci.py
from azureml.core.webservice import AciWebservice, Webservice  
from azuremite.workspace import get_workspace  
from azuremite.image import get_image  
from azuremite.configuration import LOCATION, MODEL_NAME

def deploy_image():  
  ws = get_workspace() 
  azure_image = get_image()
  aci_config = AciWebservice.deploy_configuration(cpu_cores=1,                                             memory_gb=1, tags={'method' : 'sklearn'}, 
description='Worst model',  
location=LOCATION)  
  webservice = Webservice.deploy_from_image(image=azure_image, workspace=ws, name=MODEL_NAME, deployment_config=aci_config)
  webservice.wait_for_deployment(show_output=True)

if __name__ == '__main__':  
    deploy_image()

Deploying to ACI

Estos son los detalles de la "Implementación", mostrados desde la interfaz de usuario de Azure ML:

Desplegado a  ACI Detalles del despliegue

En este punto podemos realizar consultas al endpoint del servicio WEB (ACI) de scoring, enviando requests HTTP POST que contengan el vector de las propiedades, para el ejemplo, usando Postman:

Consultando el servicio WEB ACI

Dado que la plataforma ACI solo está recomendada para entornos de staging y desarrollo, vamos a desplegar finalmente el modelo a un entorno "productivo" usando Azure Kubernetes Service (AKS).

# deploy_aks.py
from azuremite.workspace import get_workspace  
from azuremite.image import get_image  
from azuremite.cluster import get_cluster  
from azuremite.configuration import AKS_NAME

from azureml.core.webservice import Webservice, AksWebservice

def deploy_image():  
  ws = get_workspace() 
  azure_image = get_image()

  # Set the web service configuration (using default here with app insights)
  aks_config = AksWebservice.deploy_configuration(enable_app_insights=True)

  # Unique service name
  service_name = AKS_NAME

  aks_target = get_cluster()

  # Webservice creation using single command
  aks_service = Webservice.deploy_from_image( workspace=ws, 
                                              name=service_name,
                                              deployment_config = aks_config,
                                              image = azure_image,
                                              deployment_target = aks_target)

  aks_service.wait_for_deployment(show_output=True)

if __name__ == '__main__':  
    deploy_image()

Antes de realizar consultas al endpoint del servicio WEB (AKS) de scoring, debemos obtener la URI:

# deploy_aks.py
from azuremite.workspace import get_workspace  
from azuremite.image import get_image  
from azuremite.cluster import get_cluster  
from azuremite.configuration import AKS_NAME

from azureml.core.webservice import Webservice, AksWebservice

def get_service():  
  ws = get_workspace() 
  services = Webservice.list(workspace = ws, compute_type='AKS')
  return services[0]

if __name__ == '__main__':  
    print(get_service().scoring_uri)

Vamos a hacer scoring con del modelo (regresión) entrenado, una vez más.

Usaremos un script simple:

# score3.py

from azuremite.deploy_aks import get_service

import pandas as pd  
import requests  
import json

def init():  
    wservice = get_service()
    key1, key2 = wservice.get_keys()
    global scoring_uri, headers
    headers = {'Content-Type':'application/json',
               'Authorization': 'Bearer ' + key1}
    print(headers)
    scoring_uri = wservice.scoring_uri

def run():  
    jsonContent = {"columns":[0],"index":[0],"data":[[2000]]}
    # make prediction
    resp = requests.post(scoring_uri, json=jsonContent, headers=headers)
    # you can return any data type as long as it is JSON-serializable
    print(resp.status_code)
    print(resp.content)
    return resp.json()

if __name__ == '__main__':  
    init()
    prediction = run()
    print(prediction)

Scoring del modelo usando script o mediante Postman:
Scoring del modelo usando Postman (cabeceras) Scoring del modelo usando Postman

Conclusión

Con este post, deberías ser capaz de hacer serving de modelos en AKS (Azure Kubernetes Service). Construir el proyecto es fácil, y puede ahorrarte un montón de tiempo si eres novato trabajando con modelos on cloud. Como hemos visto, la API de MLflow te proporciona una forma fácil de empezar y desplegar modelos pre-entrenados en cuestión de minutos.

Puedes encontrar el código del post en el siguiente repo. Está publicado bajo la licencia "Unlicense", por tanto puedes modificarlo a voluntad y usarlo en tu proyecto.

Quieres aprender más?

Soy desarrollador WEB y desarrollador Big Data, residente en Madrid (Spain). Enfocado mayormente en las últimas tendencias en desarrollo de software, Big Data, Machine Learning y asuntos relacionados con la nube. Si lo deseas contacta conmigo en LinkedIn o Github.

Alberto Morales Morales

Software craftsman. Passion for developing quality code that can be proud of. Happily married.

Madrid, Spain.