File: System\ServiceModel\Channels\PeerNodeImplementation.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------
 
namespace System.ServiceModel.Channels
{
    using System.Collections;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.IO;
    using System.Net;
    using System.Runtime;
    using System.Runtime.Serialization;
    using System.ServiceModel;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Dispatcher;
    using System.ServiceModel.Security;
    using System.Text;
    using System.Threading;
    using System.Xml;
 
    partial class PeerNodeImplementation : IPeerNodeMessageHandling
    {
        const int maxViaSize = 4096;
 
        public delegate void MessageAvailableCallback(Message message);
 
        // configuration
        int connectTimeout;
        IPAddress listenIPAddress;
        Uri listenUri;
        int port;
        long maxReceivedMessageSize;
        int minNeighbors;
        int idealNeighbors;
        int maxNeighbors;
        int maxReferrals;
        string meshId;
        PeerMessagePropagationFilter messagePropagationFilter;
        SynchronizationContext messagePropagationFilterContext;
        int maintainerInterval = PeerTransportConstants.MaintainerInterval;          // milliseconds before a maintainer kicks in
        PeerResolver resolver;
 
        PeerNodeConfig config;
 
        PeerSecurityManager securityManager;
        internal MessageEncodingBindingElement EncodingElement;
 
        // internal state
        ManualResetEvent connectCompletedEvent; // raised when maintainer has connected or given up
        MessageEncoder encoder; // used for encoding internal messages
 
        // Double-checked locking pattern requires volatile for read/write synchronization
        volatile bool isOpen;
        Exception openException; // exception to be thrown from Open
        Dictionary<object, MessageFilterRegistration> messageFilters;
        int refCount; // number of factories/channels that are using this instance
        SimpleStateManager stateManager; // manages open/close operations
        object thisLock = new Object();
        PeerNodeTraceRecord traceRecord;
        PeerNodeTraceRecord completeTraceRecord;    // contains address info as well
 
        // primary infrastructure components
        internal PeerConnector connector;                           // Purely for testing do not take a internal dependency on this
        PeerMaintainer maintainer;
        internal PeerFlooder flooder;                               // Purely for testing do not take an internal dependency on this
        PeerNeighborManager neighborManager;
        PeerIPHelper ipHelper;
        PeerService service;
 
        object resolverRegistrationId;
        bool registered;
 
        public event EventHandler Offline;
        public event EventHandler Online;
        Dictionary<Uri, RefCountedSecurityProtocol> uri2SecurityProtocol;
        Dictionary<Type, object> serviceHandlers;
        BufferManager bufferManager = null;
        internal static byte[] DefaultId = new byte[0];
        XmlDictionaryReaderQuotas readerQuotas;
        long maxBufferPoolSize;
        internal int MaxSendQueue = 128, MaxReceiveQueue = 128;
 
 
 
        public PeerNodeImplementation()
        {
            // intialize default configuration
            connectTimeout = PeerTransportConstants.ConnectTimeout;
            maxReceivedMessageSize = TransportDefaults.MaxReceivedMessageSize;
            minNeighbors = PeerTransportConstants.MinNeighbors;
            idealNeighbors = PeerTransportConstants.IdealNeighbors;
            maxNeighbors = PeerTransportConstants.MaxNeighbors;
            maxReferrals = PeerTransportConstants.MaxReferrals;
            port = PeerTransportDefaults.Port;
 
            // initialize internal state
            connectCompletedEvent = new ManualResetEvent(false);
            encoder = new BinaryMessageEncodingBindingElement().CreateMessageEncoderFactory().Encoder;
            messageFilters = new Dictionary<object, MessageFilterRegistration>();
            stateManager = new SimpleStateManager(this);
            uri2SecurityProtocol = new Dictionary<Uri, RefCountedSecurityProtocol>();
            readerQuotas = new XmlDictionaryReaderQuotas();
            this.maxBufferPoolSize = TransportDefaults.MaxBufferPoolSize;
        }
 
        // To facilitate testing
        public event EventHandler<PeerNeighborCloseEventArgs> NeighborClosed;
        public event EventHandler<PeerNeighborCloseEventArgs> NeighborClosing;
        public event EventHandler NeighborConnected;
        public event EventHandler NeighborOpened;
 
        public event EventHandler Aborted;
 
        public PeerNodeConfig Config
        {
            get
            {
                return this.config;
            }
            private set
            {
                Fx.Assert(value != null, "PeerNodeImplementation.Config can not be set to null");
                this.config = value;
            }
        }
 
        public bool IsOnline
        {
            get
            {
                lock (ThisLock)
                {
                    if (isOpen)
                        return neighborManager.IsOnline;
                    else
                        return false;
                }
            }
        }
 
        internal bool IsOpen
        {
            get { return isOpen; }
        }
 
        public IPAddress ListenIPAddress
        {
            get { return listenIPAddress; }
            set
            {
                // No validation necessary at this point. When the service is opened, it will throw if the IP address is invalid
                lock (ThisLock)
                {
                    ThrowIfOpen();
                    listenIPAddress = value;
                }
            }
        }
 
        public Uri ListenUri
        {
            get { return listenUri; }
            set
            {
                if (value == null)
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("value");
 
                if (value.Scheme != PeerStrings.Scheme)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("value", SR.GetString(SR.InvalidUriScheme,
                        value.Scheme, PeerStrings.Scheme));
                }
 
                Fx.Assert(value.PathAndQuery == "/", "PeerUriCannotContainPath");
 
                lock (ThisLock)
                {
                    ThrowIfOpen();
                    listenUri = value;
                }
            }
        }
 
        public long MaxBufferPoolSize
        {
            get { return maxBufferPoolSize; }
            set
            {
                lock (ThisLock)
                {
                    ThrowIfOpen();
                    maxBufferPoolSize = value;
                }
            }
        }
 
        public long MaxReceivedMessageSize
        {
            get { return maxReceivedMessageSize; }
            set
            {
                if (!(value >= PeerTransportConstants.MinMessageSize))
                {
                    throw Fx.AssertAndThrow("invalid MaxReceivedMessageSize");
                }
 
                lock (ThisLock)
                {
                    ThrowIfOpen();
                    maxReceivedMessageSize = value;
                }
            }
        }
 
        public string MeshId
        {
            get
            {
                lock (ThisLock)
                {
                    ThrowIfNotOpen();
                    return meshId;
                }
            }
        }
 
        public PeerMessagePropagationFilter MessagePropagationFilter
        {
            get { return messagePropagationFilter; }
            set
            {
                lock (ThisLock)
                {
                    // null is ok and causes optimised flooding codepath
                    messagePropagationFilter = value;
                    messagePropagationFilterContext = ThreadBehavior.GetCurrentSynchronizationContext();
                }
            }
        }
 
        // Made internal to facilitate testing
        public PeerNeighborManager NeighborManager
        {
            get { return neighborManager; }
        }
 
        public ulong NodeId
        {
            get
            {
                ThrowIfNotOpen();
                return config.NodeId;
            }
        }
 
        public int Port
        {
            get { return port; }
            set
            {
                lock (ThisLock)
                {
                    ThrowIfOpen();
                    port = value;
                }
            }
        }
 
        public int ListenerPort
        {
            get
            {
                ThrowIfNotOpen();
                return config.ListenerPort;
            }
        }
 
        public XmlDictionaryReaderQuotas ReaderQuotas
        {
            get
            {
                return this.readerQuotas;
            }
        }
 
        public PeerResolver Resolver
        {
            get { return resolver; }
            set
            {
                Fx.Assert(value != null, "null Resolver");
 
                lock (ThisLock)
                {
                    ThrowIfOpen();
                    resolver = value;
                }
            }
        }
 
        public PeerSecurityManager SecurityManager
        {
            get { return this.securityManager; }
            set { this.securityManager = value; }
        }
 
        internal PeerService Service
        {
            get
            {
                return this.service;
            }
            set
            {
                lock (ThisLock)
                {
                    ThrowIfNotOpen();
                    this.service = value;
                }
            }
        }
 
        object ThisLock
        {
            get { return thisLock; }
        }
 
        public void Abort()
        {
            stateManager.Abort();
        }
 
        public IAsyncResult BeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return stateManager.BeginClose(timeout, callback, state);
        }
 
        public IAsyncResult BeginOpen(TimeSpan timeout, AsyncCallback callback, object state, bool waitForOnline)
        {
            return stateManager.BeginOpen(timeout, callback, state, waitForOnline);
        }
 
        public Guid ProcessOutgoingMessage(Message message, Uri via)
        {
            Guid result = Guid.NewGuid();
            System.Xml.UniqueId messageId = new System.Xml.UniqueId(result);
            if (-1 != message.Headers.FindHeader(PeerStrings.MessageId, PeerStrings.Namespace))
                PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerStrings.MessageId);
            if (-1 != message.Headers.FindHeader(PeerOperationNames.PeerTo, PeerStrings.Namespace))
                PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerOperationNames.PeerTo);
            if (-1 != message.Headers.FindHeader(PeerOperationNames.PeerVia, PeerStrings.Namespace))
                PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerOperationNames.PeerVia);
            if (-1 != message.Headers.FindHeader(PeerOperationNames.Flood, PeerStrings.Namespace, PeerOperationNames.Demuxer))
                PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerOperationNames.Flood);
 
            message.Headers.Add(PeerDictionaryHeader.CreateMessageIdHeader(messageId));
            message.Properties.Via = via;
            message.Headers.Add(MessageHeader.CreateHeader(PeerOperationNames.PeerTo, PeerStrings.Namespace, message.Headers.To));
            message.Headers.Add(PeerDictionaryHeader.CreateViaHeader(via));
            message.Headers.Add(PeerDictionaryHeader.CreateFloodRole());
            return result;
        }
 
        public void SecureOutgoingMessage(ref Message message, Uri via, TimeSpan timeout, SecurityProtocol securityProtocol)
        {
            if (securityProtocol != null)
            {
                securityProtocol.SecureOutgoingMessage(ref message, timeout);
            }
        }
 
        public IAsyncResult BeginSend(object registrant, Message message, Uri via,
            ITransportFactorySettings settings, TimeSpan timeout, AsyncCallback callback, object state, SecurityProtocol securityProtocol)
        {
            PeerFlooder localFlooder;
            int factoryMaxReceivedMessageSize;
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
 
            MessageBuffer messageBuffer = null;
            Message securedMessage = null;
            ulong hopcount = PeerTransportConstants.MaxHopCount;
            PeerMessagePropagation propagateFlags = PeerMessagePropagation.LocalAndRemote;
            int messageSize = (int)-1;
            byte[] id;
            SendAsyncResult result = new SendAsyncResult(callback, state);
            AsyncCallback onFloodComplete = Fx.ThunkCallback(new AsyncCallback(result.OnFloodComplete));
 
            try
            {
                lock (ThisLock)
                {
                    ThrowIfNotOpen();
                    localFlooder = flooder;
                }
 
                // we know this will fit in an int because of our MaxReceivedMessageSize restrictions
                factoryMaxReceivedMessageSize = (int)Math.Min(maxReceivedMessageSize, settings.MaxReceivedMessageSize);
                Guid guid = ProcessOutgoingMessage(message, via);
                SecureOutgoingMessage(ref message, via, timeout, securityProtocol);
                if ((message is SecurityAppliedMessage))
                {
                    ArraySegment<byte> buffer = encoder.WriteMessage(message, int.MaxValue, bufferManager);
                    securedMessage = encoder.ReadMessage(buffer, bufferManager);
                    id = (message as SecurityAppliedMessage).PrimarySignatureValue;
                    messageSize = (int)buffer.Count;
                }
                else
                {
                    securedMessage = message;
                    id = guid.ToByteArray();
                }
 
                messageBuffer = securedMessage.CreateBufferedCopy(factoryMaxReceivedMessageSize);
                string contentType = settings.MessageEncoderFactory.Encoder.ContentType;
                if (this.messagePropagationFilter != null)
                {
                    using (Message filterMessage = messageBuffer.CreateMessage())
                    {
                        propagateFlags = ((IPeerNodeMessageHandling)this).DetermineMessagePropagation(filterMessage, PeerMessageOrigination.Local);
                    }
                }
 
                if ((propagateFlags & PeerMessagePropagation.Remote) != PeerMessagePropagation.None)
                {
                    if (hopcount == 0)
                        propagateFlags &= ~PeerMessagePropagation.Remote;
                }
 
                // flood it out
                IAsyncResult ar = null;
                if ((propagateFlags & PeerMessagePropagation.Remote) != 0)
                {
                    ar = localFlooder.BeginFloodEncodedMessage(id, messageBuffer, timeoutHelper.RemainingTime(), onFloodComplete, null);
                    if (DiagnosticUtility.ShouldTraceVerbose)
                    {
                        TraceUtility.TraceEvent(TraceEventType.Verbose, TraceCode.PeerChannelMessageSent, SR.GetString(SR.TraceCodePeerChannelMessageSent), this, message);
                    }
                }
                else
                {
                    ar = new CompletedAsyncResult(onFloodComplete, null);
                }
                if (ar == null)
                {
                    Fx.Assert("SendAsyncResult must have an Async Result for onFloodComplete");
                }
 
                // queue up the pre-encoded message for local channels
                if ((propagateFlags & PeerMessagePropagation.Local) != 0)
                {
                    using (Message msg = messageBuffer.CreateMessage())
                    {
                        int i = msg.Headers.FindHeader(SecurityJan2004Strings.Security, SecurityJan2004Strings.Namespace);
                        if (i >= 0)
                        {
                            msg.Headers.AddUnderstood(i);
                        }
                        using (MessageBuffer clientBuffer = msg.CreateBufferedCopy(factoryMaxReceivedMessageSize))
                        {
                            DeliverMessageToClientChannels(registrant, clientBuffer, via, message.Headers.To, contentType, messageSize, -1, null);
                        }
                    }
                }
                result.OnLocalDispatchComplete(result);
            }
            finally
            {
                message.Close();
                if (securedMessage != null)
                    securedMessage.Close();
                if (messageBuffer != null)
                    messageBuffer.Close();
            }
 
            return result;
        }
 
        public void Close(TimeSpan timeout)
        {
            stateManager.Close(timeout);
        }
 
        void CloseCore(TimeSpan timeout, bool graceful)
        {
            PeerService lclService;
            PeerMaintainer lclMaintainer;
            PeerNeighborManager lclNeighborManager;
            PeerConnector lclConnector;
            PeerIPHelper lclIPHelper;
            PeerNodeConfig lclConfig;
            PeerFlooder lclFlooder;
            Exception exception = null;
 
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeClosing, SR.GetString(SR.TraceCodePeerNodeClosing), this.traceRecord, this, null);
            }
 
            lock (ThisLock)
            {
                isOpen = false;
                lclMaintainer = maintainer;
                lclNeighborManager = neighborManager;
                lclConnector = connector;
                lclIPHelper = ipHelper;
                lclService = service;
                lclConfig = config;
                lclFlooder = flooder;
            }
 
            // only unregister if we are doing a g----ful shutdown
            try
            {
                if (graceful)
                {
                    UnregisterAddress(timeout);
                }
                else
                {
                    if (lclConfig != null)
                    {
                        ActionItem.Schedule(new Action<object>(UnregisterAddress), lclConfig.UnregisterTimeout);
                    }
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                if (exception == null) exception = e;
            }
 
            try
            {
                if (lclConnector != null)
                    lclConnector.Closing();
 
                if (lclService != null)
                {
                    try
                    {
                        lclService.Abort();
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                        if (exception == null) exception = e;
                    }
                }
 
                if (lclMaintainer != null)
                {
                    try
                    {
                        lclMaintainer.Close();
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                        if (exception == null) exception = e;
                    }
                }
 
                if (lclIPHelper != null)
                {
                    try
                    {
                        lclIPHelper.Close();
                        lclIPHelper.AddressChanged -= new EventHandler(stateManager.OnIPAddressesChanged);
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                        if (exception == null) exception = e;
                    }
                }
                if (lclNeighborManager != null)
                {
                    lclNeighborManager.NeighborConnected -= new EventHandler(OnNeighborConnected);
                    lclNeighborManager.NeighborOpened -= new EventHandler(this.securityManager.OnNeighborOpened);
                    this.securityManager.OnNeighborAuthenticated -= new EventHandler(this.OnNeighborAuthenticated);
                    lclNeighborManager.Online -= new EventHandler(FireOnline);
                    lclNeighborManager.Offline -= new EventHandler(FireOffline);
                    try
                    {
                        lclNeighborManager.Shutdown(graceful, timeoutHelper.RemainingTime());
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                        if (exception == null) exception = e;
                    }
 
                    // unregister for neighbor close events once shutdown has completed
                    lclNeighborManager.NeighborClosed -= new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosed);
                    lclNeighborManager.NeighborClosing -= new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosing);
                    lclNeighborManager.Close();
                }
 
                if (lclConnector != null)
                {
                    try
                    {
                        lclConnector.Close();
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                        if (exception == null) exception = e;
                    }
                }
 
                if (lclFlooder != null)
                {
                    try
                    {
                        lclFlooder.Close();
                    }
                    catch (Exception e)
                    {
                        if (Fx.IsFatal(e)) throw;
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                        if (exception == null) exception = e;
                    }
                }
 
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                if (exception == null) exception = e;
            }
 
            // reset object for next call to open
            EventHandler abortedHandler = null;
            lock (ThisLock)
            {
                // clear out old components (so they can be garbage collected)
                neighborManager = null;
                connector = null;
                maintainer = null;
                flooder = null;
                ipHelper = null;
                service = null;
 
                // reset generated config
                config = null;
                meshId = null;
                abortedHandler = Aborted;
            }
 
            // Notify anyone who is interested that abort has occured 
            if (!graceful && abortedHandler != null)
            {
                try
                {
                    abortedHandler(this, EventArgs.Empty);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    if (exception == null) exception = e;
                }
            }
 
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeClosed, SR.GetString(SR.TraceCodePeerNodeClosed), this.traceRecord, this, null);
            }
            if (exception != null && graceful == true)                          // Swallows all non fatal exceptions during Abort
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(exception);
            }
        }
 
        // Performs case-insensitive comparison of two vias
        bool CompareVia(Uri via1, Uri via2)
        {
            return (Uri.Compare(via1, via2,
                (UriComponents.Scheme | UriComponents.UserInfo | UriComponents.Host | UriComponents.Port | UriComponents.Path),
                UriFormat.SafeUnescaped, StringComparison.OrdinalIgnoreCase) == 0);
        }
 
        public static void EndClose(IAsyncResult result)
        {
            if (result == null)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
 
            SimpleStateManager.EndClose(result);
        }
 
        public static void EndOpen(IAsyncResult result)
        {
            if (result == null)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
 
            SimpleStateManager.EndOpen(result);
        }
 
        public static void EndSend(IAsyncResult result)
        {
            if (result == null)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
 
            SendAsyncResult.End(result);
        }
 
        // Necessary to allow access of the EventHandlers which can only be done from inside the class
        void FireOffline(object sender, EventArgs e)
        {
            if (!isOpen)
            {
                return;
            }
 
            EventHandler handler = Offline;
            if (handler != null)
            {
                handler(this, EventArgs.Empty);
            }
        }
 
        // Necessary to allow access of the EventHandlers which can only be done from inside the class
        void FireOnline(object sender, EventArgs e)
        {
            if (!isOpen)
            {
                return;
            }
 
            EventHandler handler = Online;
            if (handler != null)
            {
                handler(this, EventArgs.Empty);
            }
        }
 
        // static Uri -> PeerNode mapping
        static internal Dictionary<Uri, PeerNodeImplementation> peerNodes = new Dictionary<Uri, PeerNodeImplementation>();
 
        internal static PeerNodeImplementation Get(Uri listenUri)
        {
            PeerNodeImplementation node = null;
            if (!TryGet(listenUri, out node))
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                    new InvalidOperationException(SR.GetString(SR.NoTransportManagerForUri, listenUri)));
            }
            return node;
        }
 
        internal protected static bool TryGet(Uri listenUri, out PeerNodeImplementation result)
        {
            if (listenUri == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("listenUri");
            }
 
            if (listenUri.Scheme != PeerStrings.Scheme)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("listenUri", SR.GetString(SR.InvalidUriScheme,
                    listenUri.Scheme, PeerStrings.Scheme));
            }
            result = null;
            bool success = false;
            // build base uri
            Uri baseUri = new UriBuilder(PeerStrings.Scheme, listenUri.Host).Uri;
 
            lock (peerNodes)
            {
                if (peerNodes.ContainsKey(baseUri))
                {
                    result = peerNodes[baseUri];
                    success = true;
                }
            }
            return success;
        }
 
        public static bool TryGet(string meshId, out PeerNodeImplementation result)
        {
            UriBuilder uriBuilder = new UriBuilder();
            uriBuilder.Host = meshId;
            uriBuilder.Scheme = PeerStrings.Scheme;
            bool success = PeerNodeImplementation.TryGet(uriBuilder.Uri, out result);
            return success;
        }
 
        // internal method to return an existing PeerNode or create a new one with the given settings
        public static PeerNodeImplementation Get(Uri listenUri, Registration registration)
        {
            if (listenUri == null)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("listenUri");
 
            if (listenUri.Scheme != PeerStrings.Scheme)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("listenUri", SR.GetString(SR.InvalidUriScheme,
                    listenUri.Scheme, PeerStrings.Scheme));
            }
 
            // build base uri
            Uri baseUri = new UriBuilder(PeerStrings.Scheme, listenUri.Host).Uri;
 
            lock (peerNodes)
            {
                PeerNodeImplementation peerNodeImpl = null;
                PeerNodeImplementation peerNode = null;
                if (peerNodes.TryGetValue(baseUri, out peerNode))
                {
                    peerNodeImpl = (PeerNodeImplementation)peerNode;
 
                    // ensure that the PeerNode is compatible
                    registration.CheckIfCompatible(peerNodeImpl, listenUri);
                    peerNodeImpl.refCount++;
                    return peerNodeImpl;
                }
 
                // create a new PeerNode, and add it to the dictionary
                peerNodeImpl = registration.CreatePeerNode();
                peerNodes[baseUri] = peerNodeImpl;
                peerNodeImpl.refCount = 1;
                return peerNodeImpl;
            }
        }
 
        // SimpleStateManager callback - Called on final release of PeerNode.
        void InternalClose(TimeSpan timeout, bool graceful)
        {
            CloseCore(timeout, graceful);
            lock (ThisLock)
            {
                messageFilters.Clear();
            }
        }
 
        protected void OnAbort()
        {
            InternalClose(TimeSpan.FromTicks(0), false);
        }
 
        protected void OnClose(TimeSpan timeout)
        {
            InternalClose(timeout, true);
        }
 
        // called when the maintainer has completed the connection attempt (successful or not)
        void OnConnectionAttemptCompleted(Exception e)
        {
            // store the exception if one occured when trying to connect, so that it can be rethrown from Open
 
            Fx.Assert(openException == null, "OnConnectionAttemptCompleted twice");
            openException = e;
 
            if (openException == null && DiagnosticUtility.ShouldTraceInformation)
            {
                TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeOpened, SR.GetString(SR.TraceCodePeerNodeOpened), this.completeTraceRecord, this, null);
            }
            else if (openException != null && DiagnosticUtility.ShouldTraceError)
            {
                TraceUtility.TraceEvent(TraceEventType.Error, TraceCode.PeerNodeOpenFailed, SR.GetString(SR.TraceCodePeerNodeOpenFailed), this.completeTraceRecord, this, e);
            }
 
            connectCompletedEvent.Set();
        }
 
        bool IPeerNodeMessageHandling.ValidateIncomingMessage(ref Message message, Uri via)
        {
            SecurityProtocol protocol = null;
 
            if (via == null)
            {
                Fx.Assert("FloodMessage doesn't contain Via header!");
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.PeerMessageMustHaveVia, message.Headers.Action)));
            }
            if (TryGetSecurityProtocol(via, out protocol))
            {
                protocol.VerifyIncomingMessage(ref message, ServiceDefaults.SendTimeout, null);
                return true;
            }
            return false;
        }
 
        internal bool TryGetSecurityProtocol(Uri via, out SecurityProtocol protocol)
        {
            lock (ThisLock)
            {
                RefCountedSecurityProtocol wrapper = null;
                bool result = false;
                protocol = null;
                if (uri2SecurityProtocol.TryGetValue(via, out wrapper))
                {
                    protocol = wrapper.Protocol;
                    result = true;
                }
                return result;
            }
        }
 
        void IPeerNodeMessageHandling.HandleIncomingMessage(MessageBuffer messageBuffer, PeerMessagePropagation propagateFlags,
            int index, MessageHeader hopHeader, Uri via, Uri to)
        {
            if (DiagnosticUtility.ShouldTraceVerbose)
            {
                TraceUtility.TraceEvent(TraceEventType.Verbose, TraceCode.PeerFloodedMessageReceived, SR.GetString(SR.TraceCodePeerFloodedMessageReceived), this.traceRecord, this, null);
            }
 
            if (via == null)
            {
                Fx.Assert("No VIA in the forwarded message!");
                using (Message message = messageBuffer.CreateMessage())
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.PeerMessageMustHaveVia, message.Headers.Action)));
                }
            }
            if ((propagateFlags & PeerMessagePropagation.Local) != 0)
            {
                DeliverMessageToClientChannels(null, messageBuffer, via, to, messageBuffer.MessageContentType, (int)maxReceivedMessageSize, index, hopHeader);
                messageBuffer = null;
            }
            else
            {
                if (DiagnosticUtility.ShouldTraceVerbose)
                {
                    using (Message traceMessage = messageBuffer.CreateMessage())
                    {
                        TraceUtility.TraceEvent(TraceEventType.Verbose, TraceCode.PeerFloodedMessageNotPropagated, SR.GetString(SR.TraceCodePeerFloodedMessageNotPropagated), this.traceRecord, this, null, traceMessage);
                    }
                }
            }
        }
 
        PeerMessagePropagation IPeerNodeMessageHandling.DetermineMessagePropagation(Message message, PeerMessageOrigination origination)
        {
            PeerMessagePropagation propagateFlags = PeerMessagePropagation.LocalAndRemote;
            PeerMessagePropagationFilter filter = MessagePropagationFilter;
            if (filter != null)
            {
                try
                {
                    SynchronizationContext context = messagePropagationFilterContext;
                    if (context != null)
                    {
                        context.Send(delegate(object state) { propagateFlags = filter.ShouldMessagePropagate(message, origination); }, null);
                    }
                    else
                    {
                        propagateFlags = filter.ShouldMessagePropagate(message, origination);
                    }
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperCallback(SR.GetString(SR.MessagePropagationException), e);
                }
            }
 
            // Don't flood if the Node is closed
            if (!isOpen)
            {
                propagateFlags = PeerMessagePropagation.None;
            }
 
            return propagateFlags;
        }
 
 
        // Queued callback to actually process the address change
        // The design is such that any address change notifications are queued just like Open/Close operations.
        // So, we need not worry about address changes racing with other address changes or Open/Close operations.
        // Abort can happen at any time. However, Abort skips unregistering addresses, so this method doesn't have 
        // to worry about undoing its work if Abort happens.
        void OnIPAddressChange()
        {
            string lclMeshId = null;
            PeerNodeAddress nodeAddress = null;
            object lclResolverRegistrationId = null;
            bool lclRegistered = false;
            PeerIPHelper lclIPHelper = ipHelper;
            PeerNodeConfig lclconfig = config;
            bool processChange = false;
            TimeoutHelper timeoutHelper = new TimeoutHelper(ServiceDefaults.SendTimeout);
 
            // Determine if IP addresses have really changed before notifying the resolver
            // since it is possible that another change notification ahead of this one in the queue 
            // may have already completed notifying the resolver of the most current change.
            if (lclIPHelper != null && config != null)
            {
                nodeAddress = lclconfig.GetListenAddress(false);
                processChange = lclIPHelper.AddressesChanged(nodeAddress.IPAddresses);
                if (processChange)
                {
                    // Build the nodeAddress with the updated IP addresses
                    nodeAddress = new PeerNodeAddress(
                        nodeAddress.EndpointAddress, lclIPHelper.GetLocalAddresses());
                }
            }
 
            lock (ThisLock)
            {
                // Skip processing if the node isn't open anymore or if addresses haven't changed
                if (processChange && isOpen)
                {
                    lclMeshId = meshId;
                    lclResolverRegistrationId = resolverRegistrationId;
                    lclRegistered = registered;
                    config.SetListenAddress(nodeAddress);
                    completeTraceRecord = new PeerNodeTraceRecord(config.NodeId, meshId, nodeAddress);
                }
                else
                {
                    return;
                }
            }
            //#57954 - log and ---- non-critical exceptions during network change event notifications
            try
            {
                // Do we have any addresses? If so, update or re-register. Otherwise, unregister.
                if (nodeAddress.IPAddresses.Count > 0)
                {
                    if (lclRegistered)
                    {
                        resolver.Update(lclResolverRegistrationId, nodeAddress, timeoutHelper.RemainingTime());
                    }
                    else
                    {
                        RegisterAddress(lclMeshId, nodeAddress, timeoutHelper.RemainingTime());
                    }
                }
                else
                {
                    UnregisterAddress(timeoutHelper.RemainingTime());
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
            }
            PingConnections();
 
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeAddressChanged, SR.GetString(SR.TraceCodePeerNodeAddressChanged), this.completeTraceRecord, this, null);
            }
        }
 
        // Register with the resolver
        void RegisterAddress(string lclMeshId, PeerNodeAddress nodeAddress, TimeSpan timeout)
        {
            // Register only if we have any addresses
            if (nodeAddress.IPAddresses.Count > 0)
            {
                object lclResolverRegistrationId = null;
                try
                {
                    lclResolverRegistrationId = resolver.Register(lclMeshId, nodeAddress, timeout);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationException(SR.GetString(SR.ResolverException), e));
                }
                lock (ThisLock)
                {
                    if (!(!registered))
                    {
                        throw Fx.AssertAndThrow("registered expected to be false");
                    }
                    registered = true;
                    resolverRegistrationId = lclResolverRegistrationId;
                }
            }
        }
 
        // Unregister that should only be called from non-user threads.
        //since this is invoked on background threads, we log and ---- all non-critical exceptions
        //#57972
        void UnregisterAddress(object timeout)
        {
            try
            {
                UnregisterAddress((TimeSpan)timeout);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
            }
        }
 
        void UnregisterAddress(TimeSpan timeout)
        {
            bool needToUnregister = false;
            object lclResolverRegistrationId = null;
            lock (ThisLock)
            {
                if (registered)
                {
                    needToUnregister = true;
                    lclResolverRegistrationId = resolverRegistrationId;
                    registered = false;                 // this ensures that the current thread will do unregistration
                }
                resolverRegistrationId = null;
            }
            if (needToUnregister)
            {
                try
                {
                    resolver.Unregister(lclResolverRegistrationId, timeout);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationException(SR.GetString(SR.ResolverException), e));
                }
            }
        }
 
        void OnNeighborClosed(object sender, PeerNeighborCloseEventArgs e)
        {
            IPeerNeighbor neighbor = (IPeerNeighbor)sender;
            PeerConnector localConnector;
            PeerMaintainer localMaintainer;
            PeerFlooder localFlooder;
 
            localConnector = connector;
            localMaintainer = maintainer;
            localFlooder = flooder;
 
            UtilityExtension.OnNeighborClosed(neighbor);
            PeerChannelAuthenticatorExtension.OnNeighborClosed(neighbor);
 
            if (localConnector != null)
                localConnector.OnNeighborClosed(neighbor);
            if (localMaintainer != null)
                localMaintainer.OnNeighborClosed(neighbor);
            if (localFlooder != null)
                localFlooder.OnNeighborClosed(neighbor);
 
            // Finally notify any Peernode client
            EventHandler<PeerNeighborCloseEventArgs> handler = NeighborClosed;
            if (handler != null)
            {
                handler(this, e);
            }
        }
 
        void OnNeighborClosing(object sender, PeerNeighborCloseEventArgs e)
        {
            IPeerNeighbor neighbor = (IPeerNeighbor)sender;
            PeerConnector localConnector;
 
            localConnector = connector;
 
            if (localConnector != null)
                localConnector.OnNeighborClosing(neighbor, e.Reason);
 
            // Finally notify any Peernode client
            EventHandler<PeerNeighborCloseEventArgs> handler = NeighborClosing;
            if (handler != null)
            {
                handler(this, e);
            }
        }
 
        void OnNeighborConnected(object sender, EventArgs e)
        {
            IPeerNeighbor neighbor = (IPeerNeighbor)sender;
            PeerMaintainer localMaintainer = maintainer;
            PeerFlooder localFlooder = flooder;
 
            if (localFlooder != null)
                localFlooder.OnNeighborConnected(neighbor);
 
            if (localMaintainer != null)
                localMaintainer.OnNeighborConnected(neighbor);
 
            UtilityExtension.OnNeighborConnected(neighbor);
 
            // Finally notify any Peernode client
            EventHandler handler = NeighborConnected;
            if (handler != null)
            {
                handler(this, EventArgs.Empty);
            }
        }
 
        // raised by the neighbor manager when any connection has reached the opened state
        void OnNeighborAuthenticated(object sender, EventArgs e)
        {
            IPeerNeighbor n = (IPeerNeighbor)sender;
 
            //hand the authenticated neighbor over to connector.
            //If neighbor is aborted before 
            PeerConnector localConnector = connector;
            if (localConnector != null)
                connector.OnNeighborAuthenticated(n);
 
            // Finally notify any Peernode client
            EventHandler handler = NeighborOpened;
            if (handler != null)
            {
                handler(this, EventArgs.Empty);
            }
        }
 
        // Open blocks the thread until either Online happens or Open times out.
        void OnOpen(TimeSpan timeout, bool waitForOnline)
        {
            bool aborted = false;
            EventHandler connectedHandler = delegate(object source, EventArgs args) { connectCompletedEvent.Set(); };
            EventHandler abortHandler = delegate(object source, EventArgs args) { aborted = true; connectCompletedEvent.Set(); };
            openException = null;                                               // clear out the open exception from the last Open attempt
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
 
            try
            {
                NeighborConnected += connectedHandler;
                Aborted += abortHandler;
                OpenCore(timeout);
 
                if (waitForOnline)
                {
                    if (!TimeoutHelper.WaitOne(connectCompletedEvent, timeoutHelper.RemainingTime()))
                    {
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException());
                    }
                }
 
                if (aborted)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationObjectAbortedException(SR.GetString(SR.PeerNodeAborted)));
                }
 
                // retrieve listen addresses and register with the resolver
                if (isOpen)
                {
                    if (openException != null)
                    {
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(openException);
                    }
                    else
                    {
                        string lclMeshId = null;
                        PeerNodeConfig lclConfig = null;
                        lock (ThisLock)
                        {
                            lclMeshId = meshId;
                            lclConfig = config;
                        }
 
                        // The design is such that any address change notifications are queued behind Open operation
                        // So, we need not worry about address changes racing with the initial registration.
                        RegisterAddress(lclMeshId, lclConfig.GetListenAddress(false), timeoutHelper.RemainingTime());
                    }
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e)) throw;
                CloseCore(TimeSpan.FromTicks(0), false);
                throw;
            }
            finally
            {
                NeighborConnected -= connectedHandler;
                Aborted -= abortHandler;
            }
        }
 
        internal void Open(TimeSpan timeout, bool waitForOnline)
        {
            stateManager.Open(timeout, waitForOnline);
        }
 
        // the core functionality of open (all but waiting for a connection)
        void OpenCore(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            PeerMaintainer lclMaintainer;
            PeerNodeConfig lclConfig;
            string lclMeshId;
 
            lock (ThisLock)
            {
                if (ListenUri == null)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.ListenUriNotSet, this.GetType())));
                }
 
                // extract mesh id from listen uri
                meshId = ListenUri.Host;
 
                // generate the node id
                byte[] bytes = new byte[sizeof(ulong)];
                ulong nodeId = 0;
                do
                {
                    System.ServiceModel.Security.CryptoHelper.FillRandomBytes(bytes);
                    for (int i = 0; i < sizeof(ulong); i++)
                        nodeId |= ((ulong)bytes[i]) << i * 8;
                }
                while (nodeId == PeerTransportConstants.InvalidNodeId);
 
                // now that the node id has been generated, create the trace record that describes this
                traceRecord = new PeerNodeTraceRecord(nodeId, meshId);
                if (DiagnosticUtility.ShouldTraceInformation)
                {
                    TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeOpening, SR.GetString(SR.TraceCodePeerNodeOpening), this.traceRecord, this, null);
                }
 
                // create the node configuration
                config = new PeerNodeConfig(meshId,
                                                nodeId,
                                                resolver,
                                                messagePropagationFilter,
                                                encoder,
                                                ListenUri, listenIPAddress, port,
                                                maxReceivedMessageSize, minNeighbors, idealNeighbors, maxNeighbors, maxReferrals,
                                                connectTimeout, maintainerInterval,
                                                securityManager,
                                                this.readerQuotas,
                                                this.maxBufferPoolSize,
                                                this.MaxSendQueue,
                                                this.MaxReceiveQueue);
 
                // create components
                if (listenIPAddress != null)
                    ipHelper = new PeerIPHelper(listenIPAddress);
                else
                    ipHelper = new PeerIPHelper();
                bufferManager = BufferManager.CreateBufferManager(64 * config.MaxReceivedMessageSize, (int)config.MaxReceivedMessageSize);
                neighborManager = new PeerNeighborManager(ipHelper,
                                                            config,
                                                            this);
                flooder = PeerFlooder.CreateFlooder(config, neighborManager, this);
                maintainer = new PeerMaintainer(config, neighborManager, flooder);
                connector = new PeerConnector(config, neighborManager, maintainer);
 
                Dictionary<Type, object> services = serviceHandlers;
                if (services == null)
                {
                    services = new Dictionary<Type, object>();
                    services.Add(typeof(IPeerConnectorContract), connector);
                    services.Add(typeof(IPeerFlooderContract<Message, UtilityInfo>), flooder);
                }
                service = new PeerService(this.config,
                                        neighborManager.ProcessIncomingChannel,
                                        neighborManager.GetNeighborFromProxy,
                                        services,
                                        this);
                this.securityManager.MeshId = this.meshId;
                service.Open(timeoutHelper.RemainingTime());
 
                // register for events
                neighborManager.NeighborClosed += new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosed);
                neighborManager.NeighborClosing += new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosing);
                neighborManager.NeighborConnected += new EventHandler(OnNeighborConnected);
                neighborManager.NeighborOpened += new EventHandler(this.SecurityManager.OnNeighborOpened);
                this.securityManager.OnNeighborAuthenticated += new EventHandler(this.OnNeighborAuthenticated);
                neighborManager.Online += new EventHandler(FireOnline);
                neighborManager.Offline += new EventHandler(FireOffline);
                ipHelper.AddressChanged += new EventHandler(stateManager.OnIPAddressesChanged);
 
                // open components
                ipHelper.Open();
 
                // Set the listen address before opening any more components
                PeerNodeAddress nodeAddress = new PeerNodeAddress(service.GetListenAddress(), ipHelper.GetLocalAddresses());
                config.SetListenAddress(nodeAddress);
 
                neighborManager.Open(service.Binding, service);
                connector.Open();
                maintainer.Open();
                flooder.Open();
 
                isOpen = true;
                completeTraceRecord = new PeerNodeTraceRecord(nodeId, meshId, nodeAddress);
 
                // Set these locals inside the lock (Abort may occur whilst Opening)
                lclMaintainer = maintainer;
 
                lclMeshId = meshId;
                lclConfig = config;
                openException = null;
 
            }
 
            // retrieve listen addresses and register with the resolver
            if (isOpen)
            {
                // attempt to connect to the mesh
                lclMaintainer.ScheduleConnect(new PeerMaintainer.ConnectCallback(OnConnectionAttemptCompleted));
            }
        }
 
        void DeliverMessageToClientChannels(
                                object registrant,
                                MessageBuffer messageBuffer,
                                Uri via,
                                Uri peerTo,
                                string contentType,
                                int messageSize,
                                int index,
                                MessageHeader hopHeader)
        {
            Message message = null;
            try
            {
                // create a list of callbacks so they can each be called outside the lock
                ArrayList callbacks = new ArrayList();
                Uri to = peerTo;
                Fx.Assert(peerTo != null, "Invalid To header value!");
                if (isOpen)
                {
                    lock (ThisLock)
                    {
                        if (isOpen)
                        {
                            foreach (MessageFilterRegistration mfr in messageFilters.Values)
                            {
                                // first, the via's must match
                                bool match = CompareVia(via, mfr.via);
                                if (messageSize < 0)
                                {
                                    //messageSize <0 indicates that this message is coming from BeginSend
                                    //and the size is not computed yet.
                                    if (message == null)
                                    {
                                        message = messageBuffer.CreateMessage();
                                        Fx.Assert(message.Headers.To == to, "To Header is inconsistent in Send() case!");
                                        Fx.Assert(message.Properties.Via == via, "Via property is inconsistent in Send() case!");
                                    }
                                    //incoming message need not be verified MaxReceivedSize
                                    //only do this for local channels
                                    if (registrant != null)
                                    {
                                        ArraySegment<byte> buffer = encoder.WriteMessage(message, int.MaxValue, bufferManager);
                                        messageSize = (int)buffer.Count;
                                    }
                                }
                                // only queue the message for registrants expecting this size
                                match = match && (messageSize <= mfr.settings.MaxReceivedMessageSize);
 
                                // if a filter is specified, it must match as well
                                if (match && mfr.filters != null)
                                {
                                    for (int i = 0; match && i < mfr.filters.Length; i++)
                                    {
                                        match = mfr.filters[i].Match(via, to);
                                    }
                                }
 
                                if (match)
                                {
                                    callbacks.Add(mfr.callback);
                                }
                            }
                        }
                    }
                }
                foreach (MessageAvailableCallback callback in callbacks)
                {
                    Message localCopy;
                    try
                    {
                        //this copy is free'd by SFx.
                        localCopy = messageBuffer.CreateMessage();
                        localCopy.Properties.Via = via;
                        localCopy.Headers.To = to;
                        //mark security header as understood.
                        try
                        {
                            int i = localCopy.Headers.FindHeader(SecurityJan2004Strings.Security, SecurityJan2004Strings.Namespace);
                            if (i >= 0)
                            {
                                localCopy.Headers.AddUnderstood(i);
                            }
                        }
                        catch (MessageHeaderException e)
                        {
                            DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
                        }
                        catch (SerializationException e)
                        {
                            DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
                        }
                        catch (XmlException e)
                        {
                            DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
                        }
 
                        if (index != -1)
                        {
                            localCopy.Headers.ReplaceAt(index, hopHeader);
                        }
 
                        callback(localCopy);
                    }
                    catch (ObjectDisposedException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (CommunicationObjectAbortedException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                    catch (CommunicationObjectFaultedException e)
                    {
                        DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    }
                }
            }
            finally
            {
                if (message != null)
                    message.Close();
            }
        }
 
        public void RefreshConnection()
        {
            PeerMaintainer lclMaintainer = null;
            lock (ThisLock)
            {
                ThrowIfNotOpen();
                lclMaintainer = maintainer;
            }
            if (lclMaintainer != null)
            {
                lclMaintainer.RefreshConnection();
            }
        }
 
        public void PingConnections()
        {
            PeerMaintainer lclMaintainer = null;
            lock (ThisLock)
            {
                lclMaintainer = maintainer;
            }
            if (lclMaintainer != null)
            {
                lclMaintainer.PingConnections();
            }
        }
 
        //always call methods from inside a lock (of the container)
        class RefCountedSecurityProtocol
        {
            int refCount;
            public SecurityProtocol Protocol;
            public RefCountedSecurityProtocol(SecurityProtocol securityProtocol)
            {
                this.Protocol = securityProtocol;
                this.refCount = 1;
            }
            public int AddRef()
            {
                return ++refCount;
            }
            public int Release()
            {
                return --refCount;
            }
        }
 
        // internal message filtering
        internal void RegisterMessageFilter(object registrant, Uri via, PeerMessageFilter[] filters,
            ITransportFactorySettings settings, MessageAvailableCallback callback, SecurityProtocol securityProtocol)
        {
            MessageFilterRegistration registration = new MessageFilterRegistration();
            registration.registrant = registrant;
            registration.via = via;
            registration.filters = filters;
            registration.settings = settings;
            registration.callback = callback;
            registration.securityProtocol = securityProtocol;
            lock (ThisLock)
            {
                messageFilters.Add(registrant, registration);
                RefCountedSecurityProtocol protocolWrapper = null;
                if (!this.uri2SecurityProtocol.TryGetValue(via, out protocolWrapper))
                {
                    protocolWrapper = new RefCountedSecurityProtocol(securityProtocol);
                    this.uri2SecurityProtocol.Add(via, protocolWrapper);
                }
                else
                    protocolWrapper.AddRef();
            }
        }
 
        // internal method to release the reference on an existing PeerNode
        internal void Release()
        {
            lock (peerNodes)
            {
                if (peerNodes.ContainsValue(this))
                {
                    if (--refCount == 0)
                    {
                        // no factories/channels are using this instance (although the application may still be
                        // referring to it directly). either way, we remove this from the registry
                        peerNodes.Remove(listenUri);
                    }
                }
            }
        }
 
        // Call with null to reset to our implementation
        public void SetServiceHandlers(Dictionary<Type, object> services)
        {
            lock (ThisLock)
            {
                serviceHandlers = services;
            }
        }
 
        void ThrowIfNotOpen()
        {
            if (!isOpen)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.TransportManagerNotOpen)));
            }
        }
 
        void ThrowIfOpen()
        {
            if (isOpen)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(
                    SR.TransportManagerOpen)));
            }
        }
 
        public override string ToString()
        {
            lock (ThisLock)
            {
                // if open return the mesh id, otherwise return the type
                if (isOpen)
                    return string.Format(System.Globalization.CultureInfo.InvariantCulture,
                        "{0} ({1})", MeshId, NodeId);
                else
                    return this.GetType().ToString();
            }
        }
 
        internal void UnregisterMessageFilter(object registrant, Uri via)
        {
            lock (ThisLock)
            {
                messageFilters.Remove(registrant);
                RefCountedSecurityProtocol protocolWrapper = null;
                if (uri2SecurityProtocol.TryGetValue(via, out protocolWrapper))
                {
                    if (protocolWrapper.Release() == 0)
                        uri2SecurityProtocol.Remove(via);
                }
                else
                    Fx.Assert(false, "Corresponding SecurityProtocol is not Found!");
            }
        }
 
        internal static void ValidateVia(Uri uri)
        {
            int viaSize = Encoding.UTF8.GetByteCount(uri.OriginalString);
            if (viaSize > maxViaSize)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidDataException(SR.GetString(
                    SR.PeerChannelViaTooLong, uri, viaSize, maxViaSize)));
        }
 
        internal class ChannelRegistration
        {
            public object registrant;
            public Uri via;
            public ITransportFactorySettings settings;
            public SecurityProtocol securityProtocol;
            public Type channelType;
 
        }
 
        // holds the registration information passed in by channels and listeners. This informtaion is used
        // to determine which channels and listeners will receive an incoming message
        class MessageFilterRegistration : ChannelRegistration
        {
            public PeerMessageFilter[] filters;
            public MessageAvailableCallback callback;
        }
 
        // represents the settings of a PeerListenerFactory or PeerChannelFactory, used to create a new
        // PeerNode or compare settings to an existing PeerNode
        internal class Registration
        {
            IPAddress listenIPAddress;
            Uri listenUri;
            long maxReceivedMessageSize;
            int port;
            PeerResolver resolver;
            PeerSecurityManager securityManager;
            XmlDictionaryReaderQuotas readerQuotas;
            long maxBufferPoolSize;
 
            public Registration(Uri listenUri, IPeerFactory factory)
            {
                if (factory.Resolver == null)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                        new InvalidOperationException(SR.GetString(SR.PeerResolverRequired)));
                }
                if (factory.ListenIPAddress != null)
                {
                    listenIPAddress = factory.ListenIPAddress;
                }
                this.listenUri = new UriBuilder(PeerStrings.Scheme, listenUri.Host).Uri;
                this.port = factory.Port;
                this.maxReceivedMessageSize = factory.MaxReceivedMessageSize;
                this.resolver = factory.Resolver;
                this.securityManager = factory.SecurityManager;
                this.readerQuotas = new XmlDictionaryReaderQuotas();
                factory.ReaderQuotas.CopyTo(this.readerQuotas);
                this.maxBufferPoolSize = factory.MaxBufferPoolSize;
            }
 
            bool HasMismatchedReaderQuotas(XmlDictionaryReaderQuotas existingOne, XmlDictionaryReaderQuotas newOne, out string result)
            {
                //check for properties that affect the message
                result = null;
                if (existingOne.MaxArrayLength != newOne.MaxArrayLength)
                    result = PeerBindingPropertyNames.ReaderQuotasDotArrayLength;
                else if (existingOne.MaxStringContentLength != newOne.MaxStringContentLength)
                    result = PeerBindingPropertyNames.ReaderQuotasDotStringLength;
                else if (existingOne.MaxDepth != newOne.MaxDepth)
                    result = PeerBindingPropertyNames.ReaderQuotasDotMaxDepth;
                else if (existingOne.MaxNameTableCharCount != newOne.MaxNameTableCharCount)
                    result = PeerBindingPropertyNames.ReaderQuotasDotMaxCharCount;
                else if (existingOne.MaxBytesPerRead != newOne.MaxBytesPerRead)
                    result = PeerBindingPropertyNames.ReaderQuotasDotMaxBytesPerRead;
                return result != null;
            }
 
            public void CheckIfCompatible(PeerNodeImplementation peerNode, Uri via)
            {
                string mismatch = null;
                // test the settings that must be identical
 
                if (listenUri != peerNode.ListenUri)
                    mismatch = PeerBindingPropertyNames.ListenUri;
                else if (port != peerNode.Port)
                    mismatch = PeerBindingPropertyNames.Port;
                else if (maxReceivedMessageSize != peerNode.MaxReceivedMessageSize)
                    mismatch = PeerBindingPropertyNames.MaxReceivedMessageSize;
                else if (maxBufferPoolSize != peerNode.MaxBufferPoolSize)
                    mismatch = PeerBindingPropertyNames.MaxBufferPoolSize;
                else if (HasMismatchedReaderQuotas(peerNode.ReaderQuotas, readerQuotas, out mismatch))
                { }
                else if (resolver.GetType() != peerNode.Resolver.GetType())
                    mismatch = PeerBindingPropertyNames.Resolver;
                else if (!resolver.Equals(peerNode.Resolver))
                    mismatch = PeerBindingPropertyNames.ResolverSettings;
                else if (listenIPAddress != peerNode.ListenIPAddress)
                {
                    if ((listenIPAddress == null || peerNode.ListenIPAddress == null)
                        ||
                        (!listenIPAddress.Equals(peerNode.ListenIPAddress)))
                        mismatch = PeerBindingPropertyNames.ListenIPAddress;
                }
                else if ((securityManager == null) && (peerNode.SecurityManager != null))
                    mismatch = PeerBindingPropertyNames.Security;
                if (mismatch != null)
                    PeerExceptionHelper.ThrowInvalidOperation_PeerConflictingPeerNodeSettings(mismatch);
                securityManager.CheckIfCompatibleNodeSettings(peerNode.SecurityManager);
            }
 
            public PeerNodeImplementation CreatePeerNode()
            {
                PeerNodeImplementation peerNode = new PeerNodeImplementation();
                peerNode.ListenIPAddress = listenIPAddress;
                peerNode.ListenUri = listenUri;
                peerNode.MaxReceivedMessageSize = maxReceivedMessageSize;
                peerNode.Port = port;
                peerNode.Resolver = resolver;
                peerNode.SecurityManager = securityManager;
                this.readerQuotas.CopyTo(peerNode.readerQuotas);
                peerNode.MaxBufferPoolSize = maxBufferPoolSize;
                return peerNode;
            }
        }
 
        class SendAsyncResult : AsyncResult
        {
            bool floodComplete = false;
            bool localDispatchComplete = false;
 
            object thisLock = new object();
            object ThisLock { get { return thisLock; } }
            Exception floodException = null;
 
            public SendAsyncResult(AsyncCallback callback, object state) : base(callback, state) { }
 
            public void OnFloodComplete(IAsyncResult result)
            {
                if (this.floodComplete || this.IsCompleted)
                    return;
 
                bool complete = false;
                lock (this.ThisLock)
                {
                    if (this.localDispatchComplete)
                        complete = true;
                    this.floodComplete = true;
                }
                try
                {
                    PeerFlooder.EndFloodEncodedMessage(result);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) throw;
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    floodException = e;
                }
                if (complete)
                {
                    this.Complete(result.CompletedSynchronously, floodException);
                }
            }
 
            public void OnLocalDispatchComplete(IAsyncResult result)
            {
                SendAsyncResult sr = (SendAsyncResult)result;
                if (this.localDispatchComplete || this.IsCompleted)
                    return;
 
                bool complete = false;
                lock (this.ThisLock)
                {
                    if (this.floodComplete)
                        complete = true;
                    this.localDispatchComplete = true;
                }
 
                if (complete)
                {
                    this.Complete(true, floodException);
                }
            }
 
            public static void End(IAsyncResult result)
            {
                AsyncResult.End<SendAsyncResult>(result);
            }
        }
 
        bool IPeerNodeMessageHandling.HasMessagePropagation
        {
            get
            {
                return this.messagePropagationFilter != null;
            }
        }
 
        bool IPeerNodeMessageHandling.IsKnownVia(Uri via)
        {
            bool result = false;
            lock (ThisLock)
            {
                result = uri2SecurityProtocol.ContainsKey(via);
            }
            return result;
        }
 
        bool IPeerNodeMessageHandling.IsNotSeenBefore(Message message, out byte[] id, out int cacheMiss)
        {
            PeerFlooder lclFlooder = flooder;
            id = DefaultId;
            cacheMiss = -1;
            return (lclFlooder != null && lclFlooder.IsNotSeenBefore(message, out id, out cacheMiss));
        }
 
        public MessageEncodingBindingElement EncodingBindingElement
        {
            get
            {
                return this.EncodingElement;
            }
        }
 
    }
 
    interface IPeerNodeMessageHandling
    {
        void HandleIncomingMessage(MessageBuffer messageBuffer, PeerMessagePropagation propagateFlags, int index, MessageHeader header, Uri via, Uri to);
        PeerMessagePropagation DetermineMessagePropagation(Message message, PeerMessageOrigination origination);
        bool HasMessagePropagation { get; }
        bool ValidateIncomingMessage(ref Message data, Uri via);
        bool IsKnownVia(Uri via);
        bool IsNotSeenBefore(Message message, out byte[] id, out int cacheMiss);
        MessageEncodingBindingElement EncodingBindingElement { get; }
    }
 
}