依赖:
- compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
- // Because RxAndroid releases are few and far between, it is recommended you also
- // explicitly depend on RxJava's latest version for bug fixes and new features.
- compile 'io.reactivex.rxjava2:rxjava:2.1.5'
另一种解析 JSON 数据的方式:使用阿里巴巴开源的 fastjson 库(据称解析速度很快)
- compile 'com.alibaba:fastjson:1.2.39'
以下为代码:
- import android.os.Bundle;
- import android.support.v7.app.AppCompatActivity;
- import android.view.View;
- import android.widget.TextView;
- import com.alibaba.fastjson.JSONObject;
- import java.io.IOException;
- import java.util.concurrent.TimeUnit;
- import io.reactivex.BackpressureStrategy;
- import io.reactivex.Flowable;
- import io.reactivex.FlowableEmitter;
- import io.reactivex.FlowableOnSubscribe;
- import io.reactivex.Observable;
- import io.reactivex.ObservableEmitter;
- import io.reactivex.ObservableOnSubscribe;
- import io.reactivex.Observer;
- import io.reactivex.android.schedulers.AndroidSchedulers;
- import io.reactivex.annotations.NonNull;
- import io.reactivex.disposables.Disposable;
- import io.reactivex.functions.BiFunction;
- import io.reactivex.functions.Consumer;
- import io.reactivex.functions.Function;
- import io.reactivex.schedulers.Schedulers;
- import okhttp3.Call;
- import okhttp3.Callback;
- import okhttp3.OkHttpClient;
- import okhttp3.Request;
- import okhttp3.Response;
- public class MainActivity extends AppCompatActivity {
- private TextView name;
- // Sets the activity layout, looks up the "name" TextView and installs a click
- // listener on it. The listener is the trigger for the demo methods of this
- // class; it currently calls interval().
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_main);
- // Click handler that starts the demo.
- View.OnClickListener startDemoOnClick = new View.OnClickListener() {
- @Override
- public void onClick(View clickedView) {
- interval();
- }
- };
- name = (TextView) findViewById(R.id.name);
- name.setOnClickListener(startDemoOnClick);
- }
- //例1:Observer
- // Demonstrates the basic Observable -> Observer contract: builds an Observable
- // that emits "11111" and "22222" and then completes, and subscribes an
- // Observer that prints every value it receives.
- public void observer() {
- // Observer: the receiving side of the stream.
- Observer<String> observer = new Observer<String>() {
- @Override
- public void onSubscribe(@NonNull Disposable d) {
- // Nothing to set up; the source below completes on its own.
- }
- @Override
- public void onNext(@NonNull String s) {
- // Receives each value emitted by the Observable.
- System.out.println("onNext :" + s);
- }
- @Override
- public void onError(@NonNull Throwable e) {
- // Report failures instead of silently dropping them.
- System.out.println("onError :" + e);
- }
- @Override
- public void onComplete() {
- }
- };
- // Observable: the emitting side. Observable.create() is used instead of
- // subclassing Observable, because a hand-written subscribeActual() has to
- // call observer.onSubscribe() before any other signal; create() takes care
- // of that and hands the Observer a working Disposable.
- Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
- emitter.onNext("11111");
- emitter.onNext("22222");
- emitter.onComplete();
- }
- });
- // Subscribing is what starts the emission.
- observable.subscribe(observer);
- }
- //例2:Flowable
- // Emits the strings "0".."99" through Flowable.create() with the BUFFER
- // backpressure strategy and prints each one, sleeping one second per item to
- // simulate a slow consumer. No scheduler is applied, so the whole pipeline
- // (including the sleeps) runs on the calling thread.
- private void flowable(){
- // Source side.
- Flowable.create(new FlowableOnSubscribe<String>() {
- @Override
- public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
- // Stop producing as soon as the subscriber has cancelled.
- for (int i = 0; i < 100 && !e.isCancelled(); i++) {
- e.onNext(i+"");
- }
- // Terminate the stream so that it does not stay open forever.
- e.onComplete();
- }
- // The second argument is the backpressure strategy, i.e. what happens to
- // items the consumer cannot take yet. There are five strategies:
- // BUFFER  - keep every pending item in an unbounded buffer
- // DROP    - discard new items while the consumer cannot keep up
- // ERROR   - signal MissingBackpressureException when the consumer falls behind
- // LATEST  - keep only the most recent pending item, overwriting older ones
- // MISSING - apply no strategy; overflow is left to downstream operators
- // Only ERROR (and MISSING, via downstream operators) can end in
- // MissingBackpressureException; DROP and LATEST lose items instead.
- }, BackpressureStrategy.BUFFER).subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- System.out.println("subscribe accept"+s);
- // Simulated slow consumer.
- Thread.sleep(1000);
- }
- });
- }
- //例3:线程调度器 Scheduler
- // Scheduler demo: the source emits "0".."99" on an io() scheduler thread and
- // the consumer receives them on the Android main thread. Both sides print
- // the name of the thread they run on.
- public void flowable1(){
- Flowable.create(new FlowableOnSubscribe<String>() {
- @Override
- public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
- // Stop producing as soon as the subscriber has cancelled.
- for (int i = 0; i < 100 && !e.isCancelled(); i++) {
- // Print which thread the source runs on.
- System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
- e.onNext(i+"");
- }
- // Terminate the stream so that it does not stay open forever.
- e.onComplete();
- }
- },BackpressureStrategy.BUFFER)
- // The source (observable side) is usually placed on a background thread.
- .subscribeOn(Schedulers.io())
- // The consumer (observer side) is usually placed on the main thread.
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- System.out.println("s"+ s);
- // Simulated slow consumer.
- Thread.sleep(100);
- // Print which thread the consumer runs on.
- System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
- }
- });
- }
- //例4:http请求网络,map转化器,fastjson解析器
- public void map1(){
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(@NonNull final ObservableEmitter<String> e) throws Exception {
- OkHttpClient client = new OkHttpClient();
- Request request = new Request.Builder()
- .url("http://qhb.2dyt.com/Bwei/login")
- .build();
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onFailure(Call call, IOException e) {
- }
- @Override
- public void onResponse(Call call, Response response) throws IOException {
- String result = response.body().string();
- e.onNext(result);
- }
- });
- }
- })
- //map转换器 flatmap(无序),concatmap(有序)
- .map(new Function<String, Bean>() {
- @Override
- public Bean apply(@NonNull String s) throws Exception {
- //用fastjson来解析数据
- return JSONObject.parseObject(s,Bean.class);
- }
- }).subscribe(new Consumer<Bean>() {
- @Override
- public void accept(Bean bean) throws Exception {
- System.out.println("bean = "+ bean.toString() );
- }
- });
- }
- //常见rxjava操作符
- //例 定时发送消息
- // Timer demo: prints "aLong = <n>" for the first ten values of an interval
- // that starts after a 2 second delay and then ticks once per second.
- public void interval(){
- // Consumer that prints every tick it receives.
- Consumer<Long> printTick = new Consumer<Long>() {
- @Override
- public void accept(Long tick) throws Exception {
- System.out.println("aLong = " + tick);
- }
- };
- // take(10) ends the otherwise endless interval after ten values.
- Observable<Long> firstTenTicks = Observable
- .interval(2, 1, TimeUnit.SECONDS)
- .take(10);
- firstTenTicks.subscribe(printTick);
- }
- //例 zip字符串合并
- // zip demo: pairs the n-th item of the digit source ("1".."4") with the n-th
- // item of the letter source ("A".."D"), concatenates each pair and prints the
- // combined string prefixed with "o".
- public void zip(){
- // Declared as Observable<String> rather than the raw Observable type, so
- // that zip() below is checked by the compiler instead of relying on
- // unchecked conversions.
- Observable<String> digits = Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
- e.onNext("1");
- e.onNext("2");
- e.onNext("3");
- e.onNext("4");
- e.onComplete();
- }
- });
- Observable<String> letters = Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
- e.onNext("A");
- e.onNext("B");
- e.onNext("C");
- e.onNext("D");
- e.onComplete();
- }
- });
- Observable.zip(digits, letters, new BiFunction<String,String,String>() {
- @Override
- public String apply(@NonNull String digit, @NonNull String letter) throws Exception {
- // Combine one item from each source into a single value.
- return digit + letter;
- }
- }).subscribe(new Consumer<String>() {
- @Override
- public void accept(String o) throws Exception {
- System.out.println("o"+ o);
- }
- });
- }