File: System\ServiceModel\Channels\TransportDuplexSessionChannel.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
// <copyright>
// Copyright (c) Microsoft Corporation.  All rights reserved.
// </copyright>
 
namespace System.ServiceModel.Channels
{
    using System.Diagnostics;
    using System.Runtime;
    using System.Runtime.Diagnostics;
    using System.Security.Authentication.ExtendedProtection;
    using System.ServiceModel;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Diagnostics.Application;
    using System.ServiceModel.Security;
    using System.Threading;
 
    abstract class TransportDuplexSessionChannel : TransportOutputChannel, IDuplexSessionChannel
    {
        BufferManager bufferManager;
        IDuplexSession duplexSession;
        bool isInputSessionClosed;
        bool isOutputSessionClosed;
        MessageEncoder messageEncoder;
        SynchronizedMessageSource messageSource;
        SecurityMessageProperty remoteSecurity;
        EndpointAddress localAddress;
        ThreadNeutralSemaphore sendLock;
        Uri localVia;
        ChannelBinding channelBindingToken;
 
        protected TransportDuplexSessionChannel(
                  ChannelManagerBase manager, 
                  ITransportFactorySettings settings,
                  EndpointAddress localAddress, 
                  Uri localVia, 
                  EndpointAddress remoteAddresss, 
                  Uri via)
                : base(manager, remoteAddresss, via, settings.ManualAddressing, settings.MessageVersion)
        {
            this.localAddress = localAddress;
            this.localVia = localVia;
            this.bufferManager = settings.BufferManager;
            this.sendLock = new ThreadNeutralSemaphore(1);
            this.messageEncoder = settings.MessageEncoderFactory.CreateSessionEncoder();
            this.Session = new ConnectionDuplexSession(this);
        }
 
        public EndpointAddress LocalAddress
        {
            get { return this.localAddress; }
        }
 
        public SecurityMessageProperty RemoteSecurity
        {
            get { return this.remoteSecurity; }
            protected set { this.remoteSecurity = value; }
        }
 
        public IDuplexSession Session
        {
            get { return this.duplexSession; }
            protected set { this.duplexSession = value; }
        }
 
        public ThreadNeutralSemaphore SendLock
        {
            get { return this.sendLock; }
        }
 
        protected ChannelBinding ChannelBinding
        {
            get
            {
                return this.channelBindingToken;
            }
        }
 
        protected BufferManager BufferManager
        {
            get
            {
                return this.bufferManager;
            }
        }
 
        protected Uri LocalVia
        {
            get { return this.localVia; }
        }
 
        protected MessageEncoder MessageEncoder
        {
            get { return this.messageEncoder; }
            set { this.messageEncoder = value; }
        }
 
        protected SynchronizedMessageSource MessageSource
        {
            get { return this.messageSource; }
        }
 
        protected abstract bool IsStreamedOutput { get; }
        
        public Message Receive()
        {
            return this.Receive(this.DefaultReceiveTimeout);
        }
 
        public Message Receive(TimeSpan timeout)
        {
            Message message = null;
            if (DoneReceivingInCurrentState())
            {
                return null;
            }
 
            bool shouldFault = true;
            try
            {
                message = this.messageSource.Receive(timeout);
                this.OnReceiveMessage(message);
                shouldFault = false;
                return message;
            }
            finally
            {
                if (shouldFault)
                {
                    if (message != null)
                    {
                        message.Close();
                        message = null;
                    }
 
                    this.Fault();
                }
            }
        }
 
        public IAsyncResult BeginReceive(AsyncCallback callback, object state)
        {
            return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
        }
 
        public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (DoneReceivingInCurrentState())
            {
                return new DoneReceivingAsyncResult(callback, state);
            }
 
            bool shouldFault = true;
            try
            {
                IAsyncResult result = this.messageSource.BeginReceive(timeout, callback, state);
                shouldFault = false;
                return result;
            }
            finally
            {
                if (shouldFault)
                {
                    this.Fault();
                }
            }
        }
 
        [System.Diagnostics.CodeAnalysis.SuppressMessage(FxCop.Category.ReliabilityBasic, "Reliability106",
                            Justification = "This is an old method from previous release.")]
        public Message EndReceive(IAsyncResult result)
        {
            this.ThrowIfNotOpened(); // we can't be in Created or Opening
            if (result == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
            }
 
            DoneReceivingAsyncResult doneReceivingResult = result as DoneReceivingAsyncResult;
            if (doneReceivingResult != null)
            {
                DoneReceivingAsyncResult.End(doneReceivingResult);
                return null;
            }
 
            bool shouldFault = true;
            Message message = null;
            try
            {
                message = this.messageSource.EndReceive(result);
                this.OnReceiveMessage(message);
                shouldFault = false;
                return message;
            }
            finally
            {
                if (shouldFault)
                {
                    if (message != null)
                    {
                        message.Close();
                        message = null;
                    }
 
                    this.Fault();
                }
            }
        }
 
        public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new TryReceiveAsyncResult(this, timeout, callback, state);
        }
 
        public bool EndTryReceive(IAsyncResult result, out Message message)
        {
            return TryReceiveAsyncResult.End(result, out message);
        }
 
        public bool TryReceive(TimeSpan timeout, out Message message)
        {
            try
            {
                message = this.Receive(timeout);
                return true;
            }
            catch (TimeoutException e)
            {
                if (TD.ReceiveTimeoutIsEnabled())
                {
                    TD.ReceiveTimeout(e.Message);
                }
 
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
 
                message = null;
                return false;
            }
        }
 
        public bool WaitForMessage(TimeSpan timeout)
        {
            if (DoneReceivingInCurrentState())
            {
                return true;
            }
 
            bool shouldFault = true;
            try
            {
                bool success = this.messageSource.WaitForMessage(timeout);
                shouldFault = !success; // need to fault if we've timed out because we're now toast
                return success;
            }
            finally
            {
                if (shouldFault)
                {
                    this.Fault();
                }
            }
        }
 
        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (DoneReceivingInCurrentState())
            {
                return new DoneReceivingAsyncResult(callback, state);
            }
 
            bool shouldFault = true;
            try
            {
                IAsyncResult result = this.messageSource.BeginWaitForMessage(timeout, callback, state);
                shouldFault = false;
                return result;
            }
            finally
            {
                if (shouldFault)
                {
                    this.Fault();
                }
            }
        }
 
        [System.Diagnostics.CodeAnalysis.SuppressMessage(FxCop.Category.ReliabilityBasic, "Reliability106",
                            Justification = "This is an old method from previous release.")]
        public bool EndWaitForMessage(IAsyncResult result)
        {
            this.ThrowIfNotOpened(); // we can't be in Created or Opening
            if (result == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
            }
 
            DoneReceivingAsyncResult doneRecevingResult = result as DoneReceivingAsyncResult;
            if (doneRecevingResult != null)
            {
                return DoneReceivingAsyncResult.End(doneRecevingResult);
            }
 
            bool shouldFault = true;
            try
            {
                bool success = this.messageSource.EndWaitForMessage(result);
                shouldFault = !success; // need to fault if we've timed out because we're now toast
                return success;
            }
            finally
            {
                if (shouldFault)
                {
                    this.Fault();
                }
            }
        }
 
        protected void SetChannelBinding(ChannelBinding channelBinding)
        {
            Fx.Assert(this.channelBindingToken == null, "ChannelBinding token can only be set once.");
            this.channelBindingToken = channelBinding;
        }
 
        protected void SetMessageSource(IMessageSource messageSource)
        {
            this.messageSource = new SynchronizedMessageSource(messageSource);
        }
 
        protected IAsyncResult BeginCloseOutputSession(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CloseOutputSessionAsyncResult(this, timeout, callback, state);
        }
 
        protected void EndCloseOutputSession(IAsyncResult result)
        {
            CloseOutputSessionAsyncResult.End(result);
        }
        
        protected abstract void CloseOutputSessionCore(TimeSpan timeout);
 
        protected void CloseOutputSession(TimeSpan timeout)
        {
            ThrowIfNotOpened();
            ThrowIfFaulted();
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            if (!this.sendLock.TryEnter(timeoutHelper.RemainingTime()))
            {
                if (TD.CloseTimeoutIsEnabled())
                {
                    TD.CloseTimeout(SR.GetString(SR.CloseTimedOut, timeout));
                }
 
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException(
                                                SR.GetString(SR.CloseTimedOut, timeout),
                                                ThreadNeutralSemaphore.CreateEnterTimedOutException(timeout)));
            }
 
            try
            {
                // check again in case the previous send faulted while we were waiting for the lock
                ThrowIfFaulted();
 
                // we're synchronized by sendLock here
                if (this.isOutputSessionClosed)
                {
                    return;
                }
 
                this.isOutputSessionClosed = true;
                bool shouldFault = true;
                try
                {
                    this.CloseOutputSessionCore(timeout);
                    this.OnOutputSessionClosed(ref timeoutHelper);
                    shouldFault = false;
                }
                finally
                {
                    if (shouldFault)
                    {
                        this.Fault();
                    }
                }
            }
            finally
            {
                this.sendLock.Exit();
            }
        }
 
        // used to return cached connection to the pool/reader pool
        protected abstract void ReturnConnectionIfNecessary(bool abort, TimeSpan timeout);
 
        protected override void OnAbort()
        {
            this.ReturnConnectionIfNecessary(true, TimeSpan.Zero);
        }
 
        protected override void OnFaulted()
        {
            base.OnFaulted();
            this.ReturnConnectionIfNecessary(true, TimeSpan.Zero);
        }
 
        protected override void OnClose(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            this.CloseOutputSession(timeoutHelper.RemainingTime());
 
            // close input session if necessary
            if (!this.isInputSessionClosed)
            {
                this.EnsureInputClosed(timeoutHelper.RemainingTime());
                this.OnInputSessionClosed();
            }
 
            this.CompleteClose(timeoutHelper.RemainingTime());
        }
 
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CloseAsyncResult(this, timeout, callback, state);
        }
 
        protected override void OnEndClose(IAsyncResult result)
        {
            CloseAsyncResult.End(result);
        }
 
        protected override void OnClosed()
        {
            base.OnClosed();
 
            // clean up the CBT after transitioning to the closed state
            ChannelBindingUtility.Dispose(ref this.channelBindingToken);
        }
 
        protected virtual void OnReceiveMessage(Message message)
        {
            if (message == null)
            {
                this.OnInputSessionClosed();
            }
            else
            {
                this.PrepareMessage(message);
            }
        }
 
        protected void ApplyChannelBinding(Message message)
        {
            ChannelBindingUtility.TryAddToMessage(this.channelBindingToken, message, false);
        }
 
        protected virtual void PrepareMessage(Message message)
        {
            message.Properties.Via = this.localVia;
 
            this.ApplyChannelBinding(message);
 
            if (FxTrace.Trace.IsEnd2EndActivityTracingEnabled)
            {
                EventTraceActivity eventTraceActivity = EventTraceActivityHelper.TryExtractActivity(message);
                Guid relatedActivityId = EventTraceActivity.GetActivityIdFromThread();
                if (eventTraceActivity == null)
                {
                    eventTraceActivity = EventTraceActivity.GetFromThreadOrCreate();
                    EventTraceActivityHelper.TryAttachActivity(message, eventTraceActivity);
                }
 
                if (TD.MessageReceivedByTransportIsEnabled())
                {
                    TD.MessageReceivedByTransport(
                        eventTraceActivity,
                        this.LocalAddress != null && this.LocalAddress.Uri != null ? this.LocalAddress.Uri.AbsoluteUri : string.Empty,
                        relatedActivityId);
                }
            }
 
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                TraceUtility.TraceEvent(
                             TraceEventType.Information, 
                             TraceCode.MessageReceived, 
                             SR.GetString(SR.TraceCodeMessageReceived),
                             MessageTransmitTraceRecord.CreateReceiveTraceRecord(message, this.LocalAddress), 
                             this, 
                             null, 
                             message);
            }
        }
 
        protected abstract AsyncCompletionResult StartWritingBufferedMessage(Message message, ArraySegment<byte> messageData, bool allowOutputBatching, TimeSpan timeout, WaitCallback callback, object state);
 
        protected abstract AsyncCompletionResult BeginCloseOutput(TimeSpan timeout, WaitCallback callback, object state);
 
        protected virtual void FinishWritingMessage()
        { 
        }
 
        protected abstract ArraySegment<byte> EncodeMessage(Message message);
 
        protected abstract void OnSendCore(Message message, TimeSpan timeout);
 
        protected abstract AsyncCompletionResult StartWritingStreamedMessage(Message message, TimeSpan timeout, WaitCallback callback, object state);        
 
        protected override void OnSend(Message message, TimeSpan timeout)
        {
            this.ThrowIfDisposedOrNotOpen();
 
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            if (!this.sendLock.TryEnter(timeoutHelper.RemainingTime()))
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException(
                                            SR.GetString(SR.SendToViaTimedOut, Via, timeout),
                                            ThreadNeutralSemaphore.CreateEnterTimedOutException(timeout)));
            }
 
            try
            {
                // check again in case the previous send faulted while we were waiting for the lock
                this.ThrowIfDisposedOrNotOpen();
                this.ThrowIfOutputSessionClosed();
 
                bool success = false;
                try
                {
                    this.ApplyChannelBinding(message);
 
                    this.OnSendCore(message, timeoutHelper.RemainingTime());
                    success = true;
                    if (TD.MessageSentByTransportIsEnabled())
                    {
                        EventTraceActivity eventTraceActivity = EventTraceActivityHelper.TryExtractActivity(message);
                        TD.MessageSentByTransport(eventTraceActivity, this.RemoteAddress.Uri.AbsoluteUri);
                    }
                }
                finally
                {
                    if (!success)
                    {
                        this.Fault();
                    }
                }
            }
            finally
            {
                this.sendLock.Exit();
            }
        }
 
        protected override IAsyncResult OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            this.ThrowIfDisposedOrNotOpen();
            return new SendAsyncResult(this, message, timeout, this.IsStreamedOutput, callback, state);
        }
 
        protected override void OnEndSend(IAsyncResult result)
        {
            SendAsyncResult.End(result);
        }
 
        // cleanup after the framing handshake has completed
        protected abstract void CompleteClose(TimeSpan timeout);
 
        // must be called under sendLock 
        void ThrowIfOutputSessionClosed()
        {
            if (this.isOutputSessionClosed)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.SendCannotBeCalledAfterCloseOutputSession)));
            }
        }
 
        void EnsureInputClosed(TimeSpan timeout)
        {
            Message message = this.MessageSource.Receive(timeout);
            if (message != null)
            {
                using (message)
                {
                    ProtocolException error = ProtocolException.ReceiveShutdownReturnedNonNull(message);
                    throw TraceUtility.ThrowHelperError(error, message);
                }
            }
        }
 
        void OnInputSessionClosed()
        {
            lock (ThisLock)
            {
                if (this.isInputSessionClosed)
                {
                    return;
                }
 
                this.isInputSessionClosed = true;
            }
        }
 
        void OnOutputSessionClosed(ref TimeoutHelper timeoutHelper)
        {
            bool releaseConnection = false;
            lock (ThisLock)
            {
                if (this.isInputSessionClosed)
                { 
                    // we're all done, release the connection
                    releaseConnection = true;
                }
            }
 
            if (releaseConnection)
            {
                this.ReturnConnectionIfNecessary(false, timeoutHelper.RemainingTime());
            }
        }
 
        internal class ConnectionDuplexSession : IDuplexSession
        {
            static UriGenerator uriGenerator;
            TransportDuplexSessionChannel channel;
            string id;
 
            public ConnectionDuplexSession(TransportDuplexSessionChannel channel)
                : base()
            {
                this.channel = channel;
            }
 
            public string Id
            {
                get
                {
                    if (this.id == null)
                    {
                        lock (this.channel)
                        {
                            if (this.id == null)
                            {
                                this.id = UriGenerator.Next();
                            }
                        }
                    }
 
                    return this.id;
                }
            }
 
            public TransportDuplexSessionChannel Channel
            {
                get { return this.channel; }
            }
 
            static UriGenerator UriGenerator
            {
                get
                {
                    if (uriGenerator == null)
                    {
                        uriGenerator = new UriGenerator();
                    }
 
                    return uriGenerator;
                }
            }
 
            public IAsyncResult BeginCloseOutputSession(AsyncCallback callback, object state)
            {
                return this.BeginCloseOutputSession(this.channel.DefaultCloseTimeout, callback, state);
            }
 
            public IAsyncResult BeginCloseOutputSession(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.channel.BeginCloseOutputSession(timeout, callback, state);
            }
 
            public void EndCloseOutputSession(IAsyncResult result)
            {
                this.channel.EndCloseOutputSession(result);
            }
 
            public void CloseOutputSession()
            {
                this.CloseOutputSession(this.channel.DefaultCloseTimeout);
            }
 
            public void CloseOutputSession(TimeSpan timeout)
            {
                this.channel.CloseOutputSession(timeout);
            }
        }
 
        class CloseAsyncResult : AsyncResult
        {
            static AsyncCallback onCloseOutputSession = Fx.ThunkCallback(new AsyncCallback(OnCloseOutputSession));
            static AsyncCallback onCloseInputSession = Fx.ThunkCallback(new AsyncCallback(OnCloseInputSession));
            static Action<object> onCompleteCloseScheduled;
            TransportDuplexSessionChannel channel;
            TimeoutHelper timeoutHelper;
 
            public CloseAsyncResult(TransportDuplexSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
                 : base(callback, state)
            {
                this.channel = channel;
                this.timeoutHelper = new TimeoutHelper(timeout);
                IAsyncResult result =
                    this.channel.BeginCloseOutputSession(this.timeoutHelper.RemainingTime(), onCloseOutputSession, this);
 
                if (!result.CompletedSynchronously)
                {
                    return;
                }
 
                if (!this.HandleCloseOutputSession(result, true))
                {
                    return;
                }
 
                this.Complete(true);
            }
 
            public static void End(IAsyncResult result)
            {
                AsyncResult.End<CloseAsyncResult>(result);
            }
 
            static void OnCloseOutputSession(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }
 
                CloseAsyncResult thisPtr = (CloseAsyncResult)result.AsyncState;
                bool completeSelf = false;
                Exception completionException = null;
                try
                {
                    completeSelf = thisPtr.HandleCloseOutputSession(result, false);
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    completeSelf = true;
                    completionException = e;
                }
 
                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException);
                }
            }
 
            static void OnCloseInputSession(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }
 
                CloseAsyncResult thisPtr = (CloseAsyncResult)result.AsyncState;
                bool completeSelf = false;
                Exception completionException = null;
                try
                {
                    completeSelf = thisPtr.HandleCloseInputSession(result, false);
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    completeSelf = true;
                    completionException = e;
                }
 
                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException);
                }
            }
 
            static void OnCompleteCloseScheduled(object state)
            {
                CloseAsyncResult thisPtr = (CloseAsyncResult)state;
                Exception completionException = null;
                try
                {
                    thisPtr.OnCompleteCloseScheduled();
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    completionException = e;
                }
 
                thisPtr.Complete(false, completionException);
            }
 
            bool HandleCloseOutputSession(IAsyncResult result, bool isStillSynchronous)
            {
                this.channel.EndCloseOutputSession(result);
 
                if (this.channel.isInputSessionClosed)
                {
                    return this.ScheduleCompleteClose(isStillSynchronous);
                }
                else
                {
                    IAsyncResult closeInputSessionResult =
                        this.channel.messageSource.BeginReceive(this.timeoutHelper.RemainingTime(), onCloseInputSession, this);
 
                    if (!closeInputSessionResult.CompletedSynchronously)
                    {
                        return false;
                    }
 
                    return this.HandleCloseInputSession(closeInputSessionResult, isStillSynchronous);
                }
            }
 
            bool HandleCloseInputSession(IAsyncResult result, bool isStillSynchronous)
            {
                Message message = this.channel.messageSource.EndReceive(result);
                if (message != null)
                {
                    using (message)
                    {
                        ProtocolException error = ProtocolException.ReceiveShutdownReturnedNonNull(message);
                        throw TraceUtility.ThrowHelperError(error, message);
                    }
                }
 
                this.channel.OnInputSessionClosed();
                return this.ScheduleCompleteClose(isStillSynchronous);
            }
 
            bool ScheduleCompleteClose(bool isStillSynchronous)
            {
                if (isStillSynchronous)
                {
                    if (onCompleteCloseScheduled == null)
                    {
                        onCompleteCloseScheduled = new Action<object>(OnCompleteCloseScheduled);
                    }
 
                    ActionItem.Schedule(onCompleteCloseScheduled, this);
                    return false;
                }
                else
                {
                    this.OnCompleteCloseScheduled();
                    return true;
                }
            }
 
            void OnCompleteCloseScheduled()
            {
                this.channel.CompleteClose(this.timeoutHelper.RemainingTime());
            }
        }
 
        class CloseOutputSessionAsyncResult : AsyncResult
        {
            static WaitCallback onWriteComplete = Fx.ThunkCallback(new WaitCallback(OnWriteComplete));
            static FastAsyncCallback onEnterComplete = new FastAsyncCallback(OnEnterComplete);
            TransportDuplexSessionChannel channel;
            TimeoutHelper timeoutHelper;            
 
            public CloseOutputSessionAsyncResult(TransportDuplexSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
                 : base(callback, state)
            {
                channel.ThrowIfNotOpened();
                channel.ThrowIfFaulted();
 
                this.timeoutHelper = new TimeoutHelper(timeout);
                this.channel = channel;
 
                if (!channel.sendLock.EnterAsync(this.timeoutHelper.RemainingTime(), onEnterComplete, this))
                {
                    return;
                }
 
                bool completeSelf = false;
                bool writeSuccess = false;
 
                try
                {
                    completeSelf = this.WriteEndBytes();
                    writeSuccess = true;
                }
                finally
                {
                    if (!writeSuccess)
                    {
                        this.Cleanup(false, true);
                    }
                }
 
                if (completeSelf)
                {
                    this.Cleanup(true, true);
                    this.Complete(true);
                }
            }
 
            public static void End(IAsyncResult result)
            {
                AsyncResult.End<CloseOutputSessionAsyncResult>(result);
            }
 
            static void OnEnterComplete(object state, Exception asyncException)
            {
                CloseOutputSessionAsyncResult thisPtr = (CloseOutputSessionAsyncResult)state;
                bool completeSelf = false;
                Exception completionException = asyncException;
                if (completionException != null)
                {
                    completeSelf = true;
                }
                else
                {
                    try
                    {
                        completeSelf = thisPtr.WriteEndBytes();
                    }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                        {
                            throw;
                        }
 
                        completeSelf = true;
                        completionException = e;
                    }
                }
 
                if (completeSelf)
                {
                    thisPtr.Cleanup(completionException == null, asyncException == null);
                    thisPtr.Complete(false, completionException);
                }
            }
 
            static void OnWriteComplete(object asyncState)
            {
                CloseOutputSessionAsyncResult thisPtr = (CloseOutputSessionAsyncResult)asyncState;
                Exception completionException = null;
                try
                {
                    thisPtr.HandleWriteEndBytesComplete();
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    completionException = e;
                }
 
                thisPtr.Cleanup(completionException == null, true);
                thisPtr.Complete(false, completionException);
            }
 
            bool WriteEndBytes()
            {
                // check again in case we faulted while we were waiting for the lock
                this.channel.ThrowIfFaulted();
 
                // we're synchronized by sendLock here
                if (this.channel.isOutputSessionClosed)
                {
                    return true;
                }
 
                this.channel.isOutputSessionClosed = true;
 
                AsyncCompletionResult completionResult = this.channel.BeginCloseOutput(this.timeoutHelper.RemainingTime(), onWriteComplete, this);
 
                if (completionResult == AsyncCompletionResult.Queued)
                {
                    return false;
                }
 
                this.HandleWriteEndBytesComplete();
                return true;
            }
 
            void HandleWriteEndBytesComplete()
            {
                this.channel.FinishWritingMessage();
                this.channel.OnOutputSessionClosed(ref this.timeoutHelper);
            }
 
            void Cleanup(bool success, bool lockTaken)
            {
                try
                {
                    if (!success)
                    {
                        this.channel.Fault();
                    }
                }
                finally
                {
                    if (lockTaken)
                    {
                        this.channel.sendLock.Exit();
                    }
                }
            }
        }
 
        class SendAsyncResult : TraceAsyncResult
        {
            static WaitCallback onWriteComplete = Fx.ThunkCallback(new WaitCallback(OnWriteComplete));
            static FastAsyncCallback onEnterComplete = new FastAsyncCallback(OnEnterComplete);
            TransportDuplexSessionChannel channel;
            Message message;
            byte[] buffer;
            TimeoutHelper timeoutHelper;
            bool streamedOutput;
            EventTraceActivity eventTraceActivity;
 
            public SendAsyncResult(TransportDuplexSessionChannel channel, Message message, TimeSpan timeout, bool streamedOutput, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.timeoutHelper = new TimeoutHelper(timeout);
                this.channel = channel;
                this.message = message;
                this.streamedOutput = streamedOutput;
 
                if (!channel.sendLock.EnterAsync(this.timeoutHelper.RemainingTime(), onEnterComplete, this))
                {
                    return;
                }
 
                bool completeSelf = false;
                bool writeSuccess = false;
 
                try
                {
                    completeSelf = this.WriteCore();
                    writeSuccess = true;
                }
                finally
                {
                    if (!writeSuccess)
                    {
                        this.Cleanup(false, true);
                    }
                }
 
                if (completeSelf)
                {
                    this.Cleanup(true, true);
                    this.Complete(true);
                }
            }
 
            public static void End(IAsyncResult result)
            {
                if (TD.MessageSentByTransportIsEnabled())
                {
                    SendAsyncResult thisPtr = result as SendAsyncResult;
                    if (thisPtr != null)
                    {
                        TD.MessageSentByTransport(thisPtr.eventTraceActivity, thisPtr.channel.RemoteAddress.Uri.AbsoluteUri);
                    }
                }
 
                AsyncResult.End<SendAsyncResult>(result);
            }
 
            static void OnEnterComplete(object state, Exception asyncException)
            {
                SendAsyncResult thisPtr = (SendAsyncResult)state;
                bool completeSelf = false;
                Exception completionException = asyncException;
                if (completionException != null)
                {
                    completeSelf = true;
                }
                else
                {
                    try
                    {
                        completeSelf = thisPtr.WriteCore();
                    }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                        {
                            throw;
                        }
 
                        completeSelf = true;
                        completionException = e;
                    }
                }
 
                if (completeSelf)
                {
                    thisPtr.Cleanup(completionException == null, asyncException == null);
                    thisPtr.Complete(false, completionException);
                }
            }
 
            static void OnWriteComplete(object asyncState)
            {
                SendAsyncResult thisPtr = (SendAsyncResult)asyncState;
                Exception completionException = null;
                try
                {
                    thisPtr.channel.FinishWritingMessage();
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    completionException = e;
                }
 
                thisPtr.Cleanup(completionException == null, true);
                thisPtr.Complete(false, completionException);
            }
 
            bool WriteCore()
            {
                // check again in case the previous send faulted while we were waiting for the lock
                this.channel.ThrowIfDisposedOrNotOpen();
                this.channel.ThrowIfOutputSessionClosed();
 
                this.channel.ApplyChannelBinding(this.message);
                
                Message message = this.message;
                this.message = null;
 
                // Because we nullify the message, we need to save its trace activity, for logging events later on.
                if (TD.MessageSentByTransportIsEnabled())
                {
                    this.eventTraceActivity = EventTraceActivityHelper.TryExtractActivity(message);
                }
 
                AsyncCompletionResult completionResult;
                if (this.streamedOutput)
                {
                    completionResult = this.channel.StartWritingStreamedMessage(message, this.timeoutHelper.RemainingTime(), onWriteComplete, this);
                }
                else
                {
                    bool allowOutputBatching;
                    ArraySegment<byte> messageData;
                    allowOutputBatching = message.Properties.AllowOutputBatching;
                    messageData = this.channel.EncodeMessage(message);
 
                    this.buffer = messageData.Array;
                    completionResult = this.channel.StartWritingBufferedMessage(
                                                                          message,
                                                                          messageData,
                                                                          allowOutputBatching,
                                                                          this.timeoutHelper.RemainingTime(),
                                                                          onWriteComplete,
                                                                          this);
                }
 
                if (completionResult == AsyncCompletionResult.Queued)
                {
                    return false;
                }
 
                this.channel.FinishWritingMessage();
                return true;
            }
 
            void Cleanup(bool success, bool lockTaken)
            {
                try
                {
                    if (!success)
                    {
                        this.channel.Fault();
                    }
                }
                finally
                {
                    if (lockTaken)
                    {
                        this.channel.sendLock.Exit();
                    }
                }
 
                if (this.buffer != null)
                {
                    this.channel.bufferManager.ReturnBuffer(this.buffer);
                    this.buffer = null;
                }
            }
        }
 
        class TryReceiveAsyncResult : AsyncResult
        {
            static AsyncCallback onReceive = Fx.ThunkCallback(new AsyncCallback(OnReceive));
            TransportDuplexSessionChannel channel;
            bool receiveSuccess;
            Message message;
 
            public TryReceiveAsyncResult(TransportDuplexSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.channel = channel;
 
                bool completeSelf = false;
                try
                {
                    IAsyncResult result = this.channel.BeginReceive(timeout, onReceive, this);
                    if (result.CompletedSynchronously)
                    {
                        this.CompleteReceive(result);
                        completeSelf = true;
                    }
                }
                catch (TimeoutException e)
                {
                    if (TD.ReceiveTimeoutIsEnabled())
                    {
                        TD.ReceiveTimeout(e.Message);
                    }
 
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
 
                    completeSelf = true;
                }
 
                if (completeSelf)
                {
                    this.Complete(true);
                }
            }
 
            public static bool End(IAsyncResult result, out Message message)
            {
                TryReceiveAsyncResult thisPtr = AsyncResult.End<TryReceiveAsyncResult>(result);
                message = thisPtr.message;
                return thisPtr.receiveSuccess;
            }
            
            static void OnReceive(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }
 
                TryReceiveAsyncResult thisPtr = (TryReceiveAsyncResult)result.AsyncState;
                Exception completionException = null;
                try
                {
                    thisPtr.CompleteReceive(result);
                }
                catch (TimeoutException e)
                {
                    if (TD.ReceiveTimeoutIsEnabled())
                    {
                        TD.ReceiveTimeout(e.Message);
                    }
 
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    completionException = e;
                }
 
                thisPtr.Complete(false, completionException);
            }
 
            void CompleteReceive(IAsyncResult result)
            {
                this.message = this.channel.EndReceive(result);
                this.receiveSuccess = true;
            }
        }
    }
}