等待vertx中多个observable的响应

时间:2022-04-01 18:02:23

I am using vertx-rx-java

我正在使用vertx-rx-java

I have a handler where I need to make 2 different requests via EventBus and create response using responses from these 2 requests.

我有一个处理程序,我需要通过EventBus发出2个不同的请求,并使用来自这2个请求的响应创建响应。

public handle(RoutingContext context) {
....some code...    

    Single<Message<Object>> firstRequest = eb.rxSend("address1", "message1");
    Single<Message<Object>> secondRequest = eb.rxSend("address2", "message2");
    ... TODO ...
}

Basically I need to combine two request results and put them into RoutingContext response. The problem is that I don't completely understand how to do that in rxjava style. The only way I was able to think of is smth like that:

基本上我需要结合两个请求结果并将它们放入RoutingContext响应中。问题是我不完全理解如何在rxjava风格中这样做。我能想到的唯一方法就是这样:

firstRequest.doOnSuccess(resp1 -> {
  secondRequest.doOnSuccess(resp2 -> {

  });
});

But I think it's a bad way because what if there are 10 requests instead of 2? this code will have 10 nested calls.

但我认为这是一个糟糕的方式,因为如果有10个请求而不是2个请求呢?此代码将有10个嵌套调用。

Is there any better ways to combine multiple requests results?

有没有更好的方法来组合多个请求结果?

1 个解决方案

#1


2  

the zip operator can be used to associated emissions from multiple sources, with the distinction that it only emits when each of its underlying sources emits. so...

zip操作符可用于关联来自多个源的排放,区别在于它仅在每个基础源发出时发出。所以...

  • in the case that there are two underlying sources, zip will emit in pairs.
  • 在有两个潜在来源的情况下,zip将成对发射。

  • in the case that there are three underlying sources, zip will emit in triplets.
  • 在有三个潜在来源的情况下,zip将以三元组的形式发出。

  • ...etc

to get a hands-on sense of what i mean, you can refer to the RxMarbles page, and play with the emissions in the top two streams while observing the bottom stream.

为了获得我的意思,您可以参考RxMarbles页面,并在观察底部流的同时使用前两个流中的排放。

with that understanding, you can use the zip operator to combine the results of the Message replies, like this:

根据这种理解,您可以使用zip运算符来组合Message回复的结果,如下所示:

Single.zip(firstRequest, secondRequest, (firstReply, secondReply) -> {
    // ...do stuff with the replies and compose some result
    //    to be handled in onSuccess()
    return firstReply.body().toString() + secondReply.body().toString();
})
.subscribe(
    result -> {
        System.out.println("## onSuccess(" + result + ")");
    },
    error -> {
        System.err.println("## onError(" + error.getMessage() + ")");
    }
);

if either delivery fails then the onError handler will be triggered. onSuccess will be triggered otherwise.

如果任何一个传递失败,那么将触发onError处理程序。否则将触发onSuccess。

if, as you mentioned, you have a large number of requests that you'd like to handle at once, there is an overloaded variant of zip that accepts an Iterable of sources. in your case, that might look something like this:

如果你正如你所提到的那样,你有大量的请求需要一次处理,那么有一个重载的zip变种可以接受Iterable的来源。在你的情况下,这可能看起来像这样:

final List<Single<Message<Object>>> requests = asList(firstRequest, secondRequest, ...);

Single.zip(requests, replies -> {
    // ...do stuff with the array of replies
    return null;
})
.subscribe(...);

hope that helps!

希望有所帮助!

#1


2  

the zip operator can be used to associated emissions from multiple sources, with the distinction that it only emits when each of its underlying sources emits. so...

zip操作符可用于关联来自多个源的排放,区别在于它仅在每个基础源发出时发出。所以...

  • in the case that there are two underlying sources, zip will emit in pairs.
  • 在有两个潜在来源的情况下,zip将成对发射。

  • in the case that there are three underlying sources, zip will emit in triplets.
  • 在有三个潜在来源的情况下,zip将以三元组的形式发出。

  • ...etc

to get a hands-on sense of what i mean, you can refer to the RxMarbles page, and play with the emissions in the top two streams while observing the bottom stream.

为了获得我的意思,您可以参考RxMarbles页面,并在观察底部流的同时使用前两个流中的排放。

with that understanding, you can use the zip operator to combine the results of the Message replies, like this:

根据这种理解,您可以使用zip运算符来组合Message回复的结果,如下所示:

Single.zip(firstRequest, secondRequest, (firstReply, secondReply) -> {
    // ...do stuff with the replies and compose some result
    //    to be handled in onSuccess()
    return firstReply.body().toString() + secondReply.body().toString();
})
.subscribe(
    result -> {
        System.out.println("## onSuccess(" + result + ")");
    },
    error -> {
        System.err.println("## onError(" + error.getMessage() + ")");
    }
);

if either delivery fails then the onError handler will be triggered. onSuccess will be triggered otherwise.

如果任何一个传递失败,那么将触发onError处理程序。否则将触发onSuccess。

if, as you mentioned, you have a large number of requests that you'd like to handle at once, there is an overloaded variant of zip that accepts an Iterable of sources. in your case, that might look something like this:

如果你正如你所提到的那样,你有大量的请求需要一次处理,那么有一个重载的zip变种可以接受Iterable的来源。在你的情况下,这可能看起来像这样:

final List<Single<Message<Object>>> requests = asList(firstRequest, secondRequest, ...);

Single.zip(requests, replies -> {
    // ...do stuff with the array of replies
    return null;
})
.subscribe(...);

hope that helps!

希望有所帮助!