“An Introduction to Async Workflows in F#”, or “how to utilize all those CPUs without writing lots of threading code”, part two
Posted by Brian on May 5, 2008
Last time I showed a short example of a CPU-intensive task and demonstrated how to speed up the program by utilizing multiple CPUs. The .NET ThreadPool APIs got the job done, but it was non-trivial to change our program to use these APIs. Specifically, we had to change a portion of the original program from this:
|> Array.map (fun x -> (x,IsPrime x))
to this:
let mre = new ManualResetEvent(false)
let primeInfo = Array.create nums.Length (0,false)
nums
|> Array.iteri (fun i x -> ignore (ThreadPool.QueueUserWorkItem(fun o ->
        primeInfo.[i] <- (x, IsPrime x)
        // if we’re the last one, signal that we’re done
        if Interlocked.Decrement(&numRemainingComputations) = 0 then
            mre.Set() |> ignore)))
// wait until all done
mre.WaitOne()
Yuck. Today we’ll see how an F# library can accomplish the same goal much more simply.
A better way – the F# Control library
There is a better way to use our CPUs, using the F# Control library. In particular, we’ll use the Async<'a> class type along with a couple of its static methods, as well as the F# async workflow syntax (which is just a specific instance of the general syntax for F# computation expressions). Each piece is small and easy to understand, and as we’ll see, the F# type system and type inference make it easy to ensure that we glue the pieces together correctly.
The Async<'a> class is a data type for representing an asynchronous computation that will result in a value of type 'a. So, for example, an Async<int> object represents a computation that will produce an integer result.
In order to turn a computation into a result, we use the Async.RunSynchronously method. Its signature is
Async.RunSynchronously : Async<'a> -> 'a
That is, you pass an async computation to this method, and it runs the computation and returns the result. Simple enough? (In point of fact, the Async.RunSynchronously method has other extra optional parameters, but we don’t need them for our example, and so I won’t distract you with those details.)
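As a minimal sketch of that signature in action (the names answer and result are just for illustration, and the async { ... } syntax used to build the computation is explained further down):

```fsharp
// answer : Async<int>
// Describes a computation; nothing runs yet.
let answer = async { return 6 * 7 }

// result : int
// RunSynchronously runs the computation and blocks until the value is ready.
let result = Async.RunSynchronously answer

printfn "%d" result   // prints 42
```

Note that building the Async<int> value and running it are separate steps, which is what lets us combine computations before anything executes.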
So, now we know about a data type to represent asynchronous computations, and we have a way to run a computation, but how do we actually create an async computation in the first place? There are three common ways. First, we might make a call to a library function that returns an async computation. For example, if we have a Stream object, we may call
let asyncBytes = aStream.AsyncRead(100) // read 100 bytes asynchronously
AsyncRead() is an extension method on the System.IO.Stream class (from Microsoft.FSharp.Control.CommonExtensions). The F# Control library contains a number of such extension methods for common operations that you may want to execute asynchronously (like disk IO and networking operations). Second, we can create our own async versions of methods out of "Begin" and "End" methods by using the Async.FromBeginEnd static method. An example explains things well. Consider the WebRequest class in the System.Net library. It has BeginGetResponse and EndGetResponse methods for doing standard .NET-style async programming. The F# Control library defines this extension method:
// AsyncGetResponse : unit -> Async<WebResponse>
type System.Net.WebRequest with
    member x.AsyncGetResponse() =
        Async.FromBeginEnd(x.BeginGetResponse, x.EndGetResponse)
So with that defined, we can call "webRequest.AsyncGetResponse()" and the result is an Async<WebResponse> object that can be used in an F# async workflow computation. Third, we can write code using the async workflow syntax using "async" followed by some code in curly braces, yielding a block of code that resolves to an Async value. We’ll see an example of that shortly.
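Here is a rough sketch of the three approaches side by side. To keep it self-contained, a MemoryStream stands in for a real file or network stream, and the names data, buffer, viaLibrary, viaBeginEnd and viaWorkflow are made up for this illustration.

```fsharp
open System.IO

// Five bytes of in-memory data standing in for real IO (an assumption for the demo).
let data = [| 1uy; 2uy; 3uy; 4uy; 5uy |]

// 1. Call a library function that returns an async computation.
let stream1 = new MemoryStream(data)
let viaLibrary : Async<byte[]> = stream1.AsyncRead(3)

// 2. Wrap a Begin/End method pair with Async.FromBeginEnd.
//    The three leading arguments are passed through to BeginRead.
let stream2 = new MemoryStream(data)
let buffer : byte[] = Array.zeroCreate 5
let viaBeginEnd : Async<int> =
    Async.FromBeginEnd(buffer, 0, buffer.Length, stream2.BeginRead, stream2.EndRead)

// 3. Write the computation directly with the async workflow syntax.
let viaWorkflow : Async<int> = async { return 2 + 3 }

// Each value above is only a description; run each one once to get results.
let firstThree = Async.RunSynchronously viaLibrary
let bytesRead = Async.RunSynchronously viaBeginEnd
let sum = Async.RunSynchronously viaWorkflow

printfn "%A %d %d" firstThree bytesRead sum
```

Whichever way the computation is built, the result is an ordinary Async value, so all three can be run or combined in the same way.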
Finally, once we have a bunch of async computations we want to run in parallel, we can call Async.Parallel. Its signature is
Async.Parallel : seq<Async<'a>> -> Async<array<'a>>
Recall that seq<'a> is just shorthand for IEnumerable<'a> (a "sequence"). So you pass a sequence of computations to Async.Parallel, and it returns a single computation that will yield an array of all the results. The library takes care of scheduling each computation on the thread pool and of the synchronization needed to wait until all of the results are ready.
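A small sketch of Async.Parallel on its own, using a toy computation (squaring a few integers) rather than anything from the prime example; the name squares is just for illustration:

```fsharp
// A list of five independent computations, each of type Async<int>.
let computations = [ for n in 1 .. 5 -> async { return n * n } ]

// Async.Parallel combines them into one Async<int[]>;
// RunSynchronously then runs the lot and waits for every result.
let squares =
    computations
    |> Async.Parallel
    |> Async.RunSynchronously

printfn "%A" squares   // prints [|1; 4; 9; 16; 25|]
```

The results come back in the same order as the input sequence, regardless of which computation happens to finish first.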
Applying the F# Control library to our problem
Now that we have an overview of the library, let’s apply it to our prime-computing program. I’ll once again rewrite the original code between the ResetStopWatch() and ShowTime() calls:
// info : array<Async<int * bool>>
let info = nums |> Array.map (fun x -> async { return (x, IsPrime x) } )
// par : Async<array<int * bool>>
let par = Async.Parallel info
// primeInfo : array<int * bool>
let primeInfo = Async.RunSynchronously par
ShowTime()
I’ve broken the code down into three steps so as to comment on the data type at each step. First, we map the input array over an async workflow (an example of a computation expression) that computes an Async tuple. (I’m not going to describe the "computation expression" syntax – the stuff with "async" and curly braces and "return" – in any detail here, as it deserves its own blog entry. The curious reader can check out Don’s blog about computation expressions. Briefly, this is F#’s general syntax for monadic comprehensions – you can think of it like "LINQ notation on steroids".) The result is an array of async tuples. Next, we call Async.Parallel on that array, yielding a single async computation which will yield an array of all the tuples when it is run. Finally, we call Async.RunSynchronously to run this giant parallel computation and get back the desired output array of tuples. The type system steers you in the right direction the entire time – if the types all work out, then you have glued the pieces together correctly.
I would actually write this code more idiomatically like this:
|> Array.map (fun x -> async { return (x, IsPrime x) } )
|> Async.Parallel
|> Async.RunSynchronously
That is, we take the input array, map it into an array of async computations that will yield tuples, parallelize all those computations, and then run it. That’s it! No queuing up work on the thread pool, no synchronizing mutable data, no blocking on an event to wait for the results to finish – the library does all that for us. We just describe the essence of the computation (I have all these computations, run them in parallel, go!) and the library does the rest. Now that is simple.
I wrote this, ran it, and it worked the first time. 100% CPU utilization, and the program finishes in just over 3 seconds. Huzzah!
Compared to the original program, the only difference is we changed this:
|> Array.map (fun x -> (x,IsPrime x))
to this:
|> Array.map (fun x -> async { return (x, IsPrime x) } )
|> Async.Parallel
|> Async.RunSynchronously
That’s it! That’s a trivial three-line change, and now our program runs four times faster. And it worked the first time.
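If you want to convince yourself that the two pipelines really are interchangeable, here is a quick sanity check. It is only a sketch: it repeats the IsPrime function from the post and uses a much smaller range of numbers (my choice, so that it runs instantly).

```fsharp
// Same trial-division test as in the post.
let IsPrime x =
    let mutable i = 2
    let mutable foundFactor = false
    while not foundFactor && i < x do
        if x % i = 0 then
            foundFactor <- true
        i <- i + 1
    not foundFactor

// A small range, chosen only to keep the check fast.
let nums = [| for i in 1000 .. 1100 -> i |]

// The original, single-threaded pipeline.
let sequential =
    nums
    |> Array.map (fun x -> (x, IsPrime x))

// The async version from this post.
let viaAsync =
    nums
    |> Array.map (fun x -> async { return (x, IsPrime x) })
    |> Async.Parallel
    |> Async.RunSynchronously

// Arrays of tuples compare structurally in F#.
printfn "same results: %b" (sequential = viaAsync)
```

Because Async.Parallel preserves the input order, the two arrays can be compared directly with =.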
Want to learn more?
If you want to learn more about the F# Control library and async workflows, you should check out Don’s initial blog on async workflows, or Robert Pickering’s 4-part series on asynchronous programming in F# (part 1, part 2, part 3, part 4). The code is slightly out of date (as some of the F# library namespaces & function names have changed slightly since the blogs were written), but these blogs are nevertheless a good place to read more about the F# Control library.
Using the F# Control library from C#
F# definitely makes it much easier to write simple and correct async code that takes full advantage of your CPU resources. Since a lot of the "work" is done by an F# library, you might ask: "Can I consume the F# Control library from C#?" Great question! We’ll find out how to utilize some of these great F# ideas from C# code… next time.
Source code
Here’s the full F# code from today.
open System
open System.Diagnostics
open System.Threading
let stopWatch = new Stopwatch()
let ResetStopWatch() = stopWatch.Reset(); stopWatch.Start()
let ShowTime() = printfn "took %d ms" stopWatch.ElapsedMilliseconds
let IsPrime x =
    let mutable i = 2
    let mutable foundFactor = false
    while not foundFactor && i < x do
        if x % i = 0 then
            foundFactor <- true
        i <- i + 1
    not foundFactor
let nums = [| for i in 10000000..10004000 -> i |]
ResetStopWatch()
let primeInfo =
    nums
    |> Array.map (fun x -> async { return (x, IsPrime x) } )
    |> Async.Parallel
    |> Async.RunSynchronously
ShowTime()
primeInfo
|> Array.filter (fun (x,b) -> b)
|> Array.iter (fun (x,b) -> printf "%d," x)
printfn ""
printfn "press a key"
Console.ReadKey() |> ignore