// File: System\Transactions\VolatileEnlistmentMultiplexing.cs
// Project: ndp\cdf\src\NetFx20\System.Transactions\System.Transactions.csproj (System.Transactions)
//-----------------------------------------------------------------------------
// <copyright file="VolatileEnlistmentMultiplexing.cs" company="Microsoft">
//     Copyright (c) Microsoft Corporation.  All rights reserved.
// </copyright>
//-----------------------------------------------------------------------------
 
namespace System.Transactions
{
    using System;
    using System.Diagnostics;
    using System.Threading;
    using System.Transactions;
    using System.Transactions.Diagnostics;
 
 
    // The volatile demultiplexer is a fan-out point for promoted volatile enlistments.
    // When a transaction is promoted, a single volatile enlistment is created in the new
    // transaction on behalf of all volatile enlistments on the transaction.  When the
    // VolatileDemux receives a preprepare it fans that notification out to all of the
    // enlistments on the transaction.  Once it has gathered all of the responses it sends
    // a single vote back to the DistributedTransactionManager.
    internal abstract class VolatileDemultiplexer : IEnlistmentNotificationInternal
    {
        // Number of milliseconds a notification waits for the transaction lock before it
        // gives up and re-queues itself on the thread pool.
        private const int LockTimeoutMilliseconds = 250;

        // Identifies which of the four notifications a Poolable* entry point delivers.
        private enum DemuxNotification
        {
            Prepare,
            Commit,
            Rollback,
            InDoubt
        }

        // Reference the transaction so that we have access to its enlistments.  It is
        // also the object whose monitor the Poolable* methods take before delivering.
        protected InternalTransaction transaction;

        // Promoted enlistment used to call back to the distributed TM.
        internal IPromotedEnlistment oletxEnlistment;

        // Enlistment handed to Prepare; the derived classes in this file store it here.
        internal IPromotedEnlistment preparingEnlistment;

        public VolatileDemultiplexer( InternalTransaction transaction )
        {
            this.transaction = transaction;
        }


        // Invokes InternalCommitted on the two-phase state of every enlistment in the set.
        internal void BroadcastCommitted( ref VolatileEnlistmentSet volatiles )
        {
            for ( int i = 0; i < volatiles.volatileEnlistmentCount; i++ )
            {
                volatiles.volatileEnlistments[i].twoPhaseState.InternalCommitted(
                    volatiles.volatileEnlistments[i] );
            }
        }


        // Invokes InternalAborted on the two-phase state of every enlistment in the set.
        // This broadcast is used by the state machines and therefore must be internal.
        internal void BroadcastRollback( ref VolatileEnlistmentSet volatiles )
        {
            for ( int i = 0; i < volatiles.volatileEnlistmentCount; i++ )
            {
                volatiles.volatileEnlistments[i].twoPhaseState.InternalAborted(
                    volatiles.volatileEnlistments[i] );
            }
        }


        // Invokes InternalIndoubt on the two-phase state of every enlistment in the set.
        internal void BroadcastInDoubt( ref VolatileEnlistmentSet volatiles )
        {
            for ( int i = 0; i < volatiles.volatileEnlistmentCount; i++ )
            {
                volatiles.volatileEnlistments[i].twoPhaseState.InternalIndoubt(
                    volatiles.volatileEnlistments[i] );
            }
        }


        // Object for synchronizing access to the entire class( avoiding lock( typeof( ... )) ).
        // Created on first use; CompareExchange guarantees that every caller ends up
        // seeing the same instance even if several threads race to create it.
        private static object classSyncObject;
        internal static object ClassSyncObject
        {
            get
            {
                if ( classSyncObject == null )
                {
                    object o = new object();
                    Interlocked.CompareExchange( ref classSyncObject, o, null );
                }
                return classSyncObject;
            }
        }


        // The four callback delegates below are created lazily under ClassSyncObject.
        // The fields are volatile because they are checked both outside and inside the lock.
        private static volatile WaitCallback prepareCallback;
        private static WaitCallback PrepareCallback
        {
            get
            {
                if ( prepareCallback == null )
                {
                    lock ( ClassSyncObject )
                    {
                        if ( prepareCallback == null )
                        {
                            prepareCallback = new WaitCallback( PoolablePrepare );
                        }
                    }
                }

                return prepareCallback;
            }
        }

        private static volatile WaitCallback commitCallback;
        private static WaitCallback CommitCallback
        {
            get
            {
                if ( commitCallback == null )
                {
                    lock ( ClassSyncObject )
                    {
                        if ( commitCallback == null )
                        {
                            commitCallback = new WaitCallback( PoolableCommit );
                        }
                    }
                }

                return commitCallback;
            }
        }

        private static volatile WaitCallback rollbackCallback;
        private static WaitCallback RollbackCallback
        {
            get
            {
                if ( rollbackCallback == null )
                {
                    lock ( ClassSyncObject )
                    {
                        if ( rollbackCallback == null )
                        {
                            rollbackCallback = new WaitCallback( PoolableRollback );
                        }
                    }
                }

                return rollbackCallback;
            }
        }

        private static volatile WaitCallback inDoubtCallback;
        private static WaitCallback InDoubtCallback
        {
            get
            {
                if ( inDoubtCallback == null )
                {
                    lock ( ClassSyncObject )
                    {
                        if ( inDoubtCallback == null )
                        {
                            inDoubtCallback = new WaitCallback( PoolableInDoubt );
                        }
                    }
                }

                return inDoubtCallback;
            }
        }


        // Each Poolable* method has the WaitCallback shape so that it can be queued to
        // the thread pool.  "state" must be the VolatileDemultiplexer to notify.  The
        // method calls the matching Internal* method while holding the transaction lock,
        // or re-queues itself if the lock cannot be taken in time.
        protected static void PoolablePrepare( object state )
        {
            DeliverUnderTransactionLock( state, DemuxNotification.Prepare );
        }

        protected static void PoolableCommit( object state )
        {
            DeliverUnderTransactionLock( state, DemuxNotification.Commit );
        }

        protected static void PoolableRollback( object state )
        {
            DeliverUnderTransactionLock( state, DemuxNotification.Rollback );
        }

        protected static void PoolableInDoubt( object state )
        {
            DeliverUnderTransactionLock( state, DemuxNotification.InDoubt );
        }


        // Shared body of the four Poolable* entry points.
        //
        // Don't block an enlistment thread (or a thread pool thread).  So try to get the
        // transaction lock, but if unsuccessful give up and queue this operation to try
        // again later.  Throws the InvalidOperationException produced by
        // TransactionException.CreateInvalidOperationException if the thread pool
        // refuses the retry work item.
        private static void DeliverUnderTransactionLock( object state, DemuxNotification notification )
        {
            VolatileDemultiplexer demux = (VolatileDemultiplexer) state;

            bool tookLock = false;
            try
            {
                System.Threading.Monitor.TryEnter( demux.transaction, LockTimeoutMilliseconds, ref tookLock );
                if ( tookLock )
                {
                    demux.DispatchNotification( notification );
                }
                else if ( !ThreadPool.QueueUserWorkItem( GetRetryCallback( notification ), demux ) )
                {
                    throw TransactionException.CreateInvalidOperationException(
                        SR.GetString( SR.TraceSourceLtm ),
                        SR.GetString( SR.UnexpectedFailureOfThreadPool ),
                        null
                        );
                }
            }
            finally
            {
                // Only release the monitor if TryEnter actually acquired it.
                if ( tookLock )
                {
                    System.Threading.Monitor.Exit( demux.transaction );
                }
            }
        }


        // Calls the Internal* method that corresponds to the given notification.
        private void DispatchNotification( DemuxNotification notification )
        {
            switch ( notification )
            {
                case DemuxNotification.Prepare:
                    InternalPrepare();
                    break;

                case DemuxNotification.Commit:
                    InternalCommit();
                    break;

                case DemuxNotification.Rollback:
                    InternalRollback();
                    break;

                default:
                    Debug.Assert( notification == DemuxNotification.InDoubt, "Unexpected notification kind." );
                    InternalInDoubt();
                    break;
            }
        }


        // Returns the cached delegate that re-runs the given notification's Poolable*
        // method.  Only evaluated on the retry path, so the delegates stay lazily created.
        private static WaitCallback GetRetryCallback( DemuxNotification notification )
        {
            switch ( notification )
            {
                case DemuxNotification.Prepare:
                    return PrepareCallback;

                case DemuxNotification.Commit:
                    return CommitCallback;

                case DemuxNotification.Rollback:
                    return RollbackCallback;

                default:
                    Debug.Assert( notification == DemuxNotification.InDoubt, "Unexpected notification kind." );
                    return InDoubtCallback;
            }
        }


        // Implemented by derived classes; invoked by the Poolable* methods while the
        // monitor on this.transaction is held.
        protected abstract void InternalPrepare();
        protected abstract void InternalCommit();
        protected abstract void InternalRollback();
        protected abstract void InternalInDoubt();

        #region IEnlistmentNotification Members

        // Fanout Preprepare notifications
        public abstract void Prepare( IPromotedEnlistment en );

        public abstract void Commit( IPromotedEnlistment en );

        public abstract void Rollback( IPromotedEnlistment en );

        public abstract void InDoubt( IPromotedEnlistment en );

        #endregion

    }
 
 
    // Phase 0 flavor of the volatile demux: a prepare notification asks the
    // transaction state to move to "promoted phase 0".
    internal class Phase0VolatileDemultiplexer : VolatileDemultiplexer
    {
        public Phase0VolatileDemultiplexer( InternalTransaction transaction ) : base( transaction ) { }

        // Writes an "exception consumed" trace record, but only when verbose
        // diagnostic tracing is switched on.
        private static void TraceConsumedException( Exception consumed )
        {
            if ( DiagnosticTrace.Verbose )
            {
                ExceptionConsumedTraceRecord.Trace( SR.GetString( SR.TraceSourceLtm ),
                    consumed );
            }
        }

        // Invoked by the base class' PoolablePrepare once it holds the transaction lock.
        // An abort or in-doubt outcome raised by the state change is consumed here and
        // reported to the promoted enlistment instead of propagating to the caller.
        protected override void InternalPrepare()
        {
            InternalTransaction tx = this.transaction;

            try
            {
                tx.State.ChangeStatePromotedPhase0( tx );
            }
            catch ( TransactionAbortedException abortedException )
            {
                this.oletxEnlistment.ForceRollback( abortedException );
                TraceConsumedException( abortedException );
            }
            catch ( TransactionInDoubtException inDoubtException )
            {
                this.oletxEnlistment.EnlistmentDone();
                TraceConsumedException( inDoubtException );
            }
        }

        // Invoked by the base class' PoolableCommit once it holds the transaction lock.
        protected override void InternalCommit()
        {
            // Answer the TM first, then drive the local state machine.
            this.oletxEnlistment.EnlistmentDone();

            InternalTransaction tx = this.transaction;
            tx.State.ChangeStatePromotedCommitted( tx );
        }

        // Invoked by the base class' PoolableRollback once it holds the transaction lock.
        protected override void InternalRollback()
        {
            // Answer the TM first, then drive the local state machine.
            this.oletxEnlistment.EnlistmentDone();

            InternalTransaction tx = this.transaction;
            tx.State.ChangeStatePromotedAborted( tx );
        }

        // Invoked by the base class' PoolableInDoubt once it holds the transaction lock.
        protected override void InternalInDoubt()
        {
            InternalTransaction tx = this.transaction;
            tx.State.InDoubtFromDtc( tx );
        }

        #region IEnlistmentNotification Members

        // Each notification records the enlistment it arrived on and then runs the
        // matching Poolable* method synchronously on the calling thread.

        public override void Prepare( IPromotedEnlistment en )
        {
            // Prepare stores its enlistment in preparingEnlistment, not oletxEnlistment.
            this.preparingEnlistment = en;
            PoolablePrepare( this );
        }


        public override void Commit( IPromotedEnlistment en )
        {
            this.oletxEnlistment = en;
            PoolableCommit( this );
        }


        public override void Rollback( IPromotedEnlistment en )
        {
            this.oletxEnlistment = en;
            PoolableRollback( this );
        }


        public override void InDoubt( IPromotedEnlistment en )
        {
            this.oletxEnlistment = en;
            PoolableInDoubt( this );
        }

        #endregion
    }
 
    // Phase 1 flavor of the volatile demux: a prepare notification asks the
    // transaction state to move to "promoted phase 1".
    internal class Phase1VolatileDemultiplexer : VolatileDemultiplexer
    {
        public Phase1VolatileDemultiplexer( InternalTransaction transaction ) : base( transaction ) { }

        // Writes an "exception consumed" trace record, but only when verbose
        // diagnostic tracing is switched on.
        private static void TraceConsumedException( Exception consumed )
        {
            if ( DiagnosticTrace.Verbose )
            {
                ExceptionConsumedTraceRecord.Trace( SR.GetString( SR.TraceSourceLtm ),
                    consumed );
            }
        }

        // Invoked by the base class' PoolablePrepare once it holds the transaction lock.
        // An abort or in-doubt outcome raised by the state change is consumed here and
        // reported to the promoted enlistment instead of propagating to the caller.
        protected override void InternalPrepare()
        {
            InternalTransaction tx = this.transaction;

            try
            {
                tx.State.ChangeStatePromotedPhase1( tx );
            }
            catch ( TransactionAbortedException abortedException )
            {
                this.oletxEnlistment.ForceRollback( abortedException );
                TraceConsumedException( abortedException );
            }
            catch ( TransactionInDoubtException inDoubtException )
            {
                this.oletxEnlistment.EnlistmentDone();
                TraceConsumedException( inDoubtException );
            }
        }


        // Invoked by the base class' PoolableCommit once it holds the transaction lock.
        protected override void InternalCommit()
        {
            // Answer the TM first, then drive the local state machine.
            this.oletxEnlistment.EnlistmentDone();

            InternalTransaction tx = this.transaction;
            tx.State.ChangeStatePromotedCommitted( tx );
        }


        // Invoked by the base class' PoolableRollback once it holds the transaction lock.
        protected override void InternalRollback()
        {
            // Answer the TM first, then drive the local state machine.
            this.oletxEnlistment.EnlistmentDone();

            InternalTransaction tx = this.transaction;
            tx.State.ChangeStatePromotedAborted( tx );
        }


        // Invoked by the base class' PoolableInDoubt once it holds the transaction lock.
        protected override void InternalInDoubt()
        {
            InternalTransaction tx = this.transaction;
            tx.State.InDoubtFromDtc( tx );
        }


        // Each notification below records the enlistment it arrived on and then runs
        // the matching Poolable* method synchronously on the calling thread.

        public override void Prepare( IPromotedEnlistment en )
        {
            // Prepare stores its enlistment in preparingEnlistment, not oletxEnlistment.
            this.preparingEnlistment = en;
            PoolablePrepare( this );
        }


        public override void Commit( IPromotedEnlistment en )
        {
            this.oletxEnlistment = en;
            PoolableCommit( this );
        }


        public override void Rollback( IPromotedEnlistment en )
        {
            this.oletxEnlistment = en;
            PoolableRollback( this );
        }


        public override void InDoubt( IPromotedEnlistment en )
        {
            this.oletxEnlistment = en;
            PoolableInDoubt( this );
        }
    }
 
 
 
    internal struct VolatileEnlistmentSet
    {
        // Backing array of enlistments.  The VolatileDemultiplexer broadcast methods
        // visit indexes 0 .. volatileEnlistmentCount - 1.
        internal InternalEnlistment[] volatileEnlistments;

        // Number of entries of volatileEnlistments that are visited by the broadcasts.
        internal int volatileEnlistmentCount;

        // NOTE(review): presumably the allocated capacity of volatileEnlistments;
        // not used in this file, confirm against the code that grows the array.
        internal int volatileEnlistmentSize;

        // NOTE(review): presumably a count of outstanding dependent clones;
        // not used in this file, confirm against the transaction state machine.
        internal int dependentClones;

        // Running count of the volatile enlistments that have prepared.
        internal int preparedVolatileEnlistments;

        // Single enlistment that stands in for every volatile enlistment that may exist
        // on a promoted transaction.  It should only be initialized if the transaction
        // is promoted, and it is expected to be assigned at most once.
        private VolatileDemultiplexer volatileDemux;
        internal VolatileDemultiplexer VolatileDemux
        {
            get
            {
                return volatileDemux;
            }
            set
            {
                // Debug builds flag a second assignment; release builds simply overwrite.
                Debug.Assert( volatileDemux == null, "volatileDemux can only be set once." );
                volatileDemux = value;
            }
        }
    }
}