Time-Series with Kafka, Kafka Connect & InfluxDB

Learn how to integrate Apache Kafka with InfluxDB using Kafka Connect and implement a Scala Avro message producer to test the setup.

Christina Daskalaki
Dec 01, 2016
Time-series data stores are of particular interest these days, and InfluxDB is a popular open source distributed time-series database. In this tutorial we will integrate Kafka with InfluxDB using Kafka Connect and implement a Scala Avro message producer to test the setup.

The steps we are going to follow are:

  • Set up a Docker development environment

  • Run an InfluxDB Sink Kafka Connector

  • Create a Kafka Avro producer in Scala (use the schema registry)

  • Generate some messages in Kafka

Finally, we will verify the data in InfluxDB and visualize it in Chronograf.

About InfluxDB

Data in InfluxDB is organized into time series, where each time series has points, one for each discrete sample of the metric. A point consists of:

  • time: the timestamp

  • measurement: which conceptually matches the idea of a SQL table

  • tags: indexed key-value pairs, usually holding metadata

  • fields: key-value pairs containing the value itself; these are not indexed

Null values aren’t stored.

The structure of the data is:

measurement,tagKey1=tagVal1,tagKey2=tagVal2 fieldK1=fieldV1,fieldK2=fieldV2

Example:
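As an illustration of the structure above, here is a minimal Scala sketch that renders one point in that layout. The helper `render`, the tag `deviceId` and the field `temperature` are hypothetical names chosen for this tutorial's scenario, and escaping of spaces, commas and quotes is left out.

```scala
object LineProtocolExample {
  // Renders one point as: measurement,tag1=v1,... field1=v1,... [timestamp]
  // Escaping of special characters is deliberately omitted in this sketch.
  def render(measurement: String,
             tags: Seq[(String, String)],
             fields: Seq[(String, Double)],
             timestampNanos: Option[Long] = None): String = {
    val tagPart   = tags.map { case (k, v) => s",$k=$v" }.mkString
    val fieldPart = fields.map { case (k, v) => s"$k=$v" }.mkString(",")
    val tsPart    = timestampNanos.map(ts => s" $ts").getOrElse("")
    s"$measurement$tagPart $fieldPart$tsPart"
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample: device 1 reporting 21.5 degrees.
    val line = render(
      "deviceMeasurement",
      Seq("deviceId" -> "1"),
      Seq("temperature" -> 21.5),
      Some(1480550400000000000L))
    println(line) // deviceMeasurement,deviceId=1 temperature=21.5 1480550400000000000
  }
}
```

The tag ends up in the indexed part before the space, while the field carries the sampled value after it.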

Set up the environment

For both the InfluxDB and the Kafka ecosystem setup we are going to use Docker. If you don’t have Docker installed on your machine, follow the instructions on the Docker website.

InfluxDB setup with Docker

Run the docker image:

$ docker run -p 8086:8086 -v influxdb:/var/lib/influxdb influxdb

Enter the InfluxDB Shell and create a database and a user:

You may find more info on this docker image here.
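If you would rather script the database and user creation than type it into the shell, the same statements can be sent to the HTTP `/query` endpoint of InfluxDB 1.x. The sketch below is one way to do that from Scala. It assumes Java 11+ (for `java.net.http`), an InfluxDB 1.x container on `localhost:8086` with authentication disabled, and hypothetical names `iot` (database) and `demo` (user).

```scala
import java.net.{URI, URLEncoder}
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.nio.charset.StandardCharsets

object CreateInfluxDatabase {
  // Builds the form-encoded body expected by the InfluxDB 1.x /query endpoint.
  def queryBody(influxQl: String): String =
    "q=" + URLEncoder.encode(influxQl, StandardCharsets.UTF_8)

  // Sends one InfluxQL statement and returns the HTTP status code.
  def post(baseUrl: String, influxQl: String): Int = {
    val request = HttpRequest.newBuilder(URI.create(s"$baseUrl/query"))
      .header("Content-Type", "application/x-www-form-urlencoded")
      .POST(HttpRequest.BodyPublishers.ofString(queryBody(influxQl)))
      .build()
    HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
      .statusCode()
  }

  def main(args: Array[String]): Unit = {
    val base = "http://localhost:8086" // assumption: port mapping from the docker run command
    println(post(base, "CREATE DATABASE iot"))                       // hypothetical database name
    println(post(base, "CREATE USER \"demo\" WITH PASSWORD 'demo'")) // hypothetical credentials
  }
}
```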

Kafka setup with Docker

Setting up Kafka requires several services. We are going to use the fast-data-dev Docker image, which includes everything we need for this tutorial:

  • Kafka Broker

  • ZooKeeper

  • Kafka Connect

  • Kafka REST Proxy

  • Schema Registry

  • Landoop’s UI tools

  • DataMountaineer Connectors (in our case InfluxDB will be used)

  • Embedded integration tests with examples.

Run the docker image, for example:

$ docker run --rm -it --net=host landoop/fast-data-dev

You may find more on fast-data-dev documentation here.

The first time the container starts, it runs some integration tests and creates some topics and connectors for you. The landing page for the container is available at http://localhost:3030

Once it’s up and running you may access:

  • the Kafka Broker logs: docker exec <ID> tail -f /var/log/broker.log

  • the Kafka Connect logs: docker exec <ID> tail -f /var/log/connect-distributed.log

  • the docker bash: docker run --rm -it --net=host landoop/fast-data-dev bash

IoT example

Data usually originate from somewhere. For example, an embedded sensor can produce data at frequent intervals. For the sake of the example we are going to create a simple Kafka producer in Scala to transmit some metrics.

kafka-avro-console-producer can also be used instead of the Scala app.

The data structure looks like this:

We are going to send these messages to a Kafka topic named device-measurements-topic.
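A sketch of how such a message could be modelled in Scala follows. The class names `DeviceKey` and `DeviceMeasurements` and the fields `deviceId`, `temperature`, `humidity` and `timestamp` are assumptions made for illustration; only the topic name comes from the text.

```scala
object DeviceModel {
  // Hypothetical message key: identifies the device that produced the reading.
  final case class DeviceKey(deviceId: Int)

  // Hypothetical message value: one sensor reading with its event time (epoch millis).
  final case class DeviceMeasurements(deviceId: Int,
                                      temperature: Double,
                                      humidity: Double,
                                      timestamp: Long)

  // Topic name used throughout this tutorial.
  val topic: String = "device-measurements-topic"

  def main(args: Array[String]): Unit = {
    val sample = DeviceMeasurements(deviceId = 1, temperature = 21.5, humidity = 40.0, timestamp = 1480550400000L)
    println(s"$topic: ${DeviceKey(sample.deviceId)} -> $sample")
  }
}
```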

InfluxDB sink connector in Kafka Connect

Our fast-data-dev Docker image provides Kafka Connect and already has the InfluxDB connector on the classpath of the available plugins.

Let’s find the IP address that Docker assigned to the InfluxDB container (usually 172.17.0.x):

$ docker inspect <ID or NAME> | grep IPA

We will now use the kafka-connect-ui to configure the connector:

http://localhost:3030/kafka-connect-ui

or from command line:
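One scriptable route is the Kafka Connect REST API, which accepts a JSON payload on `POST /connectors`. The Scala sketch below builds such a payload and posts it. Treat the details as assumptions to check against your connector version: the connector class name and the `connect.influx.*` property names follow the Stream Reactor naming as I recall it and have changed between releases, `influx-sink`, `iot` and `172.17.0.2` are placeholder values, and port 8083 is the Kafka Connect REST port exposed by fast-data-dev. Java 11+ is assumed for `java.net.http`.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object RegisterInfluxSink {
  // Minimal JSON string quoting: escapes backslashes and double quotes only.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Builds the {"name": ..., "config": {...}} payload expected by POST /connectors.
  def connectorJson(name: String, config: Seq[(String, String)]): String = {
    val body = config.map { case (k, v) => s"${quote(k)}:${quote(v)}" }.mkString(",")
    s"""{"name":${quote(name)},"config":{$body}}"""
  }

  // ASSUMPTION: class and property names vary by connector version; verify before use.
  def influxSinkConfig(influxUrl: String, database: String, kcql: String): Seq[(String, String)] = Seq(
    "connector.class"     -> "com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector",
    "tasks.max"           -> "1",
    "topics"              -> "device-measurements-topic",
    "connect.influx.url"  -> influxUrl,
    "connect.influx.db"   -> database,
    "connect.influx.kcql" -> kcql
  )

  def main(args: Array[String]): Unit = {
    val kcql = "INSERT INTO deviceMeasurement SELECT * FROM device-measurements-topic"
    // Placeholder address: use the IP reported by docker inspect for the InfluxDB container.
    val json = connectorJson("influx-sink", influxSinkConfig("http://172.17.0.2:8086", "iot", kcql))
    val request = HttpRequest.newBuilder(URI.create("http://localhost:8083/connectors"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(json))
      .build()
    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    println(s"${response.statusCode()} ${response.body()}")
  }
}
```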

KCQL query explained

The InfluxDB sink connector supports KCQL (Kafka Connect Query Language), which lets us select fields from a Kafka topic without any extra processing. Let’s say in our case we are only interested in storing the temperature for a particular device in a time series within InfluxDB.

The query looks like this:

deviceMeasurement is the time-series namespace (measurement) in InfluxDB.

WITHTIMESTAMP adds the time to the point. If we omit it from our query, InfluxDB will add a system timestamp.
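To make the shape of such a statement concrete, here is a small Scala sketch that assembles a KCQL string from its parts. The helper `kcql` and the field names `deviceId`, `temperature` and `timestamp` are hypothetical; only the measurement name, the topic name and the `WITHTIMESTAMP` keyword come from the text above.

```scala
object KcqlExample {
  // Assembles: INSERT INTO <measurement> SELECT <fields> FROM <topic> [WITHTIMESTAMP <field>]
  def kcql(measurement: String,
           fields: Seq[String],
           topic: String,
           timestampField: Option[String]): String = {
    val base = s"INSERT INTO $measurement SELECT ${fields.mkString(", ")} FROM $topic"
    // Without a timestamp field, the clause is simply left out.
    timestampField.fold(base)(ts => s"$base WITHTIMESTAMP $ts")
  }

  def main(args: Array[String]): Unit = {
    println(kcql(
      "deviceMeasurement",
      Seq("deviceId", "temperature", "timestamp"), // hypothetical field names
      "device-measurements-topic",
      Some("timestamp")))
  }
}
```

Some connector versions may require topic names containing hyphens to be quoted, so check the KCQL documentation for the release you run.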

Avro Producer using Avro4s

Now let’s create a simple producer that sends these types of event messages into Kafka. We are going to use the Avro format so that we can maintain the schema of the data in the Schema Registry. In this example both the key and the value of the Kafka message will be in Avro format. Of course, you may use any other serializer.
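A minimal sketch of such a producer is shown below. It assumes kafka-clients, the Confluent `KafkaAvroSerializer` and avro4s are on the classpath, and it uses the `RecordFormat[T]` derivation found in older avro4s releases (newer versions may need a different call). The case classes and their fields are hypothetical, and `localhost:9092` and `http://localhost:8081` are the broker and Schema Registry ports I expect fast-data-dev to expose.

```scala
import java.util.Properties
import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object AvroProducerSketch {
  // Hypothetical key and value schemas, expressed as case classes.
  final case class DeviceKey(deviceId: Int)
  final case class DeviceMeasurements(deviceId: Int, temperature: Double, humidity: Double, timestamp: Long)

  // Producer settings: Avro serializers for key and value, backed by the Schema Registry.
  def producerProps(brokers: String, schemaRegistryUrl: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put("schema.registry.url", schemaRegistryUrl)
    props
  }

  def main(args: Array[String]): Unit = {
    // avro4s derives the Avro schema and record conversion from the case classes.
    val keyFormat   = RecordFormat[DeviceKey]
    val valueFormat = RecordFormat[DeviceMeasurements]
    val producer = new KafkaProducer[GenericRecord, GenericRecord](
      producerProps("localhost:9092", "http://localhost:8081"))
    try {
      val value = DeviceMeasurements(1, 21.5, 40.0, System.currentTimeMillis())
      val record = new ProducerRecord[GenericRecord, GenericRecord](
        "device-measurements-topic",
        keyFormat.to(DeviceKey(value.deviceId)),
        valueFormat.to(value))
      producer.send(record).get() // block until the broker acknowledges the write
    } finally producer.close()
  }
}
```

Blocking on `send(...).get()` keeps the sketch simple; a real producer would usually send asynchronously and handle failures in a callback.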

Generate messages to Kafka

We create the case classes that describe the schemas for the key and the value and then call the producer with some random values.
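The random-value part can be sketched independently of Kafka by passing the send step in as a function. Everything here is illustrative: the case classes, the field names and the value ranges (five devices, temperatures between 15.0 and 34.9) are assumptions, and `send` stands in for whatever producer call you use.

```scala
import scala.util.Random

object MessageGenerator {
  // Hypothetical key and value case classes for the device readings.
  final case class DeviceKey(deviceId: Int)
  final case class DeviceMeasurements(deviceId: Int, temperature: Double, humidity: Double, timestamp: Long)

  // Builds one reading with random values in plausible (made-up) ranges.
  def randomMeasurement(rnd: Random, now: Long): DeviceMeasurements =
    DeviceMeasurements(
      deviceId    = 1 + rnd.nextInt(5),            // devices 1 to 5
      temperature = 15.0 + rnd.nextInt(200) / 10.0, // 15.0 to 34.9
      humidity    = 30.0 + rnd.nextInt(400) / 10.0, // 30.0 to 69.9
      timestamp   = now)

  // Generates `count` readings and hands each key/value pair to `send`.
  def generate(count: Int, rnd: Random, send: (DeviceKey, DeviceMeasurements) => Unit): Unit =
    (1 to count).foreach { _ =>
      val m = randomMeasurement(rnd, System.currentTimeMillis())
      send(DeviceKey(m.deviceId), m)
    }

  def main(args: Array[String]): Unit =
    // Prints instead of producing; swap the function for a real Kafka send.
    generate(10, new Random(), (k, v) => println(s"$k -> $v"))
}
```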

Check the schema

In your schema registry you will find two automatically registered schemas: one for the key, called device-measurements-topic-key, and one for the value, called device-measurements-topic-value.

We can see the schemas created in http://localhost:3030/schema-registry-ui

Validate the messages

We can see the Kafka messages generated in http://localhost:3030/kafka-topics-ui

InfluxDB should also be receiving data:

[Screenshot: InfluxDB]

Visualise results

InfluxDB provides a visualization tool called Chronograf.

Here is what our time-series chart may look like:

[Screenshot: time-series chart in Chronograf]
