File: System\ServiceModel\Dispatcher\ListenerHandler.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
 
namespace System.ServiceModel.Dispatcher
{
    using System;
    using System.Diagnostics;
    using System.Runtime;
    using System.Runtime.CompilerServices;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.Threading;
    using System.Transactions;
    using SessionIdleManager = System.ServiceModel.Channels.ServiceChannel.SessionIdleManager;
 
    class ListenerHandler : CommunicationObject, ISessionThrottleNotification
    {
        static AsyncCallback acceptCallback = Fx.ThunkCallback(new AsyncCallback(ListenerHandler.AcceptCallback));
        static Action<object> initiateChannelPump = new Action<object>(ListenerHandler.InitiateChannelPump);
        static AsyncCallback waitCallback = Fx.ThunkCallback(new AsyncCallback(ListenerHandler.WaitCallback));
 
        readonly ErrorHandlingAcceptor acceptor;
        readonly ChannelDispatcher channelDispatcher;
        ListenerChannel channel;
        SessionIdleManager idleManager;
        bool acceptedNull;
        bool doneAccepting;
        EndpointDispatcherTable endpoints;
        readonly ServiceHostBase host;
        readonly IListenerBinder listenerBinder;
        readonly ServiceThrottle throttle;
        IDefaultCommunicationTimeouts timeouts;
        WrappedTransaction wrappedTransaction;
 
        internal ListenerHandler(IListenerBinder listenerBinder, ChannelDispatcher channelDispatcher, ServiceHostBase host, ServiceThrottle throttle, IDefaultCommunicationTimeouts timeouts)
        {
            this.listenerBinder = listenerBinder;
            if (!((this.listenerBinder != null)))
            {
                Fx.Assert("ListenerHandler.ctor: (this.listenerBinder != null)");
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("listenerBinder");
            }
 
            this.channelDispatcher = channelDispatcher;
            if (!((this.channelDispatcher != null)))
            {
                Fx.Assert("ListenerHandler.ctor: (this.channelDispatcher != null)");
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("channelDispatcher");
            }
 
            this.host = host;
            if (!((this.host != null)))
            {
                Fx.Assert("ListenerHandler.ctor: (this.host != null)");
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("host");
            }
 
            this.throttle = throttle;
            if (!((this.throttle != null)))
            {
                Fx.Assert("ListenerHandler.ctor: (this.throttle != null)");
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("throttle");
            }
 
            this.timeouts = timeouts;
 
            this.endpoints = channelDispatcher.EndpointDispatcherTable;
            this.acceptor = new ErrorHandlingAcceptor(listenerBinder, channelDispatcher);
        }
 
        internal ChannelDispatcher ChannelDispatcher
        {
            get { return this.channelDispatcher; }
        }
 
        internal ListenerChannel Channel
        {
            get { return this.channel; }
        }
 
        protected override TimeSpan DefaultCloseTimeout
        {
            get { return this.host.CloseTimeout; }
        }
 
        protected override TimeSpan DefaultOpenTimeout
        {
            get { return this.host.OpenTimeout; }
        }
 
        internal EndpointDispatcherTable Endpoints
        {
            get { return this.endpoints; }
            set { this.endpoints = value; }
        }
 
        internal ServiceHostBase Host
        {
            get { return this.host; }
        }
 
        new internal object ThisLock
        {
            get { return base.ThisLock; }
        }
 
        protected override void OnOpen(TimeSpan timeout)
        {
        }
 
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CompletedAsyncResult(callback, state);
        }
 
        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }
 
        protected override void OnOpened()
        {
            base.OnOpened();
            this.channelDispatcher.Channels.IncrementActivityCount();
            if (this.channelDispatcher.IsTransactedReceive && this.channelDispatcher.ReceiveContextEnabled && this.channelDispatcher.MaxTransactedBatchSize > 0)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.IncompatibleBehaviors)));
            }
            NewChannelPump();
        }
 
        internal void NewChannelPump()
        {
            ActionItem.Schedule(ListenerHandler.initiateChannelPump, this);
        }
 
        static void InitiateChannelPump(object state)
        {
            ListenerHandler listenerHandler = state as ListenerHandler;
 
            if (listenerHandler.ChannelDispatcher.IsTransactedAccept)
            {
                if (listenerHandler.ChannelDispatcher.AsynchronousTransactedAcceptEnabled)
                {
                    listenerHandler.AsyncTransactedChannelPump();
                }
                else
                {
                    listenerHandler.SyncTransactedChannelPump();
                }
            }
            else
            {
                listenerHandler.ChannelPump();
            }
        }
 
        void ChannelPump()
        {
            IChannelListener listener = this.listenerBinder.Listener;
 
            for (;;)
            {
                if (this.acceptedNull || (listener.State == CommunicationState.Faulted))
                {
                    this.DoneAccepting();
                    break;
                }
 
                if (!this.AcceptAndAcquireThrottle())
                {
                    break;
                }
 
                this.Dispatch();
            }
        }
 
        [MethodImpl(MethodImplOptions.NoInlining)]
        void SyncTransactedChannelPump()
        {
            IChannelListener listener = this.listenerBinder.Listener;
 
            for (;;)
            {
                if (this.acceptedNull || (listener.State == CommunicationState.Faulted))
                {
                    this.DoneAccepting();
                    break;
                }
 
                this.acceptor.WaitForChannel();
 
                Transaction tx;
                if (this.TransactedAccept(out tx))
                {
                    if (null != tx)
                    {
                        this.wrappedTransaction = new WrappedTransaction(tx);
 
                        if (!this.AcquireThrottle())
                            break;
 
                        this.Dispatch();
                    }
                }
            }
        }
 
        void AsyncTransactedChannelPump()
        {
            IChannelListener listener = this.listenerBinder.Listener;
 
            for (;;)
            {
                if (this.acceptedNull || (listener.State == CommunicationState.Faulted))
                {
                    this.DoneAccepting();
                    break;
                }
 
                IAsyncResult result = this.acceptor.BeginWaitForChannel(ListenerHandler.waitCallback, this);
 
                if (!result.CompletedSynchronously)
                {
                    break;
                }
 
                this.acceptor.EndWaitForChannel(result);
                if (!this.AcceptChannel(listener))
                {
                    break;
                }
            }
        }
 
        static void WaitCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            };
 
            ListenerHandler listenerHandler = (ListenerHandler)result.AsyncState;
            IChannelListener listener = listenerHandler.listenerBinder.Listener;
 
            listenerHandler.acceptor.EndWaitForChannel(result);
            if (listenerHandler.AcceptChannel(listener))
            {
                listenerHandler.AsyncTransactedChannelPump();
            }
        }
 
        bool AcceptChannel(IChannelListener listener)
        {
            Transaction tx;
 
            if (this.TransactedAccept(out tx))
            {
                if (null != tx)
                {
                    this.wrappedTransaction = new WrappedTransaction(tx);
 
                    if (!this.AcquireThrottle())
                    {
                        return false;
                    }
 
                    this.Dispatch();
                }
            }
 
            return true;
        }
 
        void AbortChannels()
        {
            IChannel[] channels = this.channelDispatcher.Channels.ToArray();
            for (int index = 0; index < channels.Length; index++)
            {
                channels[index].Abort();
            }
        }
 
        bool AcceptAndAcquireThrottle()
        {
            IAsyncResult result = this.acceptor.BeginTryAccept(TimeSpan.MaxValue, ListenerHandler.acceptCallback, this);
            if (result.CompletedSynchronously)
            {
                return HandleEndAccept(result);
            }
            return false;
        }
 
        bool TransactedAccept(out Transaction tx)
        {
            tx = null;
 
            try
            {
                tx = TransactionBehavior.CreateTransaction(this.ChannelDispatcher.TransactionIsolationLevel, this.ChannelDispatcher.TransactionTimeout);
 
                IChannelBinder binder = null;
                using (TransactionScope scope = new TransactionScope(tx))
                {
                    TimeSpan acceptTimeout = TimeoutHelper.Min(this.ChannelDispatcher.TransactionTimeout, this.ChannelDispatcher.DefaultCommunicationTimeouts.ReceiveTimeout);
                    if (!this.acceptor.TryAccept(TransactionBehavior.NormalizeTimeout(acceptTimeout), out binder))
                    {
                        return false;
                    }
                    scope.Complete();
                }
                if (null != binder)
                {
                    this.channel = new ListenerChannel(binder);
                    this.idleManager = SessionIdleManager.CreateIfNeeded(this.channel.Binder, this.channelDispatcher.DefaultCommunicationTimeouts.ReceiveTimeout);
                    return true;
                }
                else
                {
                    this.AcceptedNull();
                    tx = null;
                    return false;
                }
            }
            catch (CommunicationException e)
            {
                if (null != tx)
                {
                    try
                    {
                        tx.Rollback();
                    }
                    catch (TransactionException ex)
                    {
                        DiagnosticUtility.TraceHandledException(ex, TraceEventType.Information);
                    }
                }
                tx = null;
 
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
 
                return false;
            }
            catch (TransactionException e)
            {
                tx = null;
 
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
 
                return false;
            }
        }
 
        ListenerChannel CompleteAccept(IAsyncResult result)
        {
            IChannelBinder binder;
            bool valid = this.acceptor.EndTryAccept(result, out binder);
 
            if (valid)
            {
                if (binder != null)
                {
                    return new ListenerChannel(binder);
                }
                else
                {
                    this.AcceptedNull();
                    return null;
                }
            }
            else
            {
                return null;
            }
        }
 
        bool HandleEndAccept(IAsyncResult result)
        {
            this.channel = this.CompleteAccept(result);
 
            if (this.channel != null)
            {
                Fx.Assert(this.idleManager == null, "There cannot be an existing idle manager");
                this.idleManager = SessionIdleManager.CreateIfNeeded(this.channel.Binder, this.channelDispatcher.DefaultCommunicationTimeouts.ReceiveTimeout);
            }
            else
            {
                this.DoneAccepting();
                return true;
            }
 
            return this.AcquireThrottle();
        }
 
        static void AcceptCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }
 
            ListenerHandler thisPtr = (ListenerHandler)result.AsyncState;
 
            if (thisPtr.HandleEndAccept(result))
            {
                thisPtr.Dispatch();
                thisPtr.ChannelPump();
            }
        }
 
        bool AcquireThrottle()
        {
            if ((this.channel != null) && (this.throttle != null) && (this.channelDispatcher.Session))
            {
                return this.throttle.AcquireSession(this);
            }
 
            return true;
        }
 
        // This callback always occurs async and always on a dirty thread
        public void ThrottleAcquired()
        {
            this.Dispatch();
            this.NewChannelPump();
        }
 
        void CloseChannel(IChannel channel, TimeSpan timeout)
        {
            try
            {
                if (channel.State != CommunicationState.Closing && channel.State != CommunicationState.Closed)
                {
                    CloseChannelState state = new CloseChannelState(this, channel);
                    if (channel is ISessionChannel<IDuplexSession>)
                    {
                        IDuplexSession duplexSession = ((ISessionChannel<IDuplexSession>)channel).Session;
                        IAsyncResult result = duplexSession.BeginCloseOutputSession(timeout, Fx.ThunkCallback(new AsyncCallback(CloseOutputSessionCallback)), state);
                        if (result.CompletedSynchronously)
                            duplexSession.EndCloseOutputSession(result);
                    }
                    else
                    {
                        IAsyncResult result = channel.BeginClose(timeout, Fx.ThunkCallback(new AsyncCallback(CloseChannelCallback)), state);
                        if (result.CompletedSynchronously)
                            channel.EndClose(result);
                    }
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                this.HandleError(e);
 
                if (channel is ISessionChannel<IDuplexSession>)
                {
                    channel.Abort();
                }
            }
        }
 
        static void CloseChannelCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }
 
            CloseChannelState state = (CloseChannelState)result.AsyncState;
            try
            {
                state.Channel.EndClose(result);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                state.ListenerHandler.HandleError(e);
            }
        }
 
        public void CloseInput(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            // Close all datagram channels
            IChannel[] channels = this.channelDispatcher.Channels.ToArray();
            for (int index = 0; index < channels.Length; index++)
            {
                IChannel channel = channels[index];
                if (!this.IsSessionChannel(channel))
                {
                    try
                    {
                        channel.Close(timeoutHelper.RemainingTime());
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e))
                        {
                            throw;
                        }
                        this.HandleError(e);
                    }
                }
            }
        }
 
        static void CloseOutputSessionCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }
 
            CloseChannelState state = (CloseChannelState)result.AsyncState;
            try
            {
                ((ISessionChannel<IDuplexSession>)state.Channel).Session.EndCloseOutputSession(result);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                state.ListenerHandler.HandleError(e);
                state.Channel.Abort();
            }
        }
 
        void CloseChannels(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            IChannel[] channels = this.channelDispatcher.Channels.ToArray();
            for (int index = 0; index < channels.Length; index++)
                CloseChannel(channels[index], timeoutHelper.RemainingTime());
        }
 
        void Dispatch()
        {
            ListenerChannel channel = this.channel;
            SessionIdleManager idleManager = this.idleManager;
            this.channel = null;
            this.idleManager = null;
 
            try
            {
                if (channel != null)
                {
                    ChannelHandler handler = new ChannelHandler(listenerBinder.MessageVersion, channel.Binder, this.throttle, this, (channel.Throttle != null), this.wrappedTransaction, idleManager);
 
                    if (!channel.Binder.HasSession)
                    {
                        this.channelDispatcher.Channels.Add(channel.Binder.Channel);
                    }
 
                    if (channel.Binder is DuplexChannelBinder)
                    {
                        DuplexChannelBinder duplexChannelBinder = channel.Binder as DuplexChannelBinder;
                        duplexChannelBinder.ChannelHandler = handler;
                        duplexChannelBinder.DefaultCloseTimeout = this.DefaultCloseTimeout;
 
                        if (this.timeouts == null)
                            duplexChannelBinder.DefaultSendTimeout = ServiceDefaults.SendTimeout;
                        else
                            duplexChannelBinder.DefaultSendTimeout = timeouts.SendTimeout;
                    }
 
                    ChannelHandler.Register(handler);
                    channel = null;
                    idleManager = null;
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                this.HandleError(e);
            }
            finally
            {
                if (channel != null)
                {
                    channel.Binder.Channel.Abort();
                    if (this.throttle != null && this.channelDispatcher.Session)
                    {
                        this.throttle.DeactivateChannel();
                    }
                    if (idleManager != null)
                    {
                        idleManager.CancelTimer();
                    }
                }
            }
        }
 
        void AcceptedNull()
        {
            this.acceptedNull = true;
        }
 
        void DoneAccepting()
        {
            lock (this.ThisLock)
            {
                if (!this.doneAccepting)
                {
                    this.doneAccepting = true;
                    this.channelDispatcher.Channels.DecrementActivityCount();
                }
            }
        }
 
        bool IsSessionChannel(IChannel channel)
        {
            return (channel is ISessionChannel<IDuplexSession> ||
                    channel is ISessionChannel<IInputSession> ||
                    channel is ISessionChannel<IOutputSession>);
        }
 
        void CancelPendingIdleManager()
        {
            SessionIdleManager idleManager = this.idleManager;
            if (idleManager != null)
            {
                idleManager.CancelTimer();
            }
        }
 
        protected override void OnAbort()
        {
            // if there's an idle manager that has not been transferred to the channel handler, cancel it
            CancelPendingIdleManager();
 
            // Start aborting incoming channels
            this.channelDispatcher.Channels.CloseInput();
 
            // Abort existing channels
            this.AbortChannels();
 
            // Wait for channels to finish aborting
            this.channelDispatcher.Channels.Abort();
 
        }
 
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
 
            // if there's an idle manager that has not been cancelled, cancel it
            CancelPendingIdleManager();
 
            // Start aborting incoming channels
            this.channelDispatcher.Channels.CloseInput();
 
            // Start closing existing channels
            this.CloseChannels(timeoutHelper.RemainingTime());
 
            // Wait for channels to finish closing
            return this.channelDispatcher.Channels.BeginClose(timeoutHelper.RemainingTime(), callback, state);
        }
 
        protected override void OnClose(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
 
            // if there's an idle manager that has not been cancelled, cancel it
            CancelPendingIdleManager();
 
            // Start aborting incoming channels
            this.channelDispatcher.Channels.CloseInput();
 
            // Start closing existing channels
            this.CloseChannels(timeoutHelper.RemainingTime());
 
            // Wait for channels to finish closing
            this.channelDispatcher.Channels.Close(timeoutHelper.RemainingTime());
        }
 
        protected override void OnEndClose(IAsyncResult result)
        {
            this.channelDispatcher.Channels.EndClose(result);
        }
 
        bool HandleError(Exception e)
        {
            return this.channelDispatcher.HandleError(e);
        }
 
        class CloseChannelState
        {
            ListenerHandler listenerHandler;
            IChannel channel;
 
            internal CloseChannelState(ListenerHandler listenerHandler, IChannel channel)
            {
                this.listenerHandler = listenerHandler;
                this.channel = channel;
            }
 
            internal ListenerHandler ListenerHandler
            {
                get { return this.listenerHandler; }
            }
 
            internal IChannel Channel
            {
                get { return this.channel; }
            }
        }
    }
 
    class ListenerChannel
    {
        IChannelBinder binder;
        ServiceThrottle throttle;
 
        public ListenerChannel(IChannelBinder binder)
        {
            this.binder = binder;
        }
 
        public IChannelBinder Binder
        {
            get { return this.binder; }
        }
 
        public ServiceThrottle Throttle
        {
            get { return this.throttle; }
            set { this.throttle = value; }
        }
    }
 
    class WrappedTransaction
    {
        Transaction transaction;
 
        internal WrappedTransaction(Transaction transaction)
        {
            this.transaction = transaction;
        }
 
        internal Transaction Transaction
        {
            get { return this.transaction; }
        }
    }
}