File: System\Transactions\TransactionTable.cs
Project: ndp\cdf\src\NetFx20\System.Transactions\System.Transactions.csproj (System.Transactions)
//-----------------------------------------------------------------------------
// <copyright file="TransactionTable.cs" company="Microsoft">
//     Copyright (c) Microsoft Corporation.  All rights reserved.
// </copyright>
//-----------------------------------------------------------------------------
 
namespace System.Transactions
{
    using System;
    using System.Diagnostics;
    using System.Threading;
 
 
    class CheapUnfairReaderWriterLock
    {
        object writerFinishedEvent;
 
        int readersIn;
        int readersOut;
        bool writerPresent;
 
        object syncRoot;
 
        // Spin lock params
        const int MAX_SPIN_COUNT = 100;
        const int SLEEP_TIME = 500;
 
 
        public CheapUnfairReaderWriterLock()
        {
        }
 
 
        object SyncRoot
        {
            get
            {
                if ( this.syncRoot == null )
                {
                    Interlocked.CompareExchange( ref this.syncRoot, new object(), null );
                }
                return this.syncRoot;
            }
        }
 
 
        bool ReadersPresent
        {
            get
            {
                return this.readersIn != this.readersOut;
            }
        }
 
 
        ManualResetEvent WriterFinishedEvent
        {
            get
            {
                if ( this.writerFinishedEvent == null )
                {
                    Interlocked.CompareExchange( ref this.writerFinishedEvent, new ManualResetEvent( true ), null );
                }
                return (ManualResetEvent)this.writerFinishedEvent;
            }
        }
 
 
        public int AcquireReaderLock()
        {
            int readerIndex = 0;
            do
            {
                if ( this.writerPresent )
                {
                    WriterFinishedEvent.WaitOne();
                }
 
                readerIndex = Interlocked.Increment( ref this.readersIn );
 
                if ( !this.writerPresent )
                {
                    break;
                }
 
                Interlocked.Decrement( ref this.readersIn );
            } 
            while ( true );
 
            return readerIndex;
        }
 
 
        public void AcquireWriterLock()
        {
#pragma warning disable 0618
            //@
            Monitor.Enter(this.SyncRoot);
#pragma warning restore 0618
 
            this.writerPresent = true;
            this.WriterFinishedEvent.Reset();
 
            do
            {
                int i = 0;
                while ( ReadersPresent && i < MAX_SPIN_COUNT )
                {
                    Thread.Sleep( 0 );
                    i++;
                }
 
                if ( ReadersPresent )
                {
                    Thread.Sleep( SLEEP_TIME );
                }
            } 
            while ( ReadersPresent );
        }
 
 
        public void ReleaseReaderLock()
        {
            Interlocked.Increment( ref this.readersOut );
        }
 
 
        public void ReleaseWriterLock()
        {
            try
            {
                this.writerPresent = false;
                this.WriterFinishedEvent.Set();
            }
            finally
            {
                Monitor.Exit( this.SyncRoot );
            }
        }
    }
 
 
    // This transaction table implementation uses an array of lists to avoid contention.  The list for a
    // transaction is decided by its hashcode.
    class TransactionTable
    {
        // Use a timer to initiate looking for transactions that have timed out.
        System.Threading.Timer timer;
 
        // Private storage noting if the timer is enabled.
        bool timerEnabled;
 
        // Store the timer interval
        const int timerInternalExponent = 9;
        int timerInterval;
 
        // Store the number of ticks.  A tick is a mark of 1 timer interval.  By counting ticks
        // we can avoid expensive calls to get the current time for every transaction creation.
        const long TicksPerMillisecond = 10000;
        long ticks;
        Int64 lastTimerTime;
        
        // Sets of arrays of transactions.
        BucketSet headBucketSet;
 
        // Synchronize adding transactions with shutting off the timer and started events.
        CheapUnfairReaderWriterLock rwLock;
 
        internal TransactionTable()
        {
            // Create a timer that is initially disabled by specifing an Infinite time to the first interval
            this.timer = new Timer( new TimerCallback(ThreadTimer), null, Timeout.Infinite, this.timerInterval );
 
            // Note that the timer is disabled
            this.timerEnabled = false;
 
            // Store the timer interval
            this.timerInterval = 1 << TransactionTable.timerInternalExponent;
 
            // Ticks start off at zero.
            this.ticks = 0;
 
            // The head of the list is long.MaxValue.  It contains all of the transactions that for
            // some reason or other don't have a timeout.
            this.headBucketSet = new BucketSet( this, long.MaxValue );
 
            // Allocate the lock
            rwLock = new CheapUnfairReaderWriterLock();
        }
 
 
        // Calculate the maximum number of ticks for which this transaction should live
        internal long TimeoutTicks( TimeSpan timeout )
        {
            if ( timeout != TimeSpan.Zero )
            {
                // Note: At the current setting of approximately 2 ticks per second this timer will
                //       wrap in approximately 2^64/2/60/60/24/365=292,471,208,677.5360162195585996
                //       (nearly 300 billion) years.
                long timeoutTicks = ((timeout.Ticks / TimeSpan.TicksPerMillisecond) >> 
                        TransactionTable.timerInternalExponent) + this.ticks;
                return timeoutTicks;
            }
            else
            {
                return long.MaxValue;
            }
        }
 
 
        // Absolute timeout
        internal TimeSpan RecalcTimeout( InternalTransaction tx )
        {
            return TimeSpan.FromMilliseconds( (tx.AbsoluteTimeout - this.ticks) * this.timerInterval );
        }
 
 
        // Creation time
        private Int64 CurrentTime
        {
            get
            {
                if ( this.timerEnabled )
                {
                    return this.lastTimerTime;
                }
                else
                {
                    return DateTime.UtcNow.Ticks;
                }
            }
        }
 
 
        // Add a transaction to the table.  Transactions are added to the end of the list in sorted order based on their 
        // absolute timeout.
        internal int Add( InternalTransaction txNew )
        {
            // Tell the runtime that we are modifying global state.
            Thread.BeginCriticalRegion();
            int readerIndex = 0;
 
            try
            {
                readerIndex = rwLock.AcquireReaderLock();
                try
                {
                    // Start the timer if needed before checking the current time since the current
                    // time can be more efficient with a running timer.
                    if ( txNew.AbsoluteTimeout != long.MaxValue )
                    {
                        if ( !this.timerEnabled )
                        {
                            if ( !this.timer.Change( this.timerInterval, this.timerInterval ))
                            {
                                throw TransactionException.CreateInvalidOperationException(
                                    SR.GetString( SR.TraceSourceLtm ), 
                                    SR.GetString(SR.UnexpectedTimerFailure), 
                                    null
                                    );
                            }
                            this.lastTimerTime = DateTime.UtcNow.Ticks;
                            this.timerEnabled = true;
                        }
                    }
                    txNew.CreationTime = CurrentTime;
 
                    AddIter( txNew );
                }
                finally
                {
                    rwLock.ReleaseReaderLock();
                }
            }
            finally
            {
                Thread.EndCriticalRegion();
            }
 
            return readerIndex;
        }
 
 
        void AddIter( InternalTransaction txNew )
        {
            //
            // Theory of operation.
            //
            // Note that the head bucket contains any transaction with essentially infinite
            // timeout (long.MaxValue).  The list is sorted in decending order.  To add
            // a node the code must walk down the list looking for a set of bucket that matches
            // the absolute timeout value for the transaction.  When it is found it passes
            // the insert down to that set.
            //
            // An importent thing to note about the list is that forward links are all weak
            // references and reverse links are all strong references.  This allows the GC
            // to clean up old links in the list so that they don't need to be removed manually.
            // However if there is still a rooted strong reference to an old link in the
            // chain that link wont fall off the list because there is a strong reference held
            // forward.
            //
            
            BucketSet currentBucketSet = this.headBucketSet;
 
            while ( currentBucketSet.AbsoluteTimeout != txNew.AbsoluteTimeout )
            {
                BucketSet lastBucketSet = null;
                do
                {
                    WeakReference nextSetWeak = (WeakReference)currentBucketSet.nextSetWeak;
                    BucketSet nextBucketSet = null;
                    if ( nextSetWeak != null )
                    {
                        nextBucketSet = (BucketSet)nextSetWeak.Target;
                    }
                    
                    if ( nextBucketSet == null )
                    {
                        //
                        // We've reached the end of the list either because nextSetWeak was null or
                        // because its reference was collected.  This code doesn't care.  Make a new
                        // set, attempt to attach it and move on.
                        //
                        BucketSet newBucketSet = new BucketSet( this, txNew.AbsoluteTimeout );
                        WeakReference newSetWeak = new WeakReference( newBucketSet );
 
                        WeakReference oldNextSetWeak = (WeakReference)Interlocked.CompareExchange( 
                            ref currentBucketSet.nextSetWeak, newSetWeak, nextSetWeak );
                        if ( oldNextSetWeak == nextSetWeak )
                        {
                            // Ladies and Gentlemen we have a winner.
                            newBucketSet.prevSet = currentBucketSet;
                        }
 
                        // Note that at this point we don't update currentBucketSet.  On the next loop
                        // iteration we should be able to pick up where we left off.
                    }
                    else
                    {
                        lastBucketSet = currentBucketSet;
                        currentBucketSet = nextBucketSet;
                    }
                }
                while ( currentBucketSet.AbsoluteTimeout > txNew.AbsoluteTimeout );
 
                if ( currentBucketSet.AbsoluteTimeout != txNew.AbsoluteTimeout )
                {
                    //
                    // Getting to here means that we've found a slot in the list where this bucket set should go.
                    //
                    BucketSet newBucketSet = new BucketSet( this, txNew.AbsoluteTimeout );
                    WeakReference newSetWeak = new WeakReference( newBucketSet );
 
                    newBucketSet.nextSetWeak = lastBucketSet.nextSetWeak;
                    WeakReference oldNextSetWeak = (WeakReference)Interlocked.CompareExchange( 
                        ref lastBucketSet.nextSetWeak, newSetWeak, newBucketSet.nextSetWeak );
                    if ( oldNextSetWeak == newBucketSet.nextSetWeak )
                    {
                        // Ladies and Gentlemen we have a winner.
                        if ( oldNextSetWeak != null )
                        {
                            BucketSet oldSet = (BucketSet)oldNextSetWeak.Target;
                            if ( oldSet != null )
                            {
                                // prev references are just there to root things for the GC.  If this object is 
                                // gone we don't really care.
                                oldSet.prevSet = newBucketSet;
                            }
                        }
                        newBucketSet.prevSet = lastBucketSet;
                    }
 
                    // Special note - We are going to loop back to the BucketSet that preceeds the one we just tried
                    // to insert because we may have lost the ---- to insert our new BucketSet into the list to another
                    // "Add" thread. By looping back, we check again to see if the BucketSet we just created actually
                    // got added. If it did, we will exit out of the outer loop and add the transaction. But if we
                    // lost the ----, we will again try to add a new BucketSet. In the latter case, the BucketSet
                    // we created during the first iteration will simply be Garbage Collected because there are no
                    // strong references to it since we never added the transaction to a bucket and the act of
                    // creating the second BucketSet with remove the backward reference that was created in the
                    // first trip thru the loop.
                    currentBucketSet = lastBucketSet;
                    lastBucketSet = null;
 
                    // The outer loop will iterate and pick up where we left off.
                }
            }
 
            //
            // Great we found a spot.
            //
            currentBucketSet.Add( txNew );
        }
 
 
        // Remove a transaction from the table.
        internal void Remove( InternalTransaction tx )
        {
            tx.tableBucket.Remove( tx );
            tx.tableBucket = null;
        }
 
 
        // Process a timer event
        private void ThreadTimer( Object state )
        {
            //
            // Theory of operation.
            //
            // To timeout transactions we must walk down the list starting from the head
            // until we find a link with an absolute timeout that is greater than our own.
            // At that point everything further down in the list is elegable to be timed
            // out.  So simply remove that link in the list and walk down from that point
            // timing out any transaction that is found.
            //
            
            // There could be a ---- between this callback being queued and the timer
            // being disabled.  If we get here when the timer is disabled, just return.
            if ( !this.timerEnabled )
            {
                return;
            }
 
            // Increment the number of ticks
            this.ticks++;
            this.lastTimerTime = DateTime.UtcNow.Ticks;
 
            //
            // First find the starting point of transactions that should time out.  Every transaction after
            // that point will timeout so once we've found it then it is just a matter of traversing the
            // structure.
            //
            BucketSet lastBucketSet = null; 
            BucketSet currentBucketSet = this.headBucketSet; // The list always has a head.
 
            // Acquire a writer lock before checking to see if we should disable the timer.
            // Adding of transactions acquires a reader lock and might insert a new BucketSet.
            // If that ----s with our check for a BucketSet existing, we may not timeout that
            // transaction that is being added.
            WeakReference nextWeakSet = null;
            BucketSet nextBucketSet = null;
 
            nextWeakSet = (WeakReference)currentBucketSet.nextSetWeak;
            if (nextWeakSet != null)
            {
                nextBucketSet = (BucketSet)nextWeakSet.Target;
            }
            
            if (nextBucketSet == null)
            {
                this.rwLock.AcquireWriterLock();
                try
                {
                    // Access the nextBucketSet again in writer lock to account for any ---- before disabling the timeout. 
                    nextWeakSet = (WeakReference)currentBucketSet.nextSetWeak;
                    if (nextWeakSet != null)
                    {
                        nextBucketSet = (BucketSet)nextWeakSet.Target;
                    }
 
                    if (nextBucketSet == null)
                    {
                        //
                        // Special case to allow for disabling the timer.
                        //
                        // If there are no transactions on the timeout list we can disable the
                        // timer.
                        if (!this.timer.Change(Timeout.Infinite, Timeout.Infinite))
                        {
                            throw TransactionException.CreateInvalidOperationException(
                                SR.GetString(SR.TraceSourceLtm),
                                SR.GetString(SR.UnexpectedTimerFailure),
                                null
                                );
                        }
                        this.timerEnabled = false;
 
                        return;
                    }
                }
                finally
                {
                    this.rwLock.ReleaseWriterLock();
                }
            }
 
            // Note it is slightly subtle that we always skip the head node.  This is done
            // on purpose because the head node contains transactions with essentially 
            // an infinite timeout.
            do
            {
                do
                {
                    nextWeakSet = (WeakReference)currentBucketSet.nextSetWeak;
                    if ( nextWeakSet == null )
                    {
                        // Nothing more to do.
                        return;
                    }
 
                    nextBucketSet = (BucketSet)nextWeakSet.Target;
                    if ( nextBucketSet == null )
                    {
                        // Again nothing more to do.
                        return;
                    }
                    lastBucketSet = currentBucketSet;
                    currentBucketSet = nextBucketSet;
                }
                while ( currentBucketSet.AbsoluteTimeout > this.ticks );
 
                // Tell the runtime that we are modifying global state.
                Thread.BeginCriticalRegion();
                try
                {
                    //
                    // Pinch off the list at this point making sure it is still the correct set.
                    //
                    // Note: We may lose a ---- with an "Add" thread that is inserting a BucketSet in this location in
                    // the list. If that happens, this CompareExchange will not be performed and the returned abortingSetsWeak
                    // value will NOT equal nextWeakSet. But we check for that and if this condition occurs, this iteration of
                    // the timer thread will simply return, not timing out any transactions. When the next timer interval
                    // expires, the thread will walk the list again, find the appropriate BucketSet to pinch off, and
                    // then time out the transactions. This means that it is possible for a transaction to live a bit longer,
                    // but not much.
                    WeakReference abortingSetsWeak = 
                        (WeakReference)Interlocked.CompareExchange( ref lastBucketSet.nextSetWeak, null, nextWeakSet );
 
                    if ( abortingSetsWeak == nextWeakSet )
                    {
                        // Yea - now proceed to abort the transactions.
                        BucketSet abortingBucketSets = null;
 
                        do
                        {
                            if ( abortingSetsWeak != null )
                            {
                                abortingBucketSets = (BucketSet)abortingSetsWeak.Target;
                            }
                            else
                            {
                                abortingBucketSets = null;
                            }
                            if ( abortingBucketSets != null )
                            {
                                abortingBucketSets.TimeoutTransactions();
                                abortingSetsWeak = (WeakReference)abortingBucketSets.nextSetWeak;
                            }
                        } 
                        while ( abortingBucketSets != null );
 
                        // That's all we needed to do.
                        break;
                    }
                }
                finally
                {
                    Thread.EndCriticalRegion();
                }
 
                // We missed pulling the right transactions off.  Loop back up and try again.
                currentBucketSet = lastBucketSet;
            }
            while ( true );
        }
    }
 
 
    class BucketSet
    {
        // Buckets are kept in sets.  Each element of a set will have the same absoluteTimeout.
        internal object nextSetWeak;
        internal BucketSet prevSet;
 
        TransactionTable table;
 
        long absoluteTimeout;
 
        internal Bucket headBucket;
 
        internal BucketSet( TransactionTable table, long absoluteTimeout )
        {
            this.headBucket = new Bucket( this );
            this.table = table;
            this.absoluteTimeout = absoluteTimeout;
        }
 
 
        internal long AbsoluteTimeout
        {
            get
            {
                return this.absoluteTimeout;
            }
        }
 
 
        internal void Add( InternalTransaction newTx )
        {
            while ( !this.headBucket.Add( newTx ));
        }
 
 
        internal void TimeoutTransactions()
        {
            Bucket currentBucket = this.headBucket;
            // It will always have a head.
            do
            {
                currentBucket.TimeoutTransactions();
 
                WeakReference nextWeakBucket = (WeakReference)currentBucket.nextBucketWeak;
                if ( nextWeakBucket != null )
                {
                    currentBucket = (Bucket)nextWeakBucket.Target;
                }
                else
                {
                    currentBucket = null;
                }
            } 
            while ( currentBucket != null );
        }
    }
 
 
    class Bucket
    {
        bool timedOut;
        int index;
        int size;
        InternalTransaction[] transactions;
        internal WeakReference nextBucketWeak;
        Bucket previous;
 
        BucketSet owningSet;
 
        internal Bucket( BucketSet owningSet )
        {
            this.timedOut = false;
            this.index = -1;
            this.size = 1024; // A possible design change here is to have this scale dynamically based on load.
            transactions = new InternalTransaction[this.size];
            this.owningSet = owningSet;
        }
 
 
        internal bool Add( InternalTransaction tx )
        {
            int currentIndex = Interlocked.Increment( ref this.index );
            if ( currentIndex < this.size )
            {
                tx.tableBucket = this;
                tx.bucketIndex = currentIndex;
                Thread.MemoryBarrier(); // This data must be written before the transaction 
                                        // could be timed out.
                this.transactions[currentIndex] = tx;
 
                if ( this.timedOut )
                {
                    lock ( tx )
                    {
                        tx.State.Timeout( tx );
                    }
                }
            }
            else
            {
                Bucket newBucket = new Bucket( this.owningSet );
                newBucket.nextBucketWeak = new WeakReference( this );
 
                Bucket oldBucket = Interlocked.CompareExchange( ref this.owningSet.headBucket, newBucket, this );
                if ( oldBucket == this )
                {
                    // ladies and gentlemen we have a winner.
                    this.previous = newBucket;
                }
 
                return false;
            }
            return true;
        }
 
 
        internal void Remove( InternalTransaction tx )
        {
            this.transactions[tx.bucketIndex] = null;
        }
 
 
        internal void TimeoutTransactions()
        {
            int i;
            int transactionCount = this.index;
 
            this.timedOut = true;
            Thread.MemoryBarrier();
 
            for ( i = 0; i <= transactionCount && i < this.size; i++ )
            {
                Debug.Assert( transactionCount == this.index, "Index changed timing out transactions" );
                InternalTransaction tx = this.transactions[i];
                if ( tx != null )
                {
                    lock ( tx )
                    {
                        tx.State.Timeout( tx );
                    }
                }
            }
        }
    }
}