Tutorial: Use the Amazon SageMaker Python SDK to Train AutoML Models with Autopilot

In the previous tutorial, we saw how to use Amazon SageMaker Studio to create models through Autopilot.
In this installment, we take a closer look at the Python SDK and script an end-to-end workflow to train a model, run batch inferencing, and store the output in an Amazon S3 bucket.
The walkthrough is based on the same dataset and problem type discussed in the previous tutorial.
Follow the steps mentioned in the previous tutorial to configure and set up the environment for Autopilot. Launch a new Jupyter notebook to run the Python code that uses the SDK.
import sagemaker
import boto3
from sagemaker import get_execution_role

region = boto3.Session().region_name
session = sagemaker.Session()
bucket = session.default_bucket()
print(bucket)

prefix = 'sagemaker/termdepo'
role = get_execution_role()

sm = boto3.Session().client(service_name='sagemaker', region_name=region)
This step initializes the environment and returns the default S3 bucket associated with SageMaker.
!wget -N https://datahub.io/machine-learning/bank-marketing/r/bank-marketing.csv
local_data_path = 'bank-marketing.csv'
We download the bank marketing dataset from datahub.io.
import pandas as pd

data = pd.read_csv(local_data_path)
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_rows', 10)
data
This verifies that the dataset loaded correctly and displays it in a grid.
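Before splitting the data, it can also help to glance at the dataset size and the balance of the target column. This optional sketch is not part of the original walkthrough:

# Optional sanity checks: dataset size, balance of the 'Class' target, and missing values.
print(data.shape)
print(data['Class'].value_counts())
print('Missing values:', data.isnull().sum().sum())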
train_data = data.sample(frac=0.8, random_state=200)
test_data = data.drop(train_data.index)
test_data = test_data.drop(columns=['Class'])
train_file = 'train_data.csv'
train_data.to_csv(train_file, index=False, header=True)
train_data_s3_path = session.upload_data(path=train_file, key_prefix=prefix + "/train")
print('Train data uploaded to: ' + train_data_s3_path)

test_file = 'test_data.csv'
test_data.to_csv(test_file, index=False, header=False)
test_data_s3_path = session.upload_data(path=test_file, key_prefix=prefix + "/test")
print('Test data uploaded to: ' + test_data_s3_path)
We split the dataset into training and test sets (80/20), drop the Class label from the test set, and upload both files to the S3 bucket.
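As a quick verification, you can list the objects that were just uploaded under the prefix. This optional sketch uses the boto3 S3 client and is not part of the original walkthrough:

# Optional: confirm the train and test files landed under the expected S3 prefix.
s3_client = boto3.client('s3', region_name=region)
for obj in s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix).get('Contents', []):
    print(obj['Key'], obj['Size'])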
Now that the dataset is ready, we will define the input, output, and job configuration of an Autopilot experiment.
input_data_config = [{
    'DataSource': {
        'S3DataSource': {
            'S3DataType': 'S3Prefix',
            'S3Uri': 's3://{}/{}/train'.format(bucket, prefix)
        }
    },
    'TargetAttributeName': 'Class'
}]

job_config = {
    'CompletionCriteria': {
        'MaxRuntimePerTrainingJobInSeconds': 600,
        'MaxAutoMLJobRuntimeInSeconds': 3600
    },
}

output_data_config = {
    'S3OutputPath': 's3://{}/{}/output'.format(bucket, prefix)
}

problem_type = 'BinaryClassification'

job_objective = {'MetricName': 'F1'}
This cell contains the most critical parameters of an Autopilot experiment. It specifies where the training dataset is located, the target label (Class), where the final artifacts will be uploaded, the completion criteria for the job, the problem type, and the objective metric used to evaluate model performance.
from time import gmtime, strftime, sleep

timestamp_suffix = strftime('%d-%H-%M-%S', gmtime())
auto_ml_job_name = 'termdepo' + timestamp_suffix
print('AutoMLJobName: ' + auto_ml_job_name)

sm.create_auto_ml_job(AutoMLJobName=auto_ml_job_name,
                      InputDataConfig=input_data_config,
                      OutputDataConfig=output_data_config,
                      AutoMLJobConfig=job_config,
                      AutoMLJobObjective=job_objective,
                      ProblemType=problem_type,
                      RoleArn=role)
With the configuration in place, we will create an AutoML job.
print('JobStatus - Secondary Status')
print('------------------------------')

describe_response = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)
print(describe_response['AutoMLJobStatus'] + " - " + describe_response['AutoMLJobSecondaryStatus'])
job_run_status = describe_response['AutoMLJobStatus']

while job_run_status not in ('Failed', 'Completed', 'Stopped'):
    describe_response = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)
    job_run_status = describe_response['AutoMLJobStatus']
    print(describe_response['AutoMLJobStatus'] + " - " + describe_response['AutoMLJobSecondaryStatus'])
    sleep(30)
This cell polls the job and prints its status every 30 seconds until it completes, fails, or is stopped.
Once the job is complete, we can retrieve the data exploration notebook, candidate definition notebook, and the name of the candidate with the best model.
job = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)

job_candidate_notebook = job['AutoMLJobArtifacts']['CandidateDefinitionNotebookLocation']
job_data_notebook = job['AutoMLJobArtifacts']['DataExplorationNotebookLocation']
job_best_candidate = job['BestCandidate']
job_best_candidate_name = job_best_candidate['CandidateName']

print(job_candidate_notebook)
print(job_data_notebook)
print(job_best_candidate_name)
%%sh -s $job_candidate_notebook $job_data_notebook
aws s3 cp $1 .
aws s3 cp $2 .
This will download the Jupyter notebooks from the S3 bucket to the local environment.
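If you prefer to stay in Python rather than using the %%sh magic, a rough equivalent with boto3 could look like the sketch below (download_from_s3 is a hypothetical helper, not part of the SDK):

import os

def download_from_s3(s3_uri):
    # Split an s3://bucket/key URI and download the object to the current directory.
    bucket_name, key = s3_uri.replace('s3://', '').split('/', 1)
    local_name = os.path.basename(key)
    boto3.resource('s3').Bucket(bucket_name).download_file(key, local_name)
    return local_name

download_from_s3(job_candidate_notebook)
download_from_s3(job_data_notebook)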
In the next few steps, we will create a SageMaker model from the best candidate and use it to perform batch inferencing.
model_name = 'automl-termdepo-model-' + timestamp_suffix

model = sm.create_model(Containers=job_best_candidate['InferenceContainers'],
                        ModelName=model_name,
                        ExecutionRoleArn=role)

print('Model ARN corresponding to the best candidate is: {}'.format(model['ModelArn']))
To perform batch inferencing, we create a transform job that reads the test dataset from the S3 bucket and sends it to the model.
transform_job_name = 'automl-termdepo-transform-' + timestamp_suffix

transform_input = {
    'DataSource': {
        'S3DataSource': {
            'S3DataType': 'S3Prefix',
            'S3Uri': test_data_s3_path
        }
    },
    'ContentType': 'text/csv',
    'CompressionType': 'None',
    'SplitType': 'Line'
}

transform_output = {
    'S3OutputPath': 's3://{}/{}/inference-results'.format(bucket, prefix),
}

transform_resources = {
    'InstanceType': 'ml.m4.xlarge',
    'InstanceCount': 1
}

sm.create_transform_job(TransformJobName=transform_job_name,
                        ModelName=model_name,
                        TransformInput=transform_input,
                        TransformOutput=transform_output,
                        TransformResources=transform_resources)
Wait until the job status shows Completed.
print('JobStatus')
print('----------')

describe_response = sm.describe_transform_job(TransformJobName=transform_job_name)
job_run_status = describe_response['TransformJobStatus']
print(job_run_status)

while job_run_status not in ('Failed', 'Completed', 'Stopped'):
    describe_response = sm.describe_transform_job(TransformJobName=transform_job_name)
    job_run_status = describe_response['TransformJobStatus']
    print(job_run_status)
    sleep(30)
We can now download and print the output from the inferencing job.
s3_output_key = '{}/inference-results/test_data.csv.out'.format(prefix)
local_inference_results_path = 'inference_results.csv'

s3 = boto3.resource('s3')
inference_results_bucket = s3.Bucket(session.default_bucket())
inference_results_bucket.download_file(s3_output_key, local_inference_results_path)

data = pd.read_csv(local_inference_results_path, sep=';')
pd.set_option('display.max_rows', 10)
data
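Because the Class labels were dropped from the test file before uploading, you can pull them back from the original CSV for a rough accuracy check. This is an optional sketch that assumes the same notebook session is still active and that the transform output contains one predicted label per line:

# Optional: compare batch predictions with the held-out labels from the original CSV.
labels = pd.read_csv(local_data_path).loc[test_data.index, 'Class']
predictions = pd.read_csv(local_inference_results_path, header=None)[0]
accuracy = (predictions.astype(str).values == labels.astype(str).values).mean()
print('Agreement with held-out labels: {:.2%}'.format(accuracy))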
This concludes the tutorial on using the SageMaker Python SDK to train models with Autopilot.
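As an optional final step, you can clean up the resources created during the walkthrough to avoid ongoing charges. A minimal sketch (the S3 call deletes everything under the tutorial prefix, so use it with care):

# Optional cleanup: delete the model and the objects created under the tutorial prefix.
sm.delete_model(ModelName=model_name)
boto3.resource('s3').Bucket(bucket).objects.filter(Prefix=prefix).delete()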
Janakiram MSV’s Webinar series, “Machine Intelligence and Modern Infrastructure (MI2)” offers informative and insightful sessions covering cutting-edge technologies. Sign up for the upcoming MI2 webinar at http://mi2.live.