File: System\ServiceModel\Channels\MsmqInputSessionChannel.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------
 
namespace System.ServiceModel.Channels
{
    using System.Runtime;
    using System.ServiceModel;
    using System.Transactions;
    using SR = System.ServiceModel.SR;
    using System.Threading;
 
    sealed class MsmqInputSessionChannel : InputChannel, IInputSessionChannel
    {
        IInputSession session;
        Transaction associatedTx;
        ReceiveContext sessiongramReceiveContext;
        bool receiveContextEnabled;
        bool sessiongramDoomed;
 
        // count of messages that have been pulled out of the base queue but Complete has not been called on them
        int incompleteMessageCount;
 
        // count of messages that have been completed but the transaction has not been committed
        int uncommittedMessageCount;
 
        public MsmqInputSessionChannel(MsmqInputSessionChannelListener listener, Transaction associatedTx, ReceiveContext sessiongramReceiveContext)
            : base(listener, new EndpointAddress(listener.Uri))
        {
            this.session = new InputSession();
            this.incompleteMessageCount = 0;
 
            if (sessiongramReceiveContext == null)
            {
                this.receiveContextEnabled = false;
 
                // only enlist if we are running in a non-receive context mode
                this.associatedTx = associatedTx;
                this.associatedTx.EnlistVolatile(new TransactionEnlistment(this, this.associatedTx), EnlistmentOptions.None);
            }
            else
            {
                //ignore the ambient transaction if any
                this.receiveContextEnabled = true;
                this.sessiongramReceiveContext = sessiongramReceiveContext;
                this.sessiongramDoomed = false;
            }
        }
 
        public IInputSession Session
        {
            get { return this.session; }
        }
 
        int TotalPendingItems
        {
            get
            {
                return this.InternalPendingItems + this.incompleteMessageCount;
            }
        }
 
        void DetachTransaction(bool aborted)
        {
            // disassociate the session channel from the current transaction and enlistment
            this.associatedTx = null;
            if (aborted)
            {
                this.incompleteMessageCount += this.uncommittedMessageCount;
            }
            this.uncommittedMessageCount = 0;
        }
 
        void AbandonMessage(TimeSpan timeout)
        {
            ThrowIfFaulted();
            this.sessiongramDoomed = true;
        }
 
        void CompleteMessage(TimeSpan timeout)
        {
            ThrowIfFaulted();
            EnsureReceiveContextTransaction();
 
            // the message is now off to transaction land
            Interlocked.Increment(ref uncommittedMessageCount);
            Interlocked.Decrement(ref incompleteMessageCount);
        }
 
        public override Message Receive()
        {
            return this.Receive(this.DefaultReceiveTimeout);
        }
 
        public override Message Receive(TimeSpan timeout)
        {
            return InputChannel.HelpReceive(this, timeout);
        }
 
        public override IAsyncResult BeginReceive(AsyncCallback callback, object state)
        {
            return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
        }
 
        public override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return InputChannel.HelpBeginReceive(this, timeout, callback, state);
        }
 
        public override bool TryReceive(TimeSpan timeout, out Message message)
        {
            ThrowIfFaulted();
            if (CommunicationState.Closed == this.State || CommunicationState.Closing == this.State)
            {
                message = null;
                return true;
            }
 
            // we don't look at the transaction in the receive if receive context is enabled
            if (!this.receiveContextEnabled)
            {
                VerifyTransaction();
            }
 
            bool receiveSuccessful = base.TryReceive(timeout, out message);
 
            if (receiveSuccessful && message != null && this.receiveContextEnabled)
            {
                message.Properties[ReceiveContext.Name] = new MsmqSessionReceiveContext(this);
                Interlocked.Increment(ref incompleteMessageCount);
            }
 
            return receiveSuccessful;
        }
 
        public override IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowIfFaulted();
            if (CommunicationState.Closed == this.State || CommunicationState.Closing == this.State)
            {
                return new CompletedAsyncResult<bool, Message>(true, null, callback, state);
            }
            // we don't look at the transaction in the receive if receive context is enabled
            if (!this.receiveContextEnabled)
            {
                VerifyTransaction();
            }
            return base.BeginTryReceive(timeout, callback, state);
        }
 
        public override bool EndTryReceive(IAsyncResult result, out Message message)
        {
            CompletedAsyncResult<bool, Message> completedResult = result as CompletedAsyncResult<bool, Message>;
 
            if (null != completedResult)
            {
                return CompletedAsyncResult<bool, Message>.End(result, out message);
            }
            else
            {
                bool receiveSuccessful = base.EndTryReceive(result, out message);
                if (receiveSuccessful && message != null && this.receiveContextEnabled)
                {
                    message.Properties[ReceiveContext.Name] = new MsmqSessionReceiveContext(this);
                    Interlocked.Increment(ref incompleteMessageCount);
                }
 
                return receiveSuccessful;
            }
        }
 
        public void FaultChannel()
        {
            this.Fault();
        }
 
        void OnCloseReceiveContext(bool isAborting)
        {
            if (isAborting)
            {
                // can't do much on Channel.Abort if the transaction had already committed
                if (this.associatedTx != null)
                {
                    // Channel.Abort called within the associated transaction
                    Exception e = DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionChannelAbort)));
                    RollbackTransaction(e);
                }
                this.sessiongramReceiveContext.Abandon(TimeSpan.MaxValue);
            }
            else
            {
                if (this.TotalPendingItems > 0)
                {
                    // no need for rollback, it will happen automatically when this condition is hit in the Prepare() call
                    this.Fault();
                    this.sessiongramReceiveContext.Abandon(TimeSpan.MaxValue);
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionPrematureClose)));
                }
            }
        }
 
        void OnCloseTransactional(bool isAborting)
        {
            if (isAborting)
            {
                RollbackTransaction(null);
            }
            else
            {
                VerifyTransaction();
                if (this.InternalPendingItems > 0)
                {
                    RollbackTransaction(null);
                    this.Fault();
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionMessagesNotConsumed)));
                }
            }
        }
 
        void OnCloseCore(bool isAborting)
        {
            if (this.receiveContextEnabled)
            {
                OnCloseReceiveContext(isAborting);
            }
            else
            {
                OnCloseTransactional(isAborting);
            }
        }
 
        protected override void OnAbort()
        {
            OnCloseCore(true);
            base.OnAbort();
        }
 
        protected override void OnClose(TimeSpan timeout)
        {
            OnCloseCore(false);
            base.OnClose(timeout);
        }
 
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            OnCloseCore(false);
            return base.OnBeginClose(timeout, callback, state);
        }
 
        void RollbackTransaction(Exception exception)
        {
            try
            {
                if (TransactionStatus.Active == this.associatedTx.TransactionInformation.Status)
                    this.associatedTx.Rollback(exception);
            }
            catch (TransactionAbortedException ex)
            {
                MsmqDiagnostics.ExpectedException(ex);
            }
            catch (ObjectDisposedException ex)
            {
                MsmqDiagnostics.ExpectedException(ex);
            }
        }
 
        void EnsureReceiveContextTransaction()
        {
            // if this is the first time we are seeing this transaction in receivecontext enabled mode then enlist and 
            // associate the session channel with this transaction
            if (Transaction.Current == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqTransactionRequired)));
            }
 
            if (this.associatedTx == null)
            {
                this.associatedTx = Transaction.Current;
                this.associatedTx.EnlistVolatile(new ReceiveContextTransactionEnlistment(this, this.associatedTx, this.sessiongramReceiveContext),
                    EnlistmentOptions.EnlistDuringPrepareRequired);
            }
            else
            {
                if (this.associatedTx != Transaction.Current)
                {
                    RollbackTransaction(null);
                    this.Fault();
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqSameTransactionExpected)));
                }
 
                if (TransactionStatus.Active != Transaction.Current.TransactionInformation.Status)
                {
                    this.Fault();
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqTransactionNotActive)));
                }
            }
        }
 
        void VerifyTransaction()
        {
            if (this.InternalPendingItems > 0)
            {
                if (this.associatedTx != Transaction.Current)
                {
                    RollbackTransaction(null);
                    this.Fault();
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqSameTransactionExpected)));
                }
 
                if (TransactionStatus.Active != Transaction.Current.TransactionInformation.Status)
                {
                    RollbackTransaction(null);
                    this.Fault();
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new InvalidOperationException(SR.GetString(SR.MsmqTransactionNotActive)));
                }
            }
        }
 
        class InputSession : IInputSession
        {
            string id = "uuid://session-gram/" + Guid.NewGuid().ToString();
 
            public string Id
            {
                get { return this.id; }
            }
        }
 
        class MsmqSessionReceiveContext : ReceiveContext
        {
            MsmqInputSessionChannel channel;
 
            public MsmqSessionReceiveContext(MsmqInputSessionChannel channel)
            {
                this.channel = channel;
            }
 
            protected override void OnAbandon(TimeSpan timeout)
            {
                this.channel.AbandonMessage(timeout);
            }
 
            protected override IAsyncResult OnBeginAbandon(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return SessionReceiveContextAsyncResult.CreateAbandon(this, timeout, callback, state);
            }
 
            protected override IAsyncResult OnBeginComplete(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return SessionReceiveContextAsyncResult.CreateComplete(this, timeout, callback, state);
            }
 
            protected override void OnComplete(TimeSpan timeout)
            {
                this.channel.CompleteMessage(timeout);
            }
 
            protected override void OnEndAbandon(IAsyncResult result)
            {
                SessionReceiveContextAsyncResult.End(result);
            }
 
            protected override void OnEndComplete(IAsyncResult result)
            {
                SessionReceiveContextAsyncResult.End(result);
            }
 
            class SessionReceiveContextAsyncResult : AsyncResult
            {
                MsmqSessionReceiveContext receiveContext;
                Transaction completionTransaction;
 
                TimeoutHelper timeoutHelper;
                static Action<object> onComplete;
                static Action<object> onAbandon;
 
                SessionReceiveContextAsyncResult(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state, Action<object> target)
                    : base(callback, state)
                {
                    this.completionTransaction = Transaction.Current;
                    this.timeoutHelper = new TimeoutHelper(timeout);
                    this.receiveContext = receiveContext;
                    ActionItem.Schedule(target, this);
                }
 
                public static IAsyncResult CreateComplete(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state)
                {
                    if (onComplete == null)
                    {
                        onComplete = new Action<object>(OnComplete);
                    }
                    return new SessionReceiveContextAsyncResult(receiveContext, timeout, callback, state, onComplete);
                }
 
                public static IAsyncResult CreateAbandon(MsmqSessionReceiveContext receiveContext, TimeSpan timeout, AsyncCallback callback, object state)
                {
                    if (onAbandon == null)
                    {
                        onAbandon = new Action<object>(OnAbandon);
                    }
                    return new SessionReceiveContextAsyncResult(receiveContext, timeout, callback, state, onAbandon);
                }
 
                static void OnComplete(object parameter)
                {
                    SessionReceiveContextAsyncResult result = parameter as SessionReceiveContextAsyncResult;
                    Transaction savedTransaction = Transaction.Current;
                    Transaction.Current = result.completionTransaction;
 
                    try
                    {
                        Exception completionException = null;
                        try
                        {
                            result.receiveContext.OnComplete(result.timeoutHelper.RemainingTime());
                        }
                        catch (Exception e)
                        {
                            if (Fx.IsFatal(e))
                            {
                                throw;
                            }
 
                            completionException = e;
                        }
                        result.Complete(false, completionException);
                    }
                    finally
                    {
                        Transaction.Current = savedTransaction;
                    }
                }
 
                static void OnAbandon(object parameter)
                {
                    SessionReceiveContextAsyncResult result = parameter as SessionReceiveContextAsyncResult;
                    Exception completionException = null;
                    try
                    {
                        result.receiveContext.OnAbandon(result.timeoutHelper.RemainingTime());
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                            throw;
                        completionException = e;
                    }
                    result.Complete(false, completionException);
                }
 
                public static void End(IAsyncResult result)
                {
                    AsyncResult.End<SessionReceiveContextAsyncResult>(result);
                }
            }
        }
 
        class ReceiveContextTransactionEnlistment : IEnlistmentNotification
        {
            MsmqInputSessionChannel channel;
            Transaction transaction;
            ReceiveContext sessiongramReceiveContext;
 
            public ReceiveContextTransactionEnlistment(MsmqInputSessionChannel channel, Transaction transaction, ReceiveContext receiveContext)
            {
                this.channel = channel;
                this.transaction = transaction;
                this.sessiongramReceiveContext = receiveContext;
            }
 
            public void Prepare(PreparingEnlistment preparingEnlistment)
            {
                // Abort if this happens before all messges are consumed
                // Note that we are not placing any restriction on the channel state
                if (this.channel.TotalPendingItems > 0 || this.channel.sessiongramDoomed)
                {
                    Exception e = DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionChannelHasPendingItems)));
                    this.sessiongramReceiveContext.Abandon(TimeSpan.MaxValue);
                    preparingEnlistment.ForceRollback(e);
                    this.channel.Fault();
                }
                else
                {
                    Transaction savedTransaction = Transaction.Current;
                    // complete the sessiongram message within this transaction
                    try
                    {
                        Transaction.Current = this.transaction;
 
                        try
                        {
                            this.sessiongramReceiveContext.Complete(TimeSpan.MaxValue);
                            preparingEnlistment.Done();
                        }
                        catch (MsmqException msmqex)
                        {
                            preparingEnlistment.ForceRollback(msmqex);
                            this.channel.Fault();
                        }
                    }
                    finally
                    {
                        Transaction.Current = savedTransaction;
                    }
                }
            }
 
            public void Commit(Enlistment enlistment)
            {
                this.channel.DetachTransaction(false);
                enlistment.Done();
            }
 
            public void Rollback(Enlistment enlistment)
            {
                this.channel.DetachTransaction(true);
                enlistment.Done();
            }
 
            public void InDoubt(Enlistment enlistment)
            {
                enlistment.Done();
            }
        }
 
        class TransactionEnlistment : IEnlistmentNotification
        {
            MsmqInputSessionChannel channel;
            Transaction transaction;
 
            public TransactionEnlistment(MsmqInputSessionChannel channel, Transaction transaction)
            {
                this.channel = channel;
                this.transaction = transaction;
            }
 
            public void Prepare(PreparingEnlistment preparingEnlistment)
            {
                // Abort if this happens before all messges are consumed
                if (this.channel.State == CommunicationState.Opened && this.channel.InternalPendingItems > 0)
                {
                    Exception e = DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.MsmqSessionChannelsMustBeClosed)));
                    preparingEnlistment.ForceRollback(e);
                    this.channel.Fault();
                }
                else
                {
                    preparingEnlistment.Done();
                }
            }
 
            public void Commit(Enlistment enlistment)
            {
                enlistment.Done();
            }
 
            public void Rollback(Enlistment enlistment)
            {
                channel.Fault();
                enlistment.Done();
            }
 
            public void InDoubt(Enlistment enlistment)
            {
                enlistment.Done();
            }
        }
    }
}