March 23, 2023

Machine Learning How To: Continuous Integration (Part I)

Machine Learning How To: Continuous Integration (Part I)

I've been writing on different stages of the machine learning (ML) life-cycle in a series of blog posts. The first article was about building traceable and reproducible pipelines. This allows for data to be version-controlled and experiments to be run in a way that makes comparisons easy and reproducible. The second article covered exposing ml inference as a service. I used Fastapi to write APIs and docker to containerise the code and gave code examples of how to run a simple application that predicts the sentiment of text on a local machine.
In this article, I will cover unit tests and integration tests. In the second part, I will go over linting and setting up a CI pipeline. These are best practices for any software project and are not unique to machine learning. Tests ensure that verifies that the code behaves as expected and that refactoring does not break the existing functionalities. Linting makes sure that your code conforms to the standard formatting rules and that there are no syntax errors or unused variables. A CI pipeline automatically runs all the existing tests whenever you push your code to a remote repository or create a pull request and check for any linting issues. In the case of errors, the build fails and you will be notified. This keeps the quality of code in check and saves you a lot of time by fixing bugs early before it affects users or other team members.
I will be using the mlseries repo for code examples and Pytest for tests.

Unit tests

The simple application in this repo has two main folders, ml/ which contains all the ML code, including building the dvc pipelines for data preprocessing, training and testing the model and the inference service.
The other folder is the server/ folder which includes the code for the backend service receiving requests from the user and sending them to ML's inference service.
Unit testing for ML systems can be different from traditional software due to the indeterministic nature of parts of the system. Here's a great article from Jeremy Jordan on testing ML systems. In this post, the focus is on the deterministic aspects of the ML code for testing. I use Pytest for testing.

Mock dependencies and isolate tests with SetUp

Below is a code snippet for preparing the data for train and testing:

# /ml/prepare_data/main.py

import pandas as pd
from model_params import TrainTestSplit
from sklearn.model_selection import train_test_split

from .config import Config


def split_train_test_data(config=Config()):
    data = pd.read_csv(f"{config.data_path}/{config.data_file}", sep="\t")
    data = data[(data["Sentiment"] == 0) | (data["Sentiment"] == 4)]
    data["labels"] = data.apply(lambda x: 0 if x["Sentiment"] == 0 else 1, axis=1)

    data_train, data_test = train_test_split(data, test_size=TrainTestSplit.TEST_SIZE)

    data_train.to_parquet(f"{config.data_path}/{config.train_data_file}")
    data_test.to_parquet(f"{config.data_path}/{config.test_data_file}")

In this simple function, the data is loaded, only rows with labels 0 and 4 are selected, and a new binary column, labels is added based on the Sentiment value modified to 0 and 1, then the selected data is split into train and test sets and finally saved as parquet files.
In general, a function may have dependencies on other functions which affect its return value. For example, the split_train_test_data function above depends on pandas reading a certain file. In addition, train_test_split from sklearn is used to split the dataframe and finally, the result is saved to disk using pandas.
When writing a unit test, the goal is to test the behaviour of the function in question and assume all its dependencies return the expected results. We, therefore, do not need to worry about reading and writing the data (which might be large and slow down tests) or whether train_test_split works or not. We can mock these dependencies and inject them into the function. Python has a built-in unittest library that can be used for this purpose.
I create a test folder inside the ml/ directory named tests/unittests/prepare_data/ and add a file test_prepare_data.py there. The folder structure is a personal preference. I find it easier to follow if the tests mimic the same structure of the code. Below is the content of this file:

# /ml/tests/unittests/prepare_data/test_prepare_data.py

from unittest import mock, TestCase
from unittest.mock import Mock, patch

import pandas as pd
from prepare_data.__main__ import split_train_test_data


class TestDataPreparation(TestCase):
    def setUp(self):
        # Create a mock self.config object and set data paths
        self.config = Mock()
        self.config.data_path = "/data"
        self.config.data_file = "data_file.csv"
        self.config.train_data_file = "train.csv"
        self.config.test_data_file = "test.csv"

    @patch("prepare_data.__main__.Config", autospec=True)
    @patch("prepare_data.__main__.pd.read_csv")
    @patch("prepare_data.__main__.train_test_split")
    @patch("prepare_data.__main__.pd.DataFrame.to_parquet")
    def test_split_train_test_data(self, mock_to_parquet, mock_train_test_split, mock_read_csv, mock_config):

        mock_config.return_value = self.config
        
        # Mock what pd.read_csv will return
        mock_read_csv.return_value = pd.DataFrame(
            {
                "Sentiment": [0, 1, 4, 2, 4, 3, 0, 4, 2, 0, 1],
                "Phrase": [
                    "foo",
                    "char",
                    "kar",
                    "bar",
                    "lar",
                    "baz",
                    "qux",
                    "quux",
                    "corge",
                    "vid",
                    "chill",
                ],
            }
        )

        # The expected intermediary dataframe before calling train_test_split()
        expected_data_before_splitting = pd.DataFrame(
            {
                "Sentiment": [0, 4, 4, 0, 4, 0],
                "Phrase": ["foo", "kar", "lar", "qux", "quux", "vid"],
                "labels": [0, 1, 1, 0, 1, 0],
            }
        )

        # Define a return value for train_test_split()
        mock_train_test_split.return_value = (
            pd.DataFrame(
                {
                    "Sentiment": [0, 0, 4, 4, 0],
                    "Phrase": ["foo", "kar", "lar", "qux", "vid"],
                    "labels": [0, 0, 1, 1, 0],
                }
            ),
            pd.DataFrame({"Sentiment": [4], "Phrase": ["quux"], "labels": [1]}),
        )

        # Call the function
        split_train_test_data()

        # Verify that pd.read_csv() was called once with the mocked path
        mock_read_csv.assert_called_once_with(
            f"{self.config.data_path}/{self.config.data_file}", sep="\t"
        )

        # Verify that train_test_split() was called with the expected df
        pd.testing.assert_frame_equal(
            mock_train_test_split.call_args[0][0].reset_index(drop=True),
            expected_data_before_splitting.reset_index(drop=True),
        )

        # Verify that to_parquet was called twice
        assert mock_to_parquet.call_count == 2
        
        # Check that the train/test data called to_parquet with the correct         paths
        mock_to_parquet.assert_any_call(
            f"{self.config.data_path}/{self.config.train_data_file}"
        )
        mock_to_parquet.assert_any_call(
            f"{self.config.data_path}/{self.config.test_data_file}"
        )

As seen, anything that is not part of the core functionality of split_train_test_data() is mocked. Here's a breakdown of how to write the above test code:

  1. Create a test class that inherits from unittest.TestCase. TestCase has the interface for running and testing code.
  2. Create a setUp method which is inherited from the base class and runs before any defined test. This is especially useful if you have multiple tests using the same set-up. In that case, the tearDown method should be added so that after every test any changes made to the setup are removed and the next test uses the original set-up. Since my test class only has one method, I didn't use the tearDown method.
  3. What should be set up before testing split_train_test_data? I need to define the Config class and set its attributes since I don't want the function to actually read/write data from/to the real data_path. So, I define a mock config object using Mock() and set the attributes to dummy strings.
  4. Define a method that will test split_train_test_data. The name of the test function should start with "test" for Pytest to find it, e.g. test_<name_of_function_to_be_tested>.
  5. Decorate the test method with the mocked objects. I use unittest.mock.patch for this. There are 4 mocked objects here: the config instance, read_csv, train_test_split and to_parquet. These mocked objects are passed to the test function.
    Note that for Config, the autospec is set to true. This ensures that the mocked config object has all the attributes of the Config class.
  6. Inside the test, set the return values of the mocked objects to static predefined values. For example, create a dataframe with dummy rows and set that to be the return value of pd.read_csv. Remember that the functionality that I want to test here is after the data is loaded (or mocked) does split_train_test_data: 1) extract only the phrases with sentiment values 0 and 4 and 2) add a labels column with values 0 and 1.
  7. Based on the defined dummy data, create the expected dataframe that split_train_test_data should pass to train_test_split. I call this expected_data_before_splitting
  8. Call split_train_test_data.
  9. Verify if the dependencies were called the expected number of times with the expected values. To be more specific:
    1. Verify that read_csv() was called once with the mocked configured path.
    2. Make sure that train_test_split was called with expected_data_before_splitting.
    3. split_train_test_data makes two calls to to_parquet. I verify that this is the case in the test with assert mock_to_parquet.call_count == 2.
    4. I also verify that to_parquet was called with the correct dummy data paths and file names defined in the setUp method.

Manage dependencies using pytest.fixtures

Similarly, unit tests for other parts of the code, e.g. training, and inference can be written. I will give one more example, but this time instead of using unittest.TestCase.setUp I use pytest.fixture. A fixture is a function that returns an object or value that is required by one or more tests. pytest.fixture is a decorator that allows you to define and use fixtures for your tests. The fixtures need to be passed as input to the tests that require their return values. This way, similar to unittest.TestCase.setUp, when the tests are run, the fixture functions are run first and their return value is passed to the tests that take them as input.
You can define different scopes for your fixtures, e.g. function, module, and class. The scope determines how often the fixture function should be called and when should it get destroyed. For example, in the default scope, function, the fixture functions are called and destroyed after each test function using them. Here's a great article from Xiaoxu Gao about fixtures and their scopes.
Let's see fixtures in action. Below is the code snippet for creating a Pipeline from sklearn and fitting it to the training data:

# ml/train/__main__.py

import joblib
import pandas as pd
from model_params import LogisticRegressionConfig
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

from .config import Config


def train():
    """
    Loads the training data, creates, and fits a pipeline on
    the data and saves the model
    """
    config = Config()

    lr_params = {
        "n_jobs": LogisticRegressionConfig.n_jobs,
        "C": LogisticRegressionConfig.C,
        "max_iter": LogisticRegressionConfig.max_iter,
    }
    train_dataframe = pd.read_parquet(f"{config.data_path}/{config.train_data_file}")

    X = train_dataframe["Phrase"]
    y = train_dataframe["labels"]

    clf = Pipeline(
        [
            ("vect", CountVectorizer()),
            ("tfidf", TfidfTransformer()),
            ("clf", LogisticRegression(**lr_params)),
        ]
    )
    clf.fit(X, y)
    joblib.dump(clf, f"{config.model_path}/{config.model_file}")

As seen in the code above, after the configurable variables and the training data are loaded, an sklearn.Pipeline.pipline is defined and fit to the data. Finally, the fit model is saved to the configured path. A unit test can verify whether:

  1. The model was loaded from the expected path.
  2. A pipeline object was fit to the data and dumped to the disk

Since I don't want to read the real training data, I will define a mocked Config class instance using fixtures. Below is the unit test code:

# ml/tests/unittests/train/

import pandas as pd
import pytest

from sklearn.pipeline import Pipeline
from unittest.mock import Mock, patch
from train.__main__ import train


@pytest.fixture
def config():
    # Create a mock config object and set data paths
    config = Mock()
    config.data_path = "/path/to/data"
    config.train_data_file = "train_data.parquet"
    config.model_path = "/path/to/model"
    config.model_file = "model.joblib"
    return config


@patch("train.__main__.Config", autospec=True)
@patch("train.__main__.pd.read_parquet")
@patch("train.__main__.joblib.dump")
def test_train(mock_dump, mock_read_parquet, mock_config, config):
    mock_config.return_value = config

    # Create mock data
    data = pd.DataFrame(
        {
            "Phrase": ["this is a test", "this is another test"],
            "labels": [0, 1],
        }
    )
    mock_read_parquet.return_value = data

    # Call the train function
    train()

    # Check that pd.read_parquet was called with the correct file path
    pd.read_parquet.assert_called_once_with(
        f"{config.data_path}/{config.train_data_file}"
    )

    # Check that joblib.dump was called with a Pipeline object
    _, args, _ = mock_dump.mock_calls[0]
    assert isinstance(args[0], Pipeline)

The main difference with test_split_train_test_data is the use of pytest.fixture. Notice that there are two arguments related to Config passed to the test function above, one is, mock_config, which replaces the call to config = Config() in ml/train/__main__.py. The other is the fixture function config which contains the mocked config object with the set dummy values. Now I need to actually set this mocked config object to the return value of the call to Config() inside ml/train/__main__.py. This is what the first line in test_train() does.
I then define a dataframe and set it to be the return value of read_parquet and verify that this dataframe was attempted to be read using the correct mocked data path.
Lastly, I check that joblib.dump saved a pipeline object.

Integration Tests

In unit tests, the goal is to verify that a small isolated piece of code works as expected. This is useful but doesn't give us the confidence that the different components of the application work together. This is the purpose of integration tests. With integration tests we don't write a test for every function involved in a certain service and rather treat the service as a black box. For example, in this codebase, I want to test the server service and make sure that making requests to the review_sentiment endpoint with different reviews results in the expected status codes and response messages. In this case, I don't need to know how the server service runs, whether it calls some other internal services or not. All I care about is, if I hit the endpoint with a valid input, do I get the expected result. Because the test treats the implementation of the service as a black box, if any changes are made to any components related to this service during refactoring, I can be confident in the changes not affecting the expected results by seeing the integration test pass.
These tests are more involved and might require some dependencies to be set up before tests, e.g. a local test database. However, the tests should be contained to the internal service you are testing. For example, if your endpoint, makes a POST request to S3, or the ML service, you should assume that those services work and return the expected results.
Let's see an example. Below is the code in server/server_main.py which I want to test.

# server/server_main.py

import re

import requests
from config import Config
from entities.review import Review
from fastapi import FastAPI, HTTPException, status

app = FastAPI()
config = Config()


@app.post("/review_sentiment/", status_code=status.HTTP_200_OK)
def add_review(review: Review):
    """
    This endpoint receives reviews, and creates an appropriate
    payload and makes a post request to ml's endpoint for a
    prediction on the input

    Args:
        review(Review): the input should conform to the Review model

    Returns:
        response(dict): response from ml's endpoint which includes
                        prediction details

    Raises:
        HTTPException: if the input is a sequence of numbers
    """
    endpoint = f"{config.ml_base_uri}/prediction_job/"
    if re.compile(r"^\-?[1-9][0-9]*$").search(review.text):
        raise HTTPException(status_code=422, detail="Review cannot be a number")

    resp = requests.post(endpoint, json=review.dict())
    return resp.json()

In this simple service, a Review object is received, if the text in the review is a number, an exception is raised with the message "Review cannot be a number". Otherwise, the review is posted to endpoint which is the ML service and its response is returned.
To test this service, I need to simulate the server app and make a POST request to it. FastAPI's TestClient makes this possible. As mentioned earlier, the server's external dependencies should be mocked. To mock the POST request to the ml service, I use requests_mock.
To test the server's review_sentiment endpoint, I create a tests/integration_tests directory and add a test_server_main.py. file with the contents below:

# server/tests/integration_tests/test_server_main.py

import re

import pytest
from config import Config
from entities.review import Review
from fastapi.testclient import TestClient
from main import add_review, app
from pydantic import ValidationError
from requests_mock import ANY


@pytest.fixture
def config():
    return Config()


@pytest.fixture(scope="module")
def module_client():
    with TestClient(app) as c:
        yield c


@pytest.fixture
def client(module_client, requests_mock):
    # ref: https://github.com/encode/starlette/issues/818
    test_app_base_url_prefix_regex = re.compile(
        rf"{re.escape(module_client.base_url)}(/.*)?"
    )
    requests_mock.register_uri(ANY, test_app_base_url_prefix_regex, real_http=True)
    return module_client


def test_add_review(config, requests_mock, client):
    review1 = "hi"
    with pytest.raises(ValidationError) as exc_info:
        add_review(Review(text=review1))
    assert exc_info.value.errors() == [
        {
            "loc": ("text",),
            "msg": "min_length:4",
            "type": "value_error.any_str.min_length",
            "ctx": {"limit_value": 4},
        }
    ]

    review2 = 123458303
    response = client.post("/review_sentiment/", json=Review(text=review2).dict())
    assert response.status_code == 422
    assert response.json()["detail"] == "Review cannot be a number"

    # Mock the request to ML, and its response
    adapter = requests_mock.post(
        f"{config.ml_base_uri}/prediction_job/",
        json={"prediction": "positive", "prediction_score": 80},
    )
    review3 = "very pleasant. highly recommend"
    client.post("/review_sentiment/", json=Review(text=review3).dict())
    
    # Verify that a call to ML was made with the correct input
    assert adapter.last_request.json() == {"text": review3}
    assert adapter.call_count == 1

Here's a break down of the code:

  1. Create an instance of Config using pytest.fixture.
  2. Define a TestClient instance by passing it the server app, I call this module_client. I set the scope of this fixture to module for it to be used for all tests in the script.
  3. Since I want to only test the server endpoint and mock ML's, I need to make sure that in the test function, calls to any of the server endpoints are not mocked. For this, I need to override requests_mock in a fixture by adding the server's base URL to its registered URIs and setting real_http=True. This is what the client fixture does. To be more specific:
    1. I define a regex pattern for the server's base URL which can have any set of characters after module_client.base_url/
    2. Register this URL pattern with ANY http methods, to be a real http request
  4. Define a test function, test_add_review, and pass all the fixtures to it.
  5. Define faulty inputs and verify that the expected results are returned, this is what review1 and review2 test.
  6. Define a valid input, review3, mock the post request to ML and set its response, e.g. {"prediction": "positive", "prediction_score": 80}.
  7. Verify that the server test app did call ML service with review3.

Similarly, the ML service can be tested. I will not include the code snippet here since it's very similar to the server's integration tests, but you can find it in the repo.

Running the Tests

For the tests to run successfully on your local machine, you need to set up some paths and environment variables. These configurations can go in a conftest.py which is a special file used by Pytest for any plugins, fixtures, paths, etc. that need to be used for the tests to run successfully.
I create a separate conftest.py for the ml/ and server/ directories. Because I want to be able to run the code outside of docker, I need to provide Pytest with the path to shared_utils/logger which is mounted to the containers. I do this by adding the lines below to ml/conftest.py:

# ml/conftest.py

import os
import sys

"""
Loading shared scripts for unittest
This prevents throwing errors when unit test is running without logging code mounted in utils/
"""
logger_path = os.path.abspath("./shared_utils/unittest")
sys.path.insert(0, logger_path)


# Change directory to /ml because the ml code runs from /ml as root dir and this allows the calls to loading the model, etc. to be successful
os.chdir(os.getcwd() + "/ml")

My goal is to have the tests run from the root of the repo. I think it is easier this way for all tests to run from one place and avoid the need to switch between directories.
In order for the paths to the data and model to be successful for the ml scripts, I change the directory to ml after inserting the logger path.
For server/conftest.py, because server's config uses some environment variables, I use load_dotenv.

# server/conftest.py

from dotenv import load_dotenv

load_dotenv("../deploy/.env")

I can now run the tests from the terminal in the root of the repo by simply running pytest:

Running all tests from the root with pytest

That's it for part one of this blog subseries. I covered writing unit tests and integration tests, mocking dependencies, isolating using setUp and fixtures and configuring dependencies for running tests with Pytest.
In the second part, I will go over linting, how to automate tasks using invoke and finally how to set up a CI pipeline with Github Actions.

I hope you found this post useful. If you have any questions or feedback you can reach me on Twitter.