I coded an ObservableList
based on a PublishSubject
. Everytime a client add an element to this list then the Observers get notified via the onNext
method on the Subject.
我基于PublishSubject编写了一个ObservableList。每次客户端向此列表添加元素时,Observers都会通过Subject上的onNext方法获得通知。
I have coded two sample Observers and subscribed them to the ObservableList
. I have noticed that the notification is sequential and blocking. So if I have these two observers subscribed then the following happens:
我编写了两个示例观察者并将它们订阅到ObservableList。我注意到通知是顺序和阻止的。因此,如果我订阅了这两个观察者,则会发生以下情况:
- The observers get notified in the same order as subscribed.
- If an Observer is blocking then the next Observer is only notified when the previous one has ended its execution.
观察者按照订阅的顺序得到通知。
如果Observer阻塞,则只有当前一个Observer结束执行时才会通知下一个Observer。
I really don't care id the invocation is sequential but want to understand why the notification is blocking as well and how can I make it not to be blocking.
我真的不关心id调用是顺序的,但是想要理解为什么通知也是阻塞的,我怎么能让它不被阻塞。
Here is the code for the ObservableList
这是ObservableList的代码
package co.com.subjects.example;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.functions.Action1;
import rx.subjects.PublishSubject;
public class ObservableList<T>{
public String nombre;
protected final List<T> list;
protected final PublishSubject<T> onAdd;
public ObservableList(String nombre) {
this.list = new ArrayList<T>();
this.onAdd = PublishSubject.create();
this.nombre = nombre;
}
public void add(T value) {
list.add(value);
onAdd.onNext(value);
}
public Observable<T> getObservable() {
return onAdd;
}
}
2 个解决方案
#1
want to understand why the notification is blocking
想了解通知阻止的原因
The implementation of RxJava assumes that executing the onNext
method of an Observer is always fast and cheap, so PublishSubject
just calls all onNext
methods of its Observers one after the other, without introducing any concurrency.
RxJava的实现假定执行Observer的onNext方法总是快速且便宜,因此PublishSubject只是一个接一个地调用其Observers的所有onNext方法,而不引入任何并发。
as well and how can I make it not to be blocking
以及如何让它不被阻止
Before subscribing your observers, you could insert .observeOn(Schedulers.computation())
(or a different scheduler, depending on your needs), so that the onNext
calls are executed on a thread pool.
在订阅观察者之前,您可以插入.observeOn(Schedulers.computation())(或根据您的需要使用不同的调度程序),以便在线程池上执行onNext调用。
#2
You can make it non blocking by wrapping the observers in an observer which notifies it in an ExecutorService.
您可以通过将观察者包装在一个在ExecutorService中通知它的观察者来使其无阻塞。
Obviously adding a task to another pool is fairly expensive so I would only do this when you know it will take a while. Also note that unless you are careful, events published this way can get out of order. A simple solution for this is to have an single threaded executor for each listener.
显然,将任务添加到另一个池是相当昂贵的,所以我只会在你知道它需要一段时间后才这样做。另请注意,除非您小心,否则以这种方式发布的事件可能会出现故障。一个简单的解决方案是为每个监听器提供一个单线程执行程序。
#1
want to understand why the notification is blocking
想了解通知阻止的原因
The implementation of RxJava assumes that executing the onNext
method of an Observer is always fast and cheap, so PublishSubject
just calls all onNext
methods of its Observers one after the other, without introducing any concurrency.
RxJava的实现假定执行Observer的onNext方法总是快速且便宜,因此PublishSubject只是一个接一个地调用其Observers的所有onNext方法,而不引入任何并发。
as well and how can I make it not to be blocking
以及如何让它不被阻止
Before subscribing your observers, you could insert .observeOn(Schedulers.computation())
(or a different scheduler, depending on your needs), so that the onNext
calls are executed on a thread pool.
在订阅观察者之前,您可以插入.observeOn(Schedulers.computation())(或根据您的需要使用不同的调度程序),以便在线程池上执行onNext调用。
#2
You can make it non blocking by wrapping the observers in an observer which notifies it in an ExecutorService.
您可以通过将观察者包装在一个在ExecutorService中通知它的观察者来使其无阻塞。
Obviously adding a task to another pool is fairly expensive so I would only do this when you know it will take a while. Also note that unless you are careful, events published this way can get out of order. A simple solution for this is to have an single threaded executor for each listener.
显然,将任务添加到另一个池是相当昂贵的,所以我只会在你知道它需要一段时间后才这样做。另请注意,除非您小心,否则以这种方式发布的事件可能会出现故障。一个简单的解决方案是为每个监听器提供一个单线程执行程序。