What's the best way to have multiple threads doing work, and waiting for all of them to complete?

I'm writing a simple app (for my wife no less :-P ) that does some image manipulation (resizing, timestamping, etc.) for a potentially large batch of images. So I'm writing a library that can do this both synchronously and asynchronously. I decided to use the Event-based Asynchronous Pattern. When using this pattern, you need to raise an event when the work has been completed. This is where I'm having problems: knowing when it's done. So basically, in my DownsizeAsync method (the async method for downsizing images) I'm doing something like this:

    public void DownsizeAsync(string[] files, string destination)
    {
        foreach (var name in files)
        {
            string temp = name; //countering the closure issue
            ThreadPool.QueueUserWorkItem(f =>
            {
                string newFileName = this.DownsizeImage(temp, destination);
                this.OnImageResized(newFileName);
            });
        }
    }

The tricky part now is knowing when they are all complete.

Here's what I've considered: using ManualResetEvents as shown here: http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx But the problem I came across is that you can only wait on 64 or fewer events. I may have many, many more images.

Second option: Have a counter that counts the images that have been done, and raise the event when the count reaches the total:

    public void DownsizeAsync(string[] files, string destination)
    {
        foreach (var name in files)
        {
            string temp = name; //countering the closure issue
            ThreadPool.QueueUserWorkItem(f =>
            {
                string newFileName = this.DownsizeImage(temp, destination);
                this.OnImageResized(newFileName);
                total++;
                if (total == files.Length)
                {
                    this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
                }
            });
        }
    }

    private volatile int total = 0;

Now this feels "hacky" and I'm not entirely sure if that's thread safe.

So, my question is, what's the best way of doing this? Is there another way to synchronize all threads? Should I not be using a ThreadPool? Thanks!!

UPDATE: Based on feedback in the comments and from a few answers, I've decided to take this approach:

First, I created an extension method that splits an enumerable into batches of a given size:

    public static IEnumerable<IEnumerable<T>> GetBatches<T>(this IEnumerable<T> source, int batchCount)
    {
        for (IEnumerable<T> s = source; s.Any(); s = s.Skip(batchCount))
        {
            yield return s.Take(batchCount);
        }
    }

Basically, if you do something like this:

        foreach (IEnumerable<int> batch in Enumerable.Range(1, 95).GetBatches(10))
        {
            foreach (int i in batch)
            {
                Console.Write("{0} ", i);
            }
            Console.WriteLine();
        }

You get this output:

1 2 3 4 5 6 7 8 9 10
11 12 13 14 15 16 17 18 19 20
21 22 23 24 25 26 27 28 29 30
31 32 33 34 35 36 37 38 39 40
41 42 43 44 45 46 47 48 49 50
51 52 53 54 55 56 57 58 59 60
61 62 63 64 65 66 67 68 69 70
71 72 73 74 75 76 77 78 79 80
81 82 83 84 85 86 87 88 89 90
91 92 93 94 95

The idea being that (as someone in the comments pointed out) there's no need to create a separate thread for each image. Therefore, I'll batch the images into [machine.cores * 2] number of batches. Then, I'll use my second approach, which is simply to keep a counter going, and when the counter reaches the total I'm expecting, I'll know I'm done.

The reason I'm convinced now that it is in fact thread safe is that I've marked the total variable as volatile, which according to MSDN:

The volatile modifier is usually used for a field that is accessed by multiple threads without using the lock statement to serialize access. Using the volatile modifier ensures that one thread retrieves the most up-to-date value written by another thread

means I should be in the clear (if not, please let me know!!)

So here's the code I'm going with:

    public void DownsizeAsync(string[] files, string destination)
    {
        int cores = Environment.ProcessorCount * 2;
        int batchAmount = files.Length / cores;

        foreach (var batch in files.GetBatches(batchAmount))
        {
            var temp = batch.ToList(); //counter closure issue
            ThreadPool.QueueUserWorkItem(b =>
            {
                foreach (var item in temp)
                {
                    string newFileName = this.DownsizeImage(item, destination);
                    this.OnImageResized(newFileName);
                    total++;
                    if (total == files.Length)
                    {
                        this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
                    }
                }
            });
        }
    }
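
A note on the code above, with a hedged sketch. The volatile modifier affects visibility and ordering only; `total++` is still a separate read, add, and write, so two threads can lose an update and the `total == files.Length` check may then never be true. Separately, `files.Length / cores` is 0 whenever there are fewer files than batches, and `GetBatches(0)` never terminates because `Skip(0)` never shrinks the sequence. The following is a minimal sketch of the same counter idea using `Interlocked.Increment`. The names `BatchCounterDemo`, `ProcessAll`, `work`, and `onCompleted` are hypothetical stand-ins for the `DownsizeImage` / `OnDownsizeCompleted` members in the question, not the asker's actual library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class BatchCounterDemo
{
    // Hypothetical helper: runs 'work' for every file on the thread pool and
    // calls 'onCompleted' exactly once, after the last item has finished.
    public static void ProcessAll(IList<string> files, Action<string> work, Action onCompleted)
    {
        if (files.Count == 0) { onCompleted(); return; }

        int batchCount = Environment.ProcessorCount * 2;
        // Round up and never go below 1, so small inputs still form batches.
        int batchSize = Math.Max(1, (files.Count + batchCount - 1) / batchCount);
        int done = 0; // shared counter, only modified through Interlocked

        for (int start = 0; start < files.Count; start += batchSize)
        {
            // Declared inside the loop, so each closure captures its own batch.
            List<string> batch = files.Skip(start).Take(batchSize).ToList();
            ThreadPool.QueueUserWorkItem(_ =>
            {
                foreach (string file in batch)
                {
                    work(file);
                    // Increment returns the new value atomically, so exactly
                    // one thread sees done == files.Count.
                    if (Interlocked.Increment(ref done) == files.Count)
                    {
                        onCompleted();
                    }
                }
            });
        }
    }
}
```

The comparison uses the value returned by `Interlocked.Increment` rather than re-reading the field, which is what guarantees that the completion callback fires once and only once.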

I'm open to feedback as I'm in no way an expert on multithreading, so if anyone sees any issue with this, or has a better idea, please let me know. (Yes, this is just a home-made app, but I have some ideas on how I can use the knowledge I gain here to improve the Search / Index service we use at work.) For now I'll keep this question open till I feel like I'm using the right approach. Thanks everyone for your help.

Answers

The simplest thing would be to create new threads, and then call Thread.Join on each of them. You could use a semaphore or something like it - but it's probably easier to just create new threads.
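
As a rough illustration of the Thread.Join suggestion, here is a minimal sketch. `JoinDemo`, `RunAndWait`, and the `work` delegate are hypothetical names introduced for the example; `work` stands in for whatever resizes one image.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class JoinDemo
{
    // Hypothetical helper: starts one thread per item and blocks until all have exited.
    public static void RunAndWait(IEnumerable<string> items, Action<string> work)
    {
        var threads = new List<Thread>();
        foreach (string item in items)
        {
            string current = item; // local copy for the closure
            var thread = new Thread(() => work(current));
            threads.Add(thread);
            thread.Start();
        }

        // Join returns once the thread has terminated; unlike
        // WaitHandle.WaitAll there is no 64-handle limit here.
        foreach (Thread thread in threads)
        {
            thread.Join();
        }
    }
}
```

One dedicated thread per image is probably too heavy for a very large batch, so in practice this would likely be combined with some form of batching.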

In .NET 4.0 you could use Parallel Extensions to do this quite easily with tasks.
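
A minimal sketch of what that could look like, assuming .NET 4.0 or later. `TaskDemo`, `RunBlocking`, `RunAsync`, `work`, and `onCompleted` are hypothetical names; `Parallel.ForEach`, `Task.Factory.StartNew`, and `ContinueWith` are the actual library calls.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TaskDemo
{
    // Blocking variant: Parallel.ForEach returns only after every item is processed.
    public static void RunBlocking(IEnumerable<string> items, Action<string> work)
    {
        Parallel.ForEach(items, work);
    }

    // Non-blocking variant: the parallel loop runs inside a task, and the
    // continuation is invoked once that task has ended.
    public static Task RunAsync(IEnumerable<string> items, Action<string> work, Action onCompleted)
    {
        return Task.Factory
            .StartNew(() => { Parallel.ForEach(items, work); })
            .ContinueWith(t => onCompleted());
    }
}
```

Note that in this sketch the continuation also runs if the loop threw; a real implementation would inspect `t.Exception` before reporting success.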

As another alternative which would use the threadpool, you could create a delegate and call BeginInvoke on it, to return an IAsyncResult - you can then get the WaitHandle for each result via the AsyncWaitHandle property, and call WaitHandle.WaitAll.

EDIT: As pointed out in comments, you can only call WaitAll with up to 64 handles at a time on some implementations. Alternatives could be calling WaitOne on each of them in turn, or calling WaitAll with batches. It won't really matter, so long as you're doing it from a thread which isn't going to block the threadpool. Also note that you can't call WaitAll from an STA thread.

You still want to use the ThreadPool because it will manage the number of threads it runs simultaneously. I ran into a similar issue recently and solved it like this:

// Declared as IDispatcher so the decorator below can be assigned to the same variable.
IDispatcher dispatcher = new ThreadPoolDispatcher();
dispatcher = new ChunkingDispatcher(dispatcher, 10);

foreach (var image in images)
{
    dispatcher.Dispatch(new ResizeJob(image));
}

dispatcher.WaitForJobsToFinish();

The IDispatcher and IJob look like this:

public interface IJob
{
    void Execute();
}

public interface IDispatcher
{
    void Dispatch(IJob job);
    void WaitForJobsToFinish();
}

public class ThreadPoolDispatcher : IDispatcher
{
    private IList<ManualResetEvent> resetEvents = new List<ManualResetEvent>();

    public void Dispatch(IJob job)
    {
        var resetEvent = CreateAndTrackResetEvent();
        var worker = new ThreadPoolWorker(job, resetEvent);
        ThreadPool.QueueUserWorkItem(new WaitCallback(worker.ThreadPoolCallback));
    }

    private ManualResetEvent CreateAndTrackResetEvent()
    {
        var resetEvent = new ManualResetEvent(false);
        resetEvents.Add(resetEvent);
        return resetEvent;
    }

    public void WaitForJobsToFinish()
    {
        // WaitHandle.WaitAll throws on an empty array, so return early when nothing is pending.
        if (resetEvents.Count == 0) return;

        WaitHandle.WaitAll(resetEvents.ToArray());
        resetEvents.Clear();
    }
}

And then used a decorator to chunk the use of ThreadPool:

public class ChunkingDispatcher : IDispatcher
{
    private IDispatcher dispatcher;
    private int numberOfJobsDispatched;
    private int chunkSize;

    public ChunkingDispatcher(IDispatcher dispatcher, int chunkSize)
    {
        this.dispatcher = dispatcher;
        this.chunkSize = chunkSize;
    }

    public void Dispatch(IJob job)
    {
        dispatcher.Dispatch(job);

        if (++numberOfJobsDispatched % chunkSize == 0)
            WaitForJobsToFinish();
    }

    public void WaitForJobsToFinish()
    {
        dispatcher.WaitForJobsToFinish();
    }
}

The IDispatcher abstraction works pretty well for swapping out your threading technique. I have another implementation that is a SingleThreadedDispatcher, and you could make a ThreadStart version like Jon Skeet suggested. Then it's easy to run each one and see what kind of performance you get. The SingleThreadedDispatcher is good when debugging your code or when you don't want to kill the processor on your box.
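
The SingleThreadedDispatcher itself is not shown in this answer, so the following is only a guess at what such an implementation might look like. The two interfaces are restated with the members used above so that the snippet compiles on its own, and `DelegateJob` is a hypothetical helper added purely for illustration.

```csharp
using System;

public interface IJob
{
    void Execute();
}

public interface IDispatcher
{
    void Dispatch(IJob job);
    void WaitForJobsToFinish();
}

// Assumed implementation: runs every job inline on the calling thread.
public class SingleThreadedDispatcher : IDispatcher
{
    public void Dispatch(IJob job)
    {
        job.Execute();
    }

    public void WaitForJobsToFinish()
    {
        // Nothing to wait for: each job finished before Dispatch returned.
    }
}

// Hypothetical job that wraps a delegate, handy for quick experiments.
public class DelegateJob : IJob
{
    private readonly Action action;

    public DelegateJob(Action action)
    {
        this.action = action;
    }

    public void Execute()
    {
        action();
    }
}
```

Because the work happens synchronously, exceptions surface directly at the Dispatch call, which is part of what makes this variant convenient for debugging.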

Edit: I forgot to add the code for ThreadPoolWorker:

public class ThreadPoolWorker
{
    private IJob job;
    private ManualResetEvent doneEvent;

    public ThreadPoolWorker(IJob job, ManualResetEvent doneEvent)
    {
        this.job = job;
        this.doneEvent = doneEvent;
    }

    public void ThreadPoolCallback(object state)
    {
        try
        {
            job.Execute();
        }
        finally
        {
            doneEvent.Set();
        }
    }
}

The simplest and most efficient solution would be to use a counter and make it thread safe. This consumes less memory and scales to a larger number of threads.

Here is a sample:

int itemCount = 0;
for (int i = 0; i < 5000; i++)
{
    Interlocked.Increment(ref itemCount);

    ThreadPool.QueueUserWorkItem(x=>{
        try
        {
            //code logic here.. sleep is just for demo
            Thread.Sleep(100);
        }
        finally
        {
            Interlocked.Decrement(ref itemCount);
        }
    });
}

while (itemCount > 0)
{
    Console.WriteLine("Waiting for " + itemCount + " threads...");
    Thread.Sleep(100);
}
Console.WriteLine("All Done!");
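
If .NET 4.0 or later is available, the same counting idea can be expressed with `CountdownEvent`, which blocks the waiting thread instead of polling with Thread.Sleep. This is a sketch of that alternative, not the sample above rewritten by its author; `CountdownDemo`, `RunAndWait`, and `work` are hypothetical names.

```csharp
using System;
using System.Threading;

public static class CountdownDemo
{
    // Hypothetical helper: queues 'count' work items and blocks until all have signalled.
    public static void RunAndWait(int count, Action<int> work)
    {
        if (count <= 0) return; // nothing to queue, nothing to wait for

        using (var pending = new CountdownEvent(count))
        {
            for (int i = 0; i < count; i++)
            {
                int index = i; // local copy for the closure
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        work(index);
                    }
                    finally
                    {
                        pending.Signal(); // decrement the remaining count
                    }
                });
            }

            pending.Wait(); // returns when the count reaches zero
        }
    }
}
```

As with the polling version, the wait should happen on a thread that is allowed to block, not on a UI thread.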

I've used SmartThreadPool with much success to cope with this problem. There is also a CodePlex site about the assembly.

SmartThreadPool can help with other problems as well, such as when some threads cannot run at the same time while others can.

I use a static utility method to examine all the individual wait handles..

    public static void WaitAll(WaitHandle[] handles)
    {
        if (handles == null)
            throw new ArgumentNullException("handles",
                "WaitHandle[] handles was null");
        foreach (WaitHandle wh in handles) wh.WaitOne();
    }

Then in my main thread, I create a List of these wait handles, and for each delegate I put in my ThreadPool Queue, I add the wait handle to the List...

 List<WaitHandle> waitHndls = new List<WaitHandle>();
 foreach (var item in items)   // "items" is a placeholder for your own iterator logic
 {
      ManualResetEvent txEvnt = new ManualResetEvent(false);

      ThreadPool.QueueUserWorkItem(
           delegate
               {
                   try
                   {
                       // Code to process each task...
                   }
                   finally
                   {
                       // Finally, set each wait handle when done.
                       // Set() is thread safe, so no lock is needed here.
                       txEvnt.Set();
                   }
               });
      waitHndls.Add(txEvnt);  // Add wait handle to List
 }
 util.WaitAll(waitHndls.ToArray());   // Check all wait Handles in List

.NET 4.0 makes multi-threading even easier (although you can still shoot yourself with side effects).

Another option would be to use a Pipe.

You post all the work to be done to the pipe and then read data from the pipe from each thread. When the pipe is empty, you're done, the threads end themselves, and everybody is happy (of course, make sure you first produce all the work, then consume it).
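
One way to sketch this idea in .NET 4.0 or later is with a `ConcurrentQueue<T>` standing in for the pipe. That substitution is an assumption on my part, as are the names `QueueDrainDemo`, `ProcessAll`, `workerCount`, and `work`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class QueueDrainDemo
{
    // Hypothetical helper: fills a queue up front, then lets a fixed number of
    // worker threads drain it. Each worker ends itself when the queue is empty.
    public static void ProcessAll(IEnumerable<string> items, int workerCount, Action<string> work)
    {
        // Produce all the work first...
        var queue = new ConcurrentQueue<string>(items);

        // ...then consume it.
        var workers = new List<Thread>();
        for (int i = 0; i < workerCount; i++)
        {
            var worker = new Thread(() =>
            {
                string item;
                while (queue.TryDequeue(out item))
                {
                    work(item);
                }
            });
            workers.Add(worker);
            worker.Start();
        }

        // Every worker exits only after finding the queue empty, so once all
        // of them have been joined, all of the work is done.
        foreach (Thread worker in workers)
        {
            worker.Join();
        }
    }
}
```

This only works as a completion signal because nothing is added to the queue after the workers start; with a live producer, an empty queue would not mean the work is finished.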

I suggest putting the untouched images in a queue and as you read from the queue launch a thread and insert its System.Threading.Thread.ManagedThreadId property into a dictionary along with the file name. This way your UI can list both pending and active files.

When each thread completes it invokes a callback routine, passing back its ManagedThreadId. This callback (passed as a delegate to the thread) removes the thread's ID from the dictionary, launches another thread from the queue, and updates the UI.

When both the queue and the dictionary are empty, you're done.

Slightly more complicated but this way you get a responsive UI, you can easily control the number of active threads, and you can see what's in flight. Collect statistics. Get fancy with WPF and put up progress bars for each file. She can't help but be impressed.
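
The queue-plus-dictionary scheme described above could be sketched roughly as follows. Everything here is hypothetical naming (`TrackedRunner`, `Start`, `work`, `onAllDone`), UI updates are left out, and a single lock guards both collections.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class TrackedRunner
{
    private readonly object gate = new object();
    private readonly Queue<string> pending;                      // files not yet started
    private readonly Dictionary<int, string> active =            // ManagedThreadId -> file
        new Dictionary<int, string>();
    private readonly Action<string> work;
    private Action onAllDone;

    public TrackedRunner(IEnumerable<string> files, Action<string> work)
    {
        this.pending = new Queue<string>(files);
        this.work = work;
    }

    // Starts up to maxActive threads and returns immediately.
    public void Start(int maxActive, Action onAllDone)
    {
        if (maxActive < 1) throw new ArgumentOutOfRangeException("maxActive");
        this.onAllDone = onAllDone;
        bool nothingToDo;
        lock (gate)
        {
            nothingToDo = pending.Count == 0;
            for (int i = 0; i < maxActive && pending.Count > 0; i++) StartNext();
        }
        if (nothingToDo) onAllDone();
    }

    // Caller must hold 'gate'.
    private void StartNext()
    {
        string file = pending.Dequeue();
        var thread = new Thread(() =>
        {
            try { work(file); }
            finally { OnFinished(Thread.CurrentThread.ManagedThreadId); }
        });
        active[thread.ManagedThreadId] = file;
        thread.Start();
    }

    // The callback: remove the finished thread, launch the next file if any.
    private void OnFinished(int threadId)
    {
        bool finished = false;
        lock (gate)
        {
            active.Remove(threadId);
            if (pending.Count > 0) StartNext();
            else finished = active.Count == 0;
        }
        if (finished) onAllDone(); // queue and dictionary are both empty
    }
}
```

In a real UI the `onAllDone` callback arrives on a worker thread, so it would need to be marshalled back to the UI thread before touching any controls.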




