File: System\ServiceModel\Channels\ReliableOutputConnection.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//----------------------------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System.Runtime;
    using System.Threading;
    using System.Xml;
 
    delegate IAsyncResult BeginSendHandler(MessageAttemptInfo attemptInfo, TimeSpan timeout, bool maskUnhandledException, AsyncCallback asyncCallback, object state);
    delegate void EndSendHandler(IAsyncResult result);
    delegate void SendHandler(MessageAttemptInfo attemptInfo, TimeSpan timeout, bool maskUnhandledException);
    delegate void ComponentFaultedHandler(Exception faultException, WsrmFault fault);
    delegate void ComponentExceptionHandler(Exception exception);
    delegate void RetryHandler(MessageAttemptInfo attemptInfo);
 
    sealed class ReliableOutputConnection
    {
        BeginSendHandler beginSendHandler;
        OperationWithTimeoutBeginCallback beginSendAckRequestedHandler;
        bool closed = false;
        EndSendHandler endSendHandler;
        OperationEndCallback endSendAckRequestedHandler;
        UniqueId id;
        MessageVersion messageVersion;
        object mutex = new Object();
        static AsyncCallback onSendRetriesComplete = Fx.ThunkCallback(new AsyncCallback(OnSendRetriesComplete));
        static AsyncCallback onSendRetryComplete = Fx.ThunkCallback(new AsyncCallback(OnSendRetryComplete));
        ReliableMessagingVersion reliableMessagingVersion;
        Guard sendGuard = new Guard(Int32.MaxValue);
        SendHandler sendHandler;
        OperationWithTimeoutCallback sendAckRequestedHandler;
        static Action<object> sendRetries = new Action<object>(SendRetries);
        TimeSpan sendTimeout;
        InterruptibleWaitObject shutdownHandle = new InterruptibleWaitObject(false);
        TransmissionStrategy strategy;
        bool terminated = false;
 
        public ReliableOutputConnection(UniqueId id,
            int maxTransferWindowSize,
            MessageVersion messageVersion,
            ReliableMessagingVersion reliableMessagingVersion,
            TimeSpan initialRtt,
            bool requestAcks,
            TimeSpan sendTimeout)
        {
            this.id = id;
            this.messageVersion = messageVersion;
            this.reliableMessagingVersion = reliableMessagingVersion;
            this.sendTimeout = sendTimeout;
            this.strategy = new TransmissionStrategy(reliableMessagingVersion, initialRtt, maxTransferWindowSize,
                requestAcks, id);
            this.strategy.RetryTimeoutElapsed = OnRetryTimeoutElapsed;
            this.strategy.OnException = RaiseOnException;
        }
 
        public ComponentFaultedHandler Faulted;
        public ComponentExceptionHandler OnException;
 
        MessageVersion MessageVersion
        {
            get
            {
                return this.messageVersion;
            }
        }
 
        public BeginSendHandler BeginSendHandler
        {
            set
            {
                this.beginSendHandler = value;
            }
        }
 
        public OperationWithTimeoutBeginCallback BeginSendAckRequestedHandler
        {
            set
            {
                this.beginSendAckRequestedHandler = value;
            }
        }
 
        public bool Closed
        {
            get
            {
                return this.closed;
            }
        }
 
        public EndSendHandler EndSendHandler
        {
            set
            {
                this.endSendHandler = value;
            }
        }
 
        public OperationEndCallback EndSendAckRequestedHandler
        {
            set
            {
                this.endSendAckRequestedHandler = value;
            }
        }
 
        public Int64 Last
        {
            get
            {
                return this.strategy.Last;
            }
        }
 
        public SendHandler SendHandler
        {
            set
            {
                this.sendHandler = value;
            }
        }
 
        public OperationWithTimeoutCallback SendAckRequestedHandler
        {
            set
            {
                this.sendAckRequestedHandler = value;
            }
        }
 
        public TransmissionStrategy Strategy
        {
            get
            {
                return this.strategy;
            }
        }
 
        object ThisLock
        {
            get { return this.mutex; }
        }
 
        public void Abort(ChannelBase channel)
        {
            this.sendGuard.Abort();
            this.shutdownHandle.Abort(channel);
            this.strategy.Abort(channel);
        }
 
        void CompleteTransfer(TimeSpan timeout)
        {
            if (this.reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            {
                Message message = Message.CreateMessage(this.MessageVersion, WsrmFeb2005Strings.LastMessageAction);
                message.Properties.AllowOutputBatching = false;
 
                // Return value ignored.
                this.InternalAddMessage(message, timeout, null, true);
            }
            else if (this.reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
            {
                if (this.strategy.SetLast())
                {
                    this.shutdownHandle.Set();
                }
                else
                {
                    this.sendAckRequestedHandler(timeout);
                }
            }
            else
            {
                throw Fx.AssertAndThrow("Unsupported version.");
            }
        }
 
        public bool AddMessage(Message message, TimeSpan timeout, object state)
        {
            return this.InternalAddMessage(message, timeout, state, false);
        }
 
        public IAsyncResult BeginAddMessage(Message message, TimeSpan timeout, object state, AsyncCallback callback, object asyncState)
        {
            return new AddAsyncResult(message, false, timeout, state, this, callback, asyncState);
        }
 
        public bool EndAddMessage(IAsyncResult result)
        {
            return AddAsyncResult.End(result);
        }
 
        IAsyncResult BeginCompleteTransfer(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (this.reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            {
                Message message = Message.CreateMessage(this.MessageVersion, WsrmFeb2005Strings.LastMessageAction);
                message.Properties.AllowOutputBatching = false;
                return new AddAsyncResult(message, true, timeout, null, this, callback, state);
            }
            else if (this.reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
            {
                if (this.strategy.SetLast())
                {
                    this.shutdownHandle.Set();
                    return new AlreadyCompletedTransferAsyncResult(callback, state);
                }
                else
                {
                    return this.beginSendAckRequestedHandler(timeout, callback, state);
                }
            }
            else
            {
                throw Fx.AssertAndThrow("Unsupported version.");
            }
        }
 
        void EndCompleteTransfer(IAsyncResult result)
        {
            if (this.reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
            {
                AddAsyncResult.End(result);
            }
            else if (this.reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
            {
                AlreadyCompletedTransferAsyncResult completedResult = result as AlreadyCompletedTransferAsyncResult;
                if (completedResult != null)
                {
                    completedResult.End();
                }
                else
                {
                    this.endSendAckRequestedHandler(result);
                }
            }
            else
            {
                throw Fx.AssertAndThrow("Unsupported version.");
            }
        }
 
        public IAsyncResult BeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            bool completeTransfer = false;
 
            lock (this.ThisLock)
            {
                completeTransfer = !this.closed;
                this.closed = true;
            }
 
            OperationWithTimeoutBeginCallback[] beginCallbacks;
            OperationEndCallback[] endCallbacks;
 
            beginCallbacks = new OperationWithTimeoutBeginCallback[] {
                completeTransfer ? this.BeginCompleteTransfer : default(OperationWithTimeoutBeginCallback),
                this.shutdownHandle.BeginWait,
                this.sendGuard.BeginClose,
                this.beginSendAckRequestedHandler };
 
            endCallbacks = new OperationEndCallback[] {
                completeTransfer ? this.EndCompleteTransfer : default(OperationEndCallback),
                this.shutdownHandle.EndWait,
                this.sendGuard.EndClose,
                this.endSendAckRequestedHandler };
 
            return OperationWithTimeoutComposer.BeginComposeAsyncOperations(timeout, beginCallbacks, endCallbacks, callback, state);
        }
 
        public bool CheckForTermination()
        {
            return this.strategy.DoneTransmitting;
        }
 
        public void Close(TimeSpan timeout)
        {
            bool completeTransfer = false;
 
            lock (this.ThisLock)
            {
                completeTransfer = !this.closed;
                this.closed = true;
            }
 
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
 
            if (completeTransfer)
            {
                this.CompleteTransfer(timeoutHelper.RemainingTime());
            }
 
            this.shutdownHandle.Wait(timeoutHelper.RemainingTime());
            this.sendGuard.Close(timeoutHelper.RemainingTime());
            this.strategy.Close();
        }
 
        void CompleteSendRetries(IAsyncResult result)
        {
            while (true)
            {
                this.endSendHandler(result);
                this.sendGuard.Exit();
                this.strategy.DequeuePending();
 
                if (this.sendGuard.Enter())
                {
                    MessageAttemptInfo attemptInfo = this.strategy.GetMessageInfoForRetry(true);
 
                    if (attemptInfo.Message == null)
                    {
                        break;
                    }
                    else
                    {
                        result = this.beginSendHandler(attemptInfo, this.sendTimeout, true, onSendRetriesComplete, this);
                        if (!result.CompletedSynchronously)
                        {
                            return;
                        }
                    }
                }
                else
                {
                    return;
                }
            }
 
            // We are here if there are no more messages to retry.            
            this.sendGuard.Exit();
            this.OnTransferComplete();
        }
 
        void CompleteSendRetry(IAsyncResult result)
        {
            try
            {
                this.endSendHandler(result);
            }
            finally
            {
                this.sendGuard.Exit();
            }
        }
 
        public void EndClose(IAsyncResult result)
        {
            OperationWithTimeoutComposer.EndComposeAsyncOperations(result);
            this.strategy.Close();
        }
 
        public void Fault(ChannelBase channel)
        {
            this.sendGuard.Abort();
            this.shutdownHandle.Fault(channel);
            this.strategy.Fault(channel);
        }
 
        bool InternalAddMessage(Message message, TimeSpan timeout, object state, bool isLast)
        {
            MessageAttemptInfo attemptInfo;
            TimeoutHelper helper = new TimeoutHelper(timeout);
 
            try
            {
                if (isLast)
                {
                    if (state != null)
                    {
                        throw Fx.AssertAndThrow("The isLast overload does not take a state.");
                    }
 
                    attemptInfo = this.strategy.AddLast(message, helper.RemainingTime(), null);
                }
                else if (!this.strategy.Add(message, helper.RemainingTime(), state, out attemptInfo))
                {
                    return false;
                }
            }
            catch (TimeoutException)
            {
                if (isLast)
                    this.RaiseFault(null, SequenceTerminatedFault.CreateCommunicationFault(this.id, SR.GetString(SR.SequenceTerminatedAddLastToWindowTimedOut), null));
                // else - RM does not fault the channel based on a timeout exception trying to add a sequenced message to the window.
 
                throw;
            }
            catch (Exception e)
            {
                if (!Fx.IsFatal(e))
                    this.RaiseFault(null, SequenceTerminatedFault.CreateCommunicationFault(this.id, SR.GetString(SR.SequenceTerminatedUnknownAddToWindowError), null));
 
                throw;
            }
 
            if (sendGuard.Enter())
            {
                try
                {
                    this.sendHandler(attemptInfo, helper.RemainingTime(), false);
                }
                catch (QuotaExceededException)
                {
                    this.RaiseFault(null, SequenceTerminatedFault.CreateQuotaExceededFault(this.id));
                    throw;
                }
                finally
                {
                    this.sendGuard.Exit();
                }
            }
 
            return true;
        }
 
        public bool IsFinalAckConsistent(SequenceRangeCollection ranges)
        {
            return this.strategy.IsFinalAckConsistent(ranges);
        }
 
        void OnRetryTimeoutElapsed(MessageAttemptInfo attemptInfo)
        {
            if (this.sendGuard.Enter())
            {
                IAsyncResult result = this.beginSendHandler(attemptInfo, this.sendTimeout, true, onSendRetryComplete, this);
 
                if (result.CompletedSynchronously)
                {
                    this.CompleteSendRetry(result);
                }
            }
        }
 
        static void OnSendRetryComplete(IAsyncResult result)
        {
            if (!result.CompletedSynchronously)
            {
                ReliableOutputConnection outputConnection = (ReliableOutputConnection)result.AsyncState;
 
                try
                {
                    outputConnection.CompleteSendRetry(result);
                }
#pragma warning suppress 56500 // covered by FxCOP
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                        throw;
 
                    outputConnection.RaiseOnException(e);
                }
            }
        }
 
        static void OnSendRetriesComplete(IAsyncResult result)
        {
            if (!result.CompletedSynchronously)
            {
                ReliableOutputConnection outputConnection = (ReliableOutputConnection)result.AsyncState;
 
                try
                {
                    outputConnection.CompleteSendRetries(result);
                }
#pragma warning suppress 56500 // covered by FxCOP
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                        throw;
 
                    outputConnection.RaiseOnException(e);
                }
            }
        }
 
        void OnTransferComplete()
        {
            this.strategy.DequeuePending();
 
            if (this.strategy.DoneTransmitting)
                Terminate();
        }
 
        public void ProcessTransferred(Int64 transferred, SequenceRangeCollection ranges, int quotaRemaining)
        {
            if (transferred < 0)
            {
                throw Fx.AssertAndThrow("Argument transferred must be a valid sequence number or 0 for protocol messages.");
            }
 
            bool invalidAck;
 
            // ignored, TransmissionStrategy is being used to keep track of what must be re-sent.
            // In the Request-Reply case this state may not align with acks.
            bool inconsistentAck;
 
            this.strategy.ProcessAcknowledgement(ranges, out invalidAck, out inconsistentAck);
            invalidAck = (invalidAck || ((transferred != 0) && !ranges.Contains(transferred)));
 
            if (!invalidAck)
            {
                if ((transferred > 0) && this.strategy.ProcessTransferred(transferred, quotaRemaining))
                {
                    ActionItem.Schedule(sendRetries, this);
                }
                else
                {
                    this.OnTransferComplete();
                }
            }
            else
            {
                WsrmFault fault = new InvalidAcknowledgementFault(this.id, ranges);
                RaiseFault(fault.CreateException(), fault);
            }
        }
 
        public void ProcessTransferred(SequenceRangeCollection ranges, int quotaRemaining)
        {
            bool invalidAck;
            bool inconsistentAck;
 
            this.strategy.ProcessAcknowledgement(ranges, out invalidAck, out inconsistentAck);
 
            if (!invalidAck && !inconsistentAck)
            {
                if (this.strategy.ProcessTransferred(ranges, quotaRemaining))
                {
                    ActionItem.Schedule(sendRetries, this);
                }
                else
                {
                    this.OnTransferComplete();
                }
            }
            else
            {
                WsrmFault fault = new InvalidAcknowledgementFault(this.id, ranges);
                RaiseFault(fault.CreateException(), fault);
            }
        }
 
        void RaiseFault(Exception faultException, WsrmFault fault)
        {
            ComponentFaultedHandler handler = this.Faulted;
 
            if (handler != null)
                handler(faultException, fault);
        }
 
        void RaiseOnException(Exception exception)
        {
            ComponentExceptionHandler handler = this.OnException;
 
            if (handler != null)
                handler(exception);
        }
 
        void SendRetries()
        {
            IAsyncResult result = null;
 
            if (this.sendGuard.Enter())
            {
                MessageAttemptInfo attemptInfo = this.strategy.GetMessageInfoForRetry(false);
 
                if (attemptInfo.Message != null)
                {
                    result = this.beginSendHandler(attemptInfo, this.sendTimeout, true, onSendRetriesComplete, this);
                }
 
                if (result != null)
                {
                    if (result.CompletedSynchronously)
                    {
                        this.CompleteSendRetries(result);
                    }
                }
                else
                {
                    this.sendGuard.Exit();
                    this.OnTransferComplete();
                }
            }
        }
 
        static void SendRetries(object state)
        {
            ReliableOutputConnection outputConnection = (ReliableOutputConnection)state;
 
            try
            {
                outputConnection.SendRetries();
            }
#pragma warning suppress 56500 // covered by FxCOP
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                    throw;
 
                outputConnection.RaiseOnException(e);
            }
        }
 
        public void Terminate()
        {
            lock (this.ThisLock)
            {
                if (this.terminated)
                    return;
 
                this.terminated = true;
            }
 
            this.shutdownHandle.Set();
        }
 
        sealed class AddAsyncResult : AsyncResult
        {
            static AsyncCallback addCompleteStatic = Fx.ThunkCallback(new AsyncCallback(AddComplete));
            ReliableOutputConnection connection;
            bool isLast;
            static AsyncCallback sendCompleteStatic = Fx.ThunkCallback(new AsyncCallback(SendComplete));
            TimeoutHelper timeoutHelper;
            bool validAdd;
 
            public AddAsyncResult(Message message, bool isLast, TimeSpan timeout, object state,
                ReliableOutputConnection connection, AsyncCallback callback, object asyncState)
                : base(callback, asyncState)
            {
                this.connection = connection;
                this.timeoutHelper = new TimeoutHelper(timeout);
                this.isLast = isLast;
 
                bool complete = false;
                IAsyncResult result;
 
                try
                {
                    if (isLast)
                    {
                        if (state != null)
                        {
                            throw Fx.AssertAndThrow("The isLast overload does not take a state.");
                        }
 
                        result = this.connection.strategy.BeginAddLast(message, this.timeoutHelper.RemainingTime(), state, addCompleteStatic, this);
                    }
                    else
                    {
                        result = this.connection.strategy.BeginAdd(message, this.timeoutHelper.RemainingTime(), state, addCompleteStatic, this);
                    }
                }
                catch (TimeoutException)
                {
                    if (isLast)
                        this.connection.RaiseFault(null, SequenceTerminatedFault.CreateCommunicationFault(this.connection.id, SR.GetString(SR.SequenceTerminatedAddLastToWindowTimedOut), null));
                    // else - RM does not fault the channel based on a timeout exception trying to add a sequenced message to the window.
 
                    throw;
                }
                catch (Exception e)
                {
                    if (!Fx.IsFatal(e))
                        this.connection.RaiseFault(null, SequenceTerminatedFault.CreateCommunicationFault(this.connection.id, SR.GetString(SR.SequenceTerminatedUnknownAddToWindowError), null));
 
                    throw;
                }
 
                if (result.CompletedSynchronously)
                    complete = this.CompleteAdd(result);
 
                if (complete)
                    this.Complete(true);
            }
 
            static void AddComplete(IAsyncResult result)
            {
                if (!result.CompletedSynchronously)
                {
                    AddAsyncResult addResult = (AddAsyncResult)result.AsyncState;
                    bool complete = false;
                    Exception completeException = null;
 
                    try
                    {
                        complete = addResult.CompleteAdd(result);
                    }
#pragma warning suppress 56500 // covered by FxCOP
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                            throw;
 
                        completeException = e;
                    }
 
                    if (complete || completeException != null)
                        addResult.Complete(false, completeException);
                }
            }
 
            bool CompleteAdd(IAsyncResult result)
            {
                MessageAttemptInfo attemptInfo = default(MessageAttemptInfo);
                this.validAdd = true;
 
                try
                {
                    if (this.isLast)
                    {
                        attemptInfo = this.connection.strategy.EndAddLast(result);
                    }
                    else if (!this.connection.strategy.EndAdd(result, out attemptInfo))
                    {
                        this.validAdd = false;
                        return true;
                    }
                }
                catch (TimeoutException)
                {
                    if (this.isLast)
                        this.connection.RaiseFault(null, SequenceTerminatedFault.CreateCommunicationFault(this.connection.id, SR.GetString(SR.SequenceTerminatedAddLastToWindowTimedOut), null));
                    // else - RM does not fault the channel based on a timeout exception trying to add a sequenced message to the window.
 
                    throw;
                }
                catch (Exception e)
                {
                    if (!Fx.IsFatal(e))
                        this.connection.RaiseFault(null, SequenceTerminatedFault.CreateCommunicationFault(this.connection.id, SR.GetString(SR.SequenceTerminatedUnknownAddToWindowError), null));
 
                    throw;
                }
 
                if (this.connection.sendGuard.Enter())
                {
                    bool throwing = true;
 
                    try
                    {
                        result = this.connection.beginSendHandler(attemptInfo, this.timeoutHelper.RemainingTime(), false, sendCompleteStatic, this);
                        throwing = false;
                    }
                    catch (QuotaExceededException)
                    {
                        this.connection.RaiseFault(null, SequenceTerminatedFault.CreateQuotaExceededFault(this.connection.id));
                        throw;
                    }
                    finally
                    {
                        if (throwing)
                            this.connection.sendGuard.Exit();
                    }
                }
                else
                {
                    return true;
                }
 
                if (result.CompletedSynchronously)
                {
                    this.CompleteSend(result);
                    return true;
                }
                else
                {
                    return false;
                }
            }
 
            void CompleteSend(IAsyncResult result)
            {
                try
                {
                    this.connection.endSendHandler(result);
                }
                catch (QuotaExceededException)
                {
                    this.connection.RaiseFault(null, SequenceTerminatedFault.CreateQuotaExceededFault(this.connection.id));
                    throw;
                }
                finally
                {
                    this.connection.sendGuard.Exit();
                }
            }
 
            static void SendComplete(IAsyncResult result)
            {
                if (!result.CompletedSynchronously)
                {
                    AddAsyncResult addResult = (AddAsyncResult)result.AsyncState;
                    Exception completeException = null;
 
                    try
                    {
                        addResult.CompleteSend(result);
                    }
#pragma warning suppress 56500 // covered by FxCOP
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                            throw;
 
                        completeException = e;
                    }
 
                    addResult.Complete(false, completeException);
                }
            }
 
            public static bool End(IAsyncResult result)
            {
                AsyncResult.End<AddAsyncResult>(result);
                return ((AddAsyncResult)result).validAdd;
            }
        }
 
        class AlreadyCompletedTransferAsyncResult : CompletedAsyncResult
        {
            public AlreadyCompletedTransferAsyncResult(AsyncCallback callback, object state)
                : base(callback, state)
            {
            }
 
            public void End()
            {
                AsyncResult.End<AlreadyCompletedTransferAsyncResult>(this);
            }
        }
    }
}