Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

spring webflux - Netty with ReactorClientHttpConnector: How to set readTimeout, writeTimeout and connectTimeout without using deprecated tcpConfiguration

Reactor Netty has deprecated HttpClient#tcpConfiguration. We are looking for a simple way to configure:

  • connectTimeout: How long to wait for a connection to be established
  • writeTimeout: How long to wait for a write to the stream (if the data cannot be delivered within this time frame, an exception is thrown)
  • readTimeout: How long to wait for a read from the stream (if no data is delivered within this time frame, an exception is thrown)

The current code looks like this:

HttpClient httpClient = HttpClient.create();

Integer connectTimeOutInMs = clientProperties.getConnectTimeOutInMs();
Integer writeTimeOutInMs = clientProperties.getWriteTimeOutInMs();
Integer readTimeout = clientProperties.getReadTimeOutInMs();

httpClient = httpClient.tcpConfiguration(tcpClientParam -> {
    TcpClient tcpClient = tcpClientParam;
    // Connect timeout configuration
    if (connectTimeOutInMs != null) {
        tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeOutInMs);
    }
    return tcpClient.doOnConnected(conn -> {
        if (readTimeout != null) {
            conn.addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS));
        }
        if (writeTimeOutInMs != null) {
            conn.addHandlerLast(new WriteTimeoutHandler(writeTimeOutInMs, TimeUnit.MILLISECONDS));
        }
    });
});

How should this be configured without using tcpConfiguration? The following approach did not work as expected: no read timeout exception was thrown.

Integer readTimeout = clientProperties.getReadTimeOutInMs();
if (readTimeout != null) {
    httpClient.doOnConnected(c -> c.addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS)));
}
Integer writeTimeOutInMs = clientProperties.getWriteTimeOutInMs();
if (writeTimeOutInMs != null) {
    httpClient.doOnConnected(
            c -> c.addHandlerLast(new WriteTimeoutHandler(writeTimeOutInMs, TimeUnit.MILLISECONDS)));
}
Integer connectTimeout = clientProperties.getConnectTimeOutInMs();
if (connectTimeout != null) {
    httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
}

What would be the correct implementation? I saw that Reactor Netty offers HttpClient#responseTimeout(), which in the end calls HttpClientOperations#addHandler(NettyPipeline.ResponseTimeoutHandler, new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS)). However, there are no equivalent methods for the connect and write timeouts.
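A likely reason the second snippet has no effect: Reactor Netty's HttpClient is immutable, so doOnConnected(...) and option(...) return a new instance instead of modifying the one they are called on, and the snippet discards those return values. The sketch below reproduces that pitfall with the standard library only. FluentClient and its with(...) method are hypothetical stand-ins made up for illustration, not part of Reactor Netty.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ImmutableClientDemo {

    /** Hypothetical immutable fluent client; NOT the Reactor Netty API. */
    static final class FluentClient {
        private final List<String> settings;

        private FluentClient(List<String> settings) {
            this.settings = Collections.unmodifiableList(settings);
        }

        static FluentClient create() {
            return new FluentClient(new ArrayList<>());
        }

        /** Returns a NEW client with the extra setting; the receiver stays unchanged. */
        FluentClient with(String setting) {
            List<String> copy = new ArrayList<>(settings);
            copy.add(setting);
            return new FluentClient(copy);
        }

        List<String> settings() {
            return settings;
        }
    }

    /** Mirrors the failing snippet: the configured instance is thrown away. */
    static FluentClient configureAndDiscard(Integer readTimeoutMs) {
        FluentClient client = FluentClient.create();
        if (readTimeoutMs != null) {
            client.with("readTimeout=" + readTimeoutMs); // return value ignored
        }
        return client;
    }

    /** Keeps the configured instance by reassigning the variable. */
    static FluentClient configureAndKeep(Integer readTimeoutMs) {
        FluentClient client = FluentClient.create();
        if (readTimeoutMs != null) {
            client = client.with("readTimeout=" + readTimeoutMs);
        }
        return client;
    }

    public static void main(String[] args) {
        System.out.println(configureAndDiscard(5000).settings()); // prints []
        System.out.println(configureAndKeep(5000).settings());    // prints [readTimeout=5000]
    }
}
```

Applied to the snippet above, this would mean writing httpClient = httpClient.doOnConnected(...) and httpClient = httpClient.option(...) so that each configured instance is retained.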



1 Answer

This can be done with a combination of a Reactor ConnectionProvider, the CONNECT_TIMEOUT_MILLIS channel option, and a pair of Netty timeout handlers. I use a basic Spring bean that builds the HttpClient and exposes a preconfigured WebClient.Builder, as follows:

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

@Service
public class WebClientFactory implements ConnectionObserver {
    private static final Logger LOG = LogManager.getLogger(WebClientFactory.class);
    private final int connectTimeout;
    private final long readTimeout;
    private final long writeTimeout;
    private final int maxConnections;
    private final Duration maxAcquireTime;
    private final Duration maxIdleTime;
    private final Duration maxLifeTime;
    
    
    private final WebClient.Builder webClientBuilder;

    /**
     * Creates a new WebClientFactory
     * @param config The web client configuration
     */
    public WebClientFactory(WebClientConfiguration config) {
        connectTimeout = config.getConnectTimeout();
        readTimeout = config.getReadTimeout();
        writeTimeout = config.getWriteTimeout();
        maxConnections = config.getMaxConnections();
        maxAcquireTime = Duration.ofMillis(config.getMaxAcquireTime());
        maxIdleTime = Duration.ofMillis(config.getMaxIdleTime());
        maxLifeTime = Duration.ofMillis(config.getMaxLifeTime());
        ConnectionProvider connectionProvider =
                ConnectionProvider.builder("aod-http-client")
                        .maxConnections(maxConnections)
                        .pendingAcquireTimeout(maxAcquireTime)
                        .maxIdleTime(maxIdleTime)
                        .maxLifeTime(maxLifeTime)
                        .build();
        // HttpClient is immutable: each call returns a new instance, so the calls are chained
        HttpClient httpClient = HttpClient.create(connectionProvider)
                // Read and write timeouts: handlers added once the channel is connected
                .doOnConnected(conn -> conn
                        .addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS))
                        .addHandlerLast(new WriteTimeoutHandler(writeTimeout, TimeUnit.MILLISECONDS)))
                // Connect timeout: a plain channel option
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
                .observe(this);
        webClientBuilder = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient));

        LOG.info("WebClientConfig: connectTimeout={}, readTimeout={}, writeTimeout={}", connectTimeout, readTimeout, writeTimeout);
    }
    
    @Bean
    protected WebClient.Builder webClientBuilder() {
        return webClientBuilder;
    }

    /**
     * @see reactor.netty.ConnectionObserver#onStateChange(reactor.netty.Connection, reactor.netty.ConnectionObserver.State)
     */
    @Override
    public void onStateChange(Connection connection, State newState) {
        LOG.info("WebClient State Change: connection={}, newState={}", connection, newState);       
    }
    
    /**
     * @see reactor.netty.ConnectionObserver#onUncaughtException(reactor.netty.Connection, java.lang.Throwable)
     */
    @Override
    public void onUncaughtException(Connection connection, Throwable error) {
        LOG.error("WebClient Uncaught Exception: connection={}", connection, error);        
    }

}
