4.1.3 RxJava中的观察者模式
@Testpublic void rxJavaBaseUse(){//被观察者( 主题 )Observable observable = Observable.create(new Action1<Emitter<String>>(){@Overridepublic void call(Emitter<String> emitter){emitter.onNext("apple");emitter.onNext("banana");emitter.onNext("pear");emitter.onCompleted();}}, Emitter.BackpressureMode.NONE);//订阅者(观察者)Subscriber<String> subscriber = new Subscriber<String>(){@Overridepublic void onNext(String s){log.info("onNext: {}", s);}@Overridepublic void onCompleted(){log.info("onCompleted");}@Overridepublic void onError(Throwable e){log.info("onError");}};//订阅:Observable与Subscriber之间依然通过subscribe()进行关联。observable.subscribe(subscriber);}
4.1.4 RxJava的不完整回调
Rxjava为了支持函数式编程,另外定义了几个函数式接口:比较重要的:Action0和Action1
4.2 创建型操作符
- create(): 使用函数从头创建一个 Observable 主题对象
- defer(): 只有当订阅者订阅才创建 Observable 主题对象,为每个订阅创建一个新的Observable 主题对象。
- range(): 创建一个弹射指定范围的整数序列的 Observable 主题对象
- interval(): 创建一个按照给定的时间间隔弹射整数序列的 Observable 主题对象
- timer(): 创建一个在给定的延时之后弹射单个数据的 Observable 主题对象。
- empty(): 创建一个什么都不做直接通知完成的 Observable 主题对象。
- error(): 创建一个什么都不做直接通知错误的 Observable 主题对象。
never(): 创建一个不弹射任何数据的 Observable 主题对象。
4.2.1 just操作符
Observable 的 just 操作符用于创建一个 Observable 主题,并且会将实参数据弹射出来。 just操作符可接收多个实参,所有实参都将被逐一弹射。
@Testpublic void justDemo(){//发送一个字符串"hello world"Observable.just("hello world" ).subscribe(s -> log.info("just string->" + s));//逐一发送1,2,3,4这四个整数Observable.just(1, 2, 3, 4).subscribe(i -> log.info("just int->" + i));}
虽然 just 操作符可以弹射多个数据,但是上限为 9 个。
4.2.2 from操作符
from 操作符以数组、 Iterable 迭代器等对象作为输入,创建一个 Observable 主题对象,然后将实参(如数组、 Iterable 迭代器等)中的数据元素逐一弹射出去
@Testpublic void fromDemo(){//逐一发送一个字符数组的每个元素String[] items = {"a", "b", "c", "d", "e", "f"};Observable.from(items).subscribe(s -> log.info("just string->" + s));//逐一发送送一个字符数组的每个元素Integer[] array = {1, 2, 3, 4};List<Integer> list = Arrays.asList(array);Observable.from(list).subscribe(i -> log.info("just int->" + i));}
4.2.3 range操作符
@Testpublic void rangeDemo(){//逐一发一组范围内的整数序列Observable.range(1, 10).subscribe(i -> log.info("just int->" + i));}
4.2.4 interval操作符
interval 操作符创建一个 Observable 主题对象(消息流),该消息流会按照固定时间间隔发射整数序列。
@Testpublic void intervalDemo() throws InterruptedException{Observable.interval(100, TimeUnit.MILLISECONDS).subscribe(aLong -> log.info(aLong.toString()));Thread.sleep(Integer.MAX_VALUE);}
4.2.5 defer操作符
just、 from、 range 以及其他创建操作符都是在创建主题时弹射数据,而不是在被订阅时弹射数据。而 defer 操作符在创建主题时并不弹射数据,它会一直等待, 直到有观察者订阅才会弹射数据
4.3 过滤型操作
4.3.1 filter操作符
/*** 演示 filter 基本使用*/@Testpublic void filterDemo(){//通过filter过滤能被5整除的数Observable.range(1, 20).filter(new Func1<Integer, Boolean>(){@Overridepublic Boolean call(Integer integer){return integer % 5 == 0;}}).subscribe(i -> log.info("filter int->" + i));}/*** 演示 filter 基本使用 ,lamda 形式*/@Testpublic void filterDemoLamda(){//通过filter过滤能被5整除的数Observable.range(1, 20).filter(integer -> integer % 5 == 0).subscribe(i -> log.info("filter int->" + i));}
4.3.2 distinct操作符
/*** 演示 distinct 基本使用*/@Testpublic void distinctDemo(){Observable.just("apple", "pair", "banana", "apple", "pair" ).distinct().subscribe(s -> log.info("distinct s->" + s));}
4.4 转换型操作符
4.4.1 map操作符
/*** 演示 map 转换*/@Testpublic void intervalDemo(){Observable.range(1, 4).map(i -> i * i).subscribe(i -> log.info(i.toString()));}
4.4.2 flatMap操作符
flatMap 转换是一对一类型或者一对多类型的,原来弹射了几个数据,转换之后可以是更多个数据。
- flatMap 转换同样可以改变弹射的数据类型。
- flatMap 转换后的数据还是会逐个发射给下游的 Subscriber 来接收,表面上就像这些数据是由一个 Observable 发射的一样,其实是多个 Observable 发射然后合并的
/*** 演示 flapMap 转换*/@Testpublic void flapMapDemo(){/*** 注意 flatMap 中的 just 所创建的是一个新的流*/Observable.range(1, 4).flatMap(i -> Observable.just(i * i, i * i + 1)).subscribe(i -> log.info(i.toString()));}
4.4.3 scan操作符
scan 操作符对一个 Observable 流序列的每一项数据应用一个累积函数, 然后将这个函数的累积结果弹射出去。
```java
@Test
public void scanDemo()
{/*** 定义一个 accumulator 累积函数*/Func2<Integer, Integer, Integer> accumulator = new Func2<Integer, Integer, Integer>(){@Overridepublic Integer call(Integer input1, Integer input2){log.info(" {} + {} = {} ", input1, input2, input1 + input2);return input1 + input2;}};
/*** 使用scan 进行流扫描*/Observable.range(1, 5).scan(accumulator).subscribe(new Action1<Integer>(){@Overridepublic void call(Integer sum){log.info(" 累加的结果: {} ", sum);}});}
<a name="lvb78"></a>## 4.5 聚合操作符<a name="vyNkr"></a>## 4.5.1 count操作符```java/*** 演示 count 计数操作符*/@Testpublic void countDemo(){String[] items = {"one", "two", "three","fore"};Integer count = Observable.from(items).count().toBlocking().single();log.info("计数的结果为 {}",count);}
4.5.2 reduce操作符

与scan有异曲同工之妙
/*** 演示 reduce 扫描操作符*/@Testpublic void reduceDemo(){/*** 定义一个 accumulator 累积函数*/Func2<Integer, Integer, Integer> accumulator = new Func2<Integer, Integer, Integer>(){@Overridepublic Integer call(Integer input1, Integer input2){log.info(" {} + {} = {} ", input1, input2, input1 + input2);return input1 + input2;}};/*** 使用 reduce 规约操作符*/Observable.range(1, 5).reduce(accumulator).subscribe(new Action1<Integer>(){@Overridepublic void call(Integer sum){log.info(" 规约的结果: {} ", sum);}});}
4.6 其他操作符
4.6.1 take操作符
@Testpublic void takeDemo() throws InterruptedException{Observable.interval(1, TimeUnit.SECONDS) //设置间隔执行.take(10) //10秒倒计时.map(aLong -> 10 - aLong).subscribe(aLong -> log.info(aLong.toString()));Thread.sleep(Integer.MAX_VALUE);}
4.6.2 window操作符
RxJava 的窗口可以理解为固定数量(或者固定时间间隔)的元素分组。 假定通过 window 操作符以固定数量 n 进行窗口划分,一旦流上弹射的元素的数量足够一个窗口的数量 n, 那么输出流上将弹出一个新的元素,输出元素是一个 Observable 主题对象,该主题包含源流窗口之内的 n 个元素 
/*** window 创建操作符 创建滑动窗口* 演示 window 创建操作符 创建滑动窗口*/@Testpublic void simpleWindowObserverDemo(){List<Integer> srcList = Arrays.asList(10, 11, 20, 21, 30, 31);Observable.from(srcList).window(3).flatMap(o -> o.toList()).subscribe(list -> log.info(list.toString()));}
创建重叠窗口使用函数 window(int count, int skip),其中第一个参数为窗口的元素个数,第二个参数为下一个窗口跳过的元素个数 
/*** window 创建操作符 创建滑动窗口* 演示 window 创建操作符 创建滑动窗口*/@Testpublic void windowObserverDemo(){List<Integer> srcList = Arrays.asList(10, 11, 20, 21, 30, 31);Observable.from(srcList).window(3, 1).flatMap(o -> o.toList()).subscribe(list -> log.info(list.toString()));}
4.7 RxJava的Scheduler调度器
- Schedulers.io():用于获取内部的 ioScheduler 调度器实例
- Schedulers. newThread ():用于获取内部的 newThreadScheduler 调度器实例,该调度器 为 RxJava 流操作创建一个新线程。
- Schedulers.trampoline ():使用当前线程立即执行 RxJava 流操作。
Schedulers. single ():使用 RxJava 内置的单例线程执行 RxJava 流操作
4.8 背压
4.8.1 背压问题
当上下游的流操作处于不同的线程时,如果上游弹射数据的速度快于下游接收处理数据的速度,对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失, 又不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压问题。
4.8.2 背压问题的几种应对模式
在创建主题时可以使用 Observable 类的一个重载的 create 方法设置具体的背压模式, 该方法的源代码如下:
public static <T> Observable<T> create(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure) {return unsafeCreate(new OnSubscribeCreate<T>(emitter, backpressure));}
背压模式有多种,比较常用的有“最近模式”Emitter.BackpressureMode.LATEST。这种模式的含义为:如果消费跟不上, 那么仅仅缓存最近弹射出来的数据,将老旧一点的数据直接丢弃
背压模式:BackpressureMode.DROP: 在这种模式下, Observable 主题使用固定大小为 128 的缓冲区。如果下游订阅者无法处理,流的第一个元素就会缓存下来,后续的会被丢弃。
- BackpressureMode.LATEST
- BackpressureMode.NONE 和 BackpressureMode.ERROR: 在这两种模式中发送的数据不使用背压。
- BackpressureMode.BUFFER: 在这种模式下,有一个无限的缓冲区(初始化时是 128) ,下游消费不了的元素全部会放到缓冲区中。如果缓冲区中持续地积累, 就会导致内存耗尽,抛出OutOfMemoryException 异常
