Synchronization Using Barrier for Tasks in the .NET Task Parallel Library

Lately I have been converting some GPGPU algorithms, e.g., tiled reduction, from CUDA to C# and the Task Parallel Library. Along the way I noticed a significant efficiency problem when the System.Threading.Barrier class is used to synchronize tasks. Andrea Esuli has noted the same problem and proposed an incomplete solution.

As stated in the documentation, “[Barrier] enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases.” Barrier does work with System.Threading.Tasks.Task, but when the participant count exceeds four (the number of cores in my CPU), SignalAndWait() blocks for roughly one second before proceeding. This is probably because Barrier blocks the thread rather than the task: each waiting participant ties up a thread that would otherwise serve other tasks, so the remaining participants cannot run until more threads become available. The example below illustrates the problem using Barrier and Task with a large number of tasks spawned. Efficiency aside, Barrier.SignalAndWait() does produce correct results.
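
To make the barrier semantics concrete, here is a minimal sketch that is separate from the benchmark program. It uses one dedicated thread per participant, so no participant has to share a thread, and records the order in which the two phases run. The class name BarrierPhaseSketch and the method Run are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper, not part of the benchmark program.
public static class BarrierPhaseSketch
{
    // Runs `participants` dedicated threads through two phases separated by
    // a Barrier and returns the markers in the order they were recorded.
    public static string[] Run(int participants)
    {
        var log = new ConcurrentQueue<string>();
        using (var barrier = new Barrier(participants))
        {
            var threads = new Thread[participants];
            for (int i = 0; i < participants; ++i)
            {
                threads[i] = new Thread(() =>
                {
                    log.Enqueue("a");        // phase 1 work
                    barrier.SignalAndWait(); // blocks until every participant has arrived
                    log.Enqueue("b");        // phase 2 work
                });
                threads[i].Start();
            }
            foreach (Thread t in threads)
                t.Join();
        }
        return log.ToArray();
    }
}
```

Calling BarrierPhaseSketch.Run(8) should return eight "a" entries followed by eight "b" entries, because no thread can pass SignalAndWait() until all eight have recorded their "a".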

As noted by Esuli, one solution is to try to coerce each task onto its own thread by passing TaskCreationOptions.LongRunning when the task is created. Unfortunately, in my tests this only works outside a debugging session or, when debugging, for participant counts up to 8, which limits its use. Esuli also suggests using ManualResetEvent instead of Barrier. Unfortunately, that solution isn’t correct, because ManualResetEvent.Set() returns immediately: unlike Barrier.SignalAndWait(), it does not block the code that follows it in the task.
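
The difference between the two calls can be seen in a small sketch. This is my own illustration, not Esuli's code, and the class and method names are hypothetical. A lone participant calls each primitive while a second participant never arrives.

```csharp
using System;
using System.Threading;

// Hypothetical helper contrasting Set() with SignalAndWait().
public static class SetVersusSignalAndWait
{
    // Set() signals the event and returns at once, even though no other
    // participant has arrived. The event is then observed as signaled.
    public static bool SetReturnsWithoutWaiting()
    {
        using (var gate = new ManualResetEvent(false))
        {
            gate.Set();
            return gate.WaitOne(0);
        }
    }

    // With two participants registered and only one arriving, SignalAndWait
    // keeps the caller blocked until the timeout elapses and returns false.
    public static bool BarrierReleasesAlone(int timeoutMs)
    {
        using (var barrier = new Barrier(2))
        {
            return barrier.SignalAndWait(timeoutMs);
        }
    }
}
```

SetReturnsWithoutWaiting() returns true, while BarrierReleasesAlone(50) returns false after waiting out the timeout. Any barrier built on ManualResetEvent therefore needs a separate wait step after Set() to hold tasks back.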

The best solution I came up with is a task-level barrier, implemented as follows. (1) Replace “SignalAndWait();” in each task with “await Task.Yield();”. (2) Create a custom task scheduler that maintains a special queue for the asynchronous continuation tasks. The scheduler examines the context of each call to QueueTask(Task task); if the call is for a continuation, it places the task on the special queue, to be executed after all “normal” tasks. (3) When the final continuation task has been queued, transfer the continuation tasks to the normal queue. (4) Wait for all normal tasks to complete using Task.WaitAll(…). (5) Complete all continuation tasks using RunSynchronously().
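
Step (2) depends on one piece of behavior: when no SynchronizationContext is set, “await Task.Yield();” hands its continuation to the scheduler of the currently running task. The sketch below checks this in isolation, assuming it runs on a plain thread-pool thread. CountingScheduler and YieldDemo are hypothetical names, and this scheduler only counts QueueTask calls rather than implementing the barrier.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler: counts QueueTask calls, then runs each task
// on the thread pool. It does no barrier bookkeeping.
public sealed class CountingScheduler : TaskScheduler
{
    private int _queued;

    public int QueuedCount
    {
        get { return Volatile.Read(ref _queued); }
    }

    protected override void QueueTask(Task task)
    {
        Interlocked.Increment(ref _queued);
        ThreadPool.QueueUserWorkItem(_ => TryExecuteTask(task));
    }

    // Inlining is disabled so every task goes through QueueTask.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return new Task[0];
    }
}

public static class YieldDemo
{
    // Starts one async task on the counting scheduler and reports how many
    // times QueueTask was called by the time the whole task has finished.
    public static int CountQueueCalls()
    {
        var scheduler = new CountingScheduler();
        var factory = new TaskFactory(scheduler);
        Task<Task> outer = factory.StartNew(async () =>
        {
            await Task.Yield(); // continuation is queued to TaskScheduler.Current
        });
        outer.Unwrap().Wait(); // wait for the continuation, not just the first part
        return scheduler.QueuedCount;
    }
}
```

CountQueueCalls() should return 2: one call for the task created by StartNew and one for the continuation after the yield. That second call is the one the scheduler in the listing below diverts to its special queue.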

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;

namespace TPL_Experiments
{
    class Program
    {
        static void Main(string[] args)
        {
            System.Console.WriteLine("Barrier");
            for (int size = 2; size <= 16; size *= 2)
            {
                FastBarrierTaskScheduler lcts = new FastBarrierTaskScheduler(size);
                TaskFactory factory = new TaskFactory(lcts);
                Barrier b = new Barrier(size);
                DateTime start = DateTime.Now;

                Task[] tasks = new Task[size];
                for (int i = 0; i < size; ++i)
                {
                    int copy = i;
                    Task t = factory.StartNew(
                        () =>
                        {
                            System.Console.WriteLine("a");
                            b.SignalAndWait();
                            System.Console.WriteLine("b");
                        });
                    tasks[i] = t;
                }
                Task.WaitAll(tasks);

                DateTime end = DateTime.Now;
                TimeSpan duration = end - start;
                string durationString = String.Format("{0:00}:{1:00}:{2:00}.{3:000}", duration.Hours, duration.Minutes, duration.Seconds, duration.Milliseconds);
                System.Console.Out.Flush();
                System.Console.WriteLine("Time: " + durationString);
            }
            System.Console.WriteLine();


            System.Console.WriteLine("Barrier with TaskCreationOptions.LongRunning");
            for (int size = 2; size <= 16; size *= 2)
            {
                FastBarrierTaskScheduler lcts = new FastBarrierTaskScheduler(size);
                TaskFactory factory = new TaskFactory(lcts);
                Barrier b = new Barrier(size);
                DateTime start = DateTime.Now;

                Task[] tasks = new Task[size];
                for (int i = 0; i < size; ++i)
                {
                    int copy = i;
                    Task t = factory.StartNew(
                        () =>
                        {
                            System.Console.WriteLine("a");
                            b.SignalAndWait();
                            System.Console.WriteLine("b");
                        }, TaskCreationOptions.LongRunning);
                    tasks[i] = t;
                }
                Task.WaitAll(tasks);

                DateTime end = DateTime.Now;
                TimeSpan duration = end - start;
                string durationString = String.Format("{0:00}:{1:00}:{2:00}.{3:000}", duration.Hours, duration.Minutes, duration.Seconds, duration.Milliseconds);
                System.Console.Out.Flush();
                System.Console.WriteLine("Time: " + durationString);
            }
            System.Console.WriteLine();

            System.Console.WriteLine("FastBarrier");
            for (int size = 8; size <= 256; size *= 2)
            {
                FastBarrierTaskScheduler lcts = new FastBarrierTaskScheduler(size);
                TaskFactory factory = new TaskFactory(lcts);
                System.Console.WriteLine();
                System.Console.WriteLine("Size = " + size);
                DateTime start = DateTime.Now;

                Task[] tasks = new Task[size];
                for (int i = 0; i < size; ++i)
                {
                    int copy = i;
                    Task t = factory.StartNew(
                        async () =>
                        {
                            System.Console.WriteLine("a" + copy);
                            await Task.Yield();
                            System.Console.WriteLine("b" + copy);
                        });
                    tasks[i] = t;
                }
                // Wait until tasks have synchronized, i.e., until the outer tasks
                // returned by StartNew have completed at their first await.
                Task.WaitAll(tasks);
                // From http://stackoverflow.com/questions/5095183/how-would-i-run-an-async-taskt-method-synchronously
                // Note: RunSynchronously() throws InvalidOperationException for a task
                // that has already been started; that exception is swallowed here.
                foreach (Task t in tasks)
                {
                    try { t.RunSynchronously(); }
                    catch (InvalidOperationException) { }
                }

                DateTime end = DateTime.Now;
                TimeSpan duration = end - start;
                string durationString = String.Format("{0:00}:{1:00}:{2:00}.{3:000}", duration.Hours, duration.Minutes, duration.Seconds, duration.Milliseconds);
                System.Console.Out.Flush();
                System.Console.WriteLine("Time: " + durationString);
            }
            System.Console.WriteLine();
        }
    }

    // Support FastBarrier synchronization by manipulation of the
    // task queue.
    public class FastBarrierTaskScheduler : TaskScheduler
    {
        // Indicates whether the current thread is processing work items.
        [ThreadStatic]
        private static bool _currentThreadIsProcessingItems;

        // The list of tasks to be executed  
        private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks) 

        // The list of continuations to be enqueued after fast barrier signal.
        private readonly LinkedList<Task> _continuation_tasks = new LinkedList<Task>(); // protected by lock(_tasks) 

        // Counts the work items that have been queued or are running.  
        private int _delegatesQueuedOrRunning = 0;

        // Number of tasks that must reach the barrier before the continuations are released.
        private int _participant_count = 0;

        // Creates a new instance for the specified number of barrier participants.  
        public FastBarrierTaskScheduler(int participant_count)
        {
            _participant_count = participant_count;
        }

        // Queues a task to the scheduler.  
        protected sealed override void QueueTask(Task task)
        {
            lock (_tasks)
            {
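                // Get call stack and determine if this is queuing of continuation.
                // This relies on the name of an internal framework method, so it
                // may not hold on every runtime version.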
                bool special = false;
                StackTrace stackTrace = new StackTrace();
                StackFrame[] stackFrames = stackTrace.GetFrames();
                foreach (StackFrame stackFrame in stackFrames)
                {
                    //System.Console.WriteLine("sf " + stackFrame.GetMethod().Name);
                    if (stackFrame.GetMethod().Name.Contains("QueueContinuation"))
                    {
                        special = true;
                        break;
                    }
                }
                if (special)
                {
                    //System.Console.WriteLine("En sp " + task.Id);
                    _continuation_tasks.AddLast(task);
                    if (_continuation_tasks.Count == _participant_count)
                    {
                        // Execute continuations tasks, and signal.
                        EnqueueContinuationTasks();
                    }
                }
                else
                {
                    // Add the task to the list of tasks to be processed.  If there aren't enough  
                    // delegates currently queued or running to process tasks, schedule another.  
                    //System.Console.WriteLine("En no " + task.Id);
                    _tasks.AddLast(task);
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        // Inform the ThreadPool that there's work to be executed for this scheduler.  
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                // Note that the current thread is now processing work items. 
                // This is necessary to enable inlining of tasks into this thread.
                _currentThreadIsProcessingItems = true;
                try
                {
                    // Process all available items in the queue. 
                    while (true)
                    {
                        Task item;
                        lock (_tasks)
                        {
                            // When there are no more items to be processed, 
                            // note that we're done processing, and get out. 
                            if (_tasks.Count == 0)
                            {
                                --_delegatesQueuedOrRunning;
                                break;
                            }

                            // Get the next item from the queue
                            item = _tasks.First.Value;
                            _tasks.RemoveFirst();
                        }

                        // Execute the task we pulled out of the queue
                        //System.Console.WriteLine("pre " + item.Id);
                        base.TryExecuteTask(item);
                        //System.Console.WriteLine("post " + item.Id);
                    }
                }
                // We're done processing items on the current thread 
                finally { _currentThreadIsProcessingItems = false; }
            }, null);
        }

        // Attempts to execute the specified task on the current thread.  
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If this thread isn't already processing a task, we don't support inlining 
            if (!_currentThreadIsProcessingItems)
                return false;

            // If the task was previously queued, remove it from the queue first;
            // give up if it is no longer there.
            if (taskWasPreviouslyQueued && !TryDequeue(task))
                return false;

            // Try to run the task.  
            return base.TryExecuteTask(task);
        }

        // Attempt to remove a previously scheduled task from the scheduler.  
        protected sealed override bool TryDequeue(Task task)
        {
            lock (_tasks)
            {
                return _tasks.Remove(task);
            }
        }

        // Gets the maximum concurrency level supported by this scheduler.  
        public sealed override int MaximumConcurrencyLevel
        {
            get
            {
                return 100000;
            }
        }

        // Gets an enumerable of the tasks currently scheduled on this scheduler.  
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                Monitor.TryEnter(_tasks, ref lockTaken);
                if (lockTaken)
                    return _tasks.ToArray(); // snapshot, so callers never enumerate the live list outside the lock
                else throw new NotSupportedException();
            }
            finally
            {
                if (lockTaken) Monitor.Exit(_tasks);
            }
        }

        // Moves all parked continuations to the normal queue.
        // The caller must hold lock(_tasks).
        private void EnqueueContinuationTasks()
        {
            while (_continuation_tasks.Count > 0)
            {
                Task t = _continuation_tasks.First();
                _continuation_tasks.RemoveFirst();
                _tasks.AddLast(t);
                ++_delegatesQueuedOrRunning;
            }
            NotifyThreadPoolOfPendingWork();
        }
    }
}

 
