Confluent's Kafka Sink Connector to JDBC

Kafka Connect to MySQL, PostgreSQL, Oracle, MS SQL Server
Antonios Chalkiopoulos
antwnis

In this article we introduce how Apache Kafka and the Confluent Platform can be used effectively to stream data from Kafka into MySQL, PostgreSQL, Oracle or MS SQL Server. The new Kafka JDBC Sink Connector, built and certified in collaboration with Datamountaineer, provides powerful semantics and capabilities.

kafka-connect-jdbc is a new Kafka Connect component, engineered to introduce the Kafka Connect Query Language (KCQL) and provide:

  • Failover and rebalancing support
  • Idempotent message processing
  • Exactly-once semantics
  • High throughput and low resource utilization

At Landoop, this is our preferred way of building data streaming applications.

Kafka Connect Query Language

When we have data on Kafka and start thinking about integration with a relational database, we need to consider a number of options and require a high degree of flexibility in expressing data mappings:

  • The connector can operate in INSERT or UPSERT mode.
  • Tables can be auto-created, as Confluent’s Schema Registry knows the schemas.
  • Tables can evolve automatically as schemas and messages evolve.
  • Users can be explicit about primary keys, or let the connector decide.
  • Writes can be batched, and fields can be explicitly selected, ignored or renamed.

These requirements resulted in the KCQL library, which looks like this:

INSERT into PERSON SELECT fullname AS name FROM person-topic BATCH 5000

The above KCQL statement selects a particular field from person-topic, renames it and inserts it into the table PERSON in batches of 5,000 records. Note that PERSON needs to be an existing table in the target RDBMS.
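
For illustration, the Avro schema registered for person-topic could look something like the following sketch (the field names here are assumptions, chosen to match the query above):

{
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "fullname", "type": "string"},
    {"name": "age",      "type": "int"}
  ]
}

The next example goes a step further and lets the connector manage the table itself: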

INSERT into CUSTOMER SELECT * FROM customer-topic IGNORE col1 AUTOCREATE PK ID AUTOEVOLVE

This example results in the table being created automatically for us, with column names and types inferred from the Avro schema registered in the Schema Registry for that topic. As that schema evolves, the above query will result in ALTER statements being executed to keep the SQL table in line with the message schema.
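
The connector also supports UPSERT mode; following the same KCQL syntax, a sketch of an upsert keyed on the ID column would be:

UPSERT INTO CUSTOMER SELECT * FROM customer-topic AUTOCREATE PK ID AUTOEVOLVE

In this mode, rows whose primary key already exists are updated in place rather than inserted again.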

More examples can be found in the KCQL GitHub repo.

Supported databases

At the moment the Kafka Connect JDBC Sink supports multiple dialects and has been functionally tested at Landoop against:

  • MySQL Server 5.5.47
  • PostgreSQL 9.5.3
  • Oracle XE 11g
  • SQL Server 2014
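
Each dialect is addressed through its standard JDBC connection URI. For illustration (the host names, ports and database names below are assumptions), the connect.jdbc.connection.uri would look like:

# MySQL
jdbc:mysql://mysql.example.com:3306/jdbc_sink_01
# PostgreSQL
jdbc:postgresql://postgres.example.com:5432/jdbc_sink_01
# Oracle XE
jdbc:oracle:thin:@oracle.example.com:1521:XE
# SQL Server
jdbc:sqlserver://mssql.example.com:1433;databaseName=jdbc_sink_01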

Running a Kafka Connector

Running a Kafka connector in standalone mode requires a worker configuration file, connect-jdbc-sink.properties:

zookeeper=localhost:2181
bootstrap.servers=localhost:9092
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=5000
offset.storage.file.filename=sink.offset

The second configuration file, the one that drives the connector itself, is component specific; in the case of the kafka-connect-jdbc-sink, a kafka-mysql-person.properties would look like this:

name=jdbc-sink-UNIQUE-NAME
connector.class=com.datamountaineer.streamreactor.connect.jdbc.sink.JdbcSinkConnector
tasks.max=3
topics=person-topic
connect.jdbc.sink.error.policy=NOOP
connect.jdbc.sink.export.mappings=INSERT INTO person SELECT * from person-topic AUTOCREATE AUTOEVOLVE PK name

connect.jdbc.connection.uri=jdbc:mysql://mariadb.landoop.com:3306/jdbc_sink_01?useServerPrepStmts=false&rewriteBatchedStatements=true
connect.jdbc.connection.user=username
connect.jdbc.connection.password=password
connect.jdbc.sink.database=jdbc_sink_01

Executing the above results in 3 tasks pulling messages from the Kafka topic and sinking them in parallel into the table on the MySQL server. To achieve higher throughput through this parallelization, the topic needs to have at least 3 partitions.

A recommended configuration is a topic with 10 partitions and, for efficient throughput and resource utilization, at most 2-3 tasks.
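
Such a topic can be created with the standard Kafka tooling; a sketch, assuming a local ZooKeeper as the tooling of this era requires:

kafka-topics --create --zookeeper localhost:2181 \
  --topic person-topic --partitions 10 --replication-factor 1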

Running the above connector is a matter of:

export KAFKA_HEAP_OPTS="-Xmx16G"
connect-standalone connect-jdbc-sink.properties kafka-mysql-person.properties
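
In distributed mode, the same connector configuration would instead be submitted as JSON to the Kafka Connect REST API of a running worker; a sketch, assuming the default REST port 8083 and the properties shown above:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jdbc-sink-UNIQUE-NAME",
    "config": {
      "connector.class": "com.datamountaineer.streamreactor.connect.jdbc.sink.JdbcSinkConnector",
      "tasks.max": "3",
      "topics": "person-topic",
      "connect.jdbc.sink.error.policy": "NOOP",
      "connect.jdbc.sink.export.mappings": "INSERT INTO person SELECT * from person-topic AUTOCREATE AUTOEVOLVE PK name",
      "connect.jdbc.connection.uri": "jdbc:mysql://mariadb.landoop.com:3306/jdbc_sink_01?useServerPrepStmts=false&rewriteBatchedStatements=true",
      "connect.jdbc.connection.user": "username",
      "connect.jdbc.connection.password": "password",
      "connect.jdbc.sink.database": "jdbc_sink_01"
    }
  }'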

To enable DEBUG mode:

cat << EOF > log4j.properties
 log4j.rootLogger=DEBUG, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.Target=System.out
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
EOF
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:log4j.properties"

If you want to generate some data into this topic, refer to the Landoop-Avro-Generator GitHub repo. In the logs, you will immediately notice tens of thousands of inserts per second:

2016-06-11 14:00:38 INFO  DatabaseMetadata:255 - Table 'simple100' is present in catalog=jdbc_sink_01 - [MySQL]
2016-06-11 14:00:38 INFO  JdbcDbWriter:228 - Created the error policy handler as com.datamountaineer.streamreactor.connect.jdbc.sink.writer.NoopErrorHandlingPolicy
2016-06-11 14:00:38 INFO  WorkerSinkTask:155 - Sink task org.apache.kafka.connect.runtime.WorkerSinkTask@62808f2e finished initialization and start
2016-06-11 14:00:39 INFO  JdbcSinkTask:104 - Received 14768 records. First entry topic:demo-simple  partition:0 offset:30000. Writing them to the database...
2016-06-11 14:00:40 INFO  JdbcDbWriter:161 - Wrote 14768 to the database.
2016-06-11 14:00:40 INFO  JdbcSinkTask:107 - Finished writing 14768 records to the database.
2016-06-11 14:00:40 INFO  JdbcSinkTask:104 - Received 14768 records. First entry topic:demo-simple  partition:0 offset:44768. Writing them to the database...
2016-06-11 14:00:41 INFO  JdbcDbWriter:161 - Wrote 14768 to the database.
2016-06-11 14:00:41 INFO  JdbcSinkTask:107 - Finished writing 14768 records to the database.
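
If you only want to push a few records by hand instead, the kafka-avro-console-producer that ships with the Confluent Platform can be used; a sketch, assuming the Person schema outlined earlier:

kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic person-topic \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"Person","fields":[{"name":"fullname","type":"string"},{"name":"age","type":"int"}]}'
# then type one JSON record per line, e.g.
# {"fullname": "Jane Doe", "age": 42}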

Performance tests

When aiming for high throughput we need to tune the JDBC connection parameters. Specifically for MySQL, useServerPrepStmts=false disables server-side prepared statements and rewriteBatchedStatements=true allows the driver to rewrite batched INSERTs into multi-row statements:

?useServerPrepStmts=false&rewriteBatchedStatements=true

To establish a baseline, we compared a mysql restore, an application built in Go, and the Kafka Connect JDBC Sink against the same infrastructure.

We then went ahead and tested the performance of stand-alone and distributed Kafka connectors at Landoop. Adding more nodes to an initial 3-node cluster did not improve performance, as we were limited mostly by network I/O and the capacity of the MySQL server.

We tested with small messages (100 bytes), medium messages (1,000 bytes) and large messages (10,000 bytes) against combinations of 1, 2, 3 ... 10 and 20 partitions.

We measured CPU, memory and network utilization as well as throughput, aiming at a connector with steady performance characteristics.

If you are interested in a more detailed report on Kafka Connect JDBC Sink performance, download the performance report.

Conclusions

The above established that the Kafka Connect JDBC Sink provides significant and sustained throughput, and that the framework delivers on its promise.

Optimizing for the highest throughput requires tuning in terms of batching, the maximum number of workers, networking, running the SQL servers on SSDs and/or running a SQL cluster. What we aimed for at Landoop was ensuring that the connector is developed to high standards, does not leak memory and delivers robust performance.

We want to thank Confluent and Datamountaineer for delivering a high-quality and performant connector. Since a long time has passed and development never stalls, please refer to confluentinc/kafka-connect-jdbc for the current configuration options and capabilities.
