File: System\ServiceModel\Dispatcher\QuotaThrottle.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
 
namespace System.ServiceModel.Dispatcher
{
    using System;
    using System.Diagnostics;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Channels;
    using System.Collections.Generic;
    using System.Runtime;
    using System.Threading;
 
    sealed class QuotaThrottle
    {
        int limit;
        object mutex;
        WaitCallback release;
        Queue<object> waiters;
        bool didTraceThrottleLimit;
        string propertyName = "ManualFlowControlLimit";
        string owner;
 
        internal QuotaThrottle(WaitCallback release, object mutex)
        {
            this.limit = Int32.MaxValue;
            this.mutex = mutex;
            this.release = release;
            this.waiters = new Queue<object>();
        }
 
        bool IsEnabled
        {
            get { return this.limit != Int32.MaxValue; }
        }
 
        internal String Owner
        {
            set { this.owner = value; }
        }
 
        internal int Limit
        {
            get { return this.limit; }
        }
 
        internal bool Acquire(object o)
        {
            lock (this.mutex)
            {
                if (this.IsEnabled)
                {
                    if (this.limit > 0)
                    {
                        this.limit--;
 
                        if (this.limit == 0)
                        {
                            if (DiagnosticUtility.ShouldTraceWarning && !this.didTraceThrottleLimit)
                            {
                                this.didTraceThrottleLimit = true;
 
                                TraceUtility.TraceEvent(
                                    TraceEventType.Warning,
                                    TraceCode.ManualFlowThrottleLimitReached,
                                    SR.GetString(SR.TraceCodeManualFlowThrottleLimitReached,
                                                 this.propertyName, this.owner));
                            }
                        }
 
                        return true;
                    }
                    else
                    {
                        this.waiters.Enqueue(o);
                        return false;
                    }
                }
                else
                {
                    return true;
                }
            }
        }
 
        internal int IncrementLimit(int incrementBy)
        {
            if (incrementBy < 0)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("incrementBy", incrementBy,
                                                     SR.GetString(SR.ValueMustBeNonNegative)));
            int newLimit;
            object[] released = null;
 
            lock (this.mutex)
            {
                if (this.IsEnabled)
                {
                    checked { this.limit += incrementBy; }
                    released = this.LimitChanged();
                }
                newLimit = this.limit;
            }
 
            if (released != null)
                this.Release(released);
 
            return newLimit;
        }
 
        object[] LimitChanged()
        {
            object[] released = null;
 
            if (this.IsEnabled)
            {
                if ((this.waiters.Count > 0) && (this.limit > 0))
                {
                    if (this.limit < this.waiters.Count)
                    {
                        released = new object[this.limit];
                        for (int i = 0; i < this.limit; i++)
                            released[i] = this.waiters.Dequeue();
 
                        this.limit = 0;
                    }
                    else
                    {
                        released = this.waiters.ToArray();
                        this.waiters.Clear();
                        this.waiters.TrimExcess();
 
                        this.limit -= released.Length;
                    }
                }
                this.didTraceThrottleLimit = false;
            }
            else
            {
                released = this.waiters.ToArray();
                this.waiters.Clear();
                this.waiters.TrimExcess();
            }
 
            return released;
        }
 
        internal void SetLimit(int messageLimit)
        {
            if (messageLimit < 0)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("messageLimit", messageLimit,
                                                    SR.GetString(SR.ValueMustBeNonNegative)));
 
            object[] released = null;
 
            lock (this.mutex)
            {
                this.limit = messageLimit;
                released = this.LimitChanged();
            }
 
            if (released != null)
                this.Release(released);
        }
 
        void ReleaseAsync(object state)
        {
            this.release(state);
        }
 
        internal void Release(object[] released)
        {
            for (int i = 0; i < released.Length; i++)
                ActionItem.Schedule(this.ReleaseAsync, released[i]);
        }
    }
}