Spark 2.3 Brings ‘Continuous Processing’ to the Big Data World
With an open source project, it’s difficult to keep a secret. The recent release of Apache Spark, version 2.3, introduces the data processing engine’s experimental built-in support for deploying on the Kubernetes open source container orchestration engine, officially hitching itself to the data center’s biggest bandwagon.
But the next stage in an ongoing rollout of what Spark calls its Structured Streaming API will include what’s being described as a low latency continuous processing mode. Coupled with increased support for SQL and DataFrames over streams, the new mode has the potential for enabling long pre-existing batch processes to run as stream processes without modification, according to Spark co-creator Matei Zaharia, speaking with The New Stack.
“Traditionally, Spark has just focused on high throughput,” said Zaharia, “with a latency of a half-second to a second. But now you can also get super-low latency, millisecond execution for the same code if you’re willing to lose a little bit in throughput. So you can deploy the same application with these settings.”
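As a rough illustration of what "the same application with these settings" might look like in PySpark, the sketch below switches one query between the two modes by changing only the trigger. The helper names `trigger_options` and `start_console_query` are hypothetical, and the one-second intervals are placeholders. The `continuous` and `processingTime` keywords are the ones `DataStreamWriter.trigger()` accepts as of Spark 2.3, where continuous mode is experimental and handles only a subset of queries.

```python
def trigger_options(low_latency):
    """Return keyword arguments for DataStreamWriter.trigger() (hypothetical helper)."""
    if low_latency:
        # Continuous mode: the interval says how often progress is checkpointed.
        return {"continuous": "1 second"}
    # Micro-batch mode: the interval says how often a new batch is launched.
    return {"processingTime": "1 second"}


def start_console_query(stream_df, low_latency):
    """Start the same streaming query in either mode.

    Assumption: `stream_df` is a streaming DataFrame, for example one built
    with spark.readStream.format("rate").load(). Nothing here imports
    PySpark, so the function only relies on the writeStream interface.
    """
    return (
        stream_df.writeStream
        .format("console")
        .trigger(**trigger_options(low_latency))
        .start()
    )
```

The query definition itself is untouched; only the trigger changes, which is the trade Zaharia describes between throughput and latency.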
How Small Does a Batch Have to Be?
Back in 2016, Spark had a fairly fast batch processing engine, at least compared to the Hadoop engines it was already replacing, such as MapReduce. In fact, it had already begun implementing what Zaharia dubbed structured streaming. At first, it was explained as a way to remove the idea of “microbatching” — trying to make batch processes really fast by cutting them down really small — from the Spark API, without actually changing the API calls themselves.
With “Spark” and “streaming” being automatically paired together in developers’ discussions, Spark needed a way to actually pull off streaming for real. Its competition, even in just the Apache domain, had started to beat up on Spark. Proponents of Apex (donated to the Apache Foundation by DataTorrent) and Flink were implying that Spark’s structured streaming was actually a kind of mask for microbatching: a way to make small clumps of data seem like streams if they were going by fast enough.
Last May, Databricks software engineer Michael Armbrust, inspired by a conversation with Zaharia and two other engineers, launched an effort to develop an alternative processing engine for Spark. One use case that came to mind was a fully pipelined read/write stream for Kafka messages. Spark and Kafka, you’ll recall, form the bookends of the emerging “SMACK Stack.” If Kafka is to be relied upon to send messages pursuant to stream processing, a 100 ms latency may as well be 100 minutes.
If you’ve ever siphoned off water from a garden hose going uphill, you’re familiar with what a public works engineer, as opposed to a Jenkins engineer, would consider “pipelining.” The flow of fluid through a pipe or a hose creates suction that draws more fluid to fill the vacuum. This is the type of pipelining Spark’s engineers were striving for: an automatic feed. Otherwise, the way things stood, each microbatch had to be launched individually. So the latency of a “stream” of these microbatches could never drop below the time it took to launch one.
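The launch-time floor can be shown with a toy model. This is not Spark's scheduler, only a minimal sketch with made-up numbers: the function names, the 100 ms batch interval, the 50 ms launch overhead and the 1 ms per-record cost are all assumptions chosen for illustration.

```python
def microbatch_latencies(arrival_times_ms, batch_interval_ms, launch_overhead_ms):
    """Latency per record when records are only picked up at batch boundaries."""
    latencies = []
    for t in arrival_times_ms:
        # The record waits for the first batch boundary at or after its arrival.
        batches_elapsed = -(-t // batch_interval_ms)  # ceiling division
        batch_start = batches_elapsed * batch_interval_ms
        # Then it pays the cost of launching that batch.
        latencies.append(batch_start - t + launch_overhead_ms)
    return latencies


def continuous_latencies(arrival_times_ms, per_record_ms):
    """Latency per record when a long-running task handles each record on arrival."""
    return [per_record_ms for _ in arrival_times_ms]


arrivals = [0, 30, 60, 90, 120]  # hypothetical arrival times in milliseconds
batched = microbatch_latencies(arrivals, batch_interval_ms=100, launch_overhead_ms=50)
pipelined = continuous_latencies(arrivals, per_record_ms=1)
```

In this model even the luckiest record, the one that arrives exactly on a batch boundary, still pays the full launch overhead, while the pipelined version pays only the per-record cost.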
“We know that streaming systems exist in the real world,” Armbrust told attendees last June at Spark Summit 2017, “which is messy. Data arrives late, out-of-order. So we wanted to make working with event time a native part of the API and of the engine.”
True streaming, as opposed to the previous methodology that Databricks calls “Dstreaming,” is critical to machine learning applications, where the act of querying data must run concurrently with the ability to discern false data within the return stream. If you can imagine both sets of processes bound by latencies, you can picture how each process waiting for the other to finish can accumulate latencies greater than either one could generate on its own.
“What this means is, we’ve eliminated microbatches from Spark streaming,” explained Armbrust, who is also Databricks’ engineer for Spark SQL, Spark’s query language for structured data. “It supports asynchronous checkpointing, and for some applications, latency is as low as sub-millisecond.”
Beginning with version 1.3, Spark SQL introduced a collection type it called DataFrame. It was a way for SQL to assemble query results like a regular relational table, for all intents and purposes, but for Spark to maintain the associations it needs to go beyond mere relational associations. A Hive table from Hadoop could be reconstructed as a Spark DataFrame.
For Databricks to achieve its goals for structured streaming, it needed to assemble the results being carried forth in streams, in such a way that they could still be addressed as DataFrame objects.
“You can use the DataFrame API as just one of the programmatic ways to write [a query], that’s very familiar to a lot of users, especially of Python,” explained Zaharia in his discussion with us. “Or you can use SQL to write a query. And what we’re doing in this new streaming project is saying, any application you wrote as a batch query, we can incrementalize it and run it as a streaming query.”
This way, an old query could suddenly have a completely new purpose, without any transformation to the code. In one example Zaharia gave us, a report on a retailer’s customer activity that was run once per night and delivered to marketers in the morning can instantly become a dashboard query that delivers customer events in near-real-time.
“If you know how to do your thing as a batch application,” said Zaharia, “you just write it the same way, and it will learn it for you continuously. You get the same result that you would have gotten with the batch query, but you get it updated continuously as new data arrives.”
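To make "incrementalize" concrete, here is a minimal sketch in plain Python rather than Spark. It compares a batch count of events per customer with a version that keeps running state and updates it as new events arrive. The names `batch_counts` and `IncrementalCounts`, and the sample retail events, are hypothetical. In Spark the engine would manage this state itself; the point here is only that both approaches agree on the data seen so far.

```python
from collections import Counter


def batch_counts(events):
    """Nightly-report style: scan every (customer, action) event and count per customer."""
    return dict(Counter(customer for customer, _action in events))


class IncrementalCounts:
    """Dashboard style: keep running state and fold in only the new events."""

    def __init__(self):
        self.state = Counter()

    def update(self, new_events):
        for customer, _action in new_events:
            self.state[customer] += 1
        # Return a snapshot of the result as of the events seen so far.
        return dict(self.state)


# Hypothetical customer events arriving in two chunks.
first_chunk = [("alice", "view"), ("bob", "view"), ("alice", "buy")]
second_chunk = [("bob", "buy"), ("carol", "view")]

running = IncrementalCounts()
after_first = running.update(first_chunk)
after_second = running.update(second_chunk)
```

After each chunk, the incremental result matches what the batch function would return if rerun over everything received up to that point.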
Although it was Hadoop that originally liberated organizations from the 20th-century traps of their old data warehouses, what many later discovered was that they had traded one trap for another. Although it could run processes in parallel as operating systems could never do before, and it accessed volumes at scales that were impossible in the client/server era, Hadoop was only a moderately efficient batch processing system. Compared with the tempo set by modern server processors, Hadoop’s original gear is actually surprisingly slow.
So organizations have already begun investing in integration tools that would bridge Hadoop’s batch processing (already being called “legacy data”) with streaming data engines. For example, a company called Syncsort produces an integration package called DMX-h, originally as a bridge between mainframes and Hadoop clusters. The idea is to utilize a GUI with which a data engineer can effectively draw the schema of the mainframe data. An interpreter then re-envisions that drawing as a set of API calls to Syncsort’s integrator, which then feeds a subsequent query to Hadoop. Later, DMX-h was expanded to include Spark.
Before long, DMX-h was being explained as an integrator between one big data engine and the other, with both Hadoop and Spark described as “frameworks,” perhaps borrowing a bit of lexicon from Apache Mesos.
A Syncsort white paper gives you an idea of what its engineers were aiming for: “Intelligent execution enables organizations to execute the identical data pipeline created with the DMX-h GUI on different underlying compute environments, such as Hadoop v1, Hadoop v2 or Apache Spark, without making any changes,” it reads. “These jobs can then be deployed on-premise(s) or in the cloud. Additional frameworks will be supported over time as they are adopted in the Hadoop community.”
But if your imagination still has that picture of two latency-bound operations depending upon one another, increment that number and consider what probably happens next.
In 2015, a team led by University of Zurich researcher Daniele Dell’Aglio foresaw this very problem in a paper that was ostensibly about the use of a generalized vocabulary for producing integration systems. That paper was published in a volume entitled Data Management in Pervasive Systems.
Within organizations, Dell’Aglio and his colleagues wrote, integration systems tend to be written (or perhaps drawn) by the individuals likely to use them. The product of those integrators usually ends up being siloed data, customized to suit the exclusive needs of the integrators themselves. “The different business and legal requirements will influence their respective data representations,” the team wrote. “For example, a social network is unlikely to keep a record of what pieces in the site its users asked information about, while the QR code-based system is unlikely to keep track of what impressions its users shared with their friends.”
Through no fault of their own, integrators created these deposits of data by-products that served little or no purpose beyond satisfying the needs of their respective queries. The solution the team suggested was a kind of common conceptual model, enabling a broad vocabulary to address data from streams without the need for integrators.
Databricks’ solution to the problem is far less philosophical and is more in tune with the fact that data scientists and database developers have already standardized on SQL. Why develop a whole new language when the existing one will do? Since SQL essentially frames the results of operations, why should it matter to the query language how the engine fetches those results? Swapping a microbatch engine with a streaming engine should not impact SQL’s phraseology, or perhaps, that of any API built on top of SQL.
“In most streaming systems, you need to learn a whole different programming interface to understand how to do stuff in a streaming way,” Matei Zaharia told us. “How do I keep track of state? How do I make things update incrementally if just a few events arrive? Here, if you know how to write the query in the first place, the engine does all that for you automatically.”
Feature image of leaf-cutting ants of the species Atta colombica, by Bandwagonman, released under Creative Commons 3.0.