I'm looking for the best method to wait for async tasks to finish in rx-java.
我正在寻找等待异步任务在rx-java中完成的最佳方法。
As a common example say there is a function which gets a list of id's from a local store and then queries a remote system for those Id's, the remote system results are then consolidated into a single report and returned to the caller of the function. As the call to the remote system is slow we want them to be done in asynchronously and I only want to return once all of the calls have returned and their results have been processed.
作为一个常见示例,有一个函数从本地存储中获取id列表,然后在远程系统中查询这些Id,然后将远程系统结果合并到单个报告中并返回给函数的调用者。由于对远程系统的调用很慢,我们希望它们以异步方式完成,并且我只想在所有调用都返回并且其结果已经处理后返回。
The only reliable way I have found to do this is to poll the subscription to check it is unsubscribed yet. But I'm thinking doesn't seem to be the 'RX' way to do things!
我发现这样做的唯一可靠方法是轮询订阅以检查它是否已取消订阅。但我认为似乎并不是'RX'做事的方式!
As an example I've taken the example from http://howrobotswork.wordpress.com/2013/10/28/using-rxjava-in-android/ and amended it slightly to make it non-android and to show what I mean. I have to have the following code at the of the main() method to stop it exiting immediately.
作为一个例子,我从http://howrobotswork.wordpress.com/2013/10/28/using-rxjava-in-android/中取了一个例子并稍微修改它以使其成为非android并显示我的意思。我必须在main()方法中使用以下代码来阻止它立即退出。
while (!subscription.isUnsubscribed()) {
Thread.sleep(100);
}
The full code for the example is listed below (it is dependent on http://square.github.io/retrofit/ if your trying to compile it)
下面列出了该示例的完整代码(如果您尝试编译它,则依赖于http://square.github.io/retrofit/)
package org.examples;
import retrofit.RestAdapter;
import retrofit.http.GET;
import retrofit.http.Query;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
public class AsyncExample {
public static void main(String[] args) throws InterruptedException {
final Subscription subscription = Observable.from("London", "Paris", "Berlin")
.flatMap(new Func1<String, Observable<WeatherData>>() {
@Override
public Observable<WeatherData> call(String s) {
return ApiManager.getWeatherData(s);
}
})
.subscribe(
new Action1<WeatherData>() {
@Override
public void call(WeatherData weatherData) {
System.out.println(Thread.currentThread().getName() + " - " + weatherData.name + ", " + weatherData.base);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " - ERROR: " + throwable.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println(Thread.currentThread().getName() + " - COMPLETED");
}
}
);
// Have to poll subscription to check if its finished - is this the only way to do it?
while (!subscription.isUnsubscribed()) {
Thread.sleep(100);
}
}
}
class ApiManager {
private interface ApiManagerService {
@GET("/weather")
WeatherData getWeather(@Query("q") String place, @Query("units") String units);
}
private static final RestAdapter restAdapter = new RestAdapter.Builder()
.setEndpoint("http://api.openweathermap.org/data/2.5")
.build();
private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);
public static Observable<WeatherData> getWeatherData(final String city) {
return Observable.create(new Observable.OnSubscribe<WeatherData>() {
@Override
public void call(Subscriber<? super WeatherData> subscriber) {
try {
System.out.println(Thread.currentThread() + " - Getting " + city);
subscriber.onNext(apiManager.getWeather(city, "metric"));
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}
3 个解决方案
#1
5
If you are asking how to turn an asynchronous Observable into a synchronous one you can use .toBlocking() and then use .last() to wait for the Observable to complete.. For example, the following will not continue until the timer completes even though the timer executes on the computation thread.
如果您正在询问如何将异步Observable转换为同步Observable,您可以使用.toBlocking()然后使用.last()等待Observable完成。例如,在定时器完成之前,以下内容将不会继续虽然计时器在计算线程上执行。
try {
Observable
.timer(10, TimeUnit.SECONDS)
.toBlocking()
.last(); // Wait for observable to complete. Last item discarded.
} catch (IllegalArgumentException ex) {
// No items or error.
}
You probably shouldn't use this method unless absolutely necessary. Normally, application termination would be controlled by some other event (key press, close menu item click, etc.) Your network request Observables would complete asynchronously and you would take action in the onNext(), onCompleted() and onError() calls to update the UI or display an error.
除非绝对必要,否则您可能不应该使用此方法。通常,应用程序终止将由其他一些事件(按键,关闭菜单项单击等)控制。您的网络请求Observables将异步完成,您将在onNext(),onCompleted()和onError()调用中执行操作更新UI或显示错误。
Also, the beauty of Retrofit is that it has built in support for RxJava and will execute network requests in the background. To use that support declare you interface as returning an Observable.
此外,Retrofit的优点在于它内置了对RxJava的支持,并将在后台执行网络请求。要使用该支持,请将接口声明为返回Observable。
interface ApiManagerService {
@GET("/weather")
Observable<WeatherData> getWeather(@Query("q") String place, @Query("units") String units);
}
Which allows you to simplify your getWeatherData() method to this:
这允许您将getWeatherData()方法简化为:
public static Observable<WeatherData> getWeatherData(final String city) {
return apiManager.getWeather(city, "metric");
}
#2
3
For the purpose of synchronous waiting for asynchronous observаble I wrote a simple function:
为了同步等待异步observ的目的,我写了一个简单的函数:
public static <T> void runSynchronously(Observable<T> observable) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> exception = new AtomicReference<>();
observable.subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
exception.set(e);
latch.countDown();
}
@Override
public void onNext(T o) {
}
});
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (exception.get() != null) {
throw Exceptions.propagate(exception.get());
}
}
You can use it this way:
你可以这样使用它:
Observer<String> sideEffects = new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
};
runSynchronously(
Observable.just("London", "Paris", "Berlin")
.doOnEach(sideEffects)
);
#3
1
In most cases it's better not to block a method. Try to be as non-blocking as possible. If you need to do something after an Observable has completed simply invoke your logic from the onCompleted
callback, or use the doOnCompleted
operator.
在大多数情况下,最好不要阻止方法。尝试尽可能无阻塞。如果在Observable完成后需要执行某些操作,只需从onCompleted回调调用逻辑,或使用doOnCompleted运算符。
If you really, really need to block then you can use the Blocking Observable Operators
. Most of these operators block until the Observable completes.
如果你确实需要阻止那么你可以使用Blocking Observable Operators。大多数运算符都会阻塞,直到Observable完成。
However, stopping your main method from exiting prematurely can be achieved by a simple System.in.read()
at the end.
但是,可以通过最后一个简单的System.in.read()来实现停止主方法过早退出。
#1
5
If you are asking how to turn an asynchronous Observable into a synchronous one you can use .toBlocking() and then use .last() to wait for the Observable to complete.. For example, the following will not continue until the timer completes even though the timer executes on the computation thread.
如果您正在询问如何将异步Observable转换为同步Observable,您可以使用.toBlocking()然后使用.last()等待Observable完成。例如,在定时器完成之前,以下内容将不会继续虽然计时器在计算线程上执行。
try {
Observable
.timer(10, TimeUnit.SECONDS)
.toBlocking()
.last(); // Wait for observable to complete. Last item discarded.
} catch (IllegalArgumentException ex) {
// No items or error.
}
You probably shouldn't use this method unless absolutely necessary. Normally, application termination would be controlled by some other event (key press, close menu item click, etc.) Your network request Observables would complete asynchronously and you would take action in the onNext(), onCompleted() and onError() calls to update the UI or display an error.
除非绝对必要,否则您可能不应该使用此方法。通常,应用程序终止将由其他一些事件(按键,关闭菜单项单击等)控制。您的网络请求Observables将异步完成,您将在onNext(),onCompleted()和onError()调用中执行操作更新UI或显示错误。
Also, the beauty of Retrofit is that it has built in support for RxJava and will execute network requests in the background. To use that support declare you interface as returning an Observable.
此外,Retrofit的优点在于它内置了对RxJava的支持,并将在后台执行网络请求。要使用该支持,请将接口声明为返回Observable。
interface ApiManagerService {
@GET("/weather")
Observable<WeatherData> getWeather(@Query("q") String place, @Query("units") String units);
}
Which allows you to simplify your getWeatherData() method to this:
这允许您将getWeatherData()方法简化为:
public static Observable<WeatherData> getWeatherData(final String city) {
return apiManager.getWeather(city, "metric");
}
#2
3
For the purpose of synchronous waiting for asynchronous observаble I wrote a simple function:
为了同步等待异步observ的目的,我写了一个简单的函数:
public static <T> void runSynchronously(Observable<T> observable) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> exception = new AtomicReference<>();
observable.subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
exception.set(e);
latch.countDown();
}
@Override
public void onNext(T o) {
}
});
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (exception.get() != null) {
throw Exceptions.propagate(exception.get());
}
}
You can use it this way:
你可以这样使用它:
Observer<String> sideEffects = new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
};
runSynchronously(
Observable.just("London", "Paris", "Berlin")
.doOnEach(sideEffects)
);
#3
1
In most cases it's better not to block a method. Try to be as non-blocking as possible. If you need to do something after an Observable has completed simply invoke your logic from the onCompleted
callback, or use the doOnCompleted
operator.
在大多数情况下,最好不要阻止方法。尝试尽可能无阻塞。如果在Observable完成后需要执行某些操作,只需从onCompleted回调调用逻辑,或使用doOnCompleted运算符。
If you really, really need to block then you can use the Blocking Observable Operators
. Most of these operators block until the Observable completes.
如果你确实需要阻止那么你可以使用Blocking Observable Operators。大多数运算符都会阻塞,直到Observable完成。
However, stopping your main method from exiting prematurely can be achieved by a simple System.in.read()
at the end.
但是,可以通过最后一个简单的System.in.read()来实现停止主方法过早退出。