Amazon SageMaker Autopilot makes it possible for organizations to quickly build and deploy an end-to-end machine learning (ML) model and inference pipeline with just a few lines of code, or with no code at all in Amazon SageMaker Studio. Autopilot takes on the heavy lifting of configuring infrastructure and shortens the time it takes to build an entire pipeline, including feature engineering, model selection, and hyperparameter tuning.

In this post, we show how to go from raw data to a robust and fully deployed inference pipeline with Autopilot.

Solution overview

We use Lyft’s public dataset on bikesharing for this simulation to predict whether or not a user participates in the Bike Share for All program. This is a simple binary classification problem.

We want to showcase how easy it is to build an automated and real-time inference pipeline to classify users based on their participation in the Bike Share for All program. To this end, we simulate an end-to-end data ingestion and inference pipeline for an imaginary bikeshare company operating in the San Francisco Bay Area.

The architecture is broken down into two parts: the ingestion pipeline and the inference pipeline.

We primarily focus on the ML pipeline in the first section of this post, and review the data ingestion pipeline in the second part.

Prerequisites

To follow along with this example, complete the following prerequisites:

  1. Create a new SageMaker notebook instance.
  2. Create an Amazon Kinesis Data Firehose delivery stream with an AWS Lambda transform function. For instructions, see Amazon Kinesis Firehose Data Transformation with AWS Lambda. This step is optional and only needed to simulate data streaming.

Data exploration

Let’s download and visualize the dataset, which is located in a public Amazon Simple Storage Service (Amazon S3) bucket and static website:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data
import pandas as pd
import numpy as np
import os
from time import sleep
!wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

The following screenshot shows a subset of the data before transformation.

The last column of the data contains the target we want to predict, which is a binary variable taking either a Yes or No value, indicating whether the user participates in the Bike Share for All program.

Let’s take a look at the distribution of our target variable for any data imbalance.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

As shown in the graph above, the data is imbalanced, with fewer people participating in the program.

We need to balance the data to prevent an over-representation bias. This step is optional because Autopilot also offers an internal approach to handle class imbalance automatically, which defaults to an F1 score validation metric. Additionally, if you choose to balance the data yourself, you can use more advanced techniques for handling class imbalance, such as SMOTE or GANs.

For this post, we downsample the majority class (No) as a data balancing technique. The following code enriches the data and under-samples the overrepresented class:

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True)
df['start_time'] = pd.to_datetime(df['start_time'])
df['end_time'] = pd.to_datetime(df['end_time'])
# Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek,
               hour_of_day=df.start_time.dt.hour,
               trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: 'morning', 'afternoon', 'evening', and 'night' (the default)
conditions = [
    (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12),
    (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18),
    (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True)
# Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)
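
To see why a multiplier of 2.1 gives a roughly even split, the following minimal sketch reproduces the sort-and-slice step on a small made-up list of labels, using only the standard library. The function name downsample_by_tail and the toy class counts are assumptions for illustration; they are not part of the original notebook.

```python
def downsample_by_tail(labels, minority="Yes", factor=2.1):
    """Mimic the sort-then-slice step: after sorting, the minority class
    ('Yes' sorts after 'No') sits at the tail, so keeping the last
    int(minority_count * factor) entries keeps every 'Yes' plus about
    1.1 times as many 'No' entries."""
    ordered = sorted(labels)
    keep = int(labels.count(minority) * factor)
    return ordered[-keep:] if keep else []

# Toy data: 900 'No' and 100 'Yes' (made-up counts for illustration only)
labels = ["No"] * 900 + ["Yes"] * 100
balanced = downsample_by_tail(labels)
yes_share = balanced.count("Yes") / len(balanced)
print(len(balanced), round(yes_share, 3))
```

With this factor, the minority class ends up at 1/2.1, or roughly 48%, of the retained rows, which is what "about an even distribution" refers to.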

We deliberately left our categorical features not encoded, including our binary target value. This is because Autopilot takes care of encoding and decoding the data for us as part of the automatic feature engineering and pipeline deployment, as we see in the next section.

The following screenshot shows a sample of our data.

The data in the following graphs looks otherwise normal, with a bimodal distribution representing the two peaks for the morning hours and the afternoon rush hours, as you would expect. We also observe low activity on weekends and at night.

In the next section, we feed the data to Autopilot so that it can run an experiment for us.

Build a binary classification model

Autopilot requires that we specify the input and output destination buckets. It uses the input bucket to load the data and the output bucket to save the artifacts, such as feature engineering and the generated Jupyter notebooks. We retain 5% of the dataset to evaluate and validate the model’s performance after the training is complete and upload 95% of the dataset to the S3 input bucket. See the following code:

import sagemaker
import boto3
# Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption
bucket = sagemaker.Session().default_bucket()  # SageMaker default bucket.
# Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig = {
    'Rules': [
        {'ApplyServerSideEncryptionByDefault': {'SSEAlgorithm': 'AES256'}},
    ]
}
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig)
prefix = 'sagemaker-automl01'  # prefix for the bucket
role = sagemaker.get_execution_role()  # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session()  # SageMaker API
region = sagemaker_session.boto_region_name  # AWS Region
# Where we will load our data
input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix)
output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix)
# Splitting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95)
training_set = df[:slice_point]  # 95%
testing_set = df[slice_point:]  # 5%
# Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0]
# Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False)
# Uploading the training set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

After we upload the data to the input destination, it’s time to start Autopilot:

from sagemaker.automl.automl import AutoML
# You provide the target column and the s3 output path here,
# and the s3 path where you uploaded the data when calling fit()
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training
bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

All we need to start experimenting is to call the fit() method. Autopilot needs the input and output S3 location and the target attribute column as the required parameters. After feature processing, Autopilot calls SageMaker automatic model tuning to find the best version of a model by running many training jobs on your dataset. We added the optional max_candidates parameter to limit the number of candidates to 30, which is the number of training jobs that Autopilot launches with different combinations of algorithms and hyperparameters in order to find the best model. If you don’t specify this parameter, it defaults to 250.

We can observe the progress of Autopilot with the following code:

# Let's monitor the progress. This will take a while. Go grab some coffee.
from time import sleep
def check_job_status():
    return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus']
def describe():
    return bike_automl_binary.describe_auto_ml_job()
while True:
    print(check_job_status(), describe()['AutoMLJobSecondaryStatus'], end='** ')
    if check_job_status() in ["Completed", "Failed"]:
        if "Failed" in check_job_status():
            print(describe()['FailureReason'])
        break
    sleep(20)
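
The loop above polls indefinitely, and it only exits on Completed or Failed. As a hedged alternative, the following sketch wraps the same polling idea in a helper with a timeout that also treats Stopped as a terminal state. The helper name wait_for_job, its parameters, and the default timeout are assumptions for illustration, not part of the SageMaker SDK.

```python
import time

# Terminal values of AutoMLJobStatus that should end the polling loop
TERMINAL_STATES = {"Completed", "Failed", "Stopped"}

def wait_for_job(get_status, poll_seconds=20, timeout_seconds=4 * 3600, sleep=time.sleep):
    """Call get_status() until it returns a terminal state.

    Raises TimeoutError if the job is still running once timeout_seconds
    have elapsed. The sleep function is injectable so the helper can be
    exercised without actually waiting.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still {status!r} after {timeout_seconds} seconds")
        sleep(poll_seconds)

# Simulated status sequence standing in for a real Autopilot job
fake_statuses = iter(["InProgress", "InProgress", "Completed"])
final_status = wait_for_job(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(final_status)
```

In the notebook, you could pass lambda: bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] as get_status.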

The training takes some time to complete. While it’s running, let’s look at the Autopilot workflow.

To find the best candidate, use the following code:

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None):
    return JSON(obj, root=rootName, expanded=True)
bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

The following screenshot shows our output.

Our model achieved a validation accuracy of 96%, so we’re going to deploy it. We could add a condition such that we only use the model if the accuracy is above a certain level.
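
A deployment condition of that kind could look like the following minimal sketch. It assumes the candidate dictionary carries a FinalAutoMLJobObjectiveMetric entry with MetricName and Value keys, as in the output above. The function name meets_threshold, the metric name validation:accuracy, the 0.90 threshold, and the example values are all assumptions for illustration.

```python
def meets_threshold(best_candidate, expected_metric, minimum):
    """Return True when the candidate's objective metric reaches `minimum`.

    Raises ValueError if the candidate was scored on a different metric,
    so that a threshold meant for one metric is not applied to another.
    """
    metric = best_candidate["FinalAutoMLJobObjectiveMetric"]
    if metric["MetricName"] != expected_metric:
        raise ValueError(f"Expected {expected_metric!r}, got {metric['MetricName']!r}")
    return metric["Value"] >= minimum

# Made-up candidate shaped like the 'BestCandidate' entry (values are illustrative)
example_candidate = {
    "FinalAutoMLJobObjectiveMetric": {"MetricName": "validation:accuracy", "Value": 0.96}
}
deploy_ok = meets_threshold(example_candidate, "validation:accuracy", minimum=0.90)
print("Deploy:", deploy_ok)
```

You would then call deploy() only when the check returns True.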

Inference pipeline

Before we deploy our model, let’s examine our best candidate and what’s happening in our pipeline. See the following code:

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

The following diagram shows our output.

Autopilot has built the model and has packaged it in three different containers, each sequentially running a specific task: transform, predict, and reverse-transform. This multi-step inference is possible with a SageMaker inference pipeline.

A multi-step inference can also chain multiple inference models. For instance, one container can perform principal component analysis before passing the data to the XGBoost container.
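
As a conceptual analogy only, the three-stage flow can be pictured as three functions applied in sequence, each consuming the previous stage's output. The sketch below is plain Python and does not use any SageMaker API; the encoding table, the prediction rule, and all function names are made up for illustration.

```python
from functools import reduce

def transform(row):
    """Stand-in for the feature-engineering container: encode a category as a number."""
    part_of_day_codes = {"morning": 0, "afternoon": 1, "evening": 2, "night": 3}
    return [part_of_day_codes[row["part_of_day"]], row["hour_of_day"]]

def predict(features):
    """Stand-in for the model container: return a class index (made-up rule)."""
    return 1 if features[0] == 0 else 0

def reverse_transform(class_index):
    """Stand-in for the last container: map the class index back to a label."""
    return ["No", "Yes"][class_index]

def run_pipeline(payload, stages):
    """Feed the payload through each stage in order."""
    return reduce(lambda value, stage: stage(value), stages, payload)

stages = [transform, predict, reverse_transform]
result = run_pipeline({"part_of_day": "morning", "hour_of_day": 8}, stages)
print(result)
```

The caller sends raw, unencoded features and receives a human-readable label, which is why we did not need to encode the data ourselves.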

Deploy the inference pipeline to an endpoint

The deployment process involves just a few lines of code:

# We chose to define an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

Let’s configure our endpoint for prediction with a predictor:

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

Now that we have our endpoint and predictor ready, it’s time to use the testing data we set aside and test the accuracy of our model. We start by defining a utility function that sends the data one line at a time to our inference endpoint and gets a prediction in return. Because we have an XGBoost model, we drop the target variable before sending the CSV line to the endpoint. Additionally, we removed the header from the testing CSV before looping through the file, which is another requirement for XGBoost on SageMaker. See the following code:

# The function takes 3 arguments: the file containing the test set,
# the predictor, and finally the number of lines to send for prediction.
# The function returns a DataFrame with two columns: Infered and Observed.
def get_inference(file, predictor, n=1):
    infered = []
    actual = []
    with open(file, 'r') as csv:
        for i in range(n):
            line = csv.readline().split(',')
            try:
                # Here we remove the target variable from the csv line before predicting
                observed = line.pop(14).strip('\n')
                actual.append(observed)
            except IndexError:
                pass
            obj = ','.join(line)
            predicted = predictor.predict(obj)[0][0]
            infered.append(predicted)
    data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)}
    return pd.DataFrame(data=data)
n = testing_set.shape[0]  # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n)
inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

The following screenshot shows our output.
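
Splitting each line on commas works as long as no text field contains a comma. If a field such as a station name were quoted and contained one, the column positions would shift. The following sketch shows one way to remove the target column with Python's csv module instead. The function name split_features_and_target and the sample line are assumptions for illustration.

```python
import csv
import io

def split_features_and_target(line, target_index=14):
    """Parse one CSV line, remove the target column, and return
    (payload_without_target, target_value). Quoted fields that contain
    commas are kept intact and re-quoted in the payload."""
    fields = next(csv.reader([line.rstrip("\r\n")]))
    target = fields.pop(target_index)
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="").writerow(fields)
    return buffer.getvalue(), target

# Made-up line with a quoted field containing a comma; the target is column 3 here
sample_line = '0,100,"Market St, at 10th",Yes,7\n'
payload, observed = split_features_and_target(sample_line, target_index=3)
print(payload, observed)
```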

Now let’s calculate the accuracy of our model.

See the following code:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

We get an accuracy of 92%. This is slightly lower than the 96% obtained during the validation step, but it’s still high enough. We don’t expect the accuracy to be exactly the same because the test is performed with a new dataset.
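
Because the original data was imbalanced, it can be useful to look at precision, recall, and F1 alongside accuracy. The following sketch computes them in plain Python from two sequences of labels. The function name binary_metrics and the toy labels are assumptions for illustration, and the numbers it prints are not results from the model above.

```python
def binary_metrics(observed, inferred, positive="Yes"):
    """Compute accuracy, precision, recall, and F1 for a binary label pair."""
    pairs = list(zip(observed, inferred))
    tp = sum(1 for o, p in pairs if o == positive and p == positive)
    fp = sum(1 for o, p in pairs if o != positive and p == positive)
    fn = sum(1 for o, p in pairs if o == positive and p != positive)
    correct = sum(1 for o, p in pairs if o == p)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "accuracy": correct / len(pairs),
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }

# Toy labels for illustration only
toy_observed = ["Yes", "Yes", "No", "No", "Yes"]
toy_inferred = ["Yes", "No", "No", "Yes", "Yes"]
metrics = binary_metrics(toy_observed, toy_inferred)
print(metrics)
```

In the notebook, you could pass the observed and inferred columns of inference_df to the same function.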

Data ingestion

We downloaded the data ourselves and prepared it for training. In a real deployment, you may have to send the data from the edge device into the data lake and have SageMaker load it from the data lake into the notebook.

Kinesis Data Firehose is a good option and the most straightforward way to reliably load streaming data into data lakes, data stores, and analytics tools. It can capture, transform, and load streaming data into Amazon S3 and other AWS data stores.

For our use case, we create a Kinesis Data Firehose delivery stream with a Lambda transformation function to do some lightweight data cleaning as it traverses the stream. See the following code:

# Data processing libraries
import pandas as pd  # Data processing
import numpy as np
import base64
from io import StringIO
def lambda_handler(event, context):
    output = []
    print('Received', len(event['records']), 'Records')
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        df = pd.read_csv(StringIO(payload), index_col=0)
        df.drop(columns=['rental_access_method'], inplace=True)
        df['start_time'] = pd.to_datetime(df['start_time'])
        df['end_time'] = pd.to_datetime(df['end_time'])
        # Adding some day breakdown
        df = df.assign(day_of_week=df.start_time.dt.dayofweek,
                       hour_of_day=df.start_time.dt.hour,
                       trip_month=df.start_time.dt.month)
        # Breaking the day in 4 parts: 'morning', 'afternoon', 'evening', and 'night' (the default)
        conditions = [
            (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12),
            (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18),
            (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
        ]
        choices = ['morning', 'afternoon', 'evening']
        df['part_of_day'] = np.select(conditions, choices, default='night')
        df.dropna(inplace=True)
        # Downsampling the majority to rebalance the data
        # We are getting about an even distribution
        df.sort_values(by='bike_share_for_all_trip', inplace=True)
        slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
        df = df[-slice_pointe:]
        # The data is balanced now. Let's reshuffle the data
        df = df.sample(frac=1).reset_index(drop=True)
        data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8")
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': data
        }
        output.append(output_record)
    print('Returned', len(output), 'Records')
    print('Event', event)
    return {'records': output}

This Lambda function performs light transformation of the data streamed from the devices into the data lake. It expects a CSV-formatted data file.
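
The record contract that the function follows can be seen more easily in a stripped-down handler. The sketch below uses only the standard library: it decodes each record, drops blank lines, and returns the record with the same recordId and a result of Ok, or ProcessingFailed if the payload cannot be decoded. The handler name passthrough_handler, the cleaning rule, and the sample event are assumptions for illustration, not the transformation used in this post.

```python
import base64

def passthrough_handler(event, context=None):
    """Minimal Firehose-style transform: strip blank lines from each record."""
    output = []
    for record in event["records"]:
        try:
            text = base64.b64decode(record["data"]).decode("utf-8")
            lines = [line.strip() for line in text.splitlines() if line.strip()]
            cleaned = "\n".join(lines) + "\n"
            output.append({
                "recordId": record["recordId"],  # must match the incoming record
                "result": "Ok",
                "data": base64.b64encode(cleaned.encode("utf-8")).decode("utf-8"),
            })
        except ValueError:
            # Covers invalid base64 and invalid UTF-8; hand the record back unchanged
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}

# Made-up event with one valid CSV record and one record that is not valid UTF-8
sample_event = {"records": [
    {"recordId": "1", "data": base64.b64encode(b"a,b\n\n 1,2 \n").decode("utf-8")},
    {"recordId": "2", "data": base64.b64encode(b"\xff\xfe").decode("utf-8")},
]}
response = passthrough_handler(sample_event)
print([r["result"] for r in response["records"]])
```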

For the ingestion step, we download the data and simulate a data stream to Kinesis Data Firehose with a Lambda transform function and into our S3 data lake.

Let’s simulate streaming a few lines:

import random
from io import StringIO
client = boto3.client('firehose')  # Kinesis Data Firehose client used by streamer()
# Saving the data in one file.
file = '201907-baywheels-tripdata.csv'
data.to_csv(file)
# Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n):
    with open(file, 'r') as csvfile:
        header = next(csvfile)
        data = header
        counter = 0
        loop = True
        while loop:
            for i in range(n):
                line = csvfile.readline()
                data += line
                # We reached the end of the csv file.
                if line == '':
                    loop = False
            counter += n
            # Use your kinesis streaming name
            stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')})
            data = header
            print(file, 'HTTPStatusCode: ' + str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ')
            sleep(random.randrange(1, 3))
    return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500)

We can now load our data as a DataFrame because it’s streamed into the S3 data lake:

# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
# Combine the streamed files into a single DataFrame
df = pd.concat(in_memory_csv)
display(df.tail())
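
The chunking logic in the streamer can also be isolated from the Firehose call, which makes it easier to check. The following sketch yields the header followed by up to n data lines per chunk and stops cleanly at the end of the file, so it never emits a header-only chunk. The generator name csv_chunks and the sample data are assumptions for illustration.

```python
import io
from itertools import islice

def csv_chunks(lines, n):
    """Yield strings made of the CSV header plus up to n data lines each."""
    iterator = iter(lines)
    header = next(iterator, None)
    if header is None:
        return  # empty input: nothing to send
    while True:
        batch = list(islice(iterator, n))
        if not batch:
            return
        yield header + "".join(batch)

# Made-up CSV content standing in for the trip data file
sample_csv = io.StringIO("id,value\n1,a\n2,b\n3,c\n")
chunks = list(csv_chunks(sample_csv, 2))
print(chunks)
```

Each yielded chunk could then be passed as the Data field of a put_record call.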

Clean up

It’s important to delete all the resources used in this exercise to minimize cost. The following code deletes the SageMaker inference endpoint we created as well as the training and testing data we uploaded:

# Delete the endpoint
predictor.delete_endpoint()
# Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

Conclusion

ML engineers, data scientists, and software developers can use Autopilot to build and deploy an inference pipeline with little to no ML programming experience. Autopilot saves time and resources, using data science and ML best practices. Large organizations can now shift engineering resources away from infrastructure configuration towards improving models and solving business use cases. Startups and smaller organizations can get started on machine learning with little to no ML expertise.

We recommend learning more about other important features SageMaker has to offer, such as the Amazon SageMaker Feature Store, which integrates with Amazon SageMaker Pipelines so that you can create and reuse automated ML workflows with feature search and discovery. You can run multiple Autopilot experiments with different feature or target variants in your dataset. You could also approach this as a dynamic vehicle allocation problem in which your model tries to predict vehicle demand based on time (such as time of day or day of the week) or location, or a combination of both.


About the Authors

Doug Mbaya is a Senior Solutions Architect with a focus on data and analytics. Doug works closely with AWS partners, helping them integrate data and analytics solutions in the cloud. Doug’s prior experience includes supporting AWS customers in the ride sharing and food delivery segments.

Valerio Perrone is an Applied Science Manager working on Amazon SageMaker Automatic Model Tuning and Autopilot.