Zipper RX Magic
March 10, 2020As a developer you want to be able to fire off requests without worrying about what order they’re in or when they arrive. All you really care about is when they’re completed. And, with a little Rx magic, that’s when you stumble upon Single.zip
:
/**
* Waits until all SingleSource sources provided by the Iterable sequence signal a success
* value and calls a zipper function with an array of these values to return a result
* to be emitted to downstream.
...
Next, you pass along your list of Single
into Single.zip
like so…
Single.zip(
listOf(source1, source2, source3, ...)
) { results -> /* do something */ }
.subscribeBy(
onError = Timber::e,
onSuccess = {
celebrate()
}
)
Everything appears to be going smoothly. Until you push the app out into the world and begin receiving crash reports.
The exception couldn’t be delivered to the user because it already canceled/disposed the flow. That, or the exception has nowhere to go in the first place.
Why is this? Like any responsible programmer, you added an onError
condition to your subscription. But, instead of seeing it being called, you’re left with a crash that happens after the single has been completed.
You question how this could be happening and begin searching for the answer. Fortunately (or unfortunately), you begin to realize it’s entirely by design..
RxJava 2 tries to avoid losing exceptions which could be important to the developer even if it happens after the natural lifecycle of a flow.
Then you see one of the authors propose a couple of “solutions.”
Override the default handler with
RxJavaPlugins.setOnError()
and suppress what you don’t consider fatal. Alternatively, apply a per sourceonErrorReturn
oronErrorResumeNext
before zipping them together.
Though it would be nice to have a delayError
flag similar to Observable.zip
, you’re out of luck. Hey, we all occasionally forget to add an onErrorReturn
to every one of our Single
variables (although I strongly recommend taking this step).
Moving forward, I’ve been able to protect myself by using safeZip
which automatically wraps all of your Single
s, then returns all various errors along the way in a single error at the end.
sealed class SafeResult<out T> {
class Success<T>(val result: T): SafeResult<T>()
class Failure<T>(val error: Throwable): SafeResult<Nothing>()
}
/**
* Zip [Single] together safely. An onErrorReturn is automatically applied to each source
* to prevent any source from throwing. Then after all sources have completed, any errors
* are then reported
*/
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
val safeSources = sources.map { source ->
source
.map<SafeResult<T>> { SafeResult.Success(it) }
.onErrorReturn { SafeResult.Failure(it) }
}
return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
.flatMap<List<T>> { safeResults ->
val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
if (failures.isNotEmpty()) {
Single.error(CompositeException(failures.map { it.error }))
} else {
Single.just(
safeResults.map { (it as SafeResult.Success<T>).result }
)
}
}
}
Except there’s still a problem. If an empty list is passed into Single.zip
you will throw a java.util.NoSuchElementException
exception. Though this will be handled by an onError
in the subscription, if this is part of a larger stream, then the stream will have been completed. To avoid this issue you can make our safe zipper that much safer by returning an empty list when one is provided.
/**
* Zip [Single] together safely. An onErrorReturn is automatically applied to each source
to prevent any source from throwing. Then, after all sources have completed, any errors will then be reported.
*/
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
if (sources.isEmpty()) {
return Single.just(emptyList())
}
val safeSources = sources.map { source ->
source
.map<SafeResult<T>> { SafeResult.Success(it) }
.onErrorReturn { SafeResult.Failure(it) }
}
return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
.flatMap<List<T>> { safeResults ->
val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
if (failures.isNotEmpty()) {
Single.error(CompositeException(failures.map { it.error }))
} else {
Single.just(
safeResults.map { (it as SafeResult.Success<T>).result }
)
}
}
}
Success!
Stay in the loop with our latest content!
Select the topics you’re interested to receive our new relevant content in your inbox. Don’t worry, we won’t spam you.

A Tale of Two Imposters
July 8, 2020Imposter Syndrome is a common topic in the tech world, especially in the Software Development world. It’s also a very personal topic, arising to each person for different reasons and impacting each person differently. Here is an exploration of the topic from two points of view: Amanda’s and Scott’s.
Read more
Michigan Software Labs Ranks No. 50 on Inc. Magazine’s List of the Midwest’s Fastest-Growing Private Companies
March 17, 2021March 17, 2021 – Inc. magazine today revealed that Michigan Software Labs is No. 50 on its 2021 Inc. 5000 Regionals: Midwest list, the most prestigious ranking of the fastest-growing private companies in Illinois, Indiana, Iowa, Kansas, Michigan, Minnesota, Missouri, Nebraska, North Dakota, Ohio, South Dakota, and Wisconsin.
Read more
Michigan Software Labs named West Michigan’s 2019 Best and Brightest Companies to Work For®
April 8, 2019Michigan Software Labs, a software development company, has been named as one of West Michigan’s 2019 Best and Brightest Companies to Work For®. The award recognizes companies that display a commitment to excellence in their human resource practices practices with an impressive commitment to their employees.
Read more