File: System\ServiceModel\Dispatcher\DuplexChannelBinder.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
 
namespace System.ServiceModel.Dispatcher
{
    using System;
    using System.Collections.Generic;
    using System.Runtime;
    using System.Runtime.CompilerServices;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Security;
    using System.Threading;
    using System.Xml;
    using System.ServiceModel.Diagnostics.Application;
 
    class DuplexChannelBinder : IChannelBinder
    {
        IDuplexChannel channel;
        IRequestReplyCorrelator correlator;
        TimeSpan defaultCloseTimeout;
        TimeSpan defaultSendTimeout;
        IdentityVerifier identityVerifier;
        bool isSession;
        Uri listenUri;
        int pending;
        bool syncPumpEnabled;
        List<IDuplexRequest> requests;
        List<ICorrelatorKey> timedOutRequests;
        ChannelHandler channelHandler;
        bool requestAborted;
 
        internal DuplexChannelBinder(IDuplexChannel channel, IRequestReplyCorrelator correlator)
        {
            Fx.Assert(channel != null, "caller must verify");
            Fx.Assert(correlator != null, "caller must verify");
            this.channel = channel;
            this.correlator = correlator;
            this.channel.Faulted += new EventHandler(OnFaulted);
        }
 
        internal DuplexChannelBinder(IDuplexChannel channel, IRequestReplyCorrelator correlator, Uri listenUri)
            : this(channel, correlator)
        {
            this.listenUri = listenUri;
        }
 
        internal DuplexChannelBinder(IDuplexSessionChannel channel, IRequestReplyCorrelator correlator, Uri listenUri)
            : this((IDuplexChannel)channel, correlator, listenUri)
        {
            this.isSession = true;
        }
 
        internal DuplexChannelBinder(IDuplexSessionChannel channel, IRequestReplyCorrelator correlator, bool useActiveAutoClose)
            : this(useActiveAutoClose ? new AutoCloseDuplexSessionChannel(channel) : channel, correlator, null)
        {
        }
 
        public IChannel Channel
        {
            get { return this.channel; }
        }
 
        public TimeSpan DefaultCloseTimeout
        {
            get { return this.defaultCloseTimeout; }
            set { this.defaultCloseTimeout = value; }
        }
 
        internal ChannelHandler ChannelHandler
        {
            get
            {
                if (!(this.channelHandler != null))
                {
                    Fx.Assert("DuplexChannelBinder.ChannelHandler: (channelHandler != null)");
                }
                return this.channelHandler;
            }
            set
            {
                if (!(this.channelHandler == null))
                {
                    Fx.Assert("DuplexChannelBinder.ChannelHandler: (channelHandler == null)");
                }
                this.channelHandler = value;
            }
        }
 
        public TimeSpan DefaultSendTimeout
        {
            get { return this.defaultSendTimeout; }
            set { this.defaultSendTimeout = value; }
        }
 
        public bool HasSession
        {
            get { return this.isSession; }
        }
 
        internal IdentityVerifier IdentityVerifier
        {
            get
            {
                if (this.identityVerifier == null)
                {
                    this.identityVerifier = IdentityVerifier.CreateDefault();
                }
 
                return this.identityVerifier;
            }
            set
            {
                if (value == null)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("value");
                }
 
                this.identityVerifier = value;
            }
        }
 
        public Uri ListenUri
        {
            get { return this.listenUri; }
        }
 
        public EndpointAddress LocalAddress
        {
            get { return this.channel.LocalAddress; }
        }
 
        bool Pumping
        {
            get
            {
                if (this.syncPumpEnabled)
                    return true;
 
                if (this.ChannelHandler != null && this.ChannelHandler.HasRegisterBeenCalled)
                    return true;
 
                return false;
            }
        }
 
        public EndpointAddress RemoteAddress
        {
            get { return this.channel.RemoteAddress; }
        }
 
        List<IDuplexRequest> Requests
        {
            get
            {
                lock (this.ThisLock)
                {
                    if (this.requests == null)
                        this.requests = new List<IDuplexRequest>();
                    return this.requests;
                }
            }
        }
 
        List<ICorrelatorKey> TimedOutRequests
        {
            get
            {
                lock (this.ThisLock)
                {
                    if (this.timedOutRequests == null)
                    {
                        this.timedOutRequests = new List<ICorrelatorKey>();
                    }
                    return this.timedOutRequests;
                }
            }
        }
 
        object ThisLock
        {
            get { return this; }
        }
 
        void OnFaulted(object sender, EventArgs e)
        {
            //Some unhandled exception happened on the channel. 
            //So close all pending requests so the callbacks (in case of async)
            //on the requests are called.
            this.AbortRequests();
        }
 
        public void Abort()
        {
            this.channel.Abort();
            this.AbortRequests();
        }
 
        public void CloseAfterFault(TimeSpan timeout)
        {
            this.channel.Close(timeout);
            this.AbortRequests();
        }
 
        void AbortRequests()
        {
            IDuplexRequest[] array = null;
 
            lock (this.ThisLock)
            {
                if (this.requests != null)
                {
                    array = this.requests.ToArray();
                    this.requests = null;
                }
 
                this.requestAborted = true;
            }
 
            bool hadRequests = array != null && array.Length > 0;
 
            if (hadRequests)
            {
                foreach (IDuplexRequest request in array)
                {
                    request.Abort();
                }
            }
 
            // Remove requests from the correlator since the channel might be either faulting or aborting,
            // We are not going to get a reply for these requests. If they are not removed from the correlator, this will cause a leak.
            // This operation does not have to be under the lock
            if (hadRequests)
            {
                RequestReplyCorrelator requestReplyCorrelator = this.correlator as RequestReplyCorrelator;
                if (requestReplyCorrelator != null)
                {
                    foreach (IDuplexRequest request in array)
                    {
                        ICorrelatorKey keyedRequest = request as ICorrelatorKey;
                        if (keyedRequest != null)
                        {
                            requestReplyCorrelator.RemoveRequest(keyedRequest);
                        }
                    }
                }
            }
 
            //if there are any timed out requests, delete it from the correlator table
            this.DeleteTimedoutRequestsFromCorrelator();
        }
 
        TimeoutException GetReceiveTimeoutException(TimeSpan timeout)
        {
            EndpointAddress address = this.channel.RemoteAddress ?? this.channel.LocalAddress;
            if (address != null)
            {
                return new TimeoutException(SR.GetString(SR.SFxRequestTimedOut2, address, timeout));
            }
            else
            {
                return new TimeoutException(SR.GetString(SR.SFxRequestTimedOut1, timeout));
            }
        }
 
        internal bool HandleRequestAsReply(Message message)
        {
            UniqueId relatesTo = null;
            try
            {
                relatesTo = message.Headers.RelatesTo;
            }
            catch (MessageHeaderException)
            {
                // ignore it
            }
            if (relatesTo == null)
            {
                return false;
            }
            else
            {
                return HandleRequestAsReplyCore(message);
            }
        }
 
        bool HandleRequestAsReplyCore(Message message)
        {
            IDuplexRequest request = correlator.Find<IDuplexRequest>(message, true);
            if (request != null)
            {
                request.GotReply(message);
                return true;
            }
            return false;
        }
 
        public void EnsurePumping()
        {
            lock (this.ThisLock)
            {
                if (!this.syncPumpEnabled)
                {
                    if (!this.ChannelHandler.HasRegisterBeenCalled)
                    {
                        ChannelHandler.Register(this.ChannelHandler);
                    }
                }
            }
        }
 
        public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (this.channel.State == CommunicationState.Faulted)
                return new ChannelFaultedAsyncResult(callback, state);
 
            return this.channel.BeginTryReceive(timeout, callback, state);
        }
 
        public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
        {
            ChannelFaultedAsyncResult channelFaultedResult = result as ChannelFaultedAsyncResult;
            if (channelFaultedResult != null)
            {
                this.AbortRequests();
                requestContext = null;
                return true;
            }
 
            Message message;
            if (this.channel.EndTryReceive(result, out message))
            {
                if (message != null)
                {
                    requestContext = new DuplexRequestContext(this.channel, message, this);
                }
                else
                {
                    this.AbortRequests();
                    requestContext = null;
                }
                return true;
            }
            else
            {
                requestContext = null;
                return false;
            }
        }
 
        public RequestContext CreateRequestContext(Message message)
        {
            return new DuplexRequestContext(this.channel, message, this);
        }
 
        public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channel.BeginSend(message, timeout, callback, state);
        }
 
        public void EndSend(IAsyncResult result)
        {
            this.channel.EndSend(result);
        }
 
        public void Send(Message message, TimeSpan timeout)
        {
            this.channel.Send(message, timeout);
        }
 
        public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            bool success = false;
            AsyncDuplexRequest duplexRequest = null;
 
            try
            {
                RequestReplyCorrelator.PrepareRequest(message);
                duplexRequest = new AsyncDuplexRequest(message, this, timeout, callback, state);
 
                lock (this.ThisLock)
                {
                    this.RequestStarting(message, duplexRequest);
                }
 
                IAsyncResult result = this.channel.BeginSend(message, timeout, Fx.ThunkCallback(new AsyncCallback(this.SendCallback)), duplexRequest);
 
                if (result.CompletedSynchronously)
                    duplexRequest.FinishedSend(result, true);
 
                EnsurePumping();
 
                success = true;
                return duplexRequest;
            }
            finally
            {
                lock (this.ThisLock)
                {
                    if (success)
                    {
                        duplexRequest.EnableCompletion();
                    }
                    else
                    {
                        this.RequestCompleting(duplexRequest);
                    }
                }
            }
        }
 
        public Message EndRequest(IAsyncResult result)
        {
            AsyncDuplexRequest duplexRequest = result as AsyncDuplexRequest;
 
            if (duplexRequest == null)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentException(SR.GetString(SR.InvalidAsyncResult)));
 
            return duplexRequest.End();
        }
 
        public bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
        {
            if (this.channel.State == CommunicationState.Faulted)
            {
                this.AbortRequests();
                requestContext = null;
                return true;
            }
 
            Message message;
            if (this.channel.TryReceive(timeout, out message))
            {
                if (message != null)
                {
                    requestContext = new DuplexRequestContext(this.channel, message, this);
                }
                else
                {
                    this.AbortRequests();
                    requestContext = null;
                }
                return true;
            }
            else
            {
                requestContext = null;
                return false;
            }
        }
 
        public Message Request(Message message, TimeSpan timeout)
        {
            SyncDuplexRequest duplexRequest = null;
            bool optimized = false;
 
            RequestReplyCorrelator.PrepareRequest(message);
 
            lock (this.ThisLock)
            {
                if (!Pumping)
                {
                    optimized = true;
                    syncPumpEnabled = true;
                }
 
                if (!optimized)
                    duplexRequest = new SyncDuplexRequest(this);
 
                this.RequestStarting(message, duplexRequest);
            }
 
            if (optimized)
            {
                TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                UniqueId messageId = message.Headers.MessageId;
 
                try
                {
                    this.channel.Send(message, timeoutHelper.RemainingTime());
                    if (DiagnosticUtility.ShouldUseActivity &&
                        ServiceModelActivity.Current != null &&
                        ServiceModelActivity.Current.ActivityType == ActivityType.ProcessAction)
                    {
                        ServiceModelActivity.Current.Suspend();
                    }
 
                    for (;;)
                    {
                        TimeSpan remaining = timeoutHelper.RemainingTime();
                        Message reply;
 
                        if (!this.channel.TryReceive(remaining, out reply))
                        {
                            throw TraceUtility.ThrowHelperError(this.GetReceiveTimeoutException(timeout), message);
                        }
 
                        if (reply == null)
                        {
                            this.AbortRequests();
                            return null;
                        }
 
                        if (reply.Headers.RelatesTo == messageId)
                        {
                            this.ThrowIfInvalidReplyIdentity(reply);
                            return reply;
                        }
                        else if (!this.HandleRequestAsReply(reply))
                        {
                            // SFx drops a message here
                            if (DiagnosticUtility.ShouldTraceInformation)
                            {
                                EndpointDispatcher dispatcher = null;
                                if (this.ChannelHandler != null && this.ChannelHandler.Channel != null)
                                {
                                    dispatcher = this.ChannelHandler.Channel.EndpointDispatcher;
                                }
                                TraceUtility.TraceDroppedMessage(reply, dispatcher);
                            }
                            reply.Close();
                        }
                    }
                }
                finally
                {
                    lock (this.ThisLock)
                    {
                        this.RequestCompleting(null);
                        syncPumpEnabled = false;
                        if (this.pending > 0)
                            EnsurePumping();
                    }
                }
            }
            else
            {
                TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                this.channel.Send(message, timeoutHelper.RemainingTime());
                EnsurePumping();
                return duplexRequest.WaitForReply(timeoutHelper.RemainingTime());
            }
        }
 
        // ASSUMPTION: (Microsoft) caller holds lock (this.mutex)
        void RequestStarting(Message message, IDuplexRequest request)
        {
            if (request != null)
            {
                this.Requests.Add(request);
                if (!this.requestAborted)
                {
                    this.correlator.Add<IDuplexRequest>(message, request);
                }
            }
            this.pending++;
 
        }
 
        // ASSUMPTION: (Microsoft) caller holds lock (this.mutex)
        void RequestCompleting(IDuplexRequest request)
        {
            this.pending--;
            if (this.pending == 0)
            {
                this.requests = null;
            }
            else if ((request != null) && (this.requests != null))
            {
                this.requests.Remove(request);
            }
        }
 
        // ASSUMPTION: caller holds ThisLock
        void AddToTimedOutRequestList(ICorrelatorKey request)
        {
            Fx.Assert(request != null, "request cannot be null");
            this.TimedOutRequests.Add(request);
        }
 
        // ASSUMPTION: caller holds  ThisLock
        void RemoveFromTimedOutRequestList(ICorrelatorKey request)
        {
            Fx.Assert(request != null, "request cannot be null");
            if (this.timedOutRequests != null)
            {
                this.timedOutRequests.Remove(request);
            }
        }
 
        void DeleteTimedoutRequestsFromCorrelator()
        {
            ICorrelatorKey[] array = null;
            if (this.timedOutRequests != null && this.timedOutRequests.Count > 0)
            {
                lock (this.ThisLock)
                {
                    if (this.timedOutRequests != null && this.timedOutRequests.Count > 0)
                    {
                        array = this.timedOutRequests.ToArray();
                        this.timedOutRequests = null;
                    }
                }
            }
 
            // Remove requests from the correlator since the channel might be either faulting, aborting or closing 
            // We are not going to get a reply for these timed out requests. If they are not removed from the correlator, this will cause a leak.
            // This operation does not have to be under the lock
            if (array != null && array.Length > 0)
            {
                RequestReplyCorrelator requestReplyCorrelator = this.correlator as RequestReplyCorrelator;
                if (requestReplyCorrelator != null)
                {
                    foreach (ICorrelatorKey request in array)
                    {
                        requestReplyCorrelator.RemoveRequest(request);
                    }
                }
            }
 
        }
 
        void SendCallback(IAsyncResult result)
        {
            AsyncDuplexRequest duplexRequest = result.AsyncState as AsyncDuplexRequest;
            if (!((duplexRequest != null)))
            {
                Fx.Assert("DuplexChannelBinder.RequestCallback: (duplexRequest != null)");
            }
 
            if (!result.CompletedSynchronously)
                duplexRequest.FinishedSend(result, false);
        }
 
        [MethodImpl(MethodImplOptions.NoInlining)]
        void EnsureIncomingIdentity(SecurityMessageProperty property, EndpointAddress address, Message reply)
        {
            this.IdentityVerifier.EnsureIncomingIdentity(address, property.ServiceSecurityContext.AuthorizationContext);
        }
 
        void ThrowIfInvalidReplyIdentity(Message reply)
        {
            if (!this.isSession)
            {
                SecurityMessageProperty property = reply.Properties.Security;
                EndpointAddress address = this.channel.RemoteAddress;
 
                if ((property != null) && (address != null))
                {
                    EnsureIncomingIdentity(property, address, reply);
                }
            }
        }
 
        public bool WaitForMessage(TimeSpan timeout)
        {
            return this.channel.WaitForMessage(timeout);
        }
 
        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channel.BeginWaitForMessage(timeout, callback, state);
        }
 
        public bool EndWaitForMessage(IAsyncResult result)
        {
            return this.channel.EndWaitForMessage(result);
        }
 
        class DuplexRequestContext : RequestContextBase
        {
            DuplexChannelBinder binder;
            IDuplexChannel channel;
 
            internal DuplexRequestContext(IDuplexChannel channel, Message request, DuplexChannelBinder binder)
                : base(request, binder.DefaultCloseTimeout, binder.DefaultSendTimeout)
            {
                this.channel = channel;
                this.binder = binder;
            }
 
            protected override void OnAbort()
            {
            }
 
            protected override void OnClose(TimeSpan timeout)
            {
            }
 
            protected override void OnReply(Message message, TimeSpan timeout)
            {
                if (message != null)
                {
                    this.channel.Send(message, timeout);
                }
            }
 
            protected override IAsyncResult OnBeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
            {
                return new ReplyAsyncResult(this, message, timeout, callback, state);
            }
 
            protected override void OnEndReply(IAsyncResult result)
            {
                ReplyAsyncResult.End(result);
            }
 
            class ReplyAsyncResult : AsyncResult
            {
                static AsyncCallback onSend;
                DuplexRequestContext context;
 
                public ReplyAsyncResult(DuplexRequestContext context, Message message, TimeSpan timeout, AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    if (message != null)
                    {
                        if (onSend == null)
                        {
 
                            onSend = Fx.ThunkCallback(new AsyncCallback(OnSend));
                        }
                        this.context = context;
                        IAsyncResult result = context.channel.BeginSend(message, timeout, onSend, this);
                        if (!result.CompletedSynchronously)
                        {
                            return;
                        }
                        context.channel.EndSend(result);
                    }
 
                    base.Complete(true);
                }
 
                public static void End(IAsyncResult result)
                {
                    AsyncResult.End<ReplyAsyncResult>(result);
                }
 
                static void OnSend(IAsyncResult result)
                {
                    if (result.CompletedSynchronously)
                    {
                        return;
                    }
 
                    Exception completionException = null;
                    ReplyAsyncResult thisPtr = (ReplyAsyncResult)result.AsyncState;
                    try
                    {
                        thisPtr.context.channel.EndSend(result);
                    }
#pragma warning suppress 56500 // covered by FxCOP
                    catch (Exception exception)
                    {
                        if (Fx.IsFatal(exception))
                        {
                            throw;
                        }
                        completionException = exception;
                    }
 
                    thisPtr.Complete(false, completionException);
                }
            }
        }
 
        interface IDuplexRequest
        {
            void Abort();
            void GotReply(Message reply);
        }
 
        class SyncDuplexRequest : IDuplexRequest, ICorrelatorKey
        {
            Message reply;
            DuplexChannelBinder parent;
            ManualResetEvent wait = new ManualResetEvent(false);
            int waitCount = 0;
            RequestReplyCorrelator.Key requestCorrelatorKey;
            readonly object thisLock = new object();
 
            internal SyncDuplexRequest(DuplexChannelBinder parent)
            {
                this.parent = parent;
            }
 
            RequestReplyCorrelator.Key ICorrelatorKey.RequestCorrelatorKey
            {
                get
                {
                    return this.requestCorrelatorKey;
                }
                set
                {
                    Fx.Assert(this.requestCorrelatorKey == null, "RequestCorrelatorKey is already set for this request");
                    this.requestCorrelatorKey = value;
                }
            }
 
            public void Abort()
            {
                this.SetWaitHandle();
            }
 
            internal Message WaitForReply(TimeSpan timeout)
            {
                try
                {
                    if (!TimeoutHelper.WaitOne(this.wait, timeout))
                    {
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.parent.GetReceiveTimeoutException(timeout));
                    }
                }
                finally
                {
                    this.CloseWaitHandle();
                }
 
                this.parent.ThrowIfInvalidReplyIdentity(this.reply);
                return this.reply;
            }
 
            public void GotReply(Message reply)
            {
                lock (this.parent.ThisLock)
                {
                    this.parent.RequestCompleting(this);
                }
                this.reply = reply;
                this.SetWaitHandle();
                this.CloseWaitHandle();
            }
 
            void SetWaitHandle()
            {
                lock (thisLock)
                {
                    if (this.waitCount < 2)
                    {
                        this.wait.Set();
                    }
                }
            }
 
            void CloseWaitHandle()
            {
                lock (this.thisLock)
                {
                    this.waitCount++;
 
                    if (this.waitCount == 2)
                    {
                        this.wait.Close();
                    }
                }
            }
        }
 
        class AsyncDuplexRequest : AsyncResult, IDuplexRequest, ICorrelatorKey
        {
            static Action<object> timerCallback = new Action<object>(AsyncDuplexRequest.TimerCallback);
 
            bool aborted;
            bool enableComplete;
            bool gotReply;
            Exception sendException;
            IAsyncResult sendResult;
            DuplexChannelBinder parent;
            Message reply;
            bool timedOut;
            TimeSpan timeout;
            IOThreadTimer timer;
            ServiceModelActivity activity;
            RequestReplyCorrelator.Key requestCorrelatorKey;
 
            internal AsyncDuplexRequest(Message message, DuplexChannelBinder parent, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.parent = parent;
                this.timeout = timeout;
 
                if (timeout != TimeSpan.MaxValue)
                {
                    this.timer = new IOThreadTimer(AsyncDuplexRequest.timerCallback, this, true);
                    this.timer.Set(timeout);
                }
                if (DiagnosticUtility.ShouldUseActivity)
                {
                    this.activity = TraceUtility.ExtractActivity(message);
                }
            }
 
            bool IsDone
            {
                get
                {
                    if (!this.enableComplete)
                    {
                        return false;
                    }
 
                    return (((this.sendResult != null) && this.gotReply) ||
                            (this.sendException != null) ||
                            this.timedOut ||
                            this.aborted);
                }
            }
 
            RequestReplyCorrelator.Key ICorrelatorKey.RequestCorrelatorKey
            {
                get
                {
                    return this.requestCorrelatorKey;
                }
                set
                {
                    Fx.Assert(this.requestCorrelatorKey == null, "RequestCorrelatorKey is already set for this request");
                    this.requestCorrelatorKey = value;
                }
            }
 
            public void Abort()
            {
                bool done;
 
                lock (this.parent.ThisLock)
                {
                    bool wasDone = this.IsDone;
                    this.aborted = true;
                    done = !wasDone && this.IsDone;
                }
 
                if (done)
                    this.Done(false);
            }
 
            void Done(bool completedSynchronously)
            {
                // Make sure that we are acting on the Reply activity.
                ServiceModelActivity replyActivity = DiagnosticUtility.ShouldUseActivity ?
                    TraceUtility.ExtractActivity(this.reply) : null;
                using (ServiceModelActivity.BoundOperation(replyActivity))
                {
                    if (this.timer != null)
                    {
                        this.timer.Cancel();
                        this.timer = null;
                    }
 
                    lock (this.parent.ThisLock)
                    {
                        if (this.timedOut)
                        {
                            // this needs to be saved in a list since we need to remove these from the correlator table when the channel aborts or closes
                            this.parent.AddToTimedOutRequestList(this);
                        }
                        this.parent.RequestCompleting(this);
                    }
 
                    if (this.sendException != null)
                        this.Complete(completedSynchronously, this.sendException);
                    else if (this.timedOut)
                        this.Complete(completedSynchronously, this.parent.GetReceiveTimeoutException(this.timeout));
                    else
                        this.Complete(completedSynchronously);
                }
            }
 
            public void EnableCompletion()
            {
                bool done;
 
                lock (this.parent.ThisLock)
                {
                    bool wasDone = this.IsDone;
                    this.enableComplete = true;
                    done = !wasDone && this.IsDone;
                }
 
                if (done)
                    this.Done(true);
            }
 
            public void FinishedSend(IAsyncResult sendResult, bool completedSynchronously)
            {
                Exception sendException = null;
 
                try
                {
                    this.parent.channel.EndSend(sendResult);
                }
#pragma warning suppress 56500 // covered by FxCOP
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                        throw;
 
                    sendException = e;
                }
 
                bool done;
 
                lock (this.parent.ThisLock)
                {
                    bool wasDone = this.IsDone;
                    this.sendResult = sendResult;
                    this.sendException = sendException;
                    done = !wasDone && this.IsDone;
                }
 
                if (done)
                    this.Done(completedSynchronously);
            }
 
            internal Message End()
            {
                AsyncResult.End<AsyncDuplexRequest>(this);
                this.parent.ThrowIfInvalidReplyIdentity(this.reply);
                return this.reply;
            }
 
            public void GotReply(Message reply)
            {
                bool done;
 
                ServiceModelActivity replyActivity = DiagnosticUtility.ShouldUseActivity ?
                    TraceUtility.ExtractActivity(reply) : null;
 
                using (ServiceModelActivity.BoundOperation(replyActivity))
                {
                    lock (this.parent.ThisLock)
                    {
                        bool wasDone = this.IsDone;
                        this.reply = reply;
                        this.gotReply = true;
                        done = !wasDone && this.IsDone;
                        // we got reply on the channel after the request timed out, let's delete it from the pending timed out requests
                        // we don't neeed to hold on to it since it is now  removed from the correlator table
                        if (wasDone && this.timedOut)
                        {
                            this.parent.RemoveFromTimedOutRequestList(this);
                        }
                    }
                    if (replyActivity != null && DiagnosticUtility.ShouldUseActivity)
                    {
                        TraceUtility.SetActivity(reply, this.activity);
                        if (DiagnosticUtility.ShouldUseActivity && this.activity != null)
                        {
                            if (null != FxTrace.Trace)
                            {
                                FxTrace.Trace.TraceTransfer(this.activity.Id);
                            }
                        }
                    }
                }
                if (DiagnosticUtility.ShouldUseActivity && replyActivity != null)
                {
                    replyActivity.Stop();
                }
 
                if (done)
                    this.Done(false);
            }
 
            void TimedOut()
            {
                bool done;
 
                lock (this.parent.ThisLock)
                {
                    bool wasDone = this.IsDone;
                    this.timedOut = true;
                    done = !wasDone && this.IsDone;
                }
 
                if (done)
                    this.Done(false);
            }
 
            static void TimerCallback(object state)
            {
                ((AsyncDuplexRequest)state).TimedOut();
            }
        }
 
        class ChannelFaultedAsyncResult : CompletedAsyncResult
        {
            public ChannelFaultedAsyncResult(AsyncCallback callback, object state)
                : base(callback, state)
            {
            }
        }
 
        // used to read-ahead by a single message and auto-close the session when we read null
        class AutoCloseDuplexSessionChannel : IDuplexSessionChannel
        {
            static AsyncCallback receiveAsyncCallback;
            static Action<object> receiveThreadSchedulerCallback;
            static AsyncCallback closeInnerChannelCallback;
            IDuplexSessionChannel innerChannel;
            InputQueue<Message> pendingMessages;
            Action messageDequeuedCallback;
            CloseState closeState;
 
            public AutoCloseDuplexSessionChannel(IDuplexSessionChannel innerChannel)
            {
                this.innerChannel = innerChannel;
                this.pendingMessages = new InputQueue<Message>();
                this.messageDequeuedCallback = new Action(StartBackgroundReceive); // kick off a new receive when a message is picked up
                this.closeState = new CloseState();
            }
 
            object ThisLock
            {
                get
                {
                    return this;
                }
            }
 
            public EndpointAddress LocalAddress
            {
                get { return this.innerChannel.LocalAddress; }
            }
 
            public EndpointAddress RemoteAddress
            {
                get { return this.innerChannel.RemoteAddress; }
            }
 
            public Uri Via
            {
                get { return this.innerChannel.Via; }
            }
 
            public IDuplexSession Session
            {
                get { return this.innerChannel.Session; }
            }
 
            public CommunicationState State
            {
                get { return this.innerChannel.State; }
            }
 
            public event EventHandler Closing
            {
                add { this.innerChannel.Closing += value; }
                remove { this.innerChannel.Closing -= value; }
            }
 
            public event EventHandler Closed
            {
                add { this.innerChannel.Closed += value; }
                remove { this.innerChannel.Closed -= value; }
            }
 
            public event EventHandler Faulted
            {
                add { this.innerChannel.Faulted += value; }
                remove { this.innerChannel.Faulted -= value; }
            }
 
            public event EventHandler Opened
            {
                add { this.innerChannel.Opened += value; }
                remove { this.innerChannel.Opened -= value; }
            }
 
            public event EventHandler Opening
            {
                add { this.innerChannel.Opening += value; }
                remove { this.innerChannel.Opening -= value; }
            }
 
            TimeSpan DefaultCloseTimeout
            {
                get
                {
                    IDefaultCommunicationTimeouts defaultTimeouts = this.innerChannel as IDefaultCommunicationTimeouts;
 
                    if (defaultTimeouts != null)
                    {
                        return defaultTimeouts.CloseTimeout;
                    }
                    else
                    {
                        return ServiceDefaults.CloseTimeout;
                    }
                }
            }
 
            TimeSpan DefaultReceiveTimeout
            {
                get
                {
                    IDefaultCommunicationTimeouts defaultTimeouts = this.innerChannel as IDefaultCommunicationTimeouts;
 
                    if (defaultTimeouts != null)
                    {
                        return defaultTimeouts.ReceiveTimeout;
                    }
                    else
                    {
                        return ServiceDefaults.ReceiveTimeout;
                    }
                }
            }
 
            // kick off an async receive so that we notice when the server is trying to shutdown
            void StartBackgroundReceive()
            {
                if (receiveAsyncCallback == null)
                {
                    receiveAsyncCallback = Fx.ThunkCallback(new AsyncCallback(ReceiveAsyncCallback));
                }
 
                IAsyncResult result = null;
                Exception exceptionFromBeginReceive = null;
                try
                {
                    result = this.innerChannel.BeginReceive(TimeSpan.MaxValue, receiveAsyncCallback, this);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    exceptionFromBeginReceive = e;
                }
 
                if (exceptionFromBeginReceive != null)
                {
                    this.pendingMessages.EnqueueAndDispatch(exceptionFromBeginReceive, messageDequeuedCallback, false);
                }
                else if (result.CompletedSynchronously)
                {
                    if (receiveThreadSchedulerCallback == null)
                    {
                        receiveThreadSchedulerCallback = new Action<object>(ReceiveThreadSchedulerCallback);
                    }
                    IOThreadScheduler.ScheduleCallbackLowPriNoFlow(receiveThreadSchedulerCallback, result);
                }
            }
 
            static void ReceiveThreadSchedulerCallback(object state)
            {
                IAsyncResult result = (IAsyncResult)state;
                AutoCloseDuplexSessionChannel thisPtr = (AutoCloseDuplexSessionChannel)result.AsyncState;
                thisPtr.OnReceive(result);
            }
 
            static void ReceiveAsyncCallback(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }
                AutoCloseDuplexSessionChannel thisPtr = (AutoCloseDuplexSessionChannel)result.AsyncState;
                thisPtr.OnReceive(result);
            }
 
            void OnReceive(IAsyncResult result)
            {
                Message message = null;
                Exception receiveException = null;
                try
                {
                    message = this.innerChannel.EndReceive(result);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    receiveException = e;
                }
 
                if (receiveException != null)
                {
                    this.pendingMessages.EnqueueAndDispatch(receiveException, this.messageDequeuedCallback, true);
                }
                else
                {
                    if (message == null)
                    {
                        // we've hit end of session, time for auto-close to kick in
                        this.pendingMessages.Shutdown();
                        this.CloseInnerChannel();
                    }
                    else
                    {
                        this.pendingMessages.EnqueueAndDispatch(message, this.messageDequeuedCallback, true);
                    }
                }
            }
 
            void CloseInnerChannel()
            {
                lock (ThisLock)
                {
                    if (!this.closeState.TryBackgroundClose() || this.State != CommunicationState.Opened)
                    {
                        return;
                    }
                }
 
                IAsyncResult result = null;
                Exception backgroundCloseException = null;
                try
                {
                    if (closeInnerChannelCallback == null)
                    {
                        closeInnerChannelCallback = Fx.ThunkCallback(new AsyncCallback(CloseInnerChannelCallback));
                    }
                    result = this.innerChannel.BeginClose(closeInnerChannelCallback, this);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    this.innerChannel.Abort();
 
                    backgroundCloseException = e;
                }
 
                if (backgroundCloseException != null)
                {
                    // stash away exception to throw out of user's Close()
                    this.closeState.CaptureBackgroundException(backgroundCloseException);
                }
                else if (result.CompletedSynchronously)
                {
                    OnCloseInnerChannel(result);
                }
            }
 
            static void CloseInnerChannelCallback(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }
 
                ((AutoCloseDuplexSessionChannel)result.AsyncState).OnCloseInnerChannel(result);
            }
 
            void OnCloseInnerChannel(IAsyncResult result)
            {
                Exception backgroundCloseException = null;
                try
                {
                    this.innerChannel.EndClose(result);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    this.innerChannel.Abort();
                    backgroundCloseException = e;
                }
 
                if (backgroundCloseException != null)
                {
                    // stash away exception to throw out of user's Close()
                    this.closeState.CaptureBackgroundException(backgroundCloseException);
                }
                else
                {
                    this.closeState.FinishBackgroundClose();
                }
            }
 
            public Message Receive()
            {
                return Receive(DefaultReceiveTimeout);
            }
 
            public Message Receive(TimeSpan timeout)
            {
                return this.pendingMessages.Dequeue(timeout);
            }
 
            public IAsyncResult BeginReceive(AsyncCallback callback, object state)
            {
                return BeginReceive(DefaultReceiveTimeout, callback, state);
            }
 
            public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.pendingMessages.BeginDequeue(timeout, callback, state);
            }
 
            public Message EndReceive(IAsyncResult result)
            {
                throw FxTrace.Exception.AsError(new NotImplementedException());
            }
 
            public bool TryReceive(TimeSpan timeout, out Message message)
            {
                return this.pendingMessages.Dequeue(timeout, out message);
            }
 
            public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.pendingMessages.BeginDequeue(timeout, callback, state);
            }
 
            public bool EndTryReceive(IAsyncResult result, out Message message)
            {
                return this.pendingMessages.EndDequeue(result, out message);
            }
 
            public bool WaitForMessage(TimeSpan timeout)
            {
                return this.pendingMessages.WaitForItem(timeout);
            }
 
            public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.pendingMessages.BeginWaitForItem(timeout, callback, state);
            }
 
            public bool EndWaitForMessage(IAsyncResult result)
            {
                return this.pendingMessages.EndWaitForItem(result);
            }
 
            public T GetProperty<T>() where T : class
            {
                return this.innerChannel.GetProperty<T>();
            }
 
            public void Abort()
            {
                this.innerChannel.Abort();
                Cleanup();
            }
 
            public void Close()
            {
                Close(DefaultCloseTimeout);
            }
 
            public void Close(TimeSpan timeout)
            {
                bool performChannelClose;
                lock (ThisLock)
                {
                    performChannelClose = this.closeState.TryUserClose();
                }
                if (performChannelClose)
                {
                    this.innerChannel.Close(timeout);
                }
                else
                {
                    this.closeState.WaitForBackgroundClose(timeout);
                }
                Cleanup();
            }
 
            public IAsyncResult BeginClose(AsyncCallback callback, object state)
            {
                return BeginClose(DefaultCloseTimeout, callback, state);
            }
 
            public IAsyncResult BeginClose(TimeSpan timeout, AsyncCallback callback, object state)
            {
                bool performChannelClose;
                lock (ThisLock)
                {
                    performChannelClose = this.closeState.TryUserClose();
                }
                if (performChannelClose)
                {
                    return this.innerChannel.BeginClose(timeout, callback, state);
                }
                else
                {
                    return this.closeState.BeginWaitForBackgroundClose(timeout, callback, state);
                }
            }
 
            public void EndClose(IAsyncResult result)
            {
                // don't need to lock here since BeginClose is the sync-point
                if (this.closeState.TryUserClose())
                {
                    this.innerChannel.EndClose(result);
                }
                else
                {
                    this.closeState.EndWaitForBackgroundClose(result);
                }
                Cleanup();
            }
 
            // called from both Abort and Close paths
            void Cleanup()
            {
                this.pendingMessages.Dispose();
            }
 
            public void Open()
            {
                this.innerChannel.Open();
                this.StartBackgroundReceive();
            }
 
            public void Open(TimeSpan timeout)
            {
                this.innerChannel.Open(timeout);
                this.StartBackgroundReceive();
            }
 
            public IAsyncResult BeginOpen(AsyncCallback callback, object state)
            {
                return this.innerChannel.BeginOpen(callback, state);
            }
 
            public IAsyncResult BeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.innerChannel.BeginOpen(timeout, callback, state);
            }
 
            public void EndOpen(IAsyncResult result)
            {
                this.innerChannel.EndOpen(result);
                this.StartBackgroundReceive();
            }
 
            public void Send(Message message)
            {
                this.Send(message);
            }
 
            public void Send(Message message, TimeSpan timeout)
            {
                this.Send(message, timeout);
            }
 
            public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
            {
                return this.innerChannel.BeginSend(message, callback, state);
            }
 
            public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.innerChannel.BeginSend(message, timeout, callback, state);
            }
 
            public void EndSend(IAsyncResult result)
            {
                this.innerChannel.EndSend(result);
            }
 
            class CloseState
            {
                bool userClose;
                InputQueue<object> backgroundCloseData;
 
                public CloseState()
                {
                }
 
                public bool TryBackgroundClose()
                {
                    Fx.Assert(this.backgroundCloseData == null, "can't try twice");
                    if (!this.userClose)
                    {
                        this.backgroundCloseData = new InputQueue<object>();
                        return true;
                    }
                    else
                    {
                        return false;
                    }
                }
 
                public void FinishBackgroundClose()
                {
                    Fx.Assert(this.backgroundCloseData != null, "Only callable from background close");
                    this.backgroundCloseData.Close();
                }
 
                public bool TryUserClose()
                {
                    if (this.backgroundCloseData == null)
                    {
                        this.userClose = true;
                        return true;
                    }
                    else
                    {
                        return false;
                    }
                }
 
                public void WaitForBackgroundClose(TimeSpan timeout)
                {
                    Fx.Assert(this.backgroundCloseData != null, "Need to check background close first");
                    object dummy = this.backgroundCloseData.Dequeue(timeout);
                    Fx.Assert(dummy == null, "we should get an exception or null");
                }
 
                public IAsyncResult BeginWaitForBackgroundClose(TimeSpan timeout, AsyncCallback callback, object state)
                {
                    Fx.Assert(this.backgroundCloseData != null, "Need to check background close first");
                    return this.backgroundCloseData.BeginDequeue(timeout, callback, state);
                }
 
                public void EndWaitForBackgroundClose(IAsyncResult result)
                {
                    Fx.Assert(this.backgroundCloseData != null, "Need to check background close first");
                    object dummy = this.backgroundCloseData.EndDequeue(result);
                    Fx.Assert(dummy == null, "we should get an exception or null");
                }
 
                public void CaptureBackgroundException(Exception exception)
                {
                    this.backgroundCloseData.EnqueueAndDispatch(exception, null, true);
                }
 
            }
        }
    }
}