Sunday, April 22, 2007

Thread safe observable collection

If you ever wrote something more, then standard UI input-output, you sure got into wide range of exceptions like "The calling thread cannot access this object because a different thread owns it" or "This type of CollectionView does not support changes to its SourceCollection from a thread different from the Dispatcher thread". What the problem? The problem is old like condom in my wallet world-old, the try to get into resources from different threads. Let me show you some code. Very-very bad code. It's unbelievable, that this code can ever work.

void onAdd(object sender, RoutedEventArgs e)
{
new Thread(new ParameterizedThreadStart(addProc)).Start(Resources["source"] as MyCollection);
}
void onRem(object sender, RoutedEventArgs e)
{
new Thread(new ParameterizedThreadStart(remProc)).Start(Resources["source"] as MyCollection);
}
void onMov(object sender, RoutedEventArgs e)
{
new Thread(new ParameterizedThreadStart(movProc)).Start(Resources["source"] as MyCollection);
}

<Window.Resources>
<
l:MyCollection x:Key="source"/>
</
Window.Resources>
<
StackPanel DataContext="{StaticResource source}">
<
Button Click="onAdd">Add</Button>
<
Button Click="onMov">Move</Button>
<
Button Click="onRem">Remove</Button>
<ListBox ItemsSource="{Binding}"/>
</
StackPanel>

 


Does not it looks scare? Such code might open unbelievable number of thread. Nothing will stay for it. But, wait. Let's try to imagine, that we have a bunch of system methods, such as RAM queries, or kind of Amazon web service with million of transactions per second. Even very smart programmer will work with at least 10 threads to update the same collection. So, what to do?


Let's write Thread Safe Observable Collection, that knows, that it might be called from big number of threads and take care on itself. Sounds good? Let's start.


First of all we'll try to get current STAThread (UI) dispatcher. How to do it? Simple. The only place we'll initialize our collection is in XAML, so default constructor will be called from the thread we need. Now, we save it


Dispatcher _dispatcher;
public ThreadSafeObservableCollection()
{
_dispatcher =
Dispatcher.CurrentDispatcher;
}

 


The next step is to override all vital methods of our collection to be invoked from this thread.


        protected override void ClearItems()
{
if (_dispatcher.CheckAccess())
{
base.ClearItems();
            }
else
{
_dispatcher.Invoke(
DispatcherPriority.DataBind, (SendOrPostCallback)delegate { Clear(); });
}
}

 


 Let's understand it. First we're checking if the current thread can access the instance of the object, if it can - we'll just do our work, but if not, we'll invoke it in the context of the current thread by sending anonymous delegate with or without parameters.


Why I'm casting the anonymous delegate into SendOrPostCallback? The answer is simple - look into Reflector. You can not implement your delegate with parameter and without return value better :)


The next step is to take care on locking. We can use old Lock(object) method, but why to do it, if I can continue read my information, while someone writing it. Let's use ReaderWriterLock. This class makes us able to write and read information concurrently, but still preventing us from writing things at the same simple. Due to the fact, that we are using this lock instance inside at one place we can use UpgradeToWriterLock and DowngradeFromWriterLock instead of real locking. Let's see how to do it


        protected override void InsertItem(int index, T item)
{
if (_dispatcher.CheckAccess())
{
if (index > this.Count)
return;
LockCookie c = _lock.UpgradeToWriterLock(-1);
base.InsertItem(index, item);
_lock.DowngradeFromWriterLock(
ref c);
}
else
{
object[] e = new object[] { index, item };
_dispatcher.Invoke(
DispatcherPriority.DataBind, (SendOrPostCallback)delegate { InsertItemImpl(e); }, e);
}
}
void InsertItemImpl(object[] e)
{
if (_dispatcher.CheckAccess())
{
InsertItem((
int)e[0], (T)e[1]);
}
else
{
_dispatcher.Invoke(
DispatcherPriority.DataBind, (SendOrPostCallback)delegate { InsertItemImpl(e); });
}
}

 


Sure, you have to initialize _lock = new ReaderWriterLock() in the constructor. Pay attention, that I'm using the same delegate even with two parameters. It still much better and faster, that write your own delegates for each type of action. Let the framework to do work for you.


After overwriting all vital methods for ObservableCollection, we're actually, finished. Now, you can derive from your new class and perform the code at the beginning on the article very fast. The smart thread safe observable collection takes care on all the least.


Download and run following example, after playing a bit, check "Fast" checkbox and start pressing buttons as wide spirit. The application will work as requested.


Source code for this article

1 comment:

Christopher said...

Tamir,

Very interesting and enjoyable post!

I'm probably missing something simple here, but I was wondering why the locking operations? When you check access, you've verified you are on the Dispatcher thread and are guarenteed to finish your operation before another on that same thread. When you que the work through Invoke you are sure the delegate will be executing on the thread and are in the same scenario.

In both cases it doesn't seem possible for another thread to simultaneously acess the memory locations in question because any attempt to do so will be queued and executed on the Dispatcher's thread.

Thanks for any added insight. I'm still wrapping my head around designing a pattern for ObservableCollection updates via threading.