Controlling access to shared resources in an asynchronous world

In my last post I introduced the AsyncLock and AsyncDebouncedFunction.  AsyncLock is a fantastic way of asynchronously holding a ‘lock’ without actually blocking threads.  It is the natural go-to class for resource synchronization, but there are alternatives (with caveats).

.NET 4.5 introduced the ConcurrentExclusiveSchedulerPair, which offers another approach to concurrency control.  Using this class, we can schedule tasks to run concurrently or exclusively, and at first glance it looks like a viable alternative to my AsyncDebouncedFunction.

Consider the following code:

int a = 0;
Stopwatch s = new Stopwatch();
// Create a task factory that schedules tasks to run exclusively.
TaskFactory exclusiveFactory = new TaskFactory(new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);
Func<CancellationToken, Task<int>> runner = token => exclusiveFactory.StartNew(
	() => {
		int run = Interlocked.Increment(ref a);
		Console.WriteLine("{0}: Started {1}", s.Elapsed, run);
		Thread.Sleep(1000);

		Console.WriteLine("{0}: Completed {1}", s.Elapsed, run);
		return run;
	}, token);
Console.WriteLine();
Console.WriteLine("Testing ConcurrentExclusiveSchedulerPair");
a=0;
s.Restart();
try {
	await Task.WhenAll(
		runner(new CancellationTokenSource(TimeSpan.FromSeconds(0.5)).Token)
			.ContinueWith(i => Console.WriteLine("{0}: Result {1}", s.Elapsed, i.Result), TaskContinuationOptions.OnlyOnRanToCompletion),
		runner(CancellationToken.None)
			.ContinueWith(i => Console.WriteLine("{0}: Result {1}", s.Elapsed, i.Result), TaskContinuationOptions.OnlyOnRanToCompletion),
		runner(CancellationToken.None)
			.ContinueWith(i => Console.WriteLine("{0}: Result {1}", s.Elapsed, i.Result), TaskContinuationOptions.OnlyOnRanToCompletion));
} catch (TaskCanceledException e) {
	Console.WriteLine(e.Message);
}

This creates a new TaskFactory which makes use of the ConcurrentExclusiveSchedulerPair’s ExclusiveScheduler.  Any tasks created on this factory will run sequentially rather than concurrently, which looks like a great start.  In fact, running this code we get the following result:

Testing ConcurrentExclusiveSchedulerPair
00:00:00.0012267: Started 1
00:00:01.0013490: Completed 1
00:00:01.0014078: Started 2
00:00:01.0017885: Result 1
00:00:02.0019925: Completed 2
00:00:02.0021238: Started 3
00:00:02.0024653: Result 2
00:00:03.0022518: Completed 3
00:00:03.0026929: Result 3

This shows each task running one after the other, as we expect.  (The first task’s token is cancelled after 0.5 seconds, but by then the task is already running, and the token passed to StartNew() only stops a task that has not yet started.)  The first problem is that this doesn’t take into account when we created each task, so we don’t get the result re-use we had with AsyncDebouncedFunction.  For that we’d have to create a slightly more involved class that only runs the inner function on the ExclusiveScheduler.  Before we jump into such an implementation though, consider what happens if we run asynchronous code that yields on the scheduler, for example:

int a = 0;
Stopwatch s = new Stopwatch();
// Create a task factory that schedules tasks to run exclusively.
TaskFactory exclusiveFactory = new TaskFactory(new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);
Func<CancellationToken, Task<int>> runner = token => exclusiveFactory.StartNew(
	async () => {
		int run = Interlocked.Increment(ref a);
		Console.WriteLine("{0}: Started {1}", s.Elapsed, run);
		await Task.Delay(1000, token);

		Console.WriteLine("{0}: Completed {1}", s.Elapsed, run);
		return run;
	}, token).Unwrap();
Console.WriteLine();
Console.WriteLine("Testing ConcurrentExclusiveSchedulerPair");
a=0;
s.Restart();
try {
	await Task.WhenAll(
		runner(new CancellationTokenSource(TimeSpan.FromSeconds(0.5)).Token)
			.ContinueWith(i => Console.WriteLine("{0}: Result {1}", s.Elapsed, i.Result), TaskContinuationOptions.OnlyOnRanToCompletion),
		runner(CancellationToken.None)
			.ContinueWith(i => Console.WriteLine("{0}: Result {1}", s.Elapsed, i.Result), TaskContinuationOptions.OnlyOnRanToCompletion),
		runner(CancellationToken.None)
			.ContinueWith(i => Console.WriteLine("{0}: Result {1}", s.Elapsed, i.Result), TaskContinuationOptions.OnlyOnRanToCompletion));
} catch (TaskCanceledException e) {
	Console.WriteLine(e.Message);
}

Here, we’ve told the runner to run asynchronous code that uses Task.Delay() instead of Thread.Sleep().  Note the Unwrap() call, as we’re running an asynchronous operation asynchronously!  The result of running this code is not what you might hope (unless you’re paying attention):

Testing ConcurrentExclusiveSchedulerPair
00:00:00.0017058: Started 1
00:00:00.0022739: Started 2
00:00:00.0022872: Started 3
00:00:01.0161957: Completed 3
00:00:01.0162235: Completed 2
00:00:01.0165797: Result 2
00:00:01.0166038: Result 3
A task was canceled.

At first glance there doesn’t appear to be any exclusivity at all!  That’s not entirely true: the ExclusiveScheduler is still scheduling the tasks exclusively.  However, Task.Delay() is non-blocking, so awaiting it yields execution back to the scheduler, which lets a waiting task execute.  The first task is also cancelled by its token during the delay, which is why it never completes.  We can best illustrate the scheduling by tweaking our function slightly:

async () => {
		int run = Interlocked.Increment(ref a);
		Console.WriteLine("{0}: Started {1}", s.Elapsed, run);
		Thread.Sleep(100);
		Console.WriteLine("{0}: Yielding {1}", s.Elapsed, run);
		await Task.Delay(800, token);

		Console.WriteLine("{0}: Restarting {1}", s.Elapsed, run);
		Thread.Sleep(100);
		Console.WriteLine("{0}: Completed {1}", s.Elapsed, run);
		return run;
	}

Our function now blocks for 100ms on either side of the 800ms delay, which should illustrate the exclusivity nicely:

Testing ConcurrentExclusiveSchedulerPair
00:00:00.0015790: Started 1
00:00:00.1020952: Yielding 1
00:00:00.1029492: Started 2
00:00:00.2035512: Yielding 2
00:00:00.2036396: Started 3
00:00:00.3043756: Yielding 3
00:00:01.0145095: Restarting 2
00:00:01.1156219: Completed 2
00:00:01.1156772: Restarting 3
00:00:01.1160992: Result 2
00:00:01.2159296: Completed 3
00:00:01.2163921: Result 3
A task was canceled.

Now we can see that each task doesn’t start until the preceding task yields control to the scheduler, so we do still have exclusivity, and the tasks never run at the same time as each other.  However, they are free to interleave with each other.  The takeaway from these examples is that the ConcurrentExclusiveSchedulerPair’s ExclusiveScheduler can prevent concurrent access to a shared resource, but the resource may be accessed by another task at any point where you yield control.  When you need to hold a resource across yields, AsyncLock is a more appropriate choice.
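To make the contrast concrete, here is a minimal sketch of holding a resource across a yield.  The AsyncLock API isn’t shown in this post, so the sketch assumes the framework’s SemaphoreSlim with a single permit as a stand-in; the class name HoldAcrossYields and its members are made up for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class HoldAcrossYields
{
    // Assumption: SemaphoreSlim(1, 1) stands in for AsyncLock.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(1, 1);
    private static readonly Stopwatch s = Stopwatch.StartNew();
    private static int a = 0;

    public static async Task<int> Runner(CancellationToken token)
    {
        // Wait asynchronously for the single permit; no thread blocks while waiting.
        await Gate.WaitAsync(token);
        try
        {
            int run = Interlocked.Increment(ref a);
            Console.WriteLine("{0}: Started {1}", s.Elapsed, run);
            // The permit is still held across this yield, so no other runner can start.
            await Task.Delay(1000, token);
            Console.WriteLine("{0}: Completed {1}", s.Elapsed, run);
            return run;
        }
        finally
        {
            // Always release, even if the delay was cancelled.
            Gate.Release();
        }
    }

    public static async Task Main()
    {
        int[] results = await Task.WhenAll(
            Runner(CancellationToken.None),
            Runner(CancellationToken.None),
            Runner(CancellationToken.None));
        Console.WriteLine("Results: {0}", string.Join(", ", results));
    }
}
```

Unlike the ExclusiveScheduler runs above, each “Started” line here should be followed by its own “Completed” line before the next runner starts, because the permit is only released in the finally block.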

The other advantage of ConcurrentExclusiveSchedulerPair is that it also has a ConcurrentScheduler, which allows concurrent scheduling (with a configurable concurrency limit, like AsyncSemaphore) and won’t overlap with tasks run on the ExclusiveScheduler.  In this way you can easily simulate a reader/writer lock, with the same caveats as above.  Of course, Stephen Toub also provides an AsyncReaderWriterLock for holding locks across yields.
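As a rough sketch of that reader/writer arrangement, the code below schedules “reads” on the ConcurrentScheduler and a “write” on the ExclusiveScheduler of the same pair.  The class name ReaderWriterSketch, the shared field and the concurrency limit of 4 are all assumptions chosen for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class ReaderWriterSketch
{
    private static int shared = 0;

    public static async Task<int> RunAsync()
    {
        // A limit of 4 concurrent readers is an arbitrary choice for this sketch.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 4);
        var readers = new TaskFactory(pair.ConcurrentScheduler);
        var writers = new TaskFactory(pair.ExclusiveScheduler);

        Task[] tasks =
        {
            // Readers may run alongside each other, up to the limit.
            readers.StartNew(() => Console.WriteLine("Read {0}", shared)),
            readers.StartNew(() => Console.WriteLine("Read {0}", shared)),
            // The writer never overlaps with a reader or with another writer.
            writers.StartNew(() => { shared++; Console.WriteLine("Wrote {0}", shared); }),
            readers.StartNew(() => Console.WriteLine("Read {0}", shared)),
        };

        await Task.WhenAll(tasks);
        return shared;
    }

    public static async Task Main()
    {
        Console.WriteLine("Final value {0}", await RunAsync());
    }
}
```

The order in which the reads and the write run is up to the scheduler pair, so the values printed by the readers can vary between runs.  As with the earlier examples, this only holds for synchronous delegates; an async delegate gives up its slot at the first await.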
