Tutorial: Anomaly Detection in Connected Devices with PubNub and Azure Machine Learning

In the previous part of this series, I introduced PubNub, a real-time data stream network with native support for the MQTT connectivity protocol. We built a turbine simulator in Python based on the popular Paho MQTT client. In this tutorial, we will implement anomaly detection with Azure ML Studio.
Apart from using the standard MQTT libraries, developers can also use native PubNub software development kits (SDKs) in the clients. PubNub clients can talk to each other even if they are not using the same protocol or library.
To demonstrate this capability, we implement the alarm device with PubNub's native SDK for Python. The code snippet below will look familiar to PubNub developers:
```python
from pubnub.callbacks import SubscribeCallback
from pubnub.enums import PNStatusCategory
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub import PubNub
from termcolor import colored
import sys
import signal
import os
import json
from datetime import datetime

alarm_count = 0
run_level = 1


class AlarmCallback(SubscribeCallback):
    def status(self, pubnub, status):
        pass

    def presence(self, pubnub, presence):
        pass  # handle incoming presence data

    def message(self, pubnub, message):
        global alarm_count
        global run_level
        state = message.message['alarm'].upper()
        if state == "OFF":
            print(colored(datetime.now().strftime("%H:%M:%S") + ' - No anomaly detected in Fan', 'yellow'))
            print(colored(datetime.now().strftime("%H:%M:%S") + ' - Alarm state is OFF', 'green'))
            run_level = 1
            alarm_count = 0
        else:
            alarm_count = alarm_count + 1
            print(colored(datetime.now().strftime("%H:%M:%S") + ' - Anomaly Detected!', 'red'))
            print(colored(datetime.now().strftime("%H:%M:%S") + ' - Alarm count - ' + str(alarm_count), 'yellow'))
            if alarm_count >= 5 and run_level == 1:
                print(colored('Alarm count exceeded the maximum threshold. Setting the fan run level to 0', 'red'))
                publish()
                run_level = 0
                alarm_count = 0


def signal_handler(signal, frame):
    print("Caught Signal CTRL+C..exiting gracefully")
    sys.exit(0)


def publish():
    # Shut down the turbine by publishing to the command channel
    pubnub.publish().channel('turbine.command').message({'run': 0}).sync()


pnconfig = PNConfiguration()
pnconfig.subscribe_key = "<Sub_Key>"
pnconfig.publish_key = "<Pub_Key>"
pnconfig.ssl = False

pubnub = PubNub(pnconfig)
pubnub.add_listener(AlarmCallback())

signal.signal(signal.SIGINT, signal_handler)  # exit gracefully on Ctrl+C

pubnub.subscribe().channels('turbine.alarm').execute()
print('connected')
```
When the PubNub Function sends a message to the alarm, the alarm waits until the count reaches five before shutting down the turbine. It performs this operation by publishing a message to the turbine.command PubNub channel, which is visible to the turbine as the turbine/command MQTT topic. The turbine keeps sending data while the alarm listens for messages.
You can run the turbine and alarm simulators after replacing Pub_Key and Sub_Key with the actual keys obtained from the PubNub Admin Dashboard.
The code is available in the GitHub repo for you to clone or fork.
Data Ingestion and Business Logic
In the last section, we noticed that devices that are connected to PubNub can publish and subscribe through the MQTT protocol or native PubNub channels. The message delivery is handled by the PubNub Network without the need to provision or configure servers.
But there may be a need to intercept messages, and even modify them, before delivering them to the subscribers. This is where PubNub Functions, the serverless computing environment, comes into the picture.
In our scenario, each data point published by the turbine reaches the PubNub Function, which then forwards it to a machine learning web service to detect faults. After noticing five consecutive faults, the function publishes a message to the channel on which the alarm is subscribed.
This use case highlights how PubNub Functions can orchestrate the message flow based on custom logic. Developers can write just a few lines of JavaScript code to implement the logic. Since there are no servers involved, PubNub Functions is an efficient and economical platform to run business logic in a serverless environment. The functionality is similar to AWS Lambda, Azure Functions, and other serverless platforms.
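The gist of this orchestration — count consecutive anomalies and raise the alarm only after a threshold is crossed — can be sketched in plain Python. This is an illustrative sketch, not the actual Function (which is written in JavaScript); the function name and return shape here are made up for clarity.

```python
# Illustrative sketch of the debounce logic: raise the alarm only after
# N consecutive anomalous readings, and reset the count on a clean reading.

ALARM_THRESHOLD = 5  # matches the count used in this tutorial


def next_alarm_state(anomaly_count, is_anomaly, threshold=ALARM_THRESHOLD):
    """Return (new_count, message) for one incoming data point."""
    if not is_anomaly:
        return 0, {'alarm': 'off'}
    anomaly_count += 1
    if anomaly_count >= threshold:
        return 0, {'alarm': 'on'}   # reset the counter after raising the alarm
    return anomaly_count, None      # keep counting; nothing to publish yet


count, msg = 0, None
for reading in [True, True, True, True, True]:
    count, msg = next_alarm_state(count, reading)
print(msg)  # {'alarm': 'on'} after five consecutive anomalies
```

The same pattern appears in the JavaScript Function later in this tutorial, with the counter persisted in the KV store between invocations.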
We start by creating a new Function called CheckAnomaly that fires just after a message is published to the turbine.message channel. So, we choose “After Publish or Fire” as the triggering mechanism.
The function starts by setting up references to the required modules.
```javascript
const xhr = require("xhr");
const pubnub = require("pubnub");
const db = require("kvstore");
```
XHR is used for making outbound HTTP calls. We use this for invoking the Azure Machine Learning (Azure ML) REST endpoint. It is one of the core modules supported by PubNub Functions.
The pubnub module provides the PubNub client API to publish messages to the Data Stream Network. Finally, the kvstore module gives us access to a persistent key/value database. Since PubNub Functions are stateless, this module is useful to maintain state across multiple invocations of the function.
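The get/initialize/set pattern that the kvstore module enables can be mirrored in Python, with a plain dict standing in for PubNub's persistent store (the helper names here are illustrative, not part of any SDK):

```python
# A dict stands in for PubNub's kvstore to illustrate the pattern:
# read the counter, initialize it on first use, and write it back.
kvstore = {}


def get_counter(store, key="anomaly_count"):
    value = store.get(key)
    if value is None:               # first invocation: nothing stored yet
        value = {'count': 0}
        store[key] = value
    return value['count']


def set_counter(store, count, key="anomaly_count"):
    store[key] = {'count': count}


print(get_counter(kvstore))  # 0 on the first call
set_counter(kvstore, 3)
print(get_counter(kvstore))  # 3 on subsequent calls
```

Because each invocation of a Function starts with no local state, every read must tolerate a missing key, which is exactly what the initialization branch above handles.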
The next step is to define and initialize a set of variables that we use later.
```javascript
const api_key = 'AZURE_ML_KEY';
const body = {
  'Inputs': {
    'input1': [{
      'Rotation': request.message.rotation,
      'Temperature': request.message.temperature,
      'Vibration': request.message.vibration,
      'Sound': request.message.sound
    }]
  }
};
const url = "AZURE_ML_ENDPOINT";
var anomaly_count;
```
The api_key variable holds the API key for the Azure ML web service. The parameters sent to the web service are embedded in the body object. The url variable points to the web service endpoint, while the anomaly_count variable tracks the count of anomalies reported by the ML algorithm.
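The same payload can be assembled and serialized in Python; the telemetry values below are made-up samples standing in for a real turbine message:

```python
import json

# Sample telemetry values (illustrative; real values come from the turbine)
message = {'rotation': 1200, 'temperature': 45.2, 'vibration': 0.8, 'sound': 60.1}

# The Inputs/input1 shape mirrors the body object built in the Function above
body = {
    'Inputs': {
        'input1': [{
            'Rotation': message['rotation'],
            'Temperature': message['temperature'],
            'Vibration': message['vibration'],
            'Sound': message['sound'],
        }]
    }
}
print(json.dumps(body))
```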
Finally, we construct the HTTP payload from the above variables.
```javascript
const http_options = {
  "method": "POST",
  "headers": {
    "Content-Type": "application/json",
    "Authorization": "Bearer " + api_key
  },
  "body": JSON.stringify(body) // the xhr module expects a string body
};
```
The next block maintains the anomaly counter by effectively using the KV store.
```javascript
db.get("anomaly_count").then((value) => {
  anomaly_count = value ? value.count : null;
  if (anomaly_count === undefined || anomaly_count === null) {
    anomaly_count = 0;
    db.set("anomaly_count", { 'count': anomaly_count });
  }
});
```
The final section of the Function contains the essential logic. It calls the ML web service, checks the count of reported anomalies, and invokes the alarm by sending a message to it. Waiting for several consecutive anomalies avoids acting on the occasional false positive reported by ML models.
```javascript
return xhr.fetch(url, http_options).then((x) => {
  const body = JSON.parse(x.body);
  const anomaly_score = body.Results.output1[0]["Scored Probabilities"];
  var msg;
  if (anomaly_score > 0.7) {
    console.log("{'anomaly': true, 'score':" + anomaly_score + ", anomaly_count:" + anomaly_count + "}");
    anomaly_count = anomaly_count + 1;
    if (anomaly_count >= 5) {
      msg = { 'alarm': 'on' };
      anomaly_count = 0;
    }
    db.set("anomaly_count", { 'count': anomaly_count });
  } else {
    msg = { 'alarm': 'off' };
  }
  if (msg) { // publish only when there is a state change to report
    pubnub.publish({
      "channel": "turbine.alarm",
      "message": msg
    }).then((publishResponse) => {});
  }
  return request.ok();
});
```
The Azure ML web service checks each message for anomalies and returns the probability as a score between 0 and 1. We start by checking whether the score is above 0.7 to avoid false positives. If the anomaly score is above 0.7 and the count reaches five, we reset the counter stored in the KV store and set the msg variable to turn the alarm on or off.
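The response shape the Function relies on (Results.output1[0]["Scored Probabilities"]) can be illustrated with a canned JSON payload; the score value here is made up for the example:

```python
import json

# A canned response in the shape the Function expects from Azure ML
# (the label and score values are fabricated for illustration).
raw = '{"Results": {"output1": [{"Scored Labels": "1", "Scored Probabilities": 0.92}]}}'

body = json.loads(raw)
anomaly_score = body["Results"]["output1"][0]["Scored Probabilities"]
print(anomaly_score > 0.7)  # True: treat this data point as anomalous
```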
Finally, the pubnub.publish method pushes the msg variable to the turbine.alarm channel on which the alarm simulator is listening.
Implementing a Machine Learning Model
To implement the machine learning model, we take advantage of Azure ML Studio, which provides a serverless platform to train and publish ML models.
We upload a dataset (fan.csv) that contains the sample turbine telemetry data to train the ML model. This dataset contains both normal and anomalous data which helps us in training the model.
In the next step, we select appropriate columns such as vibration, sound, temperature, and rotation speed from the dataset.
The dataset is then split into two parts — one for training and the other for testing. 75 percent of the data is used for training the model while the remaining 25 percent is reserved for testing the accuracy of the model.
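ML Studio's Split Data module performs this partition inside the experiment; the 75/25 split itself amounts to something like this minimal sketch (a list of integers stands in for the rows of fan.csv):

```python
# A minimal 75/25 split in plain Python; ML Studio's Split Data module
# does this (with optional shuffling) inside the experiment.
data = list(range(100))          # stand-in for the rows of fan.csv

split_at = int(len(data) * 0.75)
train, test = data[:split_at], data[split_at:]
print(len(train), len(test))     # 75 25
```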
The training dataset is used to train the model with a binary classification algorithm. The Two-Class Logistic Regression module classifies a given data point as normal (0) or anomalous (1).
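At its core, logistic regression squeezes a weighted sum of the features through the sigmoid function and thresholds the result. The sketch below illustrates the idea with two features and made-up weights; ML Studio learns the actual weights during training.

```python
import math

# Logistic regression in miniature: a weighted sum of features passed
# through the sigmoid. The weights and bias here are made up; Azure ML
# Studio learns them from the training data.
def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def classify(features, weights, bias):
    z = sum(f * w for f, w in zip(features, weights)) + bias
    probability = sigmoid(z)
    return (1 if probability > 0.5 else 0), probability

label, p = classify([0.9, 0.8], [2.0, 1.5], -1.0)
print(label)  # 1: this point is scored as anomalous
```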
We finally score the model to generate predictions using the trained classification model. The Score Model module outputs a predicted value for the class, as well as the probability of the predicted value. We will use the probability of the predicted value in the PubNub Function to decide whether to invoke the alarm or not.
The last step, Evaluate Model measures the accuracy of a trained model against a set of industry-standard evaluation metrics. This module validates the accuracy and precision of the model.
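Accuracy and precision, two of the metrics Evaluate Model reports, boil down to simple counts over the test predictions. The labels in this sketch are fabricated to show the arithmetic:

```python
# Accuracy and precision computed from predictions vs. ground truth,
# mirroring what the Evaluate Model module reports. Sample labels are
# made up for illustration.
def evaluate(actual, predicted):
    tp = sum(1 for a, p in zip(actual, predicted) if a == 1 and p == 1)
    fp = sum(1 for a, p in zip(actual, predicted) if a == 0 and p == 1)
    correct = sum(1 for a, p in zip(actual, predicted) if a == p)
    accuracy = correct / len(actual)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    return accuracy, precision

actual    = [1, 0, 1, 1, 0, 0, 1, 0]
predicted = [1, 0, 1, 0, 0, 1, 1, 0]
acc, prec = evaluate(actual, predicted)
print(acc, prec)  # 0.75 0.75
```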
Once the model is trained, we can publish it as a web service that is ready to deal with new data points. Azure ML Studio makes it easy to publish the model as a RESTful service.
The web service is published and hosted by Azure ML Studio. The configuration page shows the HTTP endpoint and the API key required to invoke it.
Testing the Solution
After configuring all the components of the solution, you can verify it by running the simulators in both states — normal and faulty.
Launch two terminal windows and run turbine.py and alarm.py, one in each.
Since there are no anomalies found, the alarm state is always off.
Let’s introduce anomalies into our data. Open turbine.py and change the variable Fault to True. This forces our simulator to publish telemetry data that is outside of the expected range. This change in the pattern will trigger the alarm, thanks to the anomaly detection performed by Azure ML.
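The effect of a fault flag like this can be sketched as follows; the ranges are illustrative, not the actual simulator's values, and the function name is hypothetical:

```python
import random

# A sketch of how a Fault flag can push telemetry outside the normal band.
# All ranges are made up for illustration; the real simulator defines its own.
def read_telemetry(fault=False):
    if fault:
        return {
            'rotation': random.uniform(1500, 2000),   # far above normal
            'temperature': random.uniform(80, 120),
            'vibration': random.uniform(2.0, 5.0),
            'sound': random.uniform(90, 120),
        }
    return {
        'rotation': random.uniform(600, 800),
        'temperature': random.uniform(35, 50),
        'vibration': random.uniform(0.2, 1.0),
        'sound': random.uniform(40, 60),
    }

sample = read_telemetry(fault=True)
print(sample['temperature'] > 50)  # True: the reading is outside the normal band
```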
Launch the turbine.py again, and monitor the terminal running the alarm simulator.
When the alarm count crosses five, the alarm goes ahead and turns off the turbine fan.
The screenshot below shows that the run level of the turbine is set to zero, indicating that it is now shut down.
Congratulations! You have successfully implemented a serverless solution for predicting anomalies in industrial IoT environments.
Conclusion
This tutorial walked us through the steps involved in configuring the PubNub DSN and Functions to integrate with third-party services. It highlighted the interoperability between the native MQTT protocol and the PubNub network.
PubNub Functions offer a powerful mechanism to implement business logic that can modify and manipulate the inbound messages. We have seen how to use XHR, KV store, and PubNub modules with Functions.
By taking advantage of Azure ML Studio and its serverless architecture, we could add sophisticated machine learning capability to PubNub Functions. This integration demonstrates how PubNub can move the heavy lifting to public cloud platforms such as Microsoft Azure to perform compute-intensive tasks.
Microsoft is a sponsor of The New Stack.
Feature image via Pixabay.