// File: System\ServiceModel\Channels\DeliveryStrategy.cs
// Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System;
    using System.Collections.Generic;
    using System.Runtime;
    using System.Threading;
 
    // Base type for the policies that decide whether an incoming item may be accepted
    // and how it is placed on an InputQueueChannel. A strategy is bound to a single
    // channel and a single positive quota for its whole lifetime.
    abstract class DeliveryStrategy<ItemType> : IDisposable
        where ItemType : class, IDisposable
    {
        readonly InputQueueChannel<ItemType> channel;
        readonly int quota;

        // channel: the queue that items are delivered to.
        // quota:   upper bound used by subclasses when admitting items; must be >= 1.
        public DeliveryStrategy(InputQueueChannel<ItemType> channel, int quota)
        {
            // Zero and negative quotas are rejected before any state is stored.
            if (quota < 1)
            {
                throw Fx.AssertAndThrow("Argument quota must be positive.");
            }

            this.quota = quota;
            this.channel = channel;
        }

        // Number of items this strategy currently accounts for; each subclass defines
        // what is counted.
        public abstract int EnqueuedCount { get; }

        // Callback that the strategies in this file hand to the channel together with
        // every item they enqueue. May be left unset (null).
        public Action DequeueCallback { get; set; }

        // The channel supplied at construction.
        protected InputQueueChannel<ItemType> Channel
        {
            get { return this.channel; }
        }

        // The (positive) quota supplied at construction.
        protected int Quota
        {
            get { return this.quota; }
        }

        // Reports whether an item carrying the given sequence number may be enqueued now.
        public abstract bool CanEnqueue(Int64 sequenceNumber);

        // Accepts an item carrying the given sequence number.
        // NOTE(review): the meaning of the returned bool is defined by the subclasses /
        // InputQueueChannel.EnqueueWithoutDispatch; presumably "caller should dispatch" - confirm.
        public abstract bool Enqueue(ItemType item, Int64 sequenceNumber);

        // Nothing to release at this level; subclasses override to free buffered items.
        public virtual void Dispose()
        {
        }
    }
 
    // Delivery strategy that hands items to the channel in ascending sequence-number
    // order. An item whose sequence number is ahead of the expected one is parked in a
    // dictionary until the numbers before it have been delivered.
    class OrderedDeliveryStrategy<ItemType> : DeliveryStrategy<ItemType>
        where ItemType : class, IDisposable
    {
        // When true, CanEnqueue refuses any sequence number greater than windowStart.
        readonly bool isEnqueueInOrder;

        // Items received ahead of windowStart, keyed by their sequence number.
        readonly Dictionary<Int64, ItemType> items;

        // Cached delegate for OnDispatch, created on first use by OnDispatchCallback.
        Action<object> onDispatchCallback;

        // Sequence number of the next item to be delivered to the channel; starts at 1.
        Int64 windowStart;

        public OrderedDeliveryStrategy(
            InputQueueChannel<ItemType> channel,
            int quota,
            bool isEnqueueInOrder)
            : base(channel, quota)
        {
            this.isEnqueueInOrder = isEnqueueInOrder;
            this.items = new Dictionary<Int64, ItemType>();
            this.windowStart = 1;
        }

        // Items pending in the channel plus items still parked in this strategy.
        public override int EnqueuedCount
        {
            get
            {
                return this.Channel.InternalPendingItems + this.items.Count;
            }
        }

        // Returns the OnDispatch delegate, allocating it only once.
        Action<object> OnDispatchCallback
        {
            get
            {
                if (this.onDispatchCallback == null)
                {
                    this.onDispatchCallback = this.OnDispatch;
                }

                return this.onDispatchCallback;
            }
        }

        // Reports whether an item with the given sequence number may be accepted now.
        public override bool CanEnqueue(long sequenceNumber)
        {
            // Pending + parked items already fill the quota.
            if (this.EnqueuedCount >= this.Quota)
            {
                return false;
            }

            // In strict in-order mode nothing beyond the expected number is accepted.
            if (this.isEnqueueInOrder && (sequenceNumber > this.windowStart))
            {
                return false;
            }

            // The distance from windowStart to this item, added to what the channel
            // already holds, must stay below the quota.
            return (this.Channel.InternalPendingItems + sequenceNumber - this.windowStart < this.Quota);
        }

        // Accepts an item. An item ahead of windowStart is parked and false is returned.
        // Otherwise the item, followed by every parked item that is now contiguous with
        // it, is placed on the channel and the result of the last
        // EnqueueWithoutDispatch call is returned.
        public override bool Enqueue(ItemType item, long sequenceNumber)
        {
            if (sequenceNumber > this.windowStart)
            {
                this.items.Add(sequenceNumber, item);
                return false;
            }

            this.windowStart++;

            // Drain parked items that have become deliverable. TryGetValue fetches the
            // parked item with a single lookup instead of ContainsKey + indexer.
            ItemType parked;
            while (this.items.TryGetValue(this.windowStart, out parked))
            {
                // Deliver the current item; when the channel returns true for a
                // non-final item, the dispatch is scheduled asynchronously.
                if (this.Channel.EnqueueWithoutDispatch(item, this.DequeueCallback))
                {
                    ActionItem.Schedule(this.OnDispatchCallback, null);
                }

                this.items.Remove(this.windowStart);
                this.windowStart++;
                item = parked;
            }

            // The final item's result is returned to the caller rather than scheduled here.
            return this.Channel.EnqueueWithoutDispatch(item, this.DequeueCallback);
        }

        // Disposes every remaining item of the enumerator. Each item's using block
        // encloses the recursive call for the rest, so an exception thrown by one
        // item's Dispose still unwinds through (and disposes) all the others.
        // The enumerator is a struct passed by value; each call receives the position
        // already advanced by its caller's MoveNext.
        static void DisposeItems(Dictionary<Int64, ItemType>.Enumerator items)
        {
            if (items.MoveNext())
            {
                using (ItemType item = items.Current.Value)
                {
                    DisposeItems(items);
                }
            }
        }

        // Disposes and forgets all parked items, then disposes the base.
        public override void Dispose()
        {
            try
            {
                DisposeItems(this.items.GetEnumerator());
            }
            finally
            {
                // Always drop the references, even if an item's Dispose threw, so that
                // a later Dispose call cannot dispose the same items a second time.
                this.items.Clear();
            }

            base.Dispose();
        }

        // ActionItem callback: runs the channel's dispatch. The state argument is unused.
        void OnDispatch(object state)
        {
            this.Channel.Dispatch();
        }
    }
 
    // Delivery strategy that pays no attention to sequence numbers: every item is
    // passed straight to the channel, and admission depends only on how many items the
    // channel has pending compared with the quota.
    class UnorderedDeliveryStrategy<ItemType> : DeliveryStrategy<ItemType>
        where ItemType : class, IDisposable
    {
        public UnorderedDeliveryStrategy(InputQueueChannel<ItemType> channel, int quota)
            : base(channel, quota)
        {
        }

        // Only the channel's own pending items are counted; nothing is buffered here.
        public override int EnqueuedCount
        {
            get { return this.Channel.InternalPendingItems; }
        }

        // True while the pending count is below the quota. The sequence number is not consulted.
        public override bool CanEnqueue(Int64 sequenceNumber)
        {
            int pending = this.EnqueuedCount;
            int limit = this.Quota;
            return pending < limit;
        }

        // Forwards the item to the channel and returns the channel's result unchanged.
        // The sequence number is not consulted.
        public override bool Enqueue(ItemType item, long sequenceNumber)
        {
            InputQueueChannel<ItemType> target = this.Channel;
            Action onDequeued = this.DequeueCallback;
            return target.EnqueueWithoutDispatch(item, onDequeued);
        }
    }
}