Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
202 views
in Technique[技术] by (71.8m points)

java - Spring Cloud Stream handling messages after sent

In new versions of Spring Cloud Stream @EnableBinding and declarative programming style will be deprecated. With functional programming style, how can I handle processed with pipe messages?

First case

I need to process message, that send to out channel, like logging success processed, or store them to database.

@EnableBinding(Processor.class)
public class MessageProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageProcessor.class);
    private final MessageChannel output;

    public MessageProcessor(MessageChannel output) {
        this.output = output;
    }

    @StreamListener(Processor.INPUT)
    public void process(Message<String> message) {
        LOGGER.info("Receive message: {}", message);
        output.send(message);
        /*Do some work with message here*/
        LOGGER.info("Finish processing for message: {}", message);
    }
}

Second case

I receive a message, that store collection of DTO, and I need to process each DTO object separately.

@EnableBinding(Processor.class)
public class BatchMessageProcessor {
    private final MessageChannel output;

    public BatchMessageProcessor(MessageChannel output) {
        this.output = output;
    }

    @StreamListener(Processor.INPUT)
    public void process(Message<PackageDto> pgk) {
        Stream.ofNullable(pgk)
                .map(Message::getPayload)
                .map(PackageDto::getMessages)
                .flatMap(Collection::stream)
                .filter(Objects::nonNull)
                /*Sent messages separately*/
                .forEach(m -> output.send(MessageBuilder.withPayload(m).build()));
    }
}

How I can do this cases with funciton programming style in Spring Cloud Stream?

question from:https://stackoverflow.com/questions/65859338/spring-cloud-stream-handling-messages-after-sent

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Use a StreamBridge or reactor EmitterProcessor.

See Sending arbitrary data to an output.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...