In recent versions of Spring Cloud Stream, @EnableBinding and the annotation-based (declarative) programming style are deprecated. With the functional programming style, how can I keep working with a message after it has been sent to the output channel?
First case
I need to do further work with a message after it has been sent to the output channel, for example log that it was processed successfully or store it in a database.
@EnableBinding(Processor.class)
public class MessageProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageProcessor.class);

    private final MessageChannel output;

    public MessageProcessor(MessageChannel output) {
        this.output = output;
    }

    @StreamListener(Processor.INPUT)
    public void process(Message<String> message) {
        LOGGER.info("Receive message: {}", message);
        output.send(message);
        /* Do some work with message here */
        LOGGER.info("Finish processing for message: {}", message);
    }
}
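A rough sketch of the shape I imagine the functional equivalent would take, written in plain Java so that it compiles without Spring on the classpath. The assumptions: the `Sender` interface is a hypothetical stand-in for `StreamBridge#send(String bindingName, Object data)`, the binding name `processed-out-0` is made up, and `auditLog` stands in for the logging or database step. In a real application the returned `Consumer` would presumably be exposed as a `@Bean` and take a `Message<String>` instead of a `String`.

```java
import java.util.List;
import java.util.function.Consumer;

class SendThenProcessSketch {

    /** Hypothetical stand-in for StreamBridge#send(String bindingName, Object data). */
    interface Sender {
        boolean send(String bindingName, Object data);
    }

    /**
     * Builds a consumer that forwards the message first and then does the
     * follow-up work, mirroring the order of output.send(...) and the
     * post-send logic in the @StreamListener version.
     */
    static Consumer<String> process(Sender sender, List<String> auditLog) {
        return message -> {
            // "processed-out-0" is an assumed binding name, not taken from the question.
            boolean sent = sender.send("processed-out-0", message);
            if (sent) {
                // Post-send work (logging, storing to a database) would go here.
                auditLog.add(message);
            }
        };
    }
}
```

Because the function is a `Consumer` rather than a `Function`, the framework does not send anything on its own; the explicit send call is what makes it possible to run code after the message has gone out.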
Second case
I receive a message that contains a collection of DTOs, and I need to process each DTO separately.
@EnableBinding(Processor.class)
public class BatchMessageProcessor {

    private final MessageChannel output;

    public BatchMessageProcessor(MessageChannel output) {
        this.output = output;
    }

    @StreamListener(Processor.INPUT)
    public void process(Message<PackageDto> pgk) {
        Stream.ofNullable(pgk)
                .map(Message::getPayload)
                .map(PackageDto::getMessages)
                .flatMap(Collection::stream)
                .filter(Objects::nonNull)
                /* Send messages separately */
                .forEach(m -> output.send(MessageBuilder.withPayload(m).build()));
    }
}
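For this case, a rough sketch of the splitting step as a plain `java.util.function.Function`, again without Spring dependencies so that it compiles on its own. The assumptions: the nested `PackageDto` is a hypothetical stand-in for my real DTO, with `String` elements. As far as I understand the reference documentation, a function bean that returns a `List<Message<T>>` has each element sent as an individual message, so in a real application the elements would presumably be wrapped with `MessageBuilder.withPayload(...)` before being returned.

```java
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BatchSplitSketch {

    /** Hypothetical stand-in for the PackageDto from the example above. */
    static class PackageDto {
        private final List<String> messages;

        PackageDto(List<String> messages) {
            this.messages = messages;
        }

        List<String> getMessages() {
            return messages;
        }
    }

    /**
     * Splits one package into its non-null elements. A null package or a
     * null message list yields an empty result instead of an exception.
     */
    static Function<PackageDto, List<String>> split() {
        return pkg -> Stream.ofNullable(pkg)
                .map(PackageDto::getMessages)
                .filter(Objects::nonNull)
                .flatMap(Collection::stream)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
}
```

Unlike the `@StreamListener` version, this sketch checks the message list for null before flattening it, since `Collection::stream` would otherwise throw on a package without messages.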
How can I handle these cases with the functional programming style in Spring Cloud Stream?
Question from:
https://stackoverflow.com/questions/65859338/spring-cloud-stream-handling-messages-after-sent