Cómo hacer MLOps en R paso a paso

Últimamente se habla mucho de MLOps. Sin embargo, existen pocos ejemplos y, los ejemplos que existen la mayoría son en Python. Así pues, en este post vas a aprender qué es MLOps y cómo hacer MLOps con R. ¿Te suena interesante? ¡Vamos con ello!

Qué es MLOps

MLOps es una metodología para hacer la puesta en producción de los modelos de Machine Learning más eficiente.

Lo sé, dicho así suena muy amplio y es que, en realidad, lo es. Es más, no existe un único estándar de MLOps, sino que existen varios niveles de implantación de MLOps, según el uso y tipo de modelos de ML que tenga tu organización.

En mi opinión, los cuatro niveles de adopción de MLOps son los siguientes:

Nivel de MLOps 1: Deploy continuo de modelos de ML

Este nivel es ideal cuando empiezas a desarrollar modelos de ML. Esta etapa suele ser muy iterativa: requieres poner el modelo en producción varias veces hasta conseguir un modelo que funcione bien.

Así pues, este nivel de MLOps consiste en integrar nuestro repositorio de código (como GitHub), con el entorno de deployment. La idea es sencilla: cuando subamos una nueva versión de nuestro modelo a nuestro repositorio de código, se hará un deploy automático del modelo a producción.

Para ello, debemos usar los siguientes tipos de herramientas: Un repositorio de código como GitHub, GitLab, Bitbucket o uno en la nube. Una herramienta de CI/CD como GitHub Actions o Circle CI. Un lugar de deployment. Esto puede ser muy variado, desde una máquina virtual, un cluster de Kubernetes o servicios cloud como AWS Lambda o Cloud Run.

Si quieres aprender más sobre Github Actions, te recomiendo que leas este post.

Sin embargo, una vez ya tienes el modelo en producción, este nivel se queda corto. Y es que, en la vida real no basta con poner un modelo en producción, hay que hacer seguimiento del mismo para saber cuándo reentrenarlo.

Así pues, cuando ya hemos conseguido poner un modelo que funcione bien en producción, debemos pasar al siguiente nivel de MLOps.

Nivel 2 de MLOps: Visualizando la capacidad predictiva del modelo

Una vez ya tenemos un modelo en producción, el siguiente paso es tener un dashboard donde poder visualizar cómo se está comportando el modelo: comparación de predicciones vs realidad, distribución de las variables de entrenamiento vs las que recibe el modelo, etc.

El dashboard debe incluir la información que necesitemos para decidir cuándo debemos reentrenar el modelo. De esta forma, nosotros simplemente deberemos consular el dashboard, sin tener que hacer nada más.

Para ello se pueden utilizar muchas herramientas que son fácilmente integrables con R: desde una base de datos tradicional, un datawarehouse como Big Query o Snowflake o, incluso, si el proyecto es pequeño la información se puede guardar en ficheros .RData.

Además, como estamos haciendo MLOPs con R, la aplicación la podemos desarrollar con Shiny o Dash, por ejemplo.

Si quieres aprender a cómo poner aplicaciones Shiny en producción en Google Cloud te recomiendo que leas este post.

La limitación del segundo nivel de MLOps viene cuando tienes varios modelos en producción (o pocos pero con una realidad muy cambiante). Al fin y al cabo, no puedes tener a una persona continuamente revisando si se debe, o no, reentrenar el modelo y, el reentrenamiento también sería costoso (piensa que hasta ahora el reentrenamiento es manual).

Así pues, cuando ya se tienen varios modelos en producción,lo ideal es pasar al siguiente nivel de MLOPs.

Nivel 3 de MLOps: reentrenamiento automático mediante pipelines

En este nivel, el cambio es claro: el modelo ya no se reentrena de forma manual, sino que de forma automática se reentrenan muchos modelos y se elige al mejor. Todo este reentrenamiento ocurre en la nube.

Además, gracias a automatizar el reentrenamiento, podemos crear un sistema de tal forma que, si la capacidad predictiva baja de cierto umbral, el modelo se reentrene (y despliegue) de forma automática.

Para ello, debemos crear pipelines. La forma de hacerlo es muy diversa: se puede usar desde Apache Airflow, Kubeflow o, incluso, scripts. En nuestro caso, al tratarse de MLOPs en R, usaremos esta última opción: crearemos un script para cada paso del procesamiento de datos.

Como estamos trabajando con pocos datos, esto correrá sobre la máquina de CircleCI sobre la que se ejecute. Sin embargo, si el dataset fuese muy grande (big data) siempre se puede usar Apache Spark (sparklyr) u otras herramientas. En definitiva, aunque usemos scripts, se trata de un proceso escalable.

En nuestro caso de MLOps en R, crearemos un ejemplo a este nivel: un modelo de series temporales con reentrenamiento y despliegue automático del modelo y un dahsboard para visualizar la evolución de la capacidad predictiva.

Sin embargo, aunque este pueda parecer el último nivel, en realidad existe un nivel más, pensado para cubrir las necesidades de empresas grandes, como Google, Facebook, etc.

Nivel 4 de MLOPs: experimentación con CI/CD

Si lo piensan, el nivel 3 de MLOps tiene una limitación: experimentar se hace complicado. Y es que, si queremos probar diferentes tipos de preprocesamiento, por ejemplo, tenemos que cambiar y desplegar todo el pipeline, lo cual no es muy óptimo.

Para solucionar este problema, se paquetiza cada paso del pipeline, de tal forma que experimentar sea mucho más sencillo. Al fin y al cabo es como si hubiéramos roto el data pipeline en pequeñas piezas que podemos quitar y poner a nuestro gusto.

Bien, ahora que está claro qué es MLOPs y cómo funciona, vemos en qué consiste nuestro caso de MLOPS en R.

MLOps en R: prediciendo la demanda eléctrica de España

Estructuración del Proyecto

Como el título indica, este caso consiste en predecir la demanda eléctrica de España. Para ello, utilizo los datos de Red Eléctrica de España, el cual expone muchos datos del mercado eléctrico español mediante API gratuita (enlace).

El planteamiento del caso es el siguiente:

  • Script de extracción de datos: un script ejecutable que extrae los datos de REE y va guardándolo, junto al histórico de datos en un fichero .RData. Este script será automatizado usando Github Actions y un activador cron, para que obtengamos la información cada día.
  • Script de entrenamiento del modelo: un script que lee el histórico de datos y entrena varios modelos de series temporales basados en xgboost haciendo un pequeño grid search. Además, guardo cada modelo con sus parámetros y métricas en Neptune.ai, para así poder hacer seguimiento a los modelos.
  • Script de predicción: dado un modelo entrenado previamente y el histórico de datos, realiza predicciones para los próximos días. Además, va guardando todas las predicciones en un fichero, para así poder evaluar el rendimiento del modelo. Este script se ejecutará de forma recurrente cada día, tras haber ejecutado el script de extracción de datos.
  • Script de comprobación de reentrenamiento: un script que carga los datos de predicicones y los datos de realidad y comprueba el error de las predicciones. Si el error super un umbral definido, se lanzará el script de entrenamiento del modelo.

Ahora que conocemos el planteamiento del proyecto, vamos paso a paso.

Extracción de Datos

Como he comentado previamente, el script de extracción de datos se trata de un script autoejecutable que hace peticiones a un endpoint de la API de REE, extrae la información y la guarda, ya sea bien guardando el fichero o haciendo un append sobre los datos actuales.

Asimismo, de cara a facilitar la reproducibilidad de este caso o aplicarlo a otra API de REE, toda variable ha sido definida en el fichero parameters.yaml , el cual contiene la siguiente información respecto a la extracción de datos:

data_extraction:
  time_trunc : day
  category : demanda
  subcategory : evolucion
  first_date : 2014-01-01
  max_time_call : 365
  output_path : data
  output_filename_data : raw.RData
  output_filename_date : last_date.RData

Así pues, el siguiente script utiliza dica información para realizar la extracción de datos. A grandes rasgos, el proceso que sigue el script es el siguiente :

  1. Instalación y carga de librerías .
  2. Carga de parámetros del fichero parameters.yaml .
  3. Obtención de los periodos sobre los que se deben realizar las extracciones, teniendo en cuenta la última fecha de extracción, la fecha actual, y el número de días que permite extraer la API. En caso de no disponer de la última fecha de extracción, se usará el límite de la API, definido también en parameters.yaml.
  4. Realización de las diferentes peticiones APIs y su conversión a DataFrame.
  5. Append de los datos pasados y datos recién extraidos, a menos que sea la primera extracción.
  6. Guardado del histórico de datos y de la fecha de extracción.
libs = c('httr', 'jsonlite', 'lubridate', 'dplyr', 'yaml', 'glue')
sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Read parameters file
parameters = read_yaml('config/parameters.yaml')[['data_extraction']]

time_trunc = parameters$time_trunc
category = parameters$category
subcategory = parameters$subcategory
first_date = parameters$first_date
output_path = parameters$output_path
output_filename_data = parameters$output_filename_data
output_filename_date = parameters$output_filename_date
max_time_call = parameters$max_time_call


# Create full paths
output_data = glue("{output_path}/{output_filename_data}")
output_date = glue("{output_path}/{output_filename_date}")

# Read last date
if(output_filename_date %in% list.files(output_path)){
  first_date =readRDS(output_date)
}


get_real_demand = function(url){
  resp = GET(url)
  tmp = content(resp, type = 'text', encoding = 'UTF-8') %>% fromJSON(.)
  return(tmp$included$attributes$values[[1]])
}

# I calculate the extraction dates as dates as a substract from today
if(time_trunc == 'hour'){
  now = floor_date(Sys.time(), unit = time_trunc) 
  hours_extract = difftime(now, ymd_hms(first_date), units = 'days') %>% as.numeric(.)
  hours_extract = hours_extract * 24  
} else{
  now = floor_date(Sys.Date()-1, unit = time_trunc)
  hours_extract = difftime(now, ymd(first_date), units = 'days') %>% as.numeric(.)
}


if(hours_extract > max_time_call){
  n_extractions = ceiling(hours_extract/max_time_call)
  diff_times = as.difftime( max_time_call * 1:n_extractions, units = paste0(time_trunc,'s'))
  date_extractions = now - diff_times

} else{
  date_extractions = c(first_date)

}

# I add today as last day
date_extractions = c(now, date_extractions)


# Change Format
date_extractions = format(date_extractions, '%Y-%m-%dT%H:%M')

if(time_trunc == 'hour'){
  date_extractions[length(date_extractions)] = ymd_hms(first_date) %>% format(., '%Y-%m-%dT%H:%M')  
} else{
  date_extractions[length(date_extractions)] = ymd(first_date) %>% format(., '%Y-%m-%dT%H:%M')
}


# I create the URLs
urls = c()

for(i in 1:(length(date_extractions)-1)){
  url = paste0(
    'https://apidatos.ree.es/en/datos/',
    category,'/', subcategory,
    '?start_date=', date_extractions[i+1],
    '&end_date=', date_extractions[i],
    '&time_trunc=', time_trunc
  )
  urls = c(urls, url)
}

# Get URLs Asynchronously
# resps = getURLAsynchronous(urls)
resps = lapply(urls, get_real_demand)

resps =list()
for(i in 1:length(urls)){
  resps[[i]] =  get_real_demand(urls[i])
  print(i)
  Sys.sleep(1)
}


# From list to df
data = bind_rows(resps)

# Check if data exists
if(output_filename_data %in% list.files(output_path)){

  # Read previous data & merge
  prev_data = readRDS(output_data)
  data = bind_rows(data, prev_data)

}

# Save the file
saveRDS(data, output_data)
saveRDS(now, output_date)

Con esto ya tenemos un pipeline sencillo de extracción de datos. Ahora, debemos automatizar este pipeline para que se ejecute de manera periódica, por ejemplo, cada día. Veamos cómo hacerlo.

Automatizar el pipeline de extracción de datos

En este blog he enseñado varias formas de automatizar scripts de R, desde cómo automatizar un script de R en local a cómo automatizar un script de R en la nube. Si bien cualquiera de los dos métodos nos serviría, una tercera forma de hacerlo es usar una herramienta de CI/CD, como es Github Actions o CircleCI.

En estos casos también se suele utilizar mucho Apache Airflow. Sin embargo, Airflow es una herramienta que funciona únicamente con Python, por lo que en nuestro caso no nos serviría.

Así pues, de cara a automatizar el data pipeline de este proceso de MLOps con R usaré Github Actions, puesto que es gratuito y además, si quieres aprender cómo usarlo, tengo un post sobre ello (enlace).

Por suerte, Github Actions tiene varios pasos para usuarios de R (enlace), lo cual facilitará mucho el trabajo, no teniendo que usar Docker en cada caso. Así pues, el fichero yaml para automatizar nuestro pipeline de extracción de datos es el siguiente:

name: update_data

on:
  schedule:
    - cron: '0 7 * * *' # Execute every day at 7
  workflow_dispatch: 

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Install necesary libraries
        run: sudo apt-get install -y curl libssl-dev libcurl4-openssl-dev libxml2-dev

      - name: Install R
        uses: r-lib/actions/setup-r@v2 # Instalo R
        with: 
          r-version: '4.1.1'

      - name: Run Data Extraction
        run: Rscript src/data_extraction.R

      - name: Push data to Github
        run: bash src/github.sh

Perfecto, con esto ya tenemos nuestro proceso de extracción de datos funcionando. Ahora que tenemos los datos, veamos cómo abordar el primer nivel de MLOps con R: el despliegue automático del modelo. ¡Vamos con ello!

MLOps en R nivel 1: cómo desplegar automáticamente un modelo de R en producción

Al elaborar modelos de machine learning en R, la forma más común de poner modelos en producción es usando Docker. De hecho, es el método que utilizo en el post donde explico cómo poner modelos de Machine Learning en R en producción.

Si no tienes conocimientos de Docker, te recomiendo que leas este post donde explico paso a paso qué es Docker y cómo funciona.

Dicho esto, cómo desplegar un modelo en la nube de forma automática dependerá de dos cuestiones:

  1. La nube que estamos utilizando.
  2. Nuestra herramienta de Circle CI.

En mi opinión, la forma más sencilla de realizar este despliegue es usando Google Cloud y Github o Bitbucket. En este caso, Google tiene desarrollada una extensión, de tal forma que podemos ejecutar una tarea cada vez que hagamos un push a una rama de nuestra elección.

De todos modos, a continuación te explico cómo hacer el despliegue de forma automática tanto en AWS como en Azure:

  • Azure:
    • Despliegue en Kubernetes con Github (enlace).
    • Despliegue en Kubernetes con CircleCI (enlace).
  • AWS:
    • Despliegue en EKS con Github (enlace).
    • Despliegue en EKS con CircleCI (enlace).

Dicho esto, para desplegar nuestro modelo en producción existen dos enfoques diferentes:

  1. Envolver el modelo en una API. El primer caso consiste en crear una API que, cada vez que sea llamada, devuelva una predicción. Este suele ser el caso típico de los modelos con respuesta en tiempo real o casi tiempo real.
  2. Crear una automatización que recoja los datos necesarios, cargue el modelo y haga la predicción en batch. Dentro de este tipo de modelos encontramos la predicción de probabilidad de fuga de la empresa o el scoring de clientes, por ejemplo.

Si bien nuestro ejemplo se ajusta más al segundo caso, vamos a tomar el enfoque del primer caso. Así pues, para poner nuestro modelo de R en producción con MLOps, lo primero de todo es crear una API que envuelva a nuestro modelo. Veamos cómo hacerlo.

1. Creación de una API que devuelva predicciones

Para crear una API en R vamos a usar la librería Plumber. Esta librería permite convertir una función en una API de una forma muy sencilla. Simplemente debemos indicar los parámetros de nuestra función (que serán, a su vez los parámetros de la API) y el tipo de petición a realizar. El valor que devuelva nuestra función será, precisamente, el valor que devuelva nuestra API.

Así pues, al tratarse de un modelo de forecasting, el único input del modelo es el número de días sobre los que realizar predicciones, por lo que nuestra API tiene la siguiente forma:

# Install & load libraries  
libs = c('plumber', 'forecast', 'yaml', 'dplyr', 'timetk', 'lubridate',
         'tidymodels', 'modeltime', 'reticulate','neptune')

sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Load Variables
predict_data_info =  read_yaml('config/parameters.yaml')[['predict']]
model_path = predict_data_info$model_path
data_path = predict_data_info$data_path
predictions_path = predict_data_info$predictions_path

# Load model
model = readRDS(model_path)
data = readRDS(data_path)

# Clean data
data$datetime = ymd_hms(data$datetime)
data$date = as.Date(data$datetime)
data$percentage = NULL
data$datetime = NULL


#* @apiTitle Predictions API
#* @apiDescription API to get predictions of Spanish Electricity Demand

#* Get the predictions
#* @param horizon The message to echo
#* @post /get_predictions
function(horizon = "2 months") {

  predictions = model %>%
    modeltime_forecast(
      h = horizon,
      actual_data = data
      ) %>%
    filter(.model_desc != 'ACTUAL') %>%
    select(.index, .value, .conf_lo, .conf_hi)

  predictions$prediction_date = Sys.Date()

  return(predictions)
}

Ahora que tenemos la API, pasamos el siguiente paso: crear una imagen Docker a partir de nuestra API.

2. Creación de una imagen Docker de nuestra API

Para crear una imagen Docker vamos a necesitar lo siguiente:

  1. El fichero de la propia API.
  2. El modelo guardado.
  3. Los datos que se han utilizado para entrenar el modelo.

En este sentido, es importante remarcar que mi directorio de trabajo es el siguiente:

├───.github
│   └───workflows
│       └───workflows
├───config
│   └───parameters.yaml
├───data
│   ├───last_date.RData
│   └───raw.RData
│
├───get_predictions.R
│
├───outputs
│   ├───best_model.RData
│   ├───model.RData
│   └───predictions.RData
└───src
    ├───check_retrain.R
    ├───data_extraction.R
    ├───github.sh
    ├───model_train.R
    ├───neptune_setup.R
    └───predict.RData

Así pues, para ello crearemos un fichero Dockerfile que nos incluya todo ese contenido de nuestro proyecto dentro de nuestra imagen Docker para, después, exponer nuestra API.

FROM asachet/rocker-tidymodels

# Install required libraries
RUN R -e 'install.packages(c("plumber", "forecast", "yaml", "dplyr", "timetk", "lubridate", "tidymodels", "modeltime"))'

# Copy model and script
RUN mkdir /data
RUN mkdir /config
RUN mkdir /outputs
COPY config/parameters.yaml /config
COPY data/raw.RData /data
COPY get_predictions.R /
COPY outputs/best_model.RData /outputs

# Plumb & run server
EXPOSE 8080
ENTRYPOINT ["R", "-e", \
    "pr <- plumber::plumb('get_predictions.R'); pr$run(host='0.0.0.0', port=8080)"]

Así pues, con esto, vamos a realizar el build de nuestra imagen:

docker build -t spain_electricity .

Una vez hemos montado la imagen, podemos comprobar que funciona correctamente. Para ello, simplemente tenemos que exponer el puerto 8080 de Docker en un puerto de nuestro ordenador. Esto lo podemos hacer con el siguiente comando:

docker run -p 8080:8080 spain_electricity

Como podemos ver en la siguiente imagen nuestra imagen funciona perfectamente. Así pues, ahora veamos cómo podemos hacer que la imagen se despliegue automáticamente en Google Cloud.

3. Cómo desplegar nuestro modelo de ML automáticamente en Google Cloud

3.1 Habilitar las APIs necesarias

Lo primero de todo es debemos habilitar las APIs que vamos a necesitar en nuestro ordenador, que son: la API de Cloud Build (enlace), la API de Cloud Run (enlace) y la API del Container Registry (enlace). Para ello, simplemente debes ir a cada uno de los enlaces que indico y dar al botón “Habilitar” o “Enable”, tal como se muestra en la siguiente imagen:

Ahora que tenemos las APIs habilidatas, pasamos al siguiente paso: cómo conectar Github con Google Cloud.

3.2 Cómo conectar Google Cloud con Github para hacer MLOps con R

Para conectar nuestro Github con Google Cloud tenemos que ir a la sección de Triggers de Cloud Build. Para que te hagas una idea, Cloud Build es el servicio de Google Cloud que permite montar las imágenes de Docker. Pues bien, vamos a conectar esta herramienta con Github para que, directamente, se suban las imágenes de Github a nuestro Cloud Run.

Para ello, lo primero de todo es conectar Cloud Build con nuestro repositorio, lo cual podemos hacer haciendo clic en el botón “Connect Repository” de la parte inferior de la página, tal como se muestra en la siguiente imágen:

Simplemente tendremos que elegir nuestra herramienta de control de código (Github), loguearnos y después, elegir el repositorio sobre el cual queremos que se haga la conexión, que en mi caso es este repositorio.

Una vez hecho esto nos aparecerá la opción de crear trigger. De momento podemos ignorarlo, puesto que es algo que explicaré en el siguiente punto.

3.3 Cómo ejecutar el deploy automático de una imagen en Cloud Run al hacer push

Para ejecutar el deploy automático, lo primero de todo vamos a crear un trigger. Para ello, desde la página inicial de Cloud Build vamos a hacer clic en “Create Triger”. De esta forma, se nos abrirá un nuevo menú, en el que podemos elegir las siguientes cuestiones:

  1. Evento que desencadena el deploy. Podemos elegir entre un push, nuevo tag o un pull request. En nuestro caso, elegiremos push.
  2. Repositorio en el cual debe fijarse Cloud Build. Debe ser el mismo repositorio que hemos conectado en el paso anterior. Asimismo, debemos elegir el branch en el que queremos que se fije, así como si debe haber cambiado (o no), un modelo concreto para hacer el deploy. En nuestro caso, haremos el deploy solo cuando cambie le fichero best_model.RData.
  3. Configuración de Cloud Build. Se trata de las instrucciones que debe seguir Cloud Build. En este sentido contamos con tres opciones:
    1. Fichero de configuración cloudbuild.yaml: se trata de un fichero YAML donde indicamos paso a paso qué debe ejecutar Cloud.
    2. Dockerfile: simplemente hará el build de la imagen Docker que encuentre en el fichero.
    3. Buildpacks: es una forma de contenerización de código, utilizado en Python, Go, Java y .Net.

Así pues, en nuestro caso, en nuestro caso utilizaremos la primera opción, puesto que es la que nos permitirá, no solo hacer el build de la imagen, sino además también desplegarla.

Así pues, dentro de la carpeta config hemos incluido el fichero cloudbuild.yaml , que contiene el siguiente contenido:

steps:
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'gcr.io/spain-electricity-forecast/github.com/anderfernandez/MLOPs-en-R-con-Google-Cloud:$SHORT_SHA', '.']
- name: 'gcr.io/cloud-builders/docker'
  args: ['push', 'gcr.io/spain-electricity-forecast/github.com/anderfernandez/MLOPs-en-R-con-Google-Cloud:$SHORT_SHA']
- name: 'gcr.io/cloud-builders/gcloud'
  args: ['beta', 'run', 'deploy', 'spain-electricity-forecast', '--image=gcr.io/spain-electricity-forecast/github.com/anderfernandez/MLOPs-en-R-con-Google-Cloud:$SHORT_SHA', '--region=europe-west1', '--platform=managed']

Por último, para que la cuenta de servicio pueda acceder a Cloud Run a hacer el deploy, tenemos que darle permisos en Cloud Run. Para ello, simplemente tenemos que ir al apartado ajustes de Cloud Build (enlace) y permitir la API de Cloud y de Service Accounts, tal como se muestra a continuación:

Perfecto, con esto ya hemos completado el nivel 1 de MLOps en R: el deploy automático de nuestro modelo de R a Google Cloud Run cada vez que haya un cambio en el modelo entrenado.

Ahora, veamos cómo conseguir el siguiente nivel de MLOps en R.

MLOps en R nivel 2: Dashboard visualizar la capacidad predictiva del modelo

Crear una aplicación para visualizar la capacidad predictiva del modelo en R es muy sencillo gracias a Shiny. De hecho, no me voy a detener mucho en este punto, puesto que no se trata de un post sobre cómo crear un dashboard y ya he explicado cómo poner una app Shiny en producción en Cloud.

En cualquier caso, a la hora de crear dashboardss para visualizar la capacidad predictiva de un modelo de machine learning, es importante tener en cuenta lo siguiente:

  1. Visualización de la evolución de las métricas: cuando hablamos de error, no solo importa el valor absoluto, sino cómo ha sido este a lo largo del tiempo. Y es que, el error aumenta con el tiempo, por lo que este gráfico nos puede dar una idea de la velocidad de cambio de la realidad que modelamos.
  2. Comparación de la distribución de los datos de entrenamiento con los de producción. En ocasiones, el incremento del error viene porque una (o varias) variables del modelo empiezan a tener distribuciones significativamente diferentes a cuando se entrenó el modelo. Así pues, es relevante poder realizar este análisis.
  3. Tener en cuenta futuros desarrollos. Si nuestra organización únicamente se queda en el desarrollo de MLOPs en el nivel 2, es importante tener en cuenta que, en el futuro, pueda querer pasar al nivel 3. Así pues, es interesante pensar en otros requisitos futuros altamente probables, como un botón de reentrenamiento y despliegue.

Teniendo esto en cuenta, personalmente he desarrollado la siguiente aplicación sencilla:

libs = c('shiny', 'yaml', 'dplyr', 'ggplot2', 'lubridate', 'plotly', 'tidyr')
sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Get parameters
config = read_yaml('config.yaml')
data_url = config$data_url
predictions_url = config$predictions_url

ui <- fluidPage(
  includeCSS("www/style.css"),
  h1('Spain Electricity Forecast Dashbaord Control'),

  fluidRow(class = 'metrics',
    column(3, class = "metric_card",
           p("Mean Square Error", class = "metric"),
           textOutput('mse')),
    column(3,  class = "metric_card",
           p("Root Mean Square Error", class = "metric"),
           textOutput('rmse')),
    column(3, class = "metric_card",
           p("Mean Absolute Error", class = "metric"),
           textOutput('mae')),
    column(3, class = "metric_card",
           p("Mean Percentage Error", class = "metric"),
           textOutput('mape') )
  ),

  fluidRow(
    class = 'graphs',
    plotlyOutput('evolution'),

  ), 

  div(class = "retrain",
    p("Retrain or Refresh Data"),
    actionButton('refresh', 'Refresh Data'),
    actionButton('retrain', 'Retrain Model'),
  )
)

# Define server logic required to draw a histogram
server <- function(input, output) {


  data_merged = eventReactive(input$refresh, {

    # Read Data
    download.file(data_url,"data.RData", mode="wb")
    download.file(predictions_url,"predictions.rds", mode="wb")

    data = readRDS("data.RData")
    predictions = readRDS("predictions.rds")

    # Parse Date
    predictions$.index = as.Date(predictions$.index) 
    data$datetime = as.Date(data$datetime)

    # Join the data
    data_merged = left_join(data, predictions, by = c("datetime" = ".index"))
    data_merged = data_merged %>% filter(!is.na(value) & !is.na(.value))

    data_merged

  },
  ignoreNULL = FALSE
  )

  output$mae = renderText({

    data_merged = data_merged()

    d = data_merged$value - data_merged$.value 
    mean(abs(d))

  })

  output$mse = renderText({

    data_merged = data_merged()
    d = data_merged$value - data_merged$.value 
    mean((d)^2)

  })

  output$rmse = renderText({

    data_merged = data_merged()
    d = data_merged$value - data_merged$.value 
    sqrt(mean((d)^2))

  })

  output$mape = renderText({

    data_merged = data_merged()
    d =  data_merged$.value - data_merged$value 
    mape = mean(abs(d)/data_merged$value)
    paste0(round(mape*100,2),"%")

  })


    output$evolution <- renderPlotly({

      data_merged = data_merged()

      gg = data_merged %>% 
        filter(!is.na(.value)) %>%
        rename(prediction = .value) %>%
        select(datetime, value, prediction, .conf_lo, .conf_hi, prediction_date) %>%
        group_by(datetime) %>%
        slice_max(order_by = prediction_date , n = 1) %>%
        ungroup() %>%
        select(-prediction_date) %>%
        pivot_longer(cols = -c("datetime"), names_to = "type", values_to = "value") %>%
        ggplot(aes(datetime, value, col = type)) + geom_line(size = 1) +
        theme_minimal() +
        theme(legend.position = "bottom")

      ggplotly(gg)

    })
}

# Run the application 
shinyApp(ui = ui, server = server)

Con esto ya tendríamos el segundo nivel de MLOPs en R cubierto. Ahora, veamos cómo crear el tercer nivel de MLOps con R. ¡Vamos con ello!

MLOps en R nivel 3: reentrenamiento automático del modelo

Para reentrenar nuestro modelo de forma automática, vamos a necesitar varios elementos:

  1. Un pipeline para el entrenamiento y comparación de los modelos.
  2. Un pipeline para comprobar si el modelo se debe reentrenar o no.

Así pues, veamos cómo programar cada uno de estos paso a paso.

Cómo crear un script de entrenamiento de modelos en R

Para crear un pipeline de entrenamiento de modelos en R debemos crear un script de R que automáticamente extraiga los datos de entrenamiento y entrene muchos modelos. Además, es vital que nuestro pipeline registre todos los modelos que se entrenan, incluyendo el tipo de modelo, parámetros, métricas obtenidos y el modelo en sí mismo.

En mi caso para ello he utilizado Neptune.ai, puesto que es de las pocas herramientas que son fáciles de implementar, con nivel gratuito y con librería en R. Para ello, simplemente hay que hacer el setup de Neptune, tal como se muestra en el siguiente script:

# Load libraries
libs = c('yaml', 'reticulate','neptune')
sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Load variables
train_data_info =  read_yaml('config/parameters.yaml')[['train']]
neptune_envname = train_data_info$neptune$envname

# Install Miniconda
install_miniconda()
if(!neptune_envname %in% conda_list()$name){conda_create(envname = neptune_envname)}
conda_installations = conda_list()
conda_dir = conda_installations$python[conda_installations$name == neptune_envname] %>%
  gsub('\\', '/',., fixed =T)

use_condaenv(conda_dir)

# Install Neptune
neptune_install(
  method = 'conda',
  conda = conda_binary(),
  envname = neptune_envname
)

Asimismo, para entrenar los modelos de forecasting he utilizado las librerías timetk y modeltime, ambas basadas en la librería tidymodels. En el siguiente script puedes observar el script en el cual entreno los modelos, los registro en Neptune.ai y guardo el mejor modelo:

libs = c('forecast', 'yaml', 'dplyr', 'timetk', 'lubridate',
         'tidymodels', 'modeltime', 'reticulate','neptune')

sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Load Variables
train_data_info =  read_yaml('config/parameters.yaml')[['train']]
resampling = train_data_info$resampling
input = train_data_info$input
output_path = train_data_info$output_path
output_filename = train_data_info$output_filename
allow_parallel = train_data_info$allow_parallel
parallel_cores = train_data_info$parallel_cores
neptune_envname = train_data_info$neptune$envname
neptune_project = train_data_info$neptune$project
model_output = train_data_info$model_output
best_model_output = train_data_info$best_model_output

# Read Data
data = readRDS(input)
neptune_api_key = Sys.getenv('api_key')

# Fix timestamp
data$datetime = ymd_hms(data$datetime)
data$date = as.Date(data$datetime)
data$percentage = NULL
data$datetime = NULL

# Make splits
splits = data %>%
  time_series_split(assess = resampling, cumulative = TRUE)

# Create recipe for preprocessing
recipe_spec_final <- recipe(value ~ ., data = training(splits)) %>%
  step_timeseries_signature(date) %>%
  step_fourier(date, period = 365, K = 5) %>%
  step_rm(date) %>%
  step_rm(all_nominal_predictors()) %>%
  step_rm(date_index.num) %>%
  step_rm(contains("iso"), contains("minute"), contains("hour"),
          contains("am.pm"), contains("xts")) %>%
  step_zv(all_predictors())


bake(prep(recipe_spec_final), new_data =  training(splits)) %>% glimpse(.)

# Create models --> Accepted boost_tree 
if("boost_tree" %in% names(train_data_info$models)){

  model_parameters = train_data_info$models$boost_tree$parameters %>% 
    unlist(.) %>% strsplit(., ',')

  model_parameters = data.frame(model_parameters) %>% tibble()


  model_grid = model_parameters %>%
    create_model_grid(
      f_model_spec = boost_tree,
      engine_name  = train_data_info$models$boost_tree$engine,
      mode         = train_data_info$prediction_mode
  )

}

# Create the workflow set
workflow <- workflow_set(
  preproc = list(recipe_spec_final),
  models = model_grid$.models, 
  cross = T
  )


control_fit_workflowset(
  verbose   = TRUE,
  allow_par = TRUE
)

if(allow_parallel){
  parallel_start(parallel_cores)
}

# Train in parallel
model_parallel_tbl <- workflow %>%
  modeltime_fit_workflowset(
    data    = training(splits),
    control = control_fit_workflowset(
      verbose   = TRUE,
      allow_par = allow_parallel
    )
  )

if(allow_parallel){
  parallel_stop()
}

calibrated_table = model_parallel_tbl %>%
  modeltime_calibrate(testing(splits)) 

accuracy_table = calibrated_table %>% modeltime_accuracy()

# Extract parameters for each model
for(i in 1:nrow(accuracy_table)){

  run <- neptune_init(
    project= neptune_project,
    api_token= neptune_api_key,
    python = 'conda',
    python_path = conda_dir
  )

  # Extract & log parameters
  model_specs = extract_spec_parsnip(workflow$info[[i]]$workflow[[1]])
  parameters = list()
  parameters[['engine']] = model_specs$engine
  parameters[['mode']] = model_specs$mode

  for(arg in names(model_specs$args)){
    parameters[[arg]] = as.character(model_specs$args[[arg]])[2]  
  }

  run["parameters"] = parameters

  # Add each metric in the accuracy table
  for(col in 4:ncol(accuracy_table) ){
    print(col)
    metric = colnames(accuracy_table)[col]
    run[paste0("evaluation/",metric)] = accuracy_table[[i, col]]
  }

  saveRDS(workflow$info[[i]]$workflow, model_output)
  neptune_upload(run["model"], model_output)

}

# Get best model
best_model = accuracy_table$.model_id[accuracy_table$mae == min(accuracy_table$mae)] 

# Save best model
saveRDS(calibrated_table[best_model,], best_model_output)

Perfecto, con esto ya tenemos nuestro pipeline para entrenar nuestro modelo dentro de nuestro proceso de MLOPs en R. Sin embargo, ¿cómo convertimos estos ficheros en un pipeline? ¡Veámoslo!

Cómo convertir el script de entrenamiento en un pipeline

Para convertir nuestro script en un pipeline, vamos a crear un Github Action que, cuando se lance, se ejecute el script. Además, este Github Action debe ser ejecutable desde varios lugares, porque deberá poder ejecutarlo cuando se compruebe tanto en la comprobación automática del modelo como de forma manual desde el dashboard.

Así pues, a la hora de convertir nuestro script de entrenamiento en un pipeline, usaremos como activador una petición HTTP. De esta forma, podremos lanzar este proceso de forma sencilla desde cualquier lugar .

Así pues, teniendo esto en cuenta, el fichero retrain_model.yaml que define nuestra automatización del reentrenamiento queda de la siguiente manera:

name: retrain_model

on:
  repository_dispatch:
    types: retrain_model

jobs:
  build:
    runs-on: ubuntu-18.04 
    container: 
     image: asachet/rocker-tidymodels

    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Install R
        uses: r-lib/actions/setup-r@v2 # Instalo R
        with: 
          r-version: '4.1.1'

      - name: Setup Neptune
        run: Rscript src/neptune_setup.R

      - name: Retrain Model
        run: Rscript src/model_train.R

      - name: Push data to Github
        run: bash src/github.sh

Con esto ya tendríamos un sistema MLOps que podemos reentrenar cuando nosotros queramos simplemente presionando un botón. Sin embargo, la idea es que el modelo se entrene de manera recurrente. Veamos cómo automatizar el reentreamiento de nuestro modelo en R.

Cómo comprobar el reentrenamiento del modelo

Para automatizar la comprobación del reentrenamiento del modelo, vamos a crear otra automatización con Github Actions que se ejecute de forma diaria y que, en caso de que el error sea superior al límite que hayamos fijado, lance el workflow que hemos definido anteriormente mediante una petición HTTP.

Si no sabemos cómo lanzar un workflow de Github Actions mediante una petición de HTTP, te recomiendo que leas este post donde explico cómo hacerlo.

Dicha ejecución lo conseguimos con el siguiente script:

# Install & load libraries  
libs = c( 'yaml', 'dplyr', 'lubridate', 'httr', 'glue', 'jsonlite')
sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Load Variables
GITHUB_TOKEN = Sys.getenv("TOKEN")

retrain_info =  read_yaml('config/parameters.yaml')[['retrain']]
predictions_path = retrain_info$predictions_path
data_path = retrain_info$data_path
max_mae = retrain_info$max_mae
n_days = retrain_info$n_days
github_user = retrain_info$github_user
github_repo = retrain_info$github_repo
github_event_type = retrain_info$github_event_type

# Read Data
data = readRDS(data_path)
predictions = readRDS(predictions_path)


# Unify data
data$datetime = ymd_hms(data$datetime) %>% as.Date(.)
predictions$.index = as.Date(predictions$.index)

# For each date, get latest prediction % join the real data
comparison = predictions %>%
  group_by(.index, prediction_date) %>%
  arrange(desc(prediction_date)) %>%
  slice(1) %>%
  ungroup() %>%
  inner_join(data, by = c('.index' = 'datetime')) %>%
  slice_max(n = n_days, order_by = .index)

mae = mean(abs(comparison$value - comparison$.value))

if(mae > max_mae){

  url = glue('https://api.github.com/repos/{github_user}/{github_repo}/dispatches')

  body  = list("event_type" = github_event_type) 
  resp = POST(url, 
       add_headers('Authorization' = paste0('token ', GITHUB_TOKEN)),
       body = jsonlite::toJSON(body, pretty = T, auto_unbox = T)
       )
}

Sin embargo, tener este script no es suficiente: necesitamos poder automaitzarlo. Para ello, vamos a crear un Github Actions que lo ejecute de forma periodica.

Es importante que la comprobación del reentrenamiento se ejecute siempre después de la extracción de datos.

Así pues, vamos a crear otro Github Actions para que ejecute nuestro script de comprobación de reentrenamiento cada día. Esto lo conseguimos con el siguiente fichero yaml:

name: check_retrain

on:
  schedule:
    - cron: '0 9 * * *'  
  workflow_dispatch: 

jobs:
  build:
    runs-on: ubuntu-18.04
    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Update
        run: sudo apt-get update

      - name: Install necesary libraries
        run:  sudo apt-get install -y curl libssl-dev libcurl4-openssl-dev libxml2-dev

      - name: Install R
        uses: r-lib/actions/setup-r@v2 # Instalo R
        with: 
          r-version: '4.1.1'

      - name: Set environment variables
        run: |
          echo TOKEN="$secrets.TOKEN" >> ~/.Renviron

      - name: Run Retrain Check
        run: Rscript src/check_retrain.R

¡Perfecto! Ya tenemos nuestro sistema de MLOPs en R casi terminado. Y es que únicamente nos queda una cuestión: realizar peticiones de forma periódica a nuestra API e ir guardándolas, de tal forma que nuestro dashboard se pueda nutrir de ellas. ¡Veamos cómo hacerlo!

Cómo automatizar la obtención de predicciones

Habiendo visto todo lo anterior, supongo que te podrás imaginar cómo vamos a realizar la automatización de la obtención de predicciones: Github Actions. De hecho, en nuestro caso es bastante sencillo, ya que simplemente voy a realizar predicciones cada día e ir guardando todos los resultados.

Así pues, simplemente voy a tener que crear dos ficheros:

  1. Un script que realice peticiones a la API y haga un append en el fichero de predicciones actuales.
  2. Un fichero de configuración yaml para que ejecute dicho proceso cada día.

Respecto al primer paso, debemos tener en cuenta tanto la URL en la que se ha desplegado nuestro servicio como el endpoint en el que hemos ubicado nuestra función. Así pues, el script es el siguiente:

# Load libraries
libs = c('yaml', 'httr', 'jsonlite')
sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Load variables
predict_info =  read_yaml('config/parameters.yaml')[['predict']]
horizon = predict_info[['horizon']]
service_url = predict_info[['service_url']]
service_endpoint = predict_info[['service_endpoint']]
predictions_path = predict_info[['predictions_path']]

# Construct the API call
url = paste0(service_url,'/',service_endpoint)

resp = POST(url)
predictions = fromJSON(content(resp, type = "text")) 

# Read the predictions
past_predictions = readRDS(predictions_path)

# Merge files
predictions$.index = as.Date(predictions$.index)
predictions$prediction_date = as.Date(predictions$prediction_date)
all_predictions = bind_rows(past_predictions, predictions)

# Save predictions
saveRDS(all_predictions, predictions_path)

Por último, para automatizar este script hemos utilizado el siguiente fichero yaml:

name: make_predictions

on:
  schedule:
    - cron: '0 10 * * *' 
  workflow_dispatch: 

jobs:
  build:
    runs-on: ubuntu-18.04
    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Update
        run: sudo apt-get update

      - name: Install necesary libraries
        run:  sudo apt-get install -y curl libssl-dev libcurl4-openssl-dev libxml2-dev

      - name: Install R
        uses: r-lib/actions/setup-r@v2
        with: 
          r-version: '4.1.1'

      - name: Run Retrain Check
        run: Rscript src/get_predictions.R

      - name: Push data to Github
        run: bash src/github.sh

Con esto ya tendríamos nuestro modelo de MLOPs en R terminado.

Conclusión MLOps en R

La primera y principal conclusión es que sí, se puede hacer MLOps en R. Sin embargo, aunque hacer MLOps en R es posible, hay que ser consciente de que sí o sí vamos a tener que usar contenedores Docker y usar herramientas como Github Actions o Circle CI.

Personalmente, te animo a que realices este mismo proceso con una API que ofrezca datos en tiempo real, ya que es un proceso complejo en el que, sin duda, se aprende mucho.

En cualquier caso, espero que este post sobre MLOps en R te haya gustado. ¡Nos vemos en el siguiente post!

Blog patrocinado por: