I am developing a Spring Boot application that reads messages from a Kafka topic. Messages are managed in a transaction, read as strings in batch mode, and then deserialized into objects. Deserialization may fail, but I don't want to discard the whole batch; instead I would rather move only the failed messages to a DLQ.
Since I am using spring-kafka 2.6.5, I found that I can use a BatchToRecordAdapter to achieve this. However, I could not figure out how to tell when I am reading the last message of a batch.
I would like to read one message at a time, deserialize it, and store it in an ArrayList; when the listener reads the last message of the batch, I want to do some processing and finally commit the transaction.
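To make the problem concrete: with a BatchToRecordAdapter the container still polls in batches, but the listener is invoked once per record. A rough sketch of my listener (the topic name and the deserialize helper are placeholders, not my actual code):

@KafkaListener(topics = "my-topic")
public void listen(String payload) {
    // Called once per record, even though the container fetched a whole batch.
    buffer.add(deserialize(payload));
    // Problem: nothing here tells me whether this record is the last one
    // of the current batch, so I don't know when to start processing.
}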
Thanks,
Giuseppe.
UPDATE
To achieve this, I implemented my own BatchToRecordAdapter that adds headers telling each record its position within the batch.
package com.doxee.commons.lifecycle.kafka;

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.adapter.BatchToRecordAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

/*
 * BatchToRecordAdapter that enriches each message with headers carrying the
 * batch size and the message's position within the batch, so that the
 * listener can tell when it is processing the last record of a batch.
 *
 * Bugs: none known
 *
 * @author gmiano [email protected]
 * @createDate 25/01/21
 *
 * Copyright (C) 2021 Doxee S.p.A. C.F. - P.IVA: IT02714390362. All Rights Reserved
 */
@Slf4j
public class BatchToEnrichedRecordAdapter<K, V> implements BatchToRecordAdapter<K, V> {

    private final ConsumerRecordRecoverer recoverer;

    public BatchToEnrichedRecordAdapter(ConsumerRecordRecoverer recoverer) {
        Assert.notNull(recoverer, "'recoverer' cannot be null");
        this.recoverer = recoverer;
    }

    @Override
    public void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records,
            Acknowledgment ack, Consumer<?, ?> consumer, Callback<K, V> callback) {
        for (int i = 0; i < messages.size(); ++i) {
            // Add the batch size and the (1-based) position of this message in the batch.
            Message<?> enrichedMessage = MessageBuilder.fromMessage(messages.get(i))
                    .setHeader(MyHeaders.BATCH_SIZE, messages.size())
                    .setHeader(MyHeaders.MESSAGE_BATCH_POSITION, i + 1)
                    .build();
            try {
                callback.invoke(records.get(i), ack, consumer, enrichedMessage);
            } catch (Exception ex) {
                // A failed record is handed to the recoverer (e.g. published
                // to a DLQ) instead of failing the whole batch.
                this.recoverer.accept(records.get(i), ex);
            }
        }
    }
}
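For completeness, MyHeaders is just a constants class (it is not shown above; the header names here are my own choice):

public final class MyHeaders {

    private MyHeaders() {
    }

    // Total number of records in the polled batch.
    public static final String BATCH_SIZE = "x-batch-size";

    // 1-based position of the current record within the batch.
    public static final String MESSAGE_BATCH_POSITION = "x-batch-position";
}

The listener then compares the two headers to detect the last record of a batch (deserialize and process are placeholders for my actual logic):

@KafkaListener(topics = "my-topic")
public void listen(@Payload String payload,
        @Header(MyHeaders.BATCH_SIZE) int batchSize,
        @Header(MyHeaders.MESSAGE_BATCH_POSITION) int position) {
    // May throw: the failure is routed to the recoverer instead of
    // failing the whole batch.
    buffer.add(deserialize(payload));
    if (position == batchSize) {
        // Last record of the batch: run the batch-level processing,
        // then let the transaction commit.
        process(buffer);
        buffer.clear();
    }
}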
The adapter delegates failed records to a ConsumerRecordRecoverer; I use this bean as the recoverer:

@Bean
ConsumerRecordRecoverer recoverer(KafkaOperations<?, ?> template) {
    return new DeadLetterPublishingRecoverer(template, (record, ex) -> {
        String srcTopic = record.topic();
        // String.valueOf avoids an NPE for keyless messages.
        String srcKey = String.valueOf(record.key());
        log.error("Failed consume of message {} from topic {}", srcKey, srcTopic, ex);
        // Route recoverable failures to a .RECOVERABLE topic, everything else to the DLT.
        String dstTopic;
        if (ex.getCause() instanceof ClientResumableException) {
            dstTopic = srcTopic.concat(".RECOVERABLE");
        } else {
            dstTopic = srcTopic.concat(".DLT");
        }
        log.error("Cannot retry. Try to write message to topic: {}", dstTopic);
        return new TopicPartition(dstTopic, 0);
    });
}
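The adapter itself is plugged into the listener container factory. A sketch of the wiring, following the usual spring-kafka pattern (bean name and type parameters are illustrative):

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory, ConsumerRecordRecoverer recoverer) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Poll in batches, but dispatch one record at a time through the
    // enriching adapter; failed records go to the recoverer.
    factory.setBatchListener(true);
    factory.setBatchToRecordAdapter(new BatchToEnrichedRecordAdapter<>(recoverer));
    return factory;
}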
Is this the proper solution?