KafkaItemReader and KafkaItemWriter
Apache Kafka is a distributed publish-subscribe messaging system. The jberet-support module includes kafkaItemReader and kafkaItemWriter for reading from and writing to Kafka topics. kafkaItemReader reads data and binds it to instances of a custom POJO bean provided by the batch application.
kafkaItemReader keeps track of the current read position, including the current topic name, topic partition number, and topic partition offset. Therefore, it is recommended to disable Kafka auto commit in the Kafka consumer properties, e.g., enable.auto.commit=false. Kafka consumer properties are specified in the file referenced by the batch property configFile.
This reader class supports retry and restart, using the tracked read position as checkpoint info. It is also recommended to turn off Kafka consumer automatic group management and instead manually assign topics and partitions to the consumer. See the batch property topicPartitions.
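To make the checkpoint idea concrete, here is a minimal Java sketch of how a reader might track the next offset per topic partition and hand a copy of it back as checkpoint info. The class ReadPosition and its methods are hypothetical and are not jberet-support's actual checkpoint format; ItemReader.checkpointInfo() and KafkaConsumer.seek(...), mentioned only in comments, are the real APIs such a reader would plug into.

```java
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical checkpoint holder (not jberet-support's real class): maps
// "topic:partition" to the offset of the next record to read.
final class ReadPosition implements Serializable {
    private static final long serialVersionUID = 1L;
    private final HashMap<String, Long> nextOffsets = new HashMap<>();

    private static String key(String topic, int partition) {
        return topic + ":" + partition;
    }

    // Called after the record at `offset` has been read, so a restart resumes at offset + 1.
    void markRead(String topic, int partition, long offset) {
        nextOffsets.put(key(topic, partition), offset + 1);
    }

    // Offset to resume from, or -1 if nothing has been read from this partition yet.
    long resumeOffset(String topic, int partition) {
        Long next = nextOffsets.get(key(topic, partition));
        return next == null ? -1L : next;
    }

    // Independent copy, suitable for returning from ItemReader.checkpointInfo().
    ReadPosition snapshot() {
        ReadPosition copy = new ReadPosition();
        copy.nextOffsets.putAll(nextOffsets);
        return copy;
    }

    public static void main(String[] args) {
        ReadPosition position = new ReadPosition();
        position.markRead("orders", 0, 41L);
        position.markRead("orders", 0, 42L);
        position.markRead("returns", 1, 7L);
        ReadPosition checkpoint = position.snapshot();
        // On restart, a reader could pass these offsets to KafkaConsumer.seek(...).
        System.out.println(checkpoint.resumeOffset("orders", 0));  // 43
        System.out.println(checkpoint.resumeOffset("returns", 1)); // 8
        System.out.println(checkpoint.resumeOffset("returns", 0)); // -1
    }
}
```

Returning a snapshot rather than the live object keeps a saved checkpoint from changing if the reader keeps advancing after it is taken. Because the reader's own checkpoint is the source of truth for restart, Kafka's auto commit is redundant, which is why the text recommends disabling it.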
The following dependency is required for kafkaItemReader and kafkaItemWriter:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
</dependency>
Batch Configuration Properties in Job XML
kafkaItemReader and kafkaItemWriter are configured through batch properties of the <reader> or <writer> element in job XML. All properties are of type String unless noted otherwise. The following example job XML references kafkaItemReader and kafkaItemWriter:
<chunk item-count="5">
    <reader ref="kafkaItemReader">
        <properties>
            <property name="configFile" value="kafka-consumer.properties"/>
            <property name="topicPartitions" value="#{jobParameters['topicPartitions']}"/>
            <property name="pollTimeout" value="#{jobParameters['pollTimeout']}"/>
        </properties>
    </reader>
    ...
</chunk>
<chunk item-count="5">
    ...
    <writer ref="kafkaItemWriter">
        <properties>
            <property name="configFile" value="kafka-producer.properties"/>
            <property name="topicPartition" value="#{jobParameters['topicPartition']}"/>
            <property name="recordKey" value="#{jobParameters['recordKey']}"/>
        </properties>
    </writer>
</chunk>
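The #{jobParameters['...']} expressions in the job XML are resolved from the job parameters supplied when the job is started. A minimal Java sketch of building those parameters follows; the values and the job XML name "kafka-job" are hypothetical, and BatchRuntime.getJobOperator().start(...) appears only in a comment because it requires a batch runtime.

```java
import java.util.Properties;

// Builds job parameters matching the #{jobParameters['...']} expressions in the job XML.
// All values are hypothetical examples; job parameters are always Strings.
final class KafkaJobParameters {
    static Properties build() {
        Properties params = new Properties();
        // Reader: read partitions 0 and 1 of "orders", waiting up to 1000 ms per poll.
        params.setProperty("topicPartitions", "orders:0, orders:1");
        params.setProperty("pollTimeout", "1000");
        // Writer: send every item to partition 0 of "processed-orders" with key "order".
        params.setProperty("topicPartition", "processed-orders:0");
        params.setProperty("recordKey", "order");
        return params;
    }

    public static void main(String[] args) {
        Properties params = build();
        // With a batch runtime available, the job could be started with:
        //   BatchRuntime.getJobOperator().start("kafka-job", params);
        // ("kafka-job" is a hypothetical job XML name.)
        params.stringPropertyNames().stream().sorted()
                .forEach(name -> System.out.println(name + "=" + params.getProperty(name)));
    }
}
```

Since job parameters travel as java.util.Properties, even pollTimeout is passed as the string "1000" and converted to a long when injected into the reader.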
Batch Properties for Both kafkaItemReader and kafkaItemWriter
configFile
The file path or URL of the Kafka configuration properties file. See the Kafka documentation for valid property keys and values.
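The following Java sketch shows one way a configFile value that may be either a URL or a plain file path could be resolved and loaded into java.util.Properties. The URL-then-path order is an assumption for illustration; jberet-support's own resolution logic may differ (for example, in how it treats relative paths or classpath resources). The sample consumer settings in main use real Kafka consumer keys, but the values are illustrative.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

final class KafkaConfigLoader {
    // Loads Kafka client properties from a configFile value that is either a URL
    // (e.g. "file:/etc/kafka/consumer.properties") or a plain file path.
    static Properties load(String configFile) throws IOException {
        Properties props = new Properties();
        try (InputStream in = open(configFile)) {
            props.load(in);
        }
        return props;
    }

    // Assumption: try the value as a URL first, then fall back to a file path.
    private static InputStream open(String configFile) throws IOException {
        try {
            return new URL(configFile).openStream();
        } catch (MalformedURLException notAUrl) {
            return Files.newInputStream(Paths.get(configFile));
        }
    }

    public static void main(String[] args) throws IOException {
        // Illustrative consumer settings, with auto commit disabled as recommended above.
        String sample = String.join("\n",
                "bootstrap.servers=localhost:9092",
                "enable.auto.commit=false",
                "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
                "value.deserializer=org.apache.kafka.common.serialization.StringDeserializer");
        Path file = Files.createTempFile("kafka-consumer", ".properties");
        try {
            Files.write(file, sample.getBytes(StandardCharsets.ISO_8859_1));
            Properties byPath = load(file.toString());
            Properties byUrl = load(file.toUri().toURL().toString());
            System.out.println(byPath.getProperty("enable.auto.commit")); // false
            System.out.println(byPath.equals(byUrl));                     // true
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```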
Batch Properties for kafkaItemReader
topicPartitions
java.util.List<String>
A list of topic-and-partition entries in the form "topicName1:partitionNumber1, topicName2:partitionNumber2, ...". For example, "orders:0, orders:1, returns:0, returns:1".
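A minimal Java sketch of parsing the topicPartitions format into topic and partition pairs is below. TopicPartitionSpec is a hypothetical stand-in for Kafka's org.apache.kafka.common.TopicPartition so that the example runs without kafka-clients; the real reader's parsing may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartitionSpec {
    final String topic;
    final int partition;

    TopicPartitionSpec(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    // Parses one "topicName:partitionNumber" entry, e.g. "orders:0".
    static TopicPartitionSpec parse(String entry) {
        String trimmed = entry.trim();
        int colon = trimmed.lastIndexOf(':');
        if (colon <= 0 || colon == trimmed.length() - 1) {
            throw new IllegalArgumentException("Expected topicName:partitionNumber but got: " + entry);
        }
        String topic = trimmed.substring(0, colon).trim();
        int partition = Integer.parseInt(trimmed.substring(colon + 1).trim());
        if (partition < 0) {
            throw new IllegalArgumentException("Negative partition number: " + entry);
        }
        return new TopicPartitionSpec(topic, partition);
    }

    // Parses a comma-separated list such as "orders:0, orders:1, returns:0, returns:1".
    static List<TopicPartitionSpec> parseList(String value) {
        List<TopicPartitionSpec> result = new ArrayList<>();
        for (String entry : value.split(",")) {
            if (!entry.trim().isEmpty()) { // tolerate a trailing comma
                result.add(parse(entry));
            }
        }
        return Collections.unmodifiableList(result);
    }

    @Override
    public String toString() {
        return topic + ":" + partition;
    }

    public static void main(String[] args) {
        List<TopicPartitionSpec> specs = parseList("orders:0, orders:1, returns:0, returns:1");
        // A real reader would map these to org.apache.kafka.common.TopicPartition
        // instances and pass them to KafkaConsumer.assign(...).
        System.out.println(specs); // [orders:0, orders:1, returns:0, returns:1]
    }
}
```

Assigning an explicit list of partitions this way is what the manual-assignment recommendation earlier in this section refers to: the consumer reads exactly these partitions, and no group rebalancing moves them elsewhere.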
pollTimeout
long
The time, in milliseconds, to wait in poll if no data is available. If 0, poll returns immediately with any records that are currently available. Must not be negative.
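To illustrate the pollTimeout contract without a Kafka broker, the Java sketch below models it with a java.util.concurrent.BlockingQueue standing in for the consumer's buffer of fetched records. This is an analogy, not Kafka's implementation: it waits up to the timeout for a first record, then returns everything buffered at that moment, and a timeout of 0 never blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: a BlockingQueue stands in for the consumer's buffer of fetched records.
final class PollTimeoutDemo {
    // Waits up to timeoutMillis for a first record, then also takes every other record
    // already buffered. A timeout of 0 returns immediately with whatever is available.
    static <T> List<T> poll(BlockingQueue<T> buffer, long timeoutMillis) {
        if (timeoutMillis < 0) {
            throw new IllegalArgumentException("pollTimeout must not be negative: " + timeoutMillis);
        }
        List<T> records = new ArrayList<>();
        try {
            T first = buffer.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (first != null) {
                records.add(first);
                buffer.drainTo(records);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt; return what we have
        }
        return records;
    }

    public static void main(String[] args) {
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
        System.out.println(poll(buffer, 0));   // []
        buffer.add("r1");
        buffer.add("r2");
        System.out.println(poll(buffer, 0));   // [r1, r2]
        System.out.println(poll(buffer, 200)); // [] after waiting about 200 ms
    }
}
```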
Batch Properties for kafkaItemWriter
topicPartition
A topic partition in the form <topicName>:<partitionNumber>. For example, "orders:0". Unlike kafkaItemReader, which accepts multiple topic partitions as its source, this writer accepts only one topic partition as its destination.
recordKey
The key used when sending each ProducerRecord.
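A minimal Java sketch of how a writer could combine topicPartition and recordKey when sending one chunk is below. OutgoingRecord is a hypothetical stand-in for Kafka's ProducerRecord, and the sketch assumes that every record in the chunk uses the same recordKey, since the key is a single batch property; with kafka-clients each record would instead be created with new ProducerRecord<>(topic, partition, key, value).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Kafka's ProducerRecord(topic, partition, key, value).
final class OutgoingRecord {
    final String topic;
    final int partition;
    final String key;
    final Object value;

    OutgoingRecord(String topic, int partition, String key, Object value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    @Override
    public String toString() {
        return topic + ":" + partition + " key=" + key + " value=" + value;
    }
}

final class WriterSketch {
    // Converts one chunk of items into records for the single destination partition.
    static List<OutgoingRecord> toRecords(String topicPartition, String recordKey, List<?> items) {
        String spec = topicPartition.trim();
        if (spec.contains(",")) {
            throw new IllegalArgumentException("Only one topic partition is accepted: " + spec);
        }
        int colon = spec.lastIndexOf(':');
        if (colon <= 0 || colon == spec.length() - 1) {
            throw new IllegalArgumentException("Expected <topicName>:<partitionNumber> but got: " + spec);
        }
        String topic = spec.substring(0, colon);
        int partition = Integer.parseInt(spec.substring(colon + 1));
        List<OutgoingRecord> records = new ArrayList<>(items.size());
        for (Object item : items) {
            // Every item goes to the same topic partition with the same key; with
            // kafka-clients this would be new ProducerRecord<>(topic, partition, recordKey, item).
            records.add(new OutgoingRecord(topic, partition, recordKey, item));
        }
        return records;
    }

    public static void main(String[] args) {
        List<OutgoingRecord> records = toRecords("orders:0", "order", Arrays.asList("item-1", "item-2"));
        records.forEach(System.out::println);
        // orders:0 key=order value=item-1
        // orders:0 key=order value=item-2
    }
}
```

Rejecting a comma-separated value up front reflects the single-destination rule for topicPartition, rather than silently writing to only the first partition listed.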