Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
227 views
in Technique[技术] by (71.8m points)

spring boot - How to Understand if a Batch ended in a Batch To Record Adapter

I am developing a springboot application that reads messages from a topic. Messages are managed in transaction and read as string in batch mode and then deserialized to an object. This operation may fail but I don't want to discard all the batch but rather I would move failed messages to DLQ.

As I am using spring-kafka 2.6.5 I found out that I can use BatchToRecordAdapter in order to achieve this purpose. However I did not find out how to know when I am reading the last message of any batch. I would like to read one message at a time, serialize it and then store in an ArrayList; when listener reads the last message I want to make some processing and finally commit the transaction.

Thanks, Giuseppe.

UPDATE

In order to achieve this purpose I override BatchToRecordAdapter and added headers that allow me to know the position in a batch of every element.

package com.doxee.commons.lifecycle.kafka;

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.adapter.BatchToRecordAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

/*
 * Insert a description here.
 *
 * Bugs: none known
 *
 * @author  gmiano [email protected]
 * @createDate  25/01/21
 *
 * Copyright (C) 2021 Doxee S.p.A. C.F. - P.IVA: IT02714390362. All Rights Reserved
 */

@Slf4j
public class BatchToEnrichedRecordAdapter<K, V> implements BatchToRecordAdapter<K, V> {

  private final ConsumerRecordRecoverer recoverer;

  public BatchToEnrichedRecordAdapter(ConsumerRecordRecoverer recoverer) {
    Assert.notNull(recoverer, "'recoverer' cannot be null");
    this.recoverer = recoverer;
  }

  @Override
  public void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records,
      Acknowledgment ack, Consumer<?, ?> consumer, Callback<K, V> callback) {
    for (int i = 0; i < messages.size(); ++i) {
      Message enrichedMessage = MessageBuilder.fromMessage(messages.get(i))
          .setHeader(MyHeaders.BATCH_SIZE, messages.size())
          .setHeader(MyHeaders.MESSAGE_BATCH_POSITION, i + 1)
          .build();

      try {
        callback.invoke(records.get(i), ack, consumer, enrichedMessage);
      } catch (Exception var9) {
        this.recoverer.accept(records.get(i), var9);
      }
    }
  }
}

with this bean as recoverer

 @Bean
  ConsumerRecordRecoverer recoverer(KafkaOperations<?, ?> template) {
    return new DeadLetterPublishingRecoverer(template, (record, ex) -> {
      String srcTopic = record.topic();
      String srcKey = record.key().toString();
      log.error("Failed consume of message {} from topic {}", srcKey, srcTopic, ex);
      String dstTopic;
      if (ex.getCause() instanceof ClientResumableException) {
        dstTopic = srcTopic.concat(".RECOVERABLE");
      } else {
        dstTopic = srcTopic.concat(".DLT");
      }
      log.error("Cannot retry. Try to write message to topic: {}", dstTopic);
      return new TopicPartition(dstTopic, 0);
    });
  }

Is this the proper solution?

question from:https://stackoverflow.com/questions/65881850/how-to-understand-if-a-batch-ended-in-a-batch-to-record-adapter

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...