
Time: 2022-05-26 21:05:52

I coded an ObservableList based on a PublishSubject. Every time a client adds an element to this list, the Observers get notified via the onNext method on the Subject.


I have coded two sample Observers and subscribed them to the ObservableList. I have noticed that the notification is sequential and blocking. So if I have these two observers subscribed then the following happens:


  • The observers get notified in the same order as subscribed.

  • If an Observer is blocking then the next Observer is only notified when the previous one has ended its execution.

I really don't care if the invocation is sequential but want to understand why the notification is blocking as well and how can I make it not to be blocking.


Here is the code for the ObservableList:


package co.com.subjects.example;

import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.functions.Action1;
import rx.subjects.PublishSubject;

public class ObservableList<T>{

    public String nombre;
    protected final List<T> list;
    protected final PublishSubject<T> onAdd;

    public ObservableList(String nombre) {
        this.list = new ArrayList<T>();
        this.onAdd = PublishSubject.create();
        this.nombre = nombre;
    }

    public void add(T value) {
        // Body missing in the source; per the description above, the value
        // is stored and the Observers are notified through onNext.
        list.add(value);
        onAdd.onNext(value);
    }

    public Observable<T> getObservable() {
        return onAdd;
    }
}

2 Answers


"want to understand why the notification is blocking"


The implementation of RxJava assumes that executing the onNext method of an Observer is always fast and cheap, so PublishSubject just calls all onNext methods of its Observers one after the other, without introducing any concurrency.
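To make the blocking behaviour concrete, here is a minimal sketch of synchronous dispatch. It is not RxJava's source: the class SynchronousSubjectSketch and its methods are hypothetical, and java.util.function.Consumer stands in for an Observer's onNext so the example runs on the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a subject; an illustration, not RxJava code.
class SynchronousSubjectSketch<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    // Calls every observer in subscription order on the caller's thread,
    // so a slow observer delays all observers subscribed after it.
    void onNext(T value) {
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }

    // Subscribes a slow and a fast observer and returns the order in which
    // they finished handling the single event "a".
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SynchronousSubjectSketch<String> subject = new SynchronousSubjectSketch<>();
        subject.subscribe(v -> {
            sleepQuietly(100); // simulate a blocking observer
            log.add("slow:" + v);
        });
        subject.subscribe(v -> log.add("fast:" + v));
        subject.onNext("a"); // returns only after both observers have run
        return log;
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The fast observer is logged second even though it does almost no work, because nothing in the loop hands the call to another thread.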


"as well and how can I make it not to be blocking"


Before subscribing your observers, you could insert .observeOn(Schedulers.computation()) (or a different scheduler, depending on your needs), so that the onNext calls are executed on a thread pool.



You can make it non-blocking by wrapping each observer in another observer that forwards the notifications through an ExecutorService.


Obviously, adding a task to another pool is fairly expensive, so I would only do this when you know the observer will take a while. Also note that unless you are careful, events published this way can get out of order. A simple solution for this is to have a single-threaded executor for each listener.
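The wrapper idea could look roughly like the sketch below. It makes some assumptions: AsyncListenerSketch and shutdownAndWait are hypothetical names, and java.util.function.Consumer again stands in for an RxJava Observer so the code needs only the JDK. Each wrapper owns one single-threaded executor, which is what keeps that listener's events in publication order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical wrapper; Consumer<T> stands in for an Observer's onNext.
class AsyncListenerSketch<T> implements Consumer<T> {
    private final Consumer<T> delegate;
    // One single-threaded executor per listener: tasks run in FIFO order,
    // so this listener sees events in the order they were published.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    AsyncListenerSketch(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T value) {
        // Returns immediately; the delegate runs later on the executor thread.
        executor.execute(() -> delegate.accept(value));
    }

    // Stops accepting new events and waits for queued ones to finish.
    // Returns false if the timeout elapses or the wait is interrupted.
    boolean shutdownAndWait(long timeoutMillis) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Publishes 0..4 to a wrapped listener and returns what it received.
    static List<Integer> demo() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        AsyncListenerSketch<Integer> listener =
                new AsyncListenerSketch<>(v -> received.add(v));
        for (int i = 0; i < 5; i++) {
            listener.accept(i);
        }
        listener.shutdownAndWait(5000);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a shared multi-threaded pool instead of a per-listener executor, two events for the same listener could run concurrently and finish in either order, which is the reordering risk mentioned above.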



