Inside F#

Brian's thoughts on F# and .NET

“An Introduction to Async Workflows in F#”, or “how to utilize all those CPUs without writing lots of threading code”, part two

Posted by Brian on May 5, 2008

Last time I showed a short example of a CPU-intensive task and demonstrated how to speed up the program by utilizing multiple CPUs.  The .NET ThreadPool APIs got the job done, but it was non-trivial to change our program to use these APIs.  Specifically, we had to change a portion of the original program from this:

    let primeInfo = nums  
                    |> Array.map (fun x -> (x,IsPrime x))

to this:

    let mutable numRemainingComputations = nums.Length
    let mre = new ManualResetEvent(false)
    let primeInfo = Array.create nums.Length (0,false)
    nums
    |> Array.iteri (fun i x -> ignore (ThreadPool.QueueUserWorkItem(fun o ->
                primeInfo.[i] <- (x, IsPrime x)
                // if we're the last one, signal that we're done
                if Interlocked.Decrement(&numRemainingComputations) = 0 then
                    mre.Set() |> ignore)))
    // wait until all done
    mre.WaitOne() |> ignore

Yuck.  Today we’ll see how an F# library can accomplish the same goal much more simply.

A better way – the F# Control library

There is a better way to use our CPUs, using the F# Control library.  In particular, we’ll use the Async<'a> class type along with a couple of its static methods, as well as the F# async workflow syntax (which is just a specific instance of the general syntax for F# computation expressions).  Each little piece (which I’ve called out in bold in the paragraphs that follow) is small and easy to understand, and as we’ll see, the F# type system and type inference make it easy to ensure that we glue the pieces together correctly.

The Async<'a> class is a data type for representing an asynchronous computation that will result in a value of type 'a.  So, for example, an Async<int> object represents a computation that will produce an integer result.

In order to turn a computation into a result, we use the Async.RunSynchronously method.  Its signature is

    // Async.RunSynchronously : Async<'a> -> 'a

That is, you pass an async computation to this method, and it runs the computation and returns the result.  Simple enough?  (In point of fact, the Async.RunSynchronously method has other extra optional parameters, but we don’t need them for our example, and so I won’t distract you with those details.)
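As a minimal sketch of that signature in action, here is about the smallest async computation one can build and run.  The names "computation" and "answer" and the arithmetic are purely illustrative, and the block borrows the "async { ... }" syntax that is introduced further down.

```fsharp
// computation : Async<int>  -- only a description; nothing has executed yet
let computation = async { return 6 * 7 }

// answer : int  -- RunSynchronously runs the computation and blocks for the result
let answer = Async.RunSynchronously computation

printfn "%d" answer   // prints 42
```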

So, now we know about a data type to represent asynchronous computations, and we have a way to run a computation, but how do we actually create an async computation in the first place?  There are three common ways.

First, we might make a call to a library function that returns an async computation.  For example, if we have a Stream object, we may call

    // asyncBytes : Async<byte[]>
    let asyncBytes = aStream.AsyncRead(100) // read 100 bytes asynchronously

AsyncRead() is an extension method on the System.IO.Stream class (from Microsoft.FSharp.Control.CommonExtensions).  The F# Control library contains a number of such extension methods for common operations that you may want to execute asynchronously (like disk IO and networking operations).

Second, we can create our own async versions of methods out of "Begin" and "End" methods by using the Async.FromBeginEnd static method.  An example explains things well.  Consider the WebRequest class in the System.Net library.  It has BeginGetResponse and EndGetResponse methods for doing standard .NET-style async programming.  The F# Control library defines this extension method:

    type System.Net.WebRequest with  
        // AsyncGetResponse : unit -> Async<WebResponse>
        member x.AsyncGetResponse() =
            Async.FromBeginEnd(x.BeginGetResponse, x.EndGetResponse)

So with that defined, we can call "webRequest.AsyncGetResponse()" and the result is an Async<WebResponse> object that can be used in an F# async workflow computation.

Third, we can write code in the async workflow syntax: the keyword "async" followed by some code in curly braces, which yields a block of code that resolves to an Async value.  We’ll see an example of that shortly.
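To make the Begin/End approach concrete without needing a network connection, here is a sketch that applies the same pattern to Stream.BeginRead and Stream.EndRead.  The extension method name "AsyncReadInto" is hypothetical (it is not part of the F# library), and a MemoryStream stands in for a real file or socket.

```fsharp
open System.IO

// Hypothetical extension method, named AsyncReadInto purely for illustration.
// It uses the Async.FromBeginEnd overload that forwards three arguments
// (buffer, offset, count) to the "Begin" method.
type System.IO.Stream with
    // AsyncReadInto : byte[] -> Async<int>   (the number of bytes actually read)
    member s.AsyncReadInto(buffer: byte[]) =
        Async.FromBeginEnd(buffer, 0, buffer.Length, s.BeginRead, s.EndRead)

// An in-memory stream holding the bytes 1..10 stands in for a file or network stream.
let stream = new MemoryStream([| 1uy .. 10uy |])
let buffer : byte[] = Array.zeroCreate 4

// bytesRead : int  -- the read only happens once the computation is run
let bytesRead = stream.AsyncReadInto(buffer) |> Async.RunSynchronously

printfn "read %d bytes: %A" bytesRead buffer
```

The wrapper itself does no I/O; it only packages the Begin/End pair as an Async<int>, which is why it composes with everything else that accepts an Async value.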

Finally, once we have a bunch of async computations we want to run in parallel, we can call Async.Parallel:

    // Async.Parallel : #seq<Async<'a>> -> Async<array<'a>>

Recall that seq<'a> is just shorthand for IEnumerable<'a> (a "sequence").  So you pass a sequence of computations to Async.Parallel, and it returns a single computation that will yield an array of all the results.  The library takes care of scheduling each computation on the thread pool and of the synchronization needed to wait until all of the results are ready.
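Here is a small sketch of Async.Parallel on its own, before we apply it to the primes.  The computations (squaring the numbers 1 to 5) and the names "squares" and "results" are made up for illustration.

```fsharp
// squares : seq<Async<int>>  -- five small, independent computations
let squares = seq { for i in 1 .. 5 -> async { return i * i } }

// results : int[]  -- the results come back in the same order as the inputs
let results = squares |> Async.Parallel |> Async.RunSynchronously

printfn "%A" results   // prints [|1; 4; 9; 16; 25|]
```

Note that the order of the output array matches the order of the input sequence, regardless of which computation happens to finish first.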

Applying the F# Control library to our problem

Now that we have an overview of the library, let’s apply it to our prime-computing program.  I’ll once again rewrite the original code between the ResetStopWatch() and ShowTime() calls:

    // info : array<Async<int * bool>>
    let info = nums |> Array.map (fun x -> async { return (x, IsPrime x) } )
    // par : Async<array<int * bool>>
    let par = Async.Parallel info
    // primeInfo : array<int * bool>
    let primeInfo = Async.RunSynchronously par

I’ve broken the code down into three steps so as to comment on the data type at each step.  First, we map the input array over an async workflow (an example of a computation expression) that computes an Async tuple.  (I’m not going to describe the "computation expression" syntax – the stuff with "async" and curly braces and "return" – in any detail here, as it deserves its own blog entry.  The curious reader can check out Don’s blog about computation expressions.  Briefly, this is F#’s general syntax for monadic comprehensions – you can think of it like "LINQ notation on steroids".)  The result is an array of async tuples.  Next, we call Async.Parallel on that array, yielding a single async computation which will yield an array of all the tuples when it is run.  Finally, we call Async.RunSynchronously to run this giant parallel computation and get back the desired output array of tuples.  The type system steers you in the right direction the entire time – if the types all work out, then you have glued the pieces together correctly.
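One point worth seeing in code is that an Async value only describes work; nothing happens until it is run.  The sketch below uses a counter (the names "runs" and "work" are illustrative) to observe when the body of the workflow actually executes.

```fsharp
// A ref cell lets us observe when the body of the workflow actually runs.
let runs = ref 0

// work : Async<int>  -- building the value does not execute the body
let work = async {
    runs.Value <- runs.Value + 1
    return runs.Value
}

let before = runs.Value                    // 0: nothing has run yet
let first = Async.RunSynchronously work    // 1: the body runs now
let second = Async.RunSynchronously work   // 2: running it again re-executes the body

printfn "%d %d %d" before first second     // prints 0 1 2
```

This is why mapping the input array to an array of async computations is cheap: the expensive calls are deferred until Async.RunSynchronously is invoked.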

I would actually write this code more idiomatically like this:

    let primeInfo = nums 
                    |> Array.map (fun x -> async { return (x, IsPrime x) } )
                    |> Async.Parallel
                    |> Async.RunSynchronously

That is, we take the input array, map it into an array of async computations that will yield tuples, parallelize all those computations, and then run it.  That’s it!  No queuing up work on the thread pool, no synchronizing mutable data, no blocking on an event to wait for the results to finish – the library does all that for us.  We just describe the essence of the computation (I have all these computations, run them in parallel, go!) and the library does the rest.  Now that is simple.
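If you want to convince yourself that the parallel version computes the same thing as the plain Array.map version, a quick check along these lines should do.  It is only a sketch: "isPrimeNaive" mirrors the IsPrime function from this post's source code, and the input is deliberately tiny so it runs instantly.

```fsharp
// Same trial-division test as the IsPrime function in this post's source code.
let isPrimeNaive x =
    let mutable i = 2
    let mutable foundFactor = false
    while not foundFactor && i < x do
        if x % i = 0 then foundFactor <- true
        i <- i + 1
    not foundFactor

// A deliberately small input so the check runs instantly.
let smallNums = [| 2 .. 30 |]

// The original, sequential formulation.
let sequentialInfo = smallNums |> Array.map (fun x -> (x, isPrimeNaive x))

// The async formulation described above.
let asyncInfo =
    smallNums
    |> Array.map (fun x -> async { return (x, isPrimeNaive x) })
    |> Async.Parallel
    |> Async.RunSynchronously

// Arrays of tuples compare structurally with (=).
printfn "same results: %b" (sequentialInfo = asyncInfo)   // prints same results: true
```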

I wrote this, ran it, and it worked the first time.  100% CPU utilization, and the program finishes in just over 3 seconds.  Huzzah!

Compared to the original program, the only difference is we changed this:

    let primeInfo = nums  
                    |> Array.map (fun x -> (x,IsPrime x))

to this:

    let primeInfo = nums  
                    |> Array.map (fun x -> async { return (x, IsPrime x) } )
                    |> Async.Parallel 
                    |> Async.RunSynchronously

That’s it!  That’s a trivial three-line change, and now our program runs four times faster.  And it worked the first time.

Want to learn more?

If you want to learn more about the F# Control library and async workflows, you should check out Don’s initial blog on async workflows, or Robert Pickering’s 4-part series on asynchronous programming in F# (part 1, part 2, part 3, part 4).  The code is slightly out of date (as some of the F# library namespaces & function names have changed slightly since the blogs were written), but these blogs are nevertheless a good place to read more about the F# Control library.

Using the F# Control library from C#

F# definitely makes it much easier to write simple and correct async code that takes full advantage of your CPU resources.  Since a lot of the "work" is done by an F# library, you might ask: "Can I consume the F# Control library from C#?"  Great question!  We’ll find out how to utilize some of these great F# ideas from C# code… next time.

Source code

Here’s the full F# code from today.

module AsyncControl = 
    open System 
    open System.Diagnostics  
    open System.Threading 

    let stopWatch = new Stopwatch() 
    let ResetStopWatch() = stopWatch.Reset(); stopWatch.Start() 
    let ShowTime() = printfn "took %d ms" stopWatch.ElapsedMilliseconds 
    let IsPrime x = 
        let mutable i = 2 
        let mutable foundFactor = false 
        while not foundFactor && i < x do 
            if x % i = 0 then 
                foundFactor <- true 
            i <- i + 1 
        not foundFactor 

    let nums = [| for i in 10000000..10004000 -> i |] 

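    // time only the parallel computation
    ResetStopWatch()
    let primeInfo = nums
                    |> Array.map (fun x -> async { return (x, IsPrime x) } )
                    |> Async.Parallel
                    |> Async.RunSynchronously
    ShowTime()

    // print just the primes
    primeInfo
    |> Array.filter (fun (x,b) -> b)
    |> Array.iter (fun (x,b) -> printf "%d," x)
    printfn ""

    printfn "press a key"
    Console.ReadKey() |> ignore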

