Inside F#

Brian's thoughts on F# and .NET

“An Introduction to Async Workflows in F#”, or “how to utilize all those CPUs without writing lots of threading code”, part three

Posted by Brian on May 7, 2008

Last time I showed you how to use the F# Control library to easily parallelize a primes-computing program.  Today I’ll show you how to use the same library in C# to achieve the same end.  I’ll also talk more about one of the real strong suits of the library – making async programming more compositional – and this leads to all kinds of interesting discussions for C#, involving LINQ, as well as iterators and "yield".  Hold on tight – it’s a wild ride!

Easy async from C#

As we saw last time, the F# solution involved calling library functions like Async.Parallel, but it also involved a tiny async workflow that looked like this:

    async { return (x, IsPrime x) }

That is, this part of the F# solution leveraged the F# computation expression syntax.  We don’t have this syntax available in C#, so what are we to do?

As I hinted obliquely last time, C# has its own syntax sugar that will fit the bill: LINQ.  The key piece of the original C# code that we need to parallelize can be written this way:

    var primeInfo = nums.Select((x) => new KeyValuePair<int, bool>(x, IsPrime(x))).ToArray();

That’s the bit that takes more than 12 seconds to run on my box.  We can parallelize just as in F#, using the F# library + LINQ thusly:

    var computations = nums.Select((x) => 
        from dummy in AsyncExtensions.StartWorkflow // enter async monad & protect rest of code in a lambda 
        select new KeyValuePair<int, bool>(x, IsPrime(x)));
    var primeInfo = AsyncExtensions.Run(AsyncExtensions.Parallel(computations));

I’ll explain it in a minute, but the first thing to note is – we got the same big win as in F#.  If you look back at the original C# code from part one, you’ll see it was about 10 lines of mess involving ThreadPool.QueueUserWorkItem, ManualResetEvent, and Interlocked.Decrement.  The code above is much simpler – we just create a sequence of all the computations we want to do, parallelize them, and run.
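For contrast, here is a rough sketch of the kind of hand-rolled thread-pool code that paragraph alludes to. It is not the original listing from part one (which isn't reproduced in this post); the class name ManualParallel and the helper ComputeAll are made up for illustration, and only the three framework calls named above are taken from the text.

```csharp
using System;
using System.Threading;

public static class ManualParallel
{
    // Same trial-division test the post uses.
    static bool IsPrime(int x)
    {
        for (int i = 2; i < x; ++i)
            if (x % i == 0) return false;
        return true;
    }

    // Hypothetical helper: runs IsPrime for every input on the thread pool
    // and blocks until the last work item has finished.
    public static bool[] ComputeAll(int[] nums)
    {
        var results = new bool[nums.Length];
        if (nums.Length == 0) return results;

        int pending = nums.Length;               // work items still running
        var done = new ManualResetEvent(false);  // signalled by the last one

        for (int i = 0; i < nums.Length; ++i)
        {
            int index = i; // copy, so each lambda sees its own index
            ThreadPool.QueueUserWorkItem(_ =>
            {
                results[index] = IsPrime(nums[index]);
                if (Interlocked.Decrement(ref pending) == 0)
                    done.Set();
            });
        }

        done.WaitOne();
        return results;
    }

    public static void Main()
    {
        bool[] flags = ComputeAll(new[] { 7, 8, 9, 11 });
        Console.WriteLine(string.Join(",", flags));
    }
}
```

All of the counting and signalling here is bookkeeping that Parallel and Run take off your hands.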

AsyncExtensions is a small wrapper class I authored that just wraps up the F# Control library in a thin facade that makes it more C#-friendly.  You’ll get a chance to see the implementation shortly.  The Run() and Parallel() methods just forward calls to the corresponding Async calls in the F# Control library.

The interesting/unexpected part is the LINQ query.  Though you typically think of LINQ as just syntactic sugar for authoring queries over IEnumerables, the way LINQ is defined in the C# language specification is far more general, and enables the LINQ syntax (things like "from" and "select") to be used in an arbitrary monad.  (I demonstrated this a long while back, when I showed how to use LINQ in the definition of monadic parser combinators in C#.)  I’m continuing to defer the full discussion about monads (what they are, why they matter, what LINQ has to do with it) because at this point it would still be a needless distraction.  All you need to do right now is take it on faith that this C# code:

    from dummy in AsyncExtensions.StartWorkflow
    rest_of_linq_query

means the same thing as this F# code:

    async { body_of_computation }

and that this C# inside a LINQ query:

    select expr

means the same as this F# inside a computation expression:

    return expr

and you’re good to go.  Actually, you don’t have to just take it on faith – you can try the code yourself.  Here’s the full C# code.  Just throw it in a C# project, reference FSharp.Core.dll, and try it out.  But then keep reading – there’s more blog after this code:

//#define SYNC 
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.FSharp.Core;
using Microsoft.FSharp.Control;
using System.Diagnostics;

class Program
{
    static Stopwatch stopWatch = new Stopwatch();
    static void ResetStopWatch() { stopWatch.Reset(); stopWatch.Start(); }
    static void ShowTime() { Console.WriteLine("took {0} ms", stopWatch.ElapsedMilliseconds); }

    static bool IsPrime(int x)
    {
        for (int i = 2; i < x; ++i)
        {
            if (x % i == 0)
                return false;
        }
        return true;
    }
    public static void Main()
    {
        var nums = new int[4001];
        for (int i = 0; i < nums.Length; ++i)
            nums[i] = 10000000 + i;
        ResetStopWatch();
#if SYNC 
        Console.WriteLine("Computing primes sequentially..."); 
        var primeInfo = nums.Select((x) => new KeyValuePair<int, bool>(x, IsPrime(x))).ToArray(); 
#else
        Console.WriteLine("Computing primes in parallel...");
        var computations = nums.Select((x) =>
            from dummy in AsyncExtensions.StartWorkflow
            select new KeyValuePair<int, bool>(x, IsPrime(x)));
        var primeInfo = AsyncExtensions.Run(AsyncExtensions.Parallel(computations));
#endif
        ShowTime();
        var primes = from x in primeInfo
                     where x.Value
                     select x.Key;
        foreach (var x in primes)
            Console.Write("{0},", x);
        Console.WriteLine();
        ShowTime();
        Console.WriteLine("press a key");
        Console.ReadKey();
    }
}

// boilerplate code to wrap F# library in a nice C# facade 
static class AsyncExtensions
{
    static Microsoft.FSharp.Control.FSharpAsyncBuilder async = Microsoft.FSharp.Core.ExtraTopLevelOperators.DefaultAsyncBuilder;

    // easily massage "Func" types into F# function types 
    public static FSharpFunc<A, B> ToFastFunc<A, B>(this Func<A, B> f)
    {
        return FuncConvert.ToFSharpFunc(new Converter<A, B>(f));
    }
    // LINQ syntax sugars 
    public static FSharpAsync<B> Select<A, B>(this FSharpAsync<A> x, Func<A, B> selector)
    {
        return async.Bind(x,
            ToFastFunc<A, FSharpAsync<B>>((r) => async.Return(selector(r))));
    }
    public static FSharpAsync<V> SelectMany<T, U, V>(
        this FSharpAsync<T> p, Func<T, FSharpAsync<U>> selector, Func<T, U, V> projector)
    {
        return async.Bind(p, FuncConvert.ToFSharpFunc<T, FSharpAsync<V>>(new System.Converter<T, FSharpAsync<V>>(r1 =>
            async.Bind(selector(r1), FuncConvert.ToFSharpFunc<U, FSharpAsync<V>>(r2 =>
                async.Return(projector(r1, r2)))))));
    }
    // Wrap F# Control library functions in simpler facade 
    public static FSharpAsync<R[]> Parallel<R>(IEnumerable<FSharpAsync<R>> computations)
    {
        return Microsoft.FSharp.Control.FSharpAsync.Parallel<R>(computations);
    }
    public static R Run<R>(FSharpAsync<R> computation)
    {
        return Microsoft.FSharp.Control.FSharpAsync.RunSynchronously(computation,
            FSharpOption<int>.None, FSharpOption<System.Threading.CancellationToken>.None);
    }
    // convenience object to get in the Async monad 
    public static FSharpAsync<int> StartWorkflow = async.Return(0);
}

So there’s some fun C# code you can run with today.
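If the claim that LINQ works over "an arbitrary monad" still feels like magic, a minimal sketch with no F# dependency may help. Deferred<T> below is a made-up stand-in for FSharpAsync<T> (it just wraps a Func<T> and is not part of any library); the point is only that the C# compiler turns query syntax into calls to whatever Select and SelectMany methods it can find for the type.

```csharp
using System;

// Hypothetical stand-in for FSharpAsync<T>: a computation that runs only when asked.
public sealed class Deferred<T>
{
    private readonly Func<T> _run;
    public Deferred(Func<T> run) { _run = run; }
    public T Run() { return _run(); }
}

public static class DeferredExtensions
{
    // Wraps a plain value, like async.Return.
    public static Deferred<T> Return<T>(T value)
    {
        return new Deferred<T>(() => value);
    }

    // The compiler calls this for "from x in d select f(x)".
    public static Deferred<B> Select<A, B>(this Deferred<A> d, Func<A, B> selector)
    {
        return new Deferred<B>(() => selector(d.Run()));
    }

    // The compiler calls this when one "from" follows another.
    public static Deferred<V> SelectMany<T, U, V>(
        this Deferred<T> d, Func<T, Deferred<U>> selector, Func<T, U, V> projector)
    {
        return new Deferred<V>(() =>
        {
            T t = d.Run();
            U u = selector(t).Run();
            return projector(t, u);
        });
    }
}

public static class Demo
{
    // One "from" plus "select" becomes a call to Select.
    public static Deferred<int> Single()
    {
        return from x in DeferredExtensions.Return(5)
               select x + 1;
    }

    // Two "from" clauses plus "select" become a call to SelectMany.
    public static Deferred<int> Query()
    {
        return from a in DeferredExtensions.Return(3)
               from b in DeferredExtensions.Return(4)
               select a * a + b * b;
    }

    // What the compiler generates for Query(), written by hand.
    public static Deferred<int> Desugared()
    {
        return DeferredExtensions.Return(3)
            .SelectMany(a => DeferredExtensions.Return(4), (a, b) => a * a + b * b);
    }

    public static void Main()
    {
        Console.WriteLine(Single().Run());     // 6
        Console.WriteLine(Query().Run());      // 25
        Console.WriteLine(Desugared().Run());  // 25
    }
}
```

The Select and SelectMany methods in AsyncExtensions play exactly this role for FSharpAsync<T>, with async.Bind and async.Return doing the real work.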

Compositional async code

We’ve seen how the F# Control library can be used to parallelize computations in both F# and C#.  But parallelization is just one aspect of the F# Control library.  An even more enticing value proposition is the ability to write async code that composes easily.  Let’s consider another example, from a networking domain, using WCF.

Suppose I have a web service that exposes an endpoint that can be used to compute squares of integers – for example, you pass 5 to the web service, and it does some extremely difficult computations and finally returns you the answer 25.  (As is often the case, my blog samples are very contrived, in order to stay simple.)  Given a particular object that represents the connection to this service, we might write some C# code like this:

    static int SumSquares(IMyClientContract client)
    {
        ((IClientChannel)client).Open();
        var sq1 = client.Square(3);
        var sq2 = client.Square(4);
        ((IClientChannel)client).Close();
        return sq1 + sq2;
    }

as a sample of what we can do with a "client" connection to the web service.

Now, each of the operations on the client (Open(), Square(), and Close()) can potentially involve a call out over the network.  In the code above, all the code executes synchronously, which means this function holds a CLR thread for the duration of the whole call to SumSquares().  Sometimes this is fine, but often (especially when writing servers or middle-tier components) we need to be more frugal when it comes to resources like threads, and ensure that we are only holding threads when necessary – not while we are blocked, waiting for some network call to return.  To this end, the .NET framework provides the Begin/End pattern and IAsyncResult type.  So for example, in addition to methods like Open(), frameworks also provide the corresponding async versions via BeginOpen() and EndOpen().  This API pattern makes it possible to get the job done – you can make a "Begin" call with a callback object, release the current thread, and then when the operation eventually finishes, your callback gets invoked with the result so you can pick up where you left off (probably with code now running on a different thread). 
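For readers who haven't met the pattern, here is a minimal sketch of a single Begin/End call, using Stream.BeginRead and Stream.EndRead from the base class library rather than WCF. The class and method names are made up, and the demo blocks on an event at the end purely so that it can return the result; real code would release the thread instead.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;

public static class BeginEndDemo
{
    // Hypothetical helper: reads the whole buffer with one Begin/End pair.
    public static string ReadWithCallback(byte[] data)
    {
        var stream = new MemoryStream(data);
        var buffer = new byte[data.Length];
        string result = null;
        var finished = new ManualResetEventSlim(false);

        // "Begin" starts the operation and returns immediately; the callback
        // fires later, typically on a different (thread-pool) thread.
        stream.BeginRead(buffer, 0, buffer.Length, iar =>
        {
            // "End" hands back the result (or rethrows the operation's exception).
            int bytesRead = stream.EndRead(iar);
            result = Encoding.UTF8.GetString(buffer, 0, bytesRead);
            finished.Set();
        }, null);

        // Only for the demo: wait so we can return the value to the caller.
        finished.Wait();
        return result;
    }

    public static void Main()
    {
        Console.WriteLine(ReadWithCallback(Encoding.UTF8.GetBytes("hello")));
    }
}
```

Notice that everything that should happen "after the read" has to live inside the callback, which is where the trouble starts once there is more than one call.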

While the Begin/End pattern is somewhat workable for a single call, it fails miserably for composing a series of async calls.  Suppose we want to author BeginSumSquares() and EndSumSquares(), with the implementation making calls to the Begin/End versions of Open, Square, and Close.  This is a perfectly reasonable thing to want to do – in fact, if you work on framework code in this kind of domain, this may be something you need to do all the time.  Ideally it would be straightforward – SumSquares is a five-line method, and we just want to do the same thing, only async.  In practice, it’s a nightmare.  I won’t even attempt to write the code for Begin/End-SumSquares here, because I know I will get it wrong.  The Begin/End pattern forces you into a hideous mess of spaghetti code where each async call necessitates a new callback method and a new IAsyncResult object, and this simple SumSquares example will probably take on the order of 100 lines of code to implement async.  If you’ve never had to write this kind of code before, thank your lucky stars.

Again, the problem is that the Begin/End pattern does not compose.  If I have two synchronous methods I want to call in series, one after another, I just write "Method1(); Method2();".  But with two async methods, I have to write tons of code just to correctly string two calls together in series.  What we need is a pattern for writing async code that makes composing a series of async calls as easy as composing a series of sync calls.

In F# this is easy to do.  The F# synchronous code looks like this:

    let SumSquares (client : IMyClientContract) =
        (box client :?> IClientChannel).Open()
        let sq1 = client.Square(3)
        let sq2 = client.Square(4)
        (box client :?> IClientChannel).Close()
        sq1 + sq2

(The "box" and ":?>" bits are just how the cast to type IClientChannel is performed in F#.)  To make it async, rather than use the Begin/End pattern, we can use the F# Async pattern (which is easy to build on top of the Begin/End methods, using the Async.FromBeginEnd function in the library).  Then we can use F# async workflows like this:

    let SumSquaresAsync (client : IMyClientContract) =
        async { do! (box client :?> IClientChannel).OpenAsync()
                let! sq1 = client.SquareAsync(3)
                let! sq2 = client.SquareAsync(4)
                do! (box client :?> IClientChannel).CloseAsync() 
                return sq1 + sq2 }

The "do!" keyword in an F# async workflow lets us call an async method when we don’t care about the result, and the "let!" keyword calls an async method and binds the result to a new variable name.  As a result, our original five-line synchronous method is still just five lines when we rewrite it to run asynchronously – the difference is that we write the code inside an async workflow.

It turns out that we can do the same thing for C#.  This sync C# code:

    static int SumSquares(IMyClientContract client) 
    { 
        ((IClientChannel)client).Open(); 
        var sq1 = client.Square(3); 
        var sq2 = client.Square(4); 
        ((IClientChannel)client).Close(); 
        return sq1 + sq2; 
    }

can be rewritten as this async code:

    static FSharpAsync<int> SumSquaresAsync(IMyClientContract client)
    {
        return from _0  in AsyncExtensions.StartWorkflow
               from _1  in ((IClientChannel)client).OpenAsync()
               from sq1 in client.SquareAsync(3)
               from sq2 in client.SquareAsync(4)
               from _2  in ((IClientChannel)client).CloseAsync()
               select sq1 + sq2;
    }

using the F# Control library from C#.  Again, I am using LINQ.  Once we enter the async monad (the first line with "StartWorkflow"), each subsequent "from" in C# is like a "let!" in F#.  There is no LINQ syntax that corresponds to "do!", so we have to use "from" and bind the meaningless results to dummy variable names (I chose "_0", "_1", and "_2" as the names for my "don’t care" variables here).  The code looks a little weird, since we are "abusing" LINQ in order to achieve our goal.  But I am a pragmatist, and if I can save myself from having to write 100 lines of nightmare Begin/End/IAsyncResult/callback code in order to achieve composable async C# code, then I am willing to pay the price of having to use some awkward syntax (the "from", "select", and dummy variables).  It still feels like a big win.

Shortcomings of the LINQ approach to async in C#, and an alternative approach

The async C# code I just showed is pretty nifty, but the "LINQ approach to async" has problems beyond the awkward syntax.  The example I chose (SumSquaresAsync) was very simple – a method with straight-line code that made four async method calls.  What if we want to start with some more complicated synchronous code that involves if-then-else, while loops, try-catch blocks, or arbitrary other C# constructs?  These constructs do not have an obvious/straightforward mapping into LINQ when we try to create the corresponding async code.  As a result, the async version of code using such constructs will probably have to look different from (and be more complicated than) the corresponding sync code.  That’s unfortunate, as it erodes the main benefit we were out to achieve in the first place (async code that’s as simple as the original sync code).
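To make the loop case concrete, here is a small sketch of what happens to a plain "for" loop. Step<T> is a made-up, minimal monad standing in for FSharpAsync<T>, and SquareStep stands in for an async call like SquareAsync; neither is part of the F# library. This is one possible encoding (a fold over the inputs), not the only one.

```csharp
using System;
using System.Linq;

// Hypothetical minimal monad standing in for FSharpAsync<T>.
public sealed class Step<T>
{
    private readonly Func<T> _run;
    public Step(Func<T> run) { _run = run; }
    public T Run() { return _run(); }
}

public static class StepLinq
{
    public static Step<T> Return<T>(T value) { return new Step<T>(() => value); }

    public static Step<B> Select<A, B>(this Step<A> s, Func<A, B> f)
    {
        return new Step<B>(() => f(s.Run()));
    }

    public static Step<V> SelectMany<T, U, V>(
        this Step<T> s, Func<T, Step<U>> k, Func<T, U, V> p)
    {
        return new Step<V>(() => { T t = s.Run(); return p(t, k(t).Run()); });
    }
}

public static class LoopDemo
{
    // Pretend these are a synchronous call and its async counterpart.
    static int Square(int x) { return x * x; }
    static Step<int> SquareStep(int x) { return new Step<int>(() => x * x); }

    // Sync version: an ordinary loop.
    public static int SumSquares(int n)
    {
        int total = 0;
        for (int i = 1; i <= n; ++i)
            total += Square(i);
        return total;
    }

    // Monadic version: query syntax has no loop construct, so the loop
    // turns into a fold that chains one more query per iteration.
    public static Step<int> SumSquaresStep(int n)
    {
        return Enumerable.Range(1, n).Aggregate(
            StepLinq.Return(0),
            (acc, i) => from total in acc
                        from sq in SquareStep(i)
                        select total + sq);
    }

    public static void Main()
    {
        Console.WriteLine(SumSquares(3));           // 14
        Console.WriteLine(SumSquaresStep(3).Run()); // 14
    }
}
```

Both versions compute the same thing, but the second no longer looks anything like the first, which is exactly the erosion described above.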

There is an alternative to the LINQ approach.  Another of the C# language’s "syntax sugars" fits the bill: "yield".  Iterator blocks that use the "yield" statement create a way to write C# code that will ‘exit and return later where we left off’, which is just the type of thing you need for writing async code.  As a result, it’s possible to build an async library atop the iterator metaphor, rather than atop an async monad (utilizing LINQ).  This is the approach taken, for example, by CCR in Robotics Studio, which you can read a little about here.  I haven’t yet studied this approach in depth, but it seems very interesting, as it doesn’t suffer the LINQ drawbacks mentioned in the previous paragraph.  It’s possible the two approaches might be complementary (perhaps the same library can expose both the LINQ programming model and the "yield"/iterator programming model for composing async computations).
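To give a feel for the iterator idea, here is a toy sketch. It is not CCR's API or anyone's real library: the AsyncStep delegate, the Box<T> holder, and the Run driver are all invented for illustration. Each "yield return" hands the driver an operation to start, and the driver resumes the iterator from that operation's completion callback.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class IteratorAsync
{
    // Hypothetical: a step starts an operation and invokes the callback when it completes.
    public delegate void AsyncStep(Action onCompleted);

    // Hypothetical holder, since iterator methods cannot have out/ref parameters.
    public sealed class Box<T> { public T Value; }

    // Driver: advance the iterator to its next yield, start that step, and
    // come back here (possibly on another thread) when the step completes.
    public static void Run(IEnumerable<AsyncStep> workflow, Action onFinished)
    {
        IEnumerator<AsyncStep> e = workflow.GetEnumerator();
        Action resume = null;
        resume = () =>
        {
            if (e.MoveNext())
            {
                e.Current(resume);
            }
            else
            {
                e.Dispose();
                onFinished();
            }
        };
        resume();
    }

    // Pretend async operation: squares x on a thread-pool thread.
    public static AsyncStep SquareAsync(int x, Box<int> result)
    {
        return onCompleted => ThreadPool.QueueUserWorkItem(_ =>
        {
            result.Value = x * x;
            onCompleted();
        });
    }

    // Reads almost like the synchronous version; no thread is held between steps.
    public static IEnumerable<AsyncStep> SumSquares(Box<int> answer)
    {
        var sq1 = new Box<int>();
        var sq2 = new Box<int>();
        yield return SquareAsync(3, sq1);
        yield return SquareAsync(4, sq2);
        answer.Value = sq1.Value + sq2.Value;
    }

    // Demo only: blocks until the workflow finishes so the result can be returned.
    public static int RunSumSquares()
    {
        var answer = new Box<int>();
        var done = new ManualResetEventSlim(false);
        Run(SumSquares(answer), done.Set);
        done.Wait();
        return answer.Value;
    }

    public static void Main()
    {
        Console.WriteLine(RunSumSquares()); // 25
    }
}
```

Because the body of SumSquares is ordinary C#, it could contain if-then-else or loops around the yields without any change to the driver, which is the appeal of this style over the LINQ encoding.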

What’s next?

There’s potentially a lot more to talk about, but this is a good place to wrap things up for today. 

Source code

Below is the full source code for the WCF example – first in F#, then in C#. 

F# code

open System 
open System.Collections.Generic 
open System.Diagnostics 
open System.ServiceModel 
open System.Threading 
open System.ServiceModel.Channels 
open Microsoft.FSharp.Control 

// define WCF service 
[<ServiceContract>] 
type IMyContract = interface 
    [<OperationContract>] 
    abstract Square: x:int -> int 
end 

[<ServiceContract(Name="IMyContract")>] 
type IMyClientContract =
    [<OperationContract>] 
    abstract Square: x:int -> int 
    [<OperationContract(AsyncPattern = true)>] 
    abstract BeginSquare: x:int * cb:AsyncCallback * o:obj -> IAsyncResult  
    abstract EndSquare: iar:IAsyncResult -> int 

type MyService() = class 
    interface IMyContract with 
        member this.Square x = x * x 
end 

// set up a WCF service, and then run a client function against it 
let DoWCFRun (clientFunc : IMyClientContract -> int) = 
    let addr = "http://localhost/WCF" 
    let address = new Uri(addr) 
    let host = new ServiceHost(typeof<MyService>, [|address|]) 
    let reliableBinding = new WSHttpBinding(SecurityMode.None, true) 
    host.AddServiceEndpoint(typeof<IMyContract>, reliableBinding, "") |> ignore 
    host.Open() 

    let cf = new ChannelFactory<IMyClientContract>(reliableBinding, new EndpointAddress(addr)) 
    let client = cf.CreateChannel() 
    printfn "about to call client" 
    let ans = clientFunc client 
    printfn "done - answer is %d" ans 
    host.Close() 

// a sample client function that runs synchronously 
let SumSquares (client : IMyClientContract) = 
    (box client :?> IClientChannel).Open() 
    let sq1 = client.Square(3) 
    let sq2 = client.Square(4) 
    (box client :?> IClientChannel).Close() 
    sq1 + sq2 

// run it 
DoWCFRun SumSquares 

// define Async versions of the key client methods 
type IClientChannel with 
    member this.OpenAsync() = 
        Async.FromBeginEnd(this.BeginOpen, this.EndOpen) 
    member this.CloseAsync() = 
        Async.FromBeginEnd(this.BeginClose, this.EndClose) 
module Extensions =
    type IMyClientContract with 
        member this.SquareAsync x = 
            Async.FromBeginEnd(x, this.BeginSquare, this.EndSquare) 
open Extensions

// async version of our sample client - does not hold threads while calling out to network 
let SumSquaresAsync (client : IMyClientContract) = 
    async { do! (box client :?> IClientChannel).OpenAsync() 
            let! sq1 = client.SquareAsync(3) 
            let! sq2 = client.SquareAsync(4) 
            do! (box client :?> IClientChannel).CloseAsync() 
            return sq1 + sq2 } 

DoWCFRun (SumSquaresAsync >> Async.RunSynchronously)  // ">>" is function composition operator 

printfn "press a key" 
Console.ReadKey() |> ignore

 

C# code

//#define SYNC
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.FSharp.Core;
using Microsoft.FSharp.Control;
using System.Diagnostics;
using System.ServiceModel;
using System.Threading;
using System.ServiceModel.Channels;

// define WCF service 
[ServiceContract]
interface IMyContract
{
    [OperationContract]
    int Square(int x);
}

[ServiceContract(Name = "IMyContract")]
interface IMyClientContract
{
    [OperationContract]
    int Square(int x);
    [OperationContract(AsyncPattern = true)]
    IAsyncResult BeginSquare(int x, AsyncCallback cb, object o);
    int EndSquare(IAsyncResult iar);
}

class MyService : IMyContract
{
    public int Square(int x)
    {
        return x * x;
    }
}

class Program
{
    // set up a WCF service, and then run a client against it 
    public static void Main()
    {
        Uri address = new Uri("http://localhost/WCF");
        ServiceHost host = new ServiceHost(typeof(MyService), address);
        Binding reliableBinding = new WSHttpBinding(SecurityMode.None, true);
        host.AddServiceEndpoint(typeof(IMyContract), reliableBinding, "");
        host.Open();

        ChannelFactory<IMyClientContract> cf = new ChannelFactory<IMyClientContract>(reliableBinding, new EndpointAddress(address));
        IMyClientContract client = cf.CreateChannel();
        Console.WriteLine("about to call client");
#if SYNC
        var ans = SumSquares(client);
#else 
        var ans = AsyncExtensions.Run(SumSquaresAsync(client)); 
#endif
        Console.WriteLine("done - answer is {0}", ans);
        host.Close();
        Console.WriteLine("press a key");
        Console.ReadKey();
    }
    // a sample client function that runs synchronously 
    static int SumSquares(IMyClientContract client)
    {
        ((IClientChannel)client).Open();
        var sq1 = client.Square(3);
        var sq2 = client.Square(4);
        ((IClientChannel)client).Close();
        return sq1 + sq2;
    }
    // async version of our sample client - does not hold threads while calling out to network 
    static FSharpAsync<int> SumSquaresAsync(IMyClientContract client)
    {
        return from _0 in AsyncExtensions.StartWorkflow
               from _1 in ((IClientChannel)client).OpenAsync()
               from sq1 in client.SquareAsync(3)
               from sq2 in client.SquareAsync(4)
               from _2 in ((IClientChannel)client).CloseAsync()
               select sq1 + sq2;
    }
}

// define Async versions of the key client methods 
static class ClientExtension
{
    public static FSharpAsync<Unit> OpenAsync(this IClientChannel client)
    {
        return AsyncExtensions.FromBeginEnd(client.BeginOpen, client.EndOpen);
    }
    public static FSharpAsync<Unit> CloseAsync(this IClientChannel client)
    {
        return AsyncExtensions.FromBeginEnd(client.BeginClose, client.EndClose);
    }
    public static FSharpAsync<int> SquareAsync(this IMyClientContract client, int x)
    {
        return AsyncExtensions.FromBeginEnd<int, int>(x, client.BeginSquare, client.EndSquare);
    }
}

// boilerplate code to wrap F# library in a nice C# facade 
static class AsyncExtensions
{
    static Microsoft.FSharp.Control.FSharpAsyncBuilder async = Microsoft.FSharp.Core.ExtraTopLevelOperators.DefaultAsyncBuilder;
    // easily massage "Func" types into F# function types 
    public static FSharpFunc<A, Result> ToFSharpFunc<A, Result>(this Func<A, Result> f)
    {
        return FuncConvert.ToFSharpFunc(new Converter<A, Result>(f));
    }
    public static FSharpFunc<Tuple<A1, A2>, Result> ToTupledFSharpFunc<A1, A2, Result>(this Func<A1, A2, Result> f)
    {
        return FuncConvert.ToFSharpFunc(new Converter<Tuple<A1, A2>, Result>(t => f(t.Item1, t.Item2)));
    }
    public static FSharpFunc<Tuple<A1, A2, A3>, Result> ToTupledFSharpFunc<A1, A2, A3, Result>(this Func<A1, A2, A3, Result> f)
    {
        return FuncConvert.ToFSharpFunc(new Converter<Tuple<A1, A2, A3>, Result>(t => f(t.Item1, t.Item2, t.Item3)));
    }
    // LINQ syntax sugars 
    public static FSharpAsync<B> Select<A, B>(this FSharpAsync<A> x, Func<A, B> selector)
    {
        return async.Bind(x,
            ToFSharpFunc<A, FSharpAsync<B>>((r) => async.Return(selector(r))));
    }
    public static FSharpAsync<V> SelectMany<T, U, V>(this FSharpAsync<T> p, Func<T, FSharpAsync<U>> selector, Func<T, U, V> projector)
    {
        return async.Bind(p, FuncConvert.ToFSharpFunc(new System.Converter<T, FSharpAsync<V>>(r1 =>
            async.Bind(selector(r1), FuncConvert.ToFSharpFunc<U, FSharpAsync<V>>(r2 =>
                async.Return(projector(r1, r2)))))));
    }
    // Wrap F# Control library functions in simpler facade 
    public static FSharpAsync<R[]> Parallel<R>(IEnumerable<FSharpAsync<R>> computations)
    {
        return Microsoft.FSharp.Control.FSharpAsync.Parallel<R>(computations);
    }
    public static R Run<R>(FSharpAsync<R> computation)
    {
        return Microsoft.FSharp.Control.FSharpAsync.RunSynchronously(computation,
            FSharpOption<int>.None, FSharpOption<System.Threading.CancellationToken>.None);
    }
    public static FSharpAsync<R> FromBeginEnd<R>(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, R> end)
    {
        return FSharpAsync.FromBeginEnd(begin.ToTupledFSharpFunc(), end.ToFSharpFunc(), null);
    }
    public static FSharpAsync<Unit> FromBeginEnd(Func<AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
    {
        return FSharpAsync.FromBeginEnd(begin.ToTupledFSharpFunc(), FuncConvert.ToFSharpFunc<IAsyncResult,Unit>(iar => { end(iar); return (Unit)null; }), null);
    }
    public static FSharpAsync<R> FromBeginEnd<Arg, R>(Arg a, Func<Arg, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, R> end)
    {
        return FSharpAsync.FromBeginEnd(a, begin.ToTupledFSharpFunc(), end.ToFSharpFunc(), null);
    }
    // convenience object to get in the Async monad 
    public static FSharpAsync<int> StartWorkflow = async.Return(0);
}