SQL for Kafka, 3 tiers!

Lenses supports SQL across the layers (topics, streams, connectors)
and provides a visual interface for interactive queries on Kafka topics.

[ VIEW ]
DATA BROWSING

Filter, query, project and analyze real-time and historical data.

[ ANALYSE ]
SQL KAFKA STREAMS

Streams are treated as unbounded query results for continuous analysis.

[ INGEST ]
KAFKA CONNECT

Also known as KCQL, an elegant and simple way to configure Kafka Connectors, moving data from sources into Kafka or from topics into target systems, with auto-creation and system-specific features.

Kafka SQL, Topics and Messages

Choose Decoders, AVRO supported

Kafka SQL knows how to handle the following payload structures (also known as decoder types): BINARY, JSON, AVRO, STRING, INT, LONG and PROTOBUF (coming soon).
For Avro messages, the Schema Registry is also fully supported.

SELECT field2 AS my_field
FROM magicTopic
WHERE _ktype='DOUBLE'
      AND _vtype='AVRO'

SELECT *
FROM magicTopic
WHERE _ktype='JSON'
      AND _vtype='AVRO'

Nested Fields

SELECT
    details.antenaclass AS antena,
    details.readingperiod AS `interval`
FROM sensorsTopic

Access Metadata

All Kafka messages carry metadata: topic, timestamp, offset and partition. Our SQL lets you use these directly in queries.

SELECT
    field1,
    _offset,
    _ts,
    _partition
FROM magicTopic

Work with Keys

SELECT min_temp AS `min`
FROM sensorsTopic
WHERE _key > 999
      AND _ktype='INT'
      AND _vtype='JSON'

SELECT *
FROM sensorsTopic
WHERE _key LIKE '%N01'
      AND _ktype='STRING'
      AND _vtype='AVRO'
      AND _partition = 0

Project Fields & Functions

You can apply computations to fields and even enrich your results with topic metadata.

SELECT field2 * field3 - abs(field4.field5) AS total,
       _offset AS `_key.offset`
FROM magicTopic

SELECT min_temp AS `min`,
       max_temp AS `max`,
       (min_temp + max_temp) / 2 AS average,
       details.antenaclass AS antena,
       details.readingperiod AS `interval`
FROM sensorsTopic
WHERE _key.id LIKE '%N01'
      AND _ktype='JSON'
      AND _vtype='JSON'

Fully Enabled Filtering

Filters matter, of course: numeric, string and date functions can be applied to both data and metadata.

SELECT *
FROM topicA
WHERE  (a.d.e + b) / c = 100
    AND _offset > 2000
    AND _partition IN (2,6,9)
LIMIT 1000

Time Travelling

Access particular slices of your event history via epoch timestamps or date-times.

SELECT
    device_id,
    temperature
FROM sensorsTopic
WHERE _ts >= '2017-08-01 12:24:00' AND _ts <= '2017-08-01 14:30:00'
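
The same window can be expressed with epoch timestamps. Kafka timestamps are in milliseconds, so the query above becomes (a sketch, assuming UTC):

SELECT
    device_id,
    temperature
FROM sensorsTopic
WHERE _ts >= 1501590240000 AND _ts <= 1501597800000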

Kafka SQL, in Continuous Streams


We compile SQL into Kafka Streams applications that execute continuously, either in-memory or scaled out with high availability, with controllable parallelization.

SET parallel = 3
INSERT INTO targetTopic
SELECT device_id,
       longitude,
       latitude,
       details.altitude AS alt
FROM sourceTopic
WHERE alt > 100
      AND _partition < 7
      AND _vtype='AVRO'
      AND _ktype='AVRO'

Continuous Joins

You can define tables and streams from Kafka topics and apply rules to join them.

WITH
tableX AS (SELECT x, y, z
   FROM topicA
   WHERE _ktype='AVRO'
     AND _vtype='AVRO'
   GROUP BY SLIDING(1,S)),
streamY AS
  (SELECT STREAM m, n
   FROM topicB
   WHERE x <> 'something'
     AND _ktype='AVRO'
     AND _vtype='AVRO')
SELECT X.x,
       X.y,
       Y.m
FROM tableX X
INNER JOIN streamY Y ON X.z = Y.n
GROUP BY SLIDING(2,S)

Windowing: SLIDING, TUMBLING, SESSION and HOP windows are supported

SELECT A.a, B.x
FROM topicA A
INNER JOIN topicB B ON A.a=B.b
WHERE A.i <> B.uu
  AND A.m = 10
  AND B.x LIKE '%a%'
  AND A._ktype='AVRO'
  AND A._vtype='AVRO'
  AND B._ktype='AVRO'
  AND B._vtype='AVRO'
GROUP BY SLIDING(1,S)
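
The other window types follow the same pattern. As a sketch, assuming TUMBLING takes the same (duration, unit) arguments as SLIDING, a non-overlapping five-second aggregation would look like:

SELECT COUNT(*) AS readings
FROM sensorsTopic
WHERE _ktype='AVRO'
  AND _vtype='AVRO'
GROUP BY TUMBLING(5,S)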

Kafka SQL, Connect


KCQL (Kafka Connect Query Language) is a SQL-like syntax that streamlines the configuration of Kafka Connectors, and is available across many connectors.

You get a logical and concise syntax to define lightweight transformations, with an additional DSL to integrate with target systems.

Example
A primary key (PK) has no meaning in Kafka, but if we are using SQL to write data into a target RDBMS, our KCQL extensions let us declare one:

INSERT INTO targetDbTable SELECT field1, field2, field3 PK field5

Features

  • Avro and JSON support
  • Topic to target mapping
  • Field selection, extraction and filtering
  • Access to Key, Value and metadata
  • Auto creation & auto evolution
  • Error policies

Supported Commands

Supported extensions to INSERT (or UPSERT):

INSERT INTO   $TARGET
SELECT *|columns           (i.e. col1,col2 | col1 AS column1,col2)
FROM   $TOPIC_NAME
  [ PK columns ]
  [ AUTOCREATE ]
  [ AUTOEVOLVE ]
  [ BATCH = N ]
  [ CAPITALIZE ]
  [ INITIALIZE ]
  [ PARTITIONBY cola[,colb] ]
  [ DISTRIBUTEBY cola[,colb] ]
  [ TIMESTAMP cola|sys_current ]
  [ STOREAS   $YOUR_TYPE([key=value, .....]) ]
  [ WITHFORMAT TEXT|AVRO|JSON|BINARY|OBJECT|MAP ]
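
As an illustrative sketch (the topic, table and column names are hypothetical), a JDBC-style sink could combine several of these extensions:

INSERT INTO ordersTable
SELECT orderID AS id, amount, currency
FROM ordersTopic
PK orderID
AUTOCREATE
AUTOEVOLVE
BATCH = 500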

The SELECT mode is useful for target systems that do not support the concept of namespaces (key-value stores such as Hazelcast or Redis are flat).

SELECT *|columns
FROM   $TOPIC_NAME
  [ IGNORE columns ]
  [ WITHCONVERTER ]
  [ WITHFORMAT  JSON|AVRO|BINARY ]
  [ WITHGROUP $YOUR_CONSUMER_GROUP ]
  [ WITHPARTITION (partition) [, (partition, offset)] ]
  [ STOREAS $YOUR_TYPE([key=value, .....]) ]
  [ SAMPLE $RECORDS_NUMBER EVERY $SLIDE_WINDOW ]
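
For instance, a flat key-value store could be fed with a query like the following sketch (the ignored field and the consumer group name are illustrative):

SELECT *
FROM sensorsTopic
IGNORE internalFlag
WITHFORMAT JSON
WITHGROUP sensorReaders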

An example

Here is a Redis Connector example:

  "connect.redis.sink.kcql": "INSERT INTO sensorTimeseries SELECT sensorID, temperature, humidity FROM sensorsTopic STOREAS SortedSet (score=ts)",

The above KCQL instructs the Redis Sink Connector to select the listed fields and store the time-series data in a Sorted Set, using the field `ts` to score each message.

Resources


Documentation on KCQL Specifications
Connectors supporting KCQL

At the forefront of IoT



We cover the major IoT protocols with Source and Sink Connectors for both CoAP and MQTT.
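
As a sketch, an MQTT Source Connector can route an MQTT topic into a Kafka topic with a single KCQL line (the property key and topic names here are illustrative):

  "connect.mqtt.kcql": "INSERT INTO sensorsTopic SELECT * FROM /sensors/readings",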

Use cases

How British Gas is streaming 4 billion messages with our connectors to enable Connected Homes, with a simple query!

Our Hazelcast connector targets low-latency IoT, and all our Kafka connectors support SQL.




Let's Get In Touch!

Ready to start your next project with us?
That's great! Send us an email and we will get back to you as soon as possible!



info@landoop.com


Follow us for news, updates and releases!


@LandoopLtd