The beginning: synchronous

Before asynchronous programming became widespread, and still today in projects written in languages without advanced tools like coroutines, the simplest approach is to run everything sequentially:

String myFunction() {
  String result1 = doFirstThing();
  String result2 = doSecondThing();
  if (result1.isEmpty()) {
    result1 = doOtherThing();
  }
  return result1 + result2;
}

This works perfectly fine – until, of course, doFirstThing() and doSecondThing() end up taking a long time (e.g. by making a network call), and user latency could be improved by doing both in parallel.

Most importantly, if doFirstThing() and doSecondThing() are outside of our control – say, a shared library – they have now slowed down our code and we are completely unaware: the method signature for a method which blocks for 5 seconds looks exactly the same as one that returns immediately.

The immediate path that’s taken by engineers at this point is to use a callback.

the disaster: callbacks

interface Callback<T> {
  void completed(T value);
}

class Processor {
  String result1;
  String result2;
  Callback<String> callback;
  boolean complete = false;

  Processor(Callback<String> c) {
    callback = c;
  }

  void whenFirstThingCompletes(String value) {
    result1 = value;
    maybeComplete();
  }

  void whenSecondThingCompletes(String value) {
    result2 = value;
    maybeComplete();
  }
  
  synchronized void maybeComplete() {
    if (complete || result1 == null || result2 == null) {
      return;
    }
    // Mark as complete so the callback fires at most once.
    complete = true;
    String localFirstResult = result1;
    if (localFirstResult.isEmpty()) {
      localFirstResult = doOtherThing();
    }
    callback.completed(localFirstResult + result2);
  }
}

void myFunction(Callback<String> c) {
  Processor p = new Processor(c);
  doFirstThing(p::whenFirstThingCompletes);
  doSecondThing(p::whenSecondThingCompletes);
}

This seems like a perfectly good solution… at first. Instead of making a blocking synchronous call, each long-running method takes a callback.

It works – we can now run doFirstThing() and doSecondThing() in parallel, reducing user latency.

We also get the benefit of an updated method signature – it’s now been made clear that doFirstThing() and doSecondThing() are methods which take a long time to complete, and thus they take in a callback.

But, besides the fact that the code now looks horrendous and is a lot more difficult to follow, we’ve also created some serious pitfalls:

error handling

In our synchronous example, errors propagated automatically.

If one of the methods we called threw an exception or timed out, that call would fail & report back to our method, which would in turn fail (or handle the failure).

With the move to callbacks, however, that error propagation has been lost.

It is straightforward to modify Callback to handle errors:

interface Callback<T> {
  void completed(T value);
  void onError(ErrorCode error);
}

However, this will only work for the errors we think of – if there is an unexpected error which throws an exception or kills the thread, myFunction may wait indefinitely for something that will never complete.
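To make that failure mode concrete, here is a minimal sketch. Callback mirrors the interface above; fetchAsync, its failUnexpectedly flag, the TIMEOUT error code and the 500 ms wait are all hypothetical, chosen only for illustration. When the worker thread dies from an exception nobody planned for, neither callback method is ever invoked:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LostCallbackDemo {
  enum ErrorCode { TIMEOUT } // hypothetical error code

  interface Callback<T> {
    void completed(T value);
    void onError(ErrorCode error);
  }

  // Hypothetical asynchronous operation: does its work on a new thread.
  static void fetchAsync(boolean failUnexpectedly, Callback<String> callback) {
    Thread worker = new Thread(() -> {
      if (failUnexpectedly) {
        // An error nobody anticipated: the thread dies before
        // either callback method is invoked.
        throw new IllegalStateException("unexpected");
      }
      callback.completed("value");
    });
    // Swallow the exception so the demo does not print a stack trace.
    worker.setUncaughtExceptionHandler((thread, error) -> {});
    worker.start();
  }

  // Returns true if either callback method was invoked within 500 ms.
  static boolean callbackArrived(boolean failUnexpectedly) {
    CountDownLatch done = new CountDownLatch(1);
    fetchAsync(failUnexpectedly, new Callback<String>() {
      @Override public void completed(String value) { done.countDown(); }
      @Override public void onError(ErrorCode error) { done.countDown(); }
    });
    try {
      return done.await(500, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(callbackArrived(false)); // callback is invoked
    System.out.println(callbackArrived(true));  // callback never arrives
  }
}
```

Without the artificial 500 ms limit, the second call would simply wait forever, which is exactly the position myFunction is in.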

loss of control

What happens if one of the methods takes too long? Previously, they were all called from the same thread; we could abort the entire call.

We no longer have control over what doFirstThing() is doing – we may not even know what thread it’s working in. Nor can we stop it (without adding additional parameters).

More importantly, if doFirstThing() launches its own asynchronous work, we also don’t have control over that.

complex resource ownership

In languages with manual memory management (such as C++), or if resources are acquired at the beginning of myFunction, managing those resources becomes more difficult, as they may be released when the call stack unwinds while asynchronous work is still using them.

This is a common cause of use-after-free bugs in C++: the passed-in variables don’t transfer ownership of the memory to Processor, or the memory was stack-allocated and the stack frame is popped while the asynchronous methods are still running.

an improvement: futures, promises

By moving from ad-hoc callback methods to a standardized bag of callbacks, we can move towards structured concurrency, and a lot of the issues discussed above begin to fade away.

Frequently called futures or promises (in Java, for example, CompletableFuture in the standard library or ListenableFuture in Guava), these interfaces are an extension of the callback defined above, but standardized so that higher levels of abstraction can be built on top of them.

A greatly-simplified high-level view:

interface Listener {
  void onComplete();
}

interface Future<T> {
  T getValue();
  void addListener(Listener listener);
}

Because our callbacks (or listeners) are now added to a holding structure instead of passed around, we can make our methods look more like they did before, returning a result instead of taking in a callback:

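Future<String> myFunction() {
  Future<String> result1 = doFirstThing();
  Future<String> result2 = doSecondThing();
  // SettableFuture is a hypothetical Future implementation whose
  // value is supplied later by calling set().
  SettableFuture<String> result = new SettableFuture<>();
  combineFutures(result1, result2).addListener(() -> {
    String completedResult1 = result1.getValue();

    if (completedResult1.isEmpty()) {
      completedResult1 = doOtherThing();
    }
    result.set(completedResult1 + result2.getValue());
  });
  return result;
}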

combineFutures demonstrates how higher levels of abstraction can be built on top of these new interfaces. This helper method can be part of a standardized futures library, which takes care of all the complexities of waiting for the two futures to complete, efficient lock-free implementations, etc.
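For a runnable counterpart, here is a sketch using the JDK's CompletableFuture, where thenCombine plays the role of combineFutures. The bodies of doFirstThing, doSecondThing and doOtherThing are hypothetical stand-ins, since the real ones aren't shown here:

```java
import java.util.concurrent.CompletableFuture;

class CombineDemo {
  // Hypothetical stand-ins for the long-running operations.
  static CompletableFuture<String> doFirstThing() {
    return CompletableFuture.supplyAsync(() -> "");
  }

  static CompletableFuture<String> doSecondThing() {
    return CompletableFuture.supplyAsync(() -> "second");
  }

  static String doOtherThing() {
    return "other-";
  }

  static CompletableFuture<String> myFunction() {
    // Both operations are started before either result is needed.
    CompletableFuture<String> result1 = doFirstThing();
    CompletableFuture<String> result2 = doSecondThing();
    // thenCombine runs the function once both futures have completed.
    return result1.thenCombine(result2, (first, second) -> {
      String completedFirst = first.isEmpty() ? doOtherThing() : first;
      return completedFirst + second;
    });
  }

  public static void main(String[] args) {
    // join() blocks, which is acceptable here only because this is
    // the very edge of the program.
    System.out.println(myFunction().join()); // prints "other-second"
  }
}
```

Note that myFunction itself never blocks: it returns a future immediately, and the combining logic runs whenever both inputs are ready.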

This code looks a lot more like our original synchronous code (though not quite), but offers vast advantages over it.

semantic meaning from syntax

As discussed in our synchronous code example, it is common for a library in a transitive dependency to make implementation changes which cause it to go from a fast immediate-return method to one which has to communicate over the network or disk. An example would be if it needs to load user configuration.

By changing our methods to return a Future<String> instead of a String, our syntax clearly communicates the semantic meaning: this method takes time to complete, and the caller can react accordingly (such as doing multiple things concurrently).

structured concurrency

When we use a method like combineFutures(), it is able to link together the pending work into a single structure.

This allows for failures to propagate up to the root (the blue arrows) automatically, such as propagating an unexpected exception.

It also allows the root to abort work in its transitive dependencies by calling cancel(), which would immediately abort any ongoing or future work.

As the software evolves, if additional work is added behind doFirstThing() or doSecondThing(), those nodes are also linked into the structure, propagating failures & cancellations.

This allows for things such as timeout expirations to be centralized: instead of having each level in the call chain handling timeouts, the timeout can be controlled centrally by the root (for example, putting an upper bound on a reasonable amount of time for the user to wait). When the timeout expires, cancel() is called & all pending work is automatically aborted & resources freed.
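A sketch of both ideas with the JDK's CompletableFuture, assuming Java 9 or later for orTimeout. The helper names (outcome, failingLeaf, failureExample, timeoutExample) and the 100 ms limit are hypothetical. One caveat: with CompletableFuture, the timeout completes the root but does not abort work already running upstream, so this shows failure propagation and a central timeout rather than the full cancellation behavior described above:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

class PropagationDemo {
  // Describes how a future finished: its value, or the simple class
  // name of the exception that caused it to fail.
  static String outcome(CompletableFuture<String> future) {
    try {
      return future.join();
    } catch (CompletionException e) {
      return e.getCause().getClass().getSimpleName();
    }
  }

  // A dependency that fails in a way the caller did not plan for.
  static CompletableFuture<String> failingLeaf() {
    return CompletableFuture.supplyAsync(() -> {
      throw new IllegalStateException("unexpected failure in a dependency");
    });
  }

  // The failure reaches the combined future without any explicit
  // error-handling code at this level.
  static CompletableFuture<String> failureExample() {
    CompletableFuture<String> healthy = CompletableFuture.completedFuture("ok");
    return failingLeaf().thenCombine(healthy, (a, b) -> a + b);
  }

  // The timeout is applied once, at the root, instead of at every
  // level of the call chain.
  static CompletableFuture<String> timeoutExample() {
    CompletableFuture<String> neverCompletes = new CompletableFuture<>();
    return neverCompletes
        .thenApply(String::trim)
        .orTimeout(100, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) {
    System.out.println(outcome(failureExample())); // IllegalStateException
    System.out.println(outcome(timeoutExample())); // TimeoutException
  }
}
```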

addendum: reactive streams

Astute readers will have noted: with callbacks, it is possible to report multiple values to the caller, whereas with futures, we’re limited to reporting only one value.

Fortunately, reactive streams expand the paradigm to support multiple values while maintaining the benefits of meaning through syntax & structured concurrency. Streams can also be combined and transformed, bringing the same structure to multi-value producers.

A multitude of reactive stream libraries also provide additional tools to convert to single values, such as getting the first non-exceptional value, or collecting all values into a collection.
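As one possible illustration, the JDK (Java 9 and later) ships a minimal reactive streams API in java.util.concurrent.Flow. The sketch below publishes several values through a SubmissionPublisher and collects them all into a list; the collectAll helper and the 5 second safety limit are hypothetical, and dedicated reactive libraries offer far richer operators than this:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class StreamDemo {
  // Publishes every value to a single subscriber and returns what it received.
  static List<String> collectAll(String... values) {
    List<String> received = new CopyOnWriteArrayList<>();
    CountDownLatch finished = new CountDownLatch(1);

    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(new Flow.Subscriber<String>() {
        @Override public void onSubscribe(Flow.Subscription subscription) {
          // Ask for everything; a real consumer could request fewer
          // items at a time to apply backpressure.
          subscription.request(Long.MAX_VALUE);
        }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable error) { finished.countDown(); }
        @Override public void onComplete() { finished.countDown(); }
      });
      for (String value : values) {
        publisher.submit(value); // unlike a future, may be called many times
      }
    } // closing the publisher signals onComplete to the subscriber

    try {
      finished.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return received;
  }

  public static void main(String[] args) {
    System.out.println(collectAll("a", "b", "c")); // prints [a, b, c]
  }
}
```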

the finale: coroutines, flow

Though futures significantly improve readability & solve a multitude of other problems with callbacks, they leave the developer having to write a lot of boilerplate code at the boundaries of asynchronous work.

Couldn’t a compiler rework our code such that at every point where we need to process asynchronous work, it encapsulates the code following to be inside a Listener?

That is precisely what coroutines do:

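import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope

suspend fun myFunction(): String = coroutineScope {
  // Start both operations concurrently.
  val result1Async = async { doFirstThing() }
  val result2Async = async { doSecondThing() }

  var result1 = result1Async.await()
  if (result1.isEmpty()) {
    result1 = doOtherThing()
  }
  // The last expression is the value of the coroutineScope block.
  result1 + result2Async.await()
}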

suspend fun doFirstThing(): String { ... }
suspend fun doSecondThing(): String { ... }

The code now looks almost exactly like our original, sequential, synchronous code, with one key difference:

Our methods are marked with suspend, giving semantic meaning through syntax: “this method may suspend and will take time to complete”.

These methods can only be called from other suspend methods, or from within a coroutine block (like async), allowing behavior to change from immediate to long-running while communicating to the caller that the behavior changed.

Lastly, much like futures, the suspending work does not block the thread – instead, the remaining work is queued up in a lightweight structure, much like our Listener.

Flows, which are reactive streams, allow for a similar expansion to multi-valued operations, but much more simply as they build on suspension to “pause” the producer until the consumer is ready:

fun multiValueExample() = flow {
  emit(doFirstThing())
  emit(doSecondThing())
}

Here, the code is suspended on the first call to emit(), until the consumer of the flow does something with the emitted element. Once the value is consumed, then the code is resumed to call doSecondThing(), emitting the next value.

Coroutines are available in Kotlin, in C++ (since C++20), and in Rust in the form of async/await.

If you’re thinking of introducing callbacks in your codebase, or are already using them, you should strongly consider using coroutines instead.

