Processing Large Data Sets in Fine-Grained Parallel Streams with SQL
SQL is broadly used as a data access language, and Trino provides a powerful engine for SQL access to multiple data sources. However, as more and more real-time data becomes available, developers will need to process large data sets at an unlimited scale with predictable performance.
The Aerospike Trino Connector leverages specific Aerospike mechanisms for accessing large data sets over parallel streams. It works with Trino’s distributed computation framework and its cost-based optimizer (CBO) to define data splits, push down query operations and leverage appropriate indexes. The result is faster time to insights that speed decisioning and business outcomes.
This article explains mechanisms for accessing large data sets over parallel streams and describes some schemes for defining data splits for parallel processing, as well as a framework for testing them.
Please follow along in the adjunct interactive tutorial.
Parallel Processing of Large Data Sets
To process large data sets, a common scheme is to split the data into partitions and assign a worker task to process each partition. The partitioning scheme must have the following properties:
- The partitions are collectively exhaustive, meaning they cover the entire data set, and mutually exclusive, meaning they do not overlap.
- They are deterministically and efficiently computed.
- They are accessible in an efficient and flexible manner as required by worker tasks, for example, in smaller chunks at a time.
Data Partitions in Aerospike
The three types of Aerospike indexes — primary, set and secondary — are partition-oriented. That means they are split by partitions at each node, and queries are processed at each node over individual partitions. A client can request a query to be processed over specific partitions so that multiple client workers can work in parallel. It is easy to see how parallel streams up to the total number of partitions (4,096) can be set up for parallel processing data streams.
Pagination is supported with Aerospike queries where the client can process a chunk of records at a time by repeatedly asking for a certain number of records until all records are retrieved.
Splitting Data Sets Beyond 4,096
Many data-processing platforms allow more worker tasks than 4,096. For example, Spark allows up to 32K worker tasks to run in parallel. Trino allows theoretical concurrency greater than 4,096.
Aerospike allows for data splits larger than 4,096 by allowing a partition to be divided into subpartitions efficiently. The scheme is based on the “digest-modulo” function that can divide a partition into an arbitrary number of non-overlapping and collectively complete subpartitions. It involves adding the filter expression “digest % N == i for 0 <= i < N”, where the “digest” is the hashed key of the record.
The advantage of the “digest-modulo” function is that it can be evaluated without reading individual records from the storage device, such as SSDs. Digests of all records are held in the primary index, which resides in memory. Therefore, determining the membership of a digest, and equivalently of the corresponding record, in a subpartition is fast. Each subpartition stream needs to read only its records from the potentially slower storage device, although it needs to perform the in-memory digest-modulo evaluation, which is much faster, for all records.
This scheme works for primary-index and set-index queries because they hold digests of records. The secondary index holds the primary index location of the record, and a lookup provides the digest information.
Defining and Assigning Splits
How are the splits over a data set defined and assigned to N worker tasks, where N can vary from one to any arbitrarily large number? In reality, there would be an upper bound on N on a given platform because of either a platform-defined absolute limit, or the overhead of processing a large number of parallel streams and coordinating across them can negate the benefits.
It is important to understand what partitions or subpartitions can be requested in a single Aerospike API call:
- Full partitions and subpartitions cannot be mixed in a call.
- Full partitions must be consecutive in order, or “(pstart-id, pcount)”.
- Subpartitions must be consecutive, belong to consecutive partitions and use the same modulo factor, or “(pstart-id, pcount, sstart-id, scount, m)”.
The goal is to achieve best efficiency with the operations available in the APIs.
Split Assignment Schemes
We will examine three variations of split assignment.
If N is the requested number of splits:
- At most N splits (can be fewer), same sized, one API call per split.
- At least N splits (can be more), same sized, one API call per split.
- Exactly N splits, same sized, up to three API calls per split.
The first two allow specific, discrete values of splits to allocate the same amount of data (as partitions or a subpartition) and choose the closest allowed number of splits that is a factor or multiple of 4,096. Each split is processed with one API call.
The third one allows any number of splits with the same sized data assignment of partitions and/or subpartitions. Each split, however, may require up to three API calls.
Parallel Query Framework
The parallel stream processing from the above split assignments can be tested with the following simple framework. It can be tweaked to suit the needs of the intended workload and environment.
The test data consists of 100,000 records (can be changed) of ~1KB size, with a secondary index defined on an integer bin.
The processing takes place as follows (tunable parameters are italicized):
- Splits assignments are made for the requested number of splits and the desired split type.
- The desired number of workers (threads) are created. All workers start at the same time to process the splits. Each worker thread does the following in a loop until there are no unprocessed splits available:
- Obtain the next scheduled split.
- Create one or more query requests over the split’s partitions and subpartitions and process them sequentially.
- Assign the secondary-index query predicate depending on the requested query type.
- Create the requested filter expression. Append it (with AND) to the subpartition filter expression if one is being used; otherwise, use it separately.
- Process the query with the filter in the requested mode (sync or async).
- Get chunk-size records at a time until all records are retrieved.
- Process the records using the stream-processing implementation. The CountAndSum implementation:
- Aggregates the number of records in a count by the worker.
- Aggregates an integer bin value in a sum by the worker.
- Aggregates count and sum across all workers at the end.
- Waits for all workers to finish and outputs the aggregated results from stream processing.
In the CountAndSum example, the total number of processed records and the sum of the integer bin across all records must be the same for a given query predicate and filter irrespective of the number of splits, split type, number of workers and processing mode.
Parameters and Variations
This requires a wide range of action, including number of splits, number of workers, query index types and more. Please follow along in the adjunct interactive tutorial for a deeper understanding.
Use Cases for Fine-Grained Parallelism
Processing speed can benefit from a very high degree of parallelism for a very large data set processed with transforms, aggregations and updates.
Multiple data sets that need to be joined and require shuffling subsets across a large number of worker nodes may not benefit from a very high parallelism in retrieval. The cost of transfer of data in subsequent steps across a large number of worker nodes can limit the benefit of fine-grained retrieval. A cost-based optimizer on the processing platform should be able to determine the best level of parallelism for data access from Aerospike for a given query.