File: System\ServiceModel\Activation\ListenerConnectionDemuxer.cs
Project: ndp\cdf\src\WCF\SMSvcHost\SMSvcHost.csproj (SMSvcHost)
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------
 
namespace System.ServiceModel.Activation
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.IO;
    using System.Runtime;
    using System.ServiceModel;
    using System.ServiceModel.Activation.Diagnostics;
    using System.ServiceModel.Channels;
 
    class ListenerConnectionDemuxer
    {
        ConnectionAcceptor acceptor;
        List<InitialServerConnectionReader> connectionReaders;
        bool isDisposed;
 
        ListenerConnectionModeCallback onConnectionModeKnown;
        ConnectionClosedCallback onConnectionClosed;
        // this is the one provided by the caller
        ConnectionHandleDuplicated connectionHandleDuplicated;
        // this is the onw we use internally
        ViaDecodedCallback onViaDecoded;
        TransportType transportType;
        TimeSpan channelInitializationTimeout;
 
        public ListenerConnectionDemuxer(IConnectionListener listener, 
            TransportType transportType,
            int maxAccepts, int initialMaxPendingConnections,
            TimeSpan channelInitializationTimeout,
            ConnectionHandleDuplicated connectionHandleDuplicated)
        {
            this.transportType = transportType;
            this.connectionReaders = new List<InitialServerConnectionReader>();
            this.connectionHandleDuplicated = connectionHandleDuplicated;
            this.acceptor = new ConnectionAcceptor(listener, maxAccepts, initialMaxPendingConnections, OnConnectionAvailable);
            this.channelInitializationTimeout = channelInitializationTimeout;
            this.onConnectionClosed = new ConnectionClosedCallback(OnConnectionClosed);
            this.onViaDecoded = new ViaDecodedCallback(OnViaDecoded);
        }
 
        object ThisLock
        {
            get { return this; }
        }
 
        public void Dispose()
        {
            lock (ThisLock)
            {
                if (isDisposed)
                    return;
 
                isDisposed = true;
            }
 
            for (int i = 0; i < connectionReaders.Count; i++)
            {
                connectionReaders[i].Dispose();
            }
 
            connectionReaders.Clear();
            acceptor.Dispose();
        }
 
        ListenerConnectionModeReader SetupModeReader(IConnection connection)
        {
            if (onConnectionModeKnown == null)
            {
                onConnectionModeKnown = new ListenerConnectionModeCallback(OnConnectionModeKnown);
            }
 
            ListenerConnectionModeReader modeReader = new ListenerConnectionModeReader(connection, onConnectionModeKnown, onConnectionClosed);
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    modeReader.Dispose();
                    return null;
                }
                else
                {
                    connectionReaders.Add(modeReader);
                    return modeReader;
                }
            }
        }
 
        void OnConnectionAvailable(IConnection connection, Action connectionDequeuedCallback)
        {
            if (transportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementConnectionsAcceptedTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementConnectionsAcceptedNamedPipe();
            }
            
            ListenerConnectionModeReader modeReader = SetupModeReader(connection);
 
            if (modeReader != null)
            {
                // StartReading() will never throw non-fatal exceptions; 
                // it propagates all exceptions into the onConnectionModeKnown callback, 
                // which is where we need our robust handling
                modeReader.StartReading(this.channelInitializationTimeout, connectionDequeuedCallback);
            }
            else
            {
                connectionDequeuedCallback();
            }
        }
 
        void OnConnectionModeKnown(ListenerConnectionModeReader modeReader)
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }
 
                connectionReaders.Remove(modeReader);
            }
 
            try
            {
                FramingMode framingMode = modeReader.GetConnectionMode();
                switch (framingMode)
                {
                    case FramingMode.Duplex:
                        OnDuplexConnection(modeReader);
                        break;
                    case FramingMode.Singleton:
                        OnSingletonConnection(modeReader);
                        break;
                    default:
                        {
                            Exception inner = new InvalidDataException(SR.GetString(
                                SR.FramingModeNotSupported, framingMode));
                            Exception exception = new ProtocolException(inner.Message, inner);
                            FramingEncodingString.AddFaultString(exception, FramingEncodingString.UnsupportedModeFault);
                            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(exception);
                        }
                }
            }
            catch (ProtocolException exception)
            {
                DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
 
                modeReader.Dispose();
            }
            catch (Exception exception)
            {
                if (Fx.IsFatal(exception))
                {
                    throw;
                }
 
                DiagnosticUtility.TraceHandledException(exception, TraceEventType.Error);
 
                // containment -- abort the errant reader
                modeReader.Dispose();
            }
        }
 
        void OnViaDecoded(InitialServerConnectionReader connectionReader, ListenerSessionConnection session)
        {
            try
            {
                connectionHandleDuplicated(session);
            }
            finally
            {
                session.TriggerDequeuedCallback();
            }
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }
 
                connectionReaders.Remove(connectionReader);
            }
        }
 
        void OnConnectionClosed(InitialServerConnectionReader connectionReader)
        {
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    return;
                }
 
                connectionReaders.Remove(connectionReader);
            }
        }
 
        void OnSingletonConnection(ListenerConnectionModeReader modeReader)
        {
            ListenerSingletonConnectionReader singletonReader = new ListenerSingletonConnectionReader(
                modeReader.Connection, modeReader.GetConnectionDequeuedCallback(),
                transportType, modeReader.StreamPosition,
                modeReader.BufferOffset, modeReader.BufferSize, 
                onConnectionClosed, onViaDecoded);
 
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    singletonReader.Dispose();
                    return;
                }
 
                connectionReaders.Add(singletonReader);
            }
            singletonReader.StartReading(modeReader.AccruedData, modeReader.GetRemainingTimeout());
        }
 
        void OnDuplexConnection(ListenerConnectionModeReader modeReader)
        {
            ListenerSessionConnectionReader sessionReader = new ListenerSessionConnectionReader(
                modeReader.Connection, modeReader.GetConnectionDequeuedCallback(), 
                transportType, modeReader.StreamPosition,
                modeReader.BufferOffset, modeReader.BufferSize,
                onConnectionClosed, onViaDecoded);
 
            lock (ThisLock)
            {
                if (isDisposed)
                {
                    sessionReader.Dispose();
                    return;
                }
 
                connectionReaders.Add(sessionReader);
            }
            sessionReader.StartReading(modeReader.AccruedData, modeReader.GetRemainingTimeout());
        }
 
        public void StartDemuxing()
        {
            acceptor.StartAccepting();
        }
    }
}