## “An Introduction to Async Workflows in F#”, or “how to utilize all those CPUs without writing lots of threading code”, part one

Posted by Brian on May 4, 2008

If I told you that I could make your program run four times faster, and all you had to do was make trivial changes to three lines of code, would you be interested? If so, then read on. :)

### The current tragedy of multi-core boxes

It’s often the case that you have to write a program that processes lots of data. Here’s a quick real-world example: a friend of mine was recently writing some test automation for IntelliSense, where the product parses a huge source code file and the test verifies that if you press "." at the end of the file you get the right IntelliSense completion list. Or something. I don’t recall all the details, but they’re not important to our story. What *is* important are two facts: first, the product being tested takes a couple of seconds to parse the huge file, and second, there are about 100,000 variations to test. And so at the end of the day, it was taking about 60 hours to complete that entire test run on a fast lab machine. 60 hours! Bummer.

What’s even more of a bummer is that, if you fire up Task Manager on the lab machine while it’s running the tests, you’ll see that only 25% of the CPU is being used. Why? Because the machine has 4 CPUs, but the test was single-threaded, and so only one CPU was in use with the rest just sitting idle. This has probably happened to all of us. Nowadays, most new desktop machines are multi-core (my main development box at work is also a quad-core, for instance), and so every developer eventually has that awful experience of sitting at their desk waiting for some program they just wrote to run, and it’s taking *forever* (anything more than 10 seconds seems like forever), *and*, what’s worse, you can see that it’s only using 25% of the CPU because the little program you just coded up is single-threaded. Ugh, I hate that feeling.

So why don’t we always write code that uses threads, so as to utilize all the horsepower we have sitting on our desktops? Because threads are complicated – you have to carefully synchronize data, maybe using locks or the Interlocked class, signal, wait, use weird ThreadPool APIs… it’s *just too easy* to screw up, and wind up with race conditions or other kinds of erroneous programs.

Which is too bad, because it would be handy to speed up programs by a factor of 4 on a 4-CPU box. A test that runs in 60 hours takes the wind out of my sails – I have to wait *days* for the results. But if it runs in 15 hours, I can start it before I leave the office this afternoon, and have the results by the time I return to the office tomorrow morning. That’s a huge difference.

In this series of blog posts, I want to introduce you to an awesome F# library that enables you to write multi-threaded code so that it’s *just too easy* to get the code exactly *right*, the very first time. :)

### A simple problem

If we want to demo this mysterious F# library, we need a sample problem to solve. So let’s write a program to determine if various large numbers are prime. Specifically, for each number from 10,000,000 through 10,004,000, I want to know if the number is prime or not. The output of my program is an array of int-bool pairs, one per number (4001 of them, since the range includes both endpoints): the int is the number in question, and the bool says whether that number is prime.

Yes, this is a very contrived problem. Nevertheless, it is representative of any real-world problem where you’ve got a big array of input data and need to run some computation on each piece of data. For easy blog exposition, I want to keep the particular computation task (computing primes) very simple, and as for the peculiar numbers I chose (why "ten million four thousand"?), well, you’ll see why I chose them shortly.

Anyway, this is a simple task, and so it can be solved with a simple program. Here’s an F# program I wrote that computes my array of int-bool pairs. It also measures how long it takes to compute this array, and, just for kicks, it prints out the prime number results. It’s fewer than 30 lines of code – read it over:

```fsharp
open System.Diagnostics

let stopWatch = new Stopwatch()
let ResetStopWatch() = stopWatch.Reset(); stopWatch.Start()
let ShowTime() = printfn "took %d ms" stopWatch.ElapsedMilliseconds

// IsPrime : int -> bool
let IsPrime x =
    // extremely naive approach – good because we want it to be slow
    let mutable i = 2
    let mutable foundFactor = false
    while not foundFactor && i < x do
        if x % i = 0 then
            foundFactor <- true
        i <- i + 1
    not foundFactor

let nums = [| for i in 10000000..10004000 -> i |]

ResetStopWatch()
// primeInfo = array<int * bool>
let primeInfo =
    nums
    |> Array.map (fun x -> (x, IsPrime x))
ShowTime()

primeInfo
|> Array.filter (fun (x, b) -> b)
|> Array.iter (fun (x, b) -> printf "%d," x)
printfn ""
```

There are 4 main parts. I use the Stopwatch class in System.Diagnostics to measure how long it takes to compute the prime-ness of the array of numbers. I wrote an extremely naive "IsPrime" function to test if a number is prime. (It’s slow, which bodes well for my storytelling example, as we’ll see in a moment.) Then I compute the results I want ("primeInfo") by using Array.map to turn each number into an int*bool 2-tuple: the number itself, and a boolean saying whether it’s prime. (This is the major computation, and thus this step is the only portion being timed by the stopwatch.) Finally I print the primes, by filtering out just those tuples whose boolean value is true and printing the corresponding numbers.
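Since only the Array.map step is timed, one tidy way to say "time just this expression" is a small wrapper around `Stopwatch.StartNew`. This is a minimal sketch; `timeIt` is a hypothetical helper that is not part of the program above. It returns the elapsed milliseconds alongside the result instead of printing them.

```fsharp
open System.Diagnostics

/// Run `f ()` and return its result together with the elapsed wall-clock milliseconds.
/// Hypothetical helper; the program above uses a shared Stopwatch instead.
let timeIt (f: unit -> 'T) : 'T * int64 =
    let sw = Stopwatch.StartNew()   // creates and starts a fresh stopwatch
    let result = f ()
    sw.Stop()
    result, sw.ElapsedMilliseconds
```

With the program’s `nums` and `IsPrime`, the timed step could then read `let primeInfo, ms = timeIt (fun () -> nums |> Array.map (fun x -> (x, IsPrime x)))`. The advantage is that the thing being measured is explicit, so there is no stopwatch state to reset by hand.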

When I run this program (unoptimized, inside the VS debugger) on my box at work – tragedy! It takes *forever* (about 12.5 seconds), and the CPU is pegged at 25% the whole time. But at least now you know why I picked those numbers (ten million to ten million four thousand): I tried a few ranges by quick trial and error until I found one where the program would take more than 10 seconds (a.k.a. "forever"). :) So let’s try to speed it up by bringing our idle CPUs to the party.

### Threads

This program is just begging to be run with multiple threads. We have an array of roughly 4000 inputs, and we run some computation on each individual input to yield an array of roughly 4000 outputs. If we had 4000 CPUs, we could run the whole thing in parallel in an instant. Alas, my box only has 4 CPUs, but I would still love to speed this up by a factor of four. But how?

One approach would be to partition the array into 4 quadrants, and have one thread work on each one. There are roughly 4000 elements in the array, so the first thread could do the first 1000 or so elements, the next thread the next 1000, and so on. This strategy has two shortcomings, however. First of all, it’s hardcoding the number of threads: as a result, it will work pretty well for a 4-CPU box, but what if your box has 8 CPUs? (We could work around this by dynamically probing the box we’re running on to find out how many CPUs it has, but that would not solve the *next* problem… which is…) Second, it presumes that each quadrant of the array contains the same amount of "work". Look back at our IsPrime function. It will run fast on a lot of composite numbers (if the number is divisible by 2 or 3, for example, you’ll only do one or two iterations of the loop). It’s the primes that are the killers: my naive IsPrime method iterates from 2 to n-1 for primes, and with the huge numbers we’re testing, that’s very slow. Furthermore, primes aren’t necessarily evenly distributed. If there are 400 primes in this range, maybe 300 of them are in the first quadrant and the other 100 are spread evenly over the remaining three quadrants. If that were true, then our first thread would be stuck churning along, working furiously to find primes, while the other threads breezed quickly through their portions of the work and then sat idle waiting for thread #1 to finish up.
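To make the quadrant strategy concrete, here is a minimal sketch of static partitioning that sizes the number of chunks from `Environment.ProcessorCount`, which is the "probe the box" workaround mentioned above. The helper name `partition` is hypothetical. Note that it fixes every chunk boundary up front, which is exactly why it suffers when one chunk happens to hold most of the primes.

```fsharp
open System

/// Split the indices 0 .. length-1 into `parts` contiguous (start, endExclusive)
/// ranges whose sizes differ by at most one. Hypothetical helper for illustration.
let partition (length: int) (parts: int) : (int * int)[] =
    let baseSize = length / parts
    let extra = length % parts          // the first `extra` chunks get one more element
    [| for p in 0 .. parts - 1 ->
         let start = p * baseSize + min p extra
         let size = baseSize + (if p < extra then 1 else 0)
         (start, start + size) |]

// One chunk per CPU on whatever machine this runs on.
let chunks = partition 4001 Environment.ProcessorCount
for (s, e) in chunks do
    printfn "chunk [%d, %d) has %d elements" s e (e - s)
```

On a 4-CPU box this yields chunks of 1001, 1000, 1000 and 1000 elements. The sizes are balanced, but the *work* inside each chunk is not.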

As a result, a better strategy is to queue up each element of the array as a separate piece of work, and whenever there is a free CPU, have it grab the next element to process. The .NET thread pool, specifically the QueueUserWorkItem method, makes this relatively straightforward to do. Using this strategy, there are no worries that CPUs will sit idle while there are still unprocessed array elements.
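The "grab the next element" idea does not depend on the thread pool itself. Here is a minimal sketch of it that uses plain `Thread` objects and a shared counter advanced with `Interlocked.Increment`, so each worker claims the next unprocessed index as soon as it finishes the previous one. The function name `mapDynamic` is hypothetical. It illustrates the load-balancing idea and is not the approach this series goes on to use.

```fsharp
open System.Threading

/// Apply `f` to every element of `input`, with `workerCount` threads that each
/// repeatedly claim the next unclaimed index. Hypothetical helper.
let mapDynamic (workerCount: int) (f: 'a -> 'b) (input: 'a[]) : 'b[] =
    let results : 'b[] = Array.zeroCreate input.Length
    // Index of the most recently claimed element. It is a ref cell because
    // F# closures can't capture a local `let mutable`.
    let next = ref (-1)
    let worker () =
        let mutable i = Interlocked.Increment(&next.contents)
        while i < input.Length do
            results.[i] <- f input.[i]   // each index is claimed by exactly one thread
            i <- Interlocked.Increment(&next.contents)
    let threads = Array.init workerCount (fun _ -> Thread(ThreadStart(worker)))
    threads |> Array.iter (fun t -> t.Start())
    threads |> Array.iter (fun t -> t.Join())   // wait until every worker runs dry
    results
```

With the program’s `IsPrime`, the timed step could become `nums |> mapDynamic Environment.ProcessorCount (fun x -> (x, IsPrime x))` (this needs `open System`). A thread that draws several primes simply claims fewer indices, while the others keep pulling work.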

### Coding against the thread pool APIs

Let’s look at some code that uses the thread pool APIs directly to utilize all our CPUs. It is rather tricky to get right. The only portion of the original program that changes is the bit between the ResetStopWatch() and ShowTime() calls, so that’s all I’m showing here (the rest of the program remains unchanged):

```fsharp
// (add "open System.Threading" at the top of the file for ThreadPool, Interlocked and ManualResetEvent)

// we need to "join" at the end to know when we’re done, and these will help do that
let mutable numRemainingComputations = nums.Length
let mre = new ManualResetEvent(false)

// primeInfo = array<int * bool>
let primeInfo = Array.create nums.Length (0, false)
nums
|> Array.iteri (fun i x ->
    ignore (ThreadPool.QueueUserWorkItem(fun o ->
        primeInfo.[i] <- (x, IsPrime x)
        // if we’re the last one, signal that we’re done
        if Interlocked.Decrement(&numRemainingComputations) = 0 then
            mre.Set() |> ignore)))

// wait until all done
mre.WaitOne() |> ignore
ShowTime()
```

The tricky bit is that after we queue up all the work, we need a way to find out when it’s all done. So here I’ve created a mutable variable "numRemainingComputations" that "counts down" how many array elements of work we have left. I also need a ManualResetEvent that I can "wait" on (when the final piece of work finishes, it will "signal" the event). Each bit of work that I queue up will

- Do the actual work for this one array element (compute the int*bool tuple and stuff it in the output array "primeInfo")
- Decrement the numRemainingComputations counter (using Interlocked.Decrement, so I don’t need an explicit lock for this shared mutable data)
- If it just completed the final computation, signal that we are all done (by calling mre.Set())

The "ignore" calls just throw away useless booleans (both QueueUserWorkItem() and Set() return boolean results that I don’t care about).
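The three steps above form a "countdown latch": a counter of outstanding work items plus an event that the last item to finish signals. Here is a minimal sketch that packages the same pattern into a reusable function, using only the `ThreadPool`, `Interlocked` and `ManualResetEvent` calls from the listing. The name `queueAllAndWait` is hypothetical. Because the counter lives inside a function rather than at the top level, it uses a ref cell, since F# won’t let a closure capture a local `let mutable`.

```fsharp
open System.Threading

/// Run `work i` on the thread pool for each i in 0 .. count-1, then block until all finish.
/// Hypothetical helper illustrating the counter + event pattern.
let queueAllAndWait (count: int) (work: int -> unit) =
    if count > 0 then
        let remaining = ref count                  // outstanding work items
        use allDone = new ManualResetEvent(false)  // signaled by the last item
        for i in 0 .. count - 1 do
            let callback (_: obj) =
                work i
                // Only the item that takes the counter to zero signals the event.
                if Interlocked.Decrement(&remaining.contents) = 0 then
                    allDone.Set() |> ignore
            ThreadPool.QueueUserWorkItem(WaitCallback(callback)) |> ignore
        allDone.WaitOne() |> ignore
```

The prime computation would then be `queueAllAndWait nums.Length (fun i -> primeInfo.[i] <- (nums.[i], IsPrime nums.[i]))`. The bookkeeping lives in one place, but the sketch still does no exception handling: a work item that throws on a pool thread takes the process down rather than reporting back to the caller.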

Sigh. Well, the code works. When I run it on my box, the CPU utilization goes to 100% and the program finishes in just over 3 seconds. So we accomplished the goal of using our CPUs to make things run 4 times faster. But…

That code is a mess. It doesn’t make me happy to read it or write it. I originally wrote it in C# (see code at the end of this blog entry) and I made two different tragic errors along the way. There are too many little details to keep track of and get right. Surely there is a better way!

### A better way – the F# Control library

There is a better way, using the F# Control library. We’ll see how to apply it to this problem… in the next blog entry.

### Source code

Here’s the code I wrote when I tried it in C#, both with and without multi-threading (compile with MULTITHREAD defined to get the multi-threaded version):

```csharp
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class Program
{
    static Stopwatch stopWatch = new Stopwatch();

    static void ResetStopWatch() { stopWatch.Reset(); stopWatch.Start(); }

    static void ShowTime() { Console.WriteLine("took {0} ms", stopWatch.ElapsedMilliseconds); }

    static bool IsPrime(int x)
    {
        for (int i = 2; i < x; ++i)
        {
            if (x % i == 0)
                return false;
        }
        return true;
    }

    static void Main(string[] args)
    {
        var nums = new int[4001];
        for (int i = 0; i < nums.Length; ++i)
            nums[i] = 10000000 + i;

        ResetStopWatch();
        var primeInfo = new KeyValuePair<int, bool>[nums.Length];
#if MULTITHREAD
        // we need to "join" at the end to know when to stop, and these will help do that
        int numRemainingComputations = nums.Length;
        ManualResetEvent mre = new ManualResetEvent(false);
#endif
        for (int i = 0; i < nums.Length; ++i)
        {
#if MULTITHREAD
            // be sure to use a local variable inside the loop, else you’ll capture the mutable variable
            // "i" inside the lambda and go nuts and get an IndexOutOfRangeException
            int j = i;
            ThreadPool.QueueUserWorkItem((obj) =>
            {
                primeInfo[j] = new KeyValuePair<int, bool>(nums[j], IsPrime(nums[j]));
                // if we’re the last one, signal that we’re done
                if (Interlocked.Decrement(ref numRemainingComputations) == 0)
                    mre.Set();
            });
#else
            primeInfo[i] = new KeyValuePair<int, bool>(nums[i], IsPrime(nums[i]));
#endif
        }
#if MULTITHREAD
        // wait until all done (before reading the stopwatch, so the timing covers the work)
        mre.WaitOne();
#endif
        ShowTime();

        var primes = from x in primeInfo
                     where x.Value
                     select x.Key;
        foreach (var x in primes)
            Console.Write("{0},", x);
        Console.WriteLine();
        ShowTime();

        Console.WriteLine("press a key");
        Console.ReadKey();
    }
}
```
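The comment about copying `i` into `j` guards against a classic C# pitfall. A C# `for` loop has a single loop variable shared by every iteration (C# 5 changed this for `foreach`, but not for `for`), so without the copy every lambda reads whatever `i` holds when its work item eventually runs. F# sidesteps this: each iteration of `for i in ...` binds a fresh, immutable `i`, and the compiler rejects closures that capture a local `let mutable`. Here is a small sketch of the first point; `makeClosures` is just an illustrative name.

```fsharp
// Build closures inside a loop, then run them afterwards.
// Each closure sees the value of `i` from its own iteration.
let makeClosures (n: int) : (unit -> int) list =
    [ for i in 0 .. n - 1 -> fun () -> i ]

let values = makeClosures 3 |> List.map (fun f -> f ())
printfn "%A" values   // prints [0; 1; 2]
```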

And now here’s the code in F# – first single-threaded, then multi-threaded using the .NET threading APIs.

```fsharp
open System
open System.Diagnostics

let stopWatch = new Stopwatch()
let ResetStopWatch() = stopWatch.Reset(); stopWatch.Start()
let ShowTime() = printfn "took %d ms" stopWatch.ElapsedMilliseconds

// IsPrime : int -> bool
let IsPrime x =
    // extremely naive approach – good because we want it to be slow
    let mutable i = 2
    let mutable foundFactor = false
    while not foundFactor && i < x do
        if x % i = 0 then
            foundFactor <- true
        i <- i + 1
    not foundFactor

let nums = [| for i in 10000000..10004000 -> i |]

ResetStopWatch()
// primeInfo = array<int * bool>
let primeInfo =
    nums
    |> Array.map (fun x -> (x, IsPrime x))
ShowTime()

primeInfo
|> Array.filter (fun (x, b) -> b)
|> Array.iter (fun (x, b) -> printf "%d," x)
printfn ""

printfn "press a key"
Console.ReadKey() |> ignore
```

```fsharp
module ThreadPool =
    open System
    open System.Diagnostics
    open System.Threading

    let stopWatch = new Stopwatch()
    let ResetStopWatch() = stopWatch.Reset(); stopWatch.Start()
    let ShowTime() = printfn "took %d ms" stopWatch.ElapsedMilliseconds

    let IsPrime x =
        let mutable i = 2
        let mutable foundFactor = false
        while not foundFactor && i < x do
            if x % i = 0 then
                foundFactor <- true
            i <- i + 1
        not foundFactor

    let nums = [| for i in 10000000..10004000 -> i |]

    ResetStopWatch()
    // we need to "join" at the end to know when to stop, and these will help do that
    let mutable numRemainingComputations = nums.Length
    let mre = new ManualResetEvent(false)
    let primeInfo = Array.create nums.Length (0, false)
    nums
    |> Array.iteri (fun i x ->
        // fully qualified, so it can't be confused with this module, which is also named ThreadPool
        ignore (System.Threading.ThreadPool.QueueUserWorkItem(fun o ->
            primeInfo.[i] <- (x, IsPrime x)
            // if we’re the last one, signal that we’re done
            if Interlocked.Decrement(&numRemainingComputations) = 0 then
                mre.Set() |> ignore)))

    // wait until all done
    mre.WaitOne() |> ignore
    ShowTime()

    primeInfo
    |> Array.filter (fun (x, b) -> b)
    |> Array.iter (fun (x, b) -> printf "%d," x)
    printfn ""

    printfn "press a key"
    Console.ReadKey() |> ignore
```
