Saturday, June 2, 2012

Atomic operations in .NET

Recently, I had a chance to use the System.Threading.Interlocked class, which provides atomic operations for variables that are shared by multiple threads. It's quite a useful class for implementing high-performance, thread-safe code.


Even the counter++ operation is not atomic: it is equivalent to counter = counter + 1, that is, a read, an add, and a write. Any other thread can access and modify counter before the old value + 1 is written back, so an update can be lost. Interlocked.Increment(ref counter) ensures that counter is increased by 1 without any other thread interfering in the operation.
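
To make the difference concrete, here is a minimal sketch. The CounterDemo class and its method names are hypothetical helpers made up for illustration; only Thread and Interlocked.Increment come from .NET. The counter++ version may lose updates under contention, so its result is only guaranteed to be at most the expected total, while the Interlocked version always reaches it.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, not part of the .NET API.
public static class CounterDemo
{
    private static int _counter;

    // Runs threadCount threads that each call bump perThread times,
    // then returns the final value of the shared counter.
    private static int Run(int threadCount, int perThread, Action bump)
    {
        _counter = 0;
        var threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++)
        {
            threads[t] = new Thread(() =>
            {
                for (int i = 0; i < perThread; i++) bump();
            });
        }
        foreach (var thread in threads) thread.Start();
        foreach (var thread in threads) thread.Join();
        return _counter;
    }

    // Non-atomic read-modify-write: concurrent updates can be lost.
    public static int CountWithPlusPlus(int threadCount, int perThread)
    {
        return Run(threadCount, perThread, () => _counter++);
    }

    // Atomic increment: no update is lost.
    public static int CountWithInterlocked(int threadCount, int perThread)
    {
        return Run(threadCount, perThread, () => Interlocked.Increment(ref _counter));
    }
}
```

Calling CounterDemo.CountWithInterlocked(4, 100000) returns 400000 every time, whereas CounterDemo.CountWithPlusPlus(4, 100000) can come back lower on a multi-core machine.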

In my case, I tried to implement a lock-free, thread-safe string collection that allows multiple threads to add strings to it concurrently. I could have used a lock-based model, but that has several drawbacks:
  • It serializes access to the shared resource: once a thread has acquired the lock, the other threads have to wait until the lock is released, which limits performance.
  • It can cause deadlocks; sometimes I got this :).
  • A low-priority thread could hold the lock while a high-priority thread must wait for it.
I found a very interesting post at http://philosopherdeveloper.wordpress.com/2011/02/23/how-to-build-a-thread-safe-lock-free-resizable-array/ which gave me an excellent algorithm for implementing a lock-free random-access list. Here is the implementation:
/// <summary>
/// StringCollection is a thread-safe collection which allows adding strings
/// and returns a comma-delimited string containing all the added strings when ToString is invoked
/// </summary>
public class StringCollection
{
    // _data holds all the data segments
    // it has 8 * sizeof(int) = 32 slots, since sizeof(int) is 4 bytes
    private string[][] _data = new string[8 * sizeof(int)][];
        
    // _lastIndex contains the last global index
    private int _lastIndex  = -1;

    /// <summary>
    /// Constructor
    /// </summary>
    public StringCollection()
    {
        // Initialize the first array with size 1
        _data[0] = new string[1];
    }        

    /// <summary>
    /// Add a string into the collection and return its index
    /// </summary>
    /// <returns>Index of the string within the collection</returns>
    public int AddString(string str)
    {
        // Get the new index and update _lastIndex (this is an atomic operation)
        var index           = Interlocked.Increment(ref _lastIndex);

        // Calculate the index of data segment that the string should be put in
        var segmentIndex    = GetSegmentIndex(index);

        // If the segment has not been initialized, initialize it with a size of 2^segmentIndex
        if (_data[segmentIndex] == null)
        {
            var segmentSize = 1 << segmentIndex;
            // Atomic operation for initializing the segment at segmentIndex
            Interlocked.CompareExchange(ref _data[segmentIndex], new string[segmentSize], null);
        }

        // Put the string into the segment at calculated internal index
        _data[segmentIndex][GetInternalIndex(segmentIndex, index)] = str; 

        // Return the index of the string
        return index;
    }

    /// <summary>
    /// Override ToString to return a single, comma-delimited string containing all the strings in the internal collection
    /// </summary>
    /// <returns>comma-delimited string</returns>
    public override string ToString()
    {           
        // Capture the last index
        var lastIndex       = _lastIndex;
        var joinDelimiter   = ",";

        // Quickly return empty string if the collection is empty
        if (lastIndex == -1) return String.Empty;

        // Calculate last segment index and last internal index on last segment
        var lastSegmentIndex    = GetSegmentIndex(lastIndex);
        var lastInternalIndex   = GetInternalIndex(lastSegmentIndex, lastIndex);

        // Prepare buffer for sub strings
        string[] buffer = new string[lastSegmentIndex + 1];            

        // Loop through all the segments, join each segment's strings into a substring,
        // and put the substring into the buffer
        for (var i = 0; i <= lastSegmentIndex; i++)
        {
            if (i < lastSegmentIndex) buffer[i] = string.Join(joinDelimiter, _data[i]);
            else buffer[i] = string.Join(joinDelimiter, _data[i], 0, lastInternalIndex + 1);                
        }
            
        // Return the result by joining the substring in the buffer
        return string.Join(joinDelimiter, buffer);
    }

    /// <summary>
    /// Calculate the internal index on a segment from the given global index
    /// </summary>
    private int GetInternalIndex(int segmentIndex, int globalIndex)
    {
        var segmentSize = 1 << segmentIndex;
        return globalIndex + 1 - segmentSize;
    }

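    /// <summary>
    /// Calculate the segment index of a global index, i.e. floor(log2(globalIndex + 1))
    /// </summary>
    private int GetSegmentIndex(int globalIndex)
    {
        // Count shifts instead of casting Math.Log(x, 2): the floating-point result
        // may land just below a whole number and truncate to the wrong segment
        var value = globalIndex + 1;
        var segmentIndex = 0;
        while ((value >>= 1) != 0) segmentIndex++;
        return segmentIndex;
    }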
}
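
The index arithmetic is easier to follow with a few concrete values. The sketch below restates the mapping in a standalone form; SegmentMath and its method names are hypothetical and exist only for this illustration. It assumes, as in the class above, that segment k holds 2^k strings.

```csharp
using System;

// Hypothetical helper that mirrors the index math of StringCollection.
public static class SegmentMath
{
    // floor(log2(globalIndex + 1)), computed with shifts.
    public static int SegmentIndex(int globalIndex)
    {
        int value = globalIndex + 1;
        int segment = 0;
        while ((value >>= 1) != 0) segment++;
        return segment;
    }

    // Position of the item inside its segment.
    public static int InternalIndex(int globalIndex)
    {
        return globalIndex + 1 - (1 << SegmentIndex(globalIndex));
    }
}
```

With this mapping, global index 0 lands in segment 0 at slot 0, indexes 1 and 2 fill segment 1, indexes 3 to 6 fill segment 2 (index 6 is slot 3), and index 7 opens segment 3 at slot 0. Each new segment doubles the capacity, so existing segments never have to be copied or resized.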


I used Interlocked.Increment and Interlocked.CompareExchange in the AddString method. Increment increases the last index and retrieves the new last index in a single atomic operation. CompareExchange initializes a new segment only if no other thread has already initialized it: it compares _data[segmentIndex] with null (the expected current value) and replaces _data[segmentIndex] only when it is still null.
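
The CompareExchange pattern can also be shown in isolation. This is a minimal sketch, not the code used above: LazySlot and GetOrCreate are hypothetical names, and the only real API call is Interlocked.CompareExchange, which returns the value the location held before the call.

```csharp
using System.Threading;

// Hypothetical helper illustrating "initialize once" with CompareExchange.
public static class LazySlot
{
    // Returns the array stored in slot, installing a new one only if slot is still null.
    public static string[] GetOrCreate(ref string[] slot, int size)
    {
        var existing = slot;
        if (existing != null) return existing;

        var candidate = new string[size];
        // Swap in candidate only when slot is null; previous is the old value.
        var previous = Interlocked.CompareExchange(ref slot, candidate, null);

        // previous == null means this call won the race and candidate is now in slot.
        // Otherwise another thread got there first and candidate is simply discarded.
        return previous ?? candidate;
    }
}
```

If two threads race on the same empty slot, both allocate a candidate but only one is stored, and both end up working with the same array.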


And this is the test for the AddString method, implemented with NUnit 2.6. I spawn 1 to 20 threads and insert up to 10,000 strings into the collection on each thread. All the tests took 1.05 sec to complete.

/// <summary>
/// TestFixture for StringCollection
/// </summary>
[TestFixture]
public class StringCollectionTest
{
    private StringCollection _collection;
    private Dictionary<int, string>[] _inserted;
    private Thread[] _threads;
    private int _numOfThreads, _itemsPerThread;

    /// <summary>
    /// Comparer for KeyValuePair, used by the LINQ expression below
    /// </summary>
    private class KeyValuePairComparer : IEqualityComparer<KeyValuePair<int, string>>
    {
        public bool Equals(KeyValuePair<int, string> x, KeyValuePair<int, string> y)
        {
            return x.Key == y.Key;
        }

        public int GetHashCode(KeyValuePair<int, string> obj)
        {
            // Hash only the key so that GetHashCode is consistent with Equals
            return obj.Key.GetHashCode();
        }
    }

    /// <summary>
    /// Set up the test case
    /// </summary>
    [SetUp]
    public void SetUp()
    {
        _collection = new StringCollection();
    }

    /// <summary>
    /// Test method for StringCollection;
    /// it runs all the test cases returned from StringCollectionTestCases
    /// </summary>
    [Test, TestCaseSource("StringCollectionTestCases")]
    public void TestStringCollection(int numberOfThreads, int itemsPerThread)
    {
        // Prepare all local variables
        _numOfThreads   = numberOfThreads;
        _itemsPerThread = itemsPerThread;
        var total       = _numOfThreads * _itemsPerThread;

        // Initialize threads
        InitializeThreads();

        // Start threads and wait for them all
        for (int i = 0; i < _numOfThreads; i++) _threads[i].Start(i);
        for (int i = 0; i < _numOfThreads; i++) _threads[i].Join();
            
        // Prepare the expected string
        var expected    = BuildString();

        // Get the comma-delimited string from the collection
        var result      = _collection.ToString();

        // Verify the number of inserted strings
        Assert.AreEqual(total, _inserted.Sum(dic => dic.Count));
            
        // Verify result with the expected string 
        Assert.AreEqual(expected, result);
    }

    /// <summary>
    /// Tear down the test case and clean up all data
    /// </summary>
    [TearDown]
    public void TearDown()            
    {
        _collection = null;
        _inserted   = null;
        _threads    = null;
        GC.Collect();
        GC.WaitForPendingFinalizers();
    }

    /// <summary>
    /// Initialize threads and _inserted dictionaries
    /// </summary>
    private void InitializeThreads()
    {
        _threads    = new Thread[_numOfThreads];
        _inserted   = new Dictionary<int,string>[_numOfThreads];
        for (var i = 0; i < _threads.Length; i++)
        {
            _inserted[i]    = new Dictionary<int, string>(_itemsPerThread);
            _threads[i]     = new Thread(obj =>
            {
                RunAddString((int)obj);
            });
        }
    }

    /// <summary>
    /// Add strings to the collection and record each returned index in the thread's _inserted dictionary
    /// </summary>
    private void RunAddString(int threadID)
    {
        for (var i = 0; i < _itemsPerThread; i++)
        {
            var str     = string.Format("Thread {0} Value {1}", threadID, i);                
            var index   = _collection.AddString(str);
            _inserted[threadID][index] = str;
        }
    }

    /// <summary>
    /// Build the expected string
    /// </summary>
    private string BuildString()
    {
        if ((_itemsPerThread == 0) || (_numOfThreads == 0)) return String.Empty;

        var allItems = _inserted.SelectMany(dic => dic.ToArray())
                                .Distinct(new KeyValuePairComparer())
                                .OrderBy(kp => kp.Key)
                                .Select(kp => kp.Value).ToArray();

        // Join the array
        return string.Join(",", allItems);
    }

    /// <summary>
    /// Return the test cases for the test
    /// </summary>
    private IEnumerable<int[]> StringCollectionTestCases()
    {
        return new int[][] {
            new int[] { 1, 0     },
            new int[] { 1, 1     },
            new int[] { 1, 500   },
            new int[] { 1, 1000  },
            new int[] { 1, 2000  },
            new int[] { 1, 10000 },
            new int[] { 2, 0     },
            new int[] { 2, 1     },
            new int[] { 2, 500   },
            new int[] { 2, 1000  },
            new int[] { 2, 2000  },
            new int[] { 2, 10000 },
            new int[] { 5, 0     },
            new int[] { 5, 1     },
            new int[] { 5, 500   },
            new int[] { 5, 1000  },
            new int[] { 5, 2000  },
            new int[] { 5, 10000 },
            new int[] { 10, 0    },
            new int[] { 10, 1    },
            new int[] { 10, 500  },
            new int[] { 10, 1000 },
            new int[] { 10, 2000 },
            new int[] { 10, 10000},
            new int[] { 20, 0    },
            new int[] { 20, 1    },
            new int[] { 20, 500  },
            new int[] { 20, 1000 },
            new int[] { 20, 2000 },
            new int[] { 20, 10000},
        };
    }
}