combineLatest

RxJava的combineLatest()函数有点像zip()函数的特殊形式。正如我们已经学习的,zip()作用于最近未打包的两个Observables。相反,combineLatest()作用于最近发射的数据项:如果Observable1发射了A并且Observable2发射了B和C,combineLatest()将会分组处理AB和AC,如下图所示:

combineLatest - 图1

combineLatest()函数接受二到九个Observable作为参数,如果有需要的话或者单个Observables列表作为参数。

从之前的例子中把loadList()函数借用过来,我们可以修改一下来用于combineLatest()实现“真实世界”这个例子:

  1. private void loadList(List<AppInfo> apps) {
  2. mRecyclerView.setVisibility(View.VISIBLE);
  3. Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
  4. .map(position ->apps.get(position.intValue()));
  5. Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
  6. Observable.combineLatest(appsSequence, tictoc,
  7. this::updateTitle)
  8. .observeOn(AndroidSchedulers.mainThread())
  9. .subscribe(new Observer<AppInfo>() {
  10. @Override
  11. public void onCompleted() {
  12. Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
  13. }
  14. @Override
  15. public void onError(Throwable e) {
  16. mSwipeRefreshLayout.setRefreshing(false);
  17. Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
  18. }
  19. @Override
  20. public void onNext(AppInfoappInfo) {
  21. if (mSwipeRefreshLayout.isRefreshing()) {
  22. mSwipeRefreshLayout.setRefreshing(false);
  23. }
  24. mAddedApps.add(appInfo);
  25. int position = mAddedApps.size() - 1;
  26. mAdapter.addApplication(position, appInfo);
  27. mRecyclerView.smoothScrollToPosition(position);
  28. }
  29. });
  30. }

这我们使用了两个Observables:一个是每秒钟从我们已安装的应用列表发射一个App数据,第二个是每隔1.5秒发射一个Long型整数。我们将他们结合起来并执行updateTitle()函数,结果如下:

combineLatest - 图2

正如你看到的,由于不同的时间间隔,AppInfo对象如我们所预料的那样有时候会重复。