
agera is a set of classes and interfaces to help wirte functional,asynchronous and reactive applications for Android.Requires Android SDK version 9 or higher.
Observable & Updatable
public interface Observable {
/** * Adds {@code updatable} to the {@code Observable}. * * @throws IllegalStateException if the {@link Updatable} was already added or if it was called * from a non-Looper thread */ void addUpdatable(@NonNull Updatable updatable);
/** * Removes {@code updatable} from the {@code Observable}. * * @throws IllegalStateException if the {@link Updatable} was not added */ void removeUpdatable(@NonNull Updatable updatable); }
/** * Called when when an event has occurred. Can be added to {@link Observable}s to be notified * of {@link Observable} events. */ public interface Updatable {
/** * Called when an event has occurred. */ void update(); }
<?xml version="1.0" encoding="utf-8"?> <LinearLayout xmlns:android="" xmlns:app="" xmlns:tools="" android:orientation="vertical" android:layout_width="match_parent" android:layout_height="match_parent" android:fitsSystemWindows="true" tools:context="">
<Button android:text="trigger" android:layout_width="match_parent" android:layout_height="wrap_content" android:onClick="trigger"/>
<TextView android:id="@+id/show" android:layout_width="match_parent" android:layout_height="match_parent" android:text="wait for trigger..." android:textSize="20sp" android:gravity="center"/>
public class MainActivity extends AppCompatActivity implements Updatable{
private TextView show;
private Observable observable = new Observable() { @Override public void addUpdatable(@NonNull Updatable updatable) { updatable.update(); }
@Override public void removeUpdatable(@NonNull Updatable updatable) {
} };
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main);
show = (TextView)findViewById(; }
public void trigger(View view){ observable.addUpdatable(this); }
@Override public void update() { show.setText("update!!"); } }
看到这你可能会说,”这不坑爹吗!连数据都传递不了,还谈什么观察者模式,谈什么响应式编程!“不要着急,这是Google故意而为之的,他们的想法就是要让数据从Observable和Updatable中剥离,从而达到他们所期望的“Push event,pull data model”。这个我在后面和RxJava的比较中会讲,RxJava是”Push data model”。
public interface Repository<T> extends Observable, Supplier<T> {}
public interface Supplier<T> {
/** * Returns an instance of the appropriate type. The returned object may or may not be a new * instance, depending on the implementation. */ @NonNull T get(); }
private Supplier<String> supplier = new Supplier() { @NonNull @Override public Object get() { return "update!!"; } };
private Repository<String> repository = Repositories.repositoryWithInitialValue("a") .observe() .onUpdatesPerLoop() .thenGetFrom(supplier) .compile();
public void trigger(View view){ repository.addUpdatable(this); }
@Override public void update() { show.setText(repository.get()); }
上面的那两个初始化代码大家可以先不用懂,具体看下面的,点击Button(进入trigger(View view)方法)之后,我们和刚才一样,使用了addUpdatable将我们继承自Updatable的activity注册到repository中,然后repository发现有东西注册到了自己这儿,经过一系列的方法执行,就会调用Updatable的update()方法,然后我们通过repository.get()去拿到对应的数据就OK了。
(1) Observable:agera中的被观察者,用于在合适的时机去通知观察者进行更新。
(2) Updatable:agera中的观察者,用于观察Observable。
(3) Supplier:agera中提供数据的接口,通过范型指定数据类型,通过get()方法获取数据。
(4) Repository:agera中集成了Observable和Supplier功能的一个[提供数据的被观察者]。
说到这里,大家可能会有一个问题,前面说了agera是”Push event,pull data model”,也就是数据和事件分离的,那这个Repository的出现不是自己打自己的脸吗?
This does not change the push event, pull data model: the repository notifies the registered updatables to update themselves when the data changes; and the updatables pull data from the repository when they individually react to this event.
通过代码来解释就是,Repository经过一系列的方法执行之后,调用了Updatable的update()方法,这个是事件传递,也就是push event,而Updatable在接收到唤醒事件之后,通过调用Repository的get()方法,自己去获取数据而不是从updata()方法中拿到传递过来的数据,类似update(T value),这是pull data。这样的好处是可以lazy load,这个我们在后文中会讲。
private Supplier<String> strSupplier = new Supplier<String>() { @NonNull @Override public String get() { return "value"; } };
private Function<String,String> transform = new Function<String, String>() { @NonNull @Override public String apply(@NonNull String input) { return "new " + input; } };
private Supplier<Integer> integerSupplier = new Supplier<Integer>() { @NonNull @Override public Integer get() { return 100; } };
private Merger<String,Integer,String> merger = new Merger<String, Integer, String>() { @NonNull @Override public String merge(@NonNull String s, @NonNull Integer integer) { return s + "plus " + String.valueOf(integer); } };
private Updatable updatable = new Updatable() { @Override public void update() { Log.d("TAG", repository.get()); } };
repository = Repositories.repositoryWithInitialValue("default") .observe() .onUpdatesPerLoop() .getFrom(strSupplier) .transform(transform) .thenMergeIn(integerSupplier,merger) .compile();

private Supplier<String> strSupplier = new Supplier<String>() { @NonNull @Override public String get() { return "value"; } };
private Function<String,String> transform = new Function<String, String>() { @NonNull @Override public String apply(@NonNull String input) { return "new " + input; } };
private Supplier<Integer> integerSupplier = new Supplier<Integer>() { @NonNull @Override public Integer get() { return 100; } };
private Merger<String,Integer,String> merger = new Merger<String, Integer, String>() { @NonNull @Override public String merge(@NonNull String s, @NonNull Integer integer) { return s + " plus " + String.valueOf(integer); } };
private Updatable updatable = new Updatable() { @Override public void update() { Log.d("TAG", repository.get()); } };
repository = Repositories.repositoryWithInitialValue("default") .observe() .onUpdatesPerLoop() .getFrom(strSupplier) .transform(transform) .thenMergeIn(integerSupplier,merger) .compile();
public interface RepositoryCompilerStates {
interface REventSource<TVal, TStart> {
@NonNull RFrequency<TVal, TStart> observe(@NonNull Observable... observables); }
interface RFrequency<TVal, TStart> extends REventSource<TVal, TStart> {
@NonNull RFlow<TVal, TStart, ?> onUpdatesPer(int millis);
@NonNull RFlow<TVal, TStart, ?> onUpdatesPerLoop(); }
interface RFlow<TVal, TPre, TSelf extends RFlow<TVal, TPre, TSelf>> extends RSyncFlow<TVal, TPre, TSelf> {
@NonNull @Override <TCur> RFlow<TVal, TCur, ?> getFrom(@NonNull Supplier<TCur> supplier);
@NonNull @Override <TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptGetFrom( @NonNull Supplier<Result<TCur>> attemptSupplier);
@NonNull @Override <TAdd, TCur> RFlow<TVal, TCur, ?> mergeIn(@NonNull Supplier<TAdd> supplier, @NonNull Merger<? super TPre, ? super TAdd, TCur> merger);
@NonNull @Override <TAdd, TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptMergeIn( @NonNull Supplier<TAdd> supplier, @NonNull Merger<? super TPre, ? super TAdd, Result<TCur>> attemptMerger);
@NonNull @Override <TCur> RFlow<TVal, TCur, ?> transform(@NonNull Function<? super TPre, TCur> function);
@NonNull @Override <TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptTransform( @NonNull Function<? super TPre, Result<TCur>> attemptFunction);
@NonNull TSelf goTo(@NonNull Executor executor);
@NonNull RSyncFlow<TVal, TPre, ?> goLazy(); }
interface RSyncFlow<TVal, TPre, TSelf extends RSyncFlow<TVal, TPre, TSelf>> {
@NonNull <TCur> RSyncFlow<TVal, TCur, ?> getFrom(@NonNull Supplier<TCur> supplier);
@NonNull <TCur> RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptGetFrom( @NonNull Supplier<Result<TCur>> attemptSupplier);
@NonNull <TAdd, TCur> RSyncFlow<TVal, TCur, ?> mergeIn(@NonNull Supplier<TAdd> supplier, @NonNull Merger<? super TPre, ? super TAdd, TCur> merger);
@NonNull <TAdd, TCur> RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptMergeIn( @NonNull Supplier<TAdd> supplier, @NonNull Merger<? super TPre, ? super TAdd, Result<TCur>> attemptMerger);
@NonNull <TCur> RSyncFlow<TVal, TCur, ?> transform(@NonNull Function<? super TPre, TCur> function);
@NonNull <TCur> RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptTransform( @NonNull Function<? super TPre, Result<TCur>> attemptFunction);
@NonNull RTermination<TVal, TPre, TSelf> check(@NonNull Predicate<? super TPre> predicate); @NonNull <TCase> RTermination<TVal, TCase, TSelf> check( @NonNull Function<? super TPre, TCase> caseFunction, @NonNull Predicate<? super TCase> casePredicate); @NonNull TSelf sendTo(@NonNull Receiver<? super TPre> receiver); @NonNull <TAdd> TSelf bindWith(@NonNull Supplier<TAdd> secondValueSupplier, @NonNull Binder<? super TPre, ? super TAdd> binder); @NonNull RConfig<TVal> thenSkip(); @NonNull RConfig<TVal> thenGetFrom(@NonNull Supplier<? extends TVal> supplier); @NonNull RTermination<TVal, Throwable, RConfig<TVal>> thenAttemptGetFrom( @NonNull Supplier<? extends Result<? extends TVal>> attemptSupplier); @NonNull <TAdd> RConfig<TVal> thenMergeIn(@NonNull Supplier<TAdd> supplier, @NonNull Merger<? super TPre, ? super TAdd, ? extends TVal> merger); @NonNull <TAdd> RTermination<TVal, Throwable, RConfig<TVal>> thenAttemptMergeIn( @NonNull Supplier<TAdd> supplier, @NonNull Merger<? super TPre, ? super TAdd, ? extends Result<? extends TVal>> attemptMerger); @NonNull RConfig<TVal> thenTransform( @NonNull Function<? super TPre, ? extends TVal> function); @NonNull RTermination<TVal, Throwable, RConfig<TVal>> thenAttemptTransform( @NonNull Function<? super TPre, ? extends Result<? extends TVal>> attemptFunction); } interface RTermination<TVal, TTerm, TRet> { @NonNull TRet orSkip(); @NonNull TRet orEnd(@NonNull Function<? super TTerm, ? extends TVal> valueFunction); } interface RConfig<TVal> { @NonNull RConfig<TVal> notifyIf(@NonNull Merger<? super TVal, ? super TVal, Boolean> checker); @NonNull RConfig<TVal> onDeactivation(@RepositoryConfig int deactivationConfig); @NonNull RConfig<TVal> onConcurrentUpdate(@RepositoryConfig int concurrentUpdateConfig); @NonNull Repository<TVal> compile(); @NonNull <TVal2> RFrequency<TVal2, TVal> compileIntoRepositoryWithInitialValue(@NonNull TVal2 value); }
@NonNull public static <T> REventSource<T, T> repositoryWithInitialValue(@NonNull final T initialValue) { return RepositoryCompiler.repositoryWithInitialValue(initialValue); }
final class RepositoryCompiler implements RepositoryCompilerStates.RFrequency, RepositoryCompilerStates.RFlow, RepositoryCompilerStates.RTermination, RepositoryCompilerStates.RConfig {
....... }
@NonNull @Override public Repository compile() { Repository repository = compileRepositoryAndReset(); recycle(this); return repository; }
@NonNull private Repository compileRepositoryAndReset() { checkExpect(CONFIG); Repository repository = CompiledRepository.compiledRepository(initialValue, eventSources, frequency, directives, notifyChecker, concurrentUpdateConfig, deactivationConfig); expect = NOTHING; initialValue = null; eventSources.clear(); frequency = 0; directives.clear(); goLazyUsed = false; notifyChecker = objectsUnequal(); deactivationConfig = RepositoryConfig.CONTINUE_FLOW; concurrentUpdateConfig = RepositoryConfig.CONTINUE_FLOW; return repository; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
@NonNull static Repository compiledRepository( @NonNull final Object initialValue, @NonNull final List<Observable> eventSources, final int frequency, @NonNull final List<Object> directives, @NonNull final Merger<Object, Object, Boolean> notifyChecker, @RepositoryConfig final int concurrentUpdateConfig, @RepositoryConfig final int deactivationConfig) { Observable eventSource = perMillisecondObservable(frequency, compositeObservable(eventSources.toArray(new Observable[eventSources.size()]))); Object[] directiveArray = directives.toArray(); return new CompiledRepository(initialValue, eventSource, directiveArray, notifyChecker, deactivationConfig, concurrentUpdateConfig); }
好了,到这里关于Repository的东西就讲完了,大家可以尝试着自己去写一下,这些个数据处理的方法能让我们像RxJava一样轻松的处理数据。当然,agera也提供了异步操作的封装,like this:
private Executor executor = Executors.newSingleThreadExecutor();
repository = Repositories.repositoryWithInitialValue("default") .observe() .onUpdatesPerLoop() .goTo(executor) .thenGetFrom(new Supplier<Object>() { @NonNull @Override public Object get() { //some biz work,may be block the main thread. return null; } }) .compile();
Attempt & Result
private Supplier<Integer> strSupplier = new Supplier<Integer>() { @NonNull @Override public Integer get() { return 1/0; } };
当然这种代码在实际情况下是不会产生的,但是总会有错误发生啊,对于RxJava,它有很好的error handling机制,那agera有吗?答案是有的。就是通过操作符attemptXXX()和Result类来解决。
repository = Repositories.repositoryWithInitialValue(0) .observe() .onUpdatesPerLoop() .thenGetFrom(strSupplier) .compile();
如果使用我们刚才的方式去做,strSupplier的get()方法中return 1/0,这样就爆炸了。。程序直接退出,你一天美好的心情就此终结。但是如果这样
private Supplier<Result<Integer>> safeStrSupplier = new Supplier<Result<Integer>>() { @NonNull @Override public Result<Integer> get() { try{ return Result.success(1/ 0); }catch (ArithmeticException e){ return Result.failure(e); } } };
safeRepository = Repositories.repositoryWithInitialValue(Result.<Integer>absent()) .observe() .onUpdatesPerLoop() .attemptGetFrom(safeStrSupplier).orEnd(new Function<Throwable, Result<Integer>>() { @NonNull @Override public Result<Integer> apply(@NonNull Throwable input) { return Result.success(2222); } }) .thenTransform(new Function<Integer, Result<Integer>>() { @NonNull @Override public Result<Integer> apply(@NonNull Integer input) { return Result.absentIfNull(input); } }) .compile();
所以这里,如果你写了1/0这样的代码并且引发了异常,我们可以安全的捕获它并且做你想要做的操作。另外大家可以看thenTransform()中,我们return Result.absentIfNull(input);表示如果数据是空的,我们就返回缺省值。
private Receiver<Throwable> errorReceiver = new Receiver<Throwable>() { @Override public void accept(@NonNull Throwable value) { trigger.setText(value.toString()); } };
private Receiver<Integer> successReceiver = new Receiver<Integer>() { @Override public void accept(@NonNull Integer value) { trigger.setText(String.valueOf(value)); } };
@Override public void update() { safeRepository.get() .ifFailedSendTo(errorReceiver) .ifSucceededSendTo(successReceiver); }
/** * A receiver of objects. */ public interface Receiver<T> {
/** * Accepts the given {@code value}. */ void accept(@NonNull T value); }
public interface Reservoir<T> extends Receiver<T>, Repository<Result<T>> {}
private Reservoir<String> provider = Reservoirs.reservoir();
@NonNull public static <T> Reservoir<T> reservoir(@NonNull final Queue<T> queue) { return new SynchronizedReservoir<>(checkNotNull(queue)); }
private static final class SynchronizedReservoir<T> extends BaseObservable implements Reservoir<T> { @NonNull private final Queue<T> queue;
private SynchronizedReservoir(@NonNull final Queue<T> queue) { this.queue = checkNotNull(queue); }
@Override public void accept(@NonNull T value) { boolean shouldDispatchUpdate; synchronized (queue) { boolean wasEmpty = queue.isEmpty(); boolean added = queue.offer(value); shouldDispatchUpdate = wasEmpty && added; } if (shouldDispatchUpdate) { dispatchUpdate(); } }
@NonNull @Override public Result<T> get() { T nullableValue; boolean shouldDispatchUpdate; synchronized (queue) { nullableValue = queue.poll(); shouldDispatchUpdate = !queue.isEmpty(); } if (shouldDispatchUpdate) { dispatchUpdate(); } return absentIfNull(nullableValue); }
@Override protected void observableActivated() { synchronized (queue) { if (queue.isEmpty()) { return; } } dispatchUpdate(); } }
private Supplier<String> supplier = new Supplier<String>() { @NonNull @Override public String get() { return "url"; } };
private Function<String,List<Integer>> strToList = new Function<String, List<Integer>>() { @NonNull @Override public List<Integer> apply(@NonNull String input) { List<Integer> data = new ArrayList<>(); for(int i = 0;i < 10;i++){ data.add(i); } return data; } };
private Predicate<Integer> filter = new Predicate<Integer>() { @Override public boolean apply(@NonNull Integer value) { return value > 5; } };
private Function<Integer,String> intToStr = new Function<Integer, String>() { @NonNull @Override public String apply(@NonNull Integer input) { return String.valueOf(input); } };
private Function<List<String>, Integer> getSize = new Function<List<String>, Integer>() { @NonNull @Override public Integer apply(@NonNull List<String> input) { return input.size(); } };
Function<String,Integer> finalFunc = Functions.functionFrom(String.class) .unpack(strToList) .filter(filter) .map(intToStr) .thenApply(getSize);
private Repository<String> repository;
repository = Repositories.repositoryWithInitialValue("default") .observe() .onUpdatesPerLoop() .getFrom(supplier) .transform(finalFunc) .thenTransform(new Function<Integer, String>() { @NonNull @Override public String apply(@NonNull Integer input) { return String.valueOf(input); } }) .compile();
Function<String,Integer> finalFunc = Functions.functionFrom(String.class) .unpack(strToList) .filter(filter) .map(intToStr) .thenApply(getSize);
我们这里只分析和Repository相关的源码,一来Repository在agera现有代码中占的比重最大,而来其他的代码还是比较简单的,大家可以自行read the fucking source code。
@NonNull static <TVal> RepositoryCompilerStates.REventSource<TVal, TVal> repositoryWithInitialValue( @NonNull final TVal initialValue) { checkNotNull(Looper.myLooper()); RepositoryCompiler compiler = compilers.get(); if (compiler == null) { compiler = new RepositoryCompiler(); } else { // Remove compiler from the ThreadLocal to prevent reuse in the middle of a compilation. // recycle(), called by compile(), will return the compiler here. ThreadLocal.set(null) keeps // the entry (with a null value) whereas remove() removes the entry; because we expect the // return of the compiler, don't use the heavier remove(). compilers.set(null); } return compiler.start(initialValue); }
@NonNull private RepositoryCompiler start(@NonNull final Object initialValue) { checkExpect(NOTHING); expect = FIRST_EVENT_SOURCE; this.initialValue = initialValue; return this; }
@NonNull @Override public RepositoryCompiler observe(@NonNull final Observable... observables) { checkExpect(FIRST_EVENT_SOURCE, FREQUENCY_OR_MORE_EVENT_SOURCE); for (Observable observable : observables) { eventSources.add(checkNotNull(observable)); } expect = FREQUENCY_OR_MORE_EVENT_SOURCE; return this; }
@NonNull @Override public RepositoryCompiler onUpdatesPer(int millis) { checkExpect(FREQUENCY_OR_MORE_EVENT_SOURCE); frequency = Math.max(0, millis); expect = FLOW; return this; }
@NonNull @Override public RepositoryCompiler onUpdatesPerLoop() { return onUpdatesPer(0); }
@NonNull @Override public RepositoryCompiler getFrom(@NonNull final Supplier supplier) { checkExpect(FLOW); addGetFrom(supplier, directives); return this; }
static void addGetFrom(@NonNull final Supplier supplier, @NonNull final List<Object> directives) { directives.add(GET_FROM); directives.add(supplier); }
private static final int END = 0; private static final int GET_FROM = 1; private static final int MERGE_IN = 2; private static final int TRANSFORM = 3; private static final int CHECK = 4; private static final int GO_TO = 5; private static final int GO_LAZY = 6; private static final int SEND_TO = 7; private static final int BIND = 8; private static final int FILTER_SUCCESS = 9;
@NonNull @Override public RepositoryCompiler thenTransform(@NonNull final Function function) { transform(function); endFlow(false); return this; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
@NonNull @Override public RepositoryCompiler transform(@NonNull final Function function) { checkExpect(FLOW); addTransform(function, directives); return this; }
private void endFlow(boolean skip) { addEnd(skip, directives); expect = CONFIG; }
static void addEnd(boolean skip, @NonNull final List<Object> directives) { directives.add(END); directives.add(skip); }
final class CompiledRepository extends BaseObservable implements Repository, Updatable, Runnable {
..... }
/** * A partial implementation of {@link Observable} that adheres to the threading contract between * {@link Observable}s and {@link Updatable}s. Subclasses can use {@link #observableActivated()} and * {@link #observableDeactivated()} to control the activation and deactivation of this observable, * and to send out notifications to client updatables with {@link #dispatchUpdate()}. * * <p>For cases where subclassing {@link BaseObservable} is impossible, for example when the * potential class already has a base class, consider using {@link Observables#updateDispatcher()} * to help implement the {@link Observable} interface. */ public abstract class BaseObservable implements Observable { @NonNull private final Worker worker;
protected BaseObservable() { checkState(Looper.myLooper() != null, "Can only be created on a Looper thread"); worker = new Worker(this); }
@Override public final void addUpdatable(@NonNull final Updatable updatable) { checkState(Looper.myLooper() != null, "Can only be added on a Looper thread"); worker.addUpdatable(updatable); }
@Override public final void removeUpdatable(@NonNull final Updatable updatable) { checkState(Looper.myLooper() != null, "Can only be removed on a Looper thread"); worker.removeUpdatable(updatable); }
/** * Notifies all registered {@link Updatable}s. */ protected final void dispatchUpdate() { worker.dispatchUpdate(); }
/** * Called from the worker looper thread when this {@link Observable} is activated by transitioning * from having no client {@link Updatable}s to having at least one client {@link Updatable}. */ protected void observableActivated() {}
/** * Called from the worker looper thread when this {@link Observable} is deactivated by * transitioning from having at least one client {@link Updatable} to having no client * {@link Updatable}s. */ protected void observableDeactivated() {}
public Updatable getUpdatable(){ return worker.getUpdatable(); } /** * Worker and synchronization lock behind a {@link BaseObservable}. */ static final class Worker { @NonNull private static final Object[] NO_UPDATABLES_OR_HANDLERS = new Object[0];
@NonNull private final BaseObservable baseObservable; @NonNull private final WorkerHandler handler;
@NonNull private Object[] updatablesAndHandlers; private int size;
Worker(@NonNull final BaseObservable baseObservable) { this.baseObservable = baseObservable; this.handler = workerHandler(); this.updatablesAndHandlers = NO_UPDATABLES_OR_HANDLERS; this.size = 0; }
public Updatable getUpdatable(){ return (Updatable)updatablesAndHandlers[0]; } synchronized void addUpdatable(@NonNull final Updatable updatable) { add(updatable, workerHandler()); if (size == 1) { handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget(); } } synchronized void removeUpdatable(@NonNull final Updatable updatable) { remove(updatable); if (size == 0) { handler.obtainMessage(MSG_LAST_REMOVED, this).sendToTarget(); } } void dispatchUpdate() { handler.obtainMessage(MSG_UPDATE, this).sendToTarget(); } private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) { boolean added = false; for (int index = 0; index < updatablesAndHandlers.length; index += 2) { if (updatablesAndHandlers[index] == updatable) { throw new IllegalStateException("Updatable already added, cannot add."); } if (updatablesAndHandlers[index] == null && !added) { updatablesAndHandlers[index] = updatable; updatablesAndHandlers[index + 1] = handler; added = true; } } if (!added) { final int newIndex = updatablesAndHandlers.length; updatablesAndHandlers = Arrays.copyOf(updatablesAndHandlers, Math.max(newIndex * 2, newIndex + 2)); updatablesAndHandlers[newIndex] = updatable; updatablesAndHandlers[newIndex + 1] = handler; } size++; } private void remove(@NonNull final Updatable updatable) { for (int index = 0; index < updatablesAndHandlers.length; index += 2) { if (updatablesAndHandlers[index] == updatable) { ((WorkerHandler) updatablesAndHandlers[index + 1]).removeMessages( WorkerHandler.MSG_CALL_UPDATABLE, updatable); updatablesAndHandlers[index] = null; updatablesAndHandlers[index + 1] = null; size--; return; } } throw new IllegalStateException("Updatable not added, cannot remove."); } synchronized void sendUpdate() { for (int index = 0; index < updatablesAndHandlers.length; index = index + 2) { final Updatable updatable = (Updatable) updatablesAndHandlers[index]; final WorkerHandler handler = (WorkerHandler) updatablesAndHandlers[index + 1]; if (updatable != null) { if (handler.getLooper() == Looper.myLooper()) { updatable.update(); } else { handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget(); } } } } void callFirstUpdatableAdded() { baseObservable.observableActivated(); } void callLastUpdatableRemoved() { baseObservable.observableDeactivated(); } }}
@Override public final void addUpdatable(@NonNull final Updatable updatable) { checkState(Looper.myLooper() != null, "Can only be added on a Looper thread"); worker.addUpdatable(updatable); }
首先会去判断当前线程的Looper是否为空,这是因为agera中的Push event都是基础Android的handler机制的,对于handler机制不了解的同学可以去看我的这篇博客。
synchronized void addUpdatable(@NonNull final Updatable updatable) { add(updatable, workerHandler()); if (size == 1) { handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget(); } }
private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) { boolean added = false; for (int index = 0; index < updatablesAndHandlers.length; index += 2) { if (updatablesAndHandlers[index] == updatable) { throw new IllegalStateException("Updatable already added, cannot add."); } if (updatablesAndHandlers[index] == null && !added) { updatablesAndHandlers[index] = updatable; updatablesAndHandlers[index + 1] = handler; added = true; } } if (!added) { final int newIndex = updatablesAndHandlers.length; updatablesAndHandlers = Arrays.copyOf(updatablesAndHandlers, Math.max(newIndex * 2, newIndex + 2)); updatablesAndHandlers[newIndex] = updatable; updatablesAndHandlers[newIndex + 1] = handler; } size++; }
private static final ThreadLocal<WeakReference<WorkerHandler>> handlers = new ThreadLocal<>();
@NonNull static WorkerHandler workerHandler() { final WeakReference<WorkerHandler> handlerReference = handlers.get(); WorkerHandler handler = handlerReference != null ? handlerReference.get() : null; if (handler == null) { handler = new WorkerHandler(); handlers.set(new WeakReference<>(handler)); } return handler; }
synchronized void addUpdatable(@NonNull final Updatable updatable) { add(updatable, workerHandler()); if (size == 1) { handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget(); } }
static final class WorkerHandler extends Handler { static final int MSG_FIRST_ADDED = 0; static final int MSG_LAST_REMOVED = 1; static final int MSG_UPDATE = 2; static final int MSG_CALL_UPDATABLE = 3; static final int MSG_CALL_MAYBE_START_FLOW = 4; static final int MSG_CALL_ACKNOWLEDGE_CANCEL = 5; static final int MSG_CALL_LOW_PASS_UPDATE = 6;
@Override public void handleMessage(final Message message) { switch (message.what) { case MSG_UPDATE: ((Worker) message.obj).sendUpdate(); break; case MSG_FIRST_ADDED: ((Worker) message.obj).callFirstUpdatableAdded(); break; case MSG_LAST_REMOVED: ((Worker) message.obj).callLastUpdatableRemoved(); break; case MSG_CALL_UPDATABLE: ((Updatable) message.obj).update(); break; case MSG_CALL_MAYBE_START_FLOW: ((CompiledRepository) message.obj).maybeStartFlow(); break; case MSG_CALL_ACKNOWLEDGE_CANCEL: ((CompiledRepository) message.obj).acknowledgeCancel(); break; case MSG_CALL_LOW_PASS_UPDATE: ((LowPassFilterObservable) message.obj).lowPassUpdate(); break; default: } } }
void callFirstUpdatableAdded() { baseObservable.observableActivated(); }
protected void observableActivated() {}
@Override protected void observableActivated() { eventSource.addUpdatable(this); maybeStartFlow(); }
1 2 3 4 5 6 7 8 9 10 11 12 13
void maybeStartFlow() { synchronized (this) { if (runState == IDLE || runState == PAUSED_AT_GO_LAZY) { runState = RUNNING; lastDirectiveIndex = -1; // this could be pointing at the goLazy directive restartNeeded = false; } else { return; // flow already running, do not continue. } } intermediateValue = currentValue; runFlowFrom(0, false); }
private void runFlowFrom(final int index, final boolean asynchronously) {
final Object[] directives = this.directives; final int length = directives.length; int i = index; while (0 <= i && i < length) { int directiveType = (Integer) directives[i]; if (asynchronously || directiveType == GO_TO || directiveType == GO_LAZY) { // Check cancellation before running the next directive. This needs to be done while locked. // For goTo and goLazy, because they need to change the states and suspend the flow, they // need the lock and are therefore treated specially here. synchronized (this) { if (checkCancellationLocked()) { break; } if (directiveType == GO_TO) { setPausedAtGoToLocked(i); // the actual executor delivery is done below, outside the lock, to eliminate any // deadlock possibility. } else if (directiveType == GO_LAZY) { setLazyAndEndFlowLocked(i); return; } } }
// A table-switch on a handful of options is a good compromise in code size and runtime // performance comparing to a full-fledged double-dispatch pattern with subclasses. switch (directiveType) { case GET_FROM: i = runGetFrom(directives, i); break; case MERGE_IN: i = runMergeIn(directives, i); break; case TRANSFORM: i = runTransform(directives, i); break; case CHECK: i = runCheck(directives, i); break; case GO_TO: i = runGoTo(directives, i); break; case SEND_TO: i = runSendTo(directives, i); break; case BIND: i = runBindWith(directives, i); break; case FILTER_SUCCESS: i = runFilterSuccess(directives, i); break; case END: i = runEnd(directives, i); break; // Missing GO_LAZY but it has already been dealt with in the synchronized block above. } } }
int directiveType = (Integer) directives[i];
其中switch的是这个,还记得directives这个list吗?存的是我们刚才的那些数据处理流的方式。我们刚才调用了getFrom()和thenTransform(),对应的case是GET_FROM, TRANSFORM和END,调用了runGetFrom(),runTransform()和runEnd()。
private int runGetFrom(@NonNull final Object[] directives, final int index) { Supplier supplier = (Supplier) directives[index + 1]; intermediateValue = checkNotNull(supplier.get()); return index + 2; }
private int runTransform(@NonNull final Object[] directives, final int index) { Function function = (Function) directives[index + 1]; intermediateValue = checkNotNull(function.apply(intermediateValue)); return index + 2; }
private int runEnd(@NonNull final Object[] directives, final int index) { boolean skip = (Boolean) directives[index + 1]; if (skip) { skipAndEndFlow(); } else { setNewValueAndEndFlow(intermediateValue); } return -1; }
repository = Repositories.repositoryWithInitialValue("default") .observe() .onUpdatesPerLoop() .getFrom(supplier) .thenTransform(function) .compile();
private int runEnd(@NonNull final Object[] directives, final int index) { boolean skip = (Boolean) directives[index + 1]; if (skip) { skipAndEndFlow(); } else { setNewValueAndEndFlow(intermediateValue); } return -1; }
private synchronized void setNewValueAndEndFlow(@NonNull final Object newValue) { boolean wasRunningLazily = runState == RUNNING_LAZILY; runState = IDLE; intermediateValue = initialValue; // GC the intermediate value but field must be kept non-null. if (wasRunningLazily) { currentValue = newValue; // Don't notify if this new value is produced lazily } else { setNewValueLocked(newValue); // May notify otherwise } checkRestartLocked(); }
private void setNewValueLocked(@NonNull final Object newValue) { boolean shouldNotify = notifyChecker.merge(currentValue, newValue); currentValue = newValue; if (shouldNotify) { dispatchUpdate(); } }
这里notifyChecker.merge(currentValue, newValue);这个操作就是去判断newValue和currentValue是否一致,newValue是我们经过一系列runXXX()得到的,而currentValue是CompiledRepository缓存的上一次的值,如果是第一次就直接是缺升值。如果两个值不同,则调用dispatchUpdate();
protected final void dispatchUpdate() { worker.dispatchUpdate(); }
void dispatchUpdate() { handler.obtainMessage(MSG_UPDATE, this).sendToTarget(); }
case MSG_UPDATE: ((Worker) message.obj).sendUpdate(); break;
synchronized void sendUpdate() { for (int index = 0; index < updatablesAndHandlers.length; index = index + 2) { final Updatable updatable = (Updatable) updatablesAndHandlers[index]; final WorkerHandler handler = (WorkerHandler) updatablesAndHandlers[index + 1]; if (updatable != null) { if (handler.getLooper() == Looper.myLooper()) { updatable.update(); } else { handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget(); } } } }
static final int MSG_FIRST_ADDED = 0; static final int MSG_LAST_REMOVED = 1; static final int MSG_UPDATE = 2; static final int MSG_CALL_UPDATABLE = 3; static final int MSG_CALL_MAYBE_START_FLOW = 4; static final int MSG_CALL_ACKNOWLEDGE_CANCEL = 5; static final int MSG_CALL_LOW_PASS_UPDATE = 6;
Worker(@NonNull final BaseObservable baseObservable) { this.baseObservable = baseObservable; this.handler = workerHandler(); this.updatablesAndHandlers = NO_UPDATABLES_OR_HANDLERS; this.size = 0; }
synchronized void addUpdatable(@NonNull final Updatable updatable) { add(updatable, workerHandler()); if (size == 1) { handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget(); } }
private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) { boolean added = false; for (int index = 0; index < updatablesAndHandlers.length; index += 2) { if (updatablesAndHandlers[index] == updatable) { throw new IllegalStateException("Updatable already added, cannot add."); } if (updatablesAndHandlers[index] == null && !added) { updatablesAndHandlers[index] = updatable; updatablesAndHandlers[index + 1] = handler; added = true; } } ........ }
if (updatable != null) { if (handler.getLooper() == Looper.myLooper()) { updatable.update(); } else { handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget(); } }
case MSG_CALL_UPDATABLE: ((Updatable) message.obj).update(); break;
虽然两者同样都会调用Updatable的update(),但是意义是不同的。首先我们来理解if (handler.getLooper() == Looper.myLooper())这个if判断。它的意思其实就是判断Updatable和Repository是否在同一个线程中。我们现在当然是,但是如果我们把代码改成这样:
new Thread(new Runnable() { @Override public void run() { repository.addUpdatable(this); } }).start();
synchronized void addUpdatable(@NonNull final Updatable updatable) { add(updatable, workerHandler()); if (size == 1) { handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget(); } }
@NonNull static WorkerHandler workerHandler() { final WeakReference<WorkerHandler> handlerReference = handlers.get(); WorkerHandler handler = handlerReference != null ? handlerReference.get() : null; if (handler == null) { handler = new WorkerHandler(); handlers.set(new WeakReference<>(handler)); } return handler; }
Like this:
repository = Repositories.repositoryWithInitialValue("default") .observe() .onUpdatesPerLoop() .goLazy() .thenGetFrom(supplier) .compile();
@NonNull @Override public RepositoryCompiler goLazy() { checkExpect(FLOW); checkGoLazyUnused(); addGoLazy(directives); goLazyUsed = true; return this; }
1 2 3
static void addGoLazy(@NonNull final List<Object> directives) { directives.add(GO_LAZY); }
private void runFlowFrom(final int index, final boolean asynchronously) {
final Object[] directives = this.directives; final int length = directives.length; int i = index; while (0 <= i && i < length) { int directiveType = (Integer) directives[i]; if (asynchronously || directiveType == GO_TO || directiveType == GO_LAZY) { // Check cancellation before running the next directive. This needs to be done while locked. // For goTo and goLazy, because they need to change the states and suspend the flow, they // need the lock and are therefore treated specially here. synchronized (this) { if (checkCancellationLocked()) { break; } if (directiveType == GO_TO) { setPausedAtGoToLocked(i); // the actual executor delivery is done below, outside the lock, to eliminate any // deadlock possibility. } else if (directiveType == GO_LAZY) { setLazyAndEndFlowLocked(i); return; } } }
// A table-switch on a handful of options is a good compromise in code size and runtime // performance comparing to a full-fledged double-dispatch pattern with subclasses. switch (directiveType) { case GET_FROM: i = runGetFrom(directives, i); break; case MERGE_IN: i = runMergeIn(directives, i); break; case TRANSFORM: i = runTransform(directives, i); break; case CHECK: i = runCheck(directives, i); break; case GO_TO: i = runGoTo(directives, i); break; case SEND_TO: i = runSendTo(directives, i); break; case BIND: i = runBindWith(directives, i); break; case FILTER_SUCCESS: i = runFilterSuccess(directives, i); break; case END: i = runEnd(directives, i); break; // Missing GO_LAZY but it has already been dealt with in the synchronized block above. } } }
else if (directiveType == GO_LAZY) { setLazyAndEndFlowLocked(i); return; }

go lazy就lazy成这样??不要急,我们看下去。
rivate void setLazyAndEndFlowLocked(final int resumeIndex) { lastDirectiveIndex = resumeIndex; runState = PAUSED_AT_GO_LAZY; dispatchUpdate(); checkRestartLocked(); }
首先缓存这个index,lastDirectiveIndex = resumeIndex;然后dispatchUpdate();但是这并没有什么用,因为我们的数据流根本没有执行。。这样就完了,别说解决问题了,这不是创造问题了吗?!连数据都获取不到了!
@NonNull @Override public synchronized Object get() { if (runState == PAUSED_AT_GO_LAZY) { int index = lastDirectiveIndex; runState = RUNNING_LAZILY; runFlowFrom(continueFromGoLazy(directives, index), false); } return currentValue; }
private int runEnd(@NonNull final Object[] directives, final int index) { boolean skip = (Boolean) directives[index + 1]; if (skip) { skipAndEndFlow(); } else { setNewValueAndEndFlow(intermediateValue); } return -1; }
private synchronized void setNewValueAndEndFlow(@NonNull final Object newValue) { boolean wasRunningLazily = runState == RUNNING_LAZILY; runState = IDLE; intermediateValue = initialValue; // GC the intermediate value but field must be kept non-null. if (wasRunningLazily) { currentValue = newValue; // Don't notify if this new value is produced lazily } else { setNewValueLocked(newValue); // May notify otherwise } checkRestartLocked(); }
到这儿相信大家都明白了,但是其实goLazy这个方法的作用主要是字面意思,就是“懒加载”,如果我们不是用goLazy,当我们调用addUpdatable()方法的时候就会去做数据流的操作,而如果我们使用了goLazy(),所有的数据流操作会延迟到get()中去操作。这是agera”Push event,pull data model”的特点。
public class MainActivity extends AppCompatActivity implements Updatable{
private Button observableBtn; private TextView show;
private ClickObservable clickObservable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main);
observableBtn = (Button)findViewById(; show = (TextView)findViewById(;
clickObservable = new ClickObservable(); clickObservable.addUpdatable(this); observableBtn.setOnClickListener(clickObservable); }
@Override public void update() { show.setText("update!!"); }
public static class ClickObservable extends BaseObservable implements View.OnClickListener{
@Override public void onClick(View v) { dispatchUpdate(); } } }
public static final class BroadcastObservable extends BroadcastReceiver implements ActivationHandler, Observable { @NonNull private final UpdateDispatcher updateDispatcher; @NonNull private final Context context; @NonNull private final IntentFilter filter;
BroadcastObservable(@NonNull final Context applicationContext, @NonNull final String... actions) { this.context = checkNotNull(applicationContext); this.updateDispatcher = Observables.updateDispatcher(this); this.filter = new IntentFilter(); for (final String action : actions) { this.filter.addAction(action); } }
@Override public void observableActivated(@NonNull final UpdateDispatcher caller) { context.registerReceiver(this, filter); }
@Override public void observableDeactivated(@NonNull final UpdateDispatcher caller) { context.unregisterReceiver(this); }
@Override public void onReceive(final Context context, final Intent intent) { updateDispatcher.update(); }
@Override public void addUpdatable(@NonNull final Updatable updatable) { updateDispatcher.addUpdatable(updatable); }
@Override public void removeUpdatable(@NonNull final Updatable updatable) { updateDispatcher.removeUpdatable(updatable); } }
public interface ActivationHandler {
/** * Called when the the {@code caller} changes state from having no {@link Updatable}s to * having at least one {@link Updatable}. */ void observableActivated(@NonNull UpdateDispatcher caller);
/** * Called when the the {@code caller} changes state from having {@link Updatable}s to * no longer having {@link Updatable}s. */ void observableDeactivated(@NonNull UpdateDispatcher caller); }
this.updateDispatcher = Observables.updateDispatcher(this);
@NonNull public static UpdateDispatcher updateDispatcher( @NonNull final ActivationHandler activationHandler) { return new AsyncUpdateDispatcher(activationHandler); }
private static final class AsyncUpdateDispatcher extends BaseObservable implements UpdateDispatcher {
@Nullable private final ActivationHandler activationHandler;
private AsyncUpdateDispatcher(@Nullable ActivationHandler activationHandler) { this.activationHandler = activationHandler; }
@Override protected void observableActivated() { if (activationHandler != null) { activationHandler.observableActivated(this); } }
@Override protected void observableDeactivated() { if (activationHandler != null) { activationHandler.observableDeactivated(this); } }
@Override public void update() { dispatchUpdate(); } }
public interface UpdateDispatcher extends Observable, Updatable {}
@Override public void onReceive(final Context context, final Intent intent) { updateDispatcher.update(); }
@Override public void update() { dispatchUpdate(); }
public class MainActivity extends AppCompatActivity implements Updatable{
private static final String ACTION = "action";
private TextView trigger;
private ContentObservables.BroadcastObservable observable; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main);
trigger = (TextView)findViewById(;
observable = (ContentObservables.BroadcastObservable) ContentObservables.broadcastObservable(this,ACTION); observable.addUpdatable(this); }
public void send(View view){ Intent intent = new Intent(); intent.setAction(ACTION); sendBroadcast(intent); }
@Override public void update() { trigger.setText("update!!"); }
@Override protected void onDestroy() { super.onDestroy();
observable.removeUpdatable(this); } }
@Override public void addUpdatable(@NonNull final Updatable updatable) { updateDispatcher.addUpdatable(updatable); }
@Override protected void observableActivated() { if (activationHandler != null) { activationHandler.observableActivated(this); } }
@Override public void observableActivated(@NonNull final UpdateDispatcher caller) { context.registerReceiver(this, filter); }
@Override public void onReceive(final Context context, final Intent intent) { updateDispatcher.update(); }
这里的核心思想是,既然你的这个类已经继承自了一个基类A,那么它[就是]基类A了,不可能在[是]Observable了,那怎么办呢?我们通过实现ActivationHandler, Observable这两个接口让它[拥有]对应的功能,并且通过[组合]的方式将UpdateDispatcher放置到其中,万事大吉。如果大家想要封装类似的功能,不妨按这样的思路和方式试一试。

第二点,也就是最重要,我反复说的一点,agera是”Push event,pull data model”,而RxJava是”Push data model”的,由于agera将event和data分离,所以我们可以看到,存在所谓的goLazy,知道get()方法执行的时候才去处理数据。