File: System\Runtime\InputQueue.cs
Project: ndp\cdf\src\System.ServiceModel.Internals\System.ServiceModel.Internals.csproj (System.ServiceModel.Internals)
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------
 
namespace System.Runtime
{
    using System;
    using System.Collections.Generic;
    using System.Threading;
 
    [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.PrivatePrimitive, SupportsAsync = true, ReleaseMethod = "Dispatch")]
    sealed class InputQueue<T> : IDisposable where T : class
    {
        static Action<object> completeOutstandingReadersCallback;
        static Action<object> completeWaitersFalseCallback;
        static Action<object> completeWaitersTrueCallback;
        static Action<object> onDispatchCallback;
        static Action<object> onInvokeDequeuedCallback;
 
        QueueState queueState;
 
        [Fx.Tag.SynchronizationObject(Blocking = false, Kind = Fx.Tag.SynchronizationKind.LockStatement)]
        ItemQueue itemQueue;
 
        [Fx.Tag.SynchronizationObject]
        Queue<IQueueReader> readerQueue;
 
        [Fx.Tag.SynchronizationObject]
        List<IQueueWaiter> waiterList;
 
        public InputQueue()
        {
            this.itemQueue = new ItemQueue();
            this.readerQueue = new Queue<IQueueReader>();
            this.waiterList = new List<IQueueWaiter>();
            this.queueState = QueueState.Open;
        }
 
        public InputQueue(Func<Action<AsyncCallback, IAsyncResult>> asyncCallbackGenerator)
            : this()
        {
            Fx.Assert(asyncCallbackGenerator != null, "use default ctor if you don't have a generator");
            AsyncCallbackGenerator = asyncCallbackGenerator;
        }
 
        public int PendingCount
        {
            get
            {
                lock (ThisLock)
                {
                    return this.itemQueue.ItemCount;
                }
            }
        }
 
        // Users like ServiceModel can hook this abort ICommunicationObject or handle other non-IDisposable objects
        public Action<T> DisposeItemCallback
        {
            get;
            set;
        }
 
        // Users like ServiceModel can hook this to wrap the AsyncQueueReader callback functionality for tracing, etc
        Func<Action<AsyncCallback, IAsyncResult>> AsyncCallbackGenerator
        {
            get;
            set;
        }
 
        object ThisLock
        {
            get { return this.itemQueue; }
        }
 
        public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
        {
            Item item = default(Item);
 
            lock (ThisLock)
            {
                if (queueState == QueueState.Open)
                {
                    if (itemQueue.HasAvailableItem)
                    {
                        item = itemQueue.DequeueAvailableItem();
                    }
                    else
                    {
                        AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
                        readerQueue.Enqueue(reader);
                        return reader;
                    }
                }
                else if (queueState == QueueState.Shutdown)
                {
                    if (itemQueue.HasAvailableItem)
                    {
                        item = itemQueue.DequeueAvailableItem();
                    }
                    else if (itemQueue.HasAnyItem)
                    {
                        AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
                        readerQueue.Enqueue(reader);
                        return reader;
                    }
                }
            }
 
            InvokeDequeuedCallback(item.DequeuedCallback);
            return new CompletedAsyncResult<T>(item.GetValue(), callback, state);
        }
 
        public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
        {
            lock (ThisLock)
            {
                if (queueState == QueueState.Open)
                {
                    if (!itemQueue.HasAvailableItem)
                    {
                        AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
                        waiterList.Add(waiter);
                        return waiter;
                    }
                }
                else if (queueState == QueueState.Shutdown)
                {
                    if (!itemQueue.HasAvailableItem && itemQueue.HasAnyItem)
                    {
                        AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
                        waiterList.Add(waiter);
                        return waiter;
                    }
                }
            }
 
            return new CompletedAsyncResult<bool>(true, callback, state);
        }
 
        public void Close()
        {
            Dispose();
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Close")]
        public T Dequeue(TimeSpan timeout)
        {
            T value;
 
            if (!this.Dequeue(timeout, out value))
            {
                throw Fx.Exception.AsError(new TimeoutException(InternalSR.TimeoutInputQueueDequeue(timeout)));
            }
 
            return value;
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Close")]
        public bool Dequeue(TimeSpan timeout, out T value)
        {
            WaitQueueReader reader = null;
            Item item = new Item();
 
            lock (ThisLock)
            {
                if (queueState == QueueState.Open)
                {
                    if (itemQueue.HasAvailableItem)
                    {
                        item = itemQueue.DequeueAvailableItem();
                    }
                    else
                    {
                        reader = new WaitQueueReader(this);
                        readerQueue.Enqueue(reader);
                    }
                }
                else if (queueState == QueueState.Shutdown)
                {
                    if (itemQueue.HasAvailableItem)
                    {
                        item = itemQueue.DequeueAvailableItem();
                    }
                    else if (itemQueue.HasAnyItem)
                    {
                        reader = new WaitQueueReader(this);
                        readerQueue.Enqueue(reader);
                    }
                    else
                    {
                        value = default(T);
                        return true;
                    }
                }
                else // queueState == QueueState.Closed
                {
                    value = default(T);
                    return true;
                }
            }
 
            if (reader != null)
            {
                return reader.Wait(timeout, out value);
            }
            else
            {
                InvokeDequeuedCallback(item.DequeuedCallback);
                value = item.GetValue();
                return true;
            }
        }
 
        public void Dispatch()
        {
            IQueueReader reader = null;
            Item item = new Item();
            IQueueReader[] outstandingReaders = null;
            IQueueWaiter[] waiters = null;
            bool itemAvailable = true;
 
            lock (ThisLock)
            {
                itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
                this.GetWaiters(out waiters);
 
                if (queueState != QueueState.Closed)
                {
                    itemQueue.MakePendingItemAvailable();
 
                    if (readerQueue.Count > 0)
                    {
                        item = itemQueue.DequeueAvailableItem();
                        reader = readerQueue.Dequeue();
 
                        if (queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
                        {
                            outstandingReaders = new IQueueReader[readerQueue.Count];
                            readerQueue.CopyTo(outstandingReaders, 0);
                            readerQueue.Clear();
 
                            itemAvailable = false;
                        }
                    }
                }
            }
 
            if (outstandingReaders != null)
            {
                if (completeOutstandingReadersCallback == null)
                {
                    completeOutstandingReadersCallback = new Action<object>(CompleteOutstandingReadersCallback);
                }
 
                ActionItem.Schedule(completeOutstandingReadersCallback, outstandingReaders);
            }
 
            if (waiters != null)
            {
                CompleteWaitersLater(itemAvailable, waiters);
            }
 
            if (reader != null)
            {
                InvokeDequeuedCallback(item.DequeuedCallback);
                reader.Set(item);
            }
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
        public bool EndDequeue(IAsyncResult result, out T value)
        {
            CompletedAsyncResult<T> typedResult = result as CompletedAsyncResult<T>;
 
            if (typedResult != null)
            {
                value = CompletedAsyncResult<T>.End(result);
                return true;
            }
 
            return AsyncQueueReader.End(result, out value);
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
        public T EndDequeue(IAsyncResult result)
        {
            T value;
 
            if (!this.EndDequeue(result, out value))
            {
                throw Fx.Exception.AsError(new TimeoutException());
            }
 
            return value;
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Dispatch", Conditional = "!result.IsCompleted")]
        public bool EndWaitForItem(IAsyncResult result)
        {
            CompletedAsyncResult<bool> typedResult = result as CompletedAsyncResult<bool>;
            if (typedResult != null)
            {
                return CompletedAsyncResult<bool>.End(result);
            }
 
            return AsyncQueueWaiter.End(result);
        }
 
        public void EnqueueAndDispatch(T item)
        {
            EnqueueAndDispatch(item, null);
        }
 
        // dequeuedCallback is called as an item is dequeued from the InputQueue.  The 
        // InputQueue lock is not held during the callback.  However, the user code will
        // not be notified of the item being available until the callback returns.  If you
        // are not sure if the callback will block for a long time, then first call 
        // IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
        public void EnqueueAndDispatch(T item, Action dequeuedCallback)
        {
            EnqueueAndDispatch(item, dequeuedCallback, true);
        }
 
        public void EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
        {
            Fx.Assert(exception != null, "EnqueueAndDispatch: exception parameter should not be null");
            EnqueueAndDispatch(new Item(exception, dequeuedCallback), canDispatchOnThisThread);
        }
 
        public void EnqueueAndDispatch(T item, Action dequeuedCallback, bool canDispatchOnThisThread)
        {
            Fx.Assert(item != null, "EnqueueAndDispatch: item parameter should not be null");
            EnqueueAndDispatch(new Item(item, dequeuedCallback), canDispatchOnThisThread);
        }
 
        public bool EnqueueWithoutDispatch(T item, Action dequeuedCallback)
        {
            Fx.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");
            return EnqueueWithoutDispatch(new Item(item, dequeuedCallback));
        }
 
        public bool EnqueueWithoutDispatch(Exception exception, Action dequeuedCallback)
        {
            Fx.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");
            return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback));
        }
 
 
        public void Shutdown()
        {
            this.Shutdown(null);
        }
 
        // Don't let any more items in. Differs from Close in that we keep around
        // existing items in our itemQueue for possible future calls to Dequeue
        public void Shutdown(Func<Exception> pendingExceptionGenerator)
        {
            IQueueReader[] outstandingReaders = null;
            lock (ThisLock)
            {
                if (queueState == QueueState.Shutdown)
                {
                    return;
                }
 
                if (queueState == QueueState.Closed)
                {
                    return;
                }
 
                this.queueState = QueueState.Shutdown;
 
                if (readerQueue.Count > 0 && this.itemQueue.ItemCount == 0)
                {
                    outstandingReaders = new IQueueReader[readerQueue.Count];
                    readerQueue.CopyTo(outstandingReaders, 0);
                    readerQueue.Clear();
                }
            }
 
            if (outstandingReaders != null)
            {
                for (int i = 0; i < outstandingReaders.Length; i++)
                {
                    Exception exception = (pendingExceptionGenerator != null) ? pendingExceptionGenerator() : null;
                    outstandingReaders[i].Set(new Item(exception, null));
                }
            }
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Dispatch")]
        public bool WaitForItem(TimeSpan timeout)
        {
            WaitQueueWaiter waiter = null;
            bool itemAvailable = false;
 
            lock (ThisLock)
            {
                if (queueState == QueueState.Open)
                {
                    if (itemQueue.HasAvailableItem)
                    {
                        itemAvailable = true;
                    }
                    else
                    {
                        waiter = new WaitQueueWaiter();
                        waiterList.Add(waiter);
                    }
                }
                else if (queueState == QueueState.Shutdown)
                {
                    if (itemQueue.HasAvailableItem)
                    {
                        itemAvailable = true;
                    }
                    else if (itemQueue.HasAnyItem)
                    {
                        waiter = new WaitQueueWaiter();
                        waiterList.Add(waiter);
                    }
                    else
                    {
                        return true;
                    }
                }
                else // queueState == QueueState.Closed
                {
                    return true;
                }
            }
 
            if (waiter != null)
            {
                return waiter.Wait(timeout);
            }
            else
            {
                return itemAvailable;
            }
        }
 
        public void Dispose()
        {
            bool dispose = false;
 
            lock (ThisLock)
            {
                if (queueState != QueueState.Closed)
                {
                    queueState = QueueState.Closed;
                    dispose = true;
                }
            }
 
            if (dispose)
            {
                while (readerQueue.Count > 0)
                {
                    IQueueReader reader = readerQueue.Dequeue();
                    reader.Set(default(Item));
                }
 
                while (itemQueue.HasAnyItem)
                {
                    Item item = itemQueue.DequeueAnyItem();
                    DisposeItem(item);
                    InvokeDequeuedCallback(item.DequeuedCallback);
                }
            }
        }        
 
        void DisposeItem(Item item)
        {
            T value = item.Value;
            if (value != null)
            {
                if (value is IDisposable)
                {
                    ((IDisposable)value).Dispose();
                }
                else
                {
                    Action<T> disposeItemCallback = this.DisposeItemCallback;
                    if (disposeItemCallback != null)
                    {
                        disposeItemCallback(value);
                    }
                }
            }
        }
 
        static void CompleteOutstandingReadersCallback(object state)
        {
            IQueueReader[] outstandingReaders = (IQueueReader[])state;
 
            for (int i = 0; i < outstandingReaders.Length; i++)
            {
                outstandingReaders[i].Set(default(Item));
            }
        }
 
        static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
        {
            for (int i = 0; i < waiters.Length; i++)
            {
                waiters[i].Set(itemAvailable);
            }
        }
 
        static void CompleteWaitersFalseCallback(object state)
        {
            CompleteWaiters(false, (IQueueWaiter[])state);
        }
 
        static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
        {
            if (itemAvailable)
            {
                if (completeWaitersTrueCallback == null)
                {
                    completeWaitersTrueCallback = new Action<object>(CompleteWaitersTrueCallback);
                }
 
                ActionItem.Schedule(completeWaitersTrueCallback, waiters);
            }
            else
            {
                if (completeWaitersFalseCallback == null)
                {
                    completeWaitersFalseCallback = new Action<object>(CompleteWaitersFalseCallback);
                }
 
                ActionItem.Schedule(completeWaitersFalseCallback, waiters);
            }
        }
 
        static void CompleteWaitersTrueCallback(object state)
        {
            CompleteWaiters(true, (IQueueWaiter[])state);
        }
 
        static void InvokeDequeuedCallback(Action dequeuedCallback)
        {
            if (dequeuedCallback != null)
            {
                dequeuedCallback();
            }
        }
 
        static void InvokeDequeuedCallbackLater(Action dequeuedCallback)
        {
            if (dequeuedCallback != null)
            {
                if (onInvokeDequeuedCallback == null)
                {
                    onInvokeDequeuedCallback = new Action<object>(OnInvokeDequeuedCallback);
                }
 
                ActionItem.Schedule(onInvokeDequeuedCallback, dequeuedCallback);
            }
        }
 
        static void OnDispatchCallback(object state)
        {
            ((InputQueue<T>)state).Dispatch();
        }
 
        static void OnInvokeDequeuedCallback(object state)
        {
            Fx.Assert(state != null, "InputQueue.OnInvokeDequeuedCallback: (state != null)");
 
            Action dequeuedCallback = (Action)state;
            dequeuedCallback();
        }
 
        void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
        {
            bool disposeItem = false;
            IQueueReader reader = null;
            bool dispatchLater = false;
            IQueueWaiter[] waiters = null;
            bool itemAvailable = true;
 
            lock (ThisLock)
            {
                itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
                this.GetWaiters(out waiters);
 
                if (queueState == QueueState.Open)
                {
                    if (canDispatchOnThisThread)
                    {
                        if (readerQueue.Count == 0)
                        {
                            itemQueue.EnqueueAvailableItem(item);
                        }
                        else
                        {
                            reader = readerQueue.Dequeue();
                        }
                    }
                    else
                    {
                        if (readerQueue.Count == 0)
                        {
                            itemQueue.EnqueueAvailableItem(item);
                        }
                        else
                        {
                            itemQueue.EnqueuePendingItem(item);
                            dispatchLater = true;
                        }
                    }
                }
                else // queueState == QueueState.Closed || queueState == QueueState.Shutdown
                {
                    disposeItem = true;
                }
            }
 
            if (waiters != null)
            {
                if (canDispatchOnThisThread)
                {
                    CompleteWaiters(itemAvailable, waiters);
                }
                else
                {
                    CompleteWaitersLater(itemAvailable, waiters);
                }
            }
 
            if (reader != null)
            {
                InvokeDequeuedCallback(item.DequeuedCallback);
                reader.Set(item);
            }
 
            if (dispatchLater)
            {
                if (onDispatchCallback == null)
                {
                    onDispatchCallback = new Action<object>(OnDispatchCallback);
                }
 
                ActionItem.Schedule(onDispatchCallback, this);
            }
            else if (disposeItem)
            {
                InvokeDequeuedCallback(item.DequeuedCallback);
                DisposeItem(item);
            }
        }
 
        // This will not block, however, Dispatch() must be called later if this function
        // returns true.
        bool EnqueueWithoutDispatch(Item item)
        {
            lock (ThisLock)
            {
                // Open
                if (queueState != QueueState.Closed && queueState != QueueState.Shutdown)
                {
                    if (readerQueue.Count == 0 && waiterList.Count == 0)
                    {
                        itemQueue.EnqueueAvailableItem(item);
                        return false;
                    }
                    else
                    {
                        itemQueue.EnqueuePendingItem(item);
                        return true;
                    }
                }
            }
 
            DisposeItem(item);
            InvokeDequeuedCallbackLater(item.DequeuedCallback);
            return false;
        }
 
        void GetWaiters(out IQueueWaiter[] waiters)
        {
            if (waiterList.Count > 0)
            {
                waiters = waiterList.ToArray();
                waiterList.Clear();
            }
            else
            {
                waiters = null;
            }
        }
 
        // Used for timeouts. The InputQueue must remove readers from its reader queue to prevent
        // dispatching items to timed out readers.
        bool RemoveReader(IQueueReader reader)
        {
            Fx.Assert(reader != null, "InputQueue.RemoveReader: (reader != null)");
 
            lock (ThisLock)
            {
                if (queueState == QueueState.Open || queueState == QueueState.Shutdown)
                {
                    bool removed = false;
 
                    for (int i = readerQueue.Count; i > 0; i--)
                    {
                        IQueueReader temp = readerQueue.Dequeue();
                        if (object.ReferenceEquals(temp, reader))
                        {
                            removed = true;
                        }
                        else
                        {
                            readerQueue.Enqueue(temp);
                        }
                    }
 
                    return removed;
                }
            }
 
            return false;
        }
 
        enum QueueState
        {
            Open,
            Shutdown,
            Closed
        }
 
        interface IQueueReader
        {
            void Set(Item item);
        }
 
        interface IQueueWaiter
        {
            void Set(bool itemAvailable);
        }
 
        struct Item
        {
            Action dequeuedCallback;
            Exception exception;
            T value;
 
            public Item(T value, Action dequeuedCallback)
                : this(value, null, dequeuedCallback)
            {
            }
 
            public Item(Exception exception, Action dequeuedCallback)
                : this(null, exception, dequeuedCallback)
            {
            }
 
            Item(T value, Exception exception, Action dequeuedCallback)
            {
                this.value = value;
                this.exception = exception;
                this.dequeuedCallback = dequeuedCallback;
            }
 
            public Action DequeuedCallback
            {
                get { return this.dequeuedCallback; }
            }
 
            public Exception Exception
            {
                get { return this.exception; }
            }
 
            public T Value
            {
                get { return this.value; }
            }
 
            public T GetValue()
            {
                if (this.exception != null)
                {
                    throw Fx.Exception.AsError(this.exception);
                }
 
                return this.value;
            }
        }
 
        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
        class AsyncQueueReader : AsyncResult, IQueueReader
        {
            static Action<object> timerCallback = new Action<object>(AsyncQueueReader.TimerCallback);
 
            bool expired;
            InputQueue<T> inputQueue;
            T item;
            IOThreadTimer timer;
 
            public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                if (inputQueue.AsyncCallbackGenerator != null)
                {
                    base.VirtualCallback = inputQueue.AsyncCallbackGenerator();
                }
                this.inputQueue = inputQueue;
                if (timeout != TimeSpan.MaxValue)
                {
                    this.timer = new IOThreadTimer(timerCallback, this, false);
                    this.timer.Set(timeout);
                }
            }
 
            [Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
            public static bool End(IAsyncResult result, out T value)
            {
                AsyncQueueReader readerResult = AsyncResult.End<AsyncQueueReader>(result);
 
                if (readerResult.expired)
                {
                    value = default(T);
                    return false;
                }
                else
                {
                    value = readerResult.item;
                    return true;
                }
            }
 
            public void Set(Item item)
            {
                this.item = item.Value;
                if (this.timer != null)
                {
                    this.timer.Cancel();
                }
                Complete(false, item.Exception);
            }
 
            static void TimerCallback(object state)
            {
                AsyncQueueReader thisPtr = (AsyncQueueReader)state;
                if (thisPtr.inputQueue.RemoveReader(thisPtr))
                {
                    thisPtr.expired = true;
                    thisPtr.Complete(false);
                }
            }
        }
 
        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
        class AsyncQueueWaiter : AsyncResult, IQueueWaiter
        {
            static Action<object> timerCallback = new Action<object>(AsyncQueueWaiter.TimerCallback);
            bool itemAvailable;
 
            [Fx.Tag.SynchronizationObject(Blocking = false)]
            object thisLock = new object();
 
            IOThreadTimer timer;
 
            public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state)
            {
                if (timeout != TimeSpan.MaxValue)
                {
                    this.timer = new IOThreadTimer(timerCallback, this, false);
                    this.timer.Set(timeout);
                }
            }
 
            object ThisLock
            {
                get
                {
                    return this.thisLock;
                }
            }
 
            [Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
            public static bool End(IAsyncResult result)
            {
                AsyncQueueWaiter waiterResult = AsyncResult.End<AsyncQueueWaiter>(result);
                return waiterResult.itemAvailable;
            }
 
            public void Set(bool itemAvailable)
            {
                bool timely;
 
                lock (ThisLock)
                {
                    timely = (this.timer == null) || this.timer.Cancel();
                    this.itemAvailable = itemAvailable;
                }
 
                if (timely)
                {
                    Complete(false);
                }
            }
 
            static void TimerCallback(object state)
            {
                AsyncQueueWaiter thisPtr = (AsyncQueueWaiter)state;
                thisPtr.Complete(false);
            }
        }
 
        class ItemQueue
        {
            int head;
            Item[] items;
            int pendingCount;
            int totalCount;
 
            public ItemQueue()
            {
                this.items = new Item[1];
            }
 
            public bool HasAnyItem
            {
                get { return this.totalCount > 0; }
            }
 
            public bool HasAvailableItem
            {
                get { return this.totalCount > this.pendingCount; }
            }
 
            public int ItemCount
            {
                get { return this.totalCount; }
            }
 
            public Item DequeueAnyItem()
            {
                if (this.pendingCount == this.totalCount)
                {
                    this.pendingCount--;
                }
                return DequeueItemCore();
            }
 
            public Item DequeueAvailableItem()
            {
                Fx.AssertAndThrow(this.totalCount != this.pendingCount, "ItemQueue does not contain any available items");
                return DequeueItemCore();
            }
 
            public void EnqueueAvailableItem(Item item)
            {
                EnqueueItemCore(item);
            }
 
            public void EnqueuePendingItem(Item item)
            {
                EnqueueItemCore(item);
                this.pendingCount++;
            }
 
            public void MakePendingItemAvailable()
            {
                Fx.AssertAndThrow(this.pendingCount != 0, "ItemQueue does not contain any pending items");
                this.pendingCount--;
            }
 
            Item DequeueItemCore()
            {
                Fx.AssertAndThrow(totalCount != 0, "ItemQueue does not contain any items");
                Item item = this.items[this.head];
                this.items[this.head] = new Item();
                this.totalCount--;
                this.head = (this.head + 1) % this.items.Length;
                return item;
            }
 
            void EnqueueItemCore(Item item)
            {
                if (this.totalCount == this.items.Length)
                {
                    Item[] newItems = new Item[this.items.Length * 2];
                    for (int i = 0; i < this.totalCount; i++)
                    {
                        newItems[i] = this.items[(head + i) % this.items.Length];
                    }
                    this.head = 0;
                    this.items = newItems;
                }
                int tail = (this.head + this.totalCount) % this.items.Length;
                this.items[tail] = item;
                this.totalCount++;
            }
        }
 
        [Fx.Tag.SynchronizationObject(Blocking = false)]
        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
        class WaitQueueReader : IQueueReader
        {
            Exception exception;
            InputQueue<T> inputQueue;
            T item;
 
            [Fx.Tag.SynchronizationObject]
            ManualResetEvent waitEvent;
 
            public WaitQueueReader(InputQueue<T> inputQueue)
            {
                this.inputQueue = inputQueue;
                waitEvent = new ManualResetEvent(false);
            }
 
            public void Set(Item item)
            {
                lock (this)
                {
                    Fx.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
                    Fx.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
 
                    this.exception = item.Exception;
                    this.item = item.Value;
                    waitEvent.Set();
                }
            }
 
            [Fx.Tag.Blocking(CancelMethod = "Set")]
            public bool Wait(TimeSpan timeout, out T value)
            {
                bool isSafeToClose = false;
                try
                {
                    if (!TimeoutHelper.WaitOne(waitEvent, timeout))
                    {
                        if (this.inputQueue.RemoveReader(this))
                        {
                            value = default(T);
                            isSafeToClose = true;
                            return false;
                        }
                        else
                        {
                            waitEvent.WaitOne();
                        }
                    }
 
                    isSafeToClose = true;
                }
                finally
                {
                    if (isSafeToClose)
                    {
                        waitEvent.Close();
                    }
                }
 
                if (this.exception != null)
                {
                    throw Fx.Exception.AsError(this.exception);
                }
 
                value = item;
                return true;
            }
        }
 
        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
        class WaitQueueWaiter : IQueueWaiter
        {
            bool itemAvailable;
 
            [Fx.Tag.SynchronizationObject]
            ManualResetEvent waitEvent;
 
            public WaitQueueWaiter()
            {
                waitEvent = new ManualResetEvent(false);
            }
 
            public void Set(bool itemAvailable)
            {
                lock (this)
                {
                    this.itemAvailable = itemAvailable;
                    waitEvent.Set();
                }
            }
 
            [Fx.Tag.Blocking(CancelMethod = "Set")]
            public bool Wait(TimeSpan timeout)
            {
                if (!TimeoutHelper.WaitOne(waitEvent, timeout))
                {
                    return false;
                }
 
                return this.itemAvailable;
            }
        }
    }
}