Describe and execute Kafka stream topologies with Lenses SQL

Effective browsing, querying and joining Kafka data
Stefan Bocutiu

Streaming Topologies out of the box!

Lenses SQL in action

Lenses SQL for Analyze, Process, Connect

Lenses SQL supports the 3 major layers of your data streaming pipeline:

  • Analyze: Run ad-hoc queries over Kafka topics in real time or over their history. Browsing your Kafka topics has never been easier or more powerful.
  • Process: Build on top of the Kafka Streams API to run comprehensive, production-quality streaming analytics.
  • Connect: All our connectors are built with SQL capabilities at the ingestion layer.

For this article, we will do a deep dive on the processing layer.

Everything starts with once upon a time

Back in 2016, we introduced our first SQL layer for Kafka. We did so with KCQL (Kafka Connect Query Language) for all the Kafka Connectors (20+). You can find them all under the Apache License 2.0 on GitHub. KCQL allows users to express the action the Kafka Connect source or sink should execute. This streamlines the configuration of Kafka connectors and, furthermore, clarifies the execution logic.

Register SQL processors easily, visually, and at scale

As we interacted more and more with our clients, we realized the majority of flows in production can be expressed through SQL. We understood the need to enrich the Apache Kafka ecosystem with a SQL layer allowing end-to-end flows to be expressed easily and quickly through the simplicity of a structured query language. This allows the client to fully reuse skills already present in the company; the employees with their business knowledge can focus on the business requirements rather than spending time gaining a good understanding of the Kafka Streams API.

We built Lenses SQL for Apache Kafka and made it available via the Lenses platform. Lenses takes care of executing the stream flow while providing fault tolerance, monitoring, and scalability, out of the box. We fully support the major deserializers like JSON and Avro, including decimal types (as everyone knows, using doubles in a financial institution is a bad practice)! Lenses also integrates with the Schema Registry and offers support for managing the entries via a modern web interface.
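As a quick aside on why doubles are risky for money: binary floating point cannot represent amounts like 0.1 exactly, while decimal types can. A short Python illustration:

```python
from decimal import Decimal

# Binary floats accumulate rounding error on even the simplest monetary sums...
print(0.1 + 0.2 == 0.3)  # False

# ...while decimal arithmetic stays exact.
print(Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))  # True
```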

Can I get my stream processor in production in the next 2 hours?

Describing and executing the entire streaming flow in SQL enables users to create end-to-end flows in production by writing nothing more than a SQL query.

Let’s see a real example, where we want to create a fraud detection report for a payments provider that shows the fraudulent transactions per minute. A transaction is considered fraudulent if the credit card has been blocked and yet a payment has been attempted.

As seen in the picture, the above scenario can be easily achieved with a few lines of Lenses SQL to leverage the power of stream processing in Apache Kafka.

Below you can find the Lenses SQL statement through which we build and execute the Kafka stream flow calculating every minute, for each currency involved, the amount of money on fraudulent transactions.

INSERT INTO `cc_payments_fraud`

WITH tableCards AS
  (SELECT *
  FROM `cc_data`
  WHERE _ktype = 'STRING'
    AND _vtype = 'AVRO' )

SELECT STREAM
  p.currency
  , SUM(p.amount) AS total
  , COUNT(*) usage
FROM `cc_payments` AS p
  LEFT JOIN tableCards AS c
    ON p._key = c._key
WHERE p._ktype = 'STRING'
  AND p._vtype = 'AVRO'
  AND c.blocked IS true
GROUP BY TUMBLE(1,m), p.currency

The Non-Streaming Approach

Before the revolution brought by streaming data and in-flight computations, a typical approach would have been to store the transactions in an RDBMS and periodically query it with a filter on the time column between now and one minute ago.

Our domain comprises two entities: credit cards and payments. Using Microsoft SQL Server as the target RDBMS instance, we first need to create the tables:

CREATE TABLE [credit_cards]( [number] VARCHAR(17) PRIMARY KEY CLUSTERED
                          , [firstName] VARCHAR(50) NOT NULL
                          , [lastName] VARCHAR(50) NOT NULL
                          , [country] VARCHAR(3) NOT NULL
                          , [currency] VARCHAR(3) NOT NULL
                          , [blocked] BIT DEFAULT 0)


CREATE TABLE [payments]( [id] VARCHAR(36) PRIMARY KEY CLUSTERED
                      , [time] BIGINT NOT NULL
                      , [amount] DECIMAL(38,18) NOT NULL
                      , [currency] VARCHAR(3) NOT NULL
                      , [creditCardId] VARCHAR(17) NOT NULL
                      , [merchantId] BIGINT NOT NULL)

The time column found in the payments table will hold the epoch time when the transaction is requested. Assuming we have a continuous flow of transactions being recorded we can get the required report with this query:

SELECT
  p.currency
  , SUM(p.amount) AS total
  , COUNT(*) AS usage
FROM  payments p 
  INNER JOIN credit_cards c 
    ON p.creditCardId = c.number
WHERE c.blocked = 1 
  AND p.time BETWEEN 1507718700 AND 1507718760
GROUP BY p.currency

The two epoch timestamp values represent the time of 11 October 2017 10:45:00 and 11 October 2017 10:46:00, respectively. This query would be used to repeatedly poll the RDBMS and produce data as below:

Currency   Total      Usage
USD        9999.99    12
EUR        1111.11    21
GBP        8888.88    4
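As a sanity check on those epoch values (assuming the timestamps are UTC), a few lines of Python convert them back to readable timestamps:

```python
from datetime import datetime, timezone

def to_utc(epoch_seconds):
    """Render an epoch timestamp (in seconds) as a readable UTC string."""
    dt = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return dt.strftime("%d %B %Y %H:%M:%S")

print(to_utc(1507718700))  # 11 October 2017 10:45:00
print(to_utc(1507718760))  # 11 October 2017 10:46:00
```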

The Streaming Approach

The Lenses SQL syntax template for describing a Kafka Streams flow is as follows:

[ SET autocreate = true;]
[ SET partitions = 1;]
[ SET replication = 2;]
[ SET `decimal.scale`= 18;]
[ SET `decimal.precision`= 38;]
[ SET `ANY KAFKA STREAMS CONFIG. See StreamsConfig` = '';]

INSERT INTO _TARGET_TOPIC_
[
  WITH
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...),
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...)
]
SELECT ...
FROM _ID_NAME_ [INNER JOIN _OTHER_ID_NAME_ ...]
WHERE ...
[
  GROUP BY ...
  [HAVING ...]
]

Via the SET statements you can:

  • Enable topic creation for the _TARGET_TOPIC_. An error will be raised if this flag is not set and the topic does not exist.
  • Set the target topic partition and replication values.
  • Provide the decimal scale and precision when working with Avro records and utilizing big decimal types. This is a requirement when precision is critical, e.g. in the financial sector.
  • Configure any of the parameters accepted by the Kafka Streams API when building a stream flow.

The WITH statements allow you to define the equivalent of traditional RDBMS temporary tables. The SELECT statement should be self-explanatory.

In this example, the Kafka message key is a String whereas the value is an Avro record. For those new to Kafka, a message is a key-value tuple. When it comes to Avro payloads, Lenses SQL does static validation and therefore requires the Avro schemas to be available.

Let’s walk through the logic to illustrate how everything really works and gets executed:

SET `autocreate`=true;
SET `auto.offset.reset`='earliest';

INSERT INTO `cc_payments_fraud`
WITH tableCards AS
 (SELECT *
  FROM `cc_data`
  WHERE _ktype = 'STRING'
    AND _vtype = 'AVRO' )
  
SELECT STREAM
  p.currency
  , SUM(p.amount) AS total
  , COUNT(*) usage
FROM `cc_payments` AS p
  LEFT JOIN tableCards AS c ON p._key = c._key
WHERE p._ktype = 'STRING'
  AND p._vtype = 'AVRO'
  AND c.blocked IS true
GROUP BY TUMBLE(1,m), p.currency

The first statement in the query instructs LSQL to auto create the target topic, while the second one is a Kafka consumer setting. This setting specifies what to do when there is no initial offset in Kafka for the underlying consumer or if the current offset does not exist.

Covering the details of Kafka Streams goes beyond the scope of this blog entry, so if you need to familiarize yourself please consult the Apache Kafka documentation.

The next instruction is basically a SQL INSERT statement. It starts with INSERT INTO `cc_payments_fraud`, which instructs LSQL to create a Kafka stream flow storing the results in a topic named cc_payments_fraud. Before talking about the main logic, encapsulated by the two SELECT statements, let’s clarify the setup we have.

Kafka is a message-agnostic platform, and in order to create a working flow, LSQL needs to know how the topic data can be read; hence the presence of the _ktype and _vtype keywords. The former provides the format for the key part, whereas the latter provides the format for the value part. Using this information, LSQL can create the corresponding Serdes (see the Kafka documentation if you are not familiar with them) for the Kafka stream flow.
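Conceptually, those format hints select a deserializer per message part. The sketch below is a simplified illustration, not the actual LSQL internals; a real 'AVRO' entry would also need the writer schema, typically fetched from the Schema Registry:

```python
import json

# Hypothetical registry mapping an LSQL format hint to a deserializer.
DESERIALIZERS = {
    "STRING": lambda raw: raw.decode("utf-8"),
    "JSON": lambda raw: json.loads(raw.decode("utf-8")),
    # An "AVRO" entry would need the writer schema from the Schema Registry.
}

def deserialize(raw_bytes, format_hint):
    """Decode one message part (key or value) according to its format hint."""
    return DESERIALIZERS[format_hint](raw_bytes)

# The key of a cc_data record is the card number, read as _ktype = 'STRING'.
key = deserialize(b"4111111111111111", "STRING")
```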

For each of the domain entities we have a corresponding topic:

  • cc_data with 1 partition (since it’s static data) and a replication factor of 3. This holds the static information on the registered credit cards.
  • cc_payments with 3 partitions and a replication factor of 3. This holds all the requested payment transactions.

Each record sent to the cc_data topic has the Kafka message key part set to the credit card number (which is a string) and the value part is an Avro Record with the following schema:

{
  "type": "record",
  "name": "CreditCard",
  "namespace": "com.landoop.data.payments",
  "fields": [
      { "name": "number",    "type": "string" },
      { "name": "firstName", "type": "string" },
      { "name": "lastName",  "type": "string" },
      { "name": "country",   "type": "string" },
      { "name": "currency",  "type": "string" },
      { "name": "blocked",   "type": "boolean"}
  ]
}

Now we can talk about the first SELECT instruction. Using the cc_data topic we create a KTable instance. This is expressed by the following code:

WITH tableCards AS
 (SELECT *
  FROM `cc_data`
  WHERE _ktype = 'STRING'
    AND _vtype = 'AVRO')

When defining a table, it is important for the Kafka message key component to be non-null. We will not go into the details of what a KTable is, but it is effectively a change log stream, giving you the latest view built from update events. Every time a credit card record is added or updated, a message will end up on this topic. The tableCards KTable is stateful and will always have the latest view of this static data.
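The "latest view over a change log" behaviour can be sketched in a few lines of Python (a toy model of KTable semantics, not the Kafka Streams implementation):

```python
def materialize(changelog):
    """Fold a change-log stream of (key, value) events into a table
    holding the latest value per key, which is what a KTable represents."""
    table = {}
    for key, value in changelog:
        table[key] = value  # a later event for the same key overwrites the earlier one
    return table

# Two updates arrive for the first card: its blocked flag flips to True.
events = [
    ("4111111111111111", {"blocked": False}),
    ("5500000000000004", {"blocked": False}),
    ("4111111111111111", {"blocked": True}),
]
latest = materialize(events)  # the first card now shows {"blocked": True}
```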

Each record sent to the cc_payments topic has the Kafka message key set to the credit card number (which is a string) and the value part is an Avro Record with the following schema:

{
  "type": "record",
  "name": "Payment",
  "namespace": "com.landoop.data.payments",
  "fields": [
    { "name": "id",           "type": "string" },
    { "name": "time",         "type": "string" },
    { "name": "currency",     "type": "string" },
    { "name": "creditCardId", "type": "string" },
    { "name": "merchantId",   "type": "long"   },
    {
      "name": "amount",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 38,
        "scale": 18
      }
    }
  ]
}

Setting the Kafka message key to the credit card number is a matter of optimization. LSQL allows you to join on fields as well! However, in the case of a table (which materializes as a KTable instance), remapping the key comes with a performance impact: because the message key needs to be re-mapped, an intermediary topic is required. This is advanced usage and good material for another blog entry. However, Lenses SQL takes care of this automatically for you.

It could have been the case that all the messages sent to the cc_payments topic had null keys. In that case, the query would have been the following:

SET `autocreate`=true;
SET `auto.offset.reset`='earliest';

INSERT INTO `cc_payments_fraud`
WITH tableCards AS
 (SELECT *
  FROM `cc_data`
  WHERE _ktype = 'STRING'
    AND _vtype = 'AVRO' )
  
SELECT STREAM
  p.currency
  , SUM(p.amount) AS total
  , COUNT(*) usage
FROM `cc_payments` AS p
  LEFT JOIN tableCards AS c ON p.creditCardId = c._key // JOIN VALUE FIELD TO KEY
WHERE p._ktype = 'STRING'
  AND p._vtype = 'AVRO'
  AND c.blocked IS true
GROUP BY TUMBLE(1,m), p.currency

Please note the change in the join condition: p.creditCardId = c._key.

Notice the second SELECT statement is followed by the STREAM keyword. This is a small but rather important difference when describing the flow. This time, LSQL is instructed to create a KStream instance. For the requirement we have, we don’t want to build a KTable out of cc_payments; instead, we require a KStream instance, and for each payment record the Kafka Streams API will perform a lookup on the KTable defined earlier, using the credit card number as the lookup key.

The remainder of the statement follows conventional SQL: ON p.creditCardId = c._key defines the join condition. Here only one join condition is set, but more than one is possible. The WHERE clause is trivial, filtering for blocked cards, and differs from a standard SQL statement only in setting the key and value decoders via the _ktype and _vtype keywords.

The last line, GROUP BY TUMBLE(1,m), p.currency, instructs LSQL to group the result of the join by the payment currency and calculate the aggregation over a 1-minute time window. You can use the following types of windows in LSQL:

  • Hopping time windows. These are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap, a data record may belong to more than one such window.
GROUP BY HOP(5,m,1,m)
  • Tumbling time windows. These are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.
GROUP BY TUMBLE(1,m)
  • Sliding windows. These express a fixed-size window that slides continuously over the time axis. Here, two data records are said to be included in the same window if the difference between their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but to the data record timestamps.
GROUP BY SLIDING(1,m)
  • Session windows. These are used to aggregate key-based events into sessions. Sessions represent a period of activity separated by a defined gap of inactivity. Any events processed that fall within the inactivity gap of an existing session are merged into it. If an event falls outside the session gap, a new session is created. Session windows are tracked independently across keys (e.g. windows of different keys typically have different start and end times) and their sizes vary (even windows for the same key typically have different sizes). As such, session windows cannot be pre-computed and are instead derived by analyzing the timestamps of the data records.
GROUP BY SESSION(10,m,5,m)
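To make the difference between tumbling and hopping windows concrete, here is a small sketch (an illustration, not the Kafka Streams implementation) computing which window start(s) a record timestamp falls into, with everything measured in minutes:

```python
def tumbling_windows(ts, size):
    """A tumbling window assigns a timestamp to exactly one window."""
    return [ts - ts % size]

def hopping_windows(ts, size, advance):
    """A hopping window may assign a timestamp to several overlapping windows."""
    starts = []
    start = ts - ts % advance  # the most recent window start covering ts
    while start > ts - size:
        starts.append(start)
        start -= advance
    return sorted(starts)

# A record at minute 7 with 5-minute windows hopping every minute belongs to
# the windows starting at minutes 3, 4, 5, 6 and 7; with a 5-minute tumbling
# window it belongs only to the window starting at minute 5.
```

Note that a tumbling window is just a hopping window whose advance interval equals its size, exactly as described above.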

Executing the flow would yield results similar to the ones below, 1 minute apart:

    Key {"key":CAD,"timestamp":"2017:10:11 11:22:00.000"}
    Value { currency: CAD, total: 51130.08, usage: 20 }
    
    Key {"key":EUR,"timestamp":"2017:10:11 11:22:00.000"}
    Value { currency: EUR, total: 236937.22, usage: 84 }
    
    Key {"key":USD,"timestamp":"2017:10:11 02:22:00.000"}
    Value { currency: USD, total: 86313.89, usage: 37 }
    
    Key {"key":GBP,"timestamp":"2017:10:11 02:22:00.000"}
    Value { currency: GBP, total: 43038.13, usage: 14 }

You might have noticed the key in the output topic is not just the currency. This is the result of aggregating on a timed window. To read the cc_payments_fraud topic you have to use a Windowed<String> deserializer for the key and an Avro deserializer for the value part.

Scale-out my stream flows

Say you are a popular payments provider and at times deal with 10k+ transactions a second. In this case, running a single instance of our flow might not be ideal: given the data volume, the processing could easily fall behind. Furthermore, some of you might ask where the fault tolerance is.

Our engineering team at Landoop makes sure this challenge is addressed, and Lenses provides, at the time of writing, two execution modes! We are committed to extending the options to suit various client needs, and a third execution mode is already being trialed.

One of the existing modes is available only as part of the Enterprise package. It allows you to scale your flows; its implementation is entirely based on Kafka group semantics and benefits from all the important guarantees you would look for in such a situation: fault tolerance, monitoring, auto restarts, flow instance distribution, etc.

Topology view is there, and yet not a word on it

You wouldn’t have expected us to leave this part out, would you?

Seeing the actual Kafka stream execution plan diagram, visual embellishments aside, allows for a better understanding of what is going on under the hood. We have more functionality coming in the near future; but rather than discussing what’s on the road map, let’s see why the topology view is useful from a user’s perspective.

You might have spotted a box in the topology graph labeled repartition. This example has been chosen specifically. Joining topics which have different partition counts cannot be materialized - it’s a Kafka Streams API requirement. Therefore, the cc_payments data has to be moved to an intermediary topic, one with the same number of partitions as the cc_data topic. Why is this important for you, the user? This of course comes with some performance cost, since data has to be read, written, and read again. The Kafka brokers and network infrastructure will take a small hit. You might want to avoid this penalty and set up your topics so that this repartitioning is not required.
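To see why co-partitioning matters, recall that records are routed by key, roughly partition = hash(key) % partition_count. (Kafka’s default partitioner actually uses murmur2 over the key bytes; the byte sum below is just an illustrative stand-in.) The same key can land on different partition numbers when the counts differ:

```python
def partition_for(key, num_partitions):
    # Illustrative stand-in for Kafka's default partitioner (murmur2 on key bytes).
    return sum(key.encode("utf-8")) % num_partitions

card = "4111111111111111"
p_payments = partition_for(card, 3)  # cc_payments: 3 partitions
p_cards = partition_for(card, 1)     # cc_data: 1 partition
# With different partition counts, a join task cannot assume that matching keys
# sit in same-numbered partitions - hence the repartition (read, write, re-read) step.
```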

Did I deploy my streaming flow in production within 2 hours?

With the help of Lenses SQL and Lenses itself, we think such time constraints can actually be met. How fast can you write SQL?

Conclusion

By using SQL as the abstraction, Lenses SQL simplifies the tasks of streaming analytics on top of Apache Kafka. It enables users to bring streaming analytics applications to production quicker and allows organizations to use existing skill sets to unlock the potential of streaming platforms.

New features extending the functionality that Lenses SQL streaming already supports are in progress, and we have barely scratched the surface on LSQL. There are so many other things to talk about, but we’ll leave that for another blog entry.

The Lenses SQL engine uses the Apache Calcite parser with MySQL syntax and is but one of the many offerings of Lenses.

If you want to learn more about LSQL and/or Lenses, we would love to schedule a demo for you and your team.

So don’t be shy and get in touch to find out more.

Sign up for a Lenses Preview.

You can also contact us @LandoopLtd or email us info@landoop.com to find out more and schedule a demo!

