File: System\ServiceModel\Channels\ChannelDemuxer.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
 
namespace System.ServiceModel.Channels
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Runtime;
    using System.ServiceModel;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Dispatcher;
    using System.Threading;
    using System.Xml;
    using System.ServiceModel.Diagnostics.Application;
    using System.Diagnostics.CodeAnalysis;
 
    // Builds channel listeners by delegating to a TypedChannelDemuxer chosen by the requested
    // channel type. Each typed demuxer is created on first use and reused for later requests.
    class ChannelDemuxer
    {
        public readonly static TimeSpan UseDefaultReceiveTimeout = TimeSpan.MinValue;

        TypedChannelDemuxer inputDemuxer;
        TypedChannelDemuxer replyDemuxer;
        Dictionary<Type, TypedChannelDemuxer> typeDemuxers;
        TimeSpan peekTimeout;
        int maxPendingSessions;

        public ChannelDemuxer()
        {
            this.typeDemuxers = new Dictionary<Type, TypedChannelDemuxer>();
            this.maxPendingSessions = 10;
            // UseDefaultReceiveTimeout selects the default receive timeout (original behavior).
            this.peekTimeout = ChannelDemuxer.UseDefaultReceiveTimeout;
        }

        // Peek timeout handed to the session demuxers created by CreateTypedDemuxer.
        public TimeSpan PeekTimeout
        {
            get { return this.peekTimeout; }
            set { this.peekTimeout = value; }
        }

        // Pending-session limit handed to the session demuxers created by CreateTypedDemuxer.
        public int MaxPendingSessions
        {
            get { return this.maxPendingSessions; }
            set { this.maxPendingSessions = value; }
        }

        // Builds a listener whose filter matches every message, with priority 0.
        public IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
            where TChannel : class, IChannel
        {
            ChannelDemuxerFilter matchEverything = new ChannelDemuxerFilter(new MatchAllMessageFilter(), 0);
            return this.BuildChannelListener<TChannel>(context, matchEverything);
        }

        // Builds a listener for TChannel that only receives what the supplied filter selects.
        public IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context, ChannelDemuxerFilter filter)
            where TChannel : class, IChannel
        {
            TypedChannelDemuxer demuxer = GetTypedDemuxer(typeof(TChannel), context);
            return demuxer.BuildChannelListener<TChannel>(filter);
        }

        // Creates the demuxer for the duplex and session channel shapes; any other type is rejected
        // with NotSupportedException. Input and reply datagram shapes are handled by GetTypedDemuxer.
        TypedChannelDemuxer CreateTypedDemuxer(Type channelType, BindingContext context)
        {
            if (channelType == typeof(IDuplexChannel))
            {
                return (TypedChannelDemuxer)(object)new DuplexChannelDemuxer(context);
            }

            if (channelType == typeof(IInputSessionChannel))
            {
                return (TypedChannelDemuxer)(object)new InputSessionChannelDemuxer(context, this.peekTimeout, this.maxPendingSessions);
            }

            if (channelType == typeof(IReplySessionChannel))
            {
                return (TypedChannelDemuxer)(object)new ReplySessionChannelDemuxer(context, this.peekTimeout, this.maxPendingSessions);
            }

            if (channelType == typeof(IDuplexSessionChannel))
            {
                return (TypedChannelDemuxer)(object)new DuplexSessionChannelDemuxer(context, this.peekTimeout, this.maxPendingSessions);
            }

            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException());
        }

        // Returns the demuxer responsible for channelType, creating and caching it when needed.
        // IInputChannel and IReplyChannel share one ReplyChannelDemuxer whenever a reply demuxer is created.
        [SuppressMessage(FxCop.Category.Usage, "CA2301:EmbeddableTypesInContainersRule", MessageId = "typeDemuxers", Justification = "No need to support type equivalence here.")]
        TypedChannelDemuxer GetTypedDemuxer(Type channelType, BindingContext context)
        {
            TypedChannelDemuxer demuxer = null;
            bool reusedExisting = true;

            if (channelType == typeof(IInputChannel))
            {
                if (this.inputDemuxer == null)
                {
                    if (context.CanBuildInnerChannelListener<IReplyChannel>())
                    {
                        this.inputDemuxer = this.replyDemuxer = new ReplyChannelDemuxer(context);
                    }
                    else
                    {
                        this.inputDemuxer = new InputChannelDemuxer(context);
                    }
                    reusedExisting = false;
                }
                demuxer = this.inputDemuxer;
            }
            else if (channelType == typeof(IReplyChannel))
            {
                if (this.replyDemuxer == null)
                {
                    this.inputDemuxer = this.replyDemuxer = new ReplyChannelDemuxer(context);
                    reusedExisting = false;
                }
                demuxer = this.replyDemuxer;
            }
            else
            {
                if (!this.typeDemuxers.TryGetValue(channelType, out demuxer))
                {
                    demuxer = this.CreateTypedDemuxer(channelType, context);
                    this.typeDemuxers.Add(channelType, demuxer);
                    reusedExisting = false;
                }
            }

            if (reusedExisting)
            {
                // No demuxer was constructed for this call, so the remaining binding
                // elements are discarded explicitly.
                context.RemainingBindingElements.Clear();
            }

            return demuxer;
        }
    }
 
    // Base class for the per-channel-type demuxers; also hosts the shared best-effort
    // helpers used to discard a message that will not be delivered.
    abstract class TypedChannelDemuxer
    {
        // Closes the request's message, tracing (rather than propagating) any non-fatal failure.
        internal static void AbortMessage(RequestContext request)
        {
            try
            {
                // RequestContext.RequestMessage can throw an AddressMismatch exception,
                // which is why the property read sits inside the try block.
                Message requestMessage = request.RequestMessage;
                AbortMessage(requestMessage);
            }
            catch (Exception error)
            {
                if (Fx.IsFatal(error))
                {
                    throw;
                }

                DiagnosticUtility.TraceHandledException(error, TraceEventType.Information);
            }
        }

        // Closes the message; communication and timeout failures are traced and swallowed,
        // every other exception propagates to the caller.
        internal static void AbortMessage(Message message)
        {
            try
            {
                message.Close();
            }
            catch (TimeoutException timeoutError)
            {
                if (TD.CloseTimeoutIsEnabled())
                {
                    TD.CloseTimeout(timeoutError.Message);
                }

                DiagnosticUtility.TraceHandledException(timeoutError, TraceEventType.Information);
            }
            catch (CommunicationException communicationError)
            {
                DiagnosticUtility.TraceHandledException(communicationError, TraceEventType.Information);
            }
        }

        // Creates an outer listener of the requested channel type that is fed through the given filter.
        public abstract IChannelListener<TChannel> BuildChannelListener<TChannel>(ChannelDemuxerFilter filter)
            where TChannel : class, IChannel;
    }
 
    //
    // Datagram demuxers
    //
 
    // Lifecycle notifications that a demuxer receives about its outer listeners.
    // Each operation identifies the outer listener by the filter it registered with;
    // open, and close are offered in synchronous and Begin/End asynchronous forms.
    interface IChannelDemuxer
    {
        // Registers the outer listener under its filter as part of the listener's open.
        void OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout);
        // Asynchronous counterpart of OnOuterListenerOpen; pair with OnEndOuterListenerOpen.
        IAsyncResult OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state);
        void OnEndOuterListenerOpen(IAsyncResult result);
        // Notifies the demuxer that the outer listener registered under this filter was aborted.
        void OnOuterListenerAbort(ChannelDemuxerFilter filter);
        // Notifies the demuxer that the outer listener registered under this filter is closing.
        void OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout);
        // Asynchronous counterpart of OnOuterListenerClose; pair with OnEndOuterListenerClose.
        IAsyncResult OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state);
        void OnEndOuterListenerClose(IAsyncResult result);
    }
 
    abstract class DatagramChannelDemuxer<TInnerChannel, TInnerItem> : TypedChannelDemuxer, IChannelDemuxer
        where TInnerChannel : class, IChannel
        where TInnerItem : class, IDisposable
    {
        // Maps each registered outer listener's filter to that listener; read and modified under ThisLock.
        MessageFilterTable<IChannelListener> filterTable;
        // Channel accepted from innerListener when the first outer listener opens; null until then.
        TInnerChannel innerChannel;
        // Inner listener built once in the constructor and shared by every outer listener.
        IChannelListener<TInnerChannel> innerListener;
        static AsyncCallback onReceiveComplete = Fx.ThunkCallback(new AsyncCallback(OnReceiveCompleteStatic));
        static Action<object> startReceivingStatic = new Action<object>(StartReceivingStatic);
        // Lazily created delegate for OnItemDequeued, passed to EnqueueAndDispatch as the dequeued callback.
        Action onItemDequeued;
        // Number of outer listeners currently registered. The inner listener is opened when this
        // becomes 1 and closed or aborted when it drops back to 0.
        int openCount;
        // Optional handler looked up from the binding parameters in the constructor; may be null.
        IChannelDemuxFailureHandler demuxFailureHandler;
        // since the OnOuterListenerOpen method will be called for every outer listener and we will open
        // the inner listener only once, we need to ensure that all the outer listeners wait till the 
        // inner listener is opened.
        ThreadNeutralSemaphore openSemaphore;
        // Exception captured when opening the inner listener/channel failed; later openers get a
        // wrapped copy from ThrowPendingOpenExceptionIfAny.
        Exception pendingInnerListenerOpenException;
        // Set when the last outer listener aborts; the synchronous open path checks it after the
        // inner channel is opened and aborts the inner state instead of starting to receive.
        bool abortOngoingOpen;
 
        // Builds the shared inner listener from the binding context and prepares the
        // bookkeeping used to register outer listeners.
        public DatagramChannelDemuxer(BindingContext context)
        {
            // A single-slot semaphore serializes the open path of all outer listeners.
            this.openSemaphore = new ThreadNeutralSemaphore(1);
            this.filterTable = new MessageFilterTable<IChannelListener>();
            this.innerListener = context.BuildInnerChannelListener<TInnerChannel>();

            if (context.BindingParameters == null)
            {
                return;
            }

            // The failure handler is optional; Find's result is stored as-is.
            this.demuxFailureHandler = context.BindingParameters.Find<IChannelDemuxFailureHandler>();
        }
 
        // The inner channel accepted during open; null before the first open completes.
        protected TInnerChannel InnerChannel
        {
            get
            {
                return this.innerChannel;
            }
        }

        // The inner listener shared by all outer listeners.
        protected IChannelListener<TInnerChannel> InnerListener
        {
            get
            {
                return this.innerListener;
            }
        }

        // Monitor object guarding the filter table and open count; this instance itself.
        protected object ThisLock
        {
            get
            {
                return this;
            }
        }

        // The optional demux failure handler found in the binding parameters.
        protected IChannelDemuxFailureHandler DemuxFailureHandler
        {
            get
            {
                return this.demuxFailureHandler;
            }
        }
 
        // Discards an item that was received but not handed off to a listener (see ProcessItem's finally block).
        protected abstract void AbortItem(TInnerItem item);
        // Starts an asynchronous receive on the inner channel; completed by EndReceive.
        protected abstract IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state);
        // Creates the outer listener returned by BuildChannelListener for the given filter.
        protected abstract LayeredChannelListener<TChannel> CreateListener<TChannel>(ChannelDemuxerFilter filter) where TChannel : class, IChannel;
        // NOTE(review): not called from the visible part of this class; presumably lets a listener
        // dispatch already-queued items -- confirm against the derived classes.
        protected abstract void Dispatch(IChannelListener listener);
        // Called when no registered filter matches the item's message; the implementation is
        // responsible for closing the item.
        protected abstract void EndpointNotFound(TInnerItem item);
        // Completes a receive started with BeginReceive; a null result is treated as end of input.
        protected abstract TInnerItem EndReceive(IAsyncResult result);
        // Hands a received item to the matching listener; dequeuedCallback is used to resume receiving.
        protected abstract void EnqueueAndDispatch(IChannelListener listener, TInnerItem item, Action dequeuedCallback, bool canDispatchOnThisThread);
        // Hands an exception to a listener in place of an item; dequeuedCallback is used to resume receiving.
        protected abstract void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread);
        // Extracts the message that the filter table is matched against.
        protected abstract Message GetMessage(TInnerItem item);
 
        // Creates the outer listener for the filter and attaches the shared inner listener to it.
        public override IChannelListener<TChannel> BuildChannelListener<TChannel>(ChannelDemuxerFilter filter)
        {
            LayeredChannelListener<TChannel> outerListener = this.CreateListener<TChannel>(filter);
            outerListener.InnerChannelListener = this.InnerListener;
            return outerListener;
        }
 
        // Completes a receive and routes the received item.
        // Returns false if BeginReceive should be called again by the caller.
        // Returns true when the caller must not start another receive now: the channel is
        // faulted/aborted/disposed, the receive returned null, or ProcessItem/HandleUnknownException
        // took over (those pass OnItemDequeued as the dequeued callback, which restarts receiving).
        bool HandleReceiveResult(IAsyncResult result)
        {
            TInnerItem item;
            try
            {
                item = this.EndReceive(result);
            }
            // The three "channel is gone" exceptions stop the receive loop. They are listed before
            // the general CommunicationException handler so that they are not treated as retryable.
            catch (CommunicationObjectFaultedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return true;
            }
            catch (CommunicationObjectAbortedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return true;
            }
            catch (ObjectDisposedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return true;
            }
            // Other communication failures and timeouts are traced and the receive is retried.
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return false;
            }
            catch (TimeoutException e)
            {
                if (TD.ReceiveTimeoutIsEnabled())
                {
                    TD.ReceiveTimeout(e.Message);
                }

                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return false;
            }
            // Any other non-fatal exception is forwarded to a registered listener.
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                this.HandleUnknownException(e);
                return true;
            }

            if (item == null)
            {
                // A null item while the inner channel is still Opened is traced as a premature EOF.
                if (this.innerChannel.State == CommunicationState.Opened)
                {
                    if (DiagnosticUtility.ShouldTraceError)
                    {
                        TraceUtility.TraceEvent(TraceEventType.Error, TraceCode.PrematureDatagramEof, SR.GetString(SR.TraceCodePrematureDatagramEof),
                            null, this.innerChannel, null);
                    }
                }

                return true;
            }

            try
            {
                return this.ProcessItem(item);
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return false;
            }
            catch (TimeoutException e)
            {
                if (TD.ReceiveTimeoutIsEnabled())
                {
                    TD.ReceiveTimeout(e.Message);
                }

                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return false;
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                this.HandleUnknownException(e);
                return true;
            }
        }
 
        // Looks up, under the lock, the listener whose filter matches the message.
        // Returns null when the filter table reports no match.
        IChannelListener MatchListener(Message message)
        {
            lock (this.ThisLock)
            {
                IChannelListener candidate = null;
                bool found = this.filterTable.GetMatchingValue(message, out candidate);
                return found ? candidate : null;
            }
        }
 
        // Dequeued callback handed to EnqueueAndDispatch: resumes the receive loop.
        void OnItemDequeued()
        {
            this.StartReceiving();
        }
 
        // Static entry point scheduled after a successful open; state is the demuxer instance.
        static void StartReceivingStatic(object state)
        {
            DatagramChannelDemuxer<TInnerChannel, TInnerItem> demuxer = (DatagramChannelDemuxer<TInnerChannel, TInnerItem>)state;
            demuxer.StartReceiving();
        }
 
        // Traces an unexpected exception and forwards it to the first listener copied out of
        // the filter table. When no listener is registered the exception is only traced.
        protected void HandleUnknownException(Exception exception)
        {
            DiagnosticUtility.TraceHandledException(exception, TraceEventType.Error);

            lock (this.ThisLock)
            {
                if (this.filterTable.Count <= 0)
                {
                    return;
                }

                KeyValuePair<MessageFilter, IChannelListener>[] registrations =
                    new KeyValuePair<MessageFilter, IChannelListener>[this.filterTable.Count];
                this.filterTable.CopyTo(registrations, 0);
                IChannelListener firstListener = registrations[0].Value;

                if (this.onItemDequeued == null)
                {
                    this.onItemDequeued = new Action(this.OnItemDequeued);
                }

                // The dequeued callback restarts receiving; dispatch on this thread is not allowed.
                this.EnqueueAndDispatch(firstListener, exception, this.onItemDequeued, false);
            }
        }
 
        // Aborts the inner channel (if one was accepted) and then the inner listener.
        void AbortState()
        {
            if (this.InnerChannel != null)
            {
                this.InnerChannel.Abort();
            }

            this.InnerListener.Abort();
        }
 
        // Unregisters the outer listener's filter. When that was the last registered listener,
        // closes the inner channel and inner listener within the given timeout.
        public void OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);

            lock (this.ThisLock)
            {
                if (!this.filterTable.ContainsKey(filter.Filter))
                {
                    // Unknown filter: nothing to unregister and nothing to close.
                    return;
                }

                this.filterTable.Remove(filter.Filter);
                this.openCount--;
                if (this.openCount != 0)
                {
                    // Other outer listeners still depend on the inner channel and listener.
                    return;
                }
            }

            bool closedCleanly = false;
            try
            {
                if (this.innerChannel != null)
                {
                    this.innerChannel.Close(timeoutHelper.RemainingTime());
                }
                this.innerListener.Close(timeoutHelper.RemainingTime());
                closedCleanly = true;
            }
            finally
            {
                // we should abort the state since calling Abort on the channel demuxer will be a no-op
                // due to the reference count being 0
                if (!closedCleanly)
                {
                    AbortState();
                }
            }
        }
 
        // Asynchronous close notification. Unregisters the filter; only when the last outer
        // listener goes away is a CloseAsyncResult started, otherwise the call completes immediately.
        public IAsyncResult OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state)
        {
            bool lastListenerGone = false;

            lock (this.ThisLock)
            {
                bool wasRegistered = this.filterTable.ContainsKey(filter.Filter);
                if (wasRegistered)
                {
                    this.filterTable.Remove(filter.Filter);
                    this.openCount--;
                    lastListenerGone = (this.openCount == 0);
                }
            }

            // The async results are created outside the lock, as their callbacks may run inline.
            if (lastListenerGone)
            {
                return new CloseAsyncResult(this, timeout, callback, state);
            }

            return new CompletedAsyncResult(callback, state);
        }
 
        // Ends whichever async result OnBeginOuterListenerClose returned.
        public void OnEndOuterListenerClose(IAsyncResult result)
        {
            if (!(result is CompletedAsyncResult))
            {
                CloseAsyncResult.End(result);
                return;
            }

            CompletedAsyncResult.End(result);
        }
 
        // Unregisters the aborted outer listener. When it was the last one, flags any ongoing
        // open for abort and aborts the inner channel and listener.
        public void OnOuterListenerAbort(ChannelDemuxerFilter filter)
        {
            lock (this.ThisLock)
            {
                if (!this.filterTable.ContainsKey(filter.Filter))
                {
                    return;
                }

                this.filterTable.Remove(filter.Filter);
                this.openCount--;
                if (this.openCount != 0)
                {
                    return;
                }

                // Read by the synchronous open path once the inner channel has been opened.
                this.abortOngoingOpen = true;
            }

            // Abort outside the lock.
            AbortState();
        }
 
        // If an earlier attempt to open the inner listener failed, throws a new exception that
        // embeds the text of the recorded one. Aborted and faulted failures keep their exception
        // type; anything else surfaces as CommunicationException.
        void ThrowPendingOpenExceptionIfAny()
        {
            Exception recorded = this.pendingInnerListenerOpenException;
            if (recorded == null)
            {
                return;
            }

            string description = SR.GetString(SR.PreviousChannelDemuxerOpenFailed, recorded.ToString());

            Exception toThrow;
            if (recorded is CommunicationObjectAbortedException)
            {
                toThrow = new CommunicationObjectAbortedException(description);
            }
            else if (recorded is CommunicationObjectFaultedException)
            {
                toThrow = new CommunicationObjectFaultedException(description);
            }
            else
            {
                toThrow = new CommunicationException(description);
            }

            throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(toThrow);
        }
 
        // Registers the outer listener's filter and reports whether this caller is the first
        // registered listener and therefore has to open the inner listener.
        // Returns false without registering when the listener is already closing or closed.
        bool ShouldOpenInnerListener(ChannelDemuxerFilter filter, IChannelListener listener)
        {
            lock (this.ThisLock)
            {
                // the listener's Abort may be racing with Open
                bool stillOpening = listener.State != CommunicationState.Closed && listener.State != CommunicationState.Closing;
                if (!stillOpening)
                {
                    return false;
                }

                this.filterTable.Add(filter.Filter, listener, filter.Priority);
                this.openCount++;
                if (this.openCount != 1)
                {
                    return false;
                }

                // First registration: clear any abort request left over from an earlier cycle.
                this.abortOngoingOpen = false;
                return true;
            }
        }
 
        // Synchronous open notification from an outer listener.
        // All callers are serialized through openSemaphore. The first registered listener opens the
        // inner listener, accepts and opens the inner channel, and schedules the receive loop; later
        // callers only register and rethrow (wrapped) any failure recorded by an earlier open.
        public void OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            this.openSemaphore.Enter(timeoutHelper.RemainingTime());
            try
            {
                bool openInnerListener = ShouldOpenInnerListener(filter, listener);
                if (openInnerListener)
                {
                    try
                    {
                        this.innerListener.Open(timeoutHelper.RemainingTime());
                        this.innerChannel = this.innerListener.AcceptChannel(timeoutHelper.RemainingTime());
                        this.innerChannel.Open(timeoutHelper.RemainingTime());

                        lock (ThisLock)
                        {
                            // The last outer listener was aborted while the open was in progress:
                            // tear the inner state down instead of starting to receive.
                            if (this.abortOngoingOpen)
                            {
                                this.AbortState();
                                return;
                            }
                        }

                        // Start the receive loop from a scheduled work item rather than on this thread.
                        ActionItem.Schedule(startReceivingStatic, this);
                    }
#pragma warning suppress 56500 // covered by FxCOP
                    catch (Exception e)
                    {
                        // Remember the failure so that listeners opening later observe it too.
                        this.pendingInnerListenerOpenException = e;
                        throw;
                    }
                }
                else
                {
                    this.ThrowPendingOpenExceptionIfAny();
                }
            }
            finally
            {
                this.openSemaphore.Exit();
            }
        }
 
        // Asynchronous open notification; all of the work is driven by OpenAsyncResult.
        public IAsyncResult OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)
        {
            OpenAsyncResult openResult = new OpenAsyncResult(this, filter, listener, timeout, callback, state);
            return openResult;
        }
 
        // Ends the asynchronous open started by OnBeginOuterListenerOpen.
        public void OnEndOuterListenerOpen(IAsyncResult result)
        {
            OpenAsyncResult.End(result);
        }
 
        // Handles an asynchronously completed receive and restarts the receive loop
        // when HandleReceiveResult asks for another BeginReceive.
        void OnReceiveComplete(IAsyncResult result)
        {
            bool receiveLoopHandedOff = this.HandleReceiveResult(result);
            if (receiveLoopHandedOff)
            {
                return;
            }

            this.StartReceiving();
        }
 
        // Receive callback. Synchronous completions are ignored here because StartReceiving
        // processes them inline; the demuxer instance travels in AsyncState.
        static void OnReceiveCompleteStatic(IAsyncResult result)
        {
            if (!result.CompletedSynchronously)
            {
                DatagramChannelDemuxer<TInnerChannel, TInnerItem> demuxer =
                    (DatagramChannelDemuxer<TInnerChannel, TInnerItem>)result.AsyncState;
                demuxer.OnReceiveComplete(result);
            }
        }
 
        // Routes one received item to the listener whose filter matches its message.
        // Returns true when the item (or an exception) was handed to a listener, so receiving resumes
        // via the dequeued callback; returns false when the caller should receive again right away.
        // The item local is set to null once ownership has been passed on; whatever is still
        // non-null when the method exits is aborted by the finally block.
        bool ProcessItem(TInnerItem item)
        {
            try
            {
                Message message = null;
                IChannelListener matchingListener = null;
                try
                {
                    message = this.GetMessage(item);
                    matchingListener = MatchListener(message);
                }
                // The message may be bad because of which running the listener filters may throw
                // In that case, continue receiving
                catch (CommunicationException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    return false;
                }
                catch (MultipleFilterMatchesException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    return false;
                }
                catch (XmlException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    return false;
                }
                // Any other non-fatal exception is forwarded to a registered listener; the item
                // itself is still aborted by the outer finally.
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    this.HandleUnknownException(e);
                    return true;
                }

                if (matchingListener == null)
                {
                    System.ServiceModel.Dispatcher.ErrorBehavior.ThrowAndCatch(
                        new EndpointNotFoundException(SR.GetString(SR.UnableToDemuxChannel, message.Headers.Action)), message);
                    // EndpointNotFound is responsible for closing the item
                    this.EndpointNotFound(item);
                    item = null;
                    return false;
                }

                if (this.onItemDequeued == null)
                {
                    this.onItemDequeued = new Action(this.OnItemDequeued);
                }
                // Ownership of the item moves to the listener; OnItemDequeued restarts receiving.
                this.EnqueueAndDispatch(matchingListener, item, this.onItemDequeued, false);
                item = null;
                return true;
            }
            finally
            {
                if (item != null)
                {
                    this.AbortItem(item);
                }
            }
        }
 
        // Receive loop. Issues BeginReceive on the inner channel and processes synchronous
        // completions inline; asynchronous completions continue in OnReceiveCompleteStatic.
        // The loop ends when the inner channel is no longer Opened, when a receive is pending,
        // or when HandleReceiveResult reports that another receive must not be started now.
        void StartReceiving()
        {
            while (true)
            {
                if (this.innerChannel.State != CommunicationState.Opened)
                {
                    return;
                }

                IAsyncResult result;

                try
                {
                    result = this.BeginReceive(TimeSpan.MaxValue, onReceiveComplete, this);
                }
                // Channel faulted, aborted or disposed: stop receiving.
                catch (CommunicationObjectFaultedException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    return;
                }
                catch (CommunicationObjectAbortedException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    return;
                }
                catch (ObjectDisposedException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    return;
                }
                // Other communication failures and timeouts: trace and try the next receive.
                catch (CommunicationException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    continue;
                }
                catch (TimeoutException e)
                {
                    if (TD.ReceiveTimeoutIsEnabled())
                    {
                        TD.ReceiveTimeout(e.Message);
                    }

                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    continue;
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    this.HandleUnknownException(e);
                    return;
                }

                // Pending receive: the callback takes over from here.
                if (!result.CompletedSynchronously)
                {
                    return;
                }

                // true means "do not receive again now"; false loops for the next receive.
                if (this.HandleReceiveResult(result))
                {
                    return;
                }
            }
        }
 
        class OpenAsyncResult : AsyncResult
        {
            static FastAsyncCallback waitOverCallback = new FastAsyncCallback(WaitOverCallback);
            static AsyncCallback openListenerCallback = Fx.ThunkCallback(new AsyncCallback(OpenListenerCallback));
            static AsyncCallback acceptChannelCallback = Fx.ThunkCallback(new AsyncCallback(AcceptChannelCallback));
            static AsyncCallback openChannelCallback = Fx.ThunkCallback(new AsyncCallback(OpenChannelCallback));
            DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer;
            ChannelDemuxerFilter filter;
            IChannelListener listener;
            TimeoutHelper timeoutHelper;
            bool openInnerListener;
 
            // Starts the asynchronous open: first waits for the demuxer's open semaphore, then
            // continues with OnWaitOver on this thread when the semaphore was obtained immediately.
            public OpenAsyncResult(DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.channelDemuxer = channelDemuxer;
                this.filter = filter;
                this.listener = listener;
                this.timeoutHelper = new TimeoutHelper(timeout);
                // EnterAsync returned false: presumably the wait is pending and waitOverCallback
                // resumes the open later -- nothing more to do in the constructor.
                if (!this.channelDemuxer.openSemaphore.EnterAsync(this.timeoutHelper.RemainingTime(), waitOverCallback, this))
                {
                    return;
                }

                bool onWaitOverSucceeded = false;
                bool completeSelf = false;
                try
                {
                    completeSelf = this.OnWaitOver();
                    onWaitOverSucceeded = true;
                }
                finally
                {
                    // OnWaitOver threw: run Cleanup before the exception leaves the constructor.
                    if (!onWaitOverSucceeded)
                    {
                        Cleanup();
                    }
                }
                // Everything finished on this thread, so complete synchronously.
                if (completeSelf)
                {
                    Cleanup();
                    Complete(true);
                }
            }
 
            // Continuation invoked when the asynchronous wait for the open semaphore finishes.
            // asyncException is non-null when the wait itself failed; in that case the result is
            // completed with that exception without attempting the open.
            static void WaitOverCallback(object state, Exception asyncException)
            {
                OpenAsyncResult self = (OpenAsyncResult)state;
                Exception completionException = asyncException;
                bool completeSelf = false;

                if (completionException != null)
                {
                    completeSelf = true;
                }
                else
                {
                    try
                    {
                        completeSelf = self.OnWaitOver();
                    }
#pragma warning suppress 56500 // covered by FxCOP
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        completeSelf = true;
                        completionException = e;
                    }
                }

                if (completeSelf)
                {
                    // NOTE(review): Cleanup also runs when the semaphore wait failed; confirm that
                    // Cleanup is safe when the semaphore was never acquired.
                    self.Cleanup();
                    self.Complete(false, completionException);
                }
            }
 
            // Runs once the open semaphore is held. Returns true when the open has finished and the
            // result can be completed, false when an asynchronous step is still outstanding.
            bool OnWaitOver()
            {
                bool mustOpenInner = this.channelDemuxer.ShouldOpenInnerListener(filter, listener);
                this.openInnerListener = mustOpenInner;

                if (mustOpenInner)
                {
                    return this.OnOpenInnerListener();
                }

                // Another listener already drove the inner open; surface its failure, if any.
                this.channelDemuxer.ThrowPendingOpenExceptionIfAny();
                return true;
            }
 
            // Finishes opening the inner listener and starts accepting the inner channel.
            // Returns false when the accept is still pending; otherwise returns whatever
            // OnEndAcceptChannel reports.
            bool OnInnerListenerEndOpen(IAsyncResult result)
            {
                this.channelDemuxer.innerListener.EndOpen(result);

                IAsyncResult acceptResult = this.channelDemuxer.innerListener.BeginAcceptChannel(
                    this.timeoutHelper.RemainingTime(), acceptChannelCallback, this);

                return acceptResult.CompletedSynchronously && this.OnEndAcceptChannel(acceptResult);
            }
 
            // Starts opening the inner listener.
            // Returns true only when the whole open chain finished synchronously, and false while
            // any step (listener open, channel accept, ...) is still pending and will be finished
            // by its callback. Any exception is recorded on the demuxer, so that listeners opening
            // later observe the failure, and is then rethrown.
            bool OnOpenInnerListener()
            {
                try
                {
                    IAsyncResult result = this.channelDemuxer.innerListener.BeginOpen(timeoutHelper.RemainingTime(), openListenerCallback, this);
                    if (!result.CompletedSynchronously)
                    {
                        return false;
                    }

                    // Propagate the continuation's answer: a later step may still be pending even
                    // though the listener open completed synchronously. Returning true regardless
                    // would complete this result early and again from the pending step's callback.
                    return this.OnInnerListenerEndOpen(result);
                }
#pragma warning suppress 56500 // covered by FxCOP
                catch (Exception e)
                {
                    this.channelDemuxer.pendingInnerListenerOpenException = e;
                    throw;
                }
            }
 
            // Callback for the inner listener's BeginOpen. Synchronous completions are handled
            // inline by OnOpenInnerListener, so they are ignored here.
            //
            // The result is cleaned up and completed from this callback only when the rest of
            // the chain finished (OnInnerListenerEndOpen returned true) or failed. When the
            // accept or the channel open is still pending, a later callback owns completion.
            static void OpenListenerCallback(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }
                OpenAsyncResult self = (OpenAsyncResult)result.AsyncState;
                Exception completionException = null;
                bool completeSelf = false;
                try
                {
                    // false means another asynchronous step was started and will complete us.
                    completeSelf = self.OnInnerListenerEndOpen(result);
                }
#pragma warning suppress 56500 // covered by FxCOP
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    completionException = e;
                    completeSelf = true;
                }
                if (completeSelf)
                {
                    if (completionException != null)
                    {
                        // Remember the failure on the demuxer as well as on this result.
                        self.channelDemuxer.pendingInnerListenerOpenException = completionException;
                    }
                    self.Cleanup();
                    self.Complete(false, completionException);
                }
            }
 
            // Callback for the inner listener's BeginAcceptChannel. Only asynchronous
            // completions are processed here; synchronous ones are finished by the code that
            // started the accept. Completes the open result when the channel open also
            // finished, or when any step threw a non-fatal exception.
            static void AcceptChannelCallback(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }

                OpenAsyncResult thisPtr = (OpenAsyncResult)result.AsyncState;
                Exception failure = null;
                bool finished;
                try
                {
                    finished = thisPtr.OnEndAcceptChannel(result);
                }
#pragma warning suppress 56500 // covered by FxCOP
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    failure = e;
                    finished = true;
                    // Record the failure on the demuxer before the result is completed.
                    thisPtr.channelDemuxer.pendingInnerListenerOpenException = failure;
                }

                if (!finished)
                {
                    // The channel open is still pending; its callback completes the result.
                    return;
                }

                thisPtr.Cleanup();
                thisPtr.Complete(false, failure);
            }
 
            // Completes the accept on the inner listener, stores the accepted channel on the
            // demuxer and starts opening that channel. Returns false while the channel open is
            // pending (OpenChannelCallback finishes the operation) and true once the channel
            // open has been completed inline.
            bool OnEndAcceptChannel(IAsyncResult result)
            {
                this.channelDemuxer.innerChannel = this.channelDemuxer.innerListener.EndAcceptChannel(result);

                // The channel open has to resume in OpenChannelCallback. Handing BeginOpen the
                // accept callback would, on asynchronous completion, re-enter this method and
                // pass the open result to EndAcceptChannel.
                IAsyncResult openResult = this.channelDemuxer.innerChannel.BeginOpen(this.timeoutHelper.RemainingTime(), openChannelCallback, this);

                if (!openResult.CompletedSynchronously)
                {
                    return false;
                }

                this.OnEndOpenChannel(openResult);
                return true;
            }
 
            // Callback for an asynchronously completed open of the inner channel. This is the
            // last step of the chain, so the result is always cleaned up and completed here,
            // carrying the non-fatal exception if the step failed.
            static void OpenChannelCallback(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    // Already handled inline by the code that called BeginOpen.
                    return;
                }

                OpenAsyncResult thisPtr = (OpenAsyncResult)result.AsyncState;
                Exception failure = null;
                try
                {
                    thisPtr.OnEndOpenChannel(result);
                }
#pragma warning suppress 56500 // covered by FxCOP
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    failure = e;
                    // Record the failure on the demuxer before the result is completed.
                    thisPtr.channelDemuxer.pendingInnerListenerOpenException = failure;
                }

                thisPtr.Cleanup();
                thisPtr.Complete(false, failure);
            }
 
            // Completes the open of the inner channel. If the demuxer's abortOngoingOpen flag
            // was set in the meantime, the demuxer state is aborted and nothing is scheduled;
            // otherwise startReceivingStatic is scheduled with the demuxer as its state.
            void OnEndOpenChannel(IAsyncResult result)
            {
                this.channelDemuxer.innerChannel.EndOpen(result);

                // The flag is checked, and the abort performed, under the demuxer's lock.
                lock (this.channelDemuxer.ThisLock)
                {
                    if (this.channelDemuxer.abortOngoingOpen)
                    {
                        this.channelDemuxer.AbortState();
                        return;
                    }
                }

                ActionItem.Schedule(startReceivingStatic, this.channelDemuxer);
            }
 
            // Exits the demuxer's open semaphore. The callbacks in this class call it right
            // before completing the result.
            void Cleanup()
            {
                this.channelDemuxer.openSemaphore.Exit();
            }
 
            // Ends an OpenAsyncResult by delegating to AsyncResult.End<OpenAsyncResult>.
            // NOTE(review): presumably this rethrows the completion exception, if any - confirm in AsyncResult.
            public static void End(IAsyncResult result)
            {
                AsyncResult.End<OpenAsyncResult>(result);
            }
        }
 
        // Asynchronous close of the demuxer's inner objects. The inner channel (when there is
        // one) is closed first, then the inner listener. Both steps use the same callback;
        // closedInnerChannel tells the callback which of the two just finished. Whenever a
        // step does not finish cleanly the demuxer state is aborted.
        class CloseAsyncResult : AsyncResult
        {
            static AsyncCallback sharedCallback = Fx.ThunkCallback(new AsyncCallback(SharedCallback));
            DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer;
            TimeoutHelper timeoutHelper;
            // Set as soon as the listener close is about to be started.
            bool closedInnerChannel;

            public CloseAsyncResult(DatagramChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.channelDemuxer = channelDemuxer;
                this.timeoutHelper = new TimeoutHelper(timeout);

                if (channelDemuxer.innerChannel != null && !this.CloseInnerChannel())
                {
                    // The channel close is pending; SharedCallback continues with the listener.
                    return;
                }

                if (OnInnerChannelClosed())
                {
                    Complete(true);
                }
            }

            // Begins closing the inner channel. Returns true when the close finished
            // synchronously and false when it is still pending.
            bool CloseInnerChannel()
            {
                bool finishedCleanly = false;
                try
                {
                    IAsyncResult pending = this.channelDemuxer.innerChannel.BeginClose(this.timeoutHelper.RemainingTime(), sharedCallback, this);
                    if (pending.CompletedSynchronously)
                    {
                        this.channelDemuxer.innerChannel.EndClose(pending);
                        finishedCleanly = true;
                        return true;
                    }

                    finishedCleanly = true;
                    return false;
                }
                finally
                {
                    if (!finishedCleanly)
                    {
                        // we should abort the state since calling Abort on the channel demuxer will be a no-op
                        // due to the reference count being 0
                        this.channelDemuxer.AbortState();
                    }
                }
            }

            // Second step: closes the inner listener. Returns true when the listener close
            // finished synchronously and false when SharedCallback will finish it.
            bool OnInnerChannelClosed()
            {
                this.closedInnerChannel = true;

                bool finishedCleanly = false;
                try
                {
                    IAsyncResult pending = channelDemuxer.innerListener.BeginClose(timeoutHelper.RemainingTime(), sharedCallback, this);
                    if (pending.CompletedSynchronously)
                    {
                        channelDemuxer.innerListener.EndClose(pending);
                        finishedCleanly = true;
                        return true;
                    }

                    finishedCleanly = true;
                    return false;
                }
                finally
                {
                    if (!finishedCleanly)
                    {
                        // we should abort the state since calling Abort on the channel demuxer will be a no-op
                        // due to the reference count being 0
                        channelDemuxer.AbortState();
                    }
                }
            }

            // Shared continuation for both close steps; only asynchronous completions are
            // processed here.
            static void SharedCallback(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }

                CloseAsyncResult thisPtr = (CloseAsyncResult)result.AsyncState;
                bool shouldComplete = false;
                Exception failure = null;
                bool finishedCleanly = false;
                try
                {
                    if (thisPtr.closedInnerChannel)
                    {
                        // The listener close was the outstanding operation: this is the last step.
                        thisPtr.channelDemuxer.innerListener.EndClose(result);
                        shouldComplete = true;
                    }
                    else
                    {
                        // The channel close just finished: move on to the listener.
                        thisPtr.channelDemuxer.innerChannel.EndClose(result);
                        shouldComplete = thisPtr.OnInnerChannelClosed();
                    }
                    finishedCleanly = true;
                }
#pragma warning suppress 56500 // covered by FxCOP
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    shouldComplete = true;
                    failure = e;
                }
                finally
                {
                    if (!finishedCleanly)
                    {
                        // we should abort the state since calling Abort on the channel demuxer will be a no-op
                        // due to the reference count being 0
                        thisPtr.channelDemuxer.AbortState();
                    }
                }

                if (shouldComplete)
                {
                    thisPtr.Complete(false, failure);
                }
            }

            // Ends a CloseAsyncResult by delegating to AsyncResult.End<CloseAsyncResult>.
            public static void End(IAsyncResult result)
            {
                AsyncResult.End<CloseAsyncResult>(result);
            }
        }
    }
 
    // Datagram demuxer over an inner IInputChannel. The demuxed item is the Message itself,
    // and every outer listener is a SingletonChannelListener producing InputChannel instances.
    class InputChannelDemuxer : DatagramChannelDemuxer<IInputChannel, Message>
    {
        public InputChannelDemuxer(BindingContext context)
            : base(context)
        {
        }

        // Discards a message that will not be delivered.
        protected override void AbortItem(Message message)
        {
            AbortMessage(message);
        }

        // Receive is forwarded straight to the inner channel.
        protected override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.InnerChannel.BeginReceive(timeout, callback, state);
        }

        protected override Message EndReceive(IAsyncResult result)
        {
            return this.InnerChannel.EndReceive(result);
        }

        // Creates the outer listener for a filter and wires an InputChannelAcceptor to it.
        protected override LayeredChannelListener<IInputChannel> CreateListener<IInputChannel>(ChannelDemuxerFilter filter)
        {
            SingletonChannelListener<IInputChannel, InputChannel, Message> outerListener = new SingletonChannelListener<IInputChannel, InputChannel, Message>(filter, this);
            outerListener.Acceptor = (IChannelAcceptor<IInputChannel>)new InputChannelAcceptor(outerListener);
            return outerListener;
        }

        protected override void Dispatch(IChannelListener listener)
        {
            ((SingletonChannelListener<IInputChannel, InputChannel, Message>)listener).Dispatch();
        }

        // No listener matched: give the failure handler (if any) a chance, then drop the message.
        protected override void EndpointNotFound(Message message)
        {
            if (this.DemuxFailureHandler != null)
            {
                this.DemuxFailureHandler.HandleDemuxFailure(message);
            }

            this.AbortItem(message);
        }

        // Hands a received message to the matching listener.
        protected override void EnqueueAndDispatch(IChannelListener listener, Message message, Action dequeuedCallback, bool canDispatchOnThisThread)
        {
            ((SingletonChannelListener<IInputChannel, InputChannel, Message>)listener).EnqueueAndDispatch(message, dequeuedCallback, canDispatchOnThisThread);
        }

        // Hands a receive failure to the listener.
        protected override void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
        {
            ((SingletonChannelListener<IInputChannel, InputChannel, Message>)listener).EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread);
        }

        // The item already is the message.
        protected override Message GetMessage(Message message)
        {
            return message;
        }
    }
 
    // Datagram demuxer over an inner IDuplexChannel. Received messages are routed to
    // SingletonChannelListener instances; the channels those listeners hand out send through
    // the shared inner channel.
    class DuplexChannelDemuxer : DatagramChannelDemuxer<IDuplexChannel, Message>
    {
        public DuplexChannelDemuxer(BindingContext context)
            : base(context)
        {
        }

        // Discards a message that will not be delivered.
        protected override void AbortItem(Message message)
        {
            AbortMessage(message);
        }

        // Receive is forwarded straight to the inner channel.
        protected override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.InnerChannel.BeginReceive(timeout, callback, state);
        }

        protected override Message EndReceive(IAsyncResult result)
        {
            return this.InnerChannel.EndReceive(result);
        }

        // Creates the outer listener for a filter and wires a DuplexChannelAcceptor to it.
        protected override LayeredChannelListener<IDuplexChannel> CreateListener<IDuplexChannel>(ChannelDemuxerFilter filter)
        {
            SingletonChannelListener<IDuplexChannel, DuplexChannel, Message> outerListener = new SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>(filter, this);
            outerListener.Acceptor = (IChannelAcceptor<IDuplexChannel>)new DuplexChannelAcceptor(outerListener, this);
            return outerListener;
        }

        protected override void Dispatch(IChannelListener listener)
        {
            ((SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>)listener).Dispatch();
        }

        // No listener matched: give the failure handler (if any) a chance, then drop the message.
        protected override void EndpointNotFound(Message message)
        {
            if (this.DemuxFailureHandler != null)
            {
                this.DemuxFailureHandler.HandleDemuxFailure(message);
            }

            this.AbortItem(message);
        }

        // Hands a received message to the matching listener.
        protected override void EnqueueAndDispatch(IChannelListener listener, Message message, Action dequeuedCallback, bool canDispatchOnThisThread)
        {
            ((SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>)listener).EnqueueAndDispatch(message, dequeuedCallback, canDispatchOnThisThread);
        }

        // Hands a receive failure to the listener.
        protected override void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
        {
            ((SingletonChannelListener<IDuplexChannel, DuplexChannel, Message>)listener).EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread);
        }

        // The item already is the message.
        protected override Message GetMessage(Message message)
        {
            return message;
        }

        // Acceptor whose channels wrap the demuxer's inner duplex channel.
        class DuplexChannelAcceptor : SingletonChannelAcceptor<IDuplexChannel, DuplexChannel, Message>
        {
            DuplexChannelDemuxer demuxer;

            public DuplexChannelAcceptor(ChannelManagerBase channelManager, DuplexChannelDemuxer demuxer)
                : base(channelManager)
            {
                this.demuxer = demuxer;
            }

            protected override DuplexChannel OnCreateChannel()
            {
                return new DuplexChannelWrapper(this.ChannelManager, demuxer.InnerChannel);
            }

            // Emits a MessageReceived trace when information-level tracing is on.
            protected override void OnTraceMessageReceived(Message message)
            {
                if (!DiagnosticUtility.ShouldTraceInformation)
                {
                    return;
                }

                TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageReceived, SR.GetString(SR.TraceCodeMessageReceived),
                    MessageTransmitTraceRecord.CreateReceiveTraceRecord(message), this, null);
            }
        }

        // DuplexChannel that forwards sends and address queries to the wrapped inner channel.
        // Its own open does no work.
        class DuplexChannelWrapper : DuplexChannel
        {
            IDuplexChannel innerChannel;

            public DuplexChannelWrapper(ChannelManagerBase channelManager, IDuplexChannel innerChannel)
                : base(channelManager, innerChannel.LocalAddress)
            {
                this.innerChannel = innerChannel;
            }

            public override EndpointAddress RemoteAddress
            {
                get
                {
                    return this.innerChannel.RemoteAddress;
                }
            }

            public override Uri Via
            {
                get
                {
                    return this.innerChannel.Via;
                }
            }

            // Open is a no-op in all three forms.
            protected override void OnOpen(TimeSpan timeout)
            {
            }

            protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return new CompletedAsyncResult(callback, state);
            }

            protected override void OnEndOpen(IAsyncResult result)
            {
                CompletedAsyncResult.End(result);
            }

            // Sends go through the inner channel.
            protected override void OnSend(Message message, TimeSpan timeout)
            {
                this.innerChannel.Send(message, timeout);
            }

            protected override IAsyncResult OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.innerChannel.BeginSend(message, timeout, callback, state);
            }

            protected override void OnEndSend(IAsyncResult result)
            {
                this.innerChannel.EndSend(result);
            }
        }
    }
 
    // Datagram demuxer over an inner IReplyChannel. The demuxed item is a RequestContext.
    // Outer listeners may be either IReplyChannel listeners (which receive the request
    // context) or IInputChannel listeners (which receive only the request message).
    class ReplyChannelDemuxer : DatagramChannelDemuxer<IReplyChannel, RequestContext>
    {
        public ReplyChannelDemuxer(BindingContext context)
            : base(context)
        {
        }

        // Discards a request that will not be delivered.
        protected override void AbortItem(RequestContext request)
        {
            AbortMessage(request);
            request.Abort();
        }

        // Receive is forwarded to the inner channel's BeginReceiveRequest.
        protected override IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.InnerChannel.BeginReceiveRequest(timeout, callback, state);
        }

        // Creates the outer listener for the requested channel shape. Only IInputChannel and
        // IReplyChannel are supported; anything else throws NotSupportedException.
        protected override LayeredChannelListener<TChannel> CreateListener<TChannel>(ChannelDemuxerFilter filter)
        {
            if (typeof(TChannel) == typeof(IInputChannel))
            {
                SingletonChannelListener<IInputChannel, InputChannel, Message> listener = new SingletonChannelListener<IInputChannel, InputChannel, Message>(filter, this);
                listener.Acceptor = (IChannelAcceptor<IInputChannel>)new InputChannelAcceptor(listener);
                return (LayeredChannelListener<TChannel>)(object)listener;
            }
            else if (typeof(TChannel) == typeof(IReplyChannel))
            {
                SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> listener = new SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>(filter, this);
                listener.Acceptor = (IChannelAcceptor<IReplyChannel>)new ReplyChannelAcceptor(listener);
                return (LayeredChannelListener<TChannel>)(object)listener;
            }
            else
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException());
            }
        }

        // Dispatches on whichever of the two supported listener types was passed in.
        protected override void Dispatch(IChannelListener listener)
        {
            SingletonChannelListener<IInputChannel, InputChannel, Message> inputListener = listener as SingletonChannelListener<IInputChannel, InputChannel, Message>;
            if (inputListener != null)
            {
                inputListener.Dispatch();
                return;
            }
            SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> replyListener = listener as SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>;
            if (replyListener != null)
            {
                replyListener.Dispatch();
                return;
            }

            throw Fx.AssertAndThrow("ReplyChannelDemuxer.Dispatch (false)");
        }

        // Completion callback for an asynchronously finished demux-failure operation.
        // The request context is aborted unless the operation ended without throwing.
        void EndpointNotFoundCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                // EndpointNotFound finishes synchronous completions itself.
                return;
            }
            RequestContext item = (RequestContext)result.AsyncState;
            bool abortItem = true;
            try
            {
                ReplyChannelDemuxFailureAsyncResult.End(result);
                abortItem = false;
            }
            catch (TimeoutException e)
            {
                if (TD.SendTimeoutIsEnabled())
                {
                    TD.SendTimeout(e.Message);
                }

                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (ObjectDisposedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                this.HandleUnknownException(e);
            }
            finally
            {
                if (abortItem)
                {
                    this.AbortItem(item);
                }
            }
        }

        // No listener matched the request. When a demux failure handler is configured, a
        // ReplyChannelDemuxFailureAsyncResult is started for the request. The request is
        // aborted when there is no handler or when that operation fails; when the operation
        // is still pending, EndpointNotFoundCallback takes over.
        protected override void EndpointNotFound(RequestContext request)
        {
            bool abortItem = true;
            try
            {
                if (this.DemuxFailureHandler != null)
                {
                    try
                    {
                        ReplyChannelDemuxFailureAsyncResult result = new ReplyChannelDemuxFailureAsyncResult(this.DemuxFailureHandler, request, Fx.ThunkCallback(new AsyncCallback(this.EndpointNotFoundCallback)), request);
                        result.Start();
                        if (!result.CompletedSynchronously)
                        {
                            // The callback now decides whether the request has to be aborted.
                            abortItem = false;
                            return;
                        }
                        ReplyChannelDemuxFailureAsyncResult.End(result);
                        abortItem = false;
                    }
                    catch (CommunicationException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (TimeoutException e)
                    {
                        if (TD.SendTimeoutIsEnabled())
                        {
                            TD.SendTimeout(e.Message);
                        }
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (ObjectDisposedException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        this.HandleUnknownException(e);
                    }
                }
            }
            finally
            {
                if (abortItem)
                {
                    this.AbortItem(request);
                }
            }
        }

        protected override RequestContext EndReceive(IAsyncResult result)
        {
            return this.InnerChannel.EndReceiveRequest(result);
        }

        // Hands a received request to the matching listener. An input listener gets only the
        // request message, after which the request context is closed here; a reply listener
        // gets the request context itself.
        protected override void EnqueueAndDispatch(IChannelListener listener, RequestContext request, Action dequeuedCallback, bool canDispatchOnThisThread)
        {
            SingletonChannelListener<IInputChannel, InputChannel, Message> inputListener = listener as SingletonChannelListener<IInputChannel, InputChannel, Message>;
            if (inputListener != null)
            {
                inputListener.EnqueueAndDispatch(request.RequestMessage, dequeuedCallback, canDispatchOnThisThread);

                try
                {
                    request.Close();
                }
                catch (CommunicationException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                }
                catch (TimeoutException e)
                {
                    if (TD.CloseTimeoutIsEnabled())
                    {
                        TD.CloseTimeout(e.Message);
                    }
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                }

                // The item has been handled. Without this return the method falls through to
                // the assert below even though the dispatch succeeded.
                return;
            }

            SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> replyListener = listener as SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>;
            if (replyListener != null)
            {
                replyListener.EnqueueAndDispatch(request, dequeuedCallback, canDispatchOnThisThread);
                return;
            }

            throw Fx.AssertAndThrow("ReplyChannelDemuxer.EnqueueAndDispatch (false)");
        }

        // Hands a receive failure to whichever of the two supported listener types was passed in.
        protected override void EnqueueAndDispatch(IChannelListener listener, Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
        {
            SingletonChannelListener<IInputChannel, InputChannel, Message> inputListener = listener as SingletonChannelListener<IInputChannel, InputChannel, Message>;
            if (inputListener != null)
            {
                inputListener.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread);
                return;
            }

            SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext> replyListener = listener as SingletonChannelListener<IReplyChannel, ReplyChannel, RequestContext>;
            if (replyListener != null)
            {
                replyListener.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread);
                return;
            }

            throw Fx.AssertAndThrow("ReplyChannelDemuxer.EnqueueAndDispatch (false)");
        }

        // The message used for filter matching is the request message of the context.
        protected override Message GetMessage(RequestContext request)
        {
            return request.RequestMessage;
        }
    }
 
    // Exposes the ChannelDemuxerFilter associated with an object. SingletonChannelListener
    // implements it by returning the filter it was constructed with.
    interface IChannelDemuxerFilter
    {
        // The filter associated with the implementing object.
        ChannelDemuxerFilter Filter { get; }
    }
 
    // Outer listener handed out by the datagram demuxers. Open, close and abort first notify
    // the owning demuxer (passing this listener's filter) and then run the base
    // implementation; queued items and exceptions are forwarded to the singleton acceptor.
    class SingletonChannelListener<TChannel, TQueuedChannel, TQueuedItem> : DelegatingChannelListener<TChannel>, IChannelDemuxerFilter
        where TChannel : class, IChannel
        where TQueuedChannel : InputQueueChannel<TQueuedItem>
        where TQueuedItem : class, IDisposable
    {
        ChannelDemuxerFilter filter;
        IChannelDemuxer channelDemuxer;

        public SingletonChannelListener(ChannelDemuxerFilter filter, IChannelDemuxer channelDemuxer)
            : base(true)
        {
            this.filter = filter;
            this.channelDemuxer = channelDemuxer;
        }

        // The filter this listener was created with.
        public ChannelDemuxerFilter Filter
        {
            get
            {
                return this.filter;
            }
        }

        // The base Acceptor viewed as the singleton acceptor type used by this listener.
        SingletonChannelAcceptor<TChannel, TQueuedChannel, TQueuedItem> SingletonAcceptor
        {
            get
            {
                return (SingletonChannelAcceptor<TChannel, TQueuedChannel, TQueuedItem>)base.Acceptor;
            }
            set
            {
                this.Acceptor = value;
            }
        }

        // Synchronous open: demuxer first, then the base listener, sharing one timeout budget.
        protected override void OnOpen(TimeSpan timeout)
        {
            TimeoutHelper budget = new TimeoutHelper(timeout);
            this.channelDemuxer.OnOuterListenerOpen(this.filter, this, budget.RemainingTime());
            base.OnOpen(budget.RemainingTime());
        }

        // Asynchronous open: the same two steps chained through ChainedAsyncResult.
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new ChainedAsyncResult(
                timeout,
                callback,
                state,
                this.OnBeginOuterListenerOpen,
                this.OnEndOuterListenerOpen,
                base.OnBeginOpen,
                base.OnEndOpen);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            ChainedAsyncResult.End(result);
        }

        IAsyncResult OnBeginOuterListenerOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channelDemuxer.OnBeginOuterListenerOpen(this.filter, this, timeout, callback, state);
        }

        void OnEndOuterListenerOpen(IAsyncResult result)
        {
            this.channelDemuxer.OnEndOuterListenerOpen(result);
        }

        // Abort notifies the demuxer before the base listener is aborted.
        protected override void OnAbort()
        {
            this.channelDemuxer.OnOuterListenerAbort(this.filter);
            base.OnAbort();
        }

        // Synchronous close: demuxer first, then the base listener, sharing one timeout budget.
        protected override void OnClose(TimeSpan timeout)
        {
            TimeoutHelper budget = new TimeoutHelper(timeout);
            this.channelDemuxer.OnOuterListenerClose(this.filter, budget.RemainingTime());
            base.OnClose(budget.RemainingTime());
        }

        // Asynchronous close: the same two steps chained through ChainedAsyncResult.
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new ChainedAsyncResult(
                timeout,
                callback,
                state,
                this.OnBeginOuterListenerClose,
                this.OnEndOuterListenerClose,
                base.OnBeginClose,
                base.OnEndClose);
        }

        protected override void OnEndClose(IAsyncResult result)
        {
            ChainedAsyncResult.End(result);
        }

        IAsyncResult OnBeginOuterListenerClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channelDemuxer.OnBeginOuterListenerClose(this.filter, timeout, callback, state);
        }

        void OnEndOuterListenerClose(IAsyncResult result)
        {
            this.channelDemuxer.OnEndOuterListenerClose(result);
        }

        // Asks the acceptor to dispatch whatever it has queued.
        public void Dispatch()
        {
            this.SingletonAcceptor.DispatchItems();
        }

        // Queues an item on the acceptor.
        public void EnqueueAndDispatch(TQueuedItem item, Action dequeuedCallback, bool canDispatchOnThisThread)
        {
            this.SingletonAcceptor.EnqueueAndDispatch(item, dequeuedCallback, canDispatchOnThisThread);
        }

        // Queues an exception on the acceptor.
        public void EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
        {
            this.SingletonAcceptor.EnqueueAndDispatch(exception, dequeuedCallback, canDispatchOnThisThread);
        }
    }
 
    //
    // Session demuxers
    //
 
    abstract class SessionChannelDemuxer<TInnerChannel, TInnerItem> : TypedChannelDemuxer, IChannelDemuxer
        where TInnerChannel : class, IChannel
        where TInnerItem : class, IDisposable
    {
        IChannelDemuxFailureHandler demuxFailureHandler;
        MessageFilterTable<InputQueueChannelListener<TInnerChannel>> filterTable;
        IChannelListener<TInnerChannel> innerListener;
        static AsyncCallback onAcceptComplete = Fx.ThunkCallback(new AsyncCallback(OnAcceptCompleteStatic));
        static AsyncCallback onPeekComplete = Fx.ThunkCallback(new AsyncCallback(OnPeekCompleteStatic));
        Action onItemDequeued;
        static WaitCallback scheduleAcceptStatic = new WaitCallback(ScheduleAcceptStatic);
        static Action<object> startAcceptStatic = new Action<object>(StartAcceptStatic);
        Action<object> onStartAccepting;
        int openCount;
        ThreadNeutralSemaphore openSemaphore;
        Exception pendingExceptionOnOpen;
        bool abortOngoingOpen;
        FlowThrottle throttle;
        TimeSpan peekTimeout;
 
        // Builds the inner listener from the binding context and sets up the demuxer's own
        // state: an empty filter table, an open semaphore created with a count of 1, and a
        // throttle sized by maxPendingSessions. The demux failure handler is taken from the
        // binding parameters when they are present.
        public SessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)
        {
            this.peekTimeout = peekTimeout;
            this.openSemaphore = new ThreadNeutralSemaphore(1);
            this.filterTable = new MessageFilterTable<InputQueueChannelListener<TInnerChannel>>();

            if (context.BindingParameters != null)
            {
                this.demuxFailureHandler = context.BindingParameters.Find<IChannelDemuxFailureHandler>();
            }

            this.innerListener = context.BuildInnerChannelListener<TInnerChannel>();
            this.throttle = new FlowThrottle(scheduleAcceptStatic, maxPendingSessions, null, null);
        }
 
        // Monitor object for the demuxer's shared state; the demuxer instance itself is returned.
        protected object ThisLock
        {
            get { return this; }
        }
 
        // Handler found in the binding parameters by the constructor; null when the context had no
        // binding parameters (and presumably when none was registered -- depends on Find<T>).
        protected IChannelDemuxFailureHandler DemuxFailureHandler
        {
            get { return this.demuxFailureHandler; }
        }
 
        // Delegate for OnStartAcceptingCallback, created on first use and cached in onStartAccepting.
        Action<object> OnStartAccepting
        {
            get
            {
                return this.onStartAccepting
                    ?? (this.onStartAccepting = new Action<object>(OnStartAcceptingCallback));
            }
        }
 
        // Disposes of an item that was received but never handed to an outer channel
        // (called from ProcessItem's cleanup).
        protected abstract void AbortItem(TInnerItem item);
        // Starts receiving the first item of a session without an explicit timeout; used when
        // peekTimeout is ChannelDemuxer.UseDefaultReceiveTimeout.
        protected abstract IAsyncResult BeginReceive(TInnerChannel channel, AsyncCallback callback, object state);
        // Starts receiving the first item of a session with the configured peek timeout.
        protected abstract IAsyncResult BeginReceive(TInnerChannel channel, TimeSpan timeout, AsyncCallback callback, object state);
        // Wraps the accepted inner channel and its already-received first item in the channel that is
        // queued on the matching outer listener (ProcessItem passes that listener as channelManager).
        protected abstract TInnerChannel CreateChannel(ChannelManagerBase channelManager, TInnerChannel innerChannel, TInnerItem firstItem);
        // Called when no registered filter matches the first message. Per the note in ProcessItem,
        // the implementation is responsible for closing and aborting the channel.
        protected abstract void EndpointNotFound(TInnerChannel channel, TInnerItem item);
        // Completes a receive started by one of the BeginReceive overloads.
        protected abstract TInnerItem EndReceive(TInnerChannel channel, IAsyncResult result);
        // Extracts the message that is matched against the filter table.
        protected abstract Message GetMessage(TInnerItem item);
 
        // Creates an outer listener bound to the given filter. Every outer listener shares this
        // demuxer's inner listener. TChannel is expected to be TInnerChannel (asserted).
        public override IChannelListener<TChannel> BuildChannelListener<TChannel>(ChannelDemuxerFilter filter)
        {
            Fx.Assert(typeof(TChannel) == typeof(TInnerChannel), "SessionChannelDemuxer.BuildChannelListener (typeof(TChannel) == typeof(TInnerChannel))");

            return new InputQueueChannelListener<TChannel>(filter, this)
            {
                InnerChannelListener = this.innerListener
            };
        }
 
        // return true if another BeginAcceptChannel should pend
        //
        // Starts an asynchronous accept on the inner listener. On return, result is the pending
        // accept, or null when no accept was started. When requiresThrottle is true a throttle slot
        // is acquired first; if Acquire returns false nothing is started and false is returned
        // (presumably the throttle later invokes scheduleAcceptStatic -- TODO confirm in FlowThrottle).
        bool BeginAcceptChannel(bool requiresThrottle, out IAsyncResult result)
        {
            result = null;

            if (requiresThrottle && !this.throttle.Acquire(this))
            {
                return false;
            }

            // The slot is given back in the finally block unless the accept was started or the
            // failure was routed through HandleUnknownException.
            bool releaseThrottle = true;

            try
            {
                result = this.innerListener.BeginAcceptChannel(TimeSpan.MaxValue, onAcceptComplete, this);
                releaseThrottle = false;
            }
            // Faulted / aborted / disposed: the inner listener is gone, so stop accepting.
            catch (CommunicationObjectFaultedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return false;
            }
            catch (CommunicationObjectAbortedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return false;
            }
            catch (ObjectDisposedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return false;
            }
            // Other communication failures and timeouts: report true with a null result so the
            // caller tries another accept.
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return true;
            }
            catch (TimeoutException e)
            {
                if (TD.OpenTimeoutIsEnabled())
                {
                    TD.OpenTimeout(e.Message);
                }
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return true;
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                // HandleUnknownException queues the exception with OnItemDequeued as its dequeue
                // callback, and that callback releases the slot, so it is not released here.
                this.HandleUnknownException(e);
                releaseThrottle = false;
                return false;
            }
            finally
            {
                if (releaseThrottle)
                {
                    this.throttle.Release();
                }
            }

            return true;
        }
 
        // Completes an accept started by BeginAcceptChannel. Returns true when the caller should
        // keep accepting: either a channel was obtained (channel != null) or the failure was a
        // CommunicationException / TimeoutException (channel == null). Returns false when no channel
        // was obtained without an exception, or when the listener is faulted, aborted or disposed,
        // or an unknown exception occurred.
        bool EndAcceptChannel(IAsyncResult result, out TInnerChannel channel)
        {
            channel = null;
            // The throttle slot taken for this accept is given back in the finally block unless a
            // channel was obtained or the failure was routed through HandleUnknownException.
            bool releaseThrottle = true;
            try
            {
                channel = this.innerListener.EndAcceptChannel(result);
                releaseThrottle = (channel == null);
            }
            catch (CommunicationObjectFaultedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return false;
            }
            catch (CommunicationObjectAbortedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return false;
            }
            catch (ObjectDisposedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return false;
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return true;
            }
            catch (TimeoutException e)
            {
                if (TD.OpenTimeoutIsEnabled())
                {
                    TD.OpenTimeout(e.Message);
                }
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                return true;
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                // the queued exception's dequeue callback (OnItemDequeued) releases the slot
                this.HandleUnknownException(e);
                releaseThrottle = false;
                return false;
            }
            finally
            {
                if (releaseThrottle)
                {
                    throttle.Release();
                }
            }

            return (channel != null);
        }
 
        // Starts the open-then-receive "peek" on a freshly accepted channel. If the peek completes
        // synchronously its result is handled on this thread; otherwise OnPeekCompleteStatic takes
        // over. If starting the peek fails, the channel is aborted here.
        void PeekChannel(TInnerChannel channel)
        {
            // Released at the end only if constructing the PeekAsyncResult threw one of the expected
            // exceptions; once the peek has started, later stages own the throttle slot.
            bool releaseThrottle = true;
            try
            {
                IAsyncResult peekResult = new PeekAsyncResult(this, channel, onPeekComplete, this);
                releaseThrottle = false;
                if (!peekResult.CompletedSynchronously)
                {
                    return;
                }
                // from here on HandlePeekResult is in charge of the channel, so clear the local to
                // skip the Abort below
                channel = null;
                this.HandlePeekResult(peekResult);
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (TimeoutException e)
            {
                if (TD.OpenTimeoutIsEnabled())
                {
                    TD.OpenTimeout(e.Message);
                }
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (ObjectDisposedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                // the queued exception's dequeue callback (OnItemDequeued) releases the slot
                this.HandleUnknownException(e);
                releaseThrottle = false;
            }

            // still non-null only when an exception was caught before ownership was handed over
            if (channel != null)
            {
                channel.Abort();
            }

            if (releaseThrottle)
            {
                this.throttle.Release();
            }
        }
 
        // Finishes a peek: retrieves the channel and its first item, then hands them to ProcessItem.
        // The throttle slot is released here when the peek yields no item, when the peek fails with
        // an expected exception, or when ProcessItem throws an expected exception.
        void HandlePeekResult(IAsyncResult result)
        {
            // channel holds whatever PeekAsyncResult.End assigned; it stays null if End threw
            // before assigning it.
            TInnerChannel channel = null;
            TInnerItem item;
            bool abortChannel = false;
            bool releaseThrottle = true;
            try
            {
                PeekAsyncResult.End(result, out channel, out item);
                // no item means there is nothing to dispatch, so the slot is released below
                releaseThrottle = (item == null);
            }
            catch (ObjectDisposedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                abortChannel = true;
                return;
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                abortChannel = true;
                return;
            }
            catch (TimeoutException e)
            {
                if (TD.OpenTimeoutIsEnabled())
                {
                    TD.OpenTimeout(e.Message);
                }
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                abortChannel = true;
                return;
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                // the queued exception's dequeue callback (OnItemDequeued) releases the slot
                this.HandleUnknownException(e);
                releaseThrottle = false;
                return;
            }
            finally
            {
                // runs on every exit above, including the early returns in the catch blocks
                if (abortChannel && channel != null)
                {
                    channel.Abort();
                }

                if (releaseThrottle)
                {
                    this.throttle.Release();
                }
            }

            if (item != null)
            {
                // Release only if ProcessItem throws an expected exception; when it returns
                // normally it has already done its own throttle accounting.
                releaseThrottle = true;

                try
                {
                    this.ProcessItem(channel, item);
                    releaseThrottle = false;
                }
                catch (CommunicationException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                }
                catch (TimeoutException e)
                {
                    if (TD.OpenTimeoutIsEnabled())
                    {
                        TD.OpenTimeout(e.Message);
                    }
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    this.HandleUnknownException(e);
                    releaseThrottle = false;
                }
                finally
                {
                    if (releaseThrottle)
                    {
                        this.throttle.Release();
                    }
                }
            }
        }
 
        // Looks up the outer listener whose filter matches the message, or returns null when no
        // filter matches. The filter table is consulted under ThisLock.
        InputQueueChannelListener<TInnerChannel> MatchListener(Message message)
        {
            lock (this.ThisLock)
            {
                InputQueueChannelListener<TInnerChannel> candidate = null;
                bool matched = this.filterTable.GetMatchingValue(message, out candidate);
                return matched ? candidate : null;
            }
        }
 
        // Accept completion callback. Synchronous completions are ignored here because the code
        // that started the accept deals with them itself.
        static void OnAcceptCompleteStatic(IAsyncResult result)
        {
            if (!result.CompletedSynchronously)
            {
                SessionChannelDemuxer<TInnerChannel, TInnerItem> owner =
                    (SessionChannelDemuxer<TInnerChannel, TInnerItem>)result.AsyncState;
                owner.OnStartAcceptingCallback(result);
            }
        }
 
        // Callback registered with the FlowThrottle: queues StartAcceptStatic as a work item,
        // passing state (the demuxer) along.
        static void ScheduleAcceptStatic(object state)
        {
            ActionItem.Schedule(startAcceptStatic, state);
        }
 
        // Work item body: state is the demuxer. Passes false so that StartAccepting does not try to
        // acquire the throttle again (presumably the slot was already granted before the throttle
        // invoked its callback -- TODO confirm in FlowThrottle).
        static void StartAcceptStatic(object state)
        {
            ((SessionChannelDemuxer<TInnerChannel, TInnerItem>)state).StartAccepting(false);
        }
 
        // Registers the outer listener's filter and reports whether this is the first registered
        // listener, in which case the caller must open the inner listener and start accepting.
        // Nothing is registered when the listener is already closing or closed.
        bool ShouldStartAccepting(ChannelDemuxerFilter filter, IChannelListener listener)
        {
            lock (this.ThisLock)
            {
                // the listener's Abort may be racing with Open
                CommunicationState listenerState = listener.State;
                bool shuttingDown = listenerState == CommunicationState.Closed
                    || listenerState == CommunicationState.Closing;
                if (shuttingDown)
                {
                    return false;
                }

                InputQueueChannelListener<TInnerChannel> outerListener =
                    (InputQueueChannelListener<TInnerChannel>)(object)listener;
                this.filterTable.Add(filter.Filter, outerListener, filter.Priority);

                this.openCount++;
                if (this.openCount != 1)
                {
                    return false;
                }

                // first listener in: clear any abort request left over from an earlier cycle
                this.abortOngoingOpen = false;
                return true;
            }
        }
 
        // Starts one accept. If the accept finished synchronously, or none was started but another
        // attempt is warranted, the follow-up work is scheduled as a separate work item.
        void StartAccepting(bool requiresThrottle)
        {
            IAsyncResult pendingAccept;
            if (!this.BeginAcceptChannel(requiresThrottle, out pendingAccept))
            {
                return;
            }

            bool continueElsewhere = pendingAccept == null || pendingAccept.CompletedSynchronously;
            if (continueElsewhere)
            {
                // need to spawn another thread to process this completion
                ActionItem.Schedule(OnStartAccepting, pendingAccept);
            }
        }
 
        // Dequeue callback passed to EnqueueAndDispatch for channels and exceptions queued on an
        // outer listener; gives one throttle slot back.
        void OnItemDequeued()
        {
            this.throttle.Release();
        }
 
        // If an earlier open failed, throws a new exception describing that failure. The new
        // exception is a CommunicationObjectAbortedException or CommunicationObjectFaultedException
        // when the recorded one was of that type, and a plain CommunicationException otherwise.
        // Does nothing when no failure was recorded.
        void ThrowPendingOpenExceptionIfAny()
        {
            Exception recorded = this.pendingExceptionOnOpen;
            if (recorded == null)
            {
                return;
            }

            string description = SR.GetString(SR.PreviousChannelDemuxerOpenFailed, recorded.ToString());

            Exception toThrow;
            if (recorded is CommunicationObjectAbortedException)
            {
                toThrow = new CommunicationObjectAbortedException(description);
            }
            else if (recorded is CommunicationObjectFaultedException)
            {
                toThrow = new CommunicationObjectFaultedException(description);
            }
            else
            {
                toThrow = new CommunicationException(description);
            }

            throw DiagnosticUtility.ExceptionUtility.ThrowHelperWarning(toThrow);
        }
 
        // Synchronous open of an outer listener. Opens are run one at a time through openSemaphore.
        // The first registered listener opens the inner listener and starts accepting; later ones
        // only register their filter, and fail if an earlier open recorded an exception.
        public void OnOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            this.openSemaphore.Enter(timeoutHelper.RemainingTime());
            try
            {
                // registers the filter; true only for the first registered listener
                bool startAccepting = ShouldStartAccepting(filter, listener);
                if (startAccepting)
                {
                    try
                    {
                        this.innerListener.Open(timeoutHelper.RemainingTime());
                        StartAccepting(true);
                        lock (ThisLock)
                        {
                            // the last outer listener was aborted while this open was in progress
                            if (this.abortOngoingOpen)
                            {
                                this.innerListener.Abort();
                            }
                        }
                    }
#pragma warning suppress 56500 // covered by FxCOP
                    catch (Exception e)
                    {
                        // remember the failure so later opens can report it
                        this.pendingExceptionOnOpen = e;
                        throw;
                    }
                }
                else
                {
                    this.ThrowPendingOpenExceptionIfAny();
                }
            }
            finally
            {
                this.openSemaphore.Exit();
            }
        }
 
 
        // Asynchronous counterpart of OnOuterListenerOpen; the work is carried out by OpenAsyncResult.
        public IAsyncResult OnBeginOuterListenerOpen(ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new OpenAsyncResult(this, filter, listener, timeout, callback, state);
        }
 
        // Completes an open started by OnBeginOuterListenerOpen.
        public void OnEndOuterListenerOpen(IAsyncResult result)
        {
            OpenAsyncResult.End(result);
        }
 
        // Unregisters an outer listener's filter and reports whether it was the last one, in which
        // case the caller must close or abort the inner listener.
        //   filter  - the outer listener's filter; nothing happens if it is not registered.
        //   aborted - true when the outer listener is being aborted. If it was the last listener,
        //             abortOngoingOpen is set so that an open still in progress aborts the inner
        //             listener when it finishes.
        // Returns true only when the filter was registered and the open count dropped to zero.
        bool ShouldCloseInnerListener(ChannelDemuxerFilter filter, bool aborted)
        {
            lock (this.ThisLock)
            {
                // Remove reports whether the filter was present, so a separate ContainsKey probe
                // (a second lookup of the same key) is not needed.
                if (!this.filterTable.Remove(filter.Filter))
                {
                    return false;
                }

                if (--this.openCount != 0)
                {
                    return false;
                }

                if (aborted)
                {
                    this.abortOngoingOpen = true;
                }
                return true;
            }
        }
 
        // Abort of an outer listener: unregisters its filter and, if it was the last registered
        // listener, aborts the shared inner listener.
        public void OnOuterListenerAbort(ChannelDemuxerFilter filter)
        {
            bool wasLastListener = ShouldCloseInnerListener(filter, true);
            if (!wasLastListener)
            {
                return;
            }

            innerListener.Abort();
        }
 
        // Synchronous close of an outer listener: unregisters its filter and, if it was the last
        // registered listener, closes the shared inner listener. If the close throws, the inner
        // listener is aborted before the exception propagates.
        public void OnOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout)
        {
            if (!ShouldCloseInnerListener(filter, false))
            {
                return;
            }

            bool closedCleanly = false;
            try
            {
                innerListener.Close(timeout);
                closedCleanly = true;
            }
            finally
            {
                // we should abort the state since calling Abort on the channel demuxer will be a no-op
                // due to the reference count being 0
                if (!closedCleanly)
                {
                    innerListener.Abort();
                }
            }
        }
 
        // Asynchronous close of an outer listener. If it was the last registered listener, the
        // inner listener's BeginClose result is returned; otherwise an already-completed result is
        // returned. If BeginClose throws, the inner listener is aborted before the exception
        // propagates.
        public IAsyncResult OnBeginOuterListenerClose(ChannelDemuxerFilter filter, TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (!ShouldCloseInnerListener(filter, false))
            {
                return new CompletedAsyncResult(callback, state);
            }

            bool beginSucceeded = false;
            try
            {
                IAsyncResult innerClose = this.innerListener.BeginClose(timeout, callback, state);
                beginSucceeded = true;
                return innerClose;
            }
            finally
            {
                // we should abort the state since calling Abort on the channel demuxer will be a no-op
                // due to the reference count being 0
                if (!beginSucceeded)
                {
                    this.innerListener.Abort();
                }
            }
        }
 
        // Completes a close started by OnBeginOuterListenerClose. A CompletedAsyncResult is simply
        // ended; any other result is passed to the inner listener's EndClose. If EndClose throws,
        // the inner listener is aborted before the exception propagates.
        public void OnEndOuterListenerClose(IAsyncResult result)
        {
            if (result is CompletedAsyncResult)
            {
                CompletedAsyncResult.End(result);
                return;
            }

            bool endSucceeded = false;
            try
            {
                this.innerListener.EndClose(result);
                endSucceeded = true;
            }
            finally
            {
                // we should abort the state since calling Abort on the channel demuxer will be a no-op
                // due to the reference count being 0
                if (!endSucceeded)
                {
                    this.innerListener.Abort();
                }
            }
        }
 
        // Continues the accept loop. state is the IAsyncResult of a finished accept, or null when
        // no accept was started and the loop should just try again.
        void OnStartAcceptingCallback(object state)
        {
            IAsyncResult finishedAccept = (IAsyncResult)state;
            TInnerChannel accepted = null;

            if (finishedAccept != null && !this.EndAcceptChannel(finishedAccept, out accepted))
            {
                // the accept failed in a way that ends the loop
                return;
            }

            this.StartAccepting(accepted);
        }
 
        // Peek completion callback. Synchronous completions are skipped because PeekChannel handles
        // those itself; asynchronous ones are handed to HandlePeekResult on the demuxer stored in
        // AsyncState.
        static void OnPeekCompleteStatic(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }

            SessionChannelDemuxer<TInnerChannel, TInnerItem> demuxer
                = (SessionChannelDemuxer<TInnerChannel, TInnerItem>)result.AsyncState;

            // Released in the finally block only if HandlePeekResult lets a CommunicationException
            // or ObjectDisposedException escape; on normal return it has done its own accounting.
            bool releaseThrottle = true;

            try
            {
                demuxer.HandlePeekResult(result);
                releaseThrottle = false;
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (ObjectDisposedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                // the queued exception's dequeue callback (OnItemDequeued) releases the slot
                demuxer.HandleUnknownException(e);
                releaseThrottle = false;
            }
            finally
            {
                if (releaseThrottle)
                {
                    demuxer.throttle.Release();
                }
            }
        }
 
        // Routes a freshly peeked session to an outer listener: matches the first message against
        // the filter table, wraps the inner channel together with the item, and queues the wrapped
        // channel on the matching listener. Whatever is not handed off (item, channel) is aborted.
        void ProcessItem(TInnerChannel channel, TInnerItem item)
        {
            InputQueueChannelListener<TInnerChannel> listener = null;
            TInnerChannel wrappedChannel = null;
            // Stays true, so the slot is released right after matching, unless a listener was found.
            bool releaseThrottle = true;

            try
            {
                Message message = this.GetMessage(item);
                try
                {
                    listener = MatchListener(message);
                    releaseThrottle = (listener == null);
                }
                // MatchListener could run the filters against an untrusted message and could throw.
                // If so, abort the session
                catch (CommunicationException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    return;
                }
                catch (MultipleFilterMatchesException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    return;
                }
                catch (XmlException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    return;
                }
                finally
                {
                    if (releaseThrottle)
                    {
                        this.throttle.Release();
                    }
                }

                if (listener == null)
                {
                    // The exception is thrown and caught locally; it goes through the throw helper
                    // and the handled-exception trace, then the subclass is told via EndpointNotFound.
                    try
                    {
                        throw TraceUtility.ThrowHelperError(new EndpointNotFoundException(SR.GetString(SR.UnableToDemuxChannel, message.Headers.Action)), message);
                    }
                    catch (EndpointNotFoundException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                        this.EndpointNotFound(channel, item);
                        // EndpointNotFound is responsible for closing and aborting the channel
                        channel = null;
                        item = null;
                    }
                    return;
                }

                wrappedChannel = this.CreateChannel(listener, channel, item);
                // the wrapped channel now holds the inner channel and the item; clear the locals so
                // the finally block leaves them alone
                channel = null;
                item = null;
            }
            finally
            {
                // anything still referenced here was not handed off and must be cleaned up
                if (item != null)
                {
                    this.AbortItem(item);
                }
                if (channel != null)
                {
                    channel.Abort();
                }
            }

            bool enqueueSucceeded = false;
            try
            {
                if (this.onItemDequeued == null)
                {
                    this.onItemDequeued = new Action(this.OnItemDequeued);
                }

                // the throttle slot is released by onItemDequeued once the channel is dequeued
                listener.InputQueueAcceptor.EnqueueAndDispatch(wrappedChannel, this.onItemDequeued, false);
                enqueueSucceeded = true;
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                this.HandleUnknownException(e);
            }
            finally
            {
                if (!enqueueSucceeded)
                {
                    this.throttle.Release();
                    wrappedChannel.Abort();
                }
            }
        }
 
        // Reports an unexpected exception by queueing it on the first outer listener in the filter
        // table, with OnItemDequeued as the dequeue callback. Does nothing when no listener is
        // registered. The whole operation runs under ThisLock.
        protected void HandleUnknownException(Exception exception)
        {
            lock (this.ThisLock)
            {
                int registered = this.filterTable.Count;
                if (registered <= 0)
                {
                    // no listener, hence no queue to report the exception on
                    return;
                }

                KeyValuePair<MessageFilter, InputQueueChannelListener<TInnerChannel>>[] entries =
                    new KeyValuePair<MessageFilter, InputQueueChannelListener<TInnerChannel>>[registered];
                this.filterTable.CopyTo(entries, 0);
                InputQueueChannelListener<TInnerChannel> target = entries[0].Value;

                if (this.onItemDequeued == null)
                {
                    this.onItemDequeued = new Action(this.OnItemDequeued);
                }

                target.InputQueueAcceptor.EnqueueAndDispatch(exception, this.onItemDequeued, false);
            }
        }
 
        // Accept loop. Each iteration first starts the next accept (acquiring the throttle), then
        // either peeks the channel already in hand or completes the new accept synchronously to
        // obtain one. channelToPeek may be null, meaning no channel has been accepted yet.
        void StartAccepting(TInnerChannel channelToPeek)
        {
            for (;;)
            {
                IAsyncResult result;
                bool acceptValid = this.BeginAcceptChannel(true, out result);

                if (channelToPeek != null)
                {
                    if (acceptValid && (result == null || result.CompletedSynchronously))
                    {
                        // need to spawn another thread to process this completion
                        // since we're going to process channelToPeek on this thread
                        ActionItem.Schedule(OnStartAccepting, result);
                    }

                    PeekChannel(channelToPeek);
                    return;
                }
                else
                {
                    if (!acceptValid)
                    {
                        return; // we're done, listener is toast
                    }

                    // no accept was started, but BeginAcceptChannel asked for another attempt
                    if (result == null)
                    {
                        continue;
                    }

                    // still pending: OnAcceptCompleteStatic continues when it completes
                    if (!result.CompletedSynchronously)
                    {
                        return;
                    }

                    // completed synchronously: finish it here; on success the next iteration
                    // peeks the channel (or retries when none was obtained)
                    if (!this.EndAcceptChannel(result, out channelToPeek))
                    {
                        return;
                    }
                }
            }
        }
 
        // Asynchronous operation that opens an accepted inner channel and then receives its first
        // item, so that the demuxer can choose the outer listener for the session.
        class PeekAsyncResult : AsyncResult
        {
            // the accepted inner channel being opened and peeked
            TInnerChannel channel;
            SessionChannelDemuxer<TInnerChannel, TInnerItem> demuxer;
            // first item received from the channel; set by HandleReceiveComplete
            TInnerItem item;
            static AsyncCallback onOpenComplete = Fx.ThunkCallback(new AsyncCallback(OnOpenCompleteStatic));
            static AsyncCallback onReceiveComplete = Fx.ThunkCallback(new AsyncCallback(OnReceiveCompleteStatic));

            // Starts the open. If both the open and the following receive complete synchronously,
            // the operation is completed before the constructor returns. Exceptions raised on this
            // synchronous path propagate out of the constructor.
            public PeekAsyncResult(SessionChannelDemuxer<TInnerChannel, TInnerItem> demuxer, TInnerChannel channel, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.demuxer = demuxer;
                this.channel = channel;
                IAsyncResult result = this.channel.BeginOpen(onOpenComplete, this);
                if (!result.CompletedSynchronously)
                {
                    return;
                }
                if (this.HandleOpenComplete(result))
                {
                    this.Complete(true);
                }
            }

            // Completes the operation and returns the channel and its first item (item may be null).
            // If the operation failed, the stored exception is rethrown, but channel has already
            // been assigned by then, so the caller can still abort it. item is null in that case.
            public static void End(IAsyncResult result, out TInnerChannel channel, out TInnerItem item)
            {
                // Assign the channel BEFORE AsyncResult.End can throw. An out parameter written
                // before an exception is still visible to the caller; assigning it only after End
                // returned left the caller with a null channel on every failure, so a channel whose
                // open or receive failed asynchronously was never aborted.
                PeekAsyncResult peekResult = result as PeekAsyncResult;
                channel = (peekResult != null) ? peekResult.channel : null;
                item = null;

                // validates the result (null / wrong type) and rethrows any stored failure
                peekResult = AsyncResult.End<PeekAsyncResult>(result);
                item = peekResult.item;
            }

            // Finishes the open and starts the receive. Returns true when the receive also
            // completed synchronously, i.e. the whole operation is done.
            bool HandleOpenComplete(IAsyncResult result)
            {
                this.channel.EndOpen(result);

                IAsyncResult receiveResult;

                if (this.demuxer.peekTimeout == ChannelDemuxer.UseDefaultReceiveTimeout)
                {
                    //use the default ReceiveTimeout for the channel
                    receiveResult = this.demuxer.BeginReceive(this.channel, onReceiveComplete, this);
                }
                else
                {
                    receiveResult = this.demuxer.BeginReceive(this.channel, this.demuxer.peekTimeout, onReceiveComplete, this);
                }

                if (receiveResult.CompletedSynchronously)
                {
                    this.HandleReceiveComplete(receiveResult);
                    return true;
                }

                return false;
            }

            // Open completion callback for opens that finished asynchronously. Non-fatal failures
            // are stored in the operation instead of being thrown on the callback thread.
            static void OnOpenCompleteStatic(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                    return;

                PeekAsyncResult peekAsyncResult = (PeekAsyncResult)result.AsyncState;

                bool completeSelf = false;
                Exception exception = null;

                try
                {
                    completeSelf = peekAsyncResult.HandleOpenComplete(result);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    exception = e;
                    completeSelf = true;
                }

                if (completeSelf)
                {
                    peekAsyncResult.Complete(false, exception);
                }
            }

            // Stores the first item received from the channel.
            void HandleReceiveComplete(IAsyncResult result)
            {
                this.item = this.demuxer.EndReceive(this.channel, result);
            }

            // Receive completion callback for receives that finished asynchronously. Always
            // completes the operation, carrying the failure if there was one.
            static void OnReceiveCompleteStatic(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                    return;

                PeekAsyncResult peekAsyncResult = (PeekAsyncResult)result.AsyncState;
                Exception exception = null;

                try
                {
                    peekAsyncResult.HandleReceiveComplete(result);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    exception = e;
                }

                peekAsyncResult.Complete(false, exception);
            }
        }
 
        class OpenAsyncResult : AsyncResult
        {
            static FastAsyncCallback waitOverCallback = new FastAsyncCallback(WaitOverCallback);
            static AsyncCallback openListenerCallback = Fx.ThunkCallback(new AsyncCallback(OpenListenerCallback));
            SessionChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer;
            ChannelDemuxerFilter filter;
            IChannelListener listener;
            TimeoutHelper timeoutHelper;
            bool startAccepting;
 
            // Starts the asynchronous open of an outer listener. If the open semaphore cannot be
            // entered right away, WaitOverCallback continues the work later; otherwise the work
            // continues on this thread and the operation may complete before the constructor returns.
            public OpenAsyncResult(SessionChannelDemuxer<TInnerChannel, TInnerItem> channelDemuxer, ChannelDemuxerFilter filter, IChannelListener listener, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.channelDemuxer = channelDemuxer;
                this.filter = filter;
                this.listener = listener;
                this.timeoutHelper = new TimeoutHelper(timeout);
                if (!this.channelDemuxer.openSemaphore.EnterAsync(this.timeoutHelper.RemainingTime(), waitOverCallback, this))
                {
                    return;
                }

                bool waitOverSucceeded = false;
                bool completeSelf = false;
                try
                {
                    completeSelf = this.OnWaitOver();
                    waitOverSucceeded = true;
                }
                finally
                {
                    // OnWaitOver threw: run Cleanup before the exception leaves the constructor
                    // (Cleanup is defined outside this view; presumably it exits the semaphore)
                    if (!waitOverSucceeded)
                    {
                        Cleanup();
                    }
                }
                if (completeSelf)
                {
                    Cleanup();
                    Complete(true);
                }
            }
 
            // Invoked when the asynchronous wait on the open semaphore ends. asyncException is
            // non-null when the wait itself failed, in which case the operation completes with that
            // exception. Otherwise the open continues via OnWaitOver, and non-fatal exceptions from
            // it are stored in the operation rather than thrown on this thread.
            static void WaitOverCallback(object state, Exception asyncException)
            {
                OpenAsyncResult self = (OpenAsyncResult)state;
                bool completeSelf = false;
                Exception completionException = asyncException;
                if (completionException != null)
                {
                    completeSelf = true;
                }
                else
                {
                    try
                    {
                        completeSelf = self.OnWaitOver();
                    }
#pragma warning suppress 56500 // covered by FxCOP
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        completeSelf = true;
                        completionException = e;
                    }
                }

                if (completeSelf)
                {
                    self.Cleanup();
                    self.Complete(false, completionException);
                }
            }
 
            // Runs once the open semaphore has been entered.
            // Returns true when the operation can be completed by the caller, false when an
            // asynchronous inner-listener open is still pending.
            bool OnWaitOver()
            {
                this.startAccepting = this.channelDemuxer.ShouldStartAccepting(this.filter, this.listener);
                if (this.startAccepting)
                {
                    return this.OnStartAccepting();
                }

                // Nothing to start; surface any exception recorded by an earlier open attempt.
                this.channelDemuxer.ThrowPendingOpenExceptionIfAny();
                return true;
            }
 
            // Finishes the inner listener's open and starts accepting on the demuxer.
            // Used for both synchronous and asynchronous completion of BeginOpen.
            void OnEndInnerListenerOpen(IAsyncResult result)
            {
                this.channelDemuxer.innerListener.EndOpen(result);
                this.channelDemuxer.StartAccepting(true);
                lock (this.channelDemuxer.ThisLock)
                {
                    // If the demuxer's abortOngoingOpen flag is set, abort the inner listener
                    // that was just opened.
                    if (this.channelDemuxer.abortOngoingOpen)
                    {
                        this.channelDemuxer.innerListener.Abort();
                    }
                }
            }
 
            // Begins opening the inner listener with the remaining timeout.
            // Returns true when the open finished synchronously, false when openListenerCallback
            // will finish it later. Any exception is recorded on the demuxer as the pending open
            // exception and then rethrown.
            bool OnStartAccepting()
            {
                try
                {
                    IAsyncResult openResult = this.channelDemuxer.innerListener.BeginOpen(this.timeoutHelper.RemainingTime(), openListenerCallback, this);
                    if (openResult.CompletedSynchronously)
                    {
                        this.OnEndInnerListenerOpen(openResult);
                        return true;
                    }

                    return false;
                }
#pragma warning suppress 56500 // covered by FxCOP
                catch (Exception e)
                {
                    this.channelDemuxer.pendingExceptionOnOpen = e;
                    throw;
                }
            }
 
            // Callback for the inner listener's BeginOpen.
            // Synchronous completions are ignored here because OnStartAccepting finishes them inline.
            // For asynchronous completions this ends the open, records any non-fatal failure as the
            // demuxer's pending open exception, releases the semaphore and completes the result.
            static void OpenListenerCallback(IAsyncResult result)
            {
                if (!result.CompletedSynchronously)
                {
                    OpenAsyncResult thisPtr = (OpenAsyncResult)result.AsyncState;
                    Exception failure = null;
                    try
                    {
                        thisPtr.OnEndInnerListenerOpen(result);
                    }
#pragma warning suppress 56500 // covered by FxCOP
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        failure = e;
                        thisPtr.channelDemuxer.pendingExceptionOnOpen = e;
                    }

                    thisPtr.Cleanup();
                    thisPtr.Complete(false, failure);
                }
            }
 
            // Exits the demuxer's open semaphore. Callers invoke this once the open logic has
            // finished (successfully or not) after the semaphore was entered.
            void Cleanup()
            {
                this.channelDemuxer.openSemaphore.Exit();
            }
 
            // Ends the asynchronous open represented by result by delegating to
            // AsyncResult.End<OpenAsyncResult>.
            public static void End(IAsyncResult result)
            {
                AsyncResult.End<OpenAsyncResult>(result);
            }
        }
    }
 
    // Session demuxer specialization for IInputSessionChannel, where the received item is
    // the Message itself.
    class InputSessionChannelDemuxer : SessionChannelDemuxer<IInputSessionChannel, Message>
    {
        public InputSessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)
            : base(context, peekTimeout, maxPendingSessions)
        {
        }

        // Discards a message that will not be dispatched.
        protected override void AbortItem(Message message)
        {
            AbortMessage(message);
        }

        // Receive without an explicit timeout: forwarded to the channel.
        protected override IAsyncResult BeginReceive(IInputSessionChannel channel, AsyncCallback callback, object state)
        {
            IAsyncResult pendingReceive = channel.BeginReceive(callback, state);
            return pendingReceive;
        }

        // Receive with an explicit timeout: forwarded to the channel.
        protected override IAsyncResult BeginReceive(IInputSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
        {
            IAsyncResult pendingReceive = channel.BeginReceive(timeout, callback, state);
            return pendingReceive;
        }

        // Wraps the inner channel together with the message already taken from it.
        protected override IInputSessionChannel CreateChannel(ChannelManagerBase channelManager, IInputSessionChannel innerChannel, Message firstMessage)
        {
            return new InputSessionChannelWrapper(channelManager, innerChannel, firstMessage);
        }

        // Called when no endpoint matched: notifies the failure handler (if one is set),
        // then aborts both the message and the channel.
        protected override void EndpointNotFound(IInputSessionChannel channel, Message message)
        {
            if (this.DemuxFailureHandler != null)
            {
                this.DemuxFailureHandler.HandleDemuxFailure(message);
            }

            this.AbortItem(message);
            channel.Abort();
        }

        protected override Message EndReceive(IInputSessionChannel channel, IAsyncResult result)
        {
            Message received = channel.EndReceive(result);
            return received;
        }

        // The item is already the message.
        protected override Message GetMessage(Message message)
        {
            return message;
        }
    }
 
    // InputChannelWrapper over an IInputSessionChannel that also exposes the inner
    // channel's session.
    class InputSessionChannelWrapper : InputChannelWrapper, IInputSessionChannel
    {
        public InputSessionChannelWrapper(ChannelManagerBase channelManager, IInputSessionChannel innerChannel, Message firstMessage)
            : base(channelManager, innerChannel, firstMessage)
        {
        }

        // Hides the base property to return the inner channel with its session-aware type.
        new IInputSessionChannel InnerChannel
        {
            get
            {
                return (IInputSessionChannel)base.InnerChannel;
            }
        }

        public IInputSession Session
        {
            get
            {
                return this.InnerChannel.Session;
            }
        }
    }
 
    // Session demuxer specialization for IDuplexSessionChannel, where the received item is
    // the Message itself. Demux failures are reported through an asynchronous
    // DuplexSessionDemuxFailureAsyncResult.
    class DuplexSessionChannelDemuxer : SessionChannelDemuxer<IDuplexSessionChannel, Message>
    {
        public DuplexSessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)
            : base(context, peekTimeout, maxPendingSessions)
        {
        }

        // Discards a message that will not be dispatched.
        protected override void AbortItem(Message message)
        {
            AbortMessage(message);
        }

        protected override IAsyncResult BeginReceive(IDuplexSessionChannel channel, AsyncCallback callback, object state)
        {
            IAsyncResult pendingReceive = channel.BeginReceive(callback, state);
            return pendingReceive;
        }

        protected override IAsyncResult BeginReceive(IDuplexSessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
        {
            IAsyncResult pendingReceive = channel.BeginReceive(timeout, callback, state);
            return pendingReceive;
        }

        // Wraps the inner channel together with the message already taken from it.
        protected override IDuplexSessionChannel CreateChannel(ChannelManagerBase channelManager, IDuplexSessionChannel innerChannel, Message firstMessage)
        {
            return new DuplexSessionChannelWrapper(channelManager, innerChannel, firstMessage);
        }

        // Completion callback for the asynchronous demux-failure handling started by
        // EndpointNotFound. Synchronous completions are finished by EndpointNotFound itself.
        // If ending the operation fails, the message and the channel are aborted.
        void EndpointNotFoundCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }

            ChannelAndMessageAsyncState pending = (ChannelAndMessageAsyncState)result.AsyncState;
            bool ended = false;
            try
            {
                DuplexSessionDemuxFailureAsyncResult.End(result);
                ended = true;
            }
            catch (TimeoutException e)
            {
                if (TD.SendTimeoutIsEnabled())
                {
                    TD.SendTimeout(e.Message);
                }
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (ObjectDisposedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                this.HandleUnknownException(e);
            }
            finally
            {
                if (!ended)
                {
                    this.AbortItem(pending.message);
                    pending.channel.Abort();
                }
            }
        }

        // Called when no endpoint matched the first message of a session.
        // With a failure handler configured, the failure is processed through a
        // DuplexSessionDemuxFailureAsyncResult; if that is still pending when Start returns,
        // EndpointNotFoundCallback takes over. Whenever the handling does not run to a
        // successful end (including when there is no handler), the message and the channel
        // are aborted.
        protected override void EndpointNotFound(IDuplexSessionChannel channel, Message message)
        {
            bool handled = false;
            try
            {
                if (this.DemuxFailureHandler != null)
                {
                    try
                    {
                        DuplexSessionDemuxFailureAsyncResult failureResult = new DuplexSessionDemuxFailureAsyncResult(this.DemuxFailureHandler, channel, message, Fx.ThunkCallback(new AsyncCallback(this.EndpointNotFoundCallback)), new ChannelAndMessageAsyncState(channel, message));
                        failureResult.Start();
                        if (failureResult.CompletedSynchronously)
                        {
                            DuplexSessionDemuxFailureAsyncResult.End(failureResult);
                        }

                        // Either the operation ended successfully, or it is still pending and
                        // the callback now owns any required abort.
                        handled = true;
                    }
                    catch (CommunicationException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (TimeoutException e)
                    {
                        if (TD.SendTimeoutIsEnabled())
                        {
                            TD.SendTimeout(e.Message);
                        }
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (ObjectDisposedException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        this.HandleUnknownException(e);
                    }
                }
            }
            finally
            {
                if (!handled)
                {
                    this.AbortItem(message);
                    channel.Abort();
                }
            }
        }

        protected override Message EndReceive(IDuplexSessionChannel channel, IAsyncResult result)
        {
            Message received = channel.EndReceive(result);
            return received;
        }

        // The item is already the message.
        protected override Message GetMessage(Message message)
        {
            return message;
        }

        // Async state handed to the demux-failure operation so the callback can abort the
        // channel and message if needed.
        struct ChannelAndMessageAsyncState
        {
            public IChannel channel;
            public Message message;

            public ChannelAndMessageAsyncState(IChannel channel, Message message)
            {
                this.channel = channel;
                this.message = message;
            }
        }
    }
 
    // InputChannelWrapper over an IDuplexSessionChannel. Session, addressing and all send
    // operations are forwarded to the inner duplex channel.
    class DuplexSessionChannelWrapper : InputChannelWrapper, IDuplexSessionChannel
    {
        public DuplexSessionChannelWrapper(ChannelManagerBase channelManager, IDuplexSessionChannel innerChannel, Message firstMessage)
            : base(channelManager, innerChannel, firstMessage)
        {
        }

        // Hides the base property to return the inner channel with its duplex-session type.
        new IDuplexSessionChannel InnerChannel
        {
            get
            {
                return (IDuplexSessionChannel)base.InnerChannel;
            }
        }

        public IDuplexSession Session
        {
            get
            {
                return this.InnerChannel.Session;
            }
        }

        public EndpointAddress RemoteAddress
        {
            get
            {
                return this.InnerChannel.RemoteAddress;
            }
        }

        public Uri Via
        {
            get
            {
                return this.InnerChannel.Via;
            }
        }

        public void Send(Message message)
        {
            IDuplexSessionChannel inner = this.InnerChannel;
            inner.Send(message);
        }

        public void Send(Message message, TimeSpan timeout)
        {
            IDuplexSessionChannel inner = this.InnerChannel;
            inner.Send(message, timeout);
        }

        public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
        {
            IDuplexSessionChannel inner = this.InnerChannel;
            return inner.BeginSend(message, callback, state);
        }

        public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            IDuplexSessionChannel inner = this.InnerChannel;
            return inner.BeginSend(message, timeout, callback, state);
        }

        public void EndSend(IAsyncResult result)
        {
            IDuplexSessionChannel inner = this.InnerChannel;
            inner.EndSend(result);
        }
    }
 
    // Session demuxer specialization for IReplySessionChannel, where the received item is a
    // RequestContext. Demux failures are reported through an asynchronous
    // ReplySessionDemuxFailureAsyncResult.
    class ReplySessionChannelDemuxer : SessionChannelDemuxer<IReplySessionChannel, RequestContext>
    {
        public ReplySessionChannelDemuxer(BindingContext context, TimeSpan peekTimeout, int maxPendingSessions)
            : base(context, peekTimeout, maxPendingSessions)
        {
        }

        // Discards a request that will not be dispatched: aborts its message, then the context.
        protected override void AbortItem(RequestContext request)
        {
            AbortMessage(request);
            request.Abort();
        }

        protected override IAsyncResult BeginReceive(IReplySessionChannel channel, AsyncCallback callback, object state)
        {
            IAsyncResult pendingReceive = channel.BeginReceiveRequest(callback, state);
            return pendingReceive;
        }

        protected override IAsyncResult BeginReceive(IReplySessionChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
        {
            IAsyncResult pendingReceive = channel.BeginReceiveRequest(timeout, callback, state);
            return pendingReceive;
        }

        // Wraps the inner channel together with the request already taken from it.
        protected override IReplySessionChannel CreateChannel(ChannelManagerBase channelManager, IReplySessionChannel innerChannel, RequestContext firstRequest)
        {
            return new ReplySessionChannelWrapper(channelManager, innerChannel, firstRequest);
        }

        // Completion callback for the asynchronous demux-failure handling started by
        // EndpointNotFound. Synchronous completions are finished by EndpointNotFound itself.
        // If ending the operation fails, the request and the channel are aborted.
        void EndpointNotFoundCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }

            ChannelAndRequestAsyncState pending = (ChannelAndRequestAsyncState)result.AsyncState;
            bool ended = false;
            try
            {
                ReplySessionDemuxFailureAsyncResult.End(result);
                ended = true;
            }
            catch (TimeoutException e)
            {
                if (TD.SendTimeoutIsEnabled())
                {
                    TD.SendTimeout(e.Message);
                }
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (ObjectDisposedException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                this.HandleUnknownException(e);
            }
            finally
            {
                if (!ended)
                {
                    this.AbortItem(pending.request);
                    pending.channel.Abort();
                }
            }
        }

        // Called when no endpoint matched the first request of a session.
        // With a failure handler configured, the failure is processed through a
        // ReplySessionDemuxFailureAsyncResult; if that is still pending when Start returns,
        // EndpointNotFoundCallback takes over. Whenever the handling does not run to a
        // successful end (including when there is no handler), the request and the channel
        // are aborted.
        protected override void EndpointNotFound(IReplySessionChannel channel, RequestContext request)
        {
            bool handled = false;
            try
            {
                if (this.DemuxFailureHandler != null)
                {
                    try
                    {
                        ReplySessionDemuxFailureAsyncResult failureResult = new ReplySessionDemuxFailureAsyncResult(this.DemuxFailureHandler, request, channel, Fx.ThunkCallback(new AsyncCallback(this.EndpointNotFoundCallback)), new ChannelAndRequestAsyncState(channel, request));
                        failureResult.Start();
                        if (failureResult.CompletedSynchronously)
                        {
                            ReplySessionDemuxFailureAsyncResult.End(failureResult);
                        }

                        // Either the operation ended successfully, or it is still pending and
                        // the callback now owns any required abort.
                        handled = true;
                    }
                    catch (CommunicationException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (TimeoutException e)
                    {
                        if (TD.SendTimeoutIsEnabled())
                        {
                            TD.SendTimeout(e.Message);
                        }
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (ObjectDisposedException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        this.HandleUnknownException(e);
                    }
                }
            }
            finally
            {
                if (!handled)
                {
                    this.AbortItem(request);
                    channel.Abort();
                }
            }
        }

        protected override RequestContext EndReceive(IReplySessionChannel channel, IAsyncResult result)
        {
            RequestContext received = channel.EndReceiveRequest(result);
            return received;
        }

        // The message to demux on is the request's message.
        protected override Message GetMessage(RequestContext request)
        {
            return request.RequestMessage;
        }

        // Async state handed to the demux-failure operation so the callback can abort the
        // channel and request if needed.
        struct ChannelAndRequestAsyncState
        {
            public IChannel channel;
            public RequestContext request;

            public ChannelAndRequestAsyncState(IChannel channel, RequestContext request)
            {
                this.channel = channel;
                this.request = request;
            }
        }
    }
 
    // ReplyChannelWrapper over an IReplySessionChannel that also exposes the inner
    // channel's session.
    class ReplySessionChannelWrapper : ReplyChannelWrapper, IReplySessionChannel
    {
        public ReplySessionChannelWrapper(ChannelManagerBase channelManager, IReplySessionChannel innerChannel, RequestContext firstRequest)
            : base(channelManager, innerChannel, firstRequest)
        {
        }

        // Hides the base property to return the inner channel with its session-aware type.
        new IReplySessionChannel InnerChannel
        {
            get
            {
                return (IReplySessionChannel)base.InnerChannel;
            }
        }

        public IInputSession Session
        {
            get
            {
                return this.InnerChannel.Session;
            }
        }
    }
 
    // Base class for channels that wrap an inner channel plus one item supplied at
    // construction. The stored item can be taken exactly once (GetFirstItem swaps it for
    // null atomically); closing or aborting the wrapper disposes of it via CloseFirstItem.
    abstract class ChannelWrapper<TChannel, TItem> : LayeredChannel<TChannel>
        where TChannel : class, IChannel
        where TItem : class, IDisposable
    {
        TItem firstItem;

        public ChannelWrapper(ChannelManagerBase channelManager, TChannel innerChannel, TItem firstItem)
            : base(channelManager, innerChannel)
        {
            this.firstItem = firstItem;
        }

        // Releases the stored first item, if it has not been handed out yet.
        protected abstract void CloseFirstItem(TimeSpan timeout);

        // Atomically takes ownership of the stored item; returns null once it has been taken.
        protected TItem GetFirstItem()
        {
            TItem taken = Interlocked.Exchange<TItem>(ref this.firstItem, null);
            return taken;
        }

        // True while the stored item has not been taken.
        protected bool HaveFirstItem()
        {
            return this.firstItem != null;
        }

        protected override void OnAbort()
        {
            base.OnAbort();
            this.CloseFirstItem(TimeSpan.Zero);
        }

        protected override void OnClose(TimeSpan timeout)
        {
            // The stored item is closed first; the base close gets whatever time remains.
            TimeoutHelper closeTimer = new TimeoutHelper(timeout);
            this.CloseFirstItem(closeTimer.RemainingTime());
            base.OnClose(closeTimer.RemainingTime());
        }

        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            // The stored item is closed inline before the asynchronous base close starts.
            TimeoutHelper closeTimer = new TimeoutHelper(timeout);
            this.CloseFirstItem(closeTimer.RemainingTime());
            return base.OnBeginClose(closeTimer.RemainingTime(), callback, state);
        }

        protected override void OnEndClose(IAsyncResult result)
        {
            base.OnEndClose(result);
        }

        // Already-completed async result that carries an item to be returned from End.
        protected class ReceiveAsyncResult : AsyncResult
        {
            TItem item;

            public ReceiveAsyncResult(TItem item, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.item = item;
                this.Complete(true);
            }

            public static TItem End(IAsyncResult result)
            {
                return AsyncResult.End<ReceiveAsyncResult>(result).item;
            }
        }

        // Already-completed async result for a wait; End always reports true.
        protected class WaitAsyncResult : AsyncResult
        {
            public WaitAsyncResult(AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.Complete(true);
            }

            public static bool End(IAsyncResult result)
            {
                AsyncResult.End<WaitAsyncResult>(result);
                return true;
            }
        }
    }
 
    // IInputChannel wrapper that serves the stored first message (once) before delegating
    // every receive/wait operation to the inner channel.
    class InputChannelWrapper : ChannelWrapper<IInputChannel, Message>, IInputChannel
    {
        public InputChannelWrapper(ChannelManagerBase channelManager, IInputChannel innerChannel, Message firstMessage)
            : base(channelManager, innerChannel, firstMessage)
        {
        }

        public EndpointAddress LocalAddress
        {
            get
            {
                return this.InnerChannel.LocalAddress;
            }
        }

        // Opening does no work of its own; the async form completes immediately.
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CompletedAsyncResult(callback, state);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        protected override void OnOpen(TimeSpan timeout)
        {
        }

        // Aborts the stored message if nobody received it.
        protected override void CloseFirstItem(TimeSpan timeout)
        {
            Message pending = this.GetFirstItem();
            if (pending == null)
            {
                return;
            }

            TypedChannelDemuxer.AbortMessage(pending);
        }

        public Message Receive()
        {
            Message pending = this.GetFirstItem();
            return pending ?? this.InnerChannel.Receive();
        }

        public Message Receive(TimeSpan timeout)
        {
            Message pending = this.GetFirstItem();
            return pending ?? this.InnerChannel.Receive(timeout);
        }

        public IAsyncResult BeginReceive(AsyncCallback callback, object state)
        {
            Message pending = this.GetFirstItem();
            if (pending == null)
            {
                return this.InnerChannel.BeginReceive(callback, state);
            }

            return new ReceiveAsyncResult(pending, callback, state);
        }

        public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            Message pending = this.GetFirstItem();
            if (pending == null)
            {
                return this.InnerChannel.BeginReceive(timeout, callback, state);
            }

            return new ReceiveAsyncResult(pending, callback, state);
        }

        // The result type tells whether the stored message or the inner channel served the call.
        public Message EndReceive(IAsyncResult result)
        {
            if (!(result is ReceiveAsyncResult))
            {
                return this.InnerChannel.EndReceive(result);
            }

            return ReceiveAsyncResult.End(result);
        }

        public bool TryReceive(TimeSpan timeout, out Message message)
        {
            message = this.GetFirstItem();
            if (message == null)
            {
                return this.InnerChannel.TryReceive(timeout, out message);
            }

            return true;
        }

        public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            Message pending = this.GetFirstItem();
            if (pending == null)
            {
                return this.InnerChannel.BeginTryReceive(timeout, callback, state);
            }

            return new ReceiveAsyncResult(pending, callback, state);
        }

        public bool EndTryReceive(IAsyncResult result, out Message message)
        {
            if (!(result is ReceiveAsyncResult))
            {
                return this.InnerChannel.EndTryReceive(result, out message);
            }

            message = ReceiveAsyncResult.End(result);
            return true;
        }

        // A stored message counts as an available message; it is not consumed by waiting.
        public bool WaitForMessage(TimeSpan timeout)
        {
            return this.HaveFirstItem() || this.InnerChannel.WaitForMessage(timeout);
        }

        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (!this.HaveFirstItem())
            {
                return this.InnerChannel.BeginWaitForMessage(timeout, callback, state);
            }

            return new WaitAsyncResult(callback, state);
        }

        public bool EndWaitForMessage(IAsyncResult result)
        {
            if (!(result is WaitAsyncResult))
            {
                return this.InnerChannel.EndWaitForMessage(result);
            }

            return WaitAsyncResult.End(result);
        }
    }
 
    // IReplyChannel wrapper that serves the stored first request (once) before delegating
    // every receive/wait operation to the inner channel.
    class ReplyChannelWrapper : ChannelWrapper<IReplyChannel, RequestContext>, IReplyChannel
    {
        public ReplyChannelWrapper(ChannelManagerBase channelManager, IReplyChannel innerChannel, RequestContext firstRequest)
            : base(channelManager, innerChannel, firstRequest)
        {
        }

        public EndpointAddress LocalAddress
        {
            get
            {
                return this.InnerChannel.LocalAddress;
            }
        }

        // Opening does no work of its own; the async form completes immediately.
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CompletedAsyncResult(callback, state);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        protected override void OnOpen(TimeSpan timeout)
        {
        }

        // Closes the stored request (its message first, then the context) if nobody received
        // it. Communication and timeout failures are traced and swallowed.
        protected override void CloseFirstItem(TimeSpan timeout)
        {
            RequestContext pending = this.GetFirstItem();
            if (pending == null)
            {
                return;
            }

            try
            {
                pending.RequestMessage.Close();
                pending.Close(timeout);
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (TimeoutException e)
            {
                if (TD.CloseTimeoutIsEnabled())
                {
                    TD.CloseTimeout(e.Message);
                }
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
        }

        public RequestContext ReceiveRequest()
        {
            RequestContext pending = this.GetFirstItem();
            return pending ?? this.InnerChannel.ReceiveRequest();
        }

        public RequestContext ReceiveRequest(TimeSpan timeout)
        {
            RequestContext pending = this.GetFirstItem();
            return pending ?? this.InnerChannel.ReceiveRequest(timeout);
        }

        public IAsyncResult BeginReceiveRequest(AsyncCallback callback, object state)
        {
            RequestContext pending = this.GetFirstItem();
            if (pending == null)
            {
                return this.InnerChannel.BeginReceiveRequest(callback, state);
            }

            return new ReceiveAsyncResult(pending, callback, state);
        }

        public IAsyncResult BeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
        {
            RequestContext pending = this.GetFirstItem();
            if (pending == null)
            {
                return this.InnerChannel.BeginReceiveRequest(timeout, callback, state);
            }

            return new ReceiveAsyncResult(pending, callback, state);
        }

        // The result type tells whether the stored request or the inner channel served the call.
        public RequestContext EndReceiveRequest(IAsyncResult result)
        {
            if (!(result is ReceiveAsyncResult))
            {
                return this.InnerChannel.EndReceiveRequest(result);
            }

            return ReceiveAsyncResult.End(result);
        }

        public bool TryReceiveRequest(TimeSpan timeout, out RequestContext request)
        {
            request = this.GetFirstItem();
            if (request == null)
            {
                return this.InnerChannel.TryReceiveRequest(timeout, out request);
            }

            return true;
        }

        public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
        {
            RequestContext pending = this.GetFirstItem();
            if (pending == null)
            {
                return this.InnerChannel.BeginTryReceiveRequest(timeout, callback, state);
            }

            return new ReceiveAsyncResult(pending, callback, state);
        }

        public bool EndTryReceiveRequest(IAsyncResult result, out RequestContext request)
        {
            if (!(result is ReceiveAsyncResult))
            {
                return this.InnerChannel.EndTryReceiveRequest(result, out request);
            }

            request = ReceiveAsyncResult.End(result);
            return true;
        }

        // A stored request counts as an available request; it is not consumed by waiting.
        public bool WaitForRequest(TimeSpan timeout)
        {
            return this.HaveFirstItem() || this.InnerChannel.WaitForRequest(timeout);
        }

        public IAsyncResult BeginWaitForRequest(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (!this.HaveFirstItem())
            {
                return this.InnerChannel.BeginWaitForRequest(timeout, callback, state);
            }

            return new WaitAsyncResult(callback, state);
        }

        public bool EndWaitForRequest(IAsyncResult result)
        {
            if (!(result is WaitAsyncResult))
            {
                return this.InnerChannel.EndWaitForRequest(result);
            }

            return WaitAsyncResult.End(result);
        }
    }
 
    // Outer listener handed out by the demuxer. Its open/close/abort transitions first
    // notify the owning IChannelDemuxer (passing this listener's filter) and then run the
    // base DelegatingChannelListener behavior; abort notifies the demuxer before the base.
    class InputQueueChannelListener<TChannel> : DelegatingChannelListener<TChannel>
        where TChannel : class, IChannel
    {
        ChannelDemuxerFilter filter;
        IChannelDemuxer channelDemuxer;

        public InputQueueChannelListener(ChannelDemuxerFilter filter, IChannelDemuxer channelDemuxer)
            : base(true)
        {
            this.filter = filter;
            this.channelDemuxer = channelDemuxer;
            this.Acceptor = new InputQueueChannelAcceptor<TChannel>(this);
        }

        public ChannelDemuxerFilter Filter
        {
            get
            {
                return this.filter;
            }
        }

        // The acceptor installed by the constructor, exposed with its concrete type.
        public InputQueueChannelAcceptor<TChannel> InputQueueAcceptor
        {
            get
            {
                return (InputQueueChannelAcceptor<TChannel>)base.Acceptor;
            }
        }

        protected override void OnOpen(TimeSpan timeout)
        {
            // Demuxer notification and base open share one timeout budget.
            TimeoutHelper openTimer = new TimeoutHelper(timeout);
            this.channelDemuxer.OnOuterListenerOpen(this.filter, this, openTimer.RemainingTime());
            base.OnOpen(openTimer.RemainingTime());
        }

        // Asynchronous open: the demuxer notification step is chained with the base open.
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new ChainedAsyncResult(
                timeout,
                callback,
                state,
                this.OnBeginOuterListenerOpen,
                this.OnEndOuterListenerOpen,
                base.OnBeginOpen,
                base.OnEndOpen);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            ChainedAsyncResult.End(result);
        }

        IAsyncResult OnBeginOuterListenerOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            IChannelDemuxer demuxer = this.channelDemuxer;
            return demuxer.OnBeginOuterListenerOpen(this.filter, this, timeout, callback, state);
        }

        void OnEndOuterListenerOpen(IAsyncResult result)
        {
            IChannelDemuxer demuxer = this.channelDemuxer;
            demuxer.OnEndOuterListenerOpen(result);
        }

        protected override void OnAbort()
        {
            this.channelDemuxer.OnOuterListenerAbort(this.filter);
            base.OnAbort();
        }

        protected override void OnClose(TimeSpan timeout)
        {
            // Demuxer notification and base close share one timeout budget.
            TimeoutHelper closeTimer = new TimeoutHelper(timeout);
            this.channelDemuxer.OnOuterListenerClose(this.filter, closeTimer.RemainingTime());
            base.OnClose(closeTimer.RemainingTime());
        }

        // Asynchronous close: the demuxer notification step is chained with the base close.
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new ChainedAsyncResult(
                timeout,
                callback,
                state,
                this.OnBeginOuterListenerClose,
                this.OnEndOuterListenerClose,
                base.OnBeginClose,
                base.OnEndClose);
        }

        protected override void OnEndClose(IAsyncResult result)
        {
            ChainedAsyncResult.End(result);
        }

        IAsyncResult OnBeginOuterListenerClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            IChannelDemuxer demuxer = this.channelDemuxer;
            return demuxer.OnBeginOuterListenerClose(this.filter, timeout, callback, state);
        }

        void OnEndOuterListenerClose(IAsyncResult result)
        {
            IChannelDemuxer demuxer = this.channelDemuxer;
            demuxer.OnEndOuterListenerClose(result);
        }
    }
 
    //
    // Binding element
    //
 
    // Binding element that routes channel-listener construction through a ChannelDemuxer.
    // Elements produced by Clone() share the ChannelDemuxer instance (and, when caching is
    // enabled, the cached binding-parameter state) of the element they were copied from.
    class ChannelDemuxerBindingElement : BindingElement
    {
        ChannelDemuxer demuxer;
        CachedBindingContextState cachedContextState;
        bool cacheContextState;

        // Creates an element with its own ChannelDemuxer.
        // cacheContextState: when true, the binding parameters seen by the first
        // BuildChannelFactory/BuildChannelListener call are remembered and substituted into
        // the context of every later call.
        public ChannelDemuxerBindingElement(bool cacheContextState)
        {
            this.cacheContextState = cacheContextState;
            if (cacheContextState)
            {
                this.cachedContextState = new CachedBindingContextState();
            }
            this.demuxer = new ChannelDemuxer();
        }

        // Copy constructor used by Clone(). The demuxer and the cache object are copied by
        // reference, so the clone and the source element observe the same state.
        public ChannelDemuxerBindingElement(ChannelDemuxerBindingElement element)
        {
            this.demuxer = element.demuxer;
            this.cacheContextState = element.cacheContextState;
            this.cachedContextState = element.cachedContextState;
        }

        // Peek timeout of the underlying demuxer. Negative values are rejected with
        // ArgumentOutOfRangeException, except for the ChannelDemuxer.UseDefaultReceiveTimeout
        // sentinel.
        public TimeSpan PeekTimeout
        {
            get
            {
                return this.demuxer.PeekTimeout;
            }
            set
            {
                if (value < TimeSpan.Zero && value != ChannelDemuxer.UseDefaultReceiveTimeout)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value"));
                }

                this.demuxer.PeekTimeout = value;
            }
        }

        // MaxPendingSessions of the underlying demuxer. Values below 1 are rejected with
        // ArgumentOutOfRangeException.
        public int MaxPendingSessions
        {
            get
            {
                return this.demuxer.MaxPendingSessions;
            }
            set
            {
                if (value < 1)
                {
                    // The single-string ArgumentOutOfRangeException constructor interprets its
                    // argument as the parameter name, so the explanatory text has to go through
                    // the (paramName, actualValue, message) overload.
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                        new ArgumentOutOfRangeException("value", value, SR.GetString(SR.ValueMustBeGreaterThanZero)));
                }

                this.demuxer.MaxPendingSessions = value;
            }
        }

        // When caching is enabled: the first call copies the context's binding parameters into
        // the cache; every later call replaces the context's binding parameters with the
        // cached set. Does nothing when caching is disabled.
        void SubstituteCachedBindingContextParametersIfNeeded(BindingContext context)
        {
            if (!this.cacheContextState)
            {
                return;
            }
            if (!this.cachedContextState.IsStateCached)
            {
                foreach (object parameter in context.BindingParameters)
                {
                    this.cachedContextState.CachedBindingParameters.Add(parameter);
                }
                this.cachedContextState.IsStateCached = true;
            }
            else
            {
                context.BindingParameters.Clear();
                foreach (object parameter in this.cachedContextState.CachedBindingParameters)
                {
                    context.BindingParameters.Add(parameter);
                }
            }
        }

        // Builds the inner channel factory after applying the cached binding parameters.
        // The demuxer itself is not involved on the factory side.
        // Throws ArgumentNullException when context is null.
        public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
        {
            if (context == null)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context");

            SubstituteCachedBindingContextParametersIfNeeded(context);
            return context.BuildInnerChannelFactory<TChannel>();
        }


        // Builds a listener through the demuxer. A ChannelDemuxerFilter found in the binding
        // parameters selects the filtered overload.
        // Throws ArgumentNullException when context is null.
        public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
        {
            if (context == null)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context");

            // The filter is taken out of the parameters before the cache step runs, so it is
            // neither stored in the cache nor discarded by the substitution.
            ChannelDemuxerFilter filter = context.BindingParameters.Remove<ChannelDemuxerFilter>();
            SubstituteCachedBindingContextParametersIfNeeded(context);
            if (filter == null)
                return demuxer.BuildChannelListener<TChannel>(context);
            else
                return demuxer.BuildChannelListener<TChannel>(context, filter);
        }

        // Defers to the inner binding elements. Throws ArgumentNullException when context is null.
        public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
        {
            if (context == null)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context");

            return context.CanBuildInnerChannelFactory<TChannel>();
        }

        // Defers to the inner binding elements. Throws ArgumentNullException when context is null.
        public override bool CanBuildChannelListener<TChannel>(BindingContext context)
        {
            if (context == null)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context");

            return context.CanBuildInnerChannelListener<TChannel>();
        }

        // Returns a copy that shares this element's demuxer and cache (see the copy constructor).
        public override BindingElement Clone()
        {
            return new ChannelDemuxerBindingElement(this);
        }

        // Resolves a property from the inner binding elements. Note that this adds cached
        // binding parameters to context.BindingParameters as a side effect.
        // Throws ArgumentNullException when context is null, matching the other overrides
        // of this class instead of failing with a NullReferenceException.
        public override T GetProperty<T>(BindingContext context)
        {
            if (context == null)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("context");

            // augment the context with cached binding parameters
            if (this.cacheContextState && this.cachedContextState.IsStateCached)
            {
                for (int i = 0; i < this.cachedContextState.CachedBindingParameters.Count; ++i)
                {
                    object cachedParameter = this.cachedContextState.CachedBindingParameters[i];

                    // Parameters already supplied by the caller win over the cached ones.
                    if (!context.BindingParameters.Contains(cachedParameter.GetType()))
                    {
                        context.BindingParameters.Add(cachedParameter);
                    }
                }
            }
            return context.GetInnerProperty<T>();
        }

        // Holder for the binding parameters captured by the first Build* call. It is a
        // reference type so that cloned binding elements can share a single instance.
        class CachedBindingContextState
        {
            public bool IsStateCached;
            public BindingParameterCollection CachedBindingParameters;

            public CachedBindingContextState()
            {
                CachedBindingParameters = new BindingParameterCollection();
            }
        }
    }
 
    //
    // Demuxer filter
    //
 
    // Pairs a MessageFilter with an integer priority. Both values are supplied to the
    // constructor and exposed through get-only properties.
    class ChannelDemuxerFilter
    {
        int filterPriority;
        MessageFilter messageFilter;

        public ChannelDemuxerFilter(MessageFilter filter, int priority)
        {
            this.messageFilter = filter;
            this.filterPriority = priority;
        }

        // The message filter given at construction.
        public MessageFilter Filter
        {
            get
            {
                return this.messageFilter;
            }
        }

        // The priority given at construction.
        public int Priority
        {
            get
            {
                return this.filterPriority;
            }
        }
    }
 
    // AsyncResult that passes a request context and its request message to
    // IChannelDemuxFailureHandler.BeginHandleDemuxFailure and, once the handler has finished,
    // runs OnDemuxFailureHandled (which by default closes the request context).
    class ReplyChannelDemuxFailureAsyncResult : AsyncResult
    {
        static AsyncCallback handlerCompletedCallback = Fx.ThunkCallback(new AsyncCallback(DemuxFailureHandlerCallback));
        IChannelDemuxFailureHandler failureHandler;
        RequestContext context;

        // Throws ArgumentNullException when demuxFailureHandler or requestContext is null.
        public ReplyChannelDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, RequestContext requestContext, AsyncCallback callback, object state)
            : base(callback, state)
        {
            if (demuxFailureHandler == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("demuxFailureHandler");
            }
            if (requestContext == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("requestContext");
            }

            this.context = requestContext;
            this.failureHandler = demuxFailureHandler;
        }

        // Kicks off the failure handler. When the handler completes synchronously the rest of
        // the work is done inline; exceptions on that path propagate to the caller of Start.
        public void Start()
        {
            IAsyncResult handlerResult = this.failureHandler.BeginHandleDemuxFailure(
                this.context.RequestMessage, this.context, handlerCompletedCallback, this);

            if (handlerResult.CompletedSynchronously)
            {
                this.failureHandler.EndHandleDemuxFailure(handlerResult);
                if (this.OnDemuxFailureHandled())
                {
                    Complete(true);
                }
            }
            // Otherwise DemuxFailureHandlerCallback continues when the handler completes.
        }

        // Runs after the failure handler has ended. Returns true when this async result can
        // be completed right away, false when an override will complete it later.
        protected virtual bool OnDemuxFailureHandled()
        {
            this.context.Close();
            return true;
        }

        // Continuation for a failure handler that completed asynchronously.
        static void DemuxFailureHandlerCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                // Start() already processed this result inline.
                return;
            }

            ReplyChannelDemuxFailureAsyncResult thisPtr = (ReplyChannelDemuxFailureAsyncResult)result.AsyncState;
            Exception failure = null;
            bool shouldComplete = false;
            try
            {
                thisPtr.failureHandler.EndHandleDemuxFailure(result);
                shouldComplete = thisPtr.OnDemuxFailureHandled();
            }
#pragma warning suppress 56500 // covered by FxCOP
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }

                // Non-fatal errors are delivered through the async result, not thrown here.
                failure = e;
                shouldComplete = true;
            }

            if (shouldComplete)
            {
                thisPtr.Complete(false, failure);
            }
        }

        public static void End(IAsyncResult result)
        {
            AsyncResult.End<ReplyChannelDemuxFailureAsyncResult>(result);
        }
    }
 
    // Variant of ReplyChannelDemuxFailureAsyncResult for reply session channels: after
    // base.OnDemuxFailureHandled() returns, the session channel itself is closed through
    // BeginClose/EndClose before this async result completes.
    class ReplySessionDemuxFailureAsyncResult : ReplyChannelDemuxFailureAsyncResult
    {
        static AsyncCallback closeChannelCallback = Fx.ThunkCallback(new AsyncCallback(ChannelCloseCallback));
        IReplySessionChannel channel;

        // Throws ArgumentNullException when channel is null (the base constructor validates
        // demuxFailureHandler and requestContext).
        public ReplySessionDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, RequestContext requestContext, IReplySessionChannel channel, AsyncCallback callback, object state)
            : base(demuxFailureHandler, requestContext, callback, state)
        {
            // Fail fast: without this check a null channel is only discovered in
            // OnDemuxFailureHandled, as a NullReferenceException raised after the failure
            // handler has already run.
            if (channel == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("channel");
            }
            this.channel = channel;
        }

        // Runs the base step, then starts closing the channel. Returns true when the close
        // finished synchronously; returns false when ChannelCloseCallback will complete
        // this async result later.
        protected override bool OnDemuxFailureHandled()
        {
            base.OnDemuxFailureHandled();
            IAsyncResult result = this.channel.BeginClose(closeChannelCallback, this);
            if (!result.CompletedSynchronously)
            {
                return false;
            }
            this.channel.EndClose(result);
            return true;
        }

        // Continuation for a channel close that completed asynchronously.
        static void ChannelCloseCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                // OnDemuxFailureHandled already ended this close inline.
                return;
            }
            ReplySessionDemuxFailureAsyncResult self = (ReplySessionDemuxFailureAsyncResult)result.AsyncState;
            Exception completionException = null;
            try
            {
                self.channel.EndClose(result);
            }
#pragma warning suppress 56500 // covered by FxCOP
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                // Non-fatal errors are delivered through the async result.
                completionException = e;
            }
            self.Complete(false, completionException);
        }

        public static new void End(IAsyncResult result)
        {
            AsyncResult.End<ReplySessionDemuxFailureAsyncResult>(result);
        }
    }
 
    // AsyncResult that passes a message and its duplex session channel to
    // IChannelDemuxFailureHandler.BeginHandleDemuxFailure, then closes the channel and,
    // once the channel close has ended, the message.
    class DuplexSessionDemuxFailureAsyncResult : AsyncResult
    {
        static AsyncCallback handlerDoneCallback = Fx.ThunkCallback(new AsyncCallback(DemuxFailureHandlerCallback));
        static AsyncCallback closeDoneCallback = Fx.ThunkCallback(new AsyncCallback(ChannelCloseCallback));
        IChannelDemuxFailureHandler failureHandler;
        IDuplexSessionChannel sessionChannel;
        Message pendingMessage;

        // Throws ArgumentNullException when demuxFailureHandler or channel is null.
        public DuplexSessionDemuxFailureAsyncResult(IChannelDemuxFailureHandler demuxFailureHandler, IDuplexSessionChannel channel, Message message, AsyncCallback callback, object state)
            : base(callback, state)
        {
            if (demuxFailureHandler == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("demuxFailureHandler");
            }
            if (channel == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("channel");
            }

            this.pendingMessage = message;
            this.sessionChannel = channel;
            this.failureHandler = demuxFailureHandler;
        }

        // Kicks off the failure handler. When it completes synchronously the remaining steps
        // run inline; exceptions on that path propagate to the caller of Start.
        public void Start()
        {
            IAsyncResult handlerResult = this.failureHandler.BeginHandleDemuxFailure(
                this.pendingMessage, this.sessionChannel, handlerDoneCallback, this);

            if (handlerResult.CompletedSynchronously)
            {
                this.failureHandler.EndHandleDemuxFailure(handlerResult);
                if (this.OnDemuxFailureHandled())
                {
                    Complete(true);
                }
            }
            // Otherwise DemuxFailureHandlerCallback continues when the handler completes.
        }

        // Starts closing the channel. Returns true when the close (and the message close that
        // follows it) finished inline, false when ChannelCloseCallback will finish the work.
        bool OnDemuxFailureHandled()
        {
            IAsyncResult closeResult = this.sessionChannel.BeginClose(closeDoneCallback, this);
            if (closeResult.CompletedSynchronously)
            {
                this.FinishClose(closeResult);
                return true;
            }
            return false;
        }

        // Ends the channel close and then closes the message, in that order.
        void FinishClose(IAsyncResult closeResult)
        {
            this.sessionChannel.EndClose(closeResult);
            this.pendingMessage.Close();
        }

        // Continuation for a failure handler that completed asynchronously.
        static void DemuxFailureHandlerCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                // Start() already processed this result inline.
                return;
            }

            DuplexSessionDemuxFailureAsyncResult thisPtr = (DuplexSessionDemuxFailureAsyncResult)result.AsyncState;
            Exception failure = null;
            bool shouldComplete = false;
            try
            {
                thisPtr.failureHandler.EndHandleDemuxFailure(result);
                shouldComplete = thisPtr.OnDemuxFailureHandled();
            }
#pragma warning suppress 56500 // covered by FxCOP
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }

                // Non-fatal errors are delivered through the async result, not thrown here.
                failure = e;
                shouldComplete = true;
            }

            if (shouldComplete)
            {
                thisPtr.Complete(false, failure);
            }
        }

        // Continuation for a channel close that completed asynchronously.
        static void ChannelCloseCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                // OnDemuxFailureHandled already finished this close inline.
                return;
            }

            DuplexSessionDemuxFailureAsyncResult thisPtr = (DuplexSessionDemuxFailureAsyncResult)result.AsyncState;
            Exception failure = null;
            try
            {
                thisPtr.FinishClose(result);
            }
#pragma warning suppress 56500 // covered by FxCOP
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                failure = e;
            }

            thisPtr.Complete(false, failure);
        }

        public static void End(IAsyncResult result)
        {
            AsyncResult.End<DuplexSessionDemuxFailureAsyncResult>(result);
        }
    }
 
    // Contract invoked by the demux-failure async results in this file
    // (ReplyChannelDemuxFailureAsyncResult, DuplexSessionDemuxFailureAsyncResult) to hand a
    // message over to a failure handler.
    // NOTE(review): what the handler does with the message (fault, drop, trace, ...) is
    // decided by the implementation and is not visible from this declaration.
    interface IChannelDemuxFailureHandler
    {
        // Synchronous form; receives only the message.
        void HandleDemuxFailure(Message message);

        // Asynchronous forms, both completed with EndHandleDemuxFailure. They differ only in
        // the type of faultContext: a RequestContext or an IOutputChannel.
        // NOTE(review): faultContext is presumably where a fault reply would be sent; confirm
        // against the implementations.
        IAsyncResult BeginHandleDemuxFailure(Message message, RequestContext faultContext, AsyncCallback callback, object state);
        IAsyncResult BeginHandleDemuxFailure(Message message, IOutputChannel faultContext, AsyncCallback callback, object state);
        void EndHandleDemuxFailure(IAsyncResult result);
    }
}