RxJava 的基本使用与常见操作符

时间:2022-05-20 17:48:07

依赖:

  1. compile 'io.reactivex.rxjava2:rxandroid:2.0.1'  
  2. // Because RxAndroid releases are few and far between, it is recommended you also  
  3. // explicitly depend on RxJava's latest version for bug fixes and new features.  
  4.     compile 'io.reactivex.rxjava2:rxjava:2.1.5' 

另一种解析 JSON 数据的方式:阿里巴巴开源的 fastjson,据说是解析速度最快的 JSON 解析器

  1. compile 'com.alibaba:fastjson:1.2.39'  


以下为代码:
  1. import android.os.Bundle;  
  2. import android.support.v7.app.AppCompatActivity;  
  3. import android.view.View;  
  4. import android.widget.TextView;  
  5.   
  6. import com.alibaba.fastjson.JSONObject;  
  7.   
  8. import java.io.IOException;  
  9. import java.util.concurrent.TimeUnit;  
  10.   
  11. import io.reactivex.BackpressureStrategy;  
  12. import io.reactivex.Flowable;  
  13. import io.reactivex.FlowableEmitter;  
  14. import io.reactivex.FlowableOnSubscribe;  
  15. import io.reactivex.Observable;  
  16. import io.reactivex.ObservableEmitter;  
  17. import io.reactivex.ObservableOnSubscribe;  
  18. import io.reactivex.Observer;  
  19. import io.reactivex.android.schedulers.AndroidSchedulers;  
  20. import io.reactivex.annotations.NonNull;  
  21. import io.reactivex.disposables.Disposable;  
  22. import io.reactivex.functions.BiFunction;  
  23. import io.reactivex.functions.Consumer;  
  24. import io.reactivex.functions.Function;  
  25. import io.reactivex.schedulers.Schedulers;  
  26. import okhttp3.Call;  
  27. import okhttp3.Callback;  
  28. import okhttp3.OkHttpClient;  
  29. import okhttp3.Request;  
  30. import okhttp3.Response;  
  31.   
  32. public class MainActivity extends AppCompatActivity {  
  33.   
  34.     private TextView name;  
  35.   
  36.     @Override  
  37.     protected void onCreate(Bundle savedInstanceState) {  
  38.         super.onCreate(savedInstanceState);  
  39.         setContentView(R.layout.activity_main);  
  40.   
  41.         name = (TextView) findViewById(R.id.name);  
  42.         //用来调用下面的方法,监听。  
  43.         name.setOnClickListener(new View.OnClickListener() {  
  44.             @Override  
  45.             public void onClick(View v) {  
  46.   
  47.                 interval();  
  48.             }  
  49.         });  
  50.     }  
  51.   
  52.     //例1:Observer  
  53.     public void observer() {  
  54.         //观察者  
  55.         Observer<String> observer = new Observer<String>() {  
  56.             @Override  
  57.             public void onSubscribe(@NonNull Disposable d) {  
  58.   
  59.             }  
  60.             @Override  
  61.             public void onNext(@NonNull String s) {  
  62.                 //接收从被观察者中返回的数据  
  63.                 System.out.println("onNext :" + s);  
  64.             }  
  65.             @Override  
  66.             public void onError(@NonNull Throwable e) {  
  67.   
  68.             }  
  69.             @Override  
  70.             public void onComplete() {  
  71.   
  72.             }  
  73.         };  
  74.         //被观察者  
  75.         Observable<String> observable = new Observable<String>() {  
  76.             @Override  
  77.             protected void subscribeActual(Observer<? super String> observer) {  
  78.                 observer.onNext("11111");  
  79.                 observer.onNext("22222");  
  80.                 observer.onComplete();  
  81.             }  
  82.         };  
  83.         //产生了订阅  
  84.         observable.subscribe(observer);  
  85.     }  
  86.   
  87.     //例2:Flowable  
  88.     private void flowable(){  
  89.         //被观察者  
  90.         Flowable.create(new FlowableOnSubscribe<String>() {  
  91.             @Override  
  92.             public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {  
  93.                 for (int i = 0; i < 100; i++) {  
  94.                     e.onNext(i+"");  
  95.                 }  
  96.             }  
  97.             //背压的策略,buffer缓冲区                观察者  
  98.             //背压一共给了五种策略  
  99.             // BUFFER、  
  100.             // DROP、打印前128个,后面的删除  
  101.             // ERROR、  
  102.             // LATEST、打印前128个和最后一个,其余删除  
  103.             // MISSING  
  104.             //这里的策略若不是BUFFER 那么,会出现著名的:MissingBackpressureException错误  
  105.         }, BackpressureStrategy.BUFFER).subscribe(new Consumer<String>() {  
  106.             @Override  
  107.             public void accept(String s) throws Exception {  
  108.                 System.out.println("subscribe accept"+s);  
  109.                 Thread.sleep(1000);  
  110.             }  
  111.         });  
  112.     }  
  113.   
  114.     //例3:线程调度器  Scheduler  
  115.     public void flowable1(){  
  116.         Flowable.create(new FlowableOnSubscribe<String>() {  
  117.             @Override  
  118.             public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {  
  119.                 for (int i = 0; i < 100; i++) {  
  120.                     //输出在哪个线程  
  121.                     System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());  
  122.                     e.onNext(i+"");  
  123.                 }  
  124.             }  
  125.         },BackpressureStrategy.BUFFER)  
  126.                 //被观察者一般放在子线程  
  127.                 .subscribeOn(Schedulers.io())  
  128.                 //观察者一般放在主线程  
  129.                 .observeOn(AndroidSchedulers.mainThread())  
  130.                 .subscribe(new Consumer<String>() {  
  131.                     @Override  
  132.                     public void accept(String s) throws Exception {  
  133.                         System.out.println("s"+ s);  
  134.                         Thread.sleep(100);  
  135.                         //输出在哪个线程  
  136.                         System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());  
  137.                     }  
  138.                 });  
  139.     }  
  140.   
  141.   
  142.     //例4:http请求网络,map转化器,fastjson解析器  
  143.     public void map1(){  
  144.         Observable.create(new ObservableOnSubscribe<String>() {  
  145.             @Override  
  146.             public void subscribe(@NonNull final ObservableEmitter<String> e) throws Exception {  
  147.                 OkHttpClient client = new OkHttpClient();  
  148.                 Request request = new Request.Builder()  
  149.                         .url("http://qhb.2dyt.com/Bwei/login")  
  150.                         .build();  
  151.                 client.newCall(request).enqueue(new Callback() {  
  152.                     @Override  
  153.                     public void onFailure(Call call, IOException e) {  
  154.   
  155.                     }  
  156.   
  157.                     @Override  
  158.                     public void onResponse(Call call, Response response) throws IOException {  
  159.                         String result = response.body().string();  
  160.                         e.onNext(result);  
  161.                     }  
  162.                 });  
  163.             }  
  164.         })  
  165.                 //map转换器  flatmap(无序),concatmap(有序)  
  166.                 .map(new Function<String, Bean>() {  
  167.             @Override  
  168.             public Bean apply(@NonNull String s) throws Exception {  
  169.                 //用fastjson来解析数据  
  170.                 return JSONObject.parseObject(s,Bean.class);  
  171.             }  
  172.         }).subscribe(new Consumer<Bean>() {  
  173.             @Override  
  174.             public void accept(Bean bean) throws Exception {  
  175.                 System.out.println("bean = "+ bean.toString() );  
  176.             }  
  177.         });  
  178.     }  
  179.   
  180.     //常见rxjava操作符  
  181.     //例 定时发送消息  
  182.     public void interval(){  
  183.         Observable.interval(2,1, TimeUnit.SECONDS)  
  184.                 .take(10)  
  185.                 .subscribe(new Consumer<Long>() {  
  186.                     @Override  
  187.                     public void accept(Long aLong) throws Exception {  
  188.                         System.out.println("aLong = " + aLong);  
  189.                     }  
  190.                 });  
  191.     }  
  192.   
  193.   
  194.     //例 zip字符串合并  
  195.     public void zip(){  
  196.         Observable observable1 = Observable.create(new ObservableOnSubscribe<String>() {  
  197.             @Override  
  198.             public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {  
  199.                 e.onNext("1");  
  200.                 e.onNext("2");  
  201.                 e.onNext("3");  
  202.                 e.onNext("4");  
  203.                 e.onComplete();  
  204.   
  205.             }  
  206.         });  
  207.         Observable observable2 = Observable.create(new ObservableOnSubscribe<String>() {  
  208.             @Override  
  209.             public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {  
  210.                 e.onNext("A");  
  211.                 e.onNext("B");  
  212.                 e.onNext("C");  
  213.                 e.onNext("D");  
  214.                 e.onComplete();  
  215.             }  
  216.         });  
  217.   
  218.         Observable.zip(observable1, observable2, new BiFunction<String,String,String>() {  
  219.             @Override  
  220.             public String apply(@NonNull String o, @NonNull String o2) throws Exception {  
  221.                 return o + o2;  
  222.             }  
  223.         }).subscribe(new Consumer<String>() {  
  224.             @Override  
  225.             public void accept(String o) throws Exception {  
  226.                 System.out.println("o"+ o);  
  227.             }  
  228.         });  
  229.     }