RxJava 观察者模式简单使用

时间:2022-09-30 17:46:08
package com.lixinyang.mytest;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import com.lixinyang.mytest.aa.News;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;


/**
 * Demo activity showing several ways to build an RxJava 2 {@link Observable}
 * and subscribe to it. Every callback is written to logcat under {@link #TAG}.
 *
 * <p>No scheduler is specified on any of the chains below, so each demo is
 * subscribed from {@link #onCreate(Bundle)} without any thread switching.
 */
public class MainActivity extends AppCompatActivity {
    private static final String TAG = "MainActivity";

    /**
     * Inflates the layout and then runs each demo in turn.
     *
     * @param savedInstanceState state passed through to the superclass
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        initView();
        initView1();
        initView2();
        initView3();
        initView4();
    }

    /**
     * Demo: {@code Observable.fromArray} over a string array, consumed with
     * an onNext-only {@link Consumer}.
     */
    private void initView4() {
        String[] letters = {"a", "aa", "aaa", "aaaa"};
        Observable.fromArray(letters).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "accept: " + s);
            }
        });
    }

    /**
     * Demo: {@code Observable.just} with four fixed values, consumed with an
     * onNext-only {@link Consumer}.
     */
    private void initView3() {
        Observable.just("1", "2", "3", "4")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "accept: " + s);
                    }
                });
    }

    /**
     * Demo: {@code Observable.create} emitting three integers, consumed with
     * an onNext-only {@link Consumer}.
     */
    private void initView2() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(111);
                e.onNext(222);
                e.onNext(333);
                // Signal the end of the sequence so the stream terminates
                // instead of staying open after the last value.
                e.onComplete();
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: " + integer);
            }
        });
    }

    /**
     * Demo: {@code Observable.create} emitting three {@link News} items to a
     * full {@link Observer}, logging each item's {@code title} and
     * {@code msg} fields.
     */
    private void initView1() {
        Observable.create(new ObservableOnSubscribe<News>() {
            @Override
            public void subscribe(ObservableEmitter<News> e) throws Exception {
                // NOTE(review): constructor arguments are presumably
                // (title, msg) -- confirm against the News class.
                e.onNext(new News("aa", "asd"));
                e.onNext(new News("bb", "123"));
                e.onNext(new News("cc", "haha"));
                // Without this terminal event the observer's onComplete
                // callback below is never invoked.
                e.onComplete();
            }
        }).subscribe(new Observer<News>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe: " + d);
            }

            @Override
            public void onNext(News value) {
                Log.e(TAG, "onNext: " + value.title + "======" + value.msg);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e);
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete: 请求成功");
            }
        });
    }

    /**
     * Demo: the basic three-step pattern -- create the observable, create the
     * observer, then connect the two with {@code subscribe}.
     */
    private void initView() {
        // 1. Create the observable (the source being observed) with the
        //    "create" operator.
        Observable<String> ob = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
                e.onNext("b");
                e.onNext("c");
                e.onNext("d");
                // Per the Observable contract, calling e.onError(...) at this
                // point would be a terminal event: the observer's onError
                // would fire and the emissions below would not be delivered.
                e.onNext("e");
                e.onNext("f");
                e.onNext("g");
                e.onComplete();
            }
        });

        // 2. Create the observer that receives the events.
        Observer<String> stringObserver = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe: " + d);
            }

            @Override
            public void onNext(String value) {
                Log.e(TAG, "onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "Throwable: " + e);
            }

            @Override
            public void onComplete() {
                // Label matches the callback so completion is not mistaken
                // for a subscription event in the log.
                Log.e(TAG, "onComplete: ");
            }
        };

        // 3. Subscribe the observer; emission starts at this call.
        ob.subscribe(stringObserver);
    }
}