File: System\ServiceModel\Channels\SingletonConnectionReader.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------
 
namespace System.ServiceModel.Channels
{
    using System;
    using System.Diagnostics;
    using System.IO;
    using System.Net;
    using System.Runtime;
    using System.Runtime.CompilerServices;
    using System.Security.Authentication.ExtendedProtection;
    using System.ServiceModel;
    using System.ServiceModel.Activation;
    using System.ServiceModel.Description;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Dispatcher;
    using System.ServiceModel.Security;
    using System.Threading;
    using System.Xml;
    using System.ServiceModel.Diagnostics.Application;
 
    delegate void ServerSingletonPreambleCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader);
    delegate ISingletonChannelListener SingletonPreambleDemuxCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader);
    interface ISingletonChannelListener
    {
        TimeSpan ReceiveTimeout { get; }
        void ReceiveRequest(RequestContext requestContext, Action callback, bool canDispatchOnThisThread);
    }
 
    class ServerSingletonPreambleConnectionReader : InitialServerConnectionReader
    {
        ServerSingletonDecoder decoder;
        ServerSingletonPreambleCallback callback;
        WaitCallback onAsyncReadComplete;
        IConnectionOrientedTransportFactorySettings transportSettings;
        TransportSettingsCallback transportSettingsCallback;
        SecurityMessageProperty security;
        Uri via;
        IConnection rawConnection;
        byte[] connectionBuffer;
        bool isReadPending;
        int offset;
        int size;
        TimeoutHelper receiveTimeoutHelper;
        Action<Uri> viaDelegate;
        ChannelBinding channelBindingToken;
        static AsyncCallback onValidate;
 
        public ServerSingletonPreambleConnectionReader(IConnection connection, Action connectionDequeuedCallback,
            long streamPosition, int offset, int size, TransportSettingsCallback transportSettingsCallback,
            ConnectionClosedCallback closedCallback, ServerSingletonPreambleCallback callback)
            : base(connection, closedCallback)
        {
            this.decoder = new ServerSingletonDecoder(streamPosition, MaxViaSize, MaxContentTypeSize);
            this.offset = offset;
            this.size = size;
            this.callback = callback;
            this.transportSettingsCallback = transportSettingsCallback;
            this.rawConnection = connection;
            this.ConnectionDequeuedCallback = connectionDequeuedCallback;
 
        }
 
        public ChannelBinding ChannelBinding
        {
            get
            {
                return this.channelBindingToken;
            }
        }
 
        public int BufferOffset
        {
            get { return this.offset; }
        }
 
        public int BufferSize
        {
            get { return this.size; }
        }
 
        public ServerSingletonDecoder Decoder
        {
            get { return this.decoder; }
        }
 
        public IConnection RawConnection
        {
            get { return this.rawConnection; }
        }
 
        public Uri Via
        {
            get { return this.via; }
        }
 
        public IConnectionOrientedTransportFactorySettings TransportSettings
        {
            get { return this.transportSettings; }
        }
 
        public SecurityMessageProperty Security
        {
            get { return this.security; }
        }
 
        TimeSpan GetRemainingTimeout()
        {
            return this.receiveTimeoutHelper.RemainingTime();
        }
 
        void ReadAndDispatch()
        {
            bool success = false;
            try
            {
                while ((size > 0 || !isReadPending) && !IsClosed)
                {
                    if (size == 0)
                    {
                        isReadPending = true;
                        if (onAsyncReadComplete == null)
                        {
                            onAsyncReadComplete = new WaitCallback(OnAsyncReadComplete);
                        }
 
                        if (Connection.BeginRead(0, connectionBuffer.Length, GetRemainingTimeout(),
                            onAsyncReadComplete, null) == AsyncCompletionResult.Queued)
                        {
                            break;
                        }
                        HandleReadComplete();
                    }
 
                    int bytesRead = decoder.Decode(connectionBuffer, offset, size);
                    if (bytesRead > 0)
                    {
                        offset += bytesRead;
                        size -= bytesRead;
                    }
 
                    if (decoder.CurrentState == ServerSingletonDecoder.State.PreUpgradeStart)
                    {
                        if (onValidate == null)
                        {
                            onValidate = Fx.ThunkCallback(new AsyncCallback(OnValidate));
                        }
 
                        this.via = decoder.Via;
                        IAsyncResult result = this.Connection.BeginValidate(this.via, onValidate, this);
 
                        if (result.CompletedSynchronously)
                        {
                            if (!VerifyValidationResult(result))
                            {
                                // This goes through the failure path (Abort) even though it doesn't throw.
                                return;
                            }
                        }
                        break; //exit loop, set success=true;
                    }
                }
                success = true;
            }
            catch (CommunicationException exception)
            {
                DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
            }
            catch (TimeoutException exception)
            {
                if (TD.ReceiveTimeoutIsEnabled())
                {
                    TD.ReceiveTimeout(exception.Message);
                }
                DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                if (!ExceptionHandler.HandleTransportExceptionHelper(e))
                {
                    throw;
                }
 
                // containment -- we abort ourselves for any error, no extra containment needed
            }
            finally
            {
                if (!success)
                {
                    Abort();
                }
            }
        }
 
        //returns true if validation was successful
        bool VerifyValidationResult(IAsyncResult result)
        {
            return this.Connection.EndValidate(result) && this.ContinuePostValidationProcessing();
        }
 
        static void OnValidate(IAsyncResult result)
        {
            bool success = false;
            ServerSingletonPreambleConnectionReader thisPtr = (ServerSingletonPreambleConnectionReader)result.AsyncState;
            try
            {
                if (!result.CompletedSynchronously)
                {
                    if (!thisPtr.VerifyValidationResult(result))
                    {
                        // This goes through the failure path (Abort) even though it doesn't throw.
                        return;
                    }
                }
                success = true;
            }
            catch (CommunicationException exception)
            {
                DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
            }
            catch (TimeoutException exception)
            {
                if (TD.ReceiveTimeoutIsEnabled())
                {
                    TD.ReceiveTimeout(exception.Message);
                }          
                DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
 
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            finally
            {
                if (!success)
                {
                    thisPtr.Abort();
                }
            }
        }
 
        //returns false if the connection should be aborted
        bool ContinuePostValidationProcessing()
        {
            if (viaDelegate != null)
            {
                try
                {
                    viaDelegate(via);
                }
                catch (ServiceActivationException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    // return fault and close connection
                    SendFault(FramingEncodingString.ServiceActivationFailedFault);
                    return true;
                }
            }
 
 
            this.transportSettings = transportSettingsCallback(via);
 
            if (transportSettings == null)
            {
                EndpointNotFoundException e = new EndpointNotFoundException(SR.GetString(SR.EndpointNotFound, decoder.Via));
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                // return fault and close connection
                SendFault(FramingEncodingString.EndpointNotFoundFault);
                return false;
            }
 
            // we have enough information to hand off to a channel. Our job is done
            callback(this);
            return true;
        }
 
        public void SendFault(string faultString)
        {
            SendFault(faultString, ref this.receiveTimeoutHelper);
        }
 
        void SendFault(string faultString, ref TimeoutHelper timeoutHelper)
        {
            InitialServerConnectionReader.SendFault(Connection, faultString,
                connectionBuffer, timeoutHelper.RemainingTime(), TransportDefaults.MaxDrainSize);
        }
 
        public IAsyncResult BeginCompletePreamble(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CompletePreambleAsyncResult(timeout, this, callback, state);
        }
 
        public IConnection EndCompletePreamble(IAsyncResult result)
        {
            return CompletePreambleAsyncResult.End(result);
        }
 
        class CompletePreambleAsyncResult : TypedAsyncResult<IConnection>
        {
            static WaitCallback onReadCompleted = new WaitCallback(OnReadCompleted);
            static WaitCallback onWriteCompleted = new WaitCallback(OnWriteCompleted);
            static AsyncCallback onUpgradeComplete = Fx.ThunkCallback(OnUpgradeComplete);
            TimeoutHelper timeoutHelper;
            ServerSingletonPreambleConnectionReader parent;
            StreamUpgradeAcceptor upgradeAcceptor;
            StreamUpgradeProvider upgrade;
            IStreamUpgradeChannelBindingProvider channelBindingProvider;
            IConnection currentConnection;
            UpgradeState upgradeState = UpgradeState.None;
            
            public CompletePreambleAsyncResult(TimeSpan timeout, ServerSingletonPreambleConnectionReader parent, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.timeoutHelper = new TimeoutHelper(timeout);
                this.parent = parent;
 
                Initialize();
 
                if (ContinueWork(null))
                {
                    Complete(this.currentConnection, true);
                }
            }
 
            byte[] ConnectionBuffer
            {
                get
                {
                    return this.parent.connectionBuffer;
                }
                set
                {
                    this.parent.connectionBuffer = value;
                }
            }
 
            int Offset
            {
                get
                {
                    return this.parent.offset;
                }
                set
                {
                    this.parent.offset = value;
                }
            }
 
            int Size
            {
                get
                {
                    return this.parent.size;
                }
                set
                {
                    this.parent.size = value;
                }
            }
 
            bool CanReadAndDecode
            {
                get
                {
                    //ok to read/decode before we start the upgrade
                    //and between UpgradeComplete/WritingPreambleAck
                    return this.upgradeState == UpgradeState.None
                        || this.upgradeState == UpgradeState.UpgradeComplete;
                }
            }
 
            ServerSingletonDecoder Decoder
            {
                get
                {
                    return this.parent.decoder;
                }
            }
 
            void Initialize()
            {
                if (!this.parent.transportSettings.MessageEncoderFactory.Encoder.IsContentTypeSupported(Decoder.ContentType))
                {
                    SendFault(FramingEncodingString.ContentTypeInvalidFault, ref timeoutHelper);
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(
                        SR.ContentTypeMismatch, Decoder.ContentType, parent.transportSettings.MessageEncoderFactory.Encoder.ContentType)));
                }
 
                upgrade = this.parent.transportSettings.Upgrade;                
                if (upgrade != null)
                {
                    channelBindingProvider = upgrade.GetProperty<IStreamUpgradeChannelBindingProvider>();
                    upgradeAcceptor = upgrade.CreateUpgradeAcceptor();
                }
 
                this.currentConnection = this.parent.Connection;
            }
 
            void SendFault(string faultString, ref TimeoutHelper timeoutHelper)
            {
                this.parent.SendFault(faultString, ref timeoutHelper);
            }
 
            bool BeginRead()
            {
                this.Offset = 0;
                return this.currentConnection.BeginRead(0, this.ConnectionBuffer.Length, timeoutHelper.RemainingTime(), onReadCompleted, this) == AsyncCompletionResult.Completed;
            }
 
            void EndRead()
            {
                this.Size = currentConnection.EndRead();
                if (this.Size == 0)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.Decoder.CreatePrematureEOFException());
                }
            }
 
            bool ContinueWork(IAsyncResult upgradeAsyncResult)
            {
                if (upgradeAsyncResult != null)
                {
                    Fx.AssertAndThrow(this.upgradeState == UpgradeState.EndUpgrade, "upgradeAsyncResult should only be passed in from OnUpgradeComplete callback");
                }
 
                for (;;)
                {
                    if (Size == 0 && this.CanReadAndDecode)
                    {
                        if (BeginRead())
                        {
                            EndRead();
                        }
                        else
                        {
                            //when read completes, we will re-enter this loop.
                            break;
                        }                        
                    }
 
                    for (;;)
                    {
                        if (this.CanReadAndDecode)
                        {
                            int bytesRead = Decoder.Decode(ConnectionBuffer, Offset, Size);
                            if (bytesRead > 0)
                            {
                                Offset += bytesRead;
                                Size -= bytesRead;
                            }
                        }
 
                        switch (Decoder.CurrentState)
                        {
                            case ServerSingletonDecoder.State.UpgradeRequest:
                                switch (this.upgradeState)
                                {
                                    case UpgradeState.None:
                                        //change the state so that we don't read/decode until it is safe
                                        ChangeUpgradeState(UpgradeState.VerifyingUpgradeRequest);
                                        break;
                                    case UpgradeState.VerifyingUpgradeRequest:
                                        if (this.upgradeAcceptor == null)
                                        {
                                            SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
                                            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                                                new ProtocolException(SR.GetString(SR.UpgradeRequestToNonupgradableService, Decoder.Upgrade)));
                                        }
 
                                        if (!this.upgradeAcceptor.CanUpgrade(Decoder.Upgrade))
                                        {
                                            SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
                                            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(SR.UpgradeProtocolNotSupported, Decoder.Upgrade)));
                                        }
 
                                        ChangeUpgradeState(UpgradeState.WritingUpgradeAck);
                                        // accept upgrade
                                        if (this.currentConnection.BeginWrite(ServerSingletonEncoder.UpgradeResponseBytes, 0, ServerSingletonEncoder.UpgradeResponseBytes.Length,
                                            true, timeoutHelper.RemainingTime(), onWriteCompleted, this) == AsyncCompletionResult.Queued)
                                        {
                                            //OnWriteCompleted will:
                                            //  1) set upgradeState to UpgradeAckSent 
                                            //  2) call EndWrite
                                            return false;
                                        }
                                        else
                                        {
                                            this.currentConnection.EndWrite();
                                        }
 
                                        ChangeUpgradeState(UpgradeState.UpgradeAckSent);
                                        break;
                                    case UpgradeState.UpgradeAckSent:
                                        IConnection connectionToUpgrade = this.currentConnection;
                                        if (Size > 0)
                                        {
                                            connectionToUpgrade = new PreReadConnection(connectionToUpgrade, ConnectionBuffer, Offset, Size);
                                        }
                                        ChangeUpgradeState(UpgradeState.BeginUpgrade);
                                        break;
                                    case UpgradeState.BeginUpgrade:
                                        try
                                        {
                                            if (!BeginUpgrade(out upgradeAsyncResult))
                                            {
                                                //OnUpgradeComplete will set upgradeState to EndUpgrade
                                                return false;
                                            }
 
                                            ChangeUpgradeState(UpgradeState.EndUpgrade);
                                        }
                                        catch (Exception exception)
                                        {
                                            if (Fx.IsFatal(exception))
                                                throw;
                                            
                                            this.parent.WriteAuditFailure(upgradeAcceptor as StreamSecurityUpgradeAcceptor, exception);
                                            throw;
                                        }
                                        break;
                                    case UpgradeState.EndUpgrade://Must be a different state here than UpgradeComplete so that we don't try to read from the connection
                                        try
                                        {
                                            EndUpgrade(upgradeAsyncResult);
                                            ChangeUpgradeState(UpgradeState.UpgradeComplete);
                                        }
                                        catch (Exception exception)
                                        {
                                            if (Fx.IsFatal(exception))
                                                throw;
 
                                            this.parent.WriteAuditFailure(upgradeAcceptor as StreamSecurityUpgradeAcceptor, exception);
                                            throw;
                                        }
                                        break;
                                    case UpgradeState.UpgradeComplete:
                                        //Client is doing more than one upgrade, reset the state
                                        ChangeUpgradeState(UpgradeState.VerifyingUpgradeRequest);
                                        break;
                                }
                                break;
                            case ServerSingletonDecoder.State.Start:
                                this.parent.SetupSecurityIfNecessary(upgradeAcceptor);
 
                                if (this.upgradeState == UpgradeState.UpgradeComplete //We have done at least one upgrade, but we are now done.
                                    || this.upgradeState == UpgradeState.None)//no upgrade, just send the preample end bytes
                                {
                                    ChangeUpgradeState(UpgradeState.WritingPreambleEnd);
                                    // we've finished the preamble. Ack and return.
                                    if (this.currentConnection.BeginWrite(ServerSessionEncoder.AckResponseBytes, 0, ServerSessionEncoder.AckResponseBytes.Length,
                                                true, timeoutHelper.RemainingTime(), onWriteCompleted, this) == AsyncCompletionResult.Queued)
                                    {
                                        //OnWriteCompleted will:
                                        //  1) set upgradeState to PreambleEndSent 
                                        //  2) call EndWrite
                                        return false;
                                    }
                                    else
                                    {
                                        this.currentConnection.EndWrite();
                                    }
                                    
                                    //terminal state
                                    ChangeUpgradeState(UpgradeState.PreambleEndSent);
                                }
                                                                
                                //we are done, this.currentConnection is the upgraded connection                                
                                return true;
                        }
 
                        if (Size == 0)
                        {
                            break;
                        }
                    }
                }
 
                return false;
            }
 
            bool BeginUpgrade(out IAsyncResult upgradeAsyncResult)
            {
                upgradeAsyncResult = InitialServerConnectionReader.BeginUpgradeConnection(this.currentConnection, upgradeAcceptor, this.parent.transportSettings, onUpgradeComplete, this);
 
                if (!upgradeAsyncResult.CompletedSynchronously)
                {
                    upgradeAsyncResult = null; //caller shouldn't use this out param unless completed sync.
                    return false;
                }
 
                return true;
            }
 
            void EndUpgrade(IAsyncResult upgradeAsyncResult)
            {
                this.currentConnection = InitialServerConnectionReader.EndUpgradeConnection(upgradeAsyncResult);
 
                this.ConnectionBuffer = this.currentConnection.AsyncReadBuffer;
 
                if (this.channelBindingProvider != null 
                    && this.channelBindingProvider.IsChannelBindingSupportEnabled 
                    && this.parent.channelBindingToken == null)//first one wins in the case of multiple upgrades.
                {
                    this.parent.channelBindingToken = channelBindingProvider.GetChannelBinding(this.upgradeAcceptor, ChannelBindingKind.Endpoint);
                }
            }
 
            void ChangeUpgradeState(UpgradeState newState)
            {
                switch (newState)
                {
                    case UpgradeState.None:
                        throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
                    case UpgradeState.VerifyingUpgradeRequest:
                        if (this.upgradeState != UpgradeState.None //starting first upgrade
                            && this.upgradeState != UpgradeState.UpgradeComplete)//completing one upgrade and starting another
                        {
                            throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
                        }
                        break;
                    case UpgradeState.WritingUpgradeAck:
                        if (this.upgradeState != UpgradeState.VerifyingUpgradeRequest)
                        {
                            throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
                        }
                        break;
                    case UpgradeState.UpgradeAckSent:
                        if (this.upgradeState != UpgradeState.WritingUpgradeAck)
                        {
                            throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
                        }
                        break;
                    case UpgradeState.BeginUpgrade:
                        if (this.upgradeState != UpgradeState.UpgradeAckSent)
                        {
                            throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
                        }
                        break;
                    case UpgradeState.EndUpgrade:
                        if (this.upgradeState != UpgradeState.BeginUpgrade)
                        {
                            throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
                        }
                        break;
                    case UpgradeState.UpgradeComplete:
                        if (this.upgradeState != UpgradeState.EndUpgrade)
                        {
                            throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
                        }
                        break;
                    case UpgradeState.WritingPreambleEnd:
                        if (this.upgradeState != UpgradeState.None //no upgrade being used
                            && this.upgradeState != UpgradeState.UpgradeComplete)//upgrades are now complete, end the preamble handshake.
                        {
                            throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
                        }
                        break;
                    case UpgradeState.PreambleEndSent:
                        if (this.upgradeState != UpgradeState.WritingPreambleEnd)
                        {
                            throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
                        }
                        break;
                    default:
                        throw Fx.AssertAndThrow("Unexpected Upgrade State: " + newState);
                }
                this.upgradeState = newState;
            }
 
            static void OnReadCompleted(object state)
            {
                CompletePreambleAsyncResult thisPtr = (CompletePreambleAsyncResult)state;
                Exception completionException = null;
                bool completeSelf = false;
 
                try
                {
                    thisPtr.EndRead();
                    completeSelf = thisPtr.ContinueWork(null);
                }
                catch (Exception ex)
                {
                    if (Fx.IsFatal(ex))
                    {
                        throw;
                    }
                    completionException = ex;
                    completeSelf = true;
                }
 
                if (completeSelf)
                {
                    if (completionException != null)
                    {
                        thisPtr.Complete(false, completionException);
                    }
                    else
                    {
                        thisPtr.Complete(thisPtr.currentConnection, false);
                    }
                }
            }
 
            static void OnWriteCompleted(object state)
            {
                CompletePreambleAsyncResult thisPtr = (CompletePreambleAsyncResult)state;
                Exception completionException = null;
                bool completeSelf = false;
 
                try
                {
                    thisPtr.currentConnection.EndWrite();
 
                    switch (thisPtr.upgradeState)
                    {
                        case UpgradeState.WritingUpgradeAck:
                            thisPtr.ChangeUpgradeState(UpgradeState.UpgradeAckSent);
                            break;
                        case UpgradeState.WritingPreambleEnd:
                            thisPtr.ChangeUpgradeState(UpgradeState.PreambleEndSent);
                            break;
                    }
                    completeSelf = thisPtr.ContinueWork(null);
                }
                catch (Exception ex)
                {
                    if (Fx.IsFatal(ex))
                    {
                        throw;
                    }
                    completionException = ex;
                    completeSelf = true;
                }
 
                if (completeSelf)
                {
                    if (completionException != null)
                    {
                        thisPtr.Complete(false, completionException);
                    }
                    else
                    {
                        thisPtr.Complete(thisPtr.currentConnection, false);
                    }
                }
            }
            
            static void OnUpgradeComplete(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }
 
                CompletePreambleAsyncResult thisPtr = (CompletePreambleAsyncResult)result.AsyncState;                
                Exception completionException = null;
                bool completeSelf = false;
                
                try
                {
                    thisPtr.ChangeUpgradeState(UpgradeState.EndUpgrade);
                    completeSelf = thisPtr.ContinueWork(result);
                }
                catch (Exception ex)
                {
                    if (Fx.IsFatal(ex))
                    {
                        throw;
                    }
                    completionException = ex;
                    completeSelf = true;
                }
 
                if (completeSelf)
                {
                    if (completionException != null)
                    {
                        thisPtr.Complete(false, completionException);
                    }
                    else
                    {
                        thisPtr.Complete(thisPtr.currentConnection, false);
                    }
                }
            }
 
            enum UpgradeState
            {
                None, 
                VerifyingUpgradeRequest, 
                WritingUpgradeAck,
                UpgradeAckSent,
                BeginUpgrade,
                EndUpgrade,
                UpgradeComplete,
                WritingPreambleEnd,
                PreambleEndSent,
            }
        }        
 
        void SetupSecurityIfNecessary(StreamUpgradeAcceptor upgradeAcceptor)
        {
            StreamSecurityUpgradeAcceptor securityUpgradeAcceptor = upgradeAcceptor as StreamSecurityUpgradeAcceptor;
            if (securityUpgradeAcceptor != null)
            {
                this.security = securityUpgradeAcceptor.GetRemoteSecurity();
                if (this.security == null)
                {
                    Exception securityFailedException = new ProtocolException(
                    SR.GetString(SR.RemoteSecurityNotNegotiatedOnStreamUpgrade, this.Via));
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(securityFailedException);
                }
                // Audit Authentication Success
                WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Success, null);
            }
        }
 
        #region Transport Security Auditing
        void WriteAuditFailure(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, Exception exception)
        {
            try
            {
                WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Failure, exception);
            }
#pragma warning suppress 56500 // covered by FxCop
            catch (Exception auditException)
            {
                if (Fx.IsFatal(auditException))
                {
                    throw;
                }
 
                DiagnosticUtility.TraceHandledException(auditException, TraceEventType.Error);
            }
        }
 
        void WriteAuditEvent(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, AuditLevel auditLevel, Exception exception)
        {
            if ((this.transportSettings.AuditBehavior.MessageAuthenticationAuditLevel & auditLevel) != auditLevel)
            {
                return;
            }
 
            if (securityUpgradeAcceptor == null)
            {
                return;
            }
            String primaryIdentity = String.Empty;
            SecurityMessageProperty clientSecurity = securityUpgradeAcceptor.GetRemoteSecurity();
            if (clientSecurity != null)
            {
                primaryIdentity = GetIdentityNameFromContext(clientSecurity);
            }
 
            ServiceSecurityAuditBehavior auditBehavior = this.transportSettings.AuditBehavior;
 
            if (auditLevel == AuditLevel.Success)
            {
                SecurityAuditHelper.WriteTransportAuthenticationSuccessEvent(auditBehavior.AuditLogLocation,
                    auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity);
            }
            else
            {
                SecurityAuditHelper.WriteTransportAuthenticationFailureEvent(auditBehavior.AuditLogLocation,
                    auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity, exception);
            }
        }
 
        [MethodImpl(MethodImplOptions.NoInlining)]
        static string GetIdentityNameFromContext(SecurityMessageProperty clientSecurity)
        {
            return SecurityUtils.GetIdentityNamesFromContext(
                clientSecurity.ServiceSecurityContext.AuthorizationContext);
        }
        #endregion
 
        void HandleReadComplete()
        {
            offset = 0;
            size = Connection.EndRead();
            isReadPending = false;
            if (size == 0)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
            }
        }
 
        void OnAsyncReadComplete(object state)
        {
            bool success = false;
            try
            {
                HandleReadComplete();
                ReadAndDispatch();
                success = true;
            }
            catch (CommunicationException exception)
            {
                DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
            }
            catch (TimeoutException exception)
            {
                if (TD.ReceiveTimeoutIsEnabled())
                {
                    TD.ReceiveTimeout(exception.Message);
                }
                DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
                if (!ExceptionHandler.HandleTransportExceptionHelper(e))
                {
                    throw;
                }
 
                // containment -- we abort ourselves for any error, no extra containment needed
            }
            finally
            {
                if (!success)
                {
                    Abort();
                }
            }
        }
 
        public void StartReading(Action<Uri> viaDelegate, TimeSpan timeout)
        {
            this.viaDelegate = viaDelegate;
            this.receiveTimeoutHelper = new TimeoutHelper(timeout);
            this.connectionBuffer = Connection.AsyncReadBuffer;
            ReadAndDispatch();
        }
    }
 
    class ServerSingletonConnectionReader : SingletonConnectionReader
    {
        ConnectionDemuxer connectionDemuxer;
        ServerSingletonDecoder decoder;
        IConnection rawConnection;
        string contentType;
        ChannelBinding channelBindingToken;
 
        public ServerSingletonConnectionReader(ServerSingletonPreambleConnectionReader preambleReader,
            IConnection upgradedConnection, ConnectionDemuxer connectionDemuxer)
            : base(upgradedConnection, preambleReader.BufferOffset, preambleReader.BufferSize,
            preambleReader.Security, preambleReader.TransportSettings, preambleReader.Via)
        {
            this.decoder = preambleReader.Decoder;
            this.contentType = this.decoder.ContentType;
            this.connectionDemuxer = connectionDemuxer;
            this.rawConnection = preambleReader.RawConnection;
            this.channelBindingToken = preambleReader.ChannelBinding;
        }
 
        protected override string ContentType
        {
            get { return this.contentType; }
        }
 
        protected override long StreamPosition
        {
            get { return this.decoder.StreamPosition; }
        }
 
        protected override bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof)
        {
            while (size > 0)
            {
                int bytesRead = decoder.Decode(buffer, offset, size);
                if (bytesRead > 0)
                {
                    offset += bytesRead;
                    size -= bytesRead;
                }
 
                switch (decoder.CurrentState)
                {
                    case ServerSingletonDecoder.State.EnvelopeStart:
                        // we're at the envelope
                        return true;
 
                    case ServerSingletonDecoder.State.End:
                        isAtEof = true;
                        return false;
                }
            }
 
            return false;
        }
 
        protected override void OnClose(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            // send back EOF and then recycle the connection
            this.Connection.Write(SingletonEncoder.EndBytes, 0, SingletonEncoder.EndBytes.Length, true, timeoutHelper.RemainingTime());
            this.connectionDemuxer.ReuseConnection(this.rawConnection, timeoutHelper.RemainingTime());
 
            ChannelBindingUtility.Dispose(ref this.channelBindingToken);
        }
 
        protected override void PrepareMessage(Message message)
        {
            base.PrepareMessage(message);
            IPEndPoint remoteEndPoint = this.rawConnection.RemoteIPEndPoint;
 
            // pipes will return null
            if (remoteEndPoint != null)
            {
                RemoteEndpointMessageProperty remoteEndpointProperty = new RemoteEndpointMessageProperty(remoteEndPoint);
                message.Properties.Add(RemoteEndpointMessageProperty.Name, remoteEndpointProperty);
            }
 
            if (this.channelBindingToken != null)
            {
                ChannelBindingMessageProperty property = new ChannelBindingMessageProperty(this.channelBindingToken, false);
                property.AddTo(message);
                property.Dispose(); //message.Properties.Add() creates a copy...
            }
        }
    }
 
    abstract class SingletonConnectionReader
    {
        IConnection connection;
        bool doneReceiving;
        bool doneSending;
        bool isAtEof;
        bool isClosed;
        SecurityMessageProperty security;
        object thisLock = new object();
        int offset;
        int size;
        IConnectionOrientedTransportFactorySettings transportSettings;
        Uri via;
        Stream inputStream;
 
        protected SingletonConnectionReader(IConnection connection, int offset, int size, SecurityMessageProperty security,
            IConnectionOrientedTransportFactorySettings transportSettings, Uri via)
        {
            this.connection = connection;
            this.offset = offset;
            this.size = size;
            this.security = security;
            this.transportSettings = transportSettings;
            this.via = via;
        }
 
        protected IConnection Connection
        {
            get
            {
                return this.connection;
            }
        }
 
        protected object ThisLock
        {
            get
            {
                return this.thisLock;
            }
        }
 
        protected virtual string ContentType
        {
            get { return null; }
        }
 
        protected abstract long StreamPosition { get; }
 
        public void Abort()
        {
            this.connection.Abort();
        }
 
        public void DoneReceiving(bool atEof)
        {
            DoneReceiving(atEof, this.transportSettings.CloseTimeout);
        }
 
        void DoneReceiving(bool atEof, TimeSpan timeout)
        {
            if (!this.doneReceiving)
            {
                this.isAtEof = atEof;
                this.doneReceiving = true;
 
                if (this.doneSending)
                {
                    this.Close(timeout);
                }
            }
        }
 
        public void Close(TimeSpan timeout)
        {
            lock (ThisLock)
            {
                if (this.isClosed)
                {
                    return;
                }
 
                this.isClosed = true;
            }
 
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            bool success = false;
            try
            {
                // first drain our stream if necessary
                if (this.inputStream != null)
                {
                    byte[] dummy = DiagnosticUtility.Utility.AllocateByteArray(transportSettings.ConnectionBufferSize);
                    while (!this.isAtEof)
                    {
                        this.inputStream.ReadTimeout = TimeoutHelper.ToMilliseconds(timeoutHelper.RemainingTime());
                        int bytesRead = this.inputStream.Read(dummy, 0, dummy.Length);
                        if (bytesRead == 0)
                        {
                            this.isAtEof = true;
                        }
                    }
                }
                OnClose(timeoutHelper.RemainingTime());
                success = true;
            }
            finally
            {
                if (!success)
                {
                    this.Abort();
                }
            }
        }
 
        protected abstract void OnClose(TimeSpan timeout);
 
        public void DoneSending(TimeSpan timeout)
        {
            this.doneSending = true;
            if (this.doneReceiving)
            {
                this.Close(timeout);
            }
        }
 
        protected abstract bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof);
 
        protected virtual void PrepareMessage(Message message)
        {
            message.Properties.Via = this.via;
            message.Properties.Security = (this.security != null) ? (SecurityMessageProperty)this.security.CreateCopy() : null;
        }
 
        public RequestContext ReceiveRequest(TimeSpan timeout)
        {
            Message requestMessage = Receive(timeout);
            return new StreamedFramingRequestContext(this, requestMessage);
        }
 
        public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new ReceiveAsyncResult(this, timeout, callback, state);
        }
 
        public virtual Message EndReceive(IAsyncResult result)
        {
            return ReceiveAsyncResult.End(result);
        }
 
        public Message Receive(TimeSpan timeout)
        {
            byte[] buffer = DiagnosticUtility.Utility.AllocateByteArray(connection.AsyncReadBufferSize);
 
            if (size > 0)
            {
                Buffer.BlockCopy(connection.AsyncReadBuffer, offset, buffer, offset, size);
            }
 
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            for (;;)
            {
                if (DecodeBytes(buffer, ref offset, ref size, ref isAtEof))
                {
                    break;
                }
 
                if (this.isAtEof)
                {
                    DoneReceiving(true, timeoutHelper.RemainingTime());
                    return null;
                }
 
                if (size == 0)
                {
                    offset = 0;
                    size = connection.Read(buffer, 0, buffer.Length, timeoutHelper.RemainingTime());
                    if (size == 0)
                    {
                        DoneReceiving(true, timeoutHelper.RemainingTime());
                        return null;
                    }
                }
            }
 
            // we're ready to read a message
            IConnection singletonConnection = this.connection;
            if (size > 0)
            {
                byte[] initialData = DiagnosticUtility.Utility.AllocateByteArray(size);
                Buffer.BlockCopy(buffer, offset, initialData, 0, size);
                singletonConnection = new PreReadConnection(singletonConnection, initialData);
            }
 
            Stream connectionStream = new SingletonInputConnectionStream(this, singletonConnection, this.transportSettings);
            this.inputStream = new MaxMessageSizeStream(connectionStream, transportSettings.MaxReceivedMessageSize);
            using (ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ? ServiceModelActivity.CreateBoundedActivity(true) : null)
            {
                if (DiagnosticUtility.ShouldUseActivity)
                {
                    ServiceModelActivity.Start(activity, SR.GetString(SR.ActivityProcessingMessage, TraceUtility.RetrieveMessageNumber()), ActivityType.ProcessMessage);
                }
 
                Message message = null;
                try
                {
                    message = transportSettings.MessageEncoderFactory.Encoder.ReadMessage(
                        this.inputStream, transportSettings.MaxBufferSize, this.ContentType);
                }
                catch (XmlException xmlException)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                        new ProtocolException(SR.GetString(SR.MessageXmlProtocolError), xmlException));
                }
 
                if (DiagnosticUtility.ShouldUseActivity)
                {
                    TraceUtility.TransferFromTransport(message);
                }
 
                PrepareMessage(message);
 
                return message;
            }
        }
 
        class ReceiveAsyncResult : AsyncResult
        {
            static Action<object> onReceiveScheduled = new Action<object>(OnReceiveScheduled);
 
            Message message;
            SingletonConnectionReader parent;
            TimeSpan timeout;
 
            public ReceiveAsyncResult(SingletonConnectionReader parent, TimeSpan timeout, AsyncCallback callback,
                object state)
                : base(callback, state)
            {
                this.parent = parent;
                this.timeout = timeout;
 
                // 
                ActionItem.Schedule(onReceiveScheduled, this);
            }
 
            public static Message End(IAsyncResult result)
            {
                ReceiveAsyncResult receiveAsyncResult = AsyncResult.End<ReceiveAsyncResult>(result);
                return receiveAsyncResult.message;
            }
 
            static void OnReceiveScheduled(object state)
            {
                ReceiveAsyncResult thisPtr = (ReceiveAsyncResult)state;
 
                Exception completionException = null;
                try
                {
                    thisPtr.message = thisPtr.parent.Receive(thisPtr.timeout);
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                catch (Exception exception)
                {
                    if (Fx.IsFatal(exception))
                    {
                        throw;
                    }
                    completionException = exception;
                }
                thisPtr.Complete(false, completionException);
            }
        }
 
        class StreamedFramingRequestContext : RequestContextBase
        {
            IConnection connection;
            SingletonConnectionReader parent;
            IConnectionOrientedTransportFactorySettings settings;
            TimeoutHelper timeoutHelper;
 
            public StreamedFramingRequestContext(SingletonConnectionReader parent, Message requestMessage)
                : base(requestMessage, parent.transportSettings.CloseTimeout, parent.transportSettings.SendTimeout)
            {
                this.parent = parent;
                this.connection = parent.connection;
                this.settings = parent.transportSettings;
            }
 
            protected override void OnAbort()
            {
                this.parent.Abort();
            }
 
            protected override void OnClose(TimeSpan timeout)
            {
                this.parent.Close(timeout);
            }
 
            protected override void OnReply(Message message, TimeSpan timeout)
            {
                ICompressedMessageEncoder compressedMessageEncoder = this.settings.MessageEncoderFactory.Encoder as ICompressedMessageEncoder;
                if (compressedMessageEncoder != null && compressedMessageEncoder.CompressionEnabled)
                {
                    compressedMessageEncoder.AddCompressedMessageProperties(message, this.parent.ContentType);
                }
 
                timeoutHelper = new TimeoutHelper(timeout);
                StreamingConnectionHelper.WriteMessage(message, this.connection, false, this.settings, ref timeoutHelper);
                parent.DoneSending(timeoutHelper.RemainingTime());
            }
 
            protected override IAsyncResult OnBeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
            {
                ICompressedMessageEncoder compressedMessageEncoder = this.settings.MessageEncoderFactory.Encoder as ICompressedMessageEncoder;
                if (compressedMessageEncoder != null && compressedMessageEncoder.CompressionEnabled)
                {
                    compressedMessageEncoder.AddCompressedMessageProperties(message, this.parent.ContentType);
                }
 
                timeoutHelper = new TimeoutHelper(timeout);
                return StreamingConnectionHelper.BeginWriteMessage(message, this.connection, false, this.settings,
                    ref timeoutHelper, callback, state);
            }
 
            protected override void OnEndReply(IAsyncResult result)
            {
                StreamingConnectionHelper.EndWriteMessage(result);
                parent.DoneSending(timeoutHelper.RemainingTime());
            }
        }
 
        // ensures that the reader is notified at end-of-stream, and takes care of the framing chunk headers
        class SingletonInputConnectionStream : ConnectionStream
        {
            SingletonMessageDecoder decoder;
            SingletonConnectionReader reader;
            bool atEof;
            byte[] chunkBuffer; // used for when we have overflow
            int chunkBufferOffset;
            int chunkBufferSize;
            int chunkBytesRemaining;
 
            public SingletonInputConnectionStream(SingletonConnectionReader reader, IConnection connection,
                IDefaultCommunicationTimeouts defaultTimeouts)
                : base(connection, defaultTimeouts)
            {
                this.reader = reader;
                this.decoder = new SingletonMessageDecoder(reader.StreamPosition);
                this.chunkBytesRemaining = 0;
                this.chunkBuffer = new byte[IntEncoder.MaxEncodedSize];
            }
 
            void AbortReader()
            {
                this.reader.Abort();
            }
 
            public override void Close()
            {
                this.reader.DoneReceiving(this.atEof);
            }
 
            // run chunk data through the decoder
            void DecodeData(byte[] buffer, int offset, int size)
            {
                while (size > 0)
                {
                    int bytesRead = decoder.Decode(buffer, offset, size);
                    offset += bytesRead;
                    size -= bytesRead;
                    Fx.Assert(decoder.CurrentState == SingletonMessageDecoder.State.ReadingEnvelopeBytes || decoder.CurrentState == SingletonMessageDecoder.State.ChunkEnd, "");
                }
            }
 
            // run the current data through the decoder to get valid message bytes
            void DecodeSize(byte[] buffer, ref int offset, ref int size)
            {
                while (size > 0)
                {
                    int bytesRead = decoder.Decode(buffer, offset, size);
 
                    if (bytesRead > 0)
                    {
                        offset += bytesRead;
                        size -= bytesRead;
                    }
 
                    switch (decoder.CurrentState)
                    {
                        case SingletonMessageDecoder.State.ChunkStart:
                            this.chunkBytesRemaining = decoder.ChunkSize;
 
                            // if we have overflow and we're not decoding out of our buffer, copy over
                            if (size > 0 && !object.ReferenceEquals(buffer, this.chunkBuffer))
                            {
                                Fx.Assert(size <= this.chunkBuffer.Length, "");
                                Buffer.BlockCopy(buffer, offset, this.chunkBuffer, 0, size);
                                this.chunkBufferOffset = 0;
                                this.chunkBufferSize = size;
                            }
                            return;
 
                        case SingletonMessageDecoder.State.End:
                            ProcessEof();
                            return;
                    }
                }
            }
 
            int ReadCore(byte[] buffer, int offset, int count)
            {
                int bytesRead = -1;
                try
                {
                    bytesRead = base.Read(buffer, offset, count);
                    if (bytesRead == 0)
                    {
                        ProcessEof();
                    }
                }
                finally
                {
                    if (bytesRead == -1)  // there was an exception
                    {
                        AbortReader();
                    }
                }
 
                return bytesRead;
            }
 
            public override int Read(byte[] buffer, int offset, int count)
            {
                int result = 0;
                while (true)
                {
                    if (count == 0)
                    {
                        return result;
                    }
 
                    if (this.atEof)
                    {
                        return result;
                    }
 
                    // first deal with any residual carryover
                    if (this.chunkBufferSize > 0)
                    {
                        int bytesToCopy = Math.Min(chunkBytesRemaining,
                            Math.Min(this.chunkBufferSize, count));
 
                        Buffer.BlockCopy(this.chunkBuffer, this.chunkBufferOffset, buffer, offset, bytesToCopy);
                        // keep decoder up to date
                        DecodeData(this.chunkBuffer, this.chunkBufferOffset, bytesToCopy);
 
                        this.chunkBufferOffset += bytesToCopy;
                        this.chunkBufferSize -= bytesToCopy;
                        this.chunkBytesRemaining -= bytesToCopy;
                        if (this.chunkBytesRemaining == 0 && this.chunkBufferSize > 0)
                        {
                            DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize);
                        }
 
                        result += bytesToCopy;
                        offset += bytesToCopy;
                        count -= bytesToCopy;
                    }
                    else if (chunkBytesRemaining > 0)
                    {
                        // We're in the middle of a chunk. Try and include the next chunk size as well
 
                        int bytesToRead = count;
                        if (int.MaxValue - chunkBytesRemaining >= IntEncoder.MaxEncodedSize)
                        {
                            bytesToRead = Math.Min(count, chunkBytesRemaining + IntEncoder.MaxEncodedSize);
                        }
 
                        int bytesRead = ReadCore(buffer, offset, bytesToRead);
 
                        // keep decoder up to date
                        DecodeData(buffer, offset, Math.Min(bytesRead, this.chunkBytesRemaining));
 
                        if (bytesRead > chunkBytesRemaining)
                        {
                            result += this.chunkBytesRemaining;
                            int overflowCount = bytesRead - chunkBytesRemaining;
                            int overflowOffset = offset + chunkBytesRemaining;
                            this.chunkBytesRemaining = 0;
                            // read at least part of the next chunk, and put any overflow in this.chunkBuffer
                            DecodeSize(buffer, ref overflowOffset, ref overflowCount);
                        }
                        else
                        {
                            result += bytesRead;
                            this.chunkBytesRemaining -= bytesRead;
                        }
 
                        return result;
                    }
                    else
                    {
                        // Final case: we have a new chunk. Read the size, and loop around again
                        if (count < IntEncoder.MaxEncodedSize)
                        {
                            // we don't have space for MaxEncodedSize, so it's worth the copy cost to read into a temp buffer
                            this.chunkBufferOffset = 0;
                            this.chunkBufferSize = ReadCore(this.chunkBuffer, 0, this.chunkBuffer.Length);
                            DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize);
                        }
                        else
                        {
                            int bytesRead = ReadCore(buffer, offset, IntEncoder.MaxEncodedSize);
                            int sizeOffset = offset;
                            DecodeSize(buffer, ref sizeOffset, ref bytesRead);
                        }
                    }
                }
            }
 
            void ProcessEof()
            {
                if (!this.atEof)
                {
                    this.atEof = true;
                    if (this.chunkBufferSize > 0 || this.chunkBytesRemaining > 0
                        || decoder.CurrentState != SingletonMessageDecoder.State.End)
                    {
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
                    }
 
                    this.reader.DoneReceiving(true);
                }
            }
 
            public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
            {
                return new ReadAsyncResult(this, buffer, offset, count, callback, state);
            }
 
            public override int EndRead(IAsyncResult result)
            {
                return ReadAsyncResult.End(result);
            }
 
            public class ReadAsyncResult : AsyncResult
            {
                SingletonInputConnectionStream parent;
                int result;
 
                public ReadAsyncResult(SingletonInputConnectionStream parent,
                    byte[] buffer, int offset, int count, AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    this.parent = parent;
 
                    // 
                    this.result = this.parent.Read(buffer, offset, count);
                    base.Complete(true);
                }
 
                public static int End(IAsyncResult result)
                {
                    ReadAsyncResult thisPtr = AsyncResult.End<ReadAsyncResult>(result);
                    return thisPtr.result;
                }
            }
        }
    }
 
    static class StreamingConnectionHelper
    {
        public static void WriteMessage(Message message, IConnection connection, bool isRequest,
            IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper)
        {
            byte[] endBytes = null;
            if (message != null)
            {
                MessageEncoder messageEncoder = settings.MessageEncoderFactory.Encoder;
                byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes;
 
                bool writeStreamed;
                if (isRequest)
                {
                    endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
                    writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode);
                }
                else
                {
                    endBytes = SingletonEncoder.EnvelopeEndBytes;
                    writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
                }
 
                if (writeStreamed)
                {
                    connection.Write(envelopeStartBytes, 0, envelopeStartBytes.Length, false, timeoutHelper.RemainingTime());
                    Stream connectionStream = new StreamingOutputConnectionStream(connection, settings);
                    Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper);
                    messageEncoder.WriteMessage(message, writeTimeoutStream);
                }
                else
                {
                    ArraySegment<byte> messageData = messageEncoder.WriteMessage(message,
                        int.MaxValue, settings.BufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize);
                    messageData = SingletonEncoder.EncodeMessageFrame(messageData);
                    Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                        envelopeStartBytes.Length);
                    connection.Write(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                        messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(), settings.BufferManager);
                }
            }
            else if (isRequest) // context handles response end bytes
            {
                endBytes = SingletonEncoder.EndBytes;
            }
 
            if (endBytes != null)
            {
                connection.Write(endBytes, 0, endBytes.Length,
                    true, timeoutHelper.RemainingTime());
            }
        }
 
        public static IAsyncResult BeginWriteMessage(Message message, IConnection connection, bool isRequest,
            IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
            AsyncCallback callback, object state)
        {
            return new WriteMessageAsyncResult(message, connection, isRequest, settings, ref timeoutHelper, callback, state);
        }
 
        public static void EndWriteMessage(IAsyncResult result)
        {
            WriteMessageAsyncResult.End(result);
        }
 
        // overrides ConnectionStream to add a Framing int at the beginning of each record
        class StreamingOutputConnectionStream : ConnectionStream
        {
            byte[] encodedSize;
 
            public StreamingOutputConnectionStream(IConnection connection, IDefaultCommunicationTimeouts timeouts)
                : base(connection, timeouts)
            {
                this.encodedSize = new byte[IntEncoder.MaxEncodedSize];
            }
            void WriteChunkSize(int size)
            {
                if (size > 0)
                {
                    int bytesEncoded = IntEncoder.Encode(size, encodedSize, 0);
                    base.Connection.Write(encodedSize, 0, bytesEncoded, false, TimeSpan.FromMilliseconds(this.WriteTimeout));
                }
            }
 
            public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
            {
                WriteChunkSize(count);
                return base.BeginWrite(buffer, offset, count, callback, state);
            }
 
            public override void WriteByte(byte value)
            {
                WriteChunkSize(1);
                base.WriteByte(value);
            }
 
            public override void Write(byte[] buffer, int offset, int count)
            {
                WriteChunkSize(count);
                base.Write(buffer, offset, count);
            }
        }
 
        class WriteMessageAsyncResult : AsyncResult
        {
            IConnection connection;
            MessageEncoder encoder;
            BufferManager bufferManager;
            Message message;
            static WaitCallback onWriteBufferedMessage;
            static WaitCallback onWriteStartBytes;
            static Action<object> onWriteStartBytesScheduled;
            static WaitCallback onWriteEndBytes =
                Fx.ThunkCallback(new WaitCallback(OnWriteEndBytes));
            byte[] bufferToFree;
            IConnectionOrientedTransportFactorySettings settings;
            TimeoutHelper timeoutHelper;
            byte[] endBytes;
 
            public WriteMessageAsyncResult(Message message, IConnection connection, bool isRequest,
                IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
                AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.connection = connection;
                this.encoder = settings.MessageEncoderFactory.Encoder;
                this.bufferManager = settings.BufferManager;
                this.timeoutHelper = timeoutHelper;
                this.message = message;
                this.settings = settings;
 
                bool throwing = true;
                bool completeSelf = false;
                if (message == null)
                {
                    if (isRequest) // context takes care of the end bytes on Close/reader.EOF
                    {
                        this.endBytes = SingletonEncoder.EndBytes;
                    }
                    completeSelf = WriteEndBytes();
                }
                else
                {
                    try
                    {
                        byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes;
                        bool writeStreamed;
                        if (isRequest)
                        {
                            this.endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
                            writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode);
                        }
                        else
                        {
                            this.endBytes = SingletonEncoder.EnvelopeEndBytes;
                            writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
                        }
 
                        if (writeStreamed)
                        {
                            if (onWriteStartBytes == null)
                            {
                                onWriteStartBytes = Fx.ThunkCallback(new WaitCallback(OnWriteStartBytes));
                            }
 
                            AsyncCompletionResult writeStartBytesResult = connection.BeginWrite(envelopeStartBytes, 0, envelopeStartBytes.Length, true,
                                timeoutHelper.RemainingTime(), onWriteStartBytes, this);
 
                            if (writeStartBytesResult == AsyncCompletionResult.Completed)
                            {
                                if (onWriteStartBytesScheduled == null)
                                {
                                    onWriteStartBytesScheduled = new Action<object>(OnWriteStartBytes);
                                }
                                ActionItem.Schedule(onWriteStartBytesScheduled, this);
                            }
                        }
                        else
                        {
                            ArraySegment<byte> messageData = settings.MessageEncoderFactory.Encoder.WriteMessage(message,
                                int.MaxValue, this.bufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize);
                            messageData = SingletonEncoder.EncodeMessageFrame(messageData);
                            this.bufferToFree = messageData.Array;
                            Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                                envelopeStartBytes.Length);
 
                            if (onWriteBufferedMessage == null)
                            {
                                onWriteBufferedMessage = Fx.ThunkCallback(new WaitCallback(OnWriteBufferedMessage));
                            }
                            AsyncCompletionResult writeBufferedResult =
                                connection.BeginWrite(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                                messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(),
                                onWriteBufferedMessage, this);
 
                            if (writeBufferedResult == AsyncCompletionResult.Completed)
                            {
                                completeSelf = HandleWriteBufferedMessage();
                            }
                        }
                        throwing = false;
                    }
                    finally
                    {
                        if (throwing)
                        {
                            Cleanup();
                        }
                    }
                }
 
                if (completeSelf)
                {
                    base.Complete(true);
                }
            }
 
            public static void End(IAsyncResult result)
            {
                AsyncResult.End<WriteMessageAsyncResult>(result);
            }
 
            void Cleanup()
            {
                if (bufferToFree != null)
                {
                    this.bufferManager.ReturnBuffer(bufferToFree);
                }
            }
 
            bool HandleWriteStartBytes()
            {
                connection.EndWrite();
                Stream connectionStream = new StreamingOutputConnectionStream(connection, settings);
                Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper);
                this.encoder.WriteMessage(message, writeTimeoutStream);
                return WriteEndBytes();
            }
 
            bool HandleWriteBufferedMessage()
            {
                this.connection.EndWrite();
                return WriteEndBytes();
            }
 
            bool WriteEndBytes()
            {
                if (this.endBytes == null)
                {
                    Cleanup();
                    return true;
                }
 
                AsyncCompletionResult result = connection.BeginWrite(endBytes, 0,
                    endBytes.Length, true, timeoutHelper.RemainingTime(), onWriteEndBytes, this);
 
                if (result == AsyncCompletionResult.Queued)
                {
                    return false;
                }
 
                return HandleWriteEndBytes();
            }
 
            bool HandleWriteEndBytes()
            {
                this.connection.EndWrite();
                Cleanup();
                return true;
            }
 
            static void OnWriteStartBytes(object asyncState)
            {
                OnWriteStartBytesCallbackHelper(asyncState);
            }
 
            static void OnWriteStartBytesCallbackHelper(object asyncState)
            {
                WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)asyncState;
                Exception completionException = null;
                bool completeSelf = false;
                bool throwing = true;
                try
                {
                    completeSelf = thisPtr.HandleWriteStartBytes();
                    throwing = false;
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    completeSelf = true;
                    completionException = e;
                }
                finally
                {
                    if (throwing)
                    {
                        thisPtr.Cleanup();
                    }
                }
 
                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException);
                }
            }
 
            static void OnWriteBufferedMessage(object asyncState)
            {
                WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)asyncState;
 
                Exception completionException = null;
                bool completeSelf = false;
                bool throwing = true;
                try
                {
                    completeSelf = thisPtr.HandleWriteBufferedMessage();
                    throwing = false;
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    completeSelf = true;
                    completionException = e;
                }
                finally
                {
                    if (throwing)
                    {
                        thisPtr.Cleanup();
                    }
                }
                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException);
                }
            }
 
            static void OnWriteEndBytes(object asyncState)
            {
                WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)asyncState;
 
                Exception completionException = null;
                bool completeSelf = false;
                bool success = false;
                try
                {
                    completeSelf = thisPtr.HandleWriteEndBytes();
                    success = true;
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    completeSelf = true;
                    completionException = e;
                }
                finally
                {
                    if (!success)
                    {
                        thisPtr.Cleanup();
                    }
                }
 
                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException);
                }
            }
        }
    }
}