如何在Apache异步客户端中配置允许的挂起请求数

时间:2021-08-11 20:29:43

I'm developing an HTTP client application, using Apache httpasyncclient version 4.0.2.

我正在使用Apache httpasyncclient版本4.0.2开发HTTP客户端应用程序。

I would like to configure the maximum number of pending requests. Initially I assumed this number is the same as the maximum number of connections. I set this to 20 in the following way:

我想配置最大待处理请求数。最初我假设这个数字与最大连接数相同。我通过以下方式将其设置为20:

    final CloseableHttpAsyncClient httpclient;

In the constructor:

在构造函数中:

    final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory = new ManagedNHttpClientConnectionFactory(new DefaultHttpRequestWriterFactory(), new DefaultHttpResponseParserFactory(), HeapByteBufferAllocator.INSTANCE);
    final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
            .setIoThreadCount(4)
            .setConnectTimeout(30000)
            .setSoTimeout(30000)
            .build();
    final PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(ioReactorConfig), connFactory);
    final int maxConns = 20;
    connManager.setDefaultMaxPerRoute(maxConns);
    connManager.setMaxTotal(maxConns);
    httpclient = HttpAsyncClientBuilder.create().setConnectionManager(connManager).build();
    httpclient.start();

and later:

    final BasicAsyncRequestProducer requestProducer = new BasicAsyncRequestProducer(URIUtils.extractHost(URI.create(serverAddress)), request) {
        @Override
        public void requestCompleted(HttpContext context) {
            pendings.add(callback);
            logMessage(Direction.REQUEST, req);
            handler.onContentWriteCompleted();
        }
    };
    httpclient.execute(requestProducer, HttpAsyncMethods.createConsumer(), new HttpClientContext(), callback);

where callback is where I handle the response.

回调是我处理响应的地方。

As a proof of concept it fails. Indeed get do four threads running, but when I try to send 20 messages simultaneously, only 8 are sent immediately, the rest must wait until the server responds to them.

作为概念证明,它失败了。确实要运行四个线程,但是当我尝试同时发送20个消息时,只会立即发送8个消息,其余的必须等到服务器响应它们。

Apache debug messages are indicating that 20 connections have indeed been created. It seems that more configuration must be done.

Apache调试消息表明确实已创建了20个连接。似乎必须进行更多配置。

?

1 个解决方案

#1


Apache HttpAsyncClient maintains an unbounded request execution queue and does not attempt to limit the number of pending requests. Various applications may or may not want to throttle request rate and there is no easy way to satisfy them all.

Apache HttpAsyncClient维护一个*请求执行队列,不会尝试限制挂起请求的数量。各种应用程序可能或可能不想要限制请求率,并且没有简单的方法来满足它们。

One can however fairly easily throttle the number of concurrent requests using a simple semaphore.

但是,可以使用简单的信号量相当容易地限制并发请求的数量。

final Semaphore semaphore = new Semaphore(maxConcurrencyLevel);
for (int i = 0; i < n; i++) {

    semaphore.acquire();
    this.httpclient.execute(
            new BasicAsyncRequestProducer(target, request),
            new MyResponseConsumer(),
            new FutureCallback<HttpResponse>() {

                @Override
                public void completed(final HttpResponse result) {
                    semaphore.release();
                }

                @Override
                public void failed(final Exception ex) {
                    semaphore.release();
                }

                @Override
                public void cancelled() {
                    semaphore.release();
                }

            });
}

#1


Apache HttpAsyncClient maintains an unbounded request execution queue and does not attempt to limit the number of pending requests. Various applications may or may not want to throttle request rate and there is no easy way to satisfy them all.

Apache HttpAsyncClient维护一个*请求执行队列,不会尝试限制挂起请求的数量。各种应用程序可能或可能不想要限制请求率,并且没有简单的方法来满足它们。

One can however fairly easily throttle the number of concurrent requests using a simple semaphore.

但是,可以使用简单的信号量相当容易地限制并发请求的数量。

final Semaphore semaphore = new Semaphore(maxConcurrencyLevel);
for (int i = 0; i < n; i++) {

    semaphore.acquire();
    this.httpclient.execute(
            new BasicAsyncRequestProducer(target, request),
            new MyResponseConsumer(),
            new FutureCallback<HttpResponse>() {

                @Override
                public void completed(final HttpResponse result) {
                    semaphore.release();
                }

                @Override
                public void failed(final Exception ex) {
                    semaphore.release();
                }

                @Override
                public void cancelled() {
                    semaphore.release();
                }

            });
}