RxJava: merge, zip, resumeNext

面试官问了一个简单 API 方法的具体内容。

说真的,每天要写大量的代码,就算之前对某个 API 很熟悉,

后面还是会遗忘的。拿你当下熟悉的东西去面试别人,还加以嘲笑,大可不必。

https://www.kotlincn.net/docs/reference/coroutines/coroutines-guide.html

https://reactivex.io/documentation/observable.html

https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

https://github.com/realm/realm-kotlin

主动通知不可用时, 轮询等待银行的支付结果

/**
 * Polling control: repeatedly queries the crawl-task status of each bank account
 * until a terminal status is reported or the response stops changing.
 * Created by intbird on 16/12/12.
 */
class BankCrawlingWorker {

    /**
     * Polls the task status for every account id emitted by {@code accountIdObservable}.
     * Accounts are processed one after another (concatMap); every status response that
     * survives the pipeline is emitted to the subscriber.
     */
    static Observable<BankCrawStatusResponse> startWorker(Observable<String> accountIdObservable) {
        return accountIdObservable.concatMap(crawlingWorker());
    }

    /** Builds the polling function with the limits supplied by BankCrawlingMissionsHelper. */
    private static Func1<String, Observable<BankCrawStatusResponse>> crawlingWorker() {
        return crawlingWorker(BankCrawlingMissionsHelper.getRetryLimitCountNetError()
                , BankCrawlingMissionsHelper.getRepeatLimitCount()
                , BankCrawlingMissionsHelper.getRepeatInterval());
    }

    /**
     * Builds the polling function.
     *
     * @param retryLimit        maximum number of re-subscriptions after an error
     * @param repeatLimit       number of consecutive unchanged responses after which polling stops
     * @param repeatDelaySecond delay, in seconds, between two polls
     */
    private static Func1<String, Observable<BankCrawStatusResponse>> crawlingWorker(
            final int retryLimit,
            final int repeatLimit,
            final int repeatDelaySecond) {
        return new Func1<String, Observable<BankCrawStatusResponse>>() {
            @Override
            public Observable<BankCrawStatusResponse> call(final String accountId) {
                // Counter of consecutive unchanged responses, created per account.
                // It used to be a field of this shared Func1, so once one account reached
                // repeatLimit every following account was stopped after its first poll
                // (scan does not invoke the accumulator for the first item, so nothing reset it).
                final int[] repeatCount = {0};

                return BankCrawlingServer
                        .getTaskStatus(accountId)
                        // Re-subscribe (poll again) after each completion, delayed by the interval.
                        .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                            @Override
                            public Observable<?> call(Observable<? extends Void> observable) {
                                return observable.delay(repeatDelaySecond, TimeUnit.SECONDS);
                            }
                        })
                        // Stop polling on a null response, too many unchanged responses,
                        // or a terminal task status. The item that triggers the stop is still emitted.
                        .takeUntil(new Func1<BankCrawStatusResponse, Boolean>() {
                            @Override
                            public Boolean call(BankCrawStatusResponse bankCrawStatusResponse) {
                                if (null == bankCrawStatusResponse
                                        || repeatCount[0] >= repeatLimit) {
                                    return true;
                                }
                                BankCrawlingStatus bankCrawlingStatus = bankCrawStatusResponse.getTaskStatus();
                                if (bankCrawlingStatus == BankCrawlingStatus.CLIENT_STATUS_NULL
                                        || bankCrawlingStatus == BankCrawlingStatus.DONE_FAIL
                                        || bankCrawlingStatus == BankCrawlingStatus.UNABLE
                                        || bankCrawlingStatus == BankCrawlingStatus.DONE_TIMEOUT
                                        || bankCrawlingStatus == BankCrawlingStatus.WAIT_CODE) {
                                    return true;
                                } else if (bankCrawlingStatus == BankCrawlingStatus.DONE_SUCC
                                        && bankCrawStatusResponse.getTotalProgress() == 100
                                        && bankCrawStatusResponse.isFinished()) {
                                    return true;
                                }
                                return false;
                            }
                        })
                        // Re-subscribe on error while the attempt number does not exceed retryLimit.
                        .retry(new Func2<Integer, Throwable, Boolean>() {
                            @Override
                            public Boolean call(Integer integer, Throwable throwable) {
                                return integer <= retryLimit;
                            }
                        })
                        // Drop null responses so the comparisons below never see one.
                        .filter(new Func1<BankCrawStatusResponse, Boolean>() {
                            @Override
                            public Boolean call(BankCrawStatusResponse bankCrawStatusResponse) {
                                return null != bankCrawStatusResponse;
                            }
                        })
                        // Compare each response with the previous one: count consecutive
                        // identical responses, reset the count as soon as anything changes.
                        .scan(new Func2<BankCrawStatusResponse, BankCrawStatusResponse, BankCrawStatusResponse>() {
                            @Override
                            public BankCrawStatusResponse call(BankCrawStatusResponse bankCrawStatusResponse, BankCrawStatusResponse bankCrawStatusResponse2) {
                                if (bankCrawStatusResponse.getTimestamp() == bankCrawStatusResponse2.getTimestamp()
                                        && bankCrawStatusResponse.getDescription().equals(bankCrawStatusResponse2.getDescription())
                                        && bankCrawStatusResponse.getTotalProgress() == bankCrawStatusResponse2.getTotalProgress()
                                        && bankCrawStatusResponse.isFinished() == bankCrawStatusResponse2.isFinished()) {
                                    repeatCount[0]++;
                                } else {
                                    repeatCount[0] = 0;
                                }
                                return bankCrawStatusResponse2;
                            }
                        })
                        // Once the unchanged-response limit is hit, flag the response as a client-side timeout.
                        .doOnNext(new Action1<BankCrawStatusResponse>() {
                            @Override
                            public void call(BankCrawStatusResponse bankCrawStatusResponse) {
                                if (repeatCount[0] >= repeatLimit) {
                                    bankCrawStatusResponse.setTaskStatus(BankCrawlingStatus.CLIENT_REPEAT_TIMEOUT);
                                }
                            }
                        });
            }
        };
    }
}

每次都要往列表中插入广告1、广告2 等数据。

列表每次加载都会恢复成插入广告之前的干净数据。

解决: 缓存广告1 到广告n 的 index, 列表重新变动时再按 index 加回去。

    // Cache of extra items (banner / cards) to re-insert into the list,
    // keyed by "<pageIndex>_<indexed>_<tag>".
    private var otherPagedListMap = mutableMapOf<String,OtherIndexedData>()
    // Page indices recorded as already requested; used to skip duplicate loads.
    private var otherPagedIndex =  TreeSet<Int>()
    // One cached extra item: the page it belongs to, its slot offset inside that page, and the payload.
    private class OtherIndexedData(val pageIndex:Int, val indexed: Int,var data: Any)

    /**
     * Requests the extra items for every page of [lists] that has not been requested yet.
     *
     * The number of pages is derived from the current list size. When that is at most one
     * page, the cached items and the requested-page markers are cleared before loading.
     *
     * @param lists the list the extra items are inserted into
     * @param callback forwarded unchanged to the per-page loader
     */
    private fun loadOtherList(
        lists: MutableList<Any>,
        callback: CommonCallBack<MutableList<Any>>
    ) {
        val computedPages = lists.size / PAGE_SIZE
        val lastPage = if (computedPages <= 1) {
            // Back on the first page: start over with clean caches.
            otherPagedListMap.clear()
            otherPagedIndex.clear()
            1
        } else {
            computedPages
        }

        // The list may grow by several pages at once (e.g. 1 -> 5); walking every page
        // makes sure the ads for pages 2..4 are loaded as well.
        for (page in 1..lastPage) {
            if (page in otherPagedIndex) continue
            loadOtherList(page, PAGE_SIZE, lists, callback)
            otherPagedIndex.add(page)
        }
    }

    /**
     * Requests the extra items for one page (a banner and two movie-card responses), caches
     * the non-null results, re-inserts every cached item into [lists] and reports the list
     * through [callback].
     *
     * NOTE(review): this block reads like an unfinished sketch — `banner` is declared twice,
     * and `index` / `pageIndex` are used without being declared in this function. Confirm
     * against the real source before relying on it.
     *
     * @param itemIndex page index passed to the requests (the visible caller passes 1..n)
     * @param itemSize page size passed to the requests and to the list insertion
     * @param lists list that receives the extra items
     * @param callback receives [lists] via onSuccess, both after a result and after an error
     */
    private fun loadOtherList(
        itemIndex:Int,
        itemSize:Int,
        lists: MutableList<Any>,
        callback: CommonCallBack<MutableList<Any>>
    ) {

        // Variant A: always request the banner; an error is replaced by an empty response.
        val banner = getForYouHomeBanner(
                itemIndex,
                itemSize,
            ).onErrorReturn { BaseResponse() }


        // Approach 1: record the cache entry as soon as the endpoint responds, to avoid repeated requests after errors.
        // Approach 2: add to the cache when the request is issued, and remove the de-dup index on error.
        // Variant B (below): skip the request when the cache key is already recorded.
        // NOTE(review): second declaration of `banner`; otherPagedIndex is declared as TreeSet<Int>,
        // so bannerCacheKey presumably needs a matching type or its own set — confirm.
        val bannerCacheKey = getCacheKeyType(itemIndex,"bn")
        val banner = 
               if(otherPagedIndex.contains(bannerCacheKey)) 
                 Observable.just(BaseResponse()) 
               else
                 getForYouHomeBanner()
                 .onErrorReturn { BaseResponse() }
                 .doOnNext {t -> if(t.data!=null && !otherPagedIndex.contains(bannerCacheKey)) 
                           otherPagedIndex.add(bannerCacheKey) }


        // NOTE(review): pfCard and mvCard issue the same request with the same arguments;
        // presumably one of them should target a different endpoint — confirm.
        val pfCard = getHomeMovieCard(
                itemIndex,
                itemSize
            ).onErrorReturn { BaseResponse() }

        val mvCard = getHomeMovieCard(
                itemIndex,
                itemSize
            ).onErrorReturn { BaseResponse() }

        // Combine the three responses into one array of their (possibly null) data payloads.
        Observable.zip(banner, pfCard, mvCard) 
             { t1, t2 , t3 -> arrayOf<Any?>(t1.data, t2.data, t3.data) }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object : Observer<Array<Any?>> {
                override fun onSubscribe(d: Disposable) {
                }

                override fun onError(e: Throwable) {
                    // NOTE(review): `index` is not declared here; presumably `itemIndex` was meant.
                    otherPagedIndex.remove(index)
                    callback.onSuccess(lists) // If any one of them fails, all of them are skipped;
                                              // alternatives: onErrorRetry | onErrorReturn | onErrorResumeNext
                }

                override fun onComplete() {
                }

                override fun onNext(t: Array<Any?>) {
                    // Cache every non-null payload under a tag: "pf" (play form), "mv" (movie), "bn" (banner).
                    // The slot is 0 when the item is flagged isTop, otherwise its curSlot.
                    // NOTE(review): `pageIndex` is not declared in this function; presumably a class member.
                    t.forEach {
                        it?.run {
                            if (this is MovieModel) {
                                playForm?.run {
                                    val indexedData = OtherIndexedData(if (pageIndex == -1) itemIndex else pageIndex, if (playForm?.isTop == true) 0 else playForm?.curSlot ?: 0,this)
                                    pfVideoList?.let {
                                        addToListIndexedHashMap(indexedData,"pf")
                                    }
                                }

                                movie?.let {
                                    val indexedData =  OtherIndexedData(if (pageIndex == -1) itemIndex else pageIndex, if (movie?.isTop == true) 0 else movie?.curSlot ?: 0,this)
                                    addToListIndexedHashMap(indexedData,"mv")
                                }

                            } else if (this is BannerModel) {
                                url?.let {
                                    val indexedData  = OtherIndexedData(if (pageIndex == -1) itemIndex else pageIndex, if (isTop) 0 else curSlot,this)
                                    addToListIndexedHashMap(indexedData,"bn")
                                }
                            }
                        }
                    }

                    otherPagedListMap.forEach { entry ->
                       // The current list order is not necessarily the same as the previous one,
                       // so every cached item is re-inserted after each load finishes.
                       addToListIndexed(entry.value.pageIndex, itemSize, 
                                        entry.value.indexed, entry.value.data, lists)

                       // NOTE(review): this removes the page marker again after inserting — confirm
                       // this does not cause the same page to be requested repeatedly.
                       otherPagedIndex.remove(entry.value.pageIndex)
                    }
                  
                    callback.onSuccess(lists)
                }
            })
    }


    /**
     * Inserts [data] into [list] at position `(pageIndex - 1) * pageSize + currentIndex`,
     * unless the list already contains an equal element.
     *
     * - A non-positive target position inserts at the start of the page, clamped into
     *   `0..list.size` so an out-of-range page start cannot throw.
     * - A target position past the end appends.
     *
     * @param pageIndex page the item belongs to; the offset math treats 1 as the first page
     * @param pageSize number of regular items per page
     * @param currentIndex slot offset of the item inside its page
     * @param data item to insert; duplicates are detected with `list.contains`
     * @param list list that is modified in place
     */
    private fun addToListIndexed(pageIndex:Int,pageSize:Int, currentIndex: Int,data: Any, list: MutableList<Any>) {
        // One duplicate check up front instead of repeating it in every branch.
        if (list.contains(data)) return

        val start = (pageIndex - 1) * pageSize
        val index = start + currentIndex
        when {
            index <= 0 -> list.add(start.coerceIn(0, list.size), data)
            index > list.size -> list.add(data)
            else -> list.add(index, data)
        }
    }

    /**
     * Caches [indexedData] under the key "<pageIndex>_<indexed>_<tag>" (the first entry for
     * a key wins) and makes sure its page index is present in the page-index set.
     *
     * @param indexedData item to cache
     * @param tag suffix appended to the cache key
     */
    private fun addToListIndexedHashMap(indexedData: OtherIndexedData,tag:String) {
        val cacheKey = "${indexedData.pageIndex}_${indexedData.indexed}_$tag"
        // Keeps an existing entry untouched; stores the new one only when the key is absent.
        otherPagedListMap.getOrPut(cacheKey) { indexedData }
        // Adding to a set is already a no-op when the page index is present.
        otherPagedIndex.add(indexedData.pageIndex)
    }

注意: 列表通过 `list.contains(data)` 去重, 因此广告数据类需要 override equals & hashCode