The problem

public async Task<IEnumerable<User>> GetUsers()
{
var allResults = new List<User>();
var nextUrl = "https://account.zendesk.com/api/v2/users.json";
while (nextUrl != null)
{
var page = await _client.GetAsync(nextUrl)
.Content.ReadAsAsync<UsersListResponse>();
allResults.AddRange(page.Users);
nextUrl = page.NextPage;
// eg "https://account.zendesk.com/api/v2/users.json?page=2"
}
return allResults;
}

Take a look at the above code, you may have run into this familiar issue yourself. We'd like to represent the pageable results of this HTTP API in our types, while still be asynchronous.

Traditionally there would be 4 ways to approach this;

  1. Block on the async code with .Result/.GetAwaiter().GetResult() or .Wait(). This is not a good idea.
  2. The above approach of awaiting each asynchronous task, but doing it all eagerly and returning the complete results in a materialised collection. This defeats the purpose of this paging API, and more generally we lose the laziness of the problem we are trying to model.
  3. We flip the return type to IEnumerable<Task<User>>. This would require that we trust any consumers of this code to await the result of each task after every enumeration. There are ways to enforce this at runtime, and throw an exception if it's not consumed correctly, however this ultimately is a misleading type, and the shape of the type doesn't communicate it's hidden constraints.
  4. We don't try returning a single type such as Task<IEnumerable<T>> and we model it ourselves. This can be a good idea, but we lose the benefits of having a familiar type to work with.

Well it's about time we adopted a new type and end this madness. That's what IAsyncEnumerable<T> is for.

About Ix.Async

Currently IAsyncEnumerable<T> is a concept which exists in a few places, with no singular definition. The version I will be using today lives in the Reactive Extensions repo, in a fork that is based off of the latest C# 8.0 proposal.

Reactive Extensions (Rx) is the home of Observable implementation and extensions, it is also home to a sibling project named Interactive Extensions (Ix for short). Rx has lots of extensions and tools for composing pushed based sequences, and Ix is very similar but for pull based sequences (IEnumerable<T>). The part I am interested in for this post is the async part, which I'll be referring to as Ix.Async, this is shipped in it's own nuget package, and I will generally be referring to the IAsyncEnumerable<T> definition that lives here (although this will map trivially to other implementations).

In the near future, C# 8.0 will introduce Async Streams (I prefer the term Sequence, as Stream is already a different .NET concept) as a language feature, and there will be a new definition of IAsyncEnumerable<T> it will work with, but that doesn't stop us using Ix.Async today, either using the current definition which slightly differs from the C# 8.0 proposal, or building the fork with the latest definition in.

Definition

public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator();
}

public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
T Current { get; }
ValueTask<bool> MoveNextAsync();
}

public interface IAsyncDisposable
{
ValueTask DisposeAsync();
}

This is the definition of IAsyncEnumerable<T> from the C# 8.0 proposal, it should look very familiar, it is just IEnumerable<T> with an async MoveNext method, as you might expect.
async-enum12
We can now see the relationship with IObservable<T> and IEnumerable<T>.

Being in this familiar family means that we don't have to learn new concepts to start consuming and composing operations over this type.

IAsyncEnumerable<Bar> ConvertGoodFoosToBars(IAsyncEnumerable<Foo> items)
{
return items
.Where(foo => foo.IsGood)
.Select(foo => Bar.FromFoo(foo));
}

These extension methods are immediately understandable to us and are ubiquitous in C# already.

Producing sequences

All of this would be pretty academic if we couldn't generate sequences to be consumed. Today there are a few options.

1. Implement the IAsyncEnumerable<T> and IAsyncEnumerator<T> interfaces directly

You can do this, and for performance critical code, this might be the most suitable approach.

It does require a fair bit of boilerplate code however, so here is a starting point:

// A starting point for your own IAsyncEnumerable extensions
public static class AsyncEnumerableExtensions
{
public static IAsyncEnumerable<T> MyExtensionMethod<T>(this IAsyncEnumerable<T> source)
{
return new MyAsyncEnumerable<T>(source);
}
public struct MyAsyncEnumerable<T> : IAsyncEnumerable<T>
{
readonly IAsyncEnumerable<T> enumerable;

internal MyAsyncEnumerable(IAsyncEnumerable<T> enumerable)
{
this.enumerable = enumerable;
}

public IAsyncEnumerator<T> GetAsyncEnumerator()
{
return new MyAsyncEnumerator(enumerable.GetAsyncEnumerator());
}
public struct MyAsyncEnumerator : IAsyncEnumerator<T>
{
readonly IAsyncEnumerator<T> enumerator;

internal MyAsyncEnumerator(IAsyncEnumerator<T> enumerator)
{
this.enumerator = enumerator;
}

public ValueTask DisposeAsync()
{
return enumerator.DisposeAsync();
}

public T Current => enumerator.Current;

public ValueTask<bool> MoveNextAsync()
{
return enumerator.MoveNextAsync();
}
}
}
}

2. Use the static helper methods in Ix.NET

IAsyncEnumerable<int> GenerateWithIx()
{
return AsyncEnumerable.CreateEnumerable(
() =>
{
var current = 0;

async Task<bool> f(CancellationToken ct)
{
await Task. Delay(TimeSpan.FromSeconds(0.5));
current++;
return true;
}

return AsyncEnumerable.CreateEnumerator(
moveNext: f,
current: () => current,
dispose: () => { }
);
});
}

3. Use CXuesong.AsyncEnumerableExtensions

I wanted to build something like this myself, and then I found this library, so I don't need to! Credit to Chen, this is a great library.

// using CXuesong.AsyncEnumerableExtensions
async Task Generator(IAsyncEnumerableSink<int> sink)
{
var i = 1;
while (true)
{
await Task.Delay(TimeSpan.FromSeconds(0.5));
await sink.YieldAndWait(i++);
}
}

AsyncEnumerableFactory.FromAsyncGenerator<int>(Generator)

This library offers a very nice and simple way to express sequences. You build an async function that takes a IAsyncEnumberableSink<T> (defined by the library), and returns a Task. Now you can do your awaits, but when you want to yield an item to the sequence, you call sink.YieldAndWait(value) where sink is that parameter.

4. Coming soon to a C# 8.0 near you

Today you cannot use the async keyword and iterator methods together, so having an async iterator method would require a new language feature. Well good news, it's in the works, take a sneak peak here.

Here is a snippet showing what it could look like.

static async IAsyncEnumerable<int> Mylterator()
{
try
{
for (int i = 0; i < 100; i++)
{
await Task.Delay(1000);
yield return i;
}
}
finally
{
await Task.Delay(200);
Console.WriteLine("finally");
}
}

Consuming sequencing

We can produce sequences, but that won't be much use to us if we cannot consume them.

1. ForEachAsync

Just like the .ForEach(...) extension method on List<T>, we have .ForEachAsync(...) from Ix.Async, this lets us do work on each item, and gives us a Task to await to drive the whole chain of pull based work.

await seq.ForEachAsync(x => Console.WriteLine(x));

Unfortunately, dogmatism fails here, ForEachAsync is suffixed with Async because it returns a Task and operates asynchronously, however the delegate it takes is synchronous, this led me to build a method that can take an async delegate and name it ForEachAsyncButActuallyAsync. :facepalm:

await seq.ForEachAsyncButActuallyAsync(x => Console.WriteLine(x));

2. C# 8.0 foreach

Again, we have language support on the way. Here's what it would look like:

var asyncSequence = GetMyAsyncSequence(cancellationToken: ct);
foreach await (var item in asyncSequence)
{
...
}

Design Decisions

One of the problems that has meant that we've had to wait so long a first class IAsyncEnumberable<T> and language features is because there are many design decisions that need answering, for example;

  • Does IAsyncEnumerator<T> implement IDisposable or a new async version (IAsyncDisposable)? Update IAsyncDisposable it is!
  • If there is going to be an IAsyncDisposable, should the language support the using syntax for it?
  • Does the CancellationToken get passed into MoveNext each move or GetEnumerator once? Update CancellationTokens are not going to be handled by syntax, so you should flow it into the IAsyncEnumerable<T> types yourself.
  • Should it be MoveNext, or MoveNextAsync? Update MoveNextAsync wins!
  • Should MoveNextAsync return a Task<bool> or a ValueTask<bool>? Update ValueTask<bool> has it!
  • In the foreach syntax, where does the await modifier go? Outside the brackets? (Yes, of course, what sort of monster do you take me for?)
  • In the foreach syntax, how do you do the equivalent of .ConfigureAwait(false)? Update like this.
  • Will the foreach syntax look for the type, or the pattern? await doesn't just apply to Task for example.

and that's just what comes immediately to mind, the more you think, the more you uncover.

Who is using it today?

There are a couple of large projects using this today:

  • Entity Framework Core - Currently using an internal definition, but there is talk of plans to use whatever comes in C# 8.
  • Google Cloud Platform Libraries - This one was a bit of a surprise to me. If you install any Google Cloud package, it will reference their Core package, which uses and references Ix.Async. One of the members of the team that builds this is (the) Jon Skeet, so that's quite an endorsement!

Stay tuned, there is more to come on this topic.