KafkaItemReader and KafkaItemWriter

Apache Kafka is a distributed publish-subscribe messaging system. jberet-support module includes kafkaItemReader and kafkaItemWriter to read from or write to Kafka topics. kafkaItemReader reads and binds data to instances of custom POJO bean provided by the batch application.

kafkaItemReader keeps track of the current read position, including current topic name, topic partition number, and topic partition offset. Therefore, it is recommended to disable Kafka auto commit in Kafka consumer properties, e.g., enable.auto.commit=false. Kafka consumer properties are specified in batch property configFile.

This reader class supports retry and restart, using the tracked read position as checkpoint info. It is also recommended to turn off Kafka consumer automatic group management; instead manually assign topics and partitions for the consumer. See batch property topicPartitions.

The following dependencies are required for kafkaItemReader and kafkaItemWriter:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.0</version>
</dependency>

Batch Configuration Properties in Job XML

kafkaItemReader and kafkaItemWriter are configured through <reader> or <writer> batch properties in job xml. All properties are of type String, unless noted otherwise. The following is an example job xml that references kafkaItemReader and kafkaItemWriter:

<chunk item-count="5">
  <reader ref="kafkaItemReader">
    <properties>
      <property name="configFile" value="kafka-consumer.properties"/>
      <property name="topicPartitions" value="#{jobParameters['topicPartitions']}"/>
      <property name="pollTimeout" value="#{jobParameters['pollTimeout']}"/>
    </properties>
  </reader>
  ...
</chunk>
<chunk item-count="5">
  ...
  <writer ref="kafkaItemWriter">
    <properties>
      <property name="configFile" value="kafka-producer.properties"/>
      <property name="topicPartition" value="#{jobParameters['topicPartition']}"/>
      <property name="recordKey" value="#{jobParameters['recordKey']}"/>
    </properties>
  </writer>
</chunk>

Batch Properties for Both kafkaItemReader and kafkaItemWriter

configFile

The file path or URL to the Kafka configuration properties file. See Kafka docs for valid property keys and values.

Batch Properties for kafkaItemReader

topicPartitions

java.util.List<String>

A list of topic-and-partition in the form of "topicName1:partitionNumber1, topicName2:partitionNumber2, ". For example, "orders:0, orders:1, returns:0, returns:1".

pollTimeout

long

The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns immediately with any records that are available now. Must not be negative.

Batch Properties for kafkaItemWriter

topicPartition

A topic partition in the form of <topicName>:<partitionNumber>. For example, "orders:0". Unlike KafkaItemReader, which accepts multiple TopicPartition as source, this writer class only accepts 1 TopicPartition as destination.

recordKey

The key used when sending ProducerRecord.