File: System\ServiceModel\Channels\PeerConnector.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Globalization;
    using System.Runtime;
    using System.ServiceModel;
    using System.ServiceModel.Description;
    using System.Threading;
 
    // Connector is responsible for transitioning neighbors to connected state. 
    class PeerConnector : IPeerConnectorContract
    {
        enum State
        {
            Created,
            Opened,
            Closed,
            Closing
        }
 
        PeerNodeConfig config;
        PeerMaintainer maintainer;
        PeerNeighborManager neighborManager;
        State state;
        object thisLock;
 
        // TypedMessageConverters:
        TypedMessageConverter connectInfoMessageConverter;
        TypedMessageConverter disconnectInfoMessageConverter;
        TypedMessageConverter refuseInfoMessageConverter;
        TypedMessageConverter welcomeInfoMessageConverter;
 
        // To keep track of timers to transition neighbors to connected state
        Dictionary<IPeerNeighbor, IOThreadTimer> timerTable;
 
        public PeerConnector(PeerNodeConfig config, PeerNeighborManager neighborManager,
            PeerMaintainer maintainer)
        {
            Fx.Assert(config != null, "Config is expected to non-null");
            Fx.Assert(neighborManager != null, "NeighborManager is expected to be non-null");
            Fx.Assert(maintainer != null, "Maintainer is expected to be non-null");
            Fx.Assert(config.NodeId != PeerTransportConstants.InvalidNodeId, "Invalid NodeId");
            Fx.Assert(config.MaxNeighbors > 0, "MaxNeighbors is expected to be non-zero positive value");
            Fx.Assert(config.ConnectTimeout > 0, "ConnectTimeout is expected to be non-zero positive value");
 
            this.thisLock = new object();
            this.config = config;
            this.neighborManager = neighborManager;
            this.maintainer = maintainer;
            this.timerTable = new Dictionary<IPeerNeighbor, IOThreadTimer>();
            this.state = State.Created;
        }
 
        object ThisLock
        {
            get
            {
                return this.thisLock;
            }
        }
 
 
        internal TypedMessageConverter ConnectInfoMessageConverter
        {
            get
            {
                if (connectInfoMessageConverter == null)
                {
                    connectInfoMessageConverter = TypedMessageConverter.Create(typeof(ConnectInfo), PeerStrings.ConnectAction);
                }
                return connectInfoMessageConverter;
            }
        }
 
        internal TypedMessageConverter DisconnectInfoMessageConverter
        {
            get
            {
                if (disconnectInfoMessageConverter == null)
                {
                    disconnectInfoMessageConverter = TypedMessageConverter.Create(typeof(DisconnectInfo), PeerStrings.DisconnectAction);
                }
                return disconnectInfoMessageConverter;
            }
        }
 
        internal TypedMessageConverter RefuseInfoMessageConverter
        {
            get
            {
                if (refuseInfoMessageConverter == null)
                {
                    refuseInfoMessageConverter = TypedMessageConverter.Create(typeof(RefuseInfo), PeerStrings.RefuseAction);
                }
                return refuseInfoMessageConverter;
            }
        }
 
        internal TypedMessageConverter WelcomeInfoMessageConverter
        {
            get
            {
                if (welcomeInfoMessageConverter == null)
                {
                    welcomeInfoMessageConverter = TypedMessageConverter.Create(typeof(WelcomeInfo), PeerStrings.WelcomeAction);
                }
                return welcomeInfoMessageConverter;
            }
        }
 
        // Add a timer for the specified neighbor to the timer table. The timer is only added
        // if Connector is open and the neighbor is in Connecting state.
        bool AddTimer(IPeerNeighbor neighbor)
        {
            bool added = false;
 
            lock (ThisLock)
            {
                if (state == State.Opened && neighbor.State == PeerNeighborState.Connecting)
                {
                    IOThreadTimer timer = new IOThreadTimer(new Action<object>(OnConnectTimeout), neighbor, true);
                    timer.Set(this.config.ConnectTimeout);
                    this.timerTable.Add(neighbor, timer);
                    added = true;
                }
            }
 
            return added;
        }
 
        //this method takes care of closing the message.
        void SendMessageToNeighbor(IPeerNeighbor neighbor, Message message, PeerMessageHelpers.CleanupCallback cleanupCallback)
        {
            bool fatal = false;
            try
            {
                neighbor.Send(message);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    fatal = true;
                    throw;
                }
                if (e is CommunicationException ||
                    e is QuotaExceededException ||
                    e is ObjectDisposedException ||
                    e is TimeoutException)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    // Message failed to transmit due to quota exceeding or channel failure
                    if (cleanupCallback != null)
                    {
                        cleanupCallback(neighbor, PeerCloseReason.InternalFailure, e);
                    }
                }
                else
                {
                    throw;
                }
            }
            finally
            {
                if (!fatal)
                    message.Close();
            }
        }
 
        // If neighbor cannot transition to connected state, this method cleans up the timer and 
        // closes the neighbor
        void CleanupOnConnectFailure(IPeerNeighbor neighbor, PeerCloseReason reason,
            Exception exception)
        {
            // timer will not be found if neighbor is already closed or connected.
            if (RemoveTimer(neighbor))
            {
                this.neighborManager.CloseNeighbor(neighbor, reason,
                    PeerCloseInitiator.LocalNode, exception);
            }
        }
 
        public void Close()
        {
            Dictionary<IPeerNeighbor, IOThreadTimer> table;
 
            lock (ThisLock)
            {
                table = this.timerTable;
                this.timerTable = null;
                this.state = State.Closed;
 
            }
 
            // Cancel each timer
            if (table != null)
            {
                foreach (IOThreadTimer timer in table.Values)
                    timer.Cancel();
            }
        }
 
        public void Closing()
        {
            lock (ThisLock)
            {
                this.state = State.Closing;
            }
        }
 
        // Complete processing of Disconnect or Refuse message from the neighbor
        void CompleteTerminateMessageProcessing(IPeerNeighbor neighbor,
            PeerCloseReason closeReason, IList<Referral> referrals)
        {
            // Close the neighbor after setting the neighbor state to Disconnected.
            // The set can fail if the neighbor is already being closed and that is ok.
            if (neighbor.TrySetState(PeerNeighborState.Disconnected))
                this.neighborManager.CloseNeighbor(neighbor, closeReason, PeerCloseInitiator.RemoteNode);
            else
                if (!(neighbor.State >= PeerNeighborState.Disconnected))
                {
                    throw Fx.AssertAndThrow("Unexpected neighbor state");
                }
 
            // Hand over the referrals to maintainer
            this.maintainer.AddReferrals(referrals, neighbor);
        }
 
        void OnConnectFailure(IPeerNeighbor neighbor, PeerCloseReason reason,
            Exception exception)
        {
            CleanupOnConnectFailure(neighbor, reason, exception);
        }
 
        void OnConnectTimeout(object asyncState)
        {
            CleanupOnConnectFailure((IPeerNeighbor)asyncState, PeerCloseReason.ConnectTimedOut, null);
        }
 
        // Process neighbor closed notification.
        public void OnNeighborClosed(IPeerNeighbor neighbor)
        {
            // If the neighbor is closed abruptly by the remote node, OnNeighborClosing will 
            // not be invoked. Remove neighbor's timer from the table.
            RemoveTimer(neighbor);
        }
 
        // Process neighbor closing notification.
        public void OnNeighborClosing(IPeerNeighbor neighbor, PeerCloseReason closeReason)
        {
            // Send Disconnect message to a Connected neighbor
            if (neighbor.IsConnected)
                SendTerminatingMessage(neighbor, PeerStrings.DisconnectAction, closeReason);
        }
 
        // Process neighbor authenticated notification
        public void OnNeighborAuthenticated(IPeerNeighbor neighbor)
        {
            if (!(this.state != State.Created))
            {
                throw Fx.AssertAndThrow("Connector not expected to be in Created state");
            }
 
            if (!(PeerNeighborStateHelper.IsAuthenticatedOrClosed(neighbor.State)))
            {
                throw Fx.AssertAndThrow(string.Format(CultureInfo.InvariantCulture, "Neighbor state expected to be Authenticated or Closed, actual state: {0}", neighbor.State));
            }
 
            // setting the state fails if neighbor is already closed or closing
            // If so, we have nothing to do.
            if (!neighbor.TrySetState(PeerNeighborState.Connecting))
            {
                if (!(neighbor.State >= PeerNeighborState.Faulted))
                {
                    throw Fx.AssertAndThrow(string.Format(CultureInfo.InvariantCulture, "Neighbor state expected to be Faulted or Closed, actual state: {0}", neighbor.State));
                }
                return;
            }
 
            // Add a timer to timer table to transition the neighbor to connected state
            // within finite duration. The neighbor is closed if the timer fires and the
            // neighbor has not reached connected state.
            // The timer is not added if neighbor or connector are closed
            if (AddTimer(neighbor))
            {
                // Need to send connect message if the neighbor is the initiator
                if (neighbor.IsInitiator)
                {
                    if (this.neighborManager.ConnectedNeighborCount < this.config.MaxNeighbors)
                        SendConnect(neighbor);
                    else
                    {
                        // We have max connected neighbors already. So close this one.
                        this.neighborManager.CloseNeighbor(neighbor, PeerCloseReason.NodeBusy,
                            PeerCloseInitiator.LocalNode);
                    }
                }
            }
        }
 
        public void Open()
        {
            lock (ThisLock)
            {
                if (!(this.state == State.Created))
                {
                    throw Fx.AssertAndThrow("Connector expected to be in Created state");
                }
                this.state = State.Opened;
            }
        }
 
        //<Implementation of PeerConnector.IPeerConnectorContract>
        // Process Connect from the neighbor
        public void Connect(IPeerNeighbor neighbor, ConnectInfo connectInfo)
        {
            // Don't bother processing the message if Connector has closed
            if (this.state != State.Opened)
                return;
 
            PeerCloseReason closeReason = PeerCloseReason.None;
 
            // A connect message should only be received by a responder neighbor that is
            // in Connecting state. If not, we close the neighbor without bothering 
            // to send a Refuse message
            // A malicious neighbor can format a message with a null connectInfo as an argument
            if (neighbor.IsInitiator || !connectInfo.HasBody() || (neighbor.State != PeerNeighborState.Connecting &&
                neighbor.State != PeerNeighborState.Closed))
            {
                closeReason = PeerCloseReason.InvalidNeighbor;
            }
 
            // Remove the timer from the timer table for this neighbor. If the timer is not
            // present, the neighbor is already being closed and the Connect message should 
            // be ignored.
            else if (RemoveTimer(neighbor))
            {
                // Determine if Welcome or Refuse should be sent
 
                // Refuse if node has maximum allowed connected neighbors?
                if (this.neighborManager.ConnectedNeighborCount >= this.config.MaxNeighbors)
                    closeReason = PeerCloseReason.NodeBusy;
                else
                {
                    // Deserialization failed or connect info is invalid?
                    if (!PeerValidateHelper.ValidNodeAddress(connectInfo.Address))
                    {
                        closeReason = PeerCloseReason.InvalidNeighbor;
                    }
                    else
                    {
                        // Determine if neighbor should be accepted.
                        PeerCloseReason closeReason2;
                        IPeerNeighbor neighborToClose;
                        string action = PeerStrings.RefuseAction;
                        ValidateNeighbor(neighbor, connectInfo.NodeId, out neighborToClose, out closeReason2, out action);
 
                        if (neighbor != neighborToClose)    // new neighbor should be accepted
                        {
                            SendWelcome(neighbor);
                            try
                            {
                                neighbor.ListenAddress = connectInfo.Address;
                            }
                            catch (ObjectDisposedException e)
                            {
                                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                            }
 
                            if (!neighbor.TrySetState(PeerNeighborState.Connected))
                                if (!(neighbor.State >= PeerNeighborState.Disconnecting))
                                {
                                    throw Fx.AssertAndThrow("Neighbor state expected to be >= Disconnecting; it is " + neighbor.State.ToString());
                                }
 
                            if (neighborToClose != null)
                            {
                                // The other neighbor should be closed
                                SendTerminatingMessage(neighborToClose, action, closeReason2);
                                this.neighborManager.CloseNeighbor(neighborToClose, closeReason2, PeerCloseInitiator.LocalNode);
                            }
                        }
                        else
                            closeReason = closeReason2;
                    }
                }
            }
 
            if (closeReason != PeerCloseReason.None)
            {
                SendTerminatingMessage(neighbor, PeerStrings.RefuseAction, closeReason);
                this.neighborManager.CloseNeighbor(neighbor, closeReason, PeerCloseInitiator.LocalNode);
            }
        }
 
        // Process Disconnect message from the neighbor
        public void Disconnect(IPeerNeighbor neighbor, DisconnectInfo disconnectInfo)
        {
            // Don't bother processing the message if Connector has closed
            if (this.state != State.Opened)
                return;
 
            PeerCloseReason closeReason = PeerCloseReason.InvalidNeighbor;
            IList<Referral> referrals = null;
 
            if (disconnectInfo.HasBody())
            {
                // We should only receive Disconnect message after the neighbor has transitioned
                // to connected state.
                if (neighbor.State >= PeerNeighborState.Connected)
                {
                    if (PeerConnectorHelper.IsDefined(disconnectInfo.Reason))
                    {
                        closeReason = (PeerCloseReason)disconnectInfo.Reason;
                        referrals = disconnectInfo.Referrals;
                    }
                }
            }
 
            // Complete processing of disconnect message
            CompleteTerminateMessageProcessing(neighbor, closeReason, referrals);
        }
 
 
        // Process Refuse message from the neighbor
        public void Refuse(IPeerNeighbor neighbor, RefuseInfo refuseInfo)
        {
            // Don't bother processing the message if Connector has closed
            if (this.state != State.Opened)
                return;
 
            PeerCloseReason closeReason = PeerCloseReason.InvalidNeighbor;
            IList<Referral> referrals = null;
 
            if (refuseInfo.HasBody())
            {
                // Refuse message should only be received when neighbor is the initiator
                // and is in connecting state --we accept in closed state to account for
                // timeouts.
                if (neighbor.IsInitiator && (neighbor.State == PeerNeighborState.Connecting ||
                    neighbor.State == PeerNeighborState.Closed))
                {
                    // Remove the entry from timer table for this neighbor
                    RemoveTimer(neighbor);
 
                    if (PeerConnectorHelper.IsDefined(refuseInfo.Reason))
                    {
                        closeReason = (PeerCloseReason)refuseInfo.Reason;
                        referrals = refuseInfo.Referrals;
                    }
                }
            }
            // Complete processing of refuse message
            CompleteTerminateMessageProcessing(neighbor, closeReason, referrals);
        }
 
        // Process Welcome message from the neighbor
        public void Welcome(IPeerNeighbor neighbor, WelcomeInfo welcomeInfo)
        {
            // Don't bother processing the message if Connector has closed
            if (this.state != State.Opened)
                return;
 
            PeerCloseReason closeReason = PeerCloseReason.None;
 
            // Welcome message should only be received when neighbor is the initiator
            // and is in connecting state --we accept in closed state to account for
            // timeouts.
            if (!neighbor.IsInitiator || !welcomeInfo.HasBody() || (neighbor.State != PeerNeighborState.Connecting &&
                neighbor.State != PeerNeighborState.Closed))
            {
                closeReason = PeerCloseReason.InvalidNeighbor;
            }
            // Remove the entry from timer table for this neighbor. If entry is still present,
            // RemoveTimer returns true. Otherwise, neighbor is already being closed and 
            // welcome message will be ignored.
            else if (RemoveTimer(neighbor))
            {
                // It is allowed for a node to have more than MaxNeighbours when processing a welcome message
                // Determine if neighbor should be accepted.
                PeerCloseReason closeReason2;
                IPeerNeighbor neighborToClose;
                string action = PeerStrings.RefuseAction;
                ValidateNeighbor(neighbor, welcomeInfo.NodeId, out neighborToClose, out closeReason2, out action);
 
                if (neighbor != neighborToClose)
                {
                    // Neighbor should be accepted AddReferrals validates the referrals, 
                    // if they are valid then the neighbor is accepted.
                    if (this.maintainer.AddReferrals(welcomeInfo.Referrals, neighbor))
                    {
                        if (!neighbor.TrySetState(PeerNeighborState.Connected))
                        {
                            if (!(neighbor.State >= PeerNeighborState.Faulted))
                            {
                                throw Fx.AssertAndThrow("Neighbor state expected to be >= Faulted; it is " + neighbor.State.ToString());
                            }
                        }
 
                        if (neighborToClose != null)
                        {
                            // The other neighbor should be closed
                            SendTerminatingMessage(neighborToClose, action, closeReason2);
                            this.neighborManager.CloseNeighbor(neighborToClose, closeReason2, PeerCloseInitiator.LocalNode);
                        }
                    }
                    else
                    {
                        // Referrals were invalid this node is suspicous
                        closeReason = PeerCloseReason.InvalidNeighbor;
                    }
                }
                else
                {
                    closeReason = closeReason2;
                }
            }
 
            if (closeReason != PeerCloseReason.None)
            {
                SendTerminatingMessage(neighbor, PeerStrings.DisconnectAction, closeReason);
                this.neighborManager.CloseNeighbor(neighbor, closeReason, PeerCloseInitiator.LocalNode);
            }
        }
 
        bool RemoveTimer(IPeerNeighbor neighbor)
        {
            IOThreadTimer timer = null;
            bool removed = false;
 
            // Remove the timer from the table and cancel it. Do this if Connector is
            // still open. Otherwise, Close method will have already cancelled the timers.
            lock (ThisLock)
            {
                if (this.state == State.Opened &&
                    this.timerTable.TryGetValue(neighbor, out timer))
                {
                    removed = this.timerTable.Remove(neighbor);
                }
            }
            if (timer != null)
            {
                timer.Cancel();
                if (!removed)
                {
                    throw Fx.AssertAndThrow("Neighbor key should have beeen removed from the table");
                }
            }
 
            return removed;
        }
 
        void SendConnect(IPeerNeighbor neighbor)
        {
            // We do not attempt to send the message if PeerConnector is not open
            if (neighbor.State == PeerNeighborState.Connecting && this.state == State.Opened)
            {
                // Retrieve the local address. The retrieved address may be null if the node 
                // is shutdown. In that case, don't bother to send connect message since the 
                // node is closing...
                PeerNodeAddress listenAddress = this.config.GetListenAddress(true);
                if (listenAddress != null)
                {
                    ConnectInfo connectInfo = new ConnectInfo(this.config.NodeId, listenAddress);
                    Message message = ConnectInfoMessageConverter.ToMessage(connectInfo, MessageVersion.Soap12WSAddressing10);
                    SendMessageToNeighbor(neighbor, message, OnConnectFailure);
                }
            }
        }
 
        // Send Disconnect or Refuse message
        void SendTerminatingMessage(IPeerNeighbor neighbor, string action, PeerCloseReason closeReason)
        {
            // We do not attempt to send the message if Connector is not open
            // or if the close reason is InvalidNeighbor.
            if (this.state != State.Opened || closeReason == PeerCloseReason.InvalidNeighbor)
                return;
 
            // Set the neighbor state to disconnecting. TrySetState can fail if the 
            // neighbor is already being closed. Disconnect/Refuse msg not sent in that case.
            if (neighbor.TrySetState(PeerNeighborState.Disconnecting))
            {
                // Get referrals from the maintainer
                Referral[] referrals = maintainer.GetReferrals();
 
                // Build and send the message
                Message message;
                if (action == PeerStrings.DisconnectAction)
                {
                    DisconnectInfo disconnectInfo = new DisconnectInfo((DisconnectReason)closeReason, referrals);
                    message = DisconnectInfoMessageConverter.ToMessage(disconnectInfo, MessageVersion.Soap12WSAddressing10);
                }
                else
                {
                    RefuseInfo refuseInfo = new RefuseInfo((RefuseReason)closeReason, referrals);
                    message = RefuseInfoMessageConverter.ToMessage(refuseInfo, MessageVersion.Soap12WSAddressing10);
                }
                SendMessageToNeighbor(neighbor, message, null);
            }
            else
                if (!(neighbor.State >= PeerNeighborState.Disconnecting))
                {
                    throw Fx.AssertAndThrow("Neighbor state expected to be >= Disconnecting; it is " + neighbor.State.ToString());
                }
        }
 
        void SendWelcome(IPeerNeighbor neighbor)
        {
            // We do not attempt to send the message if PeerConnector is not open
            if (state == State.Opened)
            {
                // Get referrals from the maintainer
                Referral[] referrals = maintainer.GetReferrals();
 
                WelcomeInfo welcomeInfo = new WelcomeInfo(this.config.NodeId, referrals);
                Message message = WelcomeInfoMessageConverter.ToMessage(welcomeInfo, MessageVersion.Soap12WSAddressing10);
                SendMessageToNeighbor(neighbor, message, OnConnectFailure);
            }
        }
 
        // Validates the new neighbor based on its node ID. If it detects duplicate neighbor condition,
        // it will return reference to the neighbor that should be closed.
        void ValidateNeighbor(IPeerNeighbor neighbor, ulong neighborNodeId,
            out IPeerNeighbor neighborToClose, out PeerCloseReason closeReason, out string action)
        {
            neighborToClose = null;
            closeReason = PeerCloseReason.None;
            action = null;
 
            // Invalid neighbor node Id?
            if (neighborNodeId == PeerTransportConstants.InvalidNodeId)
            {
                neighborToClose = neighbor;
                closeReason = PeerCloseReason.InvalidNeighbor;
            }
            // Neighbor's node ID matches local node Id?
            else if (neighborNodeId == this.config.NodeId)
            {
                neighborToClose = neighbor;
                closeReason = PeerCloseReason.DuplicateNodeId;
            }
            else
            {
                // Check for duplicate neighbors (i.e., if another neighbor has the
                // same node Id as the new neighbor).
                // Set neighbor's node Id prior to calling FindDuplicateNeighbor.
                try
                {
                    neighbor.NodeId = neighborNodeId;
                }
                catch (ObjectDisposedException e)
                {
                    DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                    return;
                }
 
                IPeerNeighbor duplicateNeighbor =
                    this.neighborManager.FindDuplicateNeighbor(neighborNodeId, neighbor);
                if (duplicateNeighbor != null && this.neighborManager.PingNeighbor(duplicateNeighbor))
                {
                    // We have a duplicate neighbor. Determine which one should be closed
                    closeReason = PeerCloseReason.DuplicateNeighbor;
 
                    // In the corner case where both neighbors are initiated by the same node, 
                    // close the new neighbor -- Maintainer is expected to check if there is 
                    // already a connection to a node prior to initiating a new connection.
                    if (neighbor.IsInitiator == duplicateNeighbor.IsInitiator)
                        neighborToClose = neighbor;
 
                    // Otherwise, close the neighbor that was initiated by the node with the 
                    // larger node ID -- this ensures that both nodes tear down the same link.
                    else if (this.config.NodeId > neighborNodeId)
                        neighborToClose = (neighbor.IsInitiator ? neighbor : duplicateNeighbor);
                    else
                        neighborToClose = (neighbor.IsInitiator ? duplicateNeighbor : neighbor);
                }
            }
 
            if (neighborToClose != null)
            {
                // If we decided to close the other neighbor, go ahead and do it.
                if (neighborToClose != neighbor)
                {
                    // Send Disconnect or Refuse message depending on its state
                    if (neighborToClose.State == PeerNeighborState.Connected)
                    {
                        action = PeerStrings.DisconnectAction;
                    }
                    else if (!neighborToClose.IsInitiator &&
                        neighborToClose.State == PeerNeighborState.Connecting)
                    {
                        action = PeerStrings.RefuseAction;
                    }
                }
            }
        }
    }
}