4. Lenses SQL Engine

Lenses SQL is a Landoop initiative to make Kafka accessible to people familiar with SQL. Using Apache Calcite's industry-standard SQL parser, LSQL allows ad-hoc SELECT queries to read data from a specific Kafka topic. But it does not stop there: using an intuitive syntax, LSQL allows you to aggregate, join and transform Kafka streams. Our SQL engine supports JSON and Avro records, nested fields and proper decimal handling for Avro records.

4.1. Why

We have seen interest in and usage of Apache Kafka grow considerably over the last two years. Many use cases involve moving data and applying transformations, aggregations and lookups en route to a target system such as a data warehouse. Even many data science solutions we have been involved in can be achieved with SQL on streams, but adoption was hindered by users' lack of experience with Java and with deploying these components.

Many organizations also have traditional BI departments, who have been dealing with big, fast-flowing data streams for many years and are well versed in running SQL on data pipelines. Often these teams work on projects that involve migrating legacy systems to the cloud. They have the business knowledge and the SQL technical skills. LSQL enables them to quickly start using Kafka and the wider streaming ecosystem around it.

Using LSQL, we want developers, business analysts and devops to focus on the problems they need to solve and spend less time on frameworks, tools and infrastructure.

4.2. Kafka Topics and Messages

There is so much information available about Apache Kafka on the internet that going into a lot of detail here would take the focus away from the subject. Still, we need a bit of context to fully understand why and how LSQL increases productivity.

Kafka is a streaming platform using messaging semantics. A Kafka cluster manages a collection of streams named topics. A stream of data is divided horizontally into partitions. The partitions are replicated and distributed through the Kafka cluster to achieve high availability.

A Kafka record (message) is composed of:

  • key
  • value
  • metadata
  • timestamp
  • topic
  • partition
  • offset

As a result, LSQL allows the user to select any of them. If you want to select a field from the key part, the syntax is _key.id, or even _key.* to use the entire key value. A detailed explanation is provided in the next chapter.
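
For example, a minimal sketch (topic and field names are hypothetical) selecting a field from the key part alongside a value field:

SELECT
    _key.id
    , amount
FROM `payments`
WHERE _ktype = 'AVRO'
AND _vtype = 'AVRO'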

To extract maximum performance, Apache Kafka is not aware of the actual content of a message. All it knows is that a message is made of two arrays of bytes. The standard approach is to use Apache Avro to handle the payload serialization step. By using Avro the payload schema is enforced, and schema evolution is provided out of the box (this requires schema management, and the Confluent Schema Registry tends to be the de-facto tool). However, we have seen a lot of software systems using Apache Kafka that rely on JSON (JavaScript Object Notation) or Google Protobuf for serialization.

Important

Kafka does not know about the content of a message!

LSQL knows how to handle the following payload structures (also known as decoder types):

  • BINARY
  • JSON
  • AVRO
  • STRING
  • INT
  • LONG
  • PROTOBUF* (coming soon)

When using LSQL the user has to provide the decoder types. We support specific keywords for this: _ktype and _vtype.
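
For example (the topic name is hypothetical), a query reading string keys and JSON values declares both decoders in the WHERE clause:

SELECT *
FROM `some-topic`
WHERE _ktype = 'STRING'
AND _vtype = 'JSON'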

4.3. SQL Syntax

Lenses SQL uses the MySQL syntax for two purposes:

  • reading data from a topic [and partition(-s)] while applying the required filters and functions on fields.
  • building and running a Kafka Streams application using a simple SQL-like syntax.

_key (optional)
    Used as a prefix; instructs LSQL to pick up the field from the Kafka message key part.

_ktype (required, STRING)
    Specifies which decoder to use for the key component when reading from the given topic.
    See above for the decoder types list.

_vtype (required, STRING)
    Specifies which decoder to use for the value component when reading from the given topic.
    See above for the decoder types list.

_partition (optional, INT)
    Specifies which partitions to read from. It can be _partition = 2 or _partition IN (1,2,3).
    If used in a SELECT field list it returns the Kafka message partition number.

_offset (optional, INT)
    Specifies which offsets to read from. It can be a specific value or bounded by a range.
    If _offset = 123 exactly one message is read. The user can restrict further by specifying
    a lower/upper bound: _offset >= 123 AND _offset < 999. If used in a SELECT field list it
    returns the Kafka message offset.

_ts (optional, LONG)
    Restricts the messages read based on their timestamp. Similar to _offset, it can be a
    specific value (_ts = 1543366) or a range (_ts > 123 AND _ts < 125). Please consult the
    Apache Kafka documentation for timestamp details. If used in a SELECT field list it
    returns the Kafka message timestamp.
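
As a sketch (topic and field names are hypothetical), several of these keywords can be combined in a single query:

SELECT
    _partition
    , _offset
    , username
FROM `audit`
WHERE _ktype = 'BINARY'
AND _vtype = 'JSON'
AND _partition = 0
AND _offset >= 100
AND _offset < 200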

Keywords

The SQL grammar has predefined keywords:

  • timestamp
  • partition
  • topic

These keywords can still be used as fields, but they must be escaped. As with most SQL dialects, it is recommended to escape field and topic names. Below is an example of such a query:

SELECT
  `partition`
  , `timestamp`
FROM `target_topic`

Important

Always escape the topic name with a backtick (`). If you don't, and the topic name contains non-alphanumeric characters, the parsing will fail.

4.3.1. SELECT Syntax

Retrieves rows from a Kafka topic, enabling the selection of one or many columns from each message on the target topic. The full syntax of the SELECT statement is described below:

SELECT select_expr [, select_expr ...]
FROM `Topic_Reference`
WHERE _ktype = 'KEY_TYPE'
AND _vtype = 'VALUE_TYPE'
[AND FILTERS]

Aliasing is not allowed in this scenario. Since there is only one source topic, all the select_expr are automatically qualified to that topic.

4.3.2. Value Fields

The previous section gave a quick overview of what a Kafka message is composed of. To select fields within the value component of a message, just use the field name. For example, let's assume the following message structure flows through your Kafka topic:

{
  "user": "a user id",
  "region" : "Europe",
  "url" : "someurl",
  "timestamp": 1503256766
}

To select the field named region from the message it is as simple as writing the query below:

SELECT region
FROM `web-traffic`
WHERE _ktype = 'BINARY'
AND _vtype ='AVRO'

Notice the query specifies the format type for both the key and value components of a Kafka message. The result of such a query will be a new Avro record containing only one field: region. If the format of the source topic is JSON, the result will be a JSON object containing one field.

Important

When the JSON decoder type is used, the payload is expected to be the byte representation of a JSON object.
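
For instance, reading the same topic with a JSON payload only requires changing the value decoder:

SELECT region
FROM `web-traffic`
WHERE _ktype = 'BINARY'
AND _vtype = 'JSON'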

Not all payloads stored in a Kafka message have a flat structure. Nested structures are quite a common scenario, and LSQL has full support for selecting nested fields.

Consider the message structure below for the message value component:

{
  "name": "Rick",
  "address": {
    "street": {
      "name": "Rock St"
    },
    "street2": {
      "name": "Sunset Boulevard"
    },
    "city": "MtV",
    "state": "CA",
    "zip": "94041",
    "country": "USA"
  }
}

Suppose we want to flatten the structure. To do so, the following query can be used:

SELECT
    name
    , address.street.name AS address1
    , address.street2.name AS address2
    , address.city
    , address.state
    , address.zip
    , address.country
FROM `target_topic`
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'

The result of such a query will be a new Avro record containing seven fields: name, address1, address2, city, state, zip and country. The same applies in the case of a JSON payload:

SELECT
    name
    , address.street.name AS address1
    , address.street2.name AS address2
    , address.city
    , address.state
    , address.zip
    , address.country
FROM `target_topic`
WHERE _ktype = 'BINARY'
AND _vtype = 'JSON'

4.3.3. Key Fields

Often, especially in IoT systems, device information is stored in the message key and not replicated in the message value part. LSQL allows selecting the entire key (for primitive types: Int/Long/Double/String) or a specific field in the case of Avro/JSON structures. All that is required is the _key prefix.

Imagine a scenario where various metrics are obtained from an electricity device and the message key part structure looks like this:

{
    "deviceId": 192211,
    "model":"AX14c1"
}

To select the device id, the following query can be written:

SELECT
    _key.deviceId
    , meterValue, ...
FROM `metrics`
WHERE _ktype = 'AVRO'
AND _vtype = 'BINARY'

Note

Prefix the field with _key if you want to address a field which is part of the key.
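
For a primitive key (for example a STRING device identifier) the entire key can be selected with _key.*. A sketch, assuming such a key on the metrics topic:

SELECT
    _key.*
    , meterValue
FROM `metrics`
WHERE _ktype = 'STRING'
AND _vtype = 'AVRO'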

4.3.4. Functions

LSQL comes with support for string and arithmetic functions. The table below contains the list of supported functions.

CAST
    Enables conversion of values from one data type to another.
    Usage: cast(dt as int)

COALESCE
    Returns the first non-null expr in the expression list. You must specify two expressions.
    Usage: coalesce(value, prevValue)

CONCATENATE
    Returns the string representation of concatenating each expression in the list.
    Null fields are left out.
    Usage: concatenate(weather.main, 'fixed', dt, temp.eve)

LEN
    Returns the length of a string. LEN calculates length using characters as defined by UTF-16.
    Usage: len(name)

POSITION
    Returns the position of the first occurrence of a substring.
    Usage: position('math' in fieldA)

REPLACE
    Returns the string with every occurrence of search_string replaced with replacement_string.
    Usage: replace(fieldA, 'math', 'mathematics')

SUBSTRING
    Returns a portion of the string, beginning at the given position.
    Usage: substring(field, 2)

TRIM
    Removes leading and trailing spaces.
    Usage: trim(field)

LOWER
    Returns the argument in lowercase.
    Usage: lower(name)

UPPER
    Returns the argument in uppercase.
    Usage: upper(name)

ABS
    Returns the absolute value of an expression; it has to evaluate to a number type.
    Usage: abs(field1 + field2)

CEIL
    Returns the smallest value not less than the argument; it has to evaluate to a number type.
    Usage: ceil(field1)

FLOOR
    Returns the largest value not greater than the argument.
    Usage: floor(field1)

'/'
    Divides one number by another (an arithmetic division operator). Each expression has to
    evaluate to a number.
    Usage: a / b

'-'
    Subtracts one number from another (an arithmetic subtraction operator). Each expression
    has to evaluate to a number.
    Usage: a - b

'*'
    Multiplies one number by another (an arithmetic multiplication operator). Each expression
    has to evaluate to a number.
    Usage: a * b

'+'
    Adds one number to another (an arithmetic addition operator). Each expression has to
    evaluate to a number.
    Usage: a + b

'-' (negation)
    Returns the negative of the value of a numeric expression (a unary operator). The
    expression has to evaluate to a number.
    Usage: -a

POW
    Returns expression1 raised to the expression2 power. Each expression has to evaluate to
    a number.
    Usage: pow(a, b)

SQRT
    Returns the square root of the expression. The expression has to evaluate to a number.
    Usage: sqrt(a)

COUNT
    Returns the number of records read. ONLY FOR STREAMING, when grouping is involved.
    Usage: count(*)

MAX
    Returns the maximum value of the expression. ONLY FOR STREAMING, when grouping is involved.
    Usage: max(field1)

MIN
    Returns the minimum value of the expression. ONLY FOR STREAMING, when grouping is involved.
    Usage: min(field1)

SUM
    Returns the sum of the expression for each record read. ONLY FOR STREAMING, when grouping
    is involved.
    Usage: sum(field1)
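
To illustrate, here is a sketch combining a few of these functions (topic and field names are hypothetical):

SELECT
    upper(name) AS name
    , len(name) AS name_length
    , abs(balance) AS balance
FROM `accounts`
WHERE _ktype = 'BINARY'
AND _vtype = 'JSON'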

4.3.5. Where Clause

Querying for data and filtering out records based on various criteria is a common scenario, hence LSQL supports complex filter expressions.

4.3.5.1. String Filter

If your field is of type string, any of the operators below can be used as part of the predicate describing which records to select and which ones to drop.

!=
    Checks for inequality: field != 'ABC'

=
    Checks for equality: field = 'ABC'

<
    Checks for less than: field < 'ABC'

<=
    Checks for less than or equal: field <= 'ABC'

>
    Checks for greater than: field > 'ABC'

>=
    Checks for greater than or equal: field >= 'ABC'

like
    Checks for prefix/contains/suffix matches. To filter all records where the field contains
    the text: field like '%ABC%'. To check for a prefix: field like 'ABC%'. To check for a
    suffix: field like '%ABC'. Comparison is case insensitive!

Let's assume you have an audit topic where you store changes made to data in your system, and you want to see all the changes made by people whose name contains Alex. To achieve that, the following query can be used:

SELECT *
FROM audit
WHERE username LIKE '%Alex%'
AND _ktype = 'BINARY'
AND _vtype = 'AVRO'

4.3.5.2. Null check

Some of the fields in the payload can be null (or missing, in the case of JSON). Using the operations below, a query can filter out any records where a field is null or not null:

is null
    Checks the value is null: field is null

is not null
    Checks the value is not null: field is not null

SELECT *
FROM `visitorTracking`
WHERE _ktype = 'BINARY'
AND _vtype = 'JSON'
AND location.country IS NOT NULL

4.3.5.3. Filter on numbers

LSQL allows you to apply filters to fields of number type: integer, short, byte, long, double, float, decimal. The equality comparison for float and double happens within a precision range: Java uses a subset of the IEEE 754 standard to represent floating point numbers, and some values, for example 1/3, cannot be represented exactly using float and double. As a result, equality needs to allow for an epsilon; LSQL uses 0.000001 as the epsilon value. If the two operands differ by less than the epsilon they are considered equal.

!=
    Checks for inequality: field != 2

=
    Checks for equality: field = 2

<
    Checks for less than: field < 123.4

<=
    Checks for less than or equal: field <= 1000

>
    Checks for greater than: field > -10

>=
    Checks for greater than or equal: field >= 122.122315

SELECT *
FROM topic
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'
AND location.latitude < -10

4.3.5.4. Filter on partition

Sometimes a user wants to look at one partition, or a subset of partitions, when applying a SELECT statement. In that case the query needs to use the _partition keyword to specify which ones. For example:

SELECT
    score
    , userId
    , sessionToken
FROM livescore
WHERE _ktype = 'BINARY'
AND _vtype = 'JSON'
AND _partition = 1

--or

SELECT
    score
    , userId
    , sessionToken
FROM livescore
WHERE _ktype = 'BINARY'
AND _vtype = 'JSON'
AND _partition in (2,3)

4.3.5.5. Filter on timestamp

LSQL allows filtering by timestamp to enable selecting the time windows you are interested in. For such filtering LSQL provides the _ts keyword.

Let's say we want to see all the messages on a topic and partition where the timestamp is greater than 1501593142000 (epoch time in milliseconds).

SELECT
    tradeId
    , isin
    , user
FROM `systemxtrades`
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'
AND _partition = 1
AND _ts > 1501593142000

This is the same as if you were writing:

SELECT
    tradeId
    , isin
    , user
FROM `systemxtrades`
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'
AND _partition = 1
AND _ts > '2017-08-01 13:12:22'

Important

LSQL understands timestamps following this date format: yyyy-MM-dd HH:mm:ss[.SSS]

4.3.5.6. Boolean filter

There are scenarios where checking a boolean field value is a requirement. The SQL allows you to do so by simply specifying is true or is false. Let's say we have a topic where credit card metadata is stored and we want to pick only the cards which have been blocked. Running the query below achieves this:

SELECT *
FROM credit_cards
WHERE _ktype='AVRO' AND _vtype='AVRO' AND blocked is true

4.3.5.7. Complex Filter

Applying a filter on a single field is sometimes not enough. A user might want to apply algebra on a set of fields and filter based on the result, or look at a subset of a string field. LSQL allows you to do that.

SELECT *
FROM topicA
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'
AND (a.d.e + b) / c > 100

LSQL allows the user to combine fields from both the key and value parts in the WHERE clause. Here is an example of doing so:

SELECT *
FROM topicA
WHERE _ktype = 'AVRO'
AND _vtype = 'AVRO'
AND (_key.a.d.e + b) / _key.c < 100


SELECT *
FROM topicA
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'
AND ((abs(fieldA) < 100 AND fieldB>=2) OR (fieldC like '%wow'))

4.4. Streaming Syntax

LSQL's second usage is defining Kafka Streams flows through a SQL-like syntax. All the information learned so far is fully applicable to the SELECT statements written for stream processing. The syntax, which might look complex initially, is as follows:

[ SET autocreate = true;]
[ SET partitions = 1;]
[ SET replication = 2;]
[ SET `decimal.scale`= 18;]
[ SET `decimal.precision`= 38;]
[ SET `ANY KAFKA STREAMS CONFIG. See Kafka documentation StreamsConfig, ConsumerConfig and ProducerConfig` = '';]
[ SET `topic.[ANY KAFKA Log CONFIG. See LogConfig]` = '';]

INSERT INTO _TARGET_TOPIC_
[WITH
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...),
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...)
]
SELECT select_expr [, select_expr ...]
FROM _ID_NAME_ INNER JOIN _OTHER_ID_NAME_ ON join_expr
[WHERE condition_expr]
[GROUP BY group_by_expr]
[HAVING having_expr]

If you are not familiar with Apache Kafka streaming, please follow the Kafka documentation first. Do not be tricked into thinking streaming SQL is similar to your typical RDBMS SQL.

Important

Streaming SQL is not your typical RDBMS SQL. Core concepts around stream processing with Apache Kafka, duality of Table/Stream, the implication of creating a Table versus a Stream instance, etc. need to be understood first.

Using LSQL for streaming allows you to do:

  • Transformations
  • Aggregations
  • Joins

We will go through each one in detail, but before we do so we need to expand on the syntax you have seen earlier.

Important

When using Avro payloads the schema needs to be present. Lenses SQL does static validation against the existing schema. If the schema is missing, an error is returned.

4.4.1. Windowing

Windowing allows you to control how to group records which share the same key for stateful operations such as aggregations or join windows. Windows are tracked per record key. LSQL has support for the full spectrum of windowing functionality available in the Kafka Streams API.

Note

A record is discarded and will not be processed by the window if it arrives after the retention period has passed.

You can use the following types of windows in LSQL:

  • Hopping time windows. These are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap, a data record may belong to more than one such window.
...
GROUP BY HOP(5,m,1,m)
...
  • Tumbling time windows. These are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.
...
GROUP BY tumble(1,m)
...
  • Sliding windows. These express a fixed-size window that slides continuously over the time axis. Here, two data records are said to be included in the same window if the difference of their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch but to the data record timestamps.
...
GROUP BY SLIDING(1,m)
...
  • Session windows. These are used to aggregate key-based events into sessions. Sessions represent a period of activity separated by a defined gap of inactivity. Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions. If the event falls outside of the session gap, then a new session will be created. Session windows are tracked independently across keys (e.g. windows of different keys typically have different start and end times) and their sizes vary (even windows for the same key typically have different sizes). As such session windows can’t be pre-computed and are instead derived from analyzing the timestamps of the data records.
...
GROUP BY SESSION(10,m, 5, m)
...

All the window functions allow the user to specify the time unit. The supported time units are:

Keyword  Unit
MS       milliseconds
S        seconds
M        minutes
H        hours

4.4.2. Table or Stream

When using the Apache Kafka Streams API you can build a KStream or a KTable from a topic. To distinguish between them, LSQL uses the keyword STREAM: if the keyword is present a KStream is created, otherwise a KTable.

Important

When creating a table, the Kafka messages must have a non-null key. Otherwise the record is ignored.
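
To illustrate the distinction, consider this sketch (assuming a product topic with STRING keys and Avro values, as in later examples); the first query materializes a KTable, the second a KStream:

-- creates a KTable instance from the topic
SELECT productName
FROM `product`
WHERE _ktype = 'STRING'
AND _vtype = 'AVRO'

-- creates a KStream instance from the topic
SELECT STREAM productName
FROM `product`
WHERE _ktype = 'STRING'
AND _vtype = 'AVRO'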

4.4.3. KStream flow properties

LSQL allows you to define a Kafka Streams flow with a SQL-like syntax. From the target topic settings to the producer/consumer settings, LSQL allows the user to override the defaults. These configurations are set via the standard SQL pattern of setting variables with SET. For example:

SET `auto.offset.reset` = 'smallest';
SET `processing.guarantee`= 'exactly_once';  //this is for Kafka 0.11+ enabling exactly once semantics
SET `commit.interval.ms` = 1000;     //The frequency with which to save the position of the processor.

Any of the target topic specific configurations can also be specified here; follow the Apache Kafka documentation for the full list of topic-specific configurations. To set a configuration for the flow's result topic, prefix the key with topic.. For example, to set the cleanup policy to compact and to flush every 5 messages, configure LSQL as follows:

SET `topic.cleanup.policy`='compact';
SET `topic.flush.messages`= 5;
...

Apart from the topic, producer/consumer or Kafka Streams configs, LSQL allows you to set the following:

autocreate (BOOLEAN)
    If the target topic does not exist it will be created. If the Kafka setup does not allow
    automatic topic creation the flow will fail!
    Example: SET autocreate=true

partitions (INTEGER)
    The number of partitions to create for the target topic. Applies only when autocreate is
    set to true.
    Example: SET partitions=2

replication (INTEGER)
    How many replicas to create for the target topic. Applies only when autocreate is set to
    true.
    Example: SET replication=3

decimal.scale (INTEGER)
    When working with Avro records where a decimal type is involved, it specifies the decimal
    scale.
    Example: SET `decimal.scale`=18

decimal.precision (INTEGER)
    When working with Avro records where a decimal type is involved, it specifies the decimal
    precision.
    Example: SET `decimal.precision`=38
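
Putting these together, a sketch of a flow preamble that auto-creates the target topic and sets the decimal handling (the values are illustrative):

SET autocreate = true;
SET partitions = 2;
SET replication = 3;
SET `decimal.scale` = 18;
SET `decimal.precision` = 38;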

Important

Each SET .. instruction needs to be terminated with a semicolon (;).

Here is an example of setting the commit interval to 5 seconds and enabling exactly-once semantics (Apache Kafka 0.11+):

SET `commit.interval.ms` = 5000;
SET `processing.guarantee`= 'exactly_once';

INSERT INTO `hot_sensors`
SELECT
    ip
    , lat
    , `long`
    , (temp * 1.8 + 32) as metric
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = 'AVRO'
AND temp > 30

Note

Configuring a stream flow via code requires the following configuration keys to be set: default.key.serde and default.value.serde. LSQL takes care of this based on the values specified in the SQL so you don’t have to set them.

4.4.4. Transformations

This is the basic and most common use case: transforming an incoming topic to morph the messages using any of the capabilities provided by the SELECT statement. That includes:

  • Selecting specific fields
  • Applying supported functions to achieve a new field
  • Filtering the records based on your criteria.

Let’s imagine we have a topic containing sensor-specific data:

{
  "device_id": 1,
  "ip": "191.35.83.75",
  "timestamp": 1447886791,
  "lat": 22,
  "long": 82,
  "scale": "Celsius",
  "temp": 22.0,
  "device_name": "sensor-AbC-193X",
  "humidity": 15,
  "zipcode": 95498
}

We want to select only the data where the temperature is over 30 degrees Celsius. Furthermore, we want the temperature value expressed in Fahrenheit, and we only need ip, lat and long from the initial data. To do so we can write this LSQL statement:

INSERT INTO `hot_sensors`
SELECT
    ip
    , lat
    , `long`
    , (temp * 1.8 + 32) AS metric
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = 'AVRO'
AND temp > 30

This is the simplest flow you could write, and the query will end up producing records looking like this:

{
  "ip": "191.35.83.75",
  "lat": 22,
  "long": 82,
  "metric": 71.6
}

The SQL syntax allows you to access nested fields or a complex field. Let's change the structure above slightly to demonstrate. The new data looks like this:

{
  "description":"Sensor embedded in exhaust pipes in the ceilings",
  "ip":"204.116.105.67",
  "id":5,
  "temp":40,
  "c02_level":1574,
  "geo":{
    "lat":35.93,
    "long":-85.46
    }
}

First we write the SQL to address the nested fields:

INSERT INTO `new_sensors`
SELECT
    ip
    , geo.lat
    , geo.long
    , temp
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = 'AVRO'

The result of applying this query will be Avro records with the following format:

{
   "ip":"204.116.105.67",
   "lat":35.93,
   "long":-85.46,
   "temp":40
}

If the user selects a complex field, the entire substructure is copied over. For example:

INSERT INTO `new_sensors`
SELECT
    ip
    , geo
    , temp
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = 'AVRO'

The new records will have this format:

{
  "ip":"204.116.105.67",
  "geo":{
    "lat":35.93,
    "long":-85.46
    },
  "temp":40
}

These examples are for records of type Avro, but similar support is provided for JSON payloads.
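
For example, the last query above works unchanged for JSON payloads by switching the value decoder (assuming the same sensors topic carries JSON):

INSERT INTO `new_sensors`
SELECT
    ip
    , geo
    , temp
FROM  `sensors`
WHERE _ktype = 'LONG'
AND _vtype = 'JSON'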

4.4.5. Aggregation

Typical streaming aggregation involves scenarios similar to these:

  • Counting the number of visitors on your website per region
  • Totalling the amount of foreign exchange transactions for GBP-USD on a 15-minute interval
  • Totalling the sales made in each of the company's stores every day
  • Retaining the minimum and maximum stock value on a 30-minute interval

These are just a few examples; the list goes on. LSQL gives you a way to quickly express such aggregations over Kafka streams with either JSON or Avro payloads.

Imagine a trading system that needs to display the number of transactions made for each currency pair (GBPUSD is a currency exchange ticker). Such functionality can easily be achieved with a query like this:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'
GROUP BY ticker

Remember, this is a stateful stream, so you could potentially see the values for a ticker more than once; it depends on how many transactions flow through the Kafka topic. The result of this query could be the following:

Key Value
GBPUSD 1
CHFYEN 1
USDEUR 1
GBPUSD 3
USDEUR 5

Suppose the user needs to look only at specific tickers. There are two approaches: apply the filter in the WHERE clause (best for performance) or rely on the HAVING clause. Both are covered by the queries below:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'
AND ticker LIKE '%GBP%'
GROUP BY ticker

--OR

INSERT INTO `total_transactions`
SELECT count(*) as transaction_count
FROM `fx`
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'
GROUP BY ticker
HAVING ticker in ('GBPUSD', 'EURDKK', 'SEKCHF')

The HAVING clause allows the usage of any of the LSQL-supported functions to express your filter requirements. To illustrate, we will filter for all the tickers containing USD as well as those in the list GBPUSD, EURDKK, SEKCHF:

INSERT INTO `total_transactions`
SELECT count(*) as transaction_count
FROM `fx`
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'
GROUP BY ticker
HAVING ticker IN ('GBPUSD', 'EURDKK', 'SEKCHF') OR ticker LIKE '%USD%'

There are scenarios where grouping by the record key part is required. Assume the fx topic contains the ticker in the key part. In that case the queries become:

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = 'STRING'
AND _vtype = 'AVRO'
GROUP BY _key

-- OR adding filter

INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = 'STRING'
AND _vtype = 'AVRO'
AND _key.* LIKE '%GBP%'
GROUP BY _key

Important

Every time a GROUP BY on a field is involved, the resulting key on the target topic is a STRING! This is to allow joins on multiple fields.

In version 1, LSQL does not support arithmetic on aggregation functions. By that we mean you can not do `SUM(fieldA)/count(*)`. We are looking at solutions to address this in upcoming versions.

Important

Arithmetic on aggregate functions (SUM/COUNT/MIN/MAX) is not supported!

4.4.6. Using windows

We have shown so far simple aggregation without involving windows; that might cover some of a user's requirements. However, aggregating over a window is a very common streaming scenario. Window support was introduced earlier; please revisit the windowing section.

Keeping the trend of IoT scenarios, imagine there is a stream of metrics information from devices across the globe. The data structure looks like this:

{
  "device_id": 2,
  "device_type": "sensor-gauge",
  "ip": "193.156.90.200",
  "cca3": "NOR",
  "cn": "Norway",
  "temp": 18,
  "signal": 26,
  "battery_level": 8,
  "timestamp": 1475600522
}

The following query counts all the records received from each country on a tumbling window of 30 seconds:

INSERT INTO norway_sensors_count
SELECT count(*) AS total
FROM sensors
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'
GROUP BY tumble(30,s), cca3

The result would be records emitted on a 30-second interval, and they would look similar to this:

Key Value
NOR 10
ROM 2
FRA 126
UK 312
US 289
NOR 2
FRA 16
UK 352
US 219

Note

Remember the key value will be of type String.

So far we have only counted, but LSQL provides support for SUM, MIN and MAX as well. Maybe your system processes customer orders and you want to compute, every hour, the total amount of orders over the last 24 hours:

SELECT
    product
    , SUM(amount) AS amount
FROM Orders
WHERE _ktype = 'BINARY'
AND _vtype = 'AVRO'
GROUP BY HOP(24,H,1,H), product

4.4.7. Joins

A join operation merges two streams based on the keys of their data records. The result is a new stream.

Note

LSQL supports joins on the key, but also allows the user to join on value-part or key-part fields. This results in both sides having their record keys remapped; the new key is the string concatenation of the fields involved.

Kafka Streams supports the following join operations:

  • KStream-to-KStream Joins are always windowed joins, since otherwise the memory and state required to compute the join would grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream’s records within the specified window interval to produce one result for each matching pair. A new KStream instance representing the resulting stream of the join is returned from this operator.
  • KTable-to-KTable Joins are join operations designed to be consistent with the ones in relational databases. Here, both changelog streams are materialized into local state stores first. When a new record is received from one of the streams, it is joined with the other stream’s materialized state stores to produce one result for each matching pair. A new KTable instance is produced representing the result stream of the join, which is also a changelog stream of the represented table.
  • KStream-to-KTable Joins allow you to perform table lookups against a changelog stream (KTable) upon receiving a new record from another record stream (KStream). An example use case would be to enrich a stream of orders (KStream) with the order details (KTable). Only records received from the record stream will trigger the join and produce results, not vice versa. This results in a brand new KStream instance representing the result stream of the join.

Here is a table of joins supported by Apache Kafka Streams:

Left Operand Right Operand Inner Join Left Join Outer Join
KStream KStream Yes Yes Yes
KTable KTable Yes Yes Yes
KStream KTable Yes Yes No

LSQL supports these join operators:

  • INNER
  • LEFT
  • OUTER
  • RIGHT

Note

RIGHT JOIN will be expressed in terms of LEFT JOIN (the two operands are swapped), as sketched below.
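
As a sketch (reusing the product/order_details names from later examples, with productTable assumed to be defined via WITH), a query written as a RIGHT JOIN:

SELECT STREAM
    od.orderNumber
    , p.productName
FROM productTable AS p
    RIGHT JOIN `order_details` AS od
        ON p._key = od.productCode
WHERE od._ktype = 'BINARY'
AND od._vtype = 'AVRO'

is executed as if it had been written with the operands swapped:

SELECT STREAM
    od.orderNumber
    , p.productName
FROM `order_details` AS od
    LEFT JOIN productTable AS p
        ON p._key = od.productCode
WHERE od._ktype = 'BINARY'
AND od._vtype = 'AVRO'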

Given the table above, here is a list of joins NOT possible by default in the Kafka Streams API:

  • KTable RIGHT JOIN KStream
  • KTable OUTER JOIN KStream
  • KStream RIGHT JOIN KTable

LSQL ALLOWS the user to perform these operations; however, there are costs associated with doing so. But before more details are provided, we need to give an overview of the context at hand. We said already that a RIGHT JOIN is expressed as a LEFT JOIN, and as a result the above list becomes the following:

  • KStream LEFT JOIN KTable
  • KTable OUTER JOIN KStream
  • KTable LEFT JOIN KStream

The challenge here is that a KTable can only be joined with another KTable. Furthermore, at the moment there is no straightforward way to go from a KStream instance to a KTable one. The only solution is to use an intermediary topic and then build the required KTable off that topic. Of course this hurts performance, since the data has to be written to a topic and read again to allow the join to happen. The topology description for the flow will reflect such a scenario. Given this information, the above joins become:

  • KTable LEFT JOIN KTable
  • KTable OUTER JOIN KTable
  • KTable LEFT JOIN KTable

A KStream OUTER JOIN KTable, despite not having support in the Kafka Streams API, is handled similarly: it is translated to a KTable OUTER JOIN KTable.

Important

LSQL transforms the flow as required to allow the join type to happen. Make sure you fully understand the implications of joins which require going through an intermediary topic.

4.4.8. Repartition

The Apache Kafka Streams API does not allow joining two streams with different partition counts. This can easily be the case in real systems; for example, with an orders and an order-details topic, the partition count on the latter will be smaller since its traffic is lower. To allow such a join, LSQL makes sure it brings the two in line. As a result it will have to create a repartitioned topic, e.g. order-repartition (the name is just an illustration), matching the right operand.

Such an operation has a direct impact on performance, since the entire topic is copied over just to perform the join. The topology viewer allows the user to see when such a flow change occurs.

4.4.9. Using WITH

As you have seen earlier, the full syntax for LSQL contains the following:

[WITH
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...),
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...)
]

This allows you to break down your query complexity. For example, consider a scenario where you have a topic of static product details which you want to join against the topic containing orders. From the product details you only need the product name. This is the SQL to achieve such behavior:

...
INSERT INTO ...
WITH
productTable AS
 (
   SELECT productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = 'AVRO'
 )

SELECT ..
FROM  ...  JOIN productTable ON ...
...

Any name registered via WITH (productTable in the example above) can be referenced after its definition. If your requirements are such that you need to define multiple entries, you can do so by separating the WITH entries with a comma. For example:

WITH
productTable as
 (
   SELECT productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = 'AVRO'
 ),
userTable as
 (
    SELECT firstName, secondName
    FROM `user`
    WHERE _ktype = 'LONG'
    AND _vtype = 'AVRO'
 )

The examples define tables (which translate into instances of a KTable), but you can specify a stream (which translates into an instance of KStream) by simply adding STREAM after SELECT:

WITH
productStream AS
 (
   SELECT STREAM productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = 'AVRO'
 )

Important

If the right operand is not found in the list of entities defined by WITH a Stream instance will be created. SELECT STREAM or SELECT within a join targets the left operand only.

4.4.10. Join on key

When joins were introduced at the beginning of the chapter, it was stated that two records are matched when their keys are equal. Here is how you would join orders and order details for Avro records:

INSERT INTO orders_enhanced
SELECT STREAM
    o.orderNumber
    , o.status
    , SUM(od.quantityOrdered * od.priceEach) total
FROM  `order_details` AS od
    INNER JOIN `orders` AS o
        ON o._key = od._key
WHERE o._ktype = 'LONG'
AND o._vtype = 'AVRO'
AND od._ktype = 'LONG'
AND od._vtype = 'AVRO'
GROUP BY TUMBLE(2,s),o.orderNumber

Important

You can not join two topics when the left operand's value decoder differs from the right operand's value decoder. Joining values of different decoder types is not supported.

When joining streams the join needs to happen over a JoinWindow. The GROUP BY TUMBLE(2,s) is used for the aggregation, but LSQL also builds a JoinWindows instance from it, which is applied when joining the streams, before the grouping. The translation between a window and a JoinWindows happens as described in the table below:

Window Type                    Join Window
tumble(duration)               JoinWindows.of(duration)
hop(duration,advance)          JoinWindows.of(duration).until(advance)
session(inactivity,duration)   JoinWindows.of(inactivity).until(duration)
slide(duration)                JoinWindows.of(duration)

If your topics have JSON payloads the above query becomes:

INSERT INTO orders_enhanced
SELECT STREAM
    o.orderNumber
    , o.status
    , SUM(od.quantityOrdered * od.priceEach) total
FROM  `order_details` AS od
    INNER JOIN `orders` AS o
        ON o._key = od._key
WHERE o._ktype = 'LONG'
AND o._vtype = 'JSON'
AND od._ktype = 'LONG'
AND od._vtype = 'JSON'
GROUP BY TUMBLE(2,s),o.orderNumber

You can still join two streams without aggregating by simply doing the following:

INSERT INTO `orders_enhanced`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , p.productName
FROM  `product` as p
    INNER JOIN `order_details` AS od
        ON p._key = od._key
WHERE p._ktype = 'BINARY'
AND p._vtype = 'AVRO'
AND od._ktype = 'BINARY'
AND od._vtype = 'AVRO'
GROUP BY TUMBLE(2,s)

Although GROUP BY is still used, it does not actually apply any grouping since no grouping fields were defined. If your product topic key is not Avro, you can specify _ktype = 'BINARY' as in the example above. This gives some performance benefit by not having to deserialize the LONG key (from the earlier example) and serialize it back to the output topic.

Important

Do not use BINARY when the payload is Avro. The Avro content is retained but the Schema Registry entry on the target will not be created.

All the functions supported by LSQL can be used in the select list. However, unless grouping is involved, the analytic ones (SUM, MIN, MAX, COUNT) are not allowed.

INSERT INTO `orders_enhanced`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , concat(od.productCode,'-',p.productName) AS productName
FROM  `order_details` AS od
    LEFT JOIN `product` as p
        ON p.productCode = od.productCode
WHERE p._ktype = 'BINARY'
AND p._vtype = 'AVRO'
AND od._ktype = 'BINARY'
AND od._vtype = 'AVRO'
GROUP BY TUMBLE(4,s)

4.4.11. Join on fields

It is not always the case that the topic key is the join value; maybe it was an oversight in initial development. LSQL has this covered as well, since it allows you to pick part of the value to fulfill the join.

INSERT INTO `order_details`
SELECT STREAM
    o.orderNumber
    , o.status
    , o.flags
    , od.productCode
FROM  `order_details` AS od
    INNER JOIN `orders` AS o
        ON o.orderNumber = od.orderNumber
WHERE o._ktype = 'BINARY'
AND o._vtype = 'AVRO'
AND od._ktype = 'BINARY'
AND od._vtype = 'AVRO'
GROUP BY TUMBLE(2,s)

There is a trade-off here. Joining on a field like the above means the stream needs to be remapped to allow for the new key. All groupings will result in a String key; this is because LSQL allows you to join on more than one field! The key is a string concatenation of all the values involved.

Important

Joining on a value field(-s) will re-map the stream/table and the new key type will be String. Re-mapping a table has its cost, since the data has to be moved from the KTable to a new topic and a new instance of the table built.

The standard way to handle joins with a table is to define the table via WITH. An optimal solution for joining orders with product, attaching the product name to the order, looks like this:

INSERT INTO `orders_enhanced`
WITH
productTable AS
 (
   SELECT productName
   FROM `product`
   WHERE _ktype = 'STRING'
   AND _vtype = 'AVRO'
 )
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , p.productName
FROM  `order_details` AS od
    LEFT JOIN  productTable AS p
        ON p._key = od.productCode
WHERE od._ktype = 'BINARY'
AND od._vtype = 'AVRO'

First a productTable is defined and it becomes the right operand of a LEFT JOIN. It is required that od.productCode be of type String, since the key on the table is a String. Also notice _ktype is still required for order_details (od._ktype = 'BINARY'). The resulting schema will have the productName field as an optional string, since the right side might not be present.

4.4.12. Join samples

INSERT INTO `toTopic`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , CONCAT(od.productCode,'-',p.productName) AS productName
FROM  `OrdersDetailsTopic` AS od
    LEFT JOIN `ProductTopic` AS p
        ON p.productCode = od.productCode
WHERE p._ktype ='BINARY'
AND p._vtype = 'AVRO'
AND od._ktype = 'BINARY'
AND od._vtype = 'AVRO'
GROUP BY TUMBLE(4,s)

INSERT INTO `toTopic`
WITH
product AS
 (
   SELECT productName
   FROM `ProductTopic`
   WHERE _ktype = 'STRING'
   AND _vtype = 'AVRO'
 )
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , p.productName
FROM  product AS p
    RIGHT JOIN `OrdersDetailsTopic` AS od
        ON p._key = od.productCode
WHERE od._ktype = 'BINARY'
AND od._vtype = 'AVRO'
GROUP BY TUMBLE(1,s)

SET `auto.offset.reset`='latest';
INSERT INTO `toTopic`
WITH
tableTelecom AS (
  SELECT *
  FROM  `telecom_data`
  WHERE _ktype = 'BINARY'
  AND _vtype = 'AVRO'
)

SELECT STREAM
    data.squareId
    , grid.polygon
FROM `activity` AS data
    LEFT JOIN tableTelecom AS grid
    ON data._key = grid._key
WHERE data._ktype = 'BINARY'
AND data._vtype = 'AVRO'

INSERT INTO `toTopic`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , concat(od.productCode,'-',p.productName) as productName
FROM  `OrdersDetailsTopic` AS od
    LEFT JOIN `ProductTopic` AS p
        ON p.productCode = od.productCode
WHERE p._ktype = 'BINARY'
AND p._vtype = 'JSON'
AND od._ktype = 'BINARY'
AND od._vtype = 'JSON'
GROUP BY TUMBLE(4,s)

4.5. More Examples

SELECT *
FROM `topicA`
WHERE f1 IS NULL
AND f2 = 3
AND f3 < 100
AND f4>=2
AND f5 <= 1
AND f6> 12.4
AND f7 LIKE '%wow'
AND _ktype = 'AVRO'
AND _vtype = 'AVRO'

SELECT *
FROM `topicA`
WHERE p1.p2.f1 IS NULL OR (p1.p2.f2 = 3 AND p1.p2.f3 < 100 AND (p1.p2.f4 LIKE 'wow%' OR p1.p2.f5 = 'wow' ))
AND _vtype = 'AVRO'

SELECT *
FROM topicA
WHERE p1.p2.f1 is NOT NULL OR (p1.p2.f2 <> 3 AND p1.p2.f3 < 100 AND (p1.p2.f4 like 'wow%' OR p1.p2.f5 = 'wow' ))
AND _vtype = 'JSON'

SELECT field1.fieldA
    , field2
    , field3
    , _key.fieldM.fieldN AS N
FROM topicA
WHERE _vtype = 'JSON'
AND _ts >= 1201

SELECT field1.fieldA
    , field2
    , field3
    , _key.fieldM.fieldN AS N
FROM topicA
WHERE _vtype = 'JSON'
AND _ts <= 1201

SELECT field1.fieldA
    , field2
    , field3
    , _key.fieldM.fieldN AS N
FROM topicA
WHERE _vtype = 'JSON'
AND _ts >= 1201
AND _ts < 2001