如果异步模板HTTP请求占用太多时间,如何取消?

时间:2021-05-13 20:47:38

Since starting, I was always confuse of how to deal with InterruptedException and how to properly cancel the http request if they are taking too much time. I have a library in which I have provided two methods, sync and async for our customer. They can call whichever method they feel is right for their purpose.

从一开始,我总是搞不清楚如何处理InterruptedException,以及如果http请求占用太多时间,如何正确地取消它们。我有一个库,其中为我们的客户提供了同步和异步两种方法。他们可以调用任何他们认为适合他们目的的方法。

  • executeSync() - waits until I have a result, returns the result.
  • executeSync()—等待直到得到结果,返回结果。
  • executeAsync() - returns a Future immediately which can be processed after other things are done, if needed.
  • executeAsync()——立即返回一个Future,如果需要,可以在完成其他事情之后进行处理。

They will pass DataKey object which has the user id and timeout value in it. We will figure out which machine to call basis on the user id and then create an URL with that machine and we will make http call to the URL using AsyncRestTemplate and then send the response back to them basis on whether it is successful or not.

它们将传递DataKey对象,其中包含用户id和超时值。我们将找出在用户id上调用基础的机器,然后使用该机器创建一个URL,我们将使用AsyncRestTemplate对URL进行http调用,然后将响应发送回它们,以判断它是否成功。

I am using exchange method of AsyncRestTemplate which returns back a ListenableFuture and I wanted to have async non blocking architecture with NIO based client connections so that request uses non blocking IO so that's why I went with AsyncRestTemplate. Does this approach sounds right for my problem definition? This library will be used in production under very heavy load.

我正在使用AsyncRestTemplate的交换方法它返回一个ListenableFuture并且我想要有一个基于NIO的客户端连接的异步非阻塞体系结构,这样请求就会使用非阻塞IO,所以我选择了AsyncRestTemplate。这种方法听起来适合我的问题定义吗?这个库将在非常重的负载下用于生产。

Below is my interface:

下面是我的界面:

public interface Client {
    // for synchronous
    public DataResponse executeSync(DataKey key);

    // for asynchronous
    public ListenableFuture<DataResponse> executeAsync(DataKey key);
}

And below is my implementation of the interface:

下面是我对界面的实现:

public class DataClient implements Client {

    // using spring 4 AsyncRestTemplate
    private final AsyncRestTemplate restTemplate = new AsyncRestTemplate();

    // for synchronous
    @Override
    public DataResponse executeSync(DataKey keys) {
        Future<DataResponse> responseFuture = executeAsync(keys);
        DataResponse response = null;

        try {
            response = responseFuture.get(keys.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // do we need to catch InterruptedException here and interrupt the thread?
            Thread.currentThread().interrupt();
            // also do I need throw this RuntimeException at all?
            throw new RuntimeException("Interrupted", ex);
        } catch (TimeoutException ex) {
            DataLogging.logEvents(ex, DataErrorEnum.CLIENT_TIMEOUT, keys);
            response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true); // terminating the tasks that got timed out so that they don't take up the resources?
        } catch (Exception ex) {
            DataLogging.logEvents(ex, DataErrorEnum.ERROR_CLIENT, keys);
            response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
        }

        return response;
    }

    // for asynchronous     
    @Override
    public ListenableFuture<DataResponse> executeAsync(final DataKey keys) {

        final SettableFuture<DataResponse> responseFuture = SettableFuture.create();
        final org.springframework.util.concurrent.ListenableFuture orig = 
            restTemplate.exchange(createURL(keys), HttpMethod.GET, keys.getEntity(), String.class);

        orig.addCallback(
                new ListenableFutureCallback<ResponseEntity<String>>() {
                    @Override
                    public void onSuccess(ResponseEntity<String> result) {
                        responseFuture.set(new DataResponse(result.getBody(), DataErrorEnum.OK,
                                DataStatusEnum.SUCCESS));
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        DataLogging.logErrors(ex, DataErrorEnum.ERROR_SERVER, keys);
                        responseFuture.set(new DataResponse(null, DataErrorEnum.ERROR_SERVER,
                                DataStatusEnum.ERROR));
                    }
                });

        // propagate cancellation back to the original request
        responseFuture.addListener(new Runnable() {
          @Override public void run() {
             if (responseFuture.isCancelled()) {
               orig.cancel(false); // I am keeping this false for now
             }
          }
        }, MoreExecutors.directExecutor());
        return responseFuture;
    }
}

And customer will be calling like this from their code -

客户将从他们的代码-

// if they are calling executeSync() method
DataResponse response = DataClientFactory.getInstance().executeSync(dataKey);

// and if they want to call executeAsync() method
Future<DataResponse> response = DataClientFactory.getInstance().executeAsync(dataKey);

Now the question is -

现在的问题是-

  1. Can we interrupt AsyncRestTemplate call if http request is taking too long? I am actually calling cancel on my future in my above code in executeSync method but I am not sure how do I verify it to make sure it is doing what it should? I want to propagate cancellation back to the original future, so that I can cancel the corresponding http request (which I probably want to do to save resources) so that's why I have added a listener in my executeAsync method. I believe, we cannot interrupt RestTemplate calls but not sure on AsyncRestTemplate whether we can do that or not. If let's say we can interrupt AsyncRestTemplate calls, then am I doing everything right to interrupt the http calls? Or is there any better/cleaner way to do this? Or Do I even need to worry about cancelling the Http request with AsyncRestTemplate with my current design?

    如果http请求占用太长时间,我们可以中断异步模板调用吗?实际上,我正在调用executeSync方法中的上述代码中的cancel,但我不确定如何验证它以确保它正在执行它应该执行的操作?我想将取消消息传播回原来的将来,这样我就可以取消相应的http请求(我可能想这样做以保存资源),所以我在executeAsync方法中添加了一个侦听器。我认为,我们不能中断RestTemplate调用,但不能确定是否可以这样做。如果我们可以中断AsyncRestTemplate调用,那么我是否做了所有正确的事情来中断http调用?或者有更好更干净的方法吗?或者我是否需要担心用当前设计的异步模板取消Http请求?

        // propagate cancellation back to the original request
        responseFuture.addListener(new Runnable() {
          @Override public void run() {
             if (responseFuture.isCancelled()) {
               orig.cancel(false); // I am keeping this false for now
             }
          }
        }, MoreExecutors.directExecutor()); 
    

    With the current setup, I can see it is throwing CancellationException some of the times (not everytime)- Does that mean my HTTP request was cancelled then?

    在当前的设置中,我可以看到它有时会抛出取消异常(不是每次)——这是否意味着当时我的HTTP请求被取消了?

  2. Also am I doing the right thing in catch block of InterruptedException in executeSync method? If not, then what's the right way to deal with that. And do I need to deal with InterruptedException at all in my case?
  3. 在executeSync方法中捕获InterruptedException,我是否做了正确的事情?如果不是,那么正确的处理方法是什么?我需要处理InterruptedException吗?
  4. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread? If yes, then is there any way to have NIO based client connections in my current setup?
  5. 默认情况下,AsyncRestTamplete对每个线程都使用阻塞调用和请求,这是真的吗?如果是,那么在我当前的设置中是否有基于NIO的客户端连接?

Any explanations/code suggestions will be of great help.

任何解释/代码建议都会有很大的帮助。

1 个解决方案

#1


8  

First of all, Why are you using SettableFuture? Why can't just return the ListenableFuture returned by AsyncRestTemplate?

首先,你为什么要使用SettableFuture?为什么不能返回AsyncRestTemplate返回的ListenableFuture呢?

1. Can we interrupt AsyncRestTemplate call if http request is taking too long?

Of course you do! You only need to call Future.cancel method. This method will interrupt the execution of the internal RestTemplate that AsyncRestTemplate is actually using.

当然,你做的!你只需要打电话给未来。取消方法。该方法将中断AsyncRestTemplate实际使用的内部RestTemplate的执行。

2. Also am I doing the right thing in catch block of InterruptedException in executeSync method?

As Phil and Danilo have said, you don't need to interrupt the current thread within the InterruptedException catch block. Just do whatever you need to do when the execution of the request must be canceled.

正如Phil和Danilo所说,您不需要中断InterruptedException块中的当前线程。当请求的执行必须被取消时,只需做任何需要做的事情。

In fact, I recommend you create a method that handles this behaviour, something like handleInterruption, and use this method for both TimeoutException and InterruptedException.

事实上,我建议您创建一个处理这种行为的方法,比如handleinterrupt,并将此方法用于TimeoutException和InterruptedException。

3. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread?

Yes. The default constructor of AsyncRestTamplete is internally using SimpleClientHttpRequestFactory and SimpleAsyncTaskExecutor.

是的。AsyncRestTamplete的默认构造函数在内部使用SimpleClientHttpRequestFactory和SimpleAsyncTaskExecutor。

This TaskExecutor always starts a threat for every task, and never reuse Threads, so it's very inefficient:

这个任务执行程序总是对每个任务都产生威胁,并且从不重用线程,所以它非常低效:

 * TaskExecutor implementation that fires up a new Thread for each task,
 * executing it asynchronously.
 *
 * Supports limiting concurrent threads through the "concurrencyLimit"
 * bean property. By default, the number of concurrent threads is unlimited.
 *
 * NOTE: This implementation does not reuse threads! Consider a
 * thread-pooling TaskExecutor implementation instead, in particular for
 * executing a large number of short-lived tasks.
 *

I recommend you use another configuration of AsyncRestTemplate.

我建议您使用异步模板的另一种配置。

You should use the constructor of AsyncRestTemplate that uses another TaskExecutor:

您应该使用使用另一个TaskExecutor的AsyncRestTemplate的构造函数:

public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor)

For instance:

例如:

AsyncRestTemplate template = new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newCachedThreadPool()));

This ExecutorService (Executors.newCachedThreadPool()) creates new threads as needed, but will reuse previously constructed threads when they are available.

这个ExecutorService (Executors.newCachedThreadPool()))根据需要创建新线程,但是当以前构造的线程可用时,会重用它们。

Or even better, you can use another RequestFactory. For instance, you can use HttpComponentsAsyncClientHttpRequestFactory, that internally uses NIO, just calling the proper constructor of AsyncRestTemplate:

或者更好,您可以使用另一个RequestFactory。例如,您可以使用HttpComponentsAsyncClientHttpRequestFactory,它在内部使用NIO,只需调用AsyncRestTemplate的适当构造函数:

new AsyncRestTemplate(new HttpComponentsAsyncClientHttpRequestFactory())

Don't forget the internal behaviour of AsyncRestTemplate will depend on how you create the object.

不要忘记AsyncRestTemplate的内部行为取决于您如何创建对象。

#1


8  

First of all, Why are you using SettableFuture? Why can't just return the ListenableFuture returned by AsyncRestTemplate?

首先,你为什么要使用SettableFuture?为什么不能返回AsyncRestTemplate返回的ListenableFuture呢?

1. Can we interrupt AsyncRestTemplate call if http request is taking too long?

Of course you do! You only need to call Future.cancel method. This method will interrupt the execution of the internal RestTemplate that AsyncRestTemplate is actually using.

当然,你做的!你只需要打电话给未来。取消方法。该方法将中断AsyncRestTemplate实际使用的内部RestTemplate的执行。

2. Also am I doing the right thing in catch block of InterruptedException in executeSync method?

As Phil and Danilo have said, you don't need to interrupt the current thread within the InterruptedException catch block. Just do whatever you need to do when the execution of the request must be canceled.

正如Phil和Danilo所说,您不需要中断InterruptedException块中的当前线程。当请求的执行必须被取消时,只需做任何需要做的事情。

In fact, I recommend you create a method that handles this behaviour, something like handleInterruption, and use this method for both TimeoutException and InterruptedException.

事实上,我建议您创建一个处理这种行为的方法,比如handleinterrupt,并将此方法用于TimeoutException和InterruptedException。

3. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread?

Yes. The default constructor of AsyncRestTamplete is internally using SimpleClientHttpRequestFactory and SimpleAsyncTaskExecutor.

是的。AsyncRestTamplete的默认构造函数在内部使用SimpleClientHttpRequestFactory和SimpleAsyncTaskExecutor。

This TaskExecutor always starts a threat for every task, and never reuse Threads, so it's very inefficient:

这个任务执行程序总是对每个任务都产生威胁,并且从不重用线程,所以它非常低效:

 * TaskExecutor implementation that fires up a new Thread for each task,
 * executing it asynchronously.
 *
 * Supports limiting concurrent threads through the "concurrencyLimit"
 * bean property. By default, the number of concurrent threads is unlimited.
 *
 * NOTE: This implementation does not reuse threads! Consider a
 * thread-pooling TaskExecutor implementation instead, in particular for
 * executing a large number of short-lived tasks.
 *

I recommend you use another configuration of AsyncRestTemplate.

我建议您使用异步模板的另一种配置。

You should use the constructor of AsyncRestTemplate that uses another TaskExecutor:

您应该使用使用另一个TaskExecutor的AsyncRestTemplate的构造函数:

public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor)

For instance:

例如:

AsyncRestTemplate template = new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newCachedThreadPool()));

This ExecutorService (Executors.newCachedThreadPool()) creates new threads as needed, but will reuse previously constructed threads when they are available.

这个ExecutorService (Executors.newCachedThreadPool()))根据需要创建新线程,但是当以前构造的线程可用时,会重用它们。

Or even better, you can use another RequestFactory. For instance, you can use HttpComponentsAsyncClientHttpRequestFactory, that internally uses NIO, just calling the proper constructor of AsyncRestTemplate:

或者更好,您可以使用另一个RequestFactory。例如,您可以使用HttpComponentsAsyncClientHttpRequestFactory,它在内部使用NIO,只需调用AsyncRestTemplate的适当构造函数:

new AsyncRestTemplate(new HttpComponentsAsyncClientHttpRequestFactory())

Don't forget the internal behaviour of AsyncRestTemplate will depend on how you create the object.

不要忘记AsyncRestTemplate的内部行为取决于您如何创建对象。