Design an Edge System for the Cloud Native Edge Infrastructure

In the previous article, I discussed how Rancher’s K3s lightweight Kubernetes distribution, Calico networking software, and the Portworx open source cloud native storage platform become the foundation for modern artificial intelligence (AI) and IoT systems that run at the edge. Let’s design and deploy a solution that runs on this infrastructure.
Based on a hypothetical scenario of monitoring turbine fans, we will build a predictive maintenance solution that detects anomalies in the fans. It acts as a reference architecture for designing an IoT/edge solution that leverages various open source, cloud native technologies.
Problem Statement
We are expected to design and deploy a solution that can ingest telemetry data from multiple fans and use the real-time stream to predict failures before they occur. The solution runs on edge infrastructure deployed on low-end machines such as Intel NUCs, with K3s, Calico, and Portworx providing the core building blocks of the Kubernetes cluster.
Solution Architecture
The sensors attached to the fans of the turbine provide the current rotational speed, vibration, temperature, and noise level. This telemetry data stream, along with the deviceID from each fan, acts as the input to the predictive maintenance solution.
Mosquitto, a popular open source MQTT broker, acts as the gateway for the sensors and as the centralized message broker for the platform. The sensors publish the telemetry data to the fan/messages topic of the Mosquitto broker.
Below is a representative payload published by each fan to the MQTT topic; the values are illustrative, while the fields match those consumed by the predictor service:
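```json
{
  "deviceID": "fan-01",
  "rotation": 600,
  "temperature": 38.5,
  "vibration": 102,
  "sound": 65
}
```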
The predictor microservice subscribes to the same telemetry topic to which the fans publish. For each inbound data point, it invokes the anomaly detection service and publishes the result to a separate MQTT topic, fan/anomaly.
```python
import time
import requests
import random
import datetime
import json
import os
import paho.mqtt.client as mqtt

broker_address = os.getenv('MQTT_HOST')
dev_topic = os.getenv('MQTT_DEV_TOPIC')
pred_topic = os.getenv('MQTT_PREDICT_TOPIC')
scoring_url = os.getenv('SCORING_URL')

d = {}
client = mqtt.Client("pdm")

def on_message(mosq, obj, msg):
    # Extract the telemetry values from the inbound MQTT payload
    telemetry_msg = json.loads(msg.payload)
    rotation = telemetry_msg["rotation"]
    temperature = telemetry_msg["temperature"]
    vibration = telemetry_msg["vibration"]
    sound = telemetry_msg["sound"]
    telemetry = [rotation, temperature, vibration, sound]

    # Invoke the anomaly detection inference service
    data = {"params": telemetry}
    response = requests.post(scoring_url, json=data)
    fault = json.loads(response.text)["fault"]

    # Publish the prediction along with the device ID to the prediction topic
    d["deviceID"] = telemetry_msg["deviceID"]
    d["fault"] = fault
    payload = json.dumps(d, ensure_ascii=False)
    print(payload)
    client.publish(pred_topic, payload)

def on_subscribe(mosq, obj, mid, granted_qos):
    print("Subscribed: " + str(mid) + " " + str(granted_qos))

client.on_message = on_message
client.on_subscribe = on_subscribe

client.connect(broker_address)
client.subscribe(dev_topic, 0)

while True:
    client.loop()
```
The SCORING_URL is an endpoint of the anomaly detection inference service: a deep learning model trained in TensorFlow and exposed through a Flask web service.
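A minimal sketch of such an inference service is shown below. The /score route, the port, and the 0.5 decision threshold are assumptions; the request and response shapes follow the predictor code above.

```python
# A minimal sketch of the inference service; route, port, and threshold are assumptions.
import numpy as np
import tensorflow as tf
from flask import Flask, request, jsonify

app = Flask(__name__)
model = tf.keras.models.load_model("../model")  # model produced by the training script

@app.route("/score", methods=["POST"])
def score():
    # Expects {"params": [rotation, temperature, vibration, sound]}
    params = request.get_json()["params"]
    features = np.array(params, dtype=float).reshape(1, -1)
    probability = float(model.predict(features)[0][0])
    # Treat a sigmoid output above 0.5 as a fault
    return jsonify({"fault": int(probability > 0.5)})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
```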
Below is a representative payload published to the MQTT topic by the predictor service; the values are illustrative:
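```json
{
  "deviceID": "fan-01",
  "fault": 1
}
```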
Training the Anomaly Detection Model
A historical dataset with over 20,000 data points is used to train the anomaly detection model.
From the dataset, it is observed that a fan's rotational speed decreases hours before it fails, while the vibration, sound, and temperature values increase, indicating impending failure. A scatter plot of the rotation data shows this visually: the RPM of a failing fan falls to 400 from the normal average of 600.
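A minimal sketch for recreating such a plot is shown below; it assumes the fan.csv layout used by the training script (timestamp, deviceID, rotation, temperature, vibration, sound, fault), and the styling is illustrative.

```python
import pandas
import matplotlib.pyplot as plt

# Assumed column order: timestamp, deviceID, rotation, temperature, vibration, sound, fault
dataframe = pandas.read_csv("../data/fan.csv", header=None, skiprows=1)
rotation = dataframe[2]  # rotational speed (RPM)

plt.scatter(range(len(rotation)), rotation, s=2)
plt.xlabel("Sample index")
plt.ylabel("Rotation (RPM)")
plt.show()
```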
Based on this, we can train a simple TensorFlow binary classification model to predict a faulty fan. We start by dropping the timestamp and deviceID columns.
```python
import pandas

# Load the historical dataset and drop the timestamp and deviceID columns
dataframe = pandas.read_csv("../data/fan.csv", header=None, skiprows=1)
del dataframe[0]
del dataframe[1]
```
The dataset is then split into train and test data after separating the features and label.
```python
from sklearn.model_selection import train_test_split

# Separate the four telemetry features from the fault label, then split
dataset = dataframe.values
X = dataset[:, 0:4].astype(float)
y = dataset[:, 4]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
```
We then create a neural network with four dense layers, ending in a sigmoid output that performs the binary classification.
```python
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

# A simple feed-forward network with a sigmoid output for binary classification
model = Sequential()
model.add(Dense(60, input_dim=4, activation='relu'))
model.add(Dense(30, activation='relu'))
model.add(Dense(10, activation='relu'))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=250, batch_size=32, verbose=0)
```
Finally, the model is saved and evaluated.
```python
# Persist the trained model and evaluate it against the held-out test set
model.save("../model")
loss, acc = model.evaluate(X_test, y_test, verbose=0)
print('Test Accuracy: %.3f' % acc)
```
The TensorFlow model saved to disk is loaded by the inference service to perform predictions on the data sent by the predictor microservice.
Time-Series Data and Visualization
An instance of InfluxDB is connected to Mosquitto via Telegraf. This configuration gives us an elegant mechanism for ingesting time-series data into InfluxDB without writing a line of code.
Below is the Telegraf configuration that bridges Mosquitto with InfluxDB.
```toml
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_jitter = "0s"
  debug = false
  quiet = false
  hostname = ""
  omit_hostname = true

[[outputs.influxdb]]
  urls = ["http://influxdb:8086"]
  database = "fan"
  retention_policy = "autogen"
  precision = "s"
  timeout = "5s"

[[outputs.file]]
  files = ["stdout"]
  data_format = "influx"

[[inputs.mqtt_consumer]]
  servers = ["tcp://mosquitto:1883"]
  qos = 0
  topics = [
    "fan/#"
  ]
  insecure_skip_verify = true
  client_id = ""
  data_format = "json"
  name_override = "fan"
  tag_keys = ["deviceID"]
  json_string_fields = ["rotation","temperature","vibration","sound","fault"]
```
The time-series data can now be queried from InfluxDB.
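For example, a minimal sketch using the InfluxDB 1.x Python client (the influxdb package) pulls the latest readings for a single fan; the deviceID value and the one-hour window are illustrative.

```python
# A minimal sketch, assuming the InfluxDB 1.x Python client; deviceID and window are illustrative.
from influxdb import InfluxDBClient

client = InfluxDBClient(host='influxdb', port=8086, database='fan')

# Field and tag names follow the Telegraf configuration above
query = (
    'SELECT "rotation", "temperature", "vibration", "sound" '
    'FROM "fan" WHERE "deviceID" = \'fan-01\' AND time > now() - 1h'
)

for point in client.query(query).get_points():
    print(point)
```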
Finally, we connect a Grafana dashboard to InfluxDB to build a beautiful visualization for our AIoT solution.
In the next part of this tutorial, I will discuss the deployment architecture along with the storage and network considerations based on K3s, Calico, and Portworx. Stay tuned.
Janakiram MSV’s Webinar series, “Machine Intelligence and Modern Infrastructure (MI2)” offers informative and insightful sessions covering cutting-edge technologies. Sign up for the upcoming MI2 webinar at http://mi2.live.