欢迎加入技术谈论群:714476794
本来昨晚2016.12.31晚要发布的,没写好,今天,2017年第一天,早上就整理了下,把RxJava1.x给替换成了RxJava2.x版本,基本都需要改了,不懂的可以先去了解RxJava1.x 2.x。这篇文章主要是使用RxJava封装异步任务,用过AsyncTask的童鞋都清楚,AsyncTask的原理,调用顺序,线程切换等,废话不多说上代码:
/** * Author KINCAI * . * description TODO * . * Time 2016-12-31 23:16 */ public abstract class RxAsyncTask<Param, Progress, Result> { private final String TAG = this.getClass().getSimpleName(); private Flowable<Progress[]> mFlowable2; public RxAsyncTask() { } @SafeVarargs private final void rxTask(final Param... params) { Flowable.create(new FlowableOnSubscribe<Result>() { @Override public void subscribe(FlowableEmitter<Result> e) throws Exception { e.onNext(RxAsyncTask.this.call(params)); e.onComplete(); } }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Result>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Result result) { RxAsyncTask.this.onResult(result); } @Override public void onError(Throwable t) { RxAsyncTask.this.onError(t); } @Override public void onComplete() { RxAsyncTask.this.onCompleted(); } }); } protected abstract Result call(Param... params); /**任务开始之前调用(在当前调用者所在线程执行)*/ protected void onPreExecute() { } /** 执行结果返回*/ protected void onResult(Result result) { } /**进度更新*/ protected void onProgressUpdate(Progress... progresses) { } /**RxJava中的onComplete回调*/ protected void onCompleted() { } /**RxJava中的onError回调*/ protected void onError(Throwable e) { } /**进度更新 子线程转主线程 回调给 onProgressUpdate()方法*/ protected void publishProgress(final Progress... progresses) { if (mFlowable2 == null) { mFlowable2 = Flowable.create(new FlowableOnSubscribe<Progress[]>() { @Override public void subscribe(FlowableEmitter<Progress[]> e) throws Exception { e.onNext(progresses); } }, BackpressureStrategy.BUFFER).observeOn(AndroidSchedulers.mainThread()); } mFlowable2.subscribe(new Consumer<Progress[]>() { @Override public void accept(Progress[] progress) throws Exception { onProgressUpdate(progresses); } }); } @SafeVarargs public final void execute(Param... params) { onPreExecute(); rxTask(params); } }
MainActivity使用RxAsyncTask,跟AsyncTask类似需要的方法可以重写父类方法:
public class MainActivity extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); final TextView text= (TextView) findViewById(R.id.test); new RxAsyncTask<String, Integer, String>() { @Override protected String call(String... param) { Log.e("RxAsyncTask","call"); for (int i = 10; i >0; i--) { try { publishProgress(i); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return param[0]; } @Override protected void onResult(String s) { Log.e("RxAsyncTask","onNext"); Toast.makeText(MainActivity.this,s,Toast.LENGTH_SHORT).show(); text.setText(s); } @Override protected void onPreExecute() { Log.e("RxAsyncTask","onPreExecute"); } @Override protected void onProgressUpdate(Integer... integers) { text.setText("2017倒数:"+integers[0]); Log.e("RxAsyncTask","onProgressUpdate"); } @Override protected void onCompleted() { Log.e("RxAsyncTask","onCompleted"); } @Override protected void onError(Throwable e) { Log.e("RxAsyncTask","onError"); } }.execute("Happy New Year 2017"); } }
效果 模拟器太卡了 是10开始倒数的
Demo:http://download.csdn.net/detail/hqiong208/9726525
有待完善
end