目录
观察者模式
四大要素:Observable(被观察者),Observer (观察者),subscribe (订阅),事件。
观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到。
扩展的观察者模式
当事件完成时会回调onComplete(),在完成过程中发生了异常会回调onError(),onError()和onComplete()只会回调一个。
引入依赖
implementation 'io.reactivex.rxjava3:rxjava:3.1.3'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
//创建被观察者 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { emitter.onNext("Hello Uncle Xing"); emitter.onComplete(); } }); //创建观察者 Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.i(tag, "onSubscribe"); } @Override public void onNext(@NonNull String s) { Log.i(tag, "onNext:" + s); } @Override public void onError(@NonNull Throwable e) { Log.i(tag, "onError:" + e.getMessage()); } @Override public void onComplete() { Log.i(tag, "onComplete"); } }; //订阅事件 observable.subscribe(observer);
操作符
创建Observable
create:用于创建Observable
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { emitter.onNext("Hello Uncle Xing"); emitter.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.i(tag, "onSubscribe"); } @Override public void onNext(@NonNull String s) { Log.i(tag, "onNext:" + s); } @Override public void onError(@NonNull Throwable e) { Log.i(tag, "onError:" + e.getMessage()); } @Override public void onComplete() { Log.i(tag, "onComplete"); } });
just:创建一个Observable并自动调用onNext发射数据,just中传递的参数将直接在Observer的onNext方法中接收到
Observable.just("Uncle Xing").subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.i(tag, "onSubscribe"); } @Override public void onNext(@NonNull String s) { Log.i(tag, "onNext:" + s); } @Override public void onError(@NonNull Throwable e) { Log.i(tag, "onError:" + e.getMessage()); } @Override public void onComplete() { Log.i(tag, "onComplete"); } });
interval:创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。
Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Long aLong) { Log.i(tag, "count:" + aLong); //这里是非主线程,会隔1s打印出0,1,2,3.... } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
timer:创建一个Observable,它在一个特定延迟后发射一个值
Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.i(tag, "count:" + aLong); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
转换Observable
map:对数据进行变换后,可以返回任意值,对数据的变换是1对1进行的。
Observable.just(666).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Throwable { return integer.toString(); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { Log.i(tag, "map:" + s); } });
flatMap:对数据变换后,返回ObservableSource对象,可以对数据进行一对多,多对多的变换。
Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Throwable { return Observable.just(integer.toString()); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { Log.i(tag, "accept:" + s); } });
buffer:把Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值
Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> integers) throws Throwable { Log.i(tag, integers.toString()); } });
Log会分两次打印,第一次打印 [1, 2, 3],第二次打印 [4, 5, 6]
过滤Observable
distinct:去掉重复数据
Observable.just(1, 2, 3, 4, 2, 3).distinct().subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { Log.i(tag, "distinct:" + integer); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
elementAt:取出指定位置的数据
Observable.just(1, 2, 3, 4).elementAt(1).subscribe(new MaybeObserver<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onSuccess(@NonNull Integer integer) { Log.i(tag, "onSuccess:" + integer); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
filter:对数据进行指定规则的过滤
Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Throwable { return integer > 1; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Throwable { Log.i(tag, "filter:" + integer); } });
组合Observable
zip:通过一个函数将多个Observable的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项
Observable<Integer> observable = Observable.just(10, 20, 30, 40); Observable<Integer> observable2 = Observable.just(1, 2, 3); Observable.zip(observable, observable2, new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Throwable { return integer + integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Throwable { Log.i(tag, "zip:" + integer); } });
注意:当其中一个Observable发送数据结束或异常,另外一个也停止发送,所以这里只会打印出11,22,33
merge:合并多个Observable的发射物
Observable<Integer> observable = Observable.just(10, 20, 30, 40); Observable<Integer> observable2 = Observable.just(1, 2, 3); Observable.merge(observable, observable2).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Throwable { Log.i(tag, "merge:" + integer);//会打印出10,20,30,1,2,3 } });
错误处理
- onErrorReturn:让Observable遇到错误时发射一个特殊的项并且正常终止
- onErrorResumeNext:让Observable在遇到错误时开始发射第二个Observable的数据序列
Schedulers调度器-解决多线程问题
- io():用于I/O操作;
- computation():计算工作默认的调度器;
- immediate():立即执行,允许立即在当前线程执行你指定的工作;
- newThread():创建新线程;
- trampoline():顺序处理,按需处理队列,并运行队列的每一个任务。
AndroidSchedulers:RxAndroid提供在Android平台的调度器,指定观察者在主线程。
SubscribeOn用于每个Observable对象,ObserveOn用于每个Observer对象
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable { emitter.onNext(100); emitter.onComplete(); Log.i(tag, "subscribe thread:" + Thread.currentThread().getName());//打印subscribe thread:RxNewThreadScheduler-1 } }).subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { Log.i(tag, "onNext thread:" + Thread.currentThread().getName());//打印onNext thread:main } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
管理RxJava的生命周期
在使用RxJava的时候,如果没有及时解除订阅,在退出Activity的时候,异步线程还在执行,对Activity的引用还在,此时就会产生内存泄露问题。
可使用RxLifecycle,传送门
引入依赖
implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'
implementation 'com.trello.rxlifecycle4:rxlifecycle-components:4.0.2'
让你的Activity继承RxAppCompatActivity,Fragment继承RxFragment,其余类似,然后使用bindUntilEvent或者bindToLifecycle
Observable.interval(1000, TimeUnit.MILLISECONDS) .compose(bindUntilEvent(ActivityEvent.DESTROY)) //当前Activity执行到onDestroy时,Observable取消订阅 .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Throwable { Log.i(tag, "accept:" + aLong); } });
Observable.interval(1000, TimeUnit.MILLISECONDS) .compose(bindToLifecycle()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Throwable { Log.i(tag, "accept:" + aLong); } });
使用bindToLifecycle:
如果Observable在onCreate执行,那么当执行到onDestroy时取消订阅。
如果Observable在onStart执行,那么当执行到onStop时取消订阅。
如果Observable在onResume执行,那么当执行到onPause时取消订阅。
RxJava与Retrofit完成网络请求
public interface MyService { @GET("gallery/{imageType}/response") Observable<List<String>> getImages(@Path("imageType") String imageType); }
Retrofit retrofit = new Retrofit.Builder() .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .baseUrl(BASE_URL) .build(); MyService service = retrofit.create(MyService.class); service.getImages("banner") .compose(bindToLifecycle()) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) throws Throwable { //todo } });