File: System\ServiceModel\Channels\MsmqBindingMonitor.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//----------------------------------------------------------------------------
 
namespace System.ServiceModel.Channels
{
    using System.Collections.Generic;
    using System.Messaging;
    using System.Runtime;
    using System.Security;
    using System.Security.Permissions;
    using System.ServiceModel;
    using System.Threading;
 
    class MsmqBindingMonitor
    {
        static readonly TimeSpan DefaultUpdateInterval = TimeSpan.FromMinutes(10);
 
        CommunicationState currentState = CommunicationState.Created;
        List<MsmqBindingFilter> filters = new List<MsmqBindingFilter>();
        string host;
        int iteration;
        Dictionary<string, MatchState> knownPublicQueues = new Dictionary<string, MatchState>();
        Dictionary<string, MatchState> knownPrivateQueues = new Dictionary<string, MatchState>();
        object thisLock = new object();
        IOThreadTimer timer;
        TimeSpan updateInterval;
        ManualResetEvent firstRoundComplete;
        bool retryMatchedFilters;
 
        public MsmqBindingMonitor(string host)
            : this(host, DefaultUpdateInterval, false)
        {
        }
 
        public MsmqBindingMonitor(string host, TimeSpan updateInterval, bool retryMatchedFilters)
        {
            if (string.Compare(host, "localhost", StringComparison.OrdinalIgnoreCase) == 0)
            {
                this.host = ".";
            }
            else
            {
                this.host = host;
            }
 
            this.firstRoundComplete = new ManualResetEvent(false);
 
            this.updateInterval = updateInterval;
            this.retryMatchedFilters = retryMatchedFilters;
            this.iteration = 1;
        }
 
        public void AddFilter(MsmqBindingFilter filter)
        {
            lock (this.thisLock)
            {
                this.filters.Add(filter);
 
                // Now - see if we match any known queues
                MatchFilter(filter, knownPublicQueues.Values);
                MatchFilter(filter, knownPrivateQueues.Values);
            }
        }
 
        public bool ContainsFilter(MsmqBindingFilter filter)
        {
            lock (this.thisLock)
            {
                return this.filters.Contains(filter);
            }
        }
 
        public void Open()
        {
            lock (this.thisLock)
            {
                if (this.currentState != CommunicationState.Created)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.CommunicationObjectCannotBeModified, this.GetType().ToString())));
                }
 
                this.currentState = CommunicationState.Opened;
                this.ScheduleRetryTimerIfNotSet();
            }
        }
 
        public void Close()
        {
            lock (this.thisLock)
            {
                if (this.currentState != CommunicationState.Opened)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.CommunicationObjectCannotBeModified, this.GetType().ToString())));
                }
 
                this.currentState = CommunicationState.Closed;
                this.CancelRetryTimer();
            }
        }
 
        public void RemoveFilter(MsmqBindingFilter filter)
        {
            lock (this.thisLock)
            {
                this.filters.Remove(filter);
 
                RematchQueues(filter, knownPublicQueues.Values);
                RematchQueues(filter, knownPrivateQueues.Values);
            }
        }
 
        public void WaitForFirstRoundComplete()
        {
            this.firstRoundComplete.WaitOne();
        }
 
        void ScheduleRetryTimerIfNotSet()
        {
            if (this.timer == null)
            {
                this.timer = new IOThreadTimer(new Action<object>(OnTimer), null, false);
                // Schedule one enumeration to run immediately...
                this.timer.Set(0);
            }
        }
 
        void CancelRetryTimer()
        {
            if (this.timer != null)
            {
                this.timer.Cancel();
                this.timer = null;
            }
        }
 
        void MatchFilter(MsmqBindingFilter filter, IEnumerable<MatchState> queues)
        {
            // Run through all the queues - see if we are better than any existing matches...
            foreach (MatchState state in queues)
            {
                int matchLength = filter.Match(state.QueueName);
                if (matchLength > state.LastMatchLength)
                {
                    if (state.LastMatch != null)
                    {
                        state.LastMatch.MatchLost(this.host, state.QueueName, state.IsPrivate, state.CallbackState);
                    }
 
                    state.LastMatchLength = matchLength;
                    state.LastMatch = filter;
 
                    state.CallbackState = filter.MatchFound(this.host, state.QueueName, state.IsPrivate);
                }
            }
        }
 
        void RetryMatchFilters(IEnumerable<MatchState> queues)
        {
            // Run through all the queues and call match found on them
            foreach (MatchState state in queues)
            {
                if (state.LastMatch != null)
                {
                    state.CallbackState = state.LastMatch.MatchFound(this.host, state.QueueName, state.IsPrivate);
                }
            }
        }
 
        void MatchQueue(MatchState state)
        {
            MsmqBindingFilter bestMatch = state.LastMatch;
            int bestMatchLength = state.LastMatchLength;
 
            // look through all the filters for the largest match:
            foreach (MsmqBindingFilter filter in this.filters)
            {
                int matchLength = filter.Match(state.QueueName);
                if (matchLength > bestMatchLength)
                {
                    bestMatchLength = matchLength;
                    bestMatch = filter;
                }
            }
 
            if (bestMatch != state.LastMatch)
            {
                if (state.LastMatch != null)
                {
                    state.LastMatch.MatchLost(this.host, state.QueueName, state.IsPrivate, state.CallbackState);
                }
 
                state.LastMatchLength = bestMatchLength;
                state.LastMatch = bestMatch;
 
                state.CallbackState = bestMatch.MatchFound(this.host, state.QueueName, state.IsPrivate);
            }
        }
 
        // The demand is not added now (in 4.5), to avoid a breaking change. To be considered in the next version.
        /*
        // We demand full trust because this method calls into MessageQueue, which is defined in a non-APTCA assembly.
        // MSMQ is not enabled in partial trust, so this demand should not break customers.
        [PermissionSet(SecurityAction.Demand, Unrestricted = true)]
        */
        void OnTimer(object state)
        {
            try
            {
                if (this.currentState != CommunicationState.Opened)
                    return;
 
                lock (this.thisLock)
                {
                    if (this.retryMatchedFilters)
                    {
                        RetryMatchFilters(knownPublicQueues.Values);
                        RetryMatchFilters(knownPrivateQueues.Values);
                    }
 
                    bool scanNeeded = ((this.retryMatchedFilters == false) || 
                        (this.retryMatchedFilters && (this.iteration % 2) != 0));
                    if (scanNeeded)
                    {
                        MsmqDiagnostics.ScanStarted();
 
                        // enumerate the public queues first
                        try
                        {
                            MessageQueue[] queues = MessageQueue.GetPublicQueuesByMachine(this.host);
                            ProcessFoundQueues(queues, knownPublicQueues, false);
                        }
                        catch (MessageQueueException ex)
                        {
                            MsmqDiagnostics.CannotReadQueues(this.host, true, ex);
                        }
 
                        // enumerate the private queues next
                        try
                        {
                            MessageQueue[] queues = MessageQueue.GetPrivateQueuesByMachine(this.host);
                            ProcessFoundQueues(queues, knownPrivateQueues, true);
                        }
                        catch (MessageQueueException ex)
                        {
                            MsmqDiagnostics.CannotReadQueues(this.host, false, ex);
                        }
 
                        // Figure out if we lost any queues:
                        ProcessLostQueues(knownPublicQueues);
                        ProcessLostQueues(knownPrivateQueues);
                    }
 
                    this.iteration++;
                    this.timer.Set(this.updateInterval);
                }
            }
            finally
            {
                this.firstRoundComplete.Set();
            }
        }
 
        // The demand is not added now (in 4.5), to avoid a breaking change. To be considered in the next version.
        /*
        // We demand full trust because this method calls into MessageQueue, which is defined in a non-APTCA assembly.
        // MSMQ is not enabled in partial trust, so this demand should not break customers.
        [PermissionSet(SecurityAction.Demand, Unrestricted = true)]
        */
        void ProcessFoundQueues(MessageQueue[] queues, Dictionary<string, MatchState> knownQueues, bool isPrivate)
        {
            foreach (MessageQueue queue in queues)
            {
                MatchState state;
                string name = ExtractQueueName(queue.QueueName, isPrivate);
 
                if (!knownQueues.TryGetValue(name, out state))
                {
                    state = new MatchState(name, this.iteration, isPrivate);
                    knownQueues.Add(name, state);
 
                    MatchQueue(state);
                }
                else
                {
                    state.DiscoveryIteration = this.iteration;
                }
            }
        }
 
        string ExtractQueueName(string name, bool isPrivate)
        {
            // private queues start with "private$\\"
            if (isPrivate)
            {
                return name.Substring("private$\\".Length);
            }
            else
            {
                return name;
            }
        }
 
        void ProcessLostQueues(Dictionary<string, MatchState> knownQueues)
        {
            List<MatchState> lostQueues = new List<MatchState>();
 
            foreach (MatchState state in knownQueues.Values)
            {
                if (state.DiscoveryIteration != this.iteration)
                {
                    // we lost this queue!
                    lostQueues.Add(state);
                }
            }
 
            foreach (MatchState state in lostQueues)
            {
                knownQueues.Remove(state.QueueName);
                if (state.LastMatch != null)
                {
                    state.LastMatch.MatchLost(this.host, state.QueueName, state.IsPrivate, state.CallbackState);
                }
            }
        }
 
        void RematchQueues(MsmqBindingFilter filter, IEnumerable<MatchState> queues)
        {
            // if any queue currently matches "filter", re-match it against the other filters:
            foreach (MatchState state in queues)
            {
                if (state.LastMatch == filter)
                {
                    state.LastMatch.MatchLost(this.host, state.QueueName, state.IsPrivate, state.CallbackState);
                    state.LastMatch = null;
                    state.LastMatchLength = -1;
                    MatchQueue(state);
                }
            }
        }
 
        class MatchState
        {
            string name;
            int iteration;
            MsmqBindingFilter lastMatch;
            int lastMatchLength;
            object callbackState;
            bool isPrivate;
 
            public MatchState(string name, int iteration, bool isPrivate)
            {
                this.name = name;
                this.iteration = iteration;
                this.isPrivate = isPrivate;
                this.lastMatchLength = -1;
            }
 
            public object CallbackState
            {
                get { return this.callbackState; }
                set { this.callbackState = value; }
            }
 
            public int DiscoveryIteration
            {
                get { return this.iteration; }
                set { this.iteration = value; }
            }
 
            public bool IsPrivate
            {
                get { return this.isPrivate; }
            }
 
            public MsmqBindingFilter LastMatch
            {
                get { return this.lastMatch; }
                set { this.lastMatch = value; }
            }
 
            public int LastMatchLength
            {
                get { return this.lastMatchLength; }
                set { this.lastMatchLength = value; }
            }
 
            public string QueueName
            {
                get { return this.name; }
            }
        }
    }
}