Machine Learning How To: Continuous Integration (Part I)
I've been writing a series of blog posts on the different stages of the machine learning (ML) life-cycle. The first article was about building traceable and reproducible pipelines, which allows data to be version-controlled and experiments to be run in a way that makes comparisons easy and reproducible. The second article covered exposing ML inference as a service: I used FastAPI to write the APIs and Docker to containerise the code, and gave code examples of how to run a simple application that predicts the sentiment of text on a local machine.
In this article, I will cover unit tests and integration tests. In the second part, I will go over linting and setting up a CI pipeline. These are best practices for any software project and are not unique to machine learning. Tests verify that the code behaves as expected and that refactoring does not break existing functionality. Linting makes sure that your code conforms to standard formatting rules and that there are no syntax errors or unused variables. A CI pipeline automatically runs all the existing tests and checks for linting issues whenever you push your code to a remote repository or create a pull request. If there are errors, the build fails and you are notified. This keeps the quality of the code in check and saves you a lot of time by catching bugs early, before they affect users or other team members.
I will be using the mlseries repo for code examples and Pytest for tests.
Unit tests
The simple application in this repo has two main folders. The ml/ folder contains all the ML code, including the DVC pipelines for data preprocessing, training and testing the model, and the inference service. The server/ folder contains the code for the backend service that receives requests from the user and forwards them to ML's inference service.
Unit testing for ML systems can be different from traditional software due to the non-deterministic nature of parts of the system. Here's a great article from Jeremy Jordan on testing ML systems. In this post, the focus is on testing the deterministic aspects of the ML code. I use Pytest for testing.
Mock dependencies and isolate tests with setUp
Below is a code snippet for preparing the data for training and testing:
# /ml/prepare_data/__main__.py
import pandas as pd
from model_params import TrainTestSplit
from sklearn.model_selection import train_test_split
from .config import Config
def split_train_test_data(config=Config()):
data = pd.read_csv(f"{config.data_path}/{config.data_file}", sep="\t")
data = data[(data["Sentiment"] == 0) | (data["Sentiment"] == 4)]
data["labels"] = data.apply(lambda x: 0 if x["Sentiment"] == 0 else 1, axis=1)
data_train, data_test = train_test_split(data, test_size=TrainTestSplit.TEST_SIZE)
data_train.to_parquet(f"{config.data_path}/{config.train_data_file}")
data_test.to_parquet(f"{config.data_path}/{config.test_data_file}")
In this simple function, the data is loaded, only rows with Sentiment values 0 and 4 are selected, and a new binary column, labels, is added by mapping Sentiment values 0 and 4 to 0 and 1 respectively. The selected data is then split into train and test sets and finally saved as parquet files.
In general, a function may have dependencies on other functions which affect its return value. For example, the split_train_test_data function above depends on pandas reading a certain file. In addition, train_test_split from sklearn is used to split the dataframe and, finally, the result is saved to disk using pandas.
When writing a unit test, the goal is to test the behaviour of the function in question and assume all its dependencies return the expected results. We therefore do not need to worry about reading and writing the data (which might be large and slow down tests) or whether train_test_split works or not. We can mock these dependencies and inject them into the function. Python's built-in unittest library can be used for this purpose.
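As a minimal, self-contained illustration of the idea (with hypothetical names, not code from the mlseries repo), patch temporarily swaps a dependency for a Mock whose return value you control:
# A minimal mocking sketch with hypothetical names (not from the mlseries repo)
from unittest.mock import patch

class DataLoader:
    def load(self):
        # Imagine slow disk or network I/O here that we don't want in a unit test
        raise RuntimeError("expensive I/O")

def count_rows(loader: DataLoader) -> int:
    return len(loader.load())

def test_count_rows():
    # Replace DataLoader.load with a mock for the duration of this test only
    with patch.object(DataLoader, "load", return_value=[1, 2, 3]) as mock_load:
        assert count_rows(DataLoader()) == 3
        mock_load.assert_called_once()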
I create a test folder inside the ml/ directory named tests/unittests/prepare_data/ and add a file test_prepare_data.py there. The folder structure is a personal preference; I find it easier to follow if the tests mirror the structure of the code. Below is the content of this file:
# /ml/tests/unittests/prepare_data/test_prepare_data.py
from unittest import mock, TestCase
from unittest.mock import Mock, patch
import pandas as pd
from prepare_data.__main__ import split_train_test_data
class TestDataPreparation(TestCase):
def setUp(self):
# Create a mock self.config object and set data paths
self.config = Mock()
self.config.data_path = "/data"
self.config.data_file = "data_file.csv"
self.config.train_data_file = "train.csv"
self.config.test_data_file = "test.csv"
@patch("prepare_data.__main__.Config", autospec=True)
@patch("prepare_data.__main__.pd.read_csv")
@patch("prepare_data.__main__.train_test_split")
@patch("prepare_data.__main__.pd.DataFrame.to_parquet")
def test_split_train_test_data(self, mock_to_parquet, mock_train_test_split, mock_read_csv, mock_config):
mock_config.return_value = self.config
# Mock what pd.read_csv will return
mock_read_csv.return_value = pd.DataFrame(
{
"Sentiment": [0, 1, 4, 2, 4, 3, 0, 4, 2, 0, 1],
"Phrase": [
"foo",
"char",
"kar",
"bar",
"lar",
"baz",
"qux",
"quux",
"corge",
"vid",
"chill",
],
}
)
# The expected intermediary dataframe before calling train_test_split()
expected_data_before_splitting = pd.DataFrame(
{
"Sentiment": [0, 4, 4, 0, 4, 0],
"Phrase": ["foo", "kar", "lar", "qux", "quux", "vid"],
"labels": [0, 1, 1, 0, 1, 0],
}
)
# Define a return value for train_test_split()
mock_train_test_split.return_value = (
pd.DataFrame(
{
"Sentiment": [0, 0, 4, 4, 0],
"Phrase": ["foo", "kar", "lar", "qux", "vid"],
"labels": [0, 0, 1, 1, 0],
}
),
pd.DataFrame({"Sentiment": [4], "Phrase": ["quux"], "labels": [1]}),
)
# Call the function
split_train_test_data()
# Verify that pd.read_csv() was called once with the mocked path
mock_read_csv.assert_called_once_with(
f"{self.config.data_path}/{self.config.data_file}", sep="\t"
)
# Verify that train_test_split() was called with the expected df
pd.testing.assert_frame_equal(
mock_train_test_split.call_args[0][0].reset_index(drop=True),
expected_data_before_splitting.reset_index(drop=True),
)
# Verify that to_parquet was called twice
assert mock_to_parquet.call_count == 2
# Check that the train/test data called to_parquet with the correct paths
mock_to_parquet.assert_any_call(
f"{self.config.data_path}/{self.config.train_data_file}"
)
mock_to_parquet.assert_any_call(
f"{self.config.data_path}/{self.config.test_data_file}"
)
As seen, anything that is not part of the core functionality of split_train_test_data() is mocked. Here's a breakdown of how to write the above test code:
- Create a test class that inherits from unittest.TestCase. TestCase provides the interface for running and testing code.
- Create a setUp method, which is inherited from the base class and runs before every test. This is especially useful if you have multiple tests using the same set-up. In that case, a tearDown method should also be added so that after every test any changes made to the set-up are removed and the next test uses the original set-up. Since my test class only has one method, I didn't use the tearDown method (see the short sketch after this list).
- What should be set up before testing split_train_test_data? I need to define the Config class and set its attributes, since I don't want the function to actually read/write data from/to the real data_path. So, I define a mock config object using Mock() and set its attributes to dummy strings.
- Define a method that will test split_train_test_data. The name of the test function should start with "test" for Pytest to find it, e.g. test_<name_of_function_to_be_tested>.
- Decorate the test method with the mocked objects. I use unittest.mock.patch for this. There are 4 mocked objects here: the config instance, read_csv, train_test_split and to_parquet. These mocked objects are passed to the test function. Because patch decorators are applied bottom-up, the mock arguments arrive in reverse order of the decorators, which is why mock_to_parquet comes first. Note that for Config, autospec is set to True. This ensures that the mocked config object has all the attributes of the Config class.
- Inside the test, set the return values of the mocked objects to static, predefined values. For example, create a dataframe with dummy rows and set it as the return value of pd.read_csv. Remember that the functionality I want to test here is: after the data is loaded (or mocked), does split_train_test_data 1) extract only the phrases with Sentiment values 0 and 4, and 2) add a labels column with values 0 and 1?
- Based on the defined dummy data, create the expected dataframe that split_train_test_data should pass to train_test_split. I call this expected_data_before_splitting.
- Call split_train_test_data.
- Verify that the dependencies were called the expected number of times with the expected values. To be more specific:
  - Verify that read_csv() was called once with the mocked configured path.
  - Make sure that train_test_split was called with expected_data_before_splitting.
  - split_train_test_data makes two calls to to_parquet. I verify that this is the case with assert mock_to_parquet.call_count == 2.
  - I also verify that to_parquet was called with the correct dummy data paths and file names defined in the setUp method.
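For completeness, here is a minimal, self-contained sketch of how setUp and tearDown bracket every test; this is a hypothetical example rather than code from the repo:
# Hypothetical setUp/tearDown sketch (not from the mlseries repo)
import os
import shutil
import tempfile
from unittest import TestCase

class TestWithTempDir(TestCase):
    def setUp(self):
        # Runs before every test method: create a scratch directory
        self.tmp_dir = tempfile.mkdtemp()

    def tearDown(self):
        # Runs after every test method: remove the directory so tests stay isolated
        shutil.rmtree(self.tmp_dir)

    def test_write_file(self):
        path = os.path.join(self.tmp_dir, "dummy.txt")
        with open(path, "w") as f:
            f.write("hello")
        self.assertTrue(os.path.exists(path))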
Manage dependencies using pytest fixtures
Similarly, unit tests for other parts of the code, e.g. training and inference, can be written. I will give one more example, but this time instead of using unittest.TestCase.setUp I use pytest.fixture. A fixture is a function that returns an object or value that is required by one or more tests. pytest.fixture is a decorator that allows you to define and use fixtures for your tests. The fixtures need to be passed as input to the tests that require their return values. This way, similar to unittest.TestCase.setUp, when the tests are run, the fixture functions run first and their return values are passed to the tests that take them as input.
You can define different scopes for your fixtures, e.g. function, module, and class. The scope determines how often the fixture function is called and when its result gets destroyed. For example, with the default scope, function, the fixture is called before and destroyed after each test function that uses it. Here's a great article from Xiaoxu Gao about fixtures and their scopes.
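As a small illustration of scopes (hypothetical fixtures, not from the repo), a module-scoped fixture is created once and shared, while a function-scoped one is rebuilt for every test:
# Hypothetical sketch of fixture scopes (not from the mlseries repo)
import pytest

@pytest.fixture(scope="module")
def shared_state():
    # Created once for this module and reused by every test below
    return {"calls": 0}

@pytest.fixture
def fresh_list():
    # Default (function) scope: re-created for each test
    return []

def test_first(shared_state, fresh_list):
    shared_state["calls"] += 1
    fresh_list.append(1)
    assert shared_state["calls"] == 1 and fresh_list == [1]

def test_second(shared_state, fresh_list):
    shared_state["calls"] += 1
    fresh_list.append(1)
    # The module-scoped dict kept its state; the function-scoped list did not
    assert shared_state["calls"] == 2 and fresh_list == [1]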
Let's see fixtures in action. Below is the code snippet for creating a Pipeline from sklearn and fitting it to the training data:
# ml/train/__main__.py
import joblib
import pandas as pd
from model_params import LogisticRegressionConfig
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from .config import Config
def train():
"""
Loads the training data, creates, and fits a pipeline on
the data and saves the model
"""
config = Config()
lr_params = {
"n_jobs": LogisticRegressionConfig.n_jobs,
"C": LogisticRegressionConfig.C,
"max_iter": LogisticRegressionConfig.max_iter,
}
train_dataframe = pd.read_parquet(f"{config.data_path}/{config.train_data_file}")
X = train_dataframe["Phrase"]
y = train_dataframe["labels"]
clf = Pipeline(
[
("vect", CountVectorizer()),
("tfidf", TfidfTransformer()),
("clf", LogisticRegression(**lr_params)),
]
)
clf.fit(X, y)
joblib.dump(clf, f"{config.model_path}/{config.model_file}")
As seen in the code above, after the configurable variables and the training data are loaded, an sklearn.pipeline.Pipeline is defined and fit to the data. Finally, the fitted model is saved to the configured path. A unit test can verify that:
- The training data was loaded from the expected path.
- A Pipeline object was fit to the data and dumped to disk.
Since I don't want to read the real training data, I will define a mocked Config class instance using fixtures. Below is the unit test code:
# ml/tests/unittests/train/
import pandas as pd
import pytest
from sklearn.pipeline import Pipeline
from unittest.mock import Mock, patch
from train.__main__ import train
@pytest.fixture
def config():
# Create a mock config object and set data paths
config = Mock()
config.data_path = "/path/to/data"
config.train_data_file = "train_data.parquet"
config.model_path = "/path/to/model"
config.model_file = "model.joblib"
return config
@patch("train.__main__.Config", autospec=True)
@patch("train.__main__.pd.read_parquet")
@patch("train.__main__.joblib.dump")
def test_train(mock_dump, mock_read_parquet, mock_config, config):
mock_config.return_value = config
# Create mock data
data = pd.DataFrame(
{
"Phrase": ["this is a test", "this is another test"],
"labels": [0, 1],
}
)
mock_read_parquet.return_value = data
# Call the train function
train()
# Check that pd.read_parquet was called with the correct file path
mock_read_parquet.assert_called_once_with(
f"{config.data_path}/{config.train_data_file}"
)
# Check that joblib.dump was called with a Pipeline object
_, args, _ = mock_dump.mock_calls[0]
assert isinstance(args[0], Pipeline)
The main difference from test_split_train_test_data is the use of pytest.fixture. Notice that there are two arguments related to Config passed to the test function above: one is mock_config, which replaces the call to config = Config() in ml/train/__main__.py; the other is the fixture config, which holds the mocked config object with the dummy values set. I now need to set this mocked config object as the return value of the call to Config() inside ml/train/__main__.py, which is what the first line in test_train() does.
I then define a dataframe, set it as the return value of read_parquet, and verify that the function tried to read it from the correct mocked data path. Lastly, I check that joblib.dump saved a Pipeline object.
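A quick note on the _, args, _ = mock_dump.mock_calls[0] line above: each entry in mock_calls unpacks into a (name, args, kwargs) tuple, so args[0] is the first positional argument of that call. A tiny sketch of this, with made-up values:
# Sketch: inspecting mock_calls (hypothetical values, not from the mlseries repo)
from unittest.mock import Mock

mock_dump_example = Mock()
mock_dump_example("a-fitted-pipeline", "/path/to/model/model.joblib")

# Each recorded call unpacks into (name, args, kwargs)
name, args, kwargs = mock_dump_example.mock_calls[0]
assert args == ("a-fitted-pipeline", "/path/to/model/model.joblib")
assert kwargs == {}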
Integration Tests
In unit tests, the goal is to verify that a small, isolated piece of code works as expected. This is useful but doesn't give us the confidence that the different components of the application work together. That is the purpose of integration tests. With integration tests, we don't write a test for every function involved in a certain service; rather, we treat the service as a black box. For example, in this codebase, I want to test the server service and make sure that making requests to the review_sentiment endpoint with different reviews results in the expected status codes and response messages. In this case, I don't need to know how the server service runs or whether it calls other internal services. All I care about is: if I hit the endpoint with a valid input, do I get the expected result? Because the test treats the implementation of the service as a black box, if any component related to this service changes during refactoring, a passing integration test gives me confidence that the expected behaviour is unaffected.
These tests are more involved and might require some dependencies to be set up before running, e.g. a local test database. However, the tests should be contained to the internal service you are testing. For example, if your endpoint makes a POST request to S3 or to the ML service, you should assume that those services work and return the expected results.
Let's see an example. Below is the code in server/server_main.py which I want to test.
# server/server_main.py
import re
import requests
from config import Config
from entities.review import Review
from fastapi import FastAPI, HTTPException, status
app = FastAPI()
config = Config()
@app.post("/review_sentiment/", status_code=status.HTTP_200_OK)
def add_review(review: Review):
"""
This endpoint receives reviews, and creates an appropriate
payload and makes a post request to ml's endpoint for a
prediction on the input
Args:
review(Review): the input should conform to the Review model
Returns:
response(dict): response from ml's endpoint which includes
prediction details
Raises:
HTTPException: if the input is a sequence of numbers
"""
endpoint = f"{config.ml_base_uri}/prediction_job/"
if re.compile(r"^\-?[1-9][0-9]*$").search(review.text):
raise HTTPException(status_code=422, detail="Review cannot be a number")
resp = requests.post(endpoint, json=review.dict())
return resp.json()
In this simple service, a Review object is received; if the text in the review is a number, an exception is raised with the message "Review cannot be a number". Otherwise, the review is posted to endpoint, which points to the ML service, and its response is returned.
To test this service, I need to simulate the server app and make a POST request to it. FastAPI's TestClient makes this possible. As mentioned earlier, the server's external dependencies should be mocked. To mock the POST request to the ML service, I use requests_mock.
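If you haven't used requests_mock before, its pytest fixture intercepts outgoing calls made with the requests library and lets you register canned responses; unregistered URLs are rejected, which is also why the client fixture below registers the test app's own base URL with real_http=True so that requests to the app itself are passed through. A minimal, self-contained sketch with a hypothetical URL:
# Minimal requests_mock sketch (hypothetical URL, not from the mlseries repo)
import requests

def test_requests_mock_basics(requests_mock):
    # Register a fake endpoint with a canned JSON response
    requests_mock.post("http://ml-service/prediction_job/", json={"prediction": "positive"})

    resp = requests.post("http://ml-service/prediction_job/", json={"text": "great movie"})

    assert resp.status_code == 200
    assert resp.json() == {"prediction": "positive"}
    assert requests_mock.last_request.json() == {"text": "great movie"}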
To test the server's review_sentiment endpoint, I create a tests/integration_tests directory inside server/ and add a test_server_main.py file with the contents below:
# server/tests/integration_tests/test_server_main.py
import re
import pytest
from config import Config
from entities.review import Review
from fastapi.testclient import TestClient
from server_main import add_review, app
from pydantic import ValidationError
from requests_mock import ANY
@pytest.fixture
def config():
return Config()
@pytest.fixture(scope="module")
def module_client():
with TestClient(app) as c:
yield c
@pytest.fixture
def client(module_client, requests_mock):
# ref: https://github.com/encode/starlette/issues/818
test_app_base_url_prefix_regex = re.compile(
rf"{re.escape(module_client.base_url)}(/.*)?"
)
requests_mock.register_uri(ANY, test_app_base_url_prefix_regex, real_http=True)
return module_client
def test_add_review(config, requests_mock, client):
review1 = "hi"
with pytest.raises(ValidationError) as exc_info:
add_review(Review(text=review1))
assert exc_info.value.errors() == [
{
"loc": ("text",),
"msg": "min_length:4",
"type": "value_error.any_str.min_length",
"ctx": {"limit_value": 4},
}
]
review2 = 123458303
response = client.post("/review_sentiment/", json=Review(text=review2).dict())
assert response.status_code == 422
assert response.json()["detail"] == "Review cannot be a number"
# Mock the request to ML, and its response
adapter = requests_mock.post(
f"{config.ml_base_uri}/prediction_job/",
json={"prediction": "positive", "prediction_score": 80},
)
review3 = "very pleasant. highly recommend"
client.post("/review_sentiment/", json=Review(text=review3).dict())
# Verify that a call to ML was made with the correct input
assert adapter.last_request.json() == {"text": review3}
assert adapter.call_count == 1
Here's a breakdown of the code:
- Create an instance of Config using pytest.fixture.
- Define a TestClient instance by passing it the server app; I call this module_client. I set the scope of this fixture to module so that one client is created and reused by all tests in the script.
- Since I only want to test the server endpoint and mock ML's, I need to make sure that calls to the server's own endpoints are not mocked in the test function. For this, I override requests_mock in a fixture by adding the server's base URL to its registered URIs and setting real_http=True. This is what the client fixture does. To be more specific:
  - I define a regex pattern for the server's base URL which can have any set of characters after module_client.base_url/.
  - I register this URL pattern for ANY HTTP method so that matching requests are passed through as real HTTP requests.
- Define a test function, test_add_review, and pass all the fixtures to it.
- Define faulty inputs and verify that the expected results are returned; this is what review1 and review2 test.
- Define a valid input, review3, mock the POST request to ML and set its response, e.g. {"prediction": "positive", "prediction_score": 80}.
- Verify that the server test app did call the ML service with review3.
Similarly, the ML service can be tested. I will not include the code snippet here since it's very similar to the server's integration tests, but you can find it in the repo.
Running the Tests
For the tests to run successfully on your local machine, you need to set up some paths and environment variables. These configurations can go in a conftest.py, which is a special file used by Pytest for any plugins, fixtures, paths, etc. that the tests need in order to run successfully.
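Besides path and environment tweaks like the ones below, conftest.py is also the usual home for fixtures shared across test files; a minimal sketch (hypothetical fixture, not one the mlseries repo defines):
# Hypothetical shared fixture in a conftest.py (not one the mlseries repo defines)
import pandas as pd
import pytest

@pytest.fixture
def tiny_reviews_df():
    # Any test module under this conftest's directory can request this fixture by name
    return pd.DataFrame({"Phrase": ["great movie", "terrible plot"], "labels": [1, 0]})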
I create a separate conftest.py for the ml/ and server/ directories. Because I want to be able to run the code outside of Docker, I need to provide Pytest with the path to shared_utils/logger, which is mounted to the containers. I do this by adding the lines below to ml/conftest.py:
# ml/conftest.py
import os
import sys
"""
Load shared scripts for the unit tests.
This prevents errors when the unit tests run without the logging code mounted in utils/
"""
logger_path = os.path.abspath("./shared_utils/unittest")
sys.path.insert(0, logger_path)
# Change directory to /ml because the ml code runs with /ml as its root dir; this allows the calls that load the model, etc. to succeed
os.chdir(os.getcwd() + "/ml")
My goal is to have the tests run from the root of the repo. I think it is easier this way for all tests to run from one place and avoid the need to switch between directories.
In order for the paths to the data and model to resolve correctly in the ml scripts, I change the directory to ml after inserting the logger path.
For server/conftest.py, because the server's config uses some environment variables, I use load_dotenv.
# server/conftest.py
from dotenv import load_dotenv
load_dotenv("../deploy/.env")
I can now run the tests from the terminal in the root of the repo by simply running pytest.
That's it for part one of this blog subseries. I covered writing unit tests and integration tests, mocking dependencies, isolating tests using setUp and fixtures, and configuring dependencies for running tests with Pytest.
In the second part, I will go over linting, how to automate tasks using invoke, and finally how to set up a CI pipeline with GitHub Actions.
I hope you found this post useful. If you have any questions or feedback you can reach me on Twitter.