How to implement MLOps in R step by step

There’s been a lot of talk lately about MLOps. However, few examples exist, and most of the ones that do exist are in Python. So, in this post you are going to learn what MLOps is and how to implement it with R. Sound interesting? Let’s get to it!

What is MLOps

MLOps is a methodology to make the production of Machine Learning models more efficient.

I know, put like that it sounds very broad, and it really is. Furthermore, there is no single standard for MLOps; rather, there are several levels of implementation, depending on how and what kind of ML models your organization uses.

In my opinion, the four levels of adoption of MLOps are as follows:

MLOps Level 1: Continuous Deployment of ML Models

This level is ideal when you start to develop ML models. This stage is usually very iterative: you need to put the model into production several times until you get a model that works well.

Thus, this level of MLOps consists of integrating our code repository (such as GitHub), with the deployment environment. The idea is simple: when we upload a new version of our model to our code repository, the model will be automatically deployed to production.

To do this, we must use the following types of tools:

  • A code repository such as GitHub, GitLab, Bitbucket or one hosted in the cloud.
  • A CI/CD tool such as GitHub Actions or CircleCI.
  • A deployment target. This can vary widely: a virtual machine, a Kubernetes cluster, or cloud services such as AWS Lambda or Cloud Run.

If you want to learn more about Github Actions, I recommend you read this post.

However, once you have the model in production, this level falls short. The thing is, in real life it is not enough to put a model into production: you also have to monitor it to know when to retrain it.

So when we’ve managed to get a well-functioning model into production, we need to move on to the next level of MLOps.

MLOps level 2: Visualizing the predictive capacity of the model

Once we have a model in production, the next step is to have a dashboard where we can see how the model is behaving: comparison of predictions vs. reality, distribution of training variables vs. those received by the model, etc.

The dashboard should include the information we need to decide when to retrain the model. In this way, we will simply have to consult the dashboard, without having to do anything else.
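
For example, here is a minimal sketch of the kind of daily summary such a dashboard could be fed with. It assumes a hypothetical comparison data frame with date, actual and prediction columns (in our project, these would come from joining the saved predictions with the real data, as we will see later), and a max_mae threshold like the one we will later define in parameters.yaml.

# Sketch: daily error metrics and a simple retraining rule
library(dplyr)

daily_metrics = comparison %>%
  group_by(date) %>%
  summarise(
    mae  = mean(abs(actual - prediction)),
    mape = mean(abs(actual - prediction) / actual)
  ) %>%
  arrange(date)

# Average error over the last week vs. the allowed maximum
recent_mae = daily_metrics %>% slice_tail(n = 7) %>% pull(mae) %>% mean()
needs_retrain = recent_mae > max_mae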

For this, many tools that integrate easily with R can be used: a traditional database, a data warehouse such as BigQuery or Snowflake or, if the project is small, the information can even be saved in .RData files.

Also, since we are doing MLOps with R, we can develop the application with Shiny or Dash, for example.

If you want to learn how to put Shiny apps into production on Google Cloud, I recommend you read this post.

The limitation of the second level of MLOps appears when you have several models in production (or a few, but in a rapidly changing environment). After all, you can’t have a person continually checking whether or not each model should be retrained, and manual retraining is also expensive (remember that, so far, retraining is a manual task).

Thus, when you already have several models in production, the ideal is to move to the next level of MLOps.

MLOps level 3: automatic retraining through pipelines

At this level, the change is clear: the model is no longer manually retrained, but many models are automatically retrained and the best one is chosen. All of this retraining happens in the cloud.

Furthermore, thanks to automating retraining, we can create a system in such a way that, if the predictive capacity falls below a certain threshold, the model is retrained (and deployed) automatically.

To do this, we must create pipelines. There are many ways to do it: we could use Apache Airflow, Kubeflow or even plain scripts. In our case, since we are dealing with MLOps in R, we will use this last option: we will create a script for each step of data processing.

As we’re working with little data, this will run comfortably on whatever machine our CI tool provides. However, if the dataset is very large (big data), you can always use Apache Spark (via sparklyr) or other tools. In short, although we use scripts, the process is scalable.
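
For reference, this is roughly what that processing could look like with sparklyr if the data outgrew a single machine. It is only a minimal sketch: it assumes Spark is installed locally and that the raw data lives in a hypothetical demand.csv file with datetime and value columns.

# Minimal sparklyr sketch for larger-than-memory data
library(sparklyr)
library(dplyr)

sc = spark_connect(master = "local")

demand_tbl = spark_read_csv(sc, name = "demand", path = "demand.csv")

# The same dplyr verbs used in the scripts below also work on Spark tables
demand_tbl %>%
  mutate(date = to_date(datetime)) %>%
  group_by(date) %>%
  summarise(daily_demand = sum(value, na.rm = TRUE)) %>%
  collect()

spark_disconnect(sc)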

In our case of MLOps in R, we will build an example at this level: a time series model with automatic retraining and deployment, plus a dashboard to visualize the evolution of its predictive capacity.

However, although this may seem like the last level, there is actually one more level, designed to meet the needs of large companies, such as Google, Facebook, etc.

MLOps level 4: CI/CD experimentation

If you think about it, MLOps level 3 has a limitation: experimenting becomes complicated. The problem is that, if we want to test a different type of preprocessing, for example, we have to change and redeploy the entire pipeline, which is far from optimal.

To solve this problem, each step of the pipeline is packaged, so that experimenting becomes much easier. At the end of the day, it is as if we had broken the data pipeline into small pieces that we can swap in and out as we please.
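
To make the idea more concrete, here is a minimal sketch (not part of this project) of what “packaging” a step can mean in R: each step becomes a function with a stable contract, ideally living in an internal package, so an experiment only swaps one piece instead of redeploying the whole pipeline.

# Each pipeline step is a function with a fixed contract: data frame in, data frame out
preprocess_baseline = function(data) {
  data$date = as.Date(data$datetime)
  data[, c("date", "value")]
}

preprocess_with_lags = function(data, lags = c(1, 7)) {
  out = preprocess_baseline(data)
  for (l in lags) out[[paste0("lag_", l)]] = dplyr::lag(out$value, l)
  out
}

# Experimenting = swapping one piece of the pipeline, not rebuilding all of it
run_pipeline = function(data, preprocess_fn = preprocess_baseline) {
  processed = preprocess_fn(data)
  # ... training and evaluation steps would follow here
  processed
}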

Okay, now that it’s clear what MLOps is and how it works, let’s see what our MLOps in R case is all about.

MLOps in R: predicting electricity demand in Spain

Project Structure

As the title indicates, this case consists of predicting the electricity demand of Spain. To do this, I use data from Red Eléctrica de España (REE), which exposes a lot of data about the Spanish electricity market through a free API (link).
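
To give you an idea of what the API looks like, a single request can be made as follows. This is a minimal sketch based on the same URL format used later in the extraction script; the dates are illustrative and the exact fields returned may vary by endpoint.

# A single request to the REE API (demanda/evolucion endpoint)
library(httr)
library(jsonlite)

url = paste0(
  'https://apidatos.ree.es/en/datos/demanda/evolucion',
  '?start_date=2023-01-01T00:00',
  '&end_date=2023-01-31T00:00',
  '&time_trunc=day'
)

resp = GET(url)
parsed = fromJSON(content(resp, type = 'text', encoding = 'UTF-8'))

# The daily demand values live inside the "included" part of the response
head(parsed$included$attributes$values[[1]])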

The approach of the case is as follows:

  • Data extraction script: an executable script that extracts the REE data and saves it, together with the data history, in a .RData file. This script will be automated using Github Actions and a cron trigger, so we get the data every day.
  • Model training script: a script that reads the data history and trains various xgboost-based time series models by doing a small grid search. In addition, I save each model with its parameters and metrics in Neptune.ai, so that I can keep track of the models.
  • Prediction script: a script that, given a previously trained model and the data history, makes predictions for the next few days. It also appends all the predictions to a file, so that the performance of the model can be evaluated later. This script will be run automatically each day, after the data extraction script has been run.
  • Retrain check script: a script that loads the prediction data and the actual data and checks the error of the predictions. If the error exceeds a defined threshold, the model training script is launched.

Now that we know the project approach, let’s go step by step.

Data Extraction

As I mentioned earlier, the data extraction script is an executable script that makes requests to a REE API endpoint, extracts the information and saves it, either by creating the file or by appending the new data to the existing history.

Likewise, in order to facilitate the reproducibility of this case or apply it to another REE API, all variables have been defined in the parameters.yaml file, which contains the following information regarding the data extraction:

data_extraction:
  time_trunc : day
  category : demanda
  subcategory : evolucion
  first_date : 2014-01-01
  max_time_call : 365
  output_path : data
  output_filename_data : raw.RData
  output_filename_date : last_date.RData

So, the following script uses said information to perform the data extraction. Broadly speaking, the process followed by the script is as follows:

  1. Installation and loading of libraries.
  2. Loading parameters from the parameters.yaml file.
  3. Obtaining the periods over which the script must perform the extractions, taking into account the last extraction date, the current date, and the number of days the API allows per request. If there is no record of a last extraction date, the default first date defined in parameters.yaml is used.
  4. Performing the different API requests and converting the responses into a data frame.
  5. Appending the newly extracted data to the historical data, unless it is the first extraction.
  6. Saving the historical data and the extraction date.
libs = c('httr', 'jsonlite', 'lubridate', 'dplyr', 'yaml', 'glue')
sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Read parameters file
parameters = read_yaml('config/parameters.yaml')[['data_extraction']]

time_trunc = parameters$time_trunc
category = parameters$category
subcategory = parameters$subcategory
first_date = parameters$first_date
output_path = parameters$output_path
output_filename_data = parameters$output_filename_data
output_filename_date = parameters$output_filename_date
max_time_call = parameters$max_time_call


# Create full paths
output_data = glue("{output_path}/{output_filename_data}")
output_date = glue("{output_path}/{output_filename_date}")

# Read last date
if(output_filename_date %in% list.files(output_path)){
  first_date =readRDS(output_date)
}


get_real_demand = function(url){
  resp = GET(url)
  tmp = content(resp, type = 'text', encoding = 'UTF-8') %>% fromJSON(.)
  return(tmp$included$attributes$values[[1]])
}

# Compute the extraction dates by subtracting from today
if(time_trunc == 'hour'){
  now = floor_date(Sys.time(), unit = time_trunc) 
  hours_extract = difftime(now, ymd_hms(first_date), units = 'days') %>% as.numeric(.)
  hours_extract = hours_extract * 24  
} else{
  now = floor_date(Sys.Date()-1, unit = time_trunc)
  hours_extract = difftime(now, ymd(first_date), units = 'days') %>% as.numeric(.)
}


if(hours_extract > max_time_call){
  n_extractions = ceiling(hours_extract/max_time_call)
  diff_times = as.difftime( max_time_call * 1:n_extractions, units = paste0(time_trunc,'s'))
  date_extractions = now - diff_times

} else{
  date_extractions = c(first_date)

}

# I add today as last day
date_extractions = c(now, date_extractions)


# Change Format
date_extractions = format(date_extractions, '%Y-%m-%dT%H:%M')

if(time_trunc == 'hour'){
  date_extractions[length(date_extractions)] = ymd_hms(first_date) %>% format(., '%Y-%m-%dT%H:%M')  
} else{
  date_extractions[length(date_extractions)] = ymd(first_date) %>% format(., '%Y-%m-%dT%H:%M')
}


# I create the URLs
urls = c()

for(i in 1:(length(date_extractions)-1)){
  url = paste0(
    'https://apidatos.ree.es/en/datos/',
    category,'/', subcategory,
    '?start_date=', date_extractions[i+1],
    '&end_date=', date_extractions[i],
    '&time_trunc=', time_trunc
  )
  urls = c(urls, url)
}

# Request each URL sequentially (a short pause avoids hammering the API)
resps = list()
for(i in 1:length(urls)){
  resps[[i]] = get_real_demand(urls[i])
  print(i)
  Sys.sleep(1)
}


# From list to df
data = bind_rows(resps)

# Check if data exists
if(output_filename_data %in% list.files(output_path)){

  # Read previous data & merge
  prev_data = readRDS(output_data)
  data = bind_rows(data, prev_data)

}

# Save the file
saveRDS(data, output_data)
saveRDS(now, output_date)

With this we already have a simple data extraction pipeline. Now we need to automate this pipeline so that it runs on a regular basis, say every day. Let’s see how to do it.

Automate the data extraction pipeline

In this blog I have shown several ways to automate R scripts, from how to automate an R script locally to how to automate an R script in the cloud. While either method would work, a third way to do this is to use a CI/CD tool, such as Github Actions or CircleCI.

Apache Airflow is also used a lot in these cases. However, Airflow is built on Python and its pipelines are defined in Python, so it is not a natural fit for our R-only setup.

So, in order to automate the data pipeline of this MLOps process with R I will use Github Actions, since it is free and also, if you want to learn how to use it, I have a post about it (link).

Fortunately, Github Actions offers several ready-made actions for R users (link), which will make your job much easier, as you won’t need to use Docker in every case. Thus, the yaml file that automates our data extraction pipeline is as follows:

name: update_data

on:
  schedule:
    - cron: '0 7 * * *' # Execute every day at 7
  workflow_dispatch: 

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Install necessary libraries
        run: sudo apt-get install -y curl libssl-dev libcurl4-openssl-dev libxml2-dev

      - name: Install R
        uses: r-lib/actions/setup-r@v2 # Install R
        with: 
          r-version: '4.1.1'

      - name: Run Data Extraction
        run: Rscript src/data_extraction.R

      - name: Push data to Github
        run: bash src/github.sh

Perfect, with this we already have our data extraction process working. Now that we have the data, let’s see how to tackle the first level of MLOps with R: automatic model deployment. Let’s get to it!

MLOps in R level 1: how to automatically deploy an R model to production

When building machine learning models in R, the most common way to put models into production is by using Docker. In fact, it is the method that I use in the post where I explain how to put Machine Learning models in R into production.

If you are new to Docker, I recommend you read this post where I explain step by step what Docker is and how it works.

That said, how to deploy a model to the cloud automatically will depend on two things:

  1. The cloud we are using.
  2. The CI/CD tool we are using.

In my opinion, the easiest way to do this deployment is using Google Cloud together with Github or Bitbucket. In this case, Google provides an integration (a Cloud Build trigger) so that we can execute a task every time we push to a branch of our choice.

Anyway, here’s how to automatically deploy to both AWS and Azure:

  • Azure:
    • Deploy to Kubernetes with Github (link).
    • Deployment to Kubernetes with CircleCI (link).
  • AWS:
    • Deploy to EKS with Github (link).
    • Deploying in EKS with CircleCI (link).

That said, to deploy our model to production there are two different approaches:

  1. Wrap the model in an API. The first case consists of creating an API that, every time it is called, returns a prediction. This is typically the case for models with real-time or near-real-time response.
  2. Create an automation that collects the necessary data, loads the model, and makes the predictions in batch. Within this type of model we find, for example, churn-probability prediction or customer scoring.

Although our example is more suited to the second case, we will take the approach of the first case. So, to put our R model into production with MLOps, the first thing to do is create an API that wraps our model. Let’s see how to do it.

1. Create an API that returns predictions

To create an API in R we are going to use the Plumber library. This library allows you to convert a function into an API in a very simple way. We simply have to indicate the parameters of our function (which will, in turn, be the parameters of the API) and the type of request it accepts. The value returned by our function will be precisely the value returned by our API.
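
If you have never used Plumber, this is the general pattern: a minimal, self-contained sketch (unrelated to our model), where the roxygen-style comments define the endpoint.

# minimal_api.R - a tiny Plumber example
#* Return the sum of two numbers
#* @param a First number
#* @param b Second number
#* @post /sum
function(a, b) {
  as.numeric(a) + as.numeric(b)
}

# To serve it from the console:
# pr <- plumber::plumb('minimal_api.R'); pr$run(port = 8000)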

So, as it is a forecasting model, the only input to the model is the forecast horizon (how far ahead to predict), so our API has the following form:

# Install & load libraries  
libs = c('plumber', 'forecast', 'yaml', 'dplyr', 'timetk', 'lubridate',
         'tidymodels', 'modeltime', 'reticulate','neptune')

sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Load Variables
predict_data_info =  read_yaml('config/parameters.yaml')[['predict']]
model_path = predict_data_info$model_path
data_path = predict_data_info$data_path
predictions_path = predict_data_info$predictions_path

# Load model
model = readRDS(model_path)
data = readRDS(data_path)

# Clean data
data$datetime = ymd_hms(data$datetime)
data$date = as.Date(data$datetime)
data$percentage = NULL
data$datetime = NULL


#* @apiTitle Predictions API
#* @apiDescription API to get predictions of Spanish Electricity Demand

#* Get the predictions
#* @param horizon The forecast horizon (e.g. "2 months")
#* @post /get_predictions
function(horizon = "2 months") {

  predictions = model %>%
    modeltime_forecast(
      h = horizon,
      actual_data = data
      ) %>%
    filter(.model_desc != 'ACTUAL') %>%
    select(.index, .value, .conf_lo, .conf_hi)

  predictions$prediction_date = Sys.Date()

  return(predictions)
}

Now that we have the API, we move on to the next step: creating a Docker image from our API.

2. Creating a Docker image of our API

To create a Docker image we will need the following:

  1. The API file itself.
  2. The saved model.
  3. The data that has been used to train the model.

At this point, it is worth noting that my project directory looks like this:

├───.github
│   └───workflows
│       └───workflows
├───config
│   └───parameters.yaml
├───data
│   ├───last_date.RData
│   └───raw.RData
│
├───get_predictions.R
│
├───outputs
│   ├───best_model.RData
│   ├───model.RData
│   └───predictions.RData
└───src
    ├───check_retrain.R
    ├───data_extraction.R
    ├───github.sh
    ├───model_train.R
    ├───neptune_setup.R
    └───predict.RData

So, we will create a Dockerfile that copies all of this project content into our Docker image and then exposes our API.

FROM asachet/rocker-tidymodels

# Install required libraries
RUN R -e 'install.packages(c("plumber", "forecast", "yaml", "dplyr", "timetk", "lubridate", "tidymodels", "modeltime"))'

# Copy model and script
RUN mkdir /data
RUN mkdir /config
RUN mkdir /outputs
COPY config/parameters.yaml /config
COPY data/raw.RData /data
COPY get_predictions.R /
COPY outputs/best_model.RData /outputs

# Plumb & run server
EXPOSE 8080
ENTRYPOINT ["R", "-e", \
    "pr <- plumber::plumb('get_predictions.R'); pr$run(host='0.0.0.0', port=8080)"]

So, with this, we are going to build our image:

docker build -t spain_electricity .

Once we have built the image, we can verify that it works correctly. To do this, we simply have to map the container’s port 8080 to a port on our computer. We can do this with the following command:

docker run -p 8080:8080 spain_electricity
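
Once the container is running, we can also test the endpoint from an R session. This is a minimal sketch, assuming the container is listening on localhost:8080 and the endpoint is /get_predictions as defined above.

# Quick local test of the containerized API
library(httr)
library(jsonlite)

resp = POST('http://localhost:8080/get_predictions',
            body = list(horizon = '1 month'), encode = 'json')

predictions = fromJSON(content(resp, type = 'text'))
head(predictions)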

As we can see in the following image our image works perfectly. So now let’s see how we can make the image automatically deploy to Google Cloud.

3. How to automatically deploy our ML model to Google Cloud

3.1 Enabling the necessary APIs

First of all, we must enable the APIs that we are going to need in our Google Cloud project: the Cloud Build API (link), the Cloud Run API (link) and the Container Registry API (link). To do this, you simply have to go to each of the links above and click the “Enable” button, as shown in the following image:

Now that we have the APIs enabled, we move on to the next step: how to connect Github with Google Cloud.

3.2 How to connect Google Cloud with Github to do MLOps with R

To connect our Github repository with Google Cloud we have to go to the Cloud Build Triggers section. To give you an idea, Cloud Build is the Google Cloud service that builds Docker images. We are going to connect this tool with Github so that the images built from our repository are deployed directly to Cloud Run.

To do this, the first thing is to connect Cloud Build with our repository, which we can do by clicking the “Connect Repository” button at the bottom of the page, as shown in the following image:

We will simply have to choose our version control tool (Github), log in and then choose the repository we want to connect, which in my case is this repository.

Once this is done, the option to create a trigger will appear. For now we can ignore it, since I will explain it in the next point.

3.3 How to auto-deploy an image in Cloud Run on push

To set up the automatic deploy, first of all we are going to create a trigger. To do this, from the Cloud Build home page we click on “Create Trigger”. A new menu will open, in which we configure the following options:

  1. Event that triggers the deploy. We can choose between a push, new tag or a pull request. In our case, we’ll choose push.
  2. Repository that Cloud Build should watch. It must be the same repository that we connected in the previous step. Likewise, we must choose the branch to watch, as well as whether a specific file must have changed for the deploy to run. In our case, we will deploy only when the best_model.RData file changes.
  3. Cloud Build configuration. These are the instructions that Cloud Build must follow. Here we have three options:
    1. cloudbuild.yaml configuration file: a YAML file where we indicate step by step what Cloud Build should execute.
    2. Dockerfile: Cloud Build will simply build the Docker image defined in the file.
    3. Buildpacks: a form of code containerization used in Python, Go, Java and .NET.

So, in our case we will use the first option, since it allows us not only to build the image, but also to deploy it.

So, inside the config folder we have included the cloudbuild.yaml file, which contains the following content:

steps:
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'gcr.io/spain-electricity-forecast/github.com/anderfernandez/MLOPs-en-R-con-Google-Cloud:$SHORT_SHA', '.']
- name: 'gcr.io/cloud-builders/docker'
  args: ['push', 'gcr.io/spain-electricity-forecast/github.com/anderfernandez/MLOPs-en-R-con-Google-Cloud:$SHORT_SHA']
- name: 'gcr.io/cloud-builders/gcloud'
  args: ['beta', 'run', 'deploy', 'spain-electricity-forecast', '--image=gcr.io/spain-electricity-forecast/github.com/anderfernandez/MLOPs-en-R-con-Google-Cloud:$SHORT_SHA', '--region=europe-west1', '--platform=managed']

Finally, so that Cloud Build can deploy to Cloud Run, we have to give its service account the necessary permissions. To do this, we simply have to go to the Cloud Build settings section (link) and enable the Cloud Run and Service Accounts permissions, as shown below:

Perfect, with this we have already completed level 1 of MLOps in R: the automatic deployment of our R model to Google Cloud Run every time there is a change in the trained model.

Now, let’s see how to get the next level of MLOps in R.

MLOps in R level 2: a dashboard to visualize the predictive capacity of the model

Creating an application to visualize the predictive capacity of the model in R is very simple thanks to Shiny. In fact, I am not going to spend much time on this point, since it is not a post about how to create a dashboard and I have already explained how to put a Shiny app into production in the Cloud.

In any case, when creating dashboards to visualize the predictive capacity of a machine learning model, it is important to take into account the following:

  1. Visualization of the evolution of the metrics: when we talk about error, not only the absolute value matters, but also how it has evolved over time. The error tends to grow with time, so this graph gives us an idea of how fast the reality we are modeling is changing.
  2. Comparison of the distribution of the training data with the data received in production. Sometimes the increase in error comes from one (or several) variables starting to show distributions that are significantly different from those seen during training, so it is relevant to be able to carry out this analysis (see the sketch after this list).
  3. Taking future developments into account. If our organization stops at level 2 of MLOps for now, it may still want to move to level 3 later. So it is worth thinking about other highly likely future requirements, such as a button to trigger retraining and deployment.
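
Regarding the second point, here is a minimal sketch of how the training vs. production comparison could be done. It assumes two hypothetical data frames, train and production, each with a value column; none of this is part of the project code, it only illustrates the idea.

# Compare the distribution of a variable in training vs. production
libs = c('dplyr', 'ggplot2')
sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Kolmogorov-Smirnov test: a small p-value suggests the distributions differ
ks.test(train$value, production$value)

# Overlaid densities, which could be added as another plot in the dashboard
bind_rows(
  data.frame(value = train$value, set = "training"),
  data.frame(value = production$value, set = "production")
) %>%
  ggplot(aes(value, fill = set)) +
  geom_density(alpha = 0.4) +
  theme_minimal()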

With this in mind, I have personally developed the following simple application:

libs = c('shiny', 'yaml', 'dplyr', 'ggplot2', 'lubridate', 'plotly', 'tidyr')
sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Get parameters
config = read_yaml('config.yaml')
data_url = config$data_url
predictions_url = config$predictions_url

ui <- fluidPage(
  includeCSS("www/style.css"),
  h1('Spain Electricity Forecast Dashboard Control'),

  fluidRow(class = 'metrics',
    column(3, class = "metric_card",
           p("Mean Square Error", class = "metric"),
           textOutput('mse')),
    column(3,  class = "metric_card",
           p("Root Mean Square Error", class = "metric"),
           textOutput('rmse')),
    column(3, class = "metric_card",
           p("Mean Absolute Error", class = "metric"),
           textOutput('mae')),
    column(3, class = "metric_card",
           p("Mean Percentage Error", class = "metric"),
           textOutput('mape') )
  ),

  fluidRow(
    class = 'graphs',
    plotlyOutput('evolution')
  ),

  div(class = "retrain",
    p("Retrain or Refresh Data"),
    actionButton('refresh', 'Refresh Data'),
    actionButton('retrain', 'Retrain Model')
  )
)

# Define server logic
server <- function(input, output) {


  data_merged = eventReactive(input$refresh, {

    # Read Data
    download.file(data_url,"data.RData", mode="wb")
    download.file(predictions_url,"predictions.rds", mode="wb")

    data = readRDS("data.RData")
    predictions = readRDS("predictions.rds")

    # Parse Date
    predictions$.index = as.Date(predictions$.index) 
    data$datetime = as.Date(data$datetime)

    # Join the data
    data_merged = left_join(data, predictions, by = c("datetime" = ".index"))
    data_merged = data_merged %>% filter(!is.na(value) & !is.na(.value))

    data_merged

  },
  ignoreNULL = FALSE
  )

  output$mae = renderText({

    data_merged = data_merged()

    d = data_merged$value - data_merged$.value 
    mean(abs(d))

  })

  output$mse = renderText({

    data_merged = data_merged()
    d = data_merged$value - data_merged$.value 
    mean((d)^2)

  })

  output$rmse = renderText({

    data_merged = data_merged()
    d = data_merged$value - data_merged$.value 
    sqrt(mean((d)^2))

  })

  output$mape = renderText({

    data_merged = data_merged()
    d =  data_merged$.value - data_merged$value 
    mape = mean(abs(d)/data_merged$value)
    paste0(round(mape*100,2),"%")

  })


    output$evolution <- renderPlotly({

      data_merged = data_merged()

      gg = data_merged %>% 
        filter(!is.na(.value)) %>%
        rename(prediction = .value) %>%
        select(datetime, value, prediction, .conf_lo, .conf_hi, prediction_date) %>%
        group_by(datetime) %>%
        slice_max(order_by = prediction_date , n = 1) %>%
        ungroup() %>%
        select(-prediction_date) %>%
        pivot_longer(cols = -c("datetime"), names_to = "type", values_to = "value") %>%
        ggplot(aes(datetime, value, col = type)) + geom_line(size = 1) +
        theme_minimal() +
        theme(legend.position = "bottom")

      ggplotly(gg)

    })
}

# Run the application 
shinyApp(ui = ui, server = server)

With this we would have the second level of MLOps in R covered. Now, let’s see how to build the third level of MLOps with R. Let’s get to it!

MLOps in R level 3: automatic model retraining

In order to retrain our model automatically, we are going to need several elements:

  1. A pipeline for model training and comparison.
  2. A pipeline to check if the model should be retrained or not.

So, let’s see how to program each of these step by step.

How to create a model training script in R

To create a model training pipeline in R we must create an R script that automatically extracts the training data and trains many models. Furthermore, it is vital that our pipeline records all the models that are trained, including the type of model, parameters, metrics obtained, and the model itself.

In my case, I have used Neptune.ai, since it is one of the few tools that is easy to set up, has a free tier and has an R library. To use it, you simply have to run the Neptune setup, as shown in the following script:

# Load libraries
libs = c('yaml', 'reticulate','neptune')
sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Load variables
train_data_info =  read_yaml('config/parameters.yaml')[['train']]
neptune_envname = train_data_info$neptune$envname

# Install Miniconda
install_miniconda()
if(!neptune_envname %in% conda_list()$name){conda_create(envname = neptune_envname)}
conda_installations = conda_list()
conda_dir = conda_installations$python[conda_installations$name == neptune_envname] %>%
  gsub('\\', '/',., fixed =T)

use_condaenv(conda_dir)

# Install Neptune
neptune_install(
  method = 'conda',
  conda = conda_binary(),
  envname = neptune_envname
)

Likewise, to train the forecasting models I have used the timetk and modeltime libraries, both based on the tidymodels library. In the following script you can see the script in which I train the models, register them in Neptune.ai and save the best model:

libs = c('forecast', 'yaml', 'dplyr', 'timetk', 'lubridate',
         'tidymodels', 'modeltime', 'reticulate','neptune')

sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Load Variables
train_data_info =  read_yaml('config/parameters.yaml')[['train']]
resampling = train_data_info$resampling
input = train_data_info$input
output_path = train_data_info$output_path
output_filename = train_data_info$output_filename
allow_parallel = train_data_info$allow_parallel
parallel_cores = train_data_info$parallel_cores
neptune_envname = train_data_info$neptune$envname
neptune_project = train_data_info$neptune$project
model_output = train_data_info$model_output
best_model_output = train_data_info$best_model_output

# Read Data
data = readRDS(input)
neptune_api_key = Sys.getenv('api_key')

# Fix timestamp
data$datetime = ymd_hms(data$datetime)
data$date = as.Date(data$datetime)
data$percentage = NULL
data$datetime = NULL

# Make splits
splits = data %>%
  time_series_split(assess = resampling, cumulative = TRUE)

# Create recipe for preprocessing
recipe_spec_final <- recipe(value ~ ., data = training(splits)) %>%
  step_timeseries_signature(date) %>%
  step_fourier(date, period = 365, K = 5) %>%
  step_rm(date) %>%
  step_rm(all_nominal_predictors()) %>%
  step_rm(date_index.num) %>%
  step_rm(contains("iso"), contains("minute"), contains("hour"),
          contains("am.pm"), contains("xts")) %>%
  step_zv(all_predictors())


bake(prep(recipe_spec_final), new_data =  training(splits)) %>% glimpse(.)

# Create models --> Accepted boost_tree 
if("boost_tree" %in% names(train_data_info$models)){

  model_parameters = train_data_info$models$boost_tree$parameters %>% 
    unlist(.) %>% strsplit(., ',')

  model_parameters = data.frame(model_parameters) %>% tibble()


  model_grid = model_parameters %>%
    create_model_grid(
      f_model_spec = boost_tree,
      engine_name  = train_data_info$models$boost_tree$engine,
      mode         = train_data_info$prediction_mode
  )

}

# Create the workflow set
workflow <- workflow_set(
  preproc = list(recipe_spec_final),
  models = model_grid$.models, 
  cross = T
  )


control_fit_workflowset(
  verbose   = TRUE,
  allow_par = TRUE
)

if(allow_parallel){
  parallel_start(parallel_cores)
}

# Train in parallel
model_parallel_tbl <- workflow %>%
  modeltime_fit_workflowset(
    data    = training(splits),
    control = control_fit_workflowset(
      verbose   = TRUE,
      allow_par = allow_parallel
    )
  )

if(allow_parallel){
  parallel_stop()
}

calibrated_table = model_parallel_tbl %>%
  modeltime_calibrate(testing(splits)) 

accuracy_table = calibrated_table %>% modeltime_accuracy()

# Extract parameters for each model
for(i in 1:nrow(accuracy_table)){

  run <- neptune_init(
    project= neptune_project,
    api_token= neptune_api_key,
    python = 'conda',
    python_path = conda_dir
  )

  # Extract & log parameters
  model_specs = extract_spec_parsnip(workflow$info[[i]]$workflow[[1]])
  parameters = list()
  parameters[['engine']] = model_specs$engine
  parameters[['mode']] = model_specs$mode

  for(arg in names(model_specs$args)){
    parameters[[arg]] = as.character(model_specs$args[[arg]])[2]  
  }

  run["parameters"] = parameters

  # Add each metric in the accuracy table
  for(col in 4:ncol(accuracy_table) ){
    print(col)
    metric = colnames(accuracy_table)[col]
    run[paste0("evaluation/",metric)] = accuracy_table[[i, col]]
  }

  saveRDS(workflow$info[[i]]$workflow, model_output)
  neptune_upload(run["model"], model_output)

}

# Get best model
best_model = accuracy_table$.model_id[accuracy_table$mae == min(accuracy_table$mae)] 

# Save best model
saveRDS(calibrated_table[best_model,], best_model_output)

Perfect, with this we already have the pipeline to train our model within our MLOps process in R. However, how do we turn these scripts into a pipeline that runs on its own? Let’s see!

How to turn the training script into a pipeline

To turn our script into a pipeline, we’re going to create a Github Action that, when launched, runs the script. Also, this Github Action needs to be triggerable from multiple places, because we need to be able to launch it both from the automatic retrain check and manually from the dashboard.

So, when turning our training script into a pipeline, we’ll use an HTTP request as a trigger. In this way, we can easily launch this process from anywhere.

So, with this in mind, the retrain_model.yaml file that defines our retrain automation looks like this:

name: retrain_model

on:
  repository_dispatch:
    types: retrain_model

jobs:
  build:
    runs-on: ubuntu-18.04 
    container: 
     image: asachet/rocker-tidymodels

    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Install R
        uses: r-lib/actions/setup-r@v2 # Install R
        with: 
          r-version: '4.1.1'

      - name: Setup Neptune
        run: Rscript src/neptune_setup.R

      - name: Retrain Model
        run: Rscript src/model_train.R

      - name: Push data to Github
        run: bash src/github.sh
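
As a side note, the “Retrain Model” button in the level 2 dashboard could launch this same workflow. Here is a minimal sketch of the server-side handler, assuming the GitHub user, repository and personal access token are available to the app (github_user, github_repo and GITHUB_TOKEN are placeholder names here).

# Sketch of a Shiny server handler for the dashboard's retrain button
observeEvent(input$retrain, {
  url = glue::glue('https://api.github.com/repos/{github_user}/{github_repo}/dispatches')
  httr::POST(
    url,
    httr::add_headers('Authorization' = paste0('token ', GITHUB_TOKEN)),
    body = jsonlite::toJSON(list(event_type = 'retrain_model'), auto_unbox = TRUE)
  )
  showNotification('Retraining workflow launched')
})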

With this we already have an MLOps system that we can retrain whenever we want simply by pressing a button. However, the idea is for the model to be retrained automatically. Let’s see how to automate the retraining of our model in R.

How to check model retraining

To automate the model retraining check, we are going to create another Github Actions workflow that runs daily and that, if the error exceeds the limit we have set, launches the previously defined workflow via an HTTP request.

If you don’t know how to launch a Github Actions workflow using an HTTP request, I recommend you read this post where I explain how to do it.

This execution is achieved with the following script:

# Install & load libraries  
libs = c( 'yaml', 'dplyr', 'lubridate', 'httr', 'glue', 'jsonlite')
sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Load Variables
GITHUB_TOKEN = Sys.getenv("TOKEN")

retrain_info =  read_yaml('config/parameters.yaml')[['retrain']]
predictions_path = retrain_info$predictions_path
data_path = retrain_info$data_path
max_mae = retrain_info$max_mae
n_days = retrain_info$n_days
github_user = retrain_info$github_user
github_repo = retrain_info$github_repo
github_event_type = retrain_info$github_event_type

# Read Data
data = readRDS(data_path)
predictions = readRDS(predictions_path)


# Unify data
data$datetime = ymd_hms(data$datetime) %>% as.Date(.)
predictions$.index = as.Date(predictions$.index)

# For each date, keep only the latest prediction & join the real data
comparison = predictions %>%
  group_by(.index) %>%
  arrange(desc(prediction_date)) %>%
  slice(1) %>%
  ungroup() %>%
  inner_join(data, by = c('.index' = 'datetime')) %>%
  slice_max(n = n_days, order_by = .index)

mae = mean(abs(comparison$value - comparison$.value))

if(mae > max_mae){

  url = glue('https://api.github.com/repos/{github_user}/{github_repo}/dispatches')

  body  = list("event_type" = github_event_type) 
  resp = POST(url, 
       add_headers('Authorization' = paste0('token ', GITHUB_TOKEN)),
       body = jsonlite::toJSON(body, pretty = T, auto_unbox = T)
       )
}

However, having this script is not enough: we need to be able to automate it. To do this, we are going to create a Github Actions that executes it periodically.

It is important that the retrain check is always run after data extraction.

So, let’s create another Github Actions to run our retrain check script every day. We achieve this with the following yaml file:

name: check_retrain

on:
  schedule:
    - cron: '0 9 * * *'  
  workflow_dispatch: 

jobs:
  build:
    runs-on: ubuntu-18.04
    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Update
        run: sudo apt-get update

      - name: Install necessary libraries
        run:  sudo apt-get install -y curl libssl-dev libcurl4-openssl-dev libxml2-dev

      - name: Install R
        uses: r-lib/actions/setup-r@v2 # Install R
        with: 
          r-version: '4.1.1'

      - name: Set environment variables
        run: |
          echo TOKEN="${{ secrets.TOKEN }}" >> ~/.Renviron

      - name: Run Retrain Check
        run: Rscript src/check_retrain.R

Perfect! Our MLOps system in R is almost finished. Only one piece is left: periodically making requests to our API and saving the predictions, so that our dashboard can feed on them. Let’s see how to do it!

How to automate getting predictions

Having seen all of the above, I guess you can imagine how we’re going to automate getting predictions: with Github Actions. In fact, in our case it is quite simple, since we just make predictions every day and save all the results.

So I’m just going to have to create two files:

  1. A script that calls the API and appends the results to the current predictions file.
  2. A yaml to run that process every day.

Regarding the first step, we must take into account both the URL where our service has been deployed and the endpoint where we have exposed our function. So, the script is as follows:

# Load libraries
libs = c('yaml', 'httr', 'jsonlite', 'dplyr')
sapply(libs[!libs %in% installed.packages()], install.packages)
sapply(libs, require, character.only = T)

# Load variables
predict_info =  read_yaml('config/parameters.yaml')[['predict']]
horizon = predict_info[['horizon']]
service_url = predict_info[['service_url']]
service_endpoint = predict_info[['service_endpoint']]
predictions_path = predict_info[['predictions_path']]

# Construct and send the API call (passing the horizon loaded above)
url = paste0(service_url,'/',service_endpoint)

resp = POST(url, body = list(horizon = horizon), encode = "json")
predictions = fromJSON(content(resp, type = "text"))

# Parse dates
predictions$.index = as.Date(predictions$.index)
predictions$prediction_date = as.Date(predictions$prediction_date)

# Read the previous predictions (if any) and append the new ones
if(file.exists(predictions_path)){
  past_predictions = readRDS(predictions_path)
  all_predictions = bind_rows(past_predictions, predictions)
} else {
  all_predictions = predictions
}

# Save predictions
saveRDS(all_predictions, predictions_path)

Finally, to automate this script we have used the following yaml file:

name: make_predictions

on:
  schedule:
    - cron: '0 10 * * *' 
  workflow_dispatch: 

jobs:
  build:
    runs-on: ubuntu-18.04
    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Update
        run: sudo apt-get update

      - name: Install necessary libraries
        run:  sudo apt-get install -y curl libssl-dev libcurl4-openssl-dev libxml2-dev

      - name: Install R
        uses: r-lib/actions/setup-r@v2
        with: 
          r-version: '4.1.1'

      - name: Run Get Predictions
        run: Rscript src/get_predictions.R

      - name: Push data to Github
        run: bash src/github.sh

With this we would have our MLOps system in R finished.

Conclusion on MLOps in R

The first and main conclusion is that yes, it is possible to do MLOps in R. However, although doing MLOps in R is possible, we must be aware that we will inevitably have to use Docker containers and tools such as Github Actions or CircleCI.

Personally, I encourage you to carry out this same process with an API that offers real-time data, since it is a complex exercise from which, without a doubt, you will learn a lot.

In any case, I hope you liked this post about MLOps in R. See you in the next post!
