Time Series Analysis on Real-Time Data with Kafka and Kinetica

The growth in popularity of time series data is driven by the increasing availability of sensors and other devices that generate data in real time, as well as the growing recognition of the value in predicting future trends.
Time series analysis is a more rigorous and structured approach to understanding patterns than simply charting a measurement over time. It involves identifying behaviors such as linear or exponential growth or decay, and cyclical fluctuations, which often aren't apparent just by looking at a chart of the data.

Figure 1: The growth in connected devices
Time series analysis of data from real-time data feeds can be used to make informed decisions about business processes and operations. For example, businesses can use it to track energy usage and optimize machine performance, monitor customer behavior and preferences, provide predictive maintenance for equipment, detect anomalies in operations, and reduce costs.
However, a significant part of the value of time series data depreciates quickly. For instance, consider a market data feed on stock trades and quotes. Buy or sell decisions that are based on the signals from this data need to be triggered immediately before the opportunity is lost.
But performing this type of time series analysis in real time is a significant challenge. In this article, I will illustrate the fundamental analytical capabilities required for a sophisticated time series database by setting up an end-to-end pipeline that ingests market data on stocks from two different Kafka topics and runs time series analysis on them to trigger buy decisions on stocks. All in real time.
I will use Kinetica, a free time series database, to demonstrate the different pieces. The last section of this article includes instructions on how to install Kinetica locally or in the cloud (in just a few minutes).
Ingesting from High-Speed Data Feeds
The first requirement of a system for real-time time series analysis is the ability to ingest a lot of data at speed. This requires a few things:
- First-class connectors to streaming data sources like Apache Kafka
- Simultaneous query execution and data loading, i.e., the data load cannot be blocked by query execution
- Distributed ingest across all the nodes in a cluster so that a single node does not become a bottleneck
For the first query below, I create a data source using the connection credentials, the location of the Kafka cluster and the name of the quotes topic. The second query is a LOAD INTO statement that configures a streaming ingest from this data source. The data ingest process is always on and will automatically add new data from the Kafka topics into a table in the database.
CREATE OR REPLACE DATA SOURCE quote_stream
LOCATION = 'kafka://pkc-ep9mm.us-east-2.aws.confluent.cloud:9092'
WITH OPTIONS (
    kafka_topic_name = 'quotes',
    credential = 'trade_quote_creds' -- set using a separate query
);

LOAD DATA INTO quotes
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'quote_stream',         -- name of the data source
    SUBSCRIBE = TRUE,
    TYPE_INFERENCE_MODE = 'speed',
    ERROR_HANDLING = 'permissive',
    kafka_offset_reset_policy = 'latest'  -- load only the latest messages
);
A Note about the Data
Kafka uses the concept of topics that receive “messages” that encapsulate incoming data. You can think of each message as a row in a table. I have set up two synthetic Kafka topics for this demonstration.
- Trades topic: The trades topic records intraday trades data on a per-minute basis. This data includes the open, close, low and high price, and the traded volumes every minute for three stocks — Apple, Amazon and Google.
- Quotes topic: The quotes data is produced at a rate of five to six messages per second. Each message records the bid price, bid size, ask price and ask size for the three stocks (a sample message is sketched below).
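For illustration, a single message on the quotes topic might look something like the following. The field names mirror the columns used in the queries later in this article, but the exact message layout and the values shown are assumptions, not actual output from the topic.

{
  "symbol": "AAPL",
  "timestamp": "2023-03-14 09:31:02.417",
  "bid_price": 150.12,
  "bid_size": 200,
  "ask_price": 150.15,
  "ask_size": 300
}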
I have set up the topics so that they mimic market feeds, but the information in them doesn't reflect actual prices. Another point to note is that actual market feeds are operational only during market hours (9 a.m. to 5 p.m., or extended hours), whereas the topics I have set up produce data continuously so that the code works regardless of the time zone you are in.

Figure 2: Highest daily traded price.
Date and Time Functions
Manipulation of date and time values is a fundamental part of time series analysis. Any analytical database that you pick for time series analysis needs to offer date and time functions that make it easy to wrangle time series data. I have listed a few of the common analytical tasks below.
Calculating Durations
The query below calculates the total duration of the trades data in days. The keyword “DAY” in the query can be changed to other values such as “MONTH”, “YEAR”, “SECONDS” etc. to change the unit used to express the duration.
SELECT TIMESTAMPDIFF(DAY, MIN(time), MAX(time)) AS days_duration
FROM trades
Time Buckets
Another common task, particularly when dealing with large amounts of data, is to aggregate a particular value into buckets in order to identify patterns.
A time bucket function allows us to create an arbitrary interval with which to categorize the time (or date) values in a table. Let's use it to specify 30-minute buckets and calculate the total traded volume for those 30-minute intervals over the previous day. The result of a time bucket function is the start of the interval in date-time format. I use the SPLIT() function to discard the date part so that the labels look better on the chart.
SELECT
    SPLIT(
        TIME_BUCKET(INTERVAL 30 MINUTES, time, 0, 0), -- 30-minute buckets
        ' ',
        2
    ) AS time_bucket,
    SUM(trading_volume) AS total_volume
FROM trades
WHERE DATE(time) = DATE(DATEADD(DAY, -1, NOW()))
GROUP BY time_bucket
ORDER BY time_bucket

Figure 3: Total traded volume every 30 minutes yesterday.
Add or Subtract Dates and Time
Sometimes we may need to add or subtract date and time values. This can be useful when trying to filter a specific portion of the data, correct for a consistent lag in date-time recordings, or jump forward or backward along a timeline. One of the easiest ways to do that is with an INTERVAL expression. The simple query below adds exactly one day to the current time.
SELECT NOW() + INTERVAL 1 DAY AS same_time_tomorrow
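Subtraction works the same way and is handy for filtering. As a quick sketch (assuming the trades table described earlier), the following keeps only the last day of data by subtracting an interval from the current time:

-- Keep only rows from the previous 24 hours (a sketch; assumes the trades table above)
SELECT *
FROM trades
WHERE time > NOW() - INTERVAL 1 DAY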
Window Functions
Window functions are applied to a window of records that are ordered on some column in the table (usually time, when applied to time series data). Each window is constructed in reference to the current row, and the result set of this operation consists of one value per row in the table.
<window function> OVER <window specification>
Window functions are essential for time series analysis. Some of their many applications include the following.
- Detecting Trends and Seasonality: Window functions can be used to identify trends and seasonality in time series data by computing the rolling mean or rolling standard deviation over a window of data points.
- Smoothing Data: Window functions are also useful for smoothing noisy time series data by applying a rolling average over the window of data points. This can be useful for identifying underlying patterns in the data that would otherwise be difficult to detect.
- Outlier Detection: By calculating the rolling standard deviation over a window of data points, outliers can be identified and removed from the analysis. This helps to ensure that results are not skewed by extreme values in the dataset (see the sketch after this list).
- Feature Extraction: Window functions can also be used to extract features from time series datasets by calculating rolling means, rolling sums, and other statistical measures over a window of data points. This is particularly useful for extracting features such as trend, seasonality, and cycle components from a time series dataset which can then be used for more complex analyses such as forecasting and predictive modeling.
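As a quick illustration of the smoothing and outlier-detection ideas above, the sketch below computes a rolling mean and rolling standard deviation per symbol and flags observations that sit far from the rolling mean. It assumes the trades table has a price_close column, that STDDEV is available as a window function, and that a three-standard-deviation threshold is acceptable; none of these are specified in the pipeline described in this article.

-- Rolling mean and standard deviation over the preceding 20 observations per symbol
-- (a sketch; price_close and the 3-sigma threshold are illustrative assumptions)
SELECT
    time,
    symbol,
    price_close,
    rolling_mean,
    rolling_stddev,
    CASE WHEN ABS(price_close - rolling_mean) > 3 * rolling_stddev
         THEN 1 ELSE 0 END AS is_outlier
FROM (
    SELECT
        time,
        symbol,
        price_close,
        AVG(price_close) OVER (
            PARTITION BY symbol ORDER BY time
            ROWS BETWEEN 20 PRECEDING AND CURRENT ROW
        ) AS rolling_mean,
        STDDEV(price_close) OVER (
            PARTITION BY symbol ORDER BY time
            ROWS BETWEEN 20 PRECEDING AND CURRENT ROW
        ) AS rolling_stddev
    FROM trades
) t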
In the query below, I use a window function to find the moving average over the five previous observations for each observation over the previous day:
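A sketch of that query is shown below, reconstructed from the description here and from the materialized-view discussion in the next section. The view name, the price-gap calculation (high minus low) and the price_low column are assumptions; the five-second refresh, the moving window and the NOW()-based filter follow the text.

CREATE OR REPLACE MATERIALIZED VIEW price_gap_moving_avg  -- view name is an assumption
REFRESH EVERY 5 SECONDS AS                                -- refresh interval (see next section)
SELECT
    time,
    symbol,
    AVG(price_high - price_low) OVER (                    -- price_low is an assumption
        PARTITION BY symbol
        ORDER BY time
        ROWS BETWEEN 5 PRECEDING AND CURRENT ROW          -- moving window over recent observations
    ) AS avg_price_gap
FROM trades
WHERE time > NOW() - INTERVAL 1 DAY                       -- only the previous day, anchored to NOW()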


Figure 4: Average hourly price gap for the last 24 hours.
Keep Things Real Time with Continuously Updated Materialized Views
In the previous query, I used a materialized view to store the results of the query. Note the second line, which specifies the refresh interval of five seconds. This means the query is re-run every five seconds against the most current version of the data, so as new data arrives on the trades topic it is automatically included in the moving average calculation. Additionally, note the WHERE clause: it references the current time using NOW(). With each refresh, the time window also advances, dropping the observations corresponding to the first five seconds of the previous result set.
Timestamps Require Inexact or Interval-Based Joins
Sophisticated analysis and decision-making require data to be combined from different tables. This is easy when tables share low-cardinality key columns with matching values, i.e., an equality-based join. However, this is most often not the case when dealing with high-velocity streaming data: timestamps from different streaming sources rarely align exactly.
For instance, a simple equality-based inner join between the trades and quotes data, shown below, yields only nine records when trying to find quotes that have exactly the same timestamps as the trades.
SELECT *
FROM trades t
INNER JOIN quotes q
    ON t.symbol = q.symbol
    AND t.time = q.timestamp
A simple change that replaces the equality operator with an ASOF function solves the problem. The ASOF function below establishes a one-second interval from each trade timestamp within which to look for matching quotes. If there are multiple quotes within that one-second interval, we pick the one closest to the trade timestamp (set by the MIN parameter).
SELECT *
FROM trades t
INNER JOIN quotes q
    ON t.symbol = q.symbol
    AND ASOF(t.time, q.timestamp, INTERVAL '0' SECOND, INTERVAL '1' SECOND, MIN)
This simple change yields a result set with 305 records instead of the nine we got from a strict equality-based match.
Event Streaming and Decisioning in Real Time
The final piece of the puzzle is executing decisions based on time series queries. Let’s build on the previous query to set up a materialized view that records a buy decision if the bid price from the quotes data is greater than the highest price in the previous minute in the trades data. This materialized view is set to refresh on change. As a result, buy decisions are triggered in real time as the materialized view detects and responds to new data in the trades or the quotes table.
CREATE OR REPLACE MATERIALIZED VIEW trade_quotes
REFRESH ON CHANGE AS
SELECT
    DATETIME(time) AS time,
    t.symbol AS symbol,
    price_high,
    bid_price
FROM trades t
INNER JOIN quotes q
    ON t.symbol = q.symbol
    AND ASOF(t.time, q.timestamp, INTERVAL '0' SECOND, INTERVAL '1' SECOND, MIN)
WHERE bid_price > price_high
Finally, to execute the trade we will need to stream any new records that come into this view to an external system. We can do that by setting up a stream. The query below sets up a stream that listens for changes to the materialized view and sends buy decisions to a webhook.
CREATE STREAM buy_events ON trade_quotes
REFRESH ON CHANGE
WITH OPTIONS (
    DESTINATION = 'https://ertzvoll4sdf60x.m.pipedream.net'
)
Try This on Your Own
I have configured the destination webhook in the STREAM query above to write the data out to a Google spreadsheet here. You can trigger these buy events on your own and see them written out to the spreadsheet in real time using the Time Series Analysis workbook on Kinetica Cloud, or by downloading the workbook from our GitHub repo here and importing it into Kinetica's Developer Edition. I have pre-configured all the data streams, so you don't have to do any additional setup to run this workbook.
Both Kinetica Cloud and Developer Edition are free to use. The cloud version takes only a few minutes to set up, and it is a great option for quickly touring the capabilities of Kinetica. However, it is a shared multitenant instance of Kinetica, so you might experience slightly slower performance. The Developer Edition is also easy to set up: it takes about 10 to 15 minutes and requires an installation of Docker. The Developer Edition is a free-forever personal copy that runs on your computer and can handle real-time computations on high-volume data feeds.