File: System\ServiceModel\Channels\OneWayChannelListener.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------
 
namespace System.ServiceModel.Channels
{
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Runtime;
    using System.ServiceModel;
    using System.ServiceModel.Diagnostics;
    using System.Threading;
    using System.Runtime.Diagnostics;
    using System.ServiceModel.Diagnostics.Application;
 
    /// <summary>
    /// Wraps an IChannelListener<IReplyChannel> into an IChannelListener<IInputChannel>
    /// </summary>
    class ReplyOneWayChannelListener
        : LayeredChannelListener<IInputChannel>
    {
        IChannelListener<IReplyChannel> innerChannelListener;
        bool packetRoutable;
 
        public ReplyOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context)
            : base(context.Binding, context.BuildInnerChannelListener<IReplyChannel>())
        {
            this.packetRoutable = bindingElement.PacketRoutable;
        }
 
        protected override void OnOpening()
        {
            this.innerChannelListener = (IChannelListener<IReplyChannel>)this.InnerChannelListener;
            base.OnOpening();
        }
 
        protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
        {
            IReplyChannel innerChannel = this.innerChannelListener.AcceptChannel(timeout);
            return WrapInnerChannel(innerChannel);
        }
 
        protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.innerChannelListener.BeginAcceptChannel(timeout, callback, state);
        }
 
        protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
        {
            IReplyChannel innerChannel = this.innerChannelListener.EndAcceptChannel(result);
            return WrapInnerChannel(innerChannel);
        }
 
        protected override bool OnWaitForChannel(TimeSpan timeout)
        {
            return this.innerChannelListener.WaitForChannel(timeout);
        }
 
        protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.innerChannelListener.BeginWaitForChannel(timeout, callback, state);
        }
 
        protected override bool OnEndWaitForChannel(IAsyncResult result)
        {
            return this.innerChannelListener.EndWaitForChannel(result);
        }
 
        IInputChannel WrapInnerChannel(IReplyChannel innerChannel)
        {
            if (innerChannel == null)
            {
                return null;
            }
            else
            {
                return new ReplyOneWayInputChannel(this, innerChannel);
            }
        }
 
        class ReplyOneWayInputChannel : LayeredChannel<IReplyChannel>, IInputChannel
        {
            bool validateHeader;
 
            public ReplyOneWayInputChannel(ReplyOneWayChannelListener listener, IReplyChannel innerChannel)
                : base(listener, innerChannel)
            {
                this.validateHeader = listener.packetRoutable;
            }
 
            public EndpointAddress LocalAddress
            {
                get { return this.InnerChannel.LocalAddress; }
            }
 
            Message ProcessContext(RequestContext context, TimeSpan timeout)
            {
                if (context == null)
                {
                    return null;
                }
 
                bool replySuccess = false;
                Message result = null;
                try
                {
                    // validate that the request message contains our expected header
                    result = context.RequestMessage;
                    result.Properties.Add(RequestContextMessageProperty.Name, new RequestContextMessageProperty(context));
 
                    if (this.validateHeader)
                    {
                        PacketRoutableHeader.ValidateMessage(result);
                    }
 
                    try
                    {
                        TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                        context.Reply(null, timeoutHelper.RemainingTime());
                        replySuccess = true;
                    }
                    catch (CommunicationException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (TimeoutException e)
                    {
                        if (TD.SendTimeoutIsEnabled())
                        {
                            TD.SendTimeout(e.Message);
                        }
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                }
                finally
                {
                    if (!replySuccess)
                    {
                        context.Abort();
                        if (result != null)
                        {
                            result.Close();
                            result = null;
                        }
                    }
                }
 
                return result;
            }
 
            public Message Receive()
            {
                return this.Receive(this.DefaultReceiveTimeout);
            }
 
            public Message Receive(TimeSpan timeout)
            {
                TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                RequestContext context = InnerChannel.ReceiveRequest(timeoutHelper.RemainingTime());
                return ProcessContext(context, timeoutHelper.RemainingTime());
            }
 
            public IAsyncResult BeginReceive(AsyncCallback callback, object state)
            {
                return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
            }
 
            public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return new ReceiveAsyncResult(this.InnerChannel, timeout, this.validateHeader, callback, state);
            }
 
            public Message EndReceive(IAsyncResult result)
            {
                return ReceiveAsyncResult.End(result);
            }
 
            public bool TryReceive(TimeSpan timeout, out Message message)
            {
                TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                RequestContext context;
                if (InnerChannel.TryReceiveRequest(timeoutHelper.RemainingTime(), out context))
                {
                    message = ProcessContext(context, timeoutHelper.RemainingTime());
                    return true;
                }
                else
                {
                    message = null;
                    return false;
                }
            }
 
            public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return new TryReceiveAsyncResult(this.InnerChannel, timeout, this.validateHeader, callback, state);
            }
 
            public bool EndTryReceive(IAsyncResult result, out Message message)
            {
                return TryReceiveAsyncResult.End(result, out message);
            }
 
            public bool WaitForMessage(TimeSpan timeout)
            {
                return InnerChannel.WaitForRequest(timeout);
            }
 
            public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return InnerChannel.BeginWaitForRequest(timeout, callback, state);
            }
 
            public bool EndWaitForMessage(IAsyncResult result)
            {
                return InnerChannel.EndWaitForRequest(result);
            }
 
            class TryReceiveAsyncResult : ReceiveAsyncResultBase
            {
                bool tryResult;
 
                public TryReceiveAsyncResult(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
                    AsyncCallback callback, object state)
                    : base(innerChannel, timeout, validateHeader, callback, state)
                {
                }
 
                public static bool End(IAsyncResult result, out Message message)
                {
                    TryReceiveAsyncResult thisPtr = AsyncResult.End<TryReceiveAsyncResult>(result);
                    message = thisPtr.Message;
                    return thisPtr.tryResult;
                }
 
                protected override IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
                {
                    return InnerChannel.BeginTryReceiveRequest(timeout, callback, state);
                }
 
                protected override RequestContext OnEndReceiveRequest(IAsyncResult result)
                {
                    RequestContext context;
                    this.tryResult = InnerChannel.EndTryReceiveRequest(result, out context);
                    return context;
                }
            }
 
            class ReceiveAsyncResult : ReceiveAsyncResultBase
            {
                public ReceiveAsyncResult(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
                    AsyncCallback callback, object state)
                    : base(innerChannel, timeout, validateHeader, callback, state)
                {
                }
 
                public static Message End(IAsyncResult result)
                {
                    ReceiveAsyncResult thisPtr = AsyncResult.End<ReceiveAsyncResult>(result);
                    return thisPtr.Message;
                }
 
                protected override IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
                {
                    return InnerChannel.BeginReceiveRequest(timeout, callback, state);
                }
 
                protected override RequestContext OnEndReceiveRequest(IAsyncResult result)
                {
                    return InnerChannel.EndReceiveRequest(result);
                }
            }
 
            abstract class ReceiveAsyncResultBase : AsyncResult
            {
                IReplyChannel innerChannel;
                RequestContext context;
                Message message;
                TimeoutHelper timeoutHelper;
                bool validateHeader;
                static AsyncCallback onReceiveRequest = Fx.ThunkCallback(new AsyncCallback(OnReceiveRequest));
                static AsyncCallback onReply = Fx.ThunkCallback(new AsyncCallback(OnReply));
 
                protected ReceiveAsyncResultBase(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
                    AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    this.innerChannel = innerChannel;
                    this.timeoutHelper = new TimeoutHelper(timeout);
                    this.validateHeader = validateHeader;
                    IAsyncResult result = this.OnBeginReceiveRequest(timeoutHelper.RemainingTime(), onReceiveRequest, this);
                    if (!result.CompletedSynchronously)
                    {
                        return;
                    }
 
                    if (HandleReceiveRequestComplete(result))
                    {
                        base.Complete(true);
                    }
                }
 
                protected IReplyChannel InnerChannel
                {
                    get
                    {
                        return this.innerChannel;
                    }
                }
 
                protected Message Message
                {
                    get
                    {
                        return this.message;
                    }
                }
 
                protected abstract IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state);
                protected abstract RequestContext OnEndReceiveRequest(IAsyncResult result);
 
                bool HandleReplyComplete(IAsyncResult result)
                {
                    bool abortContext = true;
                    try
                    {
                        context.EndReply(result);
                        abortContext = false;
                    }
                    catch (CommunicationException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (TimeoutException e)
                    {
                        if (TD.SendTimeoutIsEnabled())
                        {
                            TD.SendTimeout(e.Message);
                        }
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    finally
                    {
                        if (abortContext)
                        {
                            context.Abort();
                        }
                    }
 
                    return true;
                }
 
                bool HandleReceiveRequestComplete(IAsyncResult result)
                {
                    this.context = this.OnEndReceiveRequest(result);
                    if (this.context == null)
                    {
                        return true;
                    }
 
                    bool replySuccess = false;
                    IAsyncResult replyResult = null;
                    try
                    {
                        this.message = context.RequestMessage;
                        this.message.Properties.Add(RequestContextMessageProperty.Name, new RequestContextMessageProperty(context));
 
                        if (validateHeader)
                        {
                            PacketRoutableHeader.ValidateMessage(this.message);
                        }
                        try
                        {
                            replyResult = context.BeginReply(null, timeoutHelper.RemainingTime(), onReply, this);
                            replySuccess = true;
                        }
                        catch (CommunicationException e)
                        {
                            DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                        }
                        catch (TimeoutException e)
                        {
                            if (TD.SendTimeoutIsEnabled())
                            {
                                TD.SendTimeout(e.Message);
                            }
                            DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                        }
                    }
                    finally
                    {
                        if (!replySuccess)
                        {
                            this.context.Abort();
                            if (this.message != null)
                            {
                                this.message.Close();
                                this.message = null;
                            }
                        }
                    }
 
                    if (replyResult == null)
                    {
                        return true;
                    }
                    else if (replyResult.CompletedSynchronously)
                    {
                        return HandleReplyComplete(replyResult);
                    }
                    else
                    {
                        return false;
                    }
                }
 
                static void OnReceiveRequest(IAsyncResult result)
                {
                    if (result.CompletedSynchronously)
                    {
                        return;
                    }
 
                    ReceiveAsyncResultBase thisPtr = (ReceiveAsyncResultBase)result.AsyncState;
 
                    Exception completionException = null;
                    bool completeSelf;
                    try
                    {
                        completeSelf = thisPtr.HandleReceiveRequestComplete(result);
                    }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                        {
                            throw;
                        }
                        completeSelf = true;
                        completionException = e;
                    }
 
                    if (completeSelf)
                    {
                        thisPtr.Complete(false, completionException);
                    }
                }
 
                static void OnReply(IAsyncResult result)
                {
                    if (result.CompletedSynchronously)
                    {
                        return;
                    }
 
                    ReceiveAsyncResultBase thisPtr = (ReceiveAsyncResultBase)result.AsyncState;
 
                    Exception completionException = null;
                    bool completeSelf;
                    try
                    {
                        completeSelf = thisPtr.HandleReplyComplete(result);
                    }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                        {
                            throw;
                        }
                        completeSelf = true;
                        completionException = e;
                    }
 
                    if (completeSelf)
                    {
                        thisPtr.Complete(false, completionException);
                    }
                }
            }
        }
    }
 
    // <summary>
    // Wraps an IChannelListener<IDuplexChannel> into an IChannelListener<IInputChannel>
    // </summary>
    class DuplexOneWayChannelListener
        : LayeredChannelListener<IInputChannel>
    {
        IChannelListener<IDuplexChannel> innerChannelListener;
        bool packetRoutable;
 
        public DuplexOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context)
            : base(context.Binding, context.BuildInnerChannelListener<IDuplexChannel>())
        {
            this.packetRoutable = bindingElement.PacketRoutable;
        }
 
        protected override void OnOpening()
        {
            this.innerChannelListener = (IChannelListener<IDuplexChannel>)this.InnerChannelListener;
            base.OnOpening();
        }
 
        protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
        {
            IDuplexChannel channel = this.innerChannelListener.AcceptChannel(timeout);
            return WrapInnerChannel(channel);
        }
 
        protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.innerChannelListener.BeginAcceptChannel(timeout, callback, state);
        }
 
        protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.innerChannelListener.BeginWaitForChannel(timeout, callback, state);
        }
 
        protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
        {
            IDuplexChannel channel = this.innerChannelListener.EndAcceptChannel(result);
            return WrapInnerChannel(channel);
        }
 
        protected override bool OnEndWaitForChannel(IAsyncResult result)
        {
            return this.innerChannelListener.EndWaitForChannel(result);
        }
 
        protected override bool OnWaitForChannel(TimeSpan timeout)
        {
            return this.innerChannelListener.WaitForChannel(timeout);
        }
 
        IInputChannel WrapInnerChannel(IDuplexChannel innerChannel)
        {
            if (innerChannel == null)
            {
                return null;
            }
            else
            {
                return new DuplexOneWayInputChannel(this, innerChannel);
            }
        }
 
        class DuplexOneWayInputChannel : LayeredChannel<IDuplexChannel>, IInputChannel
        {
            bool validateHeader;
 
            public DuplexOneWayInputChannel(DuplexOneWayChannelListener listener, IDuplexChannel innerChannel)
                : base(listener, innerChannel)
            {
                this.validateHeader = listener.packetRoutable;
            }
 
            public EndpointAddress LocalAddress
            {
                get { return this.InnerChannel.LocalAddress; }
            }
 
            public IAsyncResult BeginReceive(AsyncCallback callback, object state)
            {
                return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
            }
 
            public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.InnerChannel.BeginReceive(timeout, callback, state);
            }
 
            public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.InnerChannel.BeginTryReceive(timeout, callback, state);
            }
 
            public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.InnerChannel.BeginWaitForMessage(timeout, callback, state);
            }
 
            public Message EndReceive(IAsyncResult result)
            {
                Message message = this.InnerChannel.EndReceive(result);
                return ValidateMessage(message);
            }
 
            public bool EndTryReceive(IAsyncResult result, out Message message)
            {
                bool success = this.InnerChannel.EndTryReceive(result, out message);
                message = ValidateMessage(message);
                return success;
            }
 
            public bool EndWaitForMessage(IAsyncResult result)
            {
                return this.InnerChannel.EndWaitForMessage(result);
            }
 
            public Message Receive()
            {
                return this.Receive(this.DefaultReceiveTimeout);
            }
 
            public Message Receive(TimeSpan timeout)
            {
                Message result = this.InnerChannel.Receive(timeout);
                return ValidateMessage(result);
            }
 
            public bool TryReceive(TimeSpan timeout, out Message message)
            {
                bool success = this.InnerChannel.TryReceive(timeout, out message);
                message = ValidateMessage(message);
                return success;
            }
 
            public bool WaitForMessage(TimeSpan timeout)
            {
                return this.InnerChannel.WaitForMessage(timeout);
            }
 
            Message ValidateMessage(Message message)
            {
                if (this.validateHeader && message != null)
                {
                    PacketRoutableHeader.ValidateMessage(message);
                }
                return message;
            }
        }
    }
 
    /// <summary>
    /// Wraps an IChannelListener<IDuplexSessionChannel> into an IChannelListener<IInputChannel>
    /// </summary>
    class DuplexSessionOneWayChannelListener
        : DelegatingChannelListener<IInputChannel>
    {
        IChannelListener<IDuplexSessionChannel> innerChannelListener;
        DuplexSessionOneWayInputChannelAcceptor inputChannelAcceptor;
        bool packetRoutable;
        int maxAcceptedChannels;
        bool acceptPending;
        int activeChannels;
        TimeSpan idleTimeout;
        static AsyncCallback onAcceptInnerChannel = Fx.ThunkCallback(new AsyncCallback(OnAcceptInnerChannel));
        AsyncCallback onOpenInnerChannel;
        EventHandler onInnerChannelClosed;
        Action onExceptionDequeued;
        Action<object> handleAcceptCallback;
        bool ownsInnerListener;
        object acceptLock;
 
        public DuplexSessionOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context)
            : base(true, context.Binding, context.BuildInnerChannelListener<IDuplexSessionChannel>())
        {
            this.acceptLock = new object();
            this.inputChannelAcceptor = new DuplexSessionOneWayInputChannelAcceptor(this);
            this.packetRoutable = bindingElement.PacketRoutable;
            this.maxAcceptedChannels = bindingElement.MaxAcceptedChannels;
            this.Acceptor = this.inputChannelAcceptor;
            this.idleTimeout = bindingElement.ChannelPoolSettings.IdleTimeout;
            this.onOpenInnerChannel = Fx.ThunkCallback(new AsyncCallback(OnOpenInnerChannel));
            this.ownsInnerListener = true;
            this.onInnerChannelClosed = new EventHandler(OnInnerChannelClosed);
        }
 
        bool IsAcceptNecessary
        {
            get
            {
                return !acceptPending
                    && (activeChannels < maxAcceptedChannels)
                    && (this.innerChannelListener.State == CommunicationState.Opened);
            }
        }
 
        protected override void OnOpening()
        {
            this.innerChannelListener = (IChannelListener<IDuplexSessionChannel>)this.InnerChannelListener;
            this.inputChannelAcceptor.TransferInnerChannelListener(this.innerChannelListener); // acceptor now owns the lifetime
            this.ownsInnerListener = false;
            base.OnOpening();
        }
 
        protected override void OnOpened()
        {
            base.OnOpened();
            ActionItem.Schedule(new Action<object>(AcceptLoop), null);
        }
 
        protected override void OnAbort()
        {
            base.OnAbort();
            if (this.ownsInnerListener && this.innerChannelListener != null) // Open didn't complete
            {
                this.innerChannelListener.Abort();
            }
        }
 
        void AcceptLoop(object state)
        {
            AcceptLoop(null);
        }
 
        // we need to kick off an accept (and possibly process a completion as well)
        void AcceptLoop(IAsyncResult pendingResult)
        {
            IDuplexSessionChannel pendingChannel = null;
 
            if (pendingResult != null)
            {
                if (!ProcessEndAccept(pendingResult, out pendingChannel))
                {
                    return;
                }
                pendingResult = null;
            }
 
            lock (acceptLock)
            {
                while (IsAcceptNecessary)
                {
                    Exception exceptionToEnqueue = null;
                    try
                    {
                        IAsyncResult result = null;
 
                        try
                        {
                            result = this.innerChannelListener.BeginAcceptChannel(TimeSpan.MaxValue, onAcceptInnerChannel, this);
                        }
                        catch (CommunicationException e)
                        {
                            DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                            continue;
                        }
 
                        acceptPending = true;
                        if (!result.CompletedSynchronously)
                        {
                            break;
                        }
 
                        if (this.handleAcceptCallback == null)
                        {
                            this.handleAcceptCallback = new Action<object>(HandleAcceptCallback);
                        }
 
                        if (pendingChannel != null)
                        {
                            // don't starve our completed Accept
                            ActionItem.Schedule(handleAcceptCallback, pendingChannel);
                            pendingChannel = null;
                        }
 
                        IDuplexSessionChannel channel = null;
                        if (ProcessEndAccept(result, out channel))
                        {
                            if (channel != null)
                            {
                                ActionItem.Schedule(handleAcceptCallback, channel);
                            }
                        }
                        else
                        {
                            return;
                        }
                    }
#pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                        {
                            throw;
                        }
 
                        exceptionToEnqueue = e;
                    }
 
                    if (exceptionToEnqueue != null)
                    {
                        this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null, false);
                    }
                }
            }
 
            if (pendingChannel != null)
            {
                HandleAcceptComplete(pendingChannel);
            }
        }
 
        // return true if the loop should continue
        bool ProcessEndAccept(IAsyncResult result, out IDuplexSessionChannel channel)
        {
            channel = null;
            Exception exceptionToEnqueue = null;
            bool success = false;
            try
            {
                channel = innerChannelListener.EndAcceptChannel(result);
                success = true;
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
#pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
 
                exceptionToEnqueue = e;
            }
 
            if (success)
            {
                if (channel != null)
                {
                    channel.Closed += this.onInnerChannelClosed;
                    bool traceMaxInboundChannels = false;
                    lock (acceptLock)
                    {
                        this.acceptPending = false;
                        activeChannels++;
                        if (activeChannels >= maxAcceptedChannels)
                        {
                            traceMaxInboundChannels = true;
                        }
                    }
 
                    if (DiagnosticUtility.ShouldTraceWarning)
                    {
                        if (traceMaxInboundChannels)
                        {
                            TraceUtility.TraceEvent(TraceEventType.Warning,
                                TraceCode.MaxAcceptedChannelsReached,
                                SR.GetString(SR.TraceCodeMaxAcceptedChannelsReached),
                                new StringTraceRecord("MaxAcceptedChannels", maxAcceptedChannels.ToString(System.Globalization.CultureInfo.InvariantCulture)),
                                this,
                                null);
                        }
                    }
 
                }
                else
                {
                    // we're at EOF. close up the Acceptor and break out of our loop
                    this.inputChannelAcceptor.Close();
                    return false;
                }
            }
            else if (exceptionToEnqueue != null)
            {
                // see what the state of the inner listener is. If it's still open, don't block the accept loop
                bool canDispatchOnThisThread = (innerChannelListener.State != CommunicationState.Opened);
                if (this.onExceptionDequeued == null)
                {
                    this.onExceptionDequeued = new Action(OnExceptionDequeued);
                }
                this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, this.onExceptionDequeued, canDispatchOnThisThread);
            }
            else
            {
                lock (acceptLock)
                {
                    this.acceptPending = false;
                }
            }
 
            return true;
        }
 
        void OnExceptionDequeued()
        {
            lock (acceptLock)
            {
                this.acceptPending = false;
            }
            AcceptLoop(null);
        }
 
        static void OnAcceptInnerChannel(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }
 
            DuplexSessionOneWayChannelListener thisPtr = (DuplexSessionOneWayChannelListener)result.AsyncState;
            thisPtr.AcceptLoop(result);
        }
 
        void HandleAcceptCallback(object state)
        {
            this.HandleAcceptComplete((IDuplexSessionChannel)state);
        }
 
        void OnInnerChannelClosed(object sender, EventArgs e)
        {
            // Reduce our quota and kick off an accept
            IDuplexSessionChannel channel = (IDuplexSessionChannel)sender;
            channel.Closed -= this.onInnerChannelClosed;
 
            lock (acceptLock)
            {
                activeChannels--;
            }
            this.AcceptLoop(null);
        }
 
        void HandleAcceptComplete(IDuplexSessionChannel channel)
        {
            Exception exceptionToEnqueue = null;
            bool success = false;
 
            this.inputChannelAcceptor.PrepareChannel(channel);
            IAsyncResult openResult = null;
            try
            {
                openResult = channel.BeginOpen(this.idleTimeout, onOpenInnerChannel, channel);
                success = true;
            }
            catch (CommunicationException e) // ---- CommunicationException
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (TimeoutException e)
            {
                if (TD.OpenTimeoutIsEnabled())
                {
                    TD.OpenTimeout(e.Message);
                }
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
#pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
 
                exceptionToEnqueue = e;
            }
            finally
            {
                if (!success && channel != null)
                {
                    channel.Abort();
                }
            }
 
            if (success)
            {
                if (openResult.CompletedSynchronously)
                {
                    CompleteOpen(channel, openResult);
                }
            }
            else
            {
                if (exceptionToEnqueue != null)
                {
                    this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null);
                }
            }
        }
 
        void OnOpenInnerChannel(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }
 
            IDuplexSessionChannel channel = (IDuplexSessionChannel)result.AsyncState;
            CompleteOpen(channel, result);
        }
 
        // open channel and start receiving messages
        void CompleteOpen(IDuplexSessionChannel channel, IAsyncResult result)
        {
            Exception exceptionToEnqueue = null;
            bool success = false;
            try
            {
                channel.EndOpen(result);
                success = true;
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (TimeoutException e)
            {
                if (TD.OpenTimeoutIsEnabled())
                {
                    TD.OpenTimeout(e.Message);
                }
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
#pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
 
                exceptionToEnqueue = e;
            }
            finally
            {
                if (!success)
                {
                    channel.Abort();
                }
            }
 
            if (success)
            {
                this.inputChannelAcceptor.AcceptInnerChannel(this, channel);
            }
            else if (exceptionToEnqueue != null)
            {
                this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null);
            }
        }
 
        class DuplexSessionOneWayInputChannelAcceptor : InputChannelAcceptor
        {
            ChannelTracker<IDuplexSessionChannel, ChannelReceiver> receivers;
            IChannelListener<IDuplexSessionChannel> innerChannelListener;
 
            public DuplexSessionOneWayInputChannelAcceptor(DuplexSessionOneWayChannelListener listener)
                : base(listener)
            {
                this.receivers = new ChannelTracker<IDuplexSessionChannel, ChannelReceiver>();
            }
 
            public void TransferInnerChannelListener(IChannelListener<IDuplexSessionChannel> innerChannelListener)
            {
                Fx.Assert(this.innerChannelListener == null, "innerChannelListener must be null prior to transfer");
                bool abortListener = false;
                lock (ThisLock)
                {
                    this.innerChannelListener = innerChannelListener;
                    if (this.State == CommunicationState.Closing || this.State == CommunicationState.Closed)
                    {
                        // abort happened before we completed the transfer
                        abortListener = true;
                    }
                }
 
                if (abortListener)
                {
                    innerChannelListener.Abort();
                }
            }
 
            public void AcceptInnerChannel(DuplexSessionOneWayChannelListener listener, IDuplexSessionChannel channel)
            {
                ChannelReceiver channelReceiver = new ChannelReceiver(listener, channel);
                this.receivers.Add(channel, channelReceiver);
                channelReceiver.StartReceiving();
            }
 
            public void PrepareChannel(IDuplexSessionChannel channel)
            {
                this.receivers.PrepareChannel(channel);
            }
 
            protected override InputChannel OnCreateChannel()
            {
                return new DuplexSessionOneWayInputChannel(this.ChannelManager, null);
            }
 
            protected override void OnOpen(TimeSpan timeout)
            {
                TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                base.OnOpen(timeoutHelper.RemainingTime());
                this.receivers.Open(timeoutHelper.RemainingTime());
                this.innerChannelListener.Open(timeoutHelper.RemainingTime());
            }
 
            protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return new ChainedOpenAsyncResult(timeout, callback, state, base.OnBeginOpen, base.OnEndOpen, this.receivers, this.innerChannelListener);
            }
 
            protected override void OnEndOpen(IAsyncResult result)
            {
                ChainedOpenAsyncResult.End(result);
            }
 
            protected override void OnAbort()
            {
                base.OnAbort();
                if (!TransferReceivers())
                {
                    this.receivers.Abort();
                    if (this.innerChannelListener != null)
                    {
                        this.innerChannelListener.Abort();
                    }
                }
            }
 
            protected override void OnClose(TimeSpan timeout)
            {
                TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                base.OnClose(timeoutHelper.RemainingTime());
                if (!TransferReceivers())
                {
                    this.receivers.Close(timeoutHelper.RemainingTime());
                    this.innerChannelListener.Close(timeoutHelper.RemainingTime());
                }
            }
 
            protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
            {
                List<ICommunicationObject> objectsToClose = new List<ICommunicationObject>();
                if (!TransferReceivers())
                {
                    objectsToClose.Add(this.receivers);
                    objectsToClose.Add(this.innerChannelListener);
                }
 
                return new ChainedCloseAsyncResult(timeout, callback, state, base.OnBeginClose, base.OnEndClose, objectsToClose);
            }
 
            protected override void OnEndClose(IAsyncResult result)
            {
                ChainedCloseAsyncResult.End(result);
            }
 
            // used to decouple our channel and listener lifetimes
            bool TransferReceivers()
            {
                DuplexSessionOneWayInputChannel singletonChannel = (DuplexSessionOneWayInputChannel)base.GetCurrentChannel();
                if (singletonChannel == null)
                {
                    return false;
                }
                else
                {
                    return singletonChannel.TransferReceivers(this.receivers, this.innerChannelListener);
                }
            }
 
            class DuplexSessionOneWayInputChannel : InputChannel
            {
                ChannelTracker<IDuplexSessionChannel, ChannelReceiver> receivers;
                IChannelListener<IDuplexSessionChannel> innerChannelListener;
 
                public DuplexSessionOneWayInputChannel(ChannelManagerBase channelManager, EndpointAddress localAddress)
                    : base(channelManager, localAddress)
                {
                }
 
                public bool TransferReceivers(ChannelTracker<IDuplexSessionChannel, ChannelReceiver> receivers,
                    IChannelListener<IDuplexSessionChannel> innerChannelListener)
                {
                    lock (ThisLock)
                    {
                        if (this.State != CommunicationState.Opened)
                        {
                            return false;
                        }
 
                        this.receivers = receivers;
                        this.innerChannelListener = innerChannelListener;
                        return true;
                    }
                }
 
                protected override void OnAbort()
                {
                    if (this.receivers != null)
                    {
                        Fx.Assert(this.innerChannelListener != null, "innerChannelListener and receiver should both be null or non-null");
                        this.receivers.Abort();
                        this.innerChannelListener.Abort();
                    }
                    base.OnAbort();
                }
 
                protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
                {
                    List<ICommunicationObject> objectsToClose = new List<ICommunicationObject>();
                    if (this.receivers != null)
                    {
                        objectsToClose.Add(this.receivers);
                        objectsToClose.Add(this.innerChannelListener);
                    }
 
                    return new ChainedCloseAsyncResult(timeout, callback, state, base.OnBeginClose, base.OnEndClose, objectsToClose);
                }
 
                protected override void OnEndClose(IAsyncResult result)
                {
                    ChainedCloseAsyncResult.End(result);
                }
 
                protected override void OnClose(TimeSpan timeout)
                {
                    TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                    if (this.receivers != null)
                    {
                        Fx.Assert(this.innerChannelListener != null, "innerChannelListener and receiver should both be null or non-null");
                        this.receivers.Close(timeoutHelper.RemainingTime());
                        this.innerChannelListener.Close(timeoutHelper.RemainingTime());
                    }
                    base.OnClose(timeoutHelper.RemainingTime());
                }
 
            }
        }
 
 
        // given an inner channel, pulls messages off of it and enqueues them into the upper channel
        class ChannelReceiver
        {
            Action onMessageDequeued;
            static AsyncCallback onReceive = Fx.ThunkCallback(new AsyncCallback(OnReceive));
            DuplexSessionOneWayInputChannelAcceptor acceptor;
            IDuplexSessionChannel channel;
            TimeSpan idleTimeout;
            static Action<object> startReceivingCallback;
            Action<object> onStartReceiveLater;
            Action<object> onDispatchItemsLater;
            bool validateHeader;
 
            public ChannelReceiver(DuplexSessionOneWayChannelListener parent, IDuplexSessionChannel channel)
            {
                this.channel = channel;
                this.acceptor = parent.inputChannelAcceptor;
                this.idleTimeout = parent.idleTimeout;
                this.validateHeader = parent.packetRoutable;
                this.onMessageDequeued = new Action(OnMessageDequeued);
            }
 
            void StartReceivingCallback(object state)
            {
                ((ChannelReceiver)state).StartReceiving();
            }
 
            public void StartReceiving()
            {
                Exception exceptionToEnqueue = null;
 
                while (true)
                {
                    if (channel.State != CommunicationState.Opened)
                    {
                        channel.Abort();
                        break;
                    }
 
                    IAsyncResult result = null;
                    try
                    {
                        result = this.channel.BeginTryReceive(this.idleTimeout, onReceive, this);
                    }
                    catch (CommunicationException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
#pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                        {
                            throw;
                        }
 
                        exceptionToEnqueue = e;
                        break;
                    }
 
                    if (result != null)
                    {
                        if (!result.CompletedSynchronously)
                        {
                            break;
                        }
 
                        bool dispatch;
                        bool continueLoop = OnCompleteReceive(result, out dispatch);
                        if (dispatch)
                        {
                            Dispatch();
                        }
                        if (!continueLoop)
                        {
                            break;
                        }
                    }
                }
 
                if (exceptionToEnqueue != null)
                {
                    this.acceptor.Enqueue(exceptionToEnqueue, this.onMessageDequeued);
                }
            }
 
            bool EnqueueMessage(Message message)
            {
                if (this.validateHeader)
                {
                    if (!PacketRoutableHeader.TryValidateMessage(message))
                    {
                        this.channel.Abort();
                        message.Close();
                        return false;
                    }
                    else
                    {
                        this.validateHeader = false; // only validate the first message on a session
                    }
                }
 
                return this.acceptor.EnqueueWithoutDispatch(message, this.onMessageDequeued);
            }
 
            void OnStartReceiveLater(object state)
            {
                StartReceiving();
            }
 
            void OnDispatchItemsLater(object state)
            {
                Dispatch();
            }
 
            void Dispatch()
            {
                this.acceptor.DispatchItems();
            }
 
            // returns true if the Receive Loop should continue (or be started if it's not running)
            bool OnCompleteReceive(IAsyncResult result, out bool dispatchLater)
            {
                Exception exceptionToEnqueue = null;
                Message message = null;
                bool startLoop = false;
                dispatchLater = false;
 
                try
                {
                    if (!this.channel.EndTryReceive(result, out message))
                    {
                        this.channel.Abort(); // we've hit our IdleTimeout
                    }
                    else if (message == null)
                    {
                        this.channel.Close(); // read EOF, close our half of the session
                    }
                }
                catch (CommunicationException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    startLoop = (this.channel.State == CommunicationState.Opened);
                }
                catch (TimeoutException e)
                {
                    if (TD.CloseTimeoutIsEnabled())
                    {
                        TD.CloseTimeout(e.Message);
                    }
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    startLoop = (this.channel.State == CommunicationState.Opened);
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    exceptionToEnqueue = e;
                }
 
                if (message != null)
                {
                    dispatchLater = EnqueueMessage(message);
                }
                else if (exceptionToEnqueue != null)
                {
                    dispatchLater = this.acceptor.EnqueueWithoutDispatch(exceptionToEnqueue, this.onMessageDequeued);
                }
 
                return startLoop;
            }
 
            void OnMessageDequeued()
            {
                IAsyncResult result = null;
                Exception exceptionToEnqueue = null;
 
                try
                {
                    result = this.channel.BeginTryReceive(this.idleTimeout, onReceive, this);
                }
                catch (CommunicationException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to input queue to be pulled off by user
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
 
                    exceptionToEnqueue = e;
                }
 
                if (result != null)
                {
                    if (result.CompletedSynchronously)
                    {
                        bool dispatchLater;
 
                        if (OnCompleteReceive(result, out dispatchLater))
                        {
                            if (onStartReceiveLater == null)
                            {
                                onStartReceiveLater = new Action<object>(OnStartReceiveLater);
                            }
                            ActionItem.Schedule(onStartReceiveLater, null);
                        }
 
                        if (dispatchLater)
                        {
                            if (onDispatchItemsLater == null)
                            {
                                onDispatchItemsLater = new Action<object>(OnDispatchItemsLater);
                            }
                            ActionItem.Schedule(onDispatchItemsLater, null);
                        }
                    }
                }
                else if (exceptionToEnqueue != null)
                {
                    this.acceptor.Enqueue(exceptionToEnqueue, this.onMessageDequeued, false);
                }
                else // need to kickoff a new loop 
                {
                    if (this.channel.State == CommunicationState.Opened)
                    {
                        if (startReceivingCallback == null)
                        {
                            startReceivingCallback = new Action<object>(StartReceivingCallback);
                        }
 
                        ActionItem.Schedule(startReceivingCallback, this);
                    }
                }
            }
 
            static void OnReceive(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }
 
                ChannelReceiver thisPtr = (ChannelReceiver)result.AsyncState;
                bool dispatch;
                if (thisPtr.OnCompleteReceive(result, out dispatch))
                {
                    thisPtr.StartReceiving();
                }
 
                if (dispatch)
                {
                    thisPtr.Dispatch();
                }
            }
        }
    }
}