KCQL | Kafka Connect Query Language

KCQL (Kafka Connect Query Language) is a SQL-like syntax that streamlines the configuration of Kafka Sink Connectors, and more.

What benefits do I get?

You get a logical and concise syntax for defining lightweight transformations, gaining both expressiveness and new capabilities.

Why?

While working with Kafka-based event systems, a pattern emerges: you need to produce complex configurations to support rich functionality.

Imagine wanting to configure one Kafka JDBC sink to write into multiple tables of a SQL database while sourcing from multiple topics. On top of that, you want some SQL tables to evolve automatically when the Avro messages evolve:

INSERT INTO transactions SELECT field1 AS column1, field2 AS column2, field3 FROM TransactionTopic;
INSERT INTO audits       SELECT *                                            FROM AuditsTopic;
INSERT INTO logs         SELECT *                                            FROM LogsTopic AUTOEVOLVE;
INSERT INTO invoices     SELECT *                                            FROM InvoiceTopic PK invoiceID;

The above is quite powerful: you can project fields, rename or ignore them, and further customize complex configurations in plain text.
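As a sketch of how such statements reach a connector: each KCQL-enabled sink exposes a single configuration property carrying one or more semicolon-separated statements. The property name `connect.jdbc.sink.kcql` and the connector class below are placeholders for illustration; each connector documents its own property.

```python
# Illustrative only: "connect.jdbc.sink.kcql" and the connector class
# are placeholder names -- each connector documents its own KCQL property.
kcql_statements = [
    "INSERT INTO transactions SELECT field1 AS column1, field2 AS column2, field3 FROM TransactionTopic",
    "INSERT INTO audits SELECT * FROM AuditsTopic",
    "INSERT INTO logs SELECT * FROM LogsTopic AUTOEVOLVE",
    "INSERT INTO invoices SELECT * FROM InvoiceTopic PK invoiceID",
]

config = {
    "connector.class": "JdbcSinkConnector",  # placeholder
    "topics": "TransactionTopic,AuditsTopic,LogsTopic,InvoiceTopic",
    # All four mappings travel in one property, separated by semicolons.
    "connect.jdbc.sink.kcql": ";".join(kcql_statements),
}
```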

So we invented a custom yet simple query language for Kafka and extended it to support system-specific configuration, including Sorted Sets (Redis), index definitions (InfluxDB), namespaces, and buckets. We worked with our collaborators and partners at DataMountaineer, and KCQL is now used by multiple Kafka Sink connectors.

How does it look?

Here is a preview of the supported syntax for INSERT (or UPSERT) queries:

INSERT INTO   $TARGET
SELECT *|columns           (i.e col1,col2 | col1 AS column1,col2)
FROM   $TOPIC_NAME
  [ IGNORE columns ]
  [ AUTOCREATE ]
  [ PK columns ]
  [ AUTOEVOLVE ]
  [ BATCH = N ]
  [ CAPITALIZE ]
  [ INITIALIZE ]
  [ PARTITIONBY cola[,colb] ]
  [ DISTRIBUTEBY cola[,colb] ]
  [ TIMESTAMP cola|sys_current ]
  [ STOREAS   $YOUR_TYPE([key=value, .....]) ]
  [ WITHFORMAT TEXT|AVRO|JSON|BINARY|OBJECT|MAP ]
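To make the INSERT form concrete, here is a toy parser that pulls the target, the projected fields, and the source topic out of a statement. This is illustrative only, assuming well-formed input; it is not the real KCQL grammar.

```python
import re

# Toy parser for the INSERT form shown above -- illustrative only,
# not the real KCQL grammar (it ignores the optional clauses).
INSERT_RE = re.compile(
    r"INSERT INTO\s+(?P<target>\S+)\s+"
    r"SELECT\s+(?P<fields>.+?)\s+"
    r"FROM\s+(?P<topic>\S+)",
    re.IGNORECASE,
)

def parse_insert(stmt):
    """Return (target, projected fields, source topic) for an INSERT statement."""
    m = INSERT_RE.search(stmt)
    if not m:
        raise ValueError("not an INSERT statement: %r" % stmt)
    fields = [f.strip() for f in m.group("fields").split(",")]
    return m.group("target"), fields, m.group("topic")

target, fields, topic = parse_insert(
    "INSERT INTO invoices SELECT * FROM InvoiceTopic PK invoiceID"
)
```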

The SELECT-only mode is useful for target systems that do not support the concept of namespaces (key-value stores such as Hazelcast or Redis are flat):

SELECT *|columns
FROM   $TOPIC_NAME
  [ IGNORE columns ]
  [ WITHFORMAT  JSON|AVRO|BINARY ]
  [ WITHGROUP $YOUR_CONSUMER_GROUP ]
  [ WITHPARTITION (partition)[,(partition, offset)] ]
  [ STOREAS $YOUR_TYPE([key=value, .....]) ]
  [ SAMPLE $RECORDS_NUMBER EVERY $SLIDE_WINDOW ]
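The optional clauses above are keyword-prefixed, so a minimal extraction sketch can scan for a keyword and take the token that follows. Again, this is an assumption-laden illustration, not the real grammar:

```python
import re

# Illustrative only: pull single-token optional clauses out of a
# namespace-less SELECT statement; not the real KCQL grammar.
def extract_clause(stmt, keyword):
    """Return the token following `keyword`, or None if the clause is absent."""
    m = re.search(r"\b%s\s+(\S+)" % keyword, stmt, re.IGNORECASE)
    return m.group(1) if m else None

stmt = "SELECT * FROM sensorsTopic WITHFORMAT AVRO WITHGROUP telemetry-readers"
fmt = extract_clause(stmt, "WITHFORMAT")
group = extract_clause(stmt, "WITHGROUP")
```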

An example

Here is a Redis Connector example:

{
  "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
  "connect.redis.connection.password": "pass",
  "tasks.max": "1",
  "connect.redis.sink.kcql": "SELECT temperature, humidity FROM sensorsTopic PK sensorID STOREAS SortedSet (score=ts)",
  "topics": "sensorsTopic",
  "connect.redis.connection.port": "40110",
  "name": "redis-connector",
  "connect.redis.connection.host": "redis03.landoop.com"
}

The above example runs a Redis Kafka Sink Connector that selects the fields temperature and humidity and stores the time-series data into a Sorted Set named after the value of the sensorID field, using the field `ts` to score each message.
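A configuration like this is typically registered through Kafka Connect's REST API, which expects a JSON body with a top-level "name" and a "config" map (POST /connectors). A minimal sketch of building that request body:

```python
import json

# The Redis sink configuration from the example above, wrapped in the
# payload shape Kafka Connect's REST API expects for POST /connectors.
connector_config = {
    "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
    "connect.redis.connection.password": "pass",
    "tasks.max": "1",
    "connect.redis.sink.kcql": "SELECT temperature, humidity FROM sensorsTopic PK sensorID STOREAS SortedSet (score=ts)",
    "topics": "sensorsTopic",
    "connect.redis.connection.port": "40110",
    "connect.redis.connection.host": "redis03.landoop.com",
}

payload = {"name": "redis-connector", "config": connector_config}
body = json.dumps(payload)  # JSON request body for POST /connectors
```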

Resources

Documentation on KCQL Specifications
Connectors supporting KCQL

The Future

The vision is to progress through the following list and implement all capabilities:

  • Topic to target mapping
  • Field selection
  • Auto creation
  • Auto evolution
  • Error policies
  • Multiple KCQLs / topic
  • Access to Key
  • Cherry-picking nested fields (field extraction) - in progress
  • Introduce Kafka KIP