Development

Zipper RX Magic

March 10, 2020

As a devel­op­er you want to be able to fire off requests with­out wor­ry­ing about what order they’re in or when they arrive. All you real­ly care about is when they’re com­plet­ed. And, with a lit­tle Rx mag­ic, that’s when you stum­ble upon Single.zip:

/**
 * Waits until all SingleSource sources provided by the Iterable sequence signal a success
 * value and calls a zipper function with an array of these values to return a result
 * to be emitted to downstream.
...

Next, you pass along your list of Single into Single.zip like so…

Single.zip(
  listOf(source1, source2, source3, ...)
) { results -> /* do something */ }
  .subscribeBy(
    onError = Timber::e,
    onSuccess = {
      celebrate()
    }
  )

Every­thing appears to be going smooth­ly. Until you push the app out into the world and begin receiv­ing crash reports.

The excep­tion couldn’t be deliv­ered to the user because it already canceled/​disposed the flow. That, or the excep­tion has nowhere to go in the first place.

Why is this? Like any respon­si­ble pro­gram­mer, you added an onError con­di­tion to your sub­scrip­tion. But, instead of see­ing it being called, you’re left with a crash that hap­pens after the sin­gle has been completed.

You ques­tion how this could be hap­pen­ing and begin search­ing for the answer. For­tu­nate­ly (or unfor­tu­nate­ly), you begin to real­ize it’s entire­ly by design..

RxJa­va 2 tries to avoid los­ing excep­tions which could be impor­tant to the devel­op­er even if it hap­pens after the nat­ur­al life­cy­cle of a flow.

Then you see one of the authors pro­pose a cou­ple of solu­tions.”

Over­ride the default han­dler with RxJavaPlugins.setOnError() and sup­press what you don’t con­sid­er fatal. Alter­na­tive­ly, apply a per source onErrorReturn or onErrorResumeNext before zip­ping them together.

Though it would be nice to have a delayError flag sim­i­lar to Observable.zip, you’re out of luck. Hey, we all occa­sion­al­ly for­get to add an onErrorReturn to every one of our Single vari­ables (although I strong­ly rec­om­mend tak­ing this step).

Mov­ing for­ward, I’ve been able to pro­tect myself by using safeZip which auto­mat­i­cal­ly wraps all of your Singles, then returns all var­i­ous errors along the way in a sin­gle error at the end.

sealed class SafeResult<out T> {
  class Success<T>(val result: T): SafeResult<T>()
  class Failure<T>(val error: Throwable): SafeResult<Nothing>()
}
 
/**
 * Zip [Single] together safely. An onErrorReturn is automatically applied to each source
 * to prevent any source from throwing. Then after all sources have completed, any errors
 * are then reported
 */
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
  val safeSources = sources.map { source ->
    source
      .map<SafeResult<T>> { SafeResult.Success(it) }
      .onErrorReturn { SafeResult.Failure(it) }
  }
 
  return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
    .flatMap<List<T>> { safeResults ->
      val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
      if (failures.isNotEmpty()) {
        Single.error(CompositeException(failures.map { it.error }))
      } else {
        Single.just(
          safeResults.map { (it as SafeResult.Success<T>).result }
        )
      }
    }
}

Except there’s still a prob­lem. If an emp­ty list is passed into Single.zip you will throw a java.util.NoSuchElementException excep­tion. Though this will be han­dled by an onError in the sub­scrip­tion, if this is part of a larg­er stream, then the stream will have been com­plet­ed. To avoid this issue you can make our safe zip­per that much safer by return­ing an emp­ty list when one is provided.

/**
 * Zip [Single] together safely. An onErrorReturn is automatically applied to each source
to prevent any source from throwing. Then, after all sources have completed, any errors will then be reported.
 
 */
fun <T> zipSafe(sources: List<Single<T>>): Single<List<T>> {
  if (sources.isEmpty()) {
    return Single.just(emptyList())
  }
 
  val safeSources = sources.map { source ->
    source
      .map<SafeResult<T>> { SafeResult.Success(it) }
      .onErrorReturn { SafeResult.Failure(it) }
  }
 
  return Single.zip(safeSources) { it.filterIsInstance<SafeResult<T>>() }
    .flatMap<List<T>> { safeResults ->
      val failures = safeResults.filterIsInstance<SafeResult.Failure<T>>()
      if (failures.isNotEmpty()) {
        Single.error(CompositeException(failures.map { it.error }))
      } else {
        Single.just(
          safeResults.map { (it as SafeResult.Success<T>).result }
        )
      }
    }
}

Suc­cess!

Scott Schmitz
Scott Schmitz
Staff Engineer

Looking for more like this?

Sign up for our monthly newsletter to receive helpful articles, case studies, and stories from our team.

UX Writing Tips
Design Process

UX Writing Tips

February 3, 2023

Kai shares a few tips he's collected on how to write for user interfaces.

Read more
Between the brackets: MichiganLabs’ approach to software development
Development Team

Between the brackets: MichiganLabs’ approach to software development

February 12, 2024

Read more
What the smartest executive leaders are doing right now
Business

What the smartest executive leaders are doing right now

July 25, 2024

We are consistently seeing three things right now in terms of how executives are tackling the technological challenges they are facing.

Read more
View more articles