Tutorial: Build an End-to-End Azure ML Pipeline with the Python SDK

In the third part of this series on Azure ML Pipelines, we will use a Jupyter Notebook and the Azure ML Python SDK to build a pipeline for training and inference. For background on the concepts, refer to the previous article and tutorial (part 1, part 2).
We will use the same Pima Indian Diabetes dataset to train and deploy the model. To demonstrate how to apply the same data transformation used during training at inference time, I will serialize the Scikit-learn MinMaxScaler from the data preparation stage and use it for scoring.
Setting up the Environment
Start by creating a new ML workspace in one of the supported Azure regions. Make sure you choose the enterprise edition of the workspace, as the designer is not available in the basic edition.
Configure a virtual environment with the Azure ML SDK. Run the below commands to create the environment and install the Python SDK, then launch a Jupyter Notebook and start a new Python 3 kernel.
conda create -n aml -y python=3.6
conda activate aml
conda install nb_conda
pip install azureml-sdk[notebooks]
Next, on your development workstation, create the below directory structure:
pipeline
  - data
  - prep
  - train
  - model
Each of these directories will contain the Python scripts and artifacts used by each stage of the pipeline.
Copy the diabetes dataset in CSV format (pima-indians-diabetes.csv) to the data directory to complete the directory layout shown above.
Create the below script for preparing the data (prep.py) under the prep directory.
import argparse
import datetime
import os
import numpy as np
import joblib
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

run = Run.get_context()
# Log the start time of the data preparation task to the workspace
run.log("Prep start time", str(datetime.datetime.now()))

parser = argparse.ArgumentParser("prep")
parser.add_argument("--train", type=str, help="train")
parser.add_argument("--test", type=str, help="test")
parser.add_argument("--scaler", type=str, help="scaler")
args = parser.parse_args()

# Load the registered tabular dataset passed in as the named input
dataframe = run.input_datasets["raw_data"].to_pandas_dataframe()
array = dataframe.values
X = array[:, 0:8]
Y = array[:, 8]

# Fit the scaler so it can be serialized and reused for scoring later
scaler = MinMaxScaler(feature_range=(0, 1))
rescaledX = scaler.fit_transform(X)

test_size = 0.33
seed = 7
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=test_size, random_state=seed)

train = np.column_stack((X_train, Y_train))
test = np.column_stack((X_test, Y_test))

# Write the train and test datasets to the output locations
os.makedirs(args.train, exist_ok=True)
os.makedirs(args.test, exist_ok=True)
np.savetxt(args.train + "/train.txt", train, fmt="%f")
np.savetxt(args.test + "/test.txt", test, fmt="%f")

# Serialize the scaler object to the output location
if not os.path.isdir(args.scaler):
    os.mkdir(args.scaler)
joblib.dump(scaler, args.scaler + "/scaler.joblib")

# Log the end time of the data preparation task to the workspace
run.log("Prep end time", str(datetime.datetime.now()))
This script is responsible for the below tasks:
- Receive the CSV file stored in the default workspace storage as an input.
- Split the data into training (67%) and test (33%) datasets.
- Save the datasets to the default workspace storage.
- Fit the MinMaxScaler to the features.
- Serialize and save the scaler object to the default workspace storage.
- Log the start and end time of the task to Azure ML workspace.
Create the below training script (train.py) under the train directory.
import argparse
import datetime
import os
import numpy as np
import joblib
from sklearn.linear_model import LogisticRegression
from azureml.core import Run

parser = argparse.ArgumentParser("train")
parser.add_argument("--train", type=str, help="train")
parser.add_argument("--test", type=str, help="test")
parser.add_argument("--model", type=str, help="model")
args = parser.parse_args()

run = Run.get_context()
run.log("Training start time", str(datetime.datetime.now()))

# Load the train and test datasets written by the prep step
train = np.loadtxt(args.train + "/train.txt", dtype=float)
test = np.loadtxt(args.test + "/test.txt", dtype=float)

X_train = train[:, 0:8]
Y_train = train[:, 8]
X_test = test[:, 0:8]
Y_test = test[:, 8]

model = LogisticRegression(max_iter=100000)
model.fit(X_train, Y_train)

# Serialize the trained model to the output location
if not os.path.isdir(args.model):
    os.mkdir(args.model)
joblib.dump(model, args.model + "/model.joblib")

result = model.score(X_test, Y_test)
run.log('Score :', result)

run.log("Training end time", str(datetime.datetime.now()))
run.complete()
This script is responsible for the below tasks:
- Receive the train and test datasets stored in the default workspace storage as inputs.
- Separate the features and label from the train and test datasets.
- Train and score the model.
- Serialize the model and save it to the default workspace storage.
- Log the score, start, and end time of the task to Azure ML workspace.
These files will be used to build the two-step pipeline that will be executed by the Azure ML Pipelines environment.
The pipeline we are building consists of two steps: prep_step, which reads the raw dataset and produces the train and test datasets along with the serialized scaler, followed by train_step, which consumes those datasets and produces the serialized model.
Building the Pipeline
Launch Jupyter with the below command, create a new notebook, and follow the steps below. Run each of these blocks in a separate notebook cell.
jupyter notebook
Let’s import all the modules needed by the pipeline.
import os

import azureml.core
from azureml.core import Workspace, Experiment, Datastore
from azureml.widgets import RunDetails
from azureml.core import Dataset
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.core import PipelineRun, StepRun, PortDataReference
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.model import Model

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)
Let’s configure the workspace and the default storage.
ws = Workspace.from_config()

def_blob_store = ws.get_default_datastore()
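Workspace.from_config() expects a config.json file, downloadable from the workspace overview page in the Azure Portal, in the notebook directory. If you prefer not to use the config file, the workspace can also be loaded explicitly; below is a minimal sketch with placeholder values for the workspace name, subscription ID, and resource group.

from azureml.core import Workspace

# Placeholders below: substitute your own workspace name, subscription ID, and resource group
ws = Workspace.get(name="<workspace-name>",
                   subscription_id="<subscription-id>",
                   resource_group="<resource-group>")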
We are now ready to upload the dataset from the local directory to the workspace default storage.
def_blob_store.upload_files(["./data/pima-indians-diabetes.csv"], target_path="data", overwrite=True)
We are now ready to associate the pipeline with a compute environment. The below snippet either attaches to an existing compute cluster or provisions a new one.
aml_compute_target = "demo-cluster"
try:
    aml_compute = AmlCompute(ws, aml_compute_target)
    print("found existing compute target.")
except ComputeTargetException:
    print("creating new compute target")
    provisioning_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_D2_V2",
                                                                min_nodes=1,
                                                                max_nodes=4)
    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

print("Azure Machine Learning Compute attached")
The next step is to define the software environment and dependencies for the pipeline. For this, we will build a custom Docker image with the appropriate pip and Conda packages. The steps of the pipeline will use this image at runtime.
aml_run_config = RunConfiguration()
aml_run_config.target = aml_compute
aml_run_config.environment.docker.enabled = True
aml_run_config.environment.docker.base_image = "mcr.microsoft.com/azureml/base:latest"
aml_run_config.environment.python.user_managed_dependencies = False

aml_run_config.environment.python.conda_dependencies = CondaDependencies.create(
    conda_packages=['pandas', 'scikit-learn', 'numpy'],
    pip_packages=['joblib', 'azureml-sdk', 'fusepy'],
    pin_sdk_version=False)
We now have the compute infrastructure in place. The next step is to define the datasets that act as input and output to the stages of the pipeline.
Let’s first define the CSV file as the input dataset.
diabetes_data = Dataset.Tabular.from_delimited_files(def_blob_store.path('./data/pima-indians-diabetes.csv'))
diabetes_data = diabetes_data.register(ws, 'diabetes_data')
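Registering the dataset lets you retrieve the same data by name from any notebook connected to the workspace. As a quick aside, here is a sketch of fetching it back; this is illustrative and not required by the pipeline below.

# Fetch the registered dataset by name (illustrative; not part of the pipeline)
diabetes_data = Dataset.get_by_name(ws, name='diabetes_data')
print(diabetes_data.to_pandas_dataframe().shape)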
Let’s also define the intermediate datasets and the outputs from each step.
raw_data = diabetes_data.as_named_input('raw_data')
train_data = PipelineData("train_data", datastore=def_blob_store).as_dataset()
test_data = PipelineData("test_data", datastore=def_blob_store).as_dataset()
scaler_file = PipelineData("scaler_file", datastore=def_blob_store)
model_file = PipelineData("model_file", datastore=def_blob_store)
With everything in place, we are ready to define the data prep and training steps.
source_directory = "./prep"
step1 = PythonScriptStep(name="prep_step",
                         script_name="./prep.py",
                         arguments=["--train", train_data, "--test", test_data, "--scaler", scaler_file],
                         inputs=[raw_data],
                         outputs=[train_data, test_data, scaler_file],
                         compute_target=aml_compute,
                         runconfig=aml_run_config,
                         source_directory=source_directory,
                         allow_reuse=True)
Notice how the input and output values are passed to the script as command-line arguments; at runtime, each PipelineData object resolves to a path on the datastore that the script reads from or writes to.
source_directory = "./train"
step2 = PythonScriptStep(name="train_step",
                         script_name="./train.py",
                         arguments=["--train", train_data, "--test", test_data, "--model", model_file],
                         inputs=[train_data, test_data],
                         outputs=[model_file],
                         compute_target=aml_compute,
                         runconfig=aml_run_config,
                         source_directory=source_directory,
                         allow_reuse=True)
We are all set to create the pipeline composed of the above two steps, validate it, and finally submit it to Azure ML.
steps = [step1, step2]
pipeline1 = Pipeline(workspace=ws, steps=steps)
# Validate the pipeline graph before submission
pipeline1.validate()
pipeline_run1 = Experiment(ws, 'diabetes').submit(pipeline1, regenerate_outputs=False)
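The submission returns immediately while the pipeline executes in Azure. Since RunDetails is already imported, you can optionally monitor the run from the notebook and wait for it to finish; a minimal sketch:

# Render an interactive widget showing the status of each pipeline step
RunDetails(pipeline_run1).show()

# Block until the pipeline run completes, streaming logs to the notebook output
pipeline_run1.wait_for_completion(show_output=True)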
After you run all the cells in the notebook, you can switch to the Azure Portal and see the metrics from both steps.
Clicking on the Run ID link takes you to the page where you can access the metrics and output from each step.
You can download both the scaler object and the model from Azure storage to your workstation.
prep_step = pipeline_run1.find_step_run('prep_step')[0]
step_run_output = prep_step.get_output("scaler_file")
port_data_reference = step_run_output.get_port_data_reference()
port_data_reference.download(local_path=".")
scaler_file = port_data_reference.path_on_datastore
train_step = pipeline_run1.find_step_run('train_step')[0]
step_run_output = train_step.get_output("model_file")
port_data_reference = step_run_output.get_port_data_reference()
port_data_reference.download(local_path=".")
model_file = port_data_reference.path_on_datastore
model = model_file + "/model.joblib"
scaler = scaler_file + "/scaler.joblib"
os.makedirs("model", exist_ok=True)
os.popen("cp " + model + " model")
os.popen("cp " + scaler + " model")
With the downloaded files, you can either host the model locally or register the scaler and model in the Azure ML workspace for inference.
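As a quick illustration of the local option, the sketch below scores a single record. It assumes the model/model.joblib and model/scaler.joblib files copied in the previous step, uses a hypothetical sample of the eight Pima features, and applies the serialized MinMaxScaler before prediction, mirroring the approach described in the introduction.

import joblib
import numpy as np

# Load the artifacts copied into the local model directory above
scaler = joblib.load("model/scaler.joblib")
model = joblib.load("model/model.joblib")

# Hypothetical record with the eight Pima Indians Diabetes features
sample = np.array([[6, 148, 72, 35, 0, 33.6, 0.627, 50]])

# Scale the features with the serialized MinMaxScaler, then predict the class label
prediction = model.predict(scaler.transform(sample))
print("Predicted class:", prediction[0])

For the registration path, the Model class imported earlier from azureml.core.model exposes a register method that uploads a local model file to the workspace.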
In the next part of this series, we will explore Azure AutoML to train the model without writing code. Stay tuned!
Janakiram MSV’s Webinar series, “Machine Intelligence and Modern Infrastructure (MI2)” offers informative and insightful sessions covering cutting-edge technologies. Sign up for the upcoming MI2 webinar at http://mi2.live.