Streamline Event Management with Kafka and tmctl

DevOps teams and platform engineers who support event-driven application developers face the challenge of capturing events from sources such as public cloud providers, messaging systems or in-house applications, and of reliably delivering filtered and transformed events to consumers according to their needs.
In this post, we’ll run through a solution that uses Kafka and TriggerMesh’s new command-line interface, tmctl, to centralize and standardize events so we can apply transformations and filters in a uniform way before pushing events to downstream consumers for easy consumption.
The source code for all the examples is available on GitHub.
The Problem Illustrated
An e-commerce company needs to process orders from different order management systems. The company’s in-house order management system writes orders to a Kafka topic, but others that were added over time work differently: One pushes orders over HTTP, another exposes a REST API to invoke, another writes orders to an AWS SQS queue. Order structure varies by producer and needs to be massaged into shape. For all producers, orders are labeled with a region (EU, US, etc.) and a category (electronics, fashion, etc.) and come in every possible combination.
A downstream team of app developers is asking to consume global book orders to create a new loyalty card. A separate analytics team wants to consume all European orders to explore expansion opportunities in the region. Each of these consumers wants specific events from the overall stream, sometimes with specific formats, and they both want to consume them from dedicated Kafka topics.
You’re tasked with capturing orders from the four order management systems in real-time, standardizing them, filtering and delivering them to Kafka topics dedicated to each consumer.
TriggerMesh as a Unified Eventing Layer
We’ll show how to use TriggerMesh to ingest orders, then transform and route them for consumption on Kafka topics. Other tools could address this problem, each with its quirks and perks. TriggerMesh has found appeal among engineers in DevOps roles thanks to its declarative interface and Kubernetes-native deployment.
A typical TriggerMesh configuration is made up of the following components:
Sources
Sources are the origin of data and events. These may be on-premises or cloud-based. Examples include message queues, databases, logs and events from applications or services.
All sources are listed and documented in the sources documentation.
Brokers, Triggers and Filters
TriggerMesh provides a broker that acts as an intermediary between event producers and consumers, decoupling them and providing delivery guarantees to ensure that no events are lost along the way. Brokers behave like an event bus, meaning all events are buffered together as a group.
Triggers are used to determine which events go to which targets. A trigger is attached to a broker and contains a filter that defines which events should cause the trigger to fire. Filter expressions are based on event metadata or payload contents. If a trigger fires, it sends the event to the target defined in the trigger. You can think of triggers like push-based subscriptions.
Transformations
Transformations are a set of modifications to events. Examples include annotating incoming events with timestamps, dropping fields or rearranging data to fit an expected format. TriggerMesh provides a few ways to transform events.
Targets
Targets are the destination for the processed events or data. Examples include databases, message queues, monitoring systems and cloud services. All targets are listed and documented in the targets documentation.
Setting Up the Environment
To provision the Kafka topics for the example, I’m going to use RedPanda, a Kafka-compatible streaming data platform that comes with a handy console. I’ll run both RedPanda and its console on my laptop with the provided docker compose file, which I’ve tweaked a bit for my setup. You can use any Kafka distribution you like.
docker-compose up -d
And away we go: the console becomes available at http://localhost:8080/ by default.
We’re going to use tmctl, TriggerMesh’s new command-line interface that lets you easily build event flows on a laptop that has Docker. To install it, Homebrew does the job for me:
brew install triggermesh/cli/tmctl
There are other installation options available.
Ingest Orders from Kafka
We’ll start by creating a broker, the central component of the event flow we’re going to build. It’s a lightweight event bus that provides at-least-once delivery guarantees and pub/sub style subscriptions called triggers (and their filters).
tmctl create broker triggermesh
And now we’ll use a Kafka source component to ingest the stream of orders into our broker:
tmctl create source kafka --topic orders --bootstrapServers <url> --groupID mygroup
In a separate terminal, I’ll start watching for events on the TriggerMesh broker with the command tmctl watch.
We can now send an event to the orders topic using the RedPanda Console:
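If you don’t have a sample order at hand, a payload along the following lines will do. This is only a sketch: the exact field names and values are assumptions, based on the shape implied by the transformations we build later in this post (a region, a category and a nested item object).
{
  "orderid": 1,
  "ordertime": 1497014121580,
  "region": "eu",
  "category": "fashion",
  "item": {
    "itemid": "184",
    "brand": "Acme",
    "category": "Apparel",
    "name": "Linen Jacket"
  }
}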
If we look at the terminal running the watch command, we see the event show up there, which means the event has been ingested by the broker. Notice how the event has been wrapped in a standard envelope based on the CloudEvents specification. We’ll see how we can leverage this envelope later on.
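For reference, the envelope looks roughly like the sketch below. The id, source and time values are placeholders (they’ll differ in your setup), and the data attribute carries the original order payload unchanged.
{
  "specversion": "1.0",
  "type": "io.triggermesh.kafka.event",
  "source": "<the Kafka source component>",
  "id": "<generated event id>",
  "time": "<timestamp>",
  "datacontenttype": "application/json",
  "data": {
    "orderid": 1,
    "ordertime": 1497014121580,
    "region": "eu",
    "category": "fashion",
    "item": { "itemid": "184", "brand": "Acme", "category": "Apparel", "name": "Linen Jacket" }
  }
}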
Transform and Route Events to the Right Topics
We’re going to want to route global book orders to the orders-global-books topic, and all EU orders across all categories to the orders-eu-all topic. Before we can do that, we need to extract the region and category from the event payload into event headers (CloudEvents attributes, to be specific), so that we can later filter against these headers with trigger filters.
For this, we’ll use a TriggerMesh JSON transformation, which provides a low-code approach to modifying a JSON event’s payload and metadata. Here we’re storing the values of region and category from the payload as variables (second half of the code) and using them to modify the event type attribute to be of the form $region-$category-v1.
context:
- operation: add
  paths:
  - key: type
    value: $region-$category-v1
data:
- operation: store
  paths:
  - key: $region
    value: region
  - key: $category
    value: category
I’m giving the event type a version so we can more easily make additional transformations down the road and evolve the version at each stage. This lets consumers migrate from one version of events to another at their own pace and gives us more flexibility to modify the flow of events with minimal impact on other components.
We’ll put this transformation code in a file and create a new transformation that references it, along with a trigger that routes events from the original orders topic (which are of type io.triggermesh.kafka.event, as you can see in the first output from tmctl watch above) to this transformation:
tmctl create transformation --name transform-extract-region-category -f transformations/orders-add-region-category.yaml
tmctl create trigger --eventTypes io.triggermesh.kafka.event --target transform-extract-region-category
Now if we send the same event into the orders topic, we’ll see two events show up in tmctl watch: the original, followed by the transformed event. The latter should look like this:
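If you’re not following along in a terminal, here’s a rough sketch of the transformed event. Only the type attribute matters for the routing we do next; the source and id values are placeholders, and the payload is unchanged.
{
  "specversion": "1.0",
  "type": "eu-fashion-v1",
  "source": "<the JSON transformation component>",
  "id": "<new event id>",
  "datacontenttype": "application/json",
  "data": {
    "orderid": 1,
    "ordertime": 1497014121580,
    "region": "eu",
    "category": "fashion",
    "item": { "itemid": "184", "brand": "Acme", "category": "Apparel", "name": "Linen Jacket" }
  }
}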
Notice that the event type is now eu-fashion-v1. This is perfect for the routing we want to do in the next step.
Now let’s create some Kafka targets that will write events to the two dedicated Kafka topics for our app developers and analytics consumers.
tmctl create target kafka --name orders-global-books-target --topic orders-global-books --bootstrapServers <url>
tmctl create target kafka --name orders-eu-all-target --topic orders-eu-all --bootstrapServers <url>
These targets will create the necessary Kafka topics for you as you’ll see in the RedPanda console. However, these Kafka targets aren’t doing anything yet, because I haven’t routed any events to them.
Let’s create the triggers that will send events to their respective Kafka targets and thus to their respective Kafka topics:
tmctl create trigger --name global-books --eventTypes eu-books-v1,us-books-v1 --target orders-global-books-target
tmctl create trigger --name eu-all --eventTypes eu-fashion-v1,eu-books-v1,eu-electronics-v1,eu-groceries-v1,eu-pharma-v1 --target orders-eu-all-target
Each trigger defines the types of events that should fire the trigger, as well as a target component (Kafka targets here) to which events should be delivered.
If we send the original event again now, because its event type has become eu-fashion-v1, it’ll get routed to the orders-eu-all Kafka topic. We can see it there in the RedPanda console:
At any given moment, we can use the command tmctl describe to see the TriggerMesh components we’ve created, their status and parameters:
Europeans Need a Special Format
A general manager from the EU region says that item IDs in Europe should be of the form _item_uuid_184, as opposed to the United States, where item IDs are simple numbers like 184. Ah, those pesky Europeans (I would know).
We’ll add a new JSON transformation that only transforms the itemid value for the eu region, and it’ll also bump the version of these events to v2.
context:
- operation: add
  paths:
  - key: type
    value: $region-$category-v2
data:
- operation: store
  paths:
  - key: $itemid
    value: item.itemid
  - key: $region
    value: region
  - key: $category
    value: category
- operation: add
  paths:
  - key: item.itemid
    value: _item_uuid_$itemid
Because copies of events in both v1 and v2 form will be flowing through the broker, consumers of v1 can continue uninterrupted, as long as their trigger filters keep listening for v1 events.
We can now create the transformation and the trigger that will route all the v1 EU orders to the transformation, so that they can be transformed into v2 EU orders.
tmctl create transformation --name transform-eu-format -f transformations/orders-eu-format.yaml
tmctl create trigger --eventTypes eu-fashion-v1,eu-electronics-v1,eu-books-v1,eu-groceries-v1,eu-pharma-v1 --target transform-eu-format
We’re also going to update the trigger for the EU target to filter for v2 events instead of v1 events. Note that recreating a trigger (or any other component) with the same name or parameters results in an update. Again, this is where you could decide to only start routing v2 to a single consumer for now, before rolling it out to the others that aren’t yet ready for v2 or simply in order to reduce the blast radius if something were to go wrong.
tmctl create trigger --name eu-all --eventTypes eu-fashion-v2,eu-books-v2,eu-electronics-v2,eu-groceries-v2,eu-pharma-v2 --target orders-eu-all-target
If we send the same original event into the orders topic, we now see the transformed v2 event appear in the orders-eu-all topic.
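Assuming the sample order from earlier, the v2 payload would look roughly like this, with the reformatted item ID (and an event type of eu-fashion-v2):
{
  "orderid": 1,
  "ordertime": 1497014121580,
  "region": "eu",
  "category": "fashion",
  "item": { "itemid": "_item_uuid_184", "brand": "Acme", "category": "Apparel", "name": "Linen Jacket" }
}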
We now have events flowing across Kafka topics and some transformations in place, as illustrated below.
Next, let’s add some new sources of orders.
Integrate Orders Pushed over HTTP
The next order management system we need to integrate is pushing orders out over HTTP. So to be able to ingest these into TriggerMesh, we’ll create a webhook source that exposes an HTTP endpoint:
tmctl create source webhook --name orders-webhook --eventType orders-legacy
I’m giving the orders-legacy type to the orders coming from this webhook because they aren’t formatted according to the latest standards. The orders are arriving as follows:
{
  "orderid": 11,
  "ordertime": 1497014121580,
  "region": "us",
  "category": "books",
  "itemid": "331",
  "brand": "Penguin",
  "itemcategory": "Edutainment",
  "name": "Bonnie Garmus - Lessons in Chemistry"
}
We need to transform these events as they arrive in TriggerMesh, following which they’ll be processed by the rest of the pipeline we’ve already created:
context:
- operation: add
  paths:
  - key: type
    value: io.triggermesh.kafka.event
data:
- operation: store
  paths:
  - key: $itemid
    value: itemid
  - key: $brand
    value: brand
  - key: $itemcategory
    value: itemcategory
  - key: $name
    value: name
- operation: add
  paths:
  - key: item.itemid
    value: $itemid
  - key: item.brand
    value: $brand
  - key: item.category
    value: $itemcategory
  - key: item.name
    value: $name
- operation: delete
  paths:
  - key: itemid
  - key: brand
  - key: itemcategory
  - key: name
We’re doing a little trick here by setting their event type to io.triggermesh.kafka.event so that they get picked up by the first transformation we created.
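To make the effect concrete, after this transformation the legacy order shown above should come out looking roughly like our standard format (a sketch based on the operations in the YAML):
{
  "orderid": 11,
  "ordertime": 1497014121580,
  "region": "us",
  "category": "books",
  "item": {
    "itemid": "331",
    "brand": "Penguin",
    "category": "Edutainment",
    "name": "Bonnie Garmus - Lessons in Chemistry"
  }
}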
We’ll create the transformation component and route the legacy orders to it as follows:
tmctl create transformation --name transform-orders-webhook-legacy -f transformations/orders-webhook-legacy.yaml
tmctl create trigger --eventTypes orders-legacy --target transform-orders-webhook-legacy
The order management system will push events to the webhook over HTTP, and we can simulate this using curl as follows:
curl -X POST -H "Content-Type: application/json" -d @mock-events/webhook_raw.json <webhook URL>
To get the webhook’s URL, you can use tmctl describe and find the URL next to the webhook component called orders-webhook.
Integrate Orders Provided by an HTTP Service
The next order management system we need to integrate provides an HTTP API that we need to regularly poll for new events. This service also produces the legacy order format that we’ll need to transform.
First, we’ll start a mock HTTP service locally to simulate this service, in a new terminal (requires Python 3):
python3 -m http.server 8000
In the directory I’m working in, there is an example legacy JSON event in the file mock-events/legacy_event.json that this HTTP server can serve up.
Now, we create the HTTP Poller:
tmctl create source httppoller --name orders-httppoller --method GET --endpoint http://host.docker.internal:8000/mock-events/http_poller_event.json --interval 10s --eventType orders-legacy
You can adjust the endpoint depending on your environment. I’m using host.docker.internal because I’m running on Docker Desktop.
The beauty here is that we’re also setting the type of these events to orders-legacy. This means that, without any additional work, these orders will get processed by the pipeline we just created for the webhook orders, meaning they’ll be reformatted to the new standard, transformed to extract the necessary metadata, etc.
You should now see these events appearing in TriggerMesh every 10 seconds and being routed to the orders-global-books Kafka topic.
Integrate Orders from an SQS Queue
The final order management system that needs to be integrated provides orders through an AWS SQS queue. To read from the queue, we can create an SQS source with TriggerMesh:
tmctl create source awssqs --arn <queue-arn> --auth.credentials.accessKeyID <id> --auth.credentials.secretAccessKey <secret>
Now I’ll send an event into SQS that matches our initial order format:
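If you’d rather use the AWS CLI than the console, something along these lines works; the queue URL and the mock-event file name are placeholders for your own setup:
aws sqs send-message --queue-url <queue-url> --message-body file://mock-events/order.json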
Surprise! When I look at the event coming into TriggerMesh with tmctl watch, I notice that the order data is enveloped in a lot of AWS metadata, some of which is shown below:
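The shape is roughly the standard SQS message structure, with our order serialized as a string inside the Body attribute (an abbreviated sketch; the values are placeholders):
{
  "MessageId": "<message id>",
  "ReceiptHandle": "<receipt handle>",
  "MD5OfBody": "<md5>",
  "Body": "{\"orderid\": 1, \"ordertime\": 1497014121580, \"region\": \"eu\", \"category\": \"fashion\", \"item\": {\"itemid\": \"184\", \"brand\": \"Acme\", \"category\": \"Apparel\", \"name\": \"Linen Jacket\"}}",
  "Attributes": {
    "SentTimestamp": "<timestamp>"
  }
}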
Because we don’t need this metadata, we’ll extract the body of the SQS message so that the incoming event matches the schema we want. It’s a simple case of extracting the Body attribute and setting it as the root of the payload. We’ll also set the event’s type to the same one produced by the Kafka orders source, so that it gets processed by the same set of transformations further down the pipe.
context:
- operation: add
  paths:
  - key: type
    value: io.triggermesh.kafka.event
data:
- operation: store
  paths:
  - key: $payload
    value: Body
  - key: $category
    value: category
- operation: delete
  paths:
  - key:
- operation: add
  paths:
  - key:
    value: $payload
Again, we’ll create the transformation and a trigger to send events to it:
tmctl create transformation --name transform-sqs-orders -f transformations/orders-transform-sqs.yaml
tmctl create trigger --eventTypes com.amazon.sqs.message --target transform-sqs-orders
Benefits of This Approach
We’ve just created a unified eventing layer that can ingest, transform and filter events from heterogeneous sources and reliably deliver them to different consumers.
- Decoupling: The decoupling of event producers and consumers, which is inherent to this style of event-driven architecture, makes it easy to evolve the topology as requirements change. It’s easy to add new consumers without affecting other producers or consumers: we just need to create a new trigger and target. Likewise, we can easily add new order management systems and transform their events to fit the right schema. This provides agility and maintainability.
- Versioning: By versioning our events with the type metadata, we’re able to progressively roll out changes and consumers can adopt new versions at independent paces.
- Integration: The solution lets you integrate heterogeneous event producers and consumers without imposing constraints like schemas or SDKs they must embed.
- Push or pull: The combination of Kafka and TriggerMesh gives us an interesting combination of message exchange patterns and guarantees. In our example, the event consumers will pull events from Kafka. But if we wanted to, we could push events directly to consumers using other TriggerMesh target components like the CloudEvents target.
- Short inner-loop: tmctl makes the process of creating an event flow interactive and iterative. You can see results immediately and adjust.
- Transition to declarative on K8s: When you’re ready to move to a more declarative workflow, you can tmctl dump your configuration as a Kubernetes manifest that you can then apply onto any cluster that has TriggerMesh installed (see the sketch after this list).
- Multicloud and open source: The components used in this tutorial are Apache 2.0 (TriggerMesh) and source-available (RedPanda). They are cloud-agnostic and will run wherever you need them.
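As a minimal sketch of that last workflow, assuming tmctl dump writes the Kubernetes manifests to standard output and that TriggerMesh is already installed on the target cluster:
tmctl dump > triggermesh-flow.yaml
kubectl apply -f triggermesh-flow.yaml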
If you’d like to take it for a spin yourself, you can either head to the GitHub repo for this example or try to create something yourself by starting with the quickstart guide.