Development

Zipper RX Magic

March 10, 2020

As a devel­op­er you want to be able to fire off requests with­out wor­ry­ing about what order they’re in or when they arrive. All you real­ly care about is when they’re com­plet­ed. And, with a lit­tle Rx mag­ic, that’s when you stum­ble upon Single.zip:

/**
 * Waits until all SingleSource sources provided by the Iterable sequence signal a success
 * value and calls a zipper function with an array of these values to return a result
 * to be emitted to downstream.
...

Next, you pass along your list of Single into Single.zip like so…

Single.zip(
  listOf(source1, source2, source3, ...)
) { results -> /* do something */ }
  .subscribeBy(
    onError = Timber::e,
    onSuccess = {
      celebrate()
    }
  )

Every­thing appears to be going smooth­ly. Until you push the app out into the world and begin receiv­ing crash reports.

The excep­tion couldn’t be deliv­ered to the user because it already canceled/​disposed the flow. That, or the excep­tion has nowhere to go in the first place.

Why is this? Like any respon­si­ble pro­gram­mer, you added an onError con­di­tion to your sub­scrip­tion. But, instead of see­ing it being called, you’re left with a crash that hap­pens after the sin­gle has been completed.

You ques­tion how this could be hap­pen­ing and begin search­ing for the answer. For­tu­nate­ly (or unfor­tu­nate­ly), you begin to real­ize it’s entire­ly by design..

RxJa­va 2 tries to avoid los­ing excep­tions which could be impor­tant to the devel­op­er even if it hap­pens after the nat­ur­al life­cy­cle of a flow.

Then you see one of the authors pro­pose a cou­ple of solu­tions.”

Over­ride the default han­dler with RxJavaPlugins.setOnError() and sup­press what you don’t con­sid­er fatal. Alter­na­tive­ly, apply a per source onErrorReturn or onErrorResumeNext before zip­ping them together.

Though it would be nice to have a delayError flag sim­i­lar to Observable.zip, you’re out of luck. Hey, we all occa­sion­al­ly for­get to add an onErrorReturn to every one of our Single vari­ables (although I strong­ly rec­om­mend tak­ing this step).

Mov­ing for­ward, I’ve been able to pro­tect myself by using safeZip which auto­mat­i­cal­ly wraps all of your Singles, then returns all var­i­ous errors along the way in a sin­gle error at the end.

sealed class SafeResult<out T> {
  class Success<T>(val result: T): SafeResult<T>()
  class Failure<T>(val error: Throwable): SafeResult<Nothing>()
}
 
/**
 * Zip [Single] together safely. An onErrorReturn is automatically applied to each source
 * to prevent any source from throwing. Then after all sources have completed, any errors
 * are then reported
 */
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
  val safeSources = sources.map { source ->
    source
      .map<SafeResult<T>> { SafeResult.Success(it) }
      .onErrorReturn { SafeResult.Failure(it) }
  }
 
  return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
    .flatMap<List<T>> { safeResults ->
      val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
      if (failures.isNotEmpty()) {
        Single.error(CompositeException(failures.map { it.error }))
      } else {
        Single.just(
          safeResults.map { (it as SafeResult.Success<T>).result }
        )
      }
    }
}

Except there’s still a prob­lem. If an emp­ty list is passed into Single.zip you will throw a java.util.NoSuchElementException excep­tion. Though this will be han­dled by an onError in the sub­scrip­tion, if this is part of a larg­er stream, then the stream will have been com­plet­ed. To avoid this issue you can make our safe zip­per that much safer by return­ing an emp­ty list when one is provided.

/**
 * Zip [Single] together safely. An onErrorReturn is automatically applied to each source
to prevent any source from throwing. Then, after all sources have completed, any errors will then be reported.
 
 */
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
  if (sources.isEmpty()) {
    return Single.just(emptyList())
  }
 
  val safeSources = sources.map { source ->
    source
      .map<SafeResult<T>> { SafeResult.Success(it) }
      .onErrorReturn { SafeResult.Failure(it) }
  }
 
  return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
    .flatMap<List<T>> { safeResults ->
      val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
      if (failures.isNotEmpty()) {
        Single.error(CompositeException(failures.map { it.error }))
      } else {
        Single.just(
          safeResults.map { (it as SafeResult.Success<T>).result }
        )
      }
    }
}

Suc­cess!

Scott Schmitz
Scott Schmitz
Software Developer

Looking for more like this?

Sign up for our monthly newsletter to receive helpful articles, case studies, and stories from our team.

2022 Best and Brightest Winners in West Michigan
Team

2022 Best and Brightest Winners in West Michigan

May 2, 2022

The Best and Brightest Companies to Work For® competition identifies and honors organizations that display a commitment to excellence in operations and employee enrichment that lead to increased productivity and financial performance!

Read more
Kotlin Multiplatform
Android Development iOS

Kotlin Multiplatform

July 14, 2022

A brief look at Kotlin Multiplatform Mobile, a newer cross-platform mobile application framework.

Read more
Why I use NextJS
Development Web

Why I use NextJS

December 21, 2022

Is NextJS right for your next project? In this post, David discusses three core functionalities that NextJS excels at, so that you can make a well-informed decision on your project’s major framework.

Read more
View more articles