Retrofit+Rxjava的完美结合

时间:2021-12-02 17:44:21

    前两篇文章给大家讲解了下关于Retrofit的一些基本使用:

     Retrofit的进阶之路(一) Service

     Retrofit的进阶之路(二)添加请求头和上传图片 

     Retrofit+Rxjava的完美结合

在这篇文章里我们一起来看下Retrofit结合Rxjava后的神奇效果   1、关于Rxjava        Rxjava在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库),我们可以简单的概括为“ 异步”,说到底Rxjava是一个异步的库        那么大家就会想到同样是异步,为什么不用现成的 [ AsyncTask / Handler / XXX / ... ?]        异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。        其次,Rxjava是属于函数响应式编程,它的基本思想是利用观察者模式,说到观察者模式我们就很容易引出RxJava 的四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。 与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。 onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。 onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和/2、Rxjava的观察者模式大致如下:
Retrofit+Rxjava的完美结合
2、如何实现Rxjava和Retrofit的结合
我们先一起看下只用Retrofit是如何进行的
我们使用豆瓣电影的Top250做测试连接,目标地址为
https://api.douban.com/v2/movie/top250?start=0&count=10
至于返回的数据格式,大家自己访问下链接就看到了,太长就不放进来了。
首先我们要根据返回的结果封装一个Entity,暂命名为MovieEntity,代码就不贴了。
接下来我们要创建一个接口取名为CommonService,代码如下:
public interface CommonService {  
//以下是retrofit 的GET请求方式
@GET("top250")
Call<MovieEntity> getTopMovie(@Query("start") int start, @Query("count") int count);
}
回到MainActivity,看下getData()方法
    private void getData() {  
//初始化Retrofit
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(ReqUrl.baseUrl)
.addConverterFactory(GsonConverterFactory.create())
.build();
//初始化Service
CommonService movieService = retrofit.create(CommonService.class);
//请求数据
Call<MovieEntity> call = movieService.getTopMovie(0, 10);
call.enqueue(new Callback<MovieEntity>() {
@Override
public void onResponse(Call<MovieEntity> call, Response<MovieEntity> response) {
tvContent.setText(response.body().toString());
Toast.makeText(RetrofitActivity.this, "成功", Toast.LENGTH_SHORT).show();
}
@Override
public void onFailure(Call<MovieEntity> call, Throwable t) {
tvContent.setText(t.getMessage());
Toast.makeText(RetrofitActivity.this, "失败", Toast.LENGTH_SHORT).show();
}
});
}
以上为没有经过封装的、原生态的Retrofit写网络请求的代码,那么问题来了,大家可能会想我们可以封装创建Retrofit和service部分的代码,然后Activity用创建一个Callback作为参数给Call,这样Activity中只关注请求的结果,而且Call有cancel方法可以取消一个请求,好像没Rxjava什么事了
接下来我们要面对的问题是这样的:
1)、ProgressDialog的show方法应该在哪调用呢?看样子只能在getMovie()这个方法里面调用了,换个地方发出请求就要在对应的Listener里面写一遍show()的代码,其实挺闹心
2)、错误请求我也想集中处理掉不要贴重复的代码
3)、如果我的Http返回数据是一个统一的格式,例
{  
"resultCode": 0,
"resultMessage": "成功",
"data": {}
}
该如何对返回结果进行一个统一的处理呢?
好了,接下来我们引入Rxjava,看看有没有转机
Step1-----> 封装第一步
Retrofit本身对Rxjava提供了支持,添加Retrofit对Rxjava的支持需要在Gradle文件中添加
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2' 
然后在创建Retrofit的过程中添加如下代码:
    Retrofit retrofit = new Retrofit.Builder()  
.baseUrl(baseUrl)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
这样一来我们定义的service返回值就不在是一个Call了,而是一个Observable
我们需要重新定义下CommonService
    //结合Rxjava后的  
@GET("top250")
Observable<MovieEntity> getTopMovie4(@Query("start") int start, @Query("count") int count);
getData()方法就变成了
    private void getRxjavaData() {  
String baseUrl = "https://api.douban.com/v2/movie/";
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(baseUrl)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
CommonService movieService = retrofit.create(CommonService.class);
movieService.getTopMovie2(0,10)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<MovieEntity>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
tv.setText(e.getMessage() + e.getLocalizedMessage());
Toast.makeText(MainActivity.this, "失败", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(MovieEntity MovieEntity) {
tv.setText(MovieEntity.getCount());
Toast.makeText(MainActivity.this, "成功", Toast.LENGTH_SHORT).show();
}
});
}
这样基本上就完成了Retrofit和Rxjava的结合,当然这肯定不是我们最终想要的结果,我们希望在将重复的部分封装起来,在Activity中只需要传入一个Subscriber对象进来即可。
Step2------>封装第二步,将请求过程进行封装
创建一个HttpMethods来进行封装
public class HttpMethods {  
private Retrofit retrofit;
private CommonService movieService;

//构造方法私有
private HttpMethods() {
//手动创建一个OkHttpClient并设置超时时间
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
httpClientBuilder.connectTimeout(Constant.DEFAULT_TIMEOUT, TimeUnit.SECONDS);

retrofit = new Retrofit.Builder()
.client(httpClientBuilder.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.baseUrl(Constant.BASE_URL)
.build();

movieService = retrofit.create(CommonService.class);
}

//在访问HttpMethods时创建单例
private static class SingletonHolder{
private static final HttpMethods INSTANCE = new HttpMethods();
}

//获取单例
public static synchronized HttpMethods getInstance(){
if (SingletonHolder.INSTANCE == null) {
new HttpMethods();
}
return SingletonHolder.INSTANCE;
}

/**
* 用于获取豆瓣电影Top250的数据
* @param subscriber 由调用者传过来的观察者对象
* @param start 起始位置
* @param count 获取长度
*/
public void getTopMovie(Activity activity, Subscriber<MovieEntity> subscriber, int start, int count){
movieService.getTopMovie2(start, count)
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
}
}
用一个单例来封装该对象,在构造方法中创建Retrofit和对应的Service,当然如果需要访问不同的基地址,那么你可能需要创建多个请求方法
此时我们再回头来看下Activity中对应的方法
    private void getRxjavaData2() {  
Subscriber subscriber = new Subscriber<MovieEntity>() {
@Override
public void onStart() {
super.onStart();
}
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
tv.setText(e.getMessage()+e.getLocalizedMessage());
Toast.makeText(MainActivity.this,"失败",Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(MovieEntity MovieEntity) {
tv.setText(MovieEntity.toString());
Toast.makeText(MainActivity.this,"成功",Toast.LENGTH_SHORT).show();
}
};
HttpMethods.getInstance().getTopMovie(this,subscriber, 0, 10);
}
于是又有很多人会说这个subsciber 每次请求都得写,也是属于一个重复的步骤,我们在Activity里关心的只是成功后的数据,没错,我们可以将其进一步的进行封装,新建一个抽象类ProgressSubscriber,在里面不实现onNext()方法
public abstract class ProgressSubscriber<T> extends Subscriber<T> {
private Activity activity;
public ProgressSubscriber(Activity activity) {
this.activity = activity;
}
@Override
public void onStart() {
Toast.makeText(activity, "开始请求", Toast.LENGTH_SHORT).show();
}
@Override
public void onCompleted() {
Toast.makeText(activity, "完成了赛", Toast.LENGTH_SHORT).show();
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity,"失败",Toast.LENGTH_SHORT).show();
}
public void onNext(T t) {
}
}
那么此时我们回过头来看下Activity中获取数据的方法:
    private void getRxjavaData3() {
HttpMethods.getInstance().getTopMovie(this,new ProgressSubscriber1<MovieEntity>(this) {
@Override
public void onNext(MovieEntity mySubjects) {
tv.setText(mySubjects.toString());
Toast.makeText(MainActivity.this,"成功",Toast.LENGTH_SHORT).show();
}
},0,10);

}
大家可以看到是不是简洁了很多,我们只需要关注成功后的数据是,至于具体的逻辑不用在这里操作
那么接着问题又来了,现在很多的网络请求都需要加上一个dialog用来提高用户体验效果,以及在请求的时候加上判断网络是否存在,如何实现呢?
Step3------->封装第三步(加上progressDialog和网络判断)
想到Subscriber里面提供了4个方法,我们可以在start()方法里面进行操作,先做下前期的准备工作
1、判断网络是否存在的方法:
public static boolean isNetworkAvailable(Context context) {
ConnectivityManager connectivity = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
if (connectivity != null) {
NetworkInfo info = connectivity.getActiveNetworkInfo();
if (info != null && info.isConnected()) {
if (info.getState() == NetworkInfo.State.CONNECTED) {
return true;
}
}
}
return false;
}
2、自定义一个dialog
public class WaitingDialog {
public Dialog loadingDialog;
public WaitingDialog(Context context, boolean cancelable){
LayoutInflater inflater = LayoutInflater.from(context);
View v = inflater.inflate(R.layout.wait_dialog, null);// 得到加载view
LinearLayout layout = (LinearLayout) v.findViewById(R.id.dialog_view);// 加载布局

TextView tipTextView = (TextView) v.findViewById(R.id.tipTextView);// 提示文字

tipTextView.setText("正在加载中");// 设置加载信息

loadingDialog = new Dialog(context, R.style.MyDialog);// 创建自定义样式dialog
loadingDialog.setCancelable(cancelable);

loadingDialog.setContentView(v, new LinearLayout.LayoutParams(
LinearLayout.LayoutParams.MATCH_PARENT, LinearLayout.LayoutParams.MATCH_PARENT));// 设置布局

}
public void show(){
loadingDialog.show();
}
public void dismiss(){
loadingDialog.dismiss();
}
}
3、重新设计ProgressSubscirber(加上网络判断和dialog)
public abstract class ProgressSubscriber1<T> extends Subscriber<T> {
private Activity activity;
private WaitingDialog dialog;

public ProgressSubscriber1(Activity activity) {
this.activity = activity;
if(dialog==null){
dialog = new WaitingDialog(activity,true);
}
}
public void onNext(T t) {
}
@Override
public void onStart() {
// 显示进度条
showLoadingProgress();
Toast.makeText(activity, "开始请求", Toast.LENGTH_SHORT).show();
if (!CheckNetUtil.isNetworkAvailable(activity)) {
Toast.makeText(activity, "当前网络不可用,请检查网络情况", Toast.LENGTH_SHORT).show();
// 一定要调用下面这一句
onCompleted();
return;
}
}
private void showLoadingProgress() {
dialog.show();
}
@Override
public void onCompleted() {
dissMissLoadingProgress();
Toast.makeText(activity, "完成了赛", Toast.LENGTH_SHORT).show();
}

@Override
public void onError(Throwable e) {
dissMissLoadingProgress();
Toast.makeText(activity,"失败",Toast.LENGTH_SHORT).show();
}
public void dissMissLoadingProgress(){
dialog.dismiss();
}
}
当然,我们在使用Observable的时候有一个doOnSubscribe,网上有很多人说初始化最好放在这里面实现,但是我试了一下,执行的时候是先执行Subscriber的onStart方法后才会执行doOnSubscribe,所以我这边是在onstart()方法里进行操作的,具体什么原因,欢迎各位不腻赐教!!!!!!
Observable.doOnSubscribe(new Action0() {
@Override
public void call() {
Toast.makeText(activity, "试试", Toast.LENGTH_SHORT).show();
}
})
好了,到这边我们可以告一段落了,接下来我们回头再看下HttpMethods方法,里面有一段代码
movieService.getTopMovie2(start, count)
.map(new HttpResultFunc<MovieEntity>())
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
我们每次增加一个网络请求的时候有些代码也是重复的,接下来我们一下来再次封装下
Step4----->封装第四步
新建一个toSubscribe方法
private <T> void toSubscribe(Observable<T> o, Subscriber<T> s){
o.compose(schedulersTransformer())
.subscribe(s);
}

Observable.Transformer schedulersTransformer() {
return new Observable.Transformer() {
@Override
public Object call(Object observable) {
//在这里进行操作刚才的代码
return ((Observable) observable).subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
此时,getTopMovie2变成如下
public void getTopMovie1(Activity activity, Subscriber<MovieEntity> subscriber, int start, int count){
// movieService.getTopMovie2(start, count)
// .map(new HttpResultFunc<MovieEntity>())
// .subscribeOn(Schedulers.io())
// .unsubscribeOn(Schedulers.io())
// .observeOn(AndroidSchedulers.mainThread())
// .subscribe(subscriber);

Observable observable = movieService.getTopMovie2(start, count)
.map(new HttpResultFunc1<MovieEntity>());
toSubscribe(observable, subscriber);
}
这里有一个.map操作符,它是把发射对象转成另外一个对象发射出去,如下,相当于把第一个泛型T 转换成第二个泛型T发射出去
private class HttpResultFunc<T> implements Func1<T,T>{
@Override
public T call(T t) {
return t;
}
}
当然除了map操作符外,还有其他的操作符,比如flatMap,filter,each等等,这里就不一一解释了,需要的可以网上找下
filter:集合进行过滤
flatMap:发射对象转成另一个Observable,进而把这个Observable发射的对象发射出去
each:遍历集合
take:取出集合中的前几个
skip:跳过前几个元素
unique:相当于按照数学上的集合处理,去重
Step5------>相同格式的Http请求返回数据如何封装
有些Http服务返回一个固定格式的数据的问题。例如:
{
"resultCode": 0,
"resultMessage": "成功",
"data": {}
}
大部分的Http服务可能都是这样设置,resultCode和resultMessage的内容相对比较稳定,而data的内容变化多端,针对这个问题如何解决呢,别着急,接着往下看
我们可以创建一个HttpResult类
public class HttpResults<T> {

//用来模仿resultCode和resultMessage
private int count;
private int start;
private int total;
private String title;

//用来模仿Data
private T subjects;
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public int getStart() {
return start;
}
public void setStart(int start) {
this.start = start;
}
public int getTotal() {
return total;
}
public void setTotal(int total) {
this.total = total;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public T getSubjects() {
return subjects;
}
public void setSubjects(T subjects) {
this.subjects = subjects;
}
}
如果data是一个MySubject对象的话。那么在定义Service方法的返回值就可以写为
   //结合Rxjava后的
@GET("top250")
Observable<HttpResults<List<MySubject>>> getTopMovie3(@Query("start") int start, @Query("count") int count);
这样一来HttpResult就相当于一个包装类,将结果包装了起来,但是在使用的时候要给出一个明确的类型。
那么此时只需要将上面提到的map里面出入的HttpResultFunc进行完善下即可
 private class HttpResultFunc<T> implements Func1<HttpResults<T>,T>{
@Override
public T call(HttpResults<T> tHttpResults) {
//在这边可以进行异常的处理
// 这里只是写的一个例子,当返回条数不等于10的时候,抛出一个异常,在异常里进行处理
if(tHttpResults.getCount()!=10){
throw new ApiException(100);
}
return tHttpResults.getSubjects();
}
}
在方法里面我们可以根据自己的情况进行代码编写,比如我们想要对resultCode和resultMessage先做一个判断,因为如果resultCode == 0代表success,那么resultCode != 0时data一般都是null
基于这种考虑,我们在resultCode != 0的时候,抛出个自定义的ApiException。这样就会进入到subscriber的onError中,我们可以在onError中处理错误信息。
另外,请求成功时,需要将data数据转换为目标数据类型传递给subscriber
这边贴出ApiException这个类,大家可根据自己的实际情况进行编写
public class ApiException extends RuntimeException{
public static final int USER_NOT_EXIST = 100;
public static final int WRONG_PASSWORD = 101;

public ApiException(int resultCode) {
this(getApiExceptionMessage(resultCode));
}

public ApiException(String detailMessage) {
super(detailMessage);
}

/**
* 由于服务器传递过来的错误信息直接给用户看的话,用户未必能够理解
* 需要根据错误码对错误信息进行一个转换,在显示给用户
* @param code
* @return
*/
private static String getApiExceptionMessage(int code){
String message = "";
switch (code) {
case USER_NOT_EXIST:
message = "该用户不存在";
break;
case WRONG_PASSWORD:
message = "密码错误";
break;
default:
message = "未知错误";
}
return message;
}
}

好了,到这边基本上已经将Retrofit集合Rxjava讲解完了,希望各位能用的顺手,同时文章中有错误的或者有更好方案的欢迎大家一起讨论