// File: System\ServiceModel\Channels\InternalDuplexChannelListener.cs
// Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//----------------------------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System.Runtime;
    using System.ServiceModel.Diagnostics;
 
    sealed class InternalDuplexChannelListener : DelegatingChannelListener<IDuplexChannel>
    {
        IChannelFactory<IOutputChannel> innerChannelFactory;
        bool providesCorrelation;
 
        /// <summary>
        /// Builds the listener from a binding context. The inner input-channel listener handed to
        /// the base class is built from a clone of <paramref name="context"/>; the output-channel
        /// factory is then built from <paramref name="context"/> itself.
        /// </summary>
        /// <param name="bindingElement">Binding element whose ProvidesCorrelation value is captured.</param>
        /// <param name="context">Binding context used to build the inner listener and the inner factory.</param>
        internal InternalDuplexChannelListener(InternalDuplexBindingElement bindingElement, BindingContext context)
            : base(context.Binding, context.Clone().BuildInnerChannelListener<IInputChannel>())
        {
            // Factory first, then the correlation flag (same order as before).
            innerChannelFactory = context.BuildInnerChannelFactory<IOutputChannel>();
            providesCorrelation = bindingElement.ProvidesCorrelation;
        }
 
        /// <summary>
        /// Creates an output channel addressed to <paramref name="to"/> from the inner channel
        /// factory and opens it using the time remaining on <paramref name="timeoutHelper"/>.
        /// </summary>
        /// <param name="to">Destination URI; wrapped in a new EndpointAddress.</param>
        /// <param name="timeoutHelper">Supplies the timeout passed to Open.</param>
        /// <returns>The opened channel. Callers in this file close and/or abort it after sending.</returns>
        IOutputChannel GetOutputChannel(Uri to, TimeoutHelper timeoutHelper)
        {
            // NOTE(review): timeoutHelper arrives by value. If TimeoutHelper is a struct, a deadline
            // first captured by RemainingTime() here is not visible to the caller's copy - confirm.
            IOutputChannel channel = this.innerChannelFactory.CreateChannel(new EndpointAddress(to));
            bool opened = false;
            try
            {
                channel.Open(timeoutHelper.RemainingTime());
                opened = true;
            }
            finally
            {
                // A channel whose Open failed is never returned, so no caller can release it;
                // abort it here rather than leaking it.
                if (!opened)
                {
                    channel.Abort();
                }
            }
            return channel;
        }
 
        /// <summary>
        /// Aborts the inner output-channel factory and then the base listener. The base abort
        /// runs even when aborting the factory throws.
        /// </summary>
        protected override void OnAbort()
        {
            try
            {
                innerChannelFactory.Abort();
            }
            finally
            {
                // Always give the base class its turn.
                base.OnAbort();
            }
        }
 
        /// <summary>
        /// Starts an asynchronous close by constructing a ChainedCloseAsyncResult over the base
        /// listener's begin/end close methods and the inner channel factory.
        /// </summary>
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            IAsyncResult chainedClose = new ChainedCloseAsyncResult(
                timeout,
                callback,
                state,
                base.OnBeginClose,
                base.OnEndClose,
                innerChannelFactory);
            return chainedClose;
        }
 
        /// <summary>
        /// Completes the asynchronous close started by OnBeginClose.
        /// </summary>
        /// <param name="result">The async result returned by OnBeginClose.</param>
        protected override void OnEndClose(IAsyncResult result)
        {
            // The chained result owns completion (and any exception) for both close steps.
            ChainedCloseAsyncResult.End(result);
        }
 
        /// <summary>
        /// Synchronously closes the base listener and then the inner channel factory, both
        /// drawing their timeouts from one TimeoutHelper created from <paramref name="timeout"/>.
        /// </summary>
        protected override void OnClose(TimeSpan timeout)
        {
            TimeoutHelper closeBudget = new TimeoutHelper(timeout);

            // Listener first, factory second.
            base.OnClose(closeBudget.RemainingTime());
            innerChannelFactory.Close(closeBudget.RemainingTime());
        }
 
        /// <summary>
        /// Runs the base OnOpening and then installs a CompositeDuplexChannelAcceptor, layered
        /// over the inner input-channel listener, as this listener's Acceptor.
        /// </summary>
        protected override void OnOpening()
        {
            base.OnOpening();

            IChannelListener<IInputChannel> inputListener = (IChannelListener<IInputChannel>)this.InnerChannelListener;

            // The acceptor is routed through object before the interface cast, as before.
            object acceptor = new CompositeDuplexChannelAcceptor(this, inputListener);
            this.Acceptor = (IChannelAcceptor<IDuplexChannel>)acceptor;
        }
 
        /// <summary>
        /// Starts an asynchronous open by constructing a ChainedOpenAsyncResult over the base
        /// listener's begin/end open methods and the inner channel factory.
        /// </summary>
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            IAsyncResult chainedOpen = new ChainedOpenAsyncResult(
                timeout,
                callback,
                state,
                base.OnBeginOpen,
                base.OnEndOpen,
                innerChannelFactory);
            return chainedOpen;
        }
 
        /// <summary>
        /// Completes the asynchronous open started by OnBeginOpen.
        /// </summary>
        /// <param name="result">The async result returned by OnBeginOpen.</param>
        protected override void OnEndOpen(IAsyncResult result)
        {
            // The chained result owns completion (and any exception) for both open steps.
            ChainedOpenAsyncResult.End(result);
        }
 
        /// <summary>
        /// Synchronously opens the base listener and then the inner channel factory, both drawing
        /// their timeouts from one TimeoutHelper created from <paramref name="timeout"/>.
        /// </summary>
        protected override void OnOpen(TimeSpan timeout)
        {
            TimeoutHelper openBudget = new TimeoutHelper(timeout);

            // Listener first, factory second.
            base.OnOpen(openBudget.RemainingTime());
            innerChannelFactory.Open(openBudget.RemainingTime());
        }
 
        /// <summary>
        /// Resolves a property object of type <typeparamref name="T"/>.
        /// IChannelFactory yields the inner channel factory. ISecurityCapabilities, when this
        /// listener does not provide correlation, is routed through
        /// InternalDuplexBindingElement.GetSecurityCapabilities. Anything else is looked up on the
        /// base listener first and on the inner channel factory second.
        /// </summary>
        public override T GetProperty<T>()
        {
            Type requested = typeof(T);

            if (requested == typeof(IChannelFactory))
            {
                return (T)(object)this.innerChannelFactory;
            }

            if (!this.providesCorrelation && requested == typeof(ISecurityCapabilities))
            {
                return InternalDuplexBindingElement.GetSecurityCapabilities<T>(base.GetProperty<ISecurityCapabilities>());
            }

            T fromListener = base.GetProperty<T>();
            if (fromListener == null)
            {
                // Nothing on the listener side; fall back to the factory side.
                return this.innerChannelFactory.GetProperty<T>();
            }

            return fromListener;
        }
 
        /// <summary>
        /// Acceptor that wraps each accepted inner input channel in a ServerCompositeDuplexChannel.
        /// </summary>
        sealed class CompositeDuplexChannelAcceptor : LayeredChannelAcceptor<IDuplexChannel, IInputChannel>
        {
            public CompositeDuplexChannelAcceptor(InternalDuplexChannelListener listener, IChannelListener<IInputChannel> innerListener)
                : base(listener, innerListener)
            {
            }

            /// <summary>
            /// Wraps <paramref name="innerChannel"/> in a duplex channel owned by the listener
            /// exposed through ChannelManager.
            /// </summary>
            protected override IDuplexChannel OnAcceptChannel(IInputChannel innerChannel)
            {
                InternalDuplexChannelListener owner = (InternalDuplexChannelListener)this.ChannelManager;
                IDuplexChannel accepted = new ServerCompositeDuplexChannel(owner, innerChannel);
                return accepted;
            }
        }
 
        /// <summary>
        /// Server-side duplex channel composed of an accepted inner input channel, used for all
        /// receive operations, and a per-message output channel obtained from the owning listener,
        /// used for each send. The send destination is taken from the message's Via property or,
        /// failing that, its To header.
        /// </summary>
        sealed class ServerCompositeDuplexChannel : ChannelBase, IDuplexChannel
        {
            IInputChannel innerInputChannel;
            // Captured from the listener at construction; not read anywhere in this class.
            TimeSpan sendTimeout;

            /// <summary>
            /// Wraps <paramref name="innerInputChannel"/> in a duplex channel managed by
            /// <paramref name="listener"/>.
            /// </summary>
            public ServerCompositeDuplexChannel(InternalDuplexChannelListener listener, IInputChannel innerInputChannel)
                : base(listener)
            {
                this.innerInputChannel = innerInputChannel;
                this.sendTimeout = listener.DefaultSendTimeout;
            }

            /// <summary>The owning listener, obtained by casting base.Manager.</summary>
            InternalDuplexChannelListener Listener
            {
                get { return (InternalDuplexChannelListener)base.Manager; }
            }

            /// <summary>The local address of the inner input channel.</summary>
            public EndpointAddress LocalAddress
            {
                get { return this.innerInputChannel.LocalAddress; }
            }

            /// <summary>Always null; each send is addressed from the message being sent.</summary>
            public EndpointAddress RemoteAddress
            {
                get { return null; }
            }

            /// <summary>Always null; each send is addressed from the message being sent.</summary>
            public Uri Via
            {
                get { return null; }
            }

            /// <summary>Receives a message using DefaultReceiveTimeout.</summary>
            public Message Receive()
            {
                return this.Receive(this.DefaultReceiveTimeout);
            }

            /// <summary>Receives a message by delegating to InputChannel.HelpReceive.</summary>
            public Message Receive(TimeSpan timeout)
            {
                return InputChannel.HelpReceive(this, timeout);
            }

            /// <summary>Begins a receive using DefaultReceiveTimeout.</summary>
            public IAsyncResult BeginReceive(AsyncCallback callback, object state)
            {
                return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
            }

            /// <summary>Begins a receive by delegating to InputChannel.HelpBeginReceive.</summary>
            public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return InputChannel.HelpBeginReceive(this, timeout, callback, state);
            }

            /// <summary>Completes a receive by delegating to InputChannel.HelpEndReceive.</summary>
            public Message EndReceive(IAsyncResult result)
            {
                return InputChannel.HelpEndReceive(result);
            }

            /// <summary>Forwards BeginTryReceive to the inner input channel.</summary>
            public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.innerInputChannel.BeginTryReceive(timeout, callback, state);
            }

            /// <summary>Begins a send using DefaultSendTimeout.</summary>
            public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
            {
                return this.BeginSend(message, this.DefaultSendTimeout, callback, state);
            }

            /// <summary>
            /// Begins a send. Validation and creation of the output channel happen inside the
            /// SendAsyncResult constructor, so exceptions from those steps surface from this call.
            /// </summary>
            public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
            {
                return new SendAsyncResult(this, message, timeout, callback, state);
            }

            /// <summary>Forwards EndTryReceive to the inner input channel.</summary>
            public bool EndTryReceive(IAsyncResult result, out Message message)
            {
                return this.innerInputChannel.EndTryReceive(result, out message);
            }

            /// <summary>Completes a send started by BeginSend.</summary>
            public void EndSend(IAsyncResult result)
            {
                SendAsyncResult.End(result);
            }

            /// <summary>Aborts the inner input channel.</summary>
            protected override void OnAbort()
            {
                this.innerInputChannel.Abort();
            }

            /// <summary>Begins closing the inner input channel.</summary>
            protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
            {
                // NOTE(review): unlike the synchronous OnClose, this path does not check that the
                // inner channel is Opened before closing it - confirm which behavior is intended.
                return this.innerInputChannel.BeginClose(timeout, callback, state);
            }

            /// <summary>Completes the close of the inner input channel.</summary>
            protected override void OnEndClose(IAsyncResult result)
            {
                this.innerInputChannel.EndClose(result);
            }

            /// <summary>
            /// Closes the inner input channel, but only when it is currently in the Opened state.
            /// </summary>
            protected override void OnClose(TimeSpan timeout)
            {
                if (this.innerInputChannel.State == CommunicationState.Opened)
                {
                    this.innerInputChannel.Close(timeout);
                }
            }

            /// <summary>Begins opening the inner input channel within <paramref name="timeout"/>.</summary>
            protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
            {
                // Forward the caller's timeout, as the synchronous OnOpen does; previously it was
                // dropped and the inner channel's default open timeout applied instead.
                return this.innerInputChannel.BeginOpen(timeout, callback, state);
            }

            /// <summary>Completes the open of the inner input channel.</summary>
            protected override void OnEndOpen(IAsyncResult result)
            {
                this.innerInputChannel.EndOpen(result);
            }

            /// <summary>Opens the inner input channel within <paramref name="timeout"/>.</summary>
            protected override void OnOpen(TimeSpan timeout)
            {
                this.innerInputChannel.Open(timeout);
            }

            /// <summary>Forwards TryReceive to the inner input channel.</summary>
            public bool TryReceive(TimeSpan timeout, out Message message)
            {
                return this.innerInputChannel.TryReceive(timeout, out message);
            }

            /// <summary>Sends a message using DefaultSendTimeout.</summary>
            public void Send(Message message)
            {
                this.Send(message, this.DefaultSendTimeout);
            }

            /// <summary>
            /// Sends <paramref name="message"/> on a freshly opened output channel addressed from
            /// the message, then closes that channel. The channel is always aborted afterwards,
            /// whether or not the send and close succeeded.
            /// </summary>
            public void Send(Message message, TimeSpan timeout)
            {
                TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
                // NOTE(review): timeoutHelper is passed by value; if TimeoutHelper is a struct, time
                // spent opening the output channel may not be deducted from the calls below - confirm.
                IOutputChannel outputChannel = ValidateStateAndGetOutputChannel(message, timeoutHelper);
                try
                {
                    outputChannel.Send(message, timeoutHelper.RemainingTime());
                    outputChannel.Close(timeoutHelper.RemainingTime());
                }
                finally
                {
                    outputChannel.Abort();
                }
            }

            /// <summary>
            /// Verifies that this channel is usable and that <paramref name="message"/> carries a
            /// non-anonymous destination, then asks the listener for an opened output channel to
            /// that destination. Message.Properties.Via takes precedence over Message.Headers.To.
            /// </summary>
            /// <returns>The output channel returned by the listener's GetOutputChannel.</returns>
            IOutputChannel ValidateStateAndGetOutputChannel(Message message, TimeoutHelper timeoutHelper)
            {
                ThrowIfDisposedOrNotOpen();
                if (message == null)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("message");
                }
                Uri to = message.Properties.Via;
                if (to == null)
                {
                    // No Via on the message: fall back to the To header.
                    to = message.Headers.To;
                    if (to == null)
                    {
                        throw TraceUtility.ThrowHelperError(new CommunicationException(SR.GetString(SR.MessageMustHaveViaOrToSetForSendingOnServerSideCompositeDuplexChannels)), message);
                    }
                    // Per the original author, the explicit EndpointAddress.AnonymousUri check is redundant.
                    else if (to.Equals(EndpointAddress.AnonymousUri) || to.Equals(message.Version.Addressing.AnonymousUri))
                    {
                        throw TraceUtility.ThrowHelperError(new CommunicationException(SR.GetString(SR.MessageToCannotBeAddressedToAnonymousOnServerSideCompositeDuplexChannels, to)), message);
                    }
                }
                // Per the original author, the explicit EndpointAddress.AnonymousUri check is redundant.
                else if (to.Equals(EndpointAddress.AnonymousUri) || to.Equals(message.Version.Addressing.AnonymousUri))
                {
                    throw TraceUtility.ThrowHelperError(new CommunicationException(SR.GetString(SR.MessageViaCannotBeAddressedToAnonymousOnServerSideCompositeDuplexChannels, to)), message);
                }
                return this.Listener.GetOutputChannel(to, timeoutHelper);
            }

            /// <summary>
            /// Async result for BeginSend: obtains the output channel, starts the send, and on
            /// completion ends the send and closes the output channel.
            /// </summary>
            class SendAsyncResult : AsyncResult
            {
                IOutputChannel outputChannel;
                static readonly AsyncCallback sendCompleteCallback = Fx.ThunkCallback(new AsyncCallback(SendCompleteCallback));
                TimeoutHelper timeoutHelper;

                /// <summary>
                /// Validates the message, obtains an opened output channel from
                /// <paramref name="outer"/> and begins the send. If the send completes
                /// synchronously, this result is completed before the constructor returns.
                /// </summary>
                public SendAsyncResult(ServerCompositeDuplexChannel outer, Message message, TimeSpan timeout, AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    this.timeoutHelper = new TimeoutHelper(timeout);
                    this.outputChannel = outer.ValidateStateAndGetOutputChannel(message, timeoutHelper);

                    bool success = false;
                    try
                    {
                        IAsyncResult result = outputChannel.BeginSend(message, timeoutHelper.RemainingTime(), sendCompleteCallback, this);
                        if (result.CompletedSynchronously)
                        {
                            CompleteSend(result);
                            this.Complete(true);
                        }
                        success = true;
                    }
                    finally
                    {
                        // Anything that threw above leaves the output channel with no other owner.
                        if (!success)
                        {
                            this.outputChannel.Abort();
                        }
                    }
                }

                /// <summary>
                /// Ends the send and closes the output channel; the channel is always aborted
                /// afterwards, mirroring the synchronous Send.
                /// </summary>
                void CompleteSend(IAsyncResult result)
                {
                    try
                    {
                        outputChannel.EndSend(result);
                        // Close within what remains of the caller's send timeout, as the synchronous
                        // Send does, instead of the channel's default close timeout.
                        // NOTE(review): this Close is synchronous even on the async completion path.
                        outputChannel.Close(this.timeoutHelper.RemainingTime());
                    }
                    finally
                    {
                        outputChannel.Abort();
                    }
                }

                /// <summary>Completes the operation via AsyncResult.End.</summary>
                internal static void End(IAsyncResult result)
                {
                    AsyncResult.End<SendAsyncResult>(result);
                }

                /// <summary>
                /// Callback for the inner BeginSend. Synchronous completions are handled by the
                /// constructor, so only asynchronous completions are processed here.
                /// </summary>
                static void SendCompleteCallback(IAsyncResult result)
                {
                    if (result.CompletedSynchronously)
                    {
                        return;
                    }

                    SendAsyncResult thisPtr = (SendAsyncResult)result.AsyncState;

                    Exception completionException = null;
                    try
                    {
                        thisPtr.CompleteSend(result);
                    }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                        {
                            throw;
                        }
                        // Non-fatal failures are handed to Complete below rather than thrown here.
                        completionException = e;
                    }
                    thisPtr.Complete(false, completionException);
                }
            }

            /// <summary>Forwards WaitForMessage to the inner input channel.</summary>
            public bool WaitForMessage(TimeSpan timeout)
            {
                return innerInputChannel.WaitForMessage(timeout);
            }

            /// <summary>Forwards BeginWaitForMessage to the inner input channel.</summary>
            public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
            {
                return innerInputChannel.BeginWaitForMessage(timeout, callback, state);
            }

            /// <summary>Forwards EndWaitForMessage to the inner input channel.</summary>
            public bool EndWaitForMessage(IAsyncResult result)
            {
                return innerInputChannel.EndWaitForMessage(result);
            }
        }
    }
}