The reference documentation for the Interlocked class is here:
http://msdn.microsoft.com/en-us/library/system.threading.interlocked.aspx
Even counter++ is not an atomic operation; it is equivalent to counter = counter + 1, which reads the value, adds 1, and writes the result back. Another thread can therefore read or modify counter after the old value has been read but before old value + 1 is written back. Interlocked.Increment(ref counter) performs the whole read-increment-write as a single atomic operation, so no other thread can interfere with it.
In my case, I wanted to implement a lock-free, thread-safe string collection that allows multiple threads to add strings to it concurrently. I could have used a lock-based model, but it has several drawbacks:
- It serializes access to the shared resource: once a thread has acquired the lock, the other threads have to wait until it is released, which significantly limits performance.
- It can cause deadlocks (I have run into this myself :)).
- A low-priority thread can hold the lock while a high-priority thread has to wait for it (priority inversion).
I found a very interesting post at http://philosopherdeveloper.wordpress.com/2011/02/23/how-to-build-a-thread-safe-lock-free-resizable-array/ which describes an excellent algorithm for implementing a lock-free, random-access list. Here is the implementation:
I used Interlocked.Increment and Interlocked.CompareExchange in the AddString method. Increment increases the last index and returns the new value in a single atomic operation. CompareExchange initializes a new segment only if no other thread has already done so: it compares _data[segmentIndex] with null (the expected current value) and replaces _data[segmentIndex] only when it is still null.
And this is the test for the AddString method, implemented with NUnit 2.6. It spawns 1 to 20 threads, each inserting up to 10,000 strings into the collection. All the tests together took 1.05 seconds to complete.
/// <summary>
/// StringCollection is a lock-free collection that supports concurrent calls to
/// AddString, and returns a comma-delimited string containing all the added strings
/// when ToString is invoked.
/// </summary>
/// <remarks>
/// Storage is split into segments whose sizes are successive powers of two
/// (segment k holds 2^k strings), so the collection grows without ever copying
/// existing items. Global index i lives in segment floor(log2(i + 1)) at
/// offset (i + 1) - 2^segment.
/// </remarks>
public class StringCollection
{
    // Number of segment slots: 8 * sizeof(int) = 32 (an int is 4 bytes).
    // Only segments 0..30 are ever used, because indices are ints (see MaxIndex).
    private readonly string[][] _data = new string[8 * sizeof(int)][];

    // Segments 0..30 hold 2^31 - 1 strings in total, so this is the largest storable index.
    private const int MaxIndex = int.MaxValue - 1;

    private const string JoinDelimiter = ",";

    // Last global index handed out by AddString; -1 means the collection is empty.
    private int _lastIndex = -1;

    /// <summary>
    /// Creates an empty collection with the first (size 1) segment pre-allocated.
    /// </summary>
    public StringCollection()
    {
        _data[0] = new string[1];
    }

    /// <summary>
    /// Adds a string to the collection. Safe to call from multiple threads at once.
    /// </summary>
    /// <param name="str">The string to add; null is stored and rendered as an empty entry.</param>
    /// <returns>The global index of the string within the collection.</returns>
    /// <exception cref="InvalidOperationException">The collection has no capacity left.</exception>
    public int AddString(string str)
    {
        // Reserve a unique index atomically; no other thread can receive the same value.
        var index = Interlocked.Increment(ref _lastIndex);
        if (index < 0 || index > MaxIndex)
        {
            throw new InvalidOperationException("StringCollection capacity exceeded.");
        }

        var segmentIndex = GetSegmentIndex(index);
        var segment = _data[segmentIndex];
        if (segment == null)
        {
            // Several threads may race to allocate the same segment. CompareExchange
            // publishes only the first array; it returns null to the winner and the
            // already-published array to everyone else, so all threads share one segment.
            var candidate = new string[1 << segmentIndex];
            segment = Interlocked.CompareExchange(ref _data[segmentIndex], candidate, null) ?? candidate;
        }

        segment[GetInternalIndex(segmentIndex, index)] = str;
        return index;
    }

    /// <summary>
    /// Returns a single comma-delimited string containing all strings in index order.
    /// </summary>
    /// <remarks>
    /// This does not block concurrent AddString calls. AddString reserves its index
    /// before storing the string, so a string whose index is already reserved but
    /// not yet stored (or whose segment is not yet allocated) appears as an empty entry.
    /// </remarks>
    /// <returns>Comma-delimited string, or an empty string if the collection is empty.</returns>
    public override string ToString()
    {
        // Snapshot the last index; strings added after this point are not included.
        var lastIndex = _lastIndex;
        if (lastIndex < 0)
        {
            return String.Empty;
        }

        // A failed over-capacity AddString still bumps the counter; ignore that index.
        lastIndex = Math.Min(lastIndex, MaxIndex);

        var items = new string[lastIndex + 1];
        var copied = 0;
        for (var segmentIndex = 0; copied < items.Length; segmentIndex++)
        {
            // Segment k starts at global index 2^k - 1, which equals the items copied so far.
            var count = Math.Min(1 << segmentIndex, items.Length - copied);
            var segment = _data[segmentIndex];
            if (segment != null)
            {
                Array.Copy(segment, 0, items, copied, count);
            }
            copied += count;
        }

        // string.Join renders null elements as empty strings.
        return string.Join(JoinDelimiter, items);
    }

    /// <summary>
    /// Calculates the offset inside a segment for the given global index.
    /// </summary>
    /// <param name="segmentIndex">Segment that contains <paramref name="globalIndex"/>.</param>
    /// <param name="globalIndex">Global index within the collection.</param>
    /// <returns>Zero-based offset within the segment.</returns>
    private static int GetInternalIndex(int segmentIndex, int globalIndex)
    {
        return globalIndex + 1 - (1 << segmentIndex);
    }

    /// <summary>
    /// Calculates the segment that holds the given global index: floor(log2(globalIndex + 1)).
    /// </summary>
    /// <param name="globalIndex">Non-negative global index.</param>
    /// <returns>Segment index.</returns>
    private static int GetSegmentIndex(int globalIndex)
    {
        // Integer shifts instead of Math.Log(x, 2). That is computed in floating point as
        // Log(x) / Log(2) and is not guaranteed to be exact at powers of two, so truncating
        // a result such as 2.9999999999999996 would select the wrong segment.
        var value = (uint)globalIndex + 1u;
        var segmentIndex = 0;
        while ((value >>= 1) != 0)
        {
            segmentIndex++;
        }
        return segmentIndex;
    }
}
I used Interlocked.Increment and Interlocked.CompareExchange in the AddString method. Increment increases the last index and returns the new value in a single atomic operation. CompareExchange initializes a new segment only if no other thread has already done so: it compares _data[segmentIndex] with null (the expected current value) and replaces _data[segmentIndex] only when it is still null.
And this is the test for the AddString method, implemented with NUnit 2.6. It spawns 1 to 20 threads, each inserting up to 10,000 strings into the collection. All the tests together took 1.05 seconds to complete.
/// <summary>
/// TestFixture for StringCollection.
/// </summary>
[TestFixture]
public class StringCollectionTest
{
    private StringCollection _collection;
    private Dictionary<int, string>[] _inserted;
    private Exception[] _errors;
    private Thread[] _threads;
    private int _numOfThreads, _itemsPerThread;

    /// <summary>
    /// Equality comparer for KeyValuePair that treats two pairs as equal when their keys are equal.
    /// Used with the LINQ expression in BuildString.
    /// </summary>
    private class KeyValuePairComparer : IEqualityComparer<KeyValuePair<int, string>>
    {
        public bool Equals(KeyValuePair<int, string> x, KeyValuePair<int, string> y)
        {
            return x.Key == y.Key;
        }

        public int GetHashCode(KeyValuePair<int, string> obj)
        {
            // Hash only the key: Equals ignores Value, and items that compare equal
            // must produce equal hash codes for Distinct to work.
            return obj.Key.GetHashCode();
        }
    }

    /// <summary>
    /// Set up testcase: a fresh collection for every run.
    /// </summary>
    [SetUp]
    public void SetUp()
    {
        _collection = new StringCollection();
    }

    /// <summary>
    /// Adds strings to a shared StringCollection from several threads, then checks the returned
    /// indices and the ToString output. Runs every case returned by StringCollectionTestCases.
    /// </summary>
    /// <param name="numberOfThreads">Number of concurrent writer threads.</param>
    /// <param name="itemsPerThread">Number of strings each thread adds.</param>
    [Test, TestCaseSource("StringCollectionTestCases")]
    public void TestStringCollection(int numberOfThreads, int itemsPerThread)
    {
        _numOfThreads = numberOfThreads;
        _itemsPerThread = itemsPerThread;
        var total = _numOfThreads * _itemsPerThread;

        InitializeThreads();

        // Start threads and wait for them all.
        for (int i = 0; i < _numOfThreads; i++)
        {
            _threads[i].Start(i);
        }
        for (int i = 0; i < _numOfThreads; i++)
        {
            _threads[i].Join();
        }

        // Surface worker failures as test failures instead of unhandled thread exceptions.
        for (int i = 0; i < _numOfThreads; i++)
        {
            if (_errors[i] != null)
            {
                throw new InvalidOperationException("AddString failed on thread " + i, _errors[i]);
            }
        }

        var expected = BuildString();
        var result = _collection.ToString();

        // Verify the amount of inserted strings.
        Assert.AreEqual(total, _inserted.Sum(dic => dic.Count));

        // Every returned index must be unique, and together they must cover 0..total-1.
        var indices = _inserted.SelectMany(dic => dic.Keys).OrderBy(key => key).ToArray();
        Assert.AreEqual(Enumerable.Range(0, total).ToArray(), indices);

        // Verify result with the expected string.
        Assert.AreEqual(expected, result);
    }

    /// <summary>
    /// Tear down testcase, clean all data.
    /// </summary>
    [TearDown]
    public void TearDown()
    {
        _collection = null;
        _inserted = null;
        _errors = null;
        _threads = null;
        GC.Collect();
        GC.WaitForPendingFinalizers();
    }

    /// <summary>
    /// Creates the worker threads plus the per-thread _inserted dictionaries and error slots.
    /// </summary>
    private void InitializeThreads()
    {
        _threads = new Thread[_numOfThreads];
        _inserted = new Dictionary<int, string>[_numOfThreads];
        _errors = new Exception[_numOfThreads];
        for (var i = 0; i < _threads.Length; i++)
        {
            _inserted[i] = new Dictionary<int, string>(_itemsPerThread);
            _threads[i] = new Thread(obj => { RunAddString((int)obj); });
        }
    }

    /// <summary>
    /// Adds strings to the collection and records each returned index in this thread's
    /// _inserted dictionary. Any exception is stored in _errors for the test thread to rethrow.
    /// </summary>
    /// <param name="threadID">Index of the calling thread; selects its dictionary and error slot.</param>
    private void RunAddString(int threadID)
    {
        try
        {
            for (var i = 0; i < _itemsPerThread; i++)
            {
                var str = string.Format("Thread {0} Value {1}", threadID, i);
                var index = _collection.AddString(str);
                _inserted[threadID][index] = str;
            }
        }
        catch (Exception ex)
        {
            _errors[threadID] = ex;
        }
    }

    /// <summary>
    /// Builds the expected string: all recorded strings ordered by their returned index.
    /// </summary>
    /// <returns>Comma-delimited expected output, or an empty string when nothing was added.</returns>
    private string BuildString()
    {
        if ((_itemsPerThread == 0) || (_numOfThreads == 0))
        {
            return String.Empty;
        }

        var allItems = _inserted.SelectMany(dic => dic.ToArray())
            .Distinct(new KeyValuePairComparer())
            .OrderBy(kp => kp.Key)
            .Select(kp => kp.Value)
            .ToArray();

        return string.Join(",", allItems);
    }

    /// <summary>
    /// Returns the test cases: every combination of thread count {1, 2, 5, 10, 20}
    /// with items per thread {0, 1, 500, 1000, 2000, 10000}, as { threads, items } pairs.
    /// Static so it also satisfies NUnit 3, which requires static test case sources.
    /// </summary>
    /// <returns>Argument arrays for TestStringCollection.</returns>
    private static IEnumerable<int[]> StringCollectionTestCases()
    {
        var threadCounts = new[] { 1, 2, 5, 10, 20 };
        var itemCounts = new[] { 0, 1, 500, 1000, 2000, 10000 };
        return threadCounts
            .SelectMany(threads => itemCounts.Select(items => new[] { threads, items }))
            .ToArray();
    }
}
No comments:
Post a Comment