File: System\Linq\Parallel\Channels\SynchronousChannel.cs
Project: ndp\fx\src\Core\System.Core.csproj (System.Core)
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
// 
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SynchronousChannel.cs
//
// <OWNER>Microsoft</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Diagnostics.Contracts;
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// The simplest channel is one that has no synchronization.  This is used for stop-
    /// and-go productions where we are guaranteed the consumer is not running
    /// concurrently. It just wraps a FIFO queue internally.
    ///
    /// Assumptions:
    ///     Producers and consumers never try to enqueue/dequeue concurrently.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    internal sealed class SynchronousChannel<T>
    {
        // We currently use the BCL FIFO queue internally, although any would do.
        private Queue<T> m_queue;
 
#if DEBUG
    // In debug builds, we keep track of when the producer is done (for asserts).
        private bool m_done;
#endif
 
        //-----------------------------------------------------------------------------------
        // Instantiates a new queue.
        //
 
        internal SynchronousChannel()
        {
        }
 
        //-----------------------------------------------------------------------------------
        // Initializes the queue for this channel.
        //
 
        internal void Init()
        {
            m_queue = new Queue<T>();
        }
 
        //-----------------------------------------------------------------------------------
        // Enqueue a new item.
        //
        // Arguments:
        //     item                - the item to place into the queue
        //     timeoutMilliseconds - synchronous channels never wait, so this is unused
        //
        // Assumptions:
        //     The producer has not signaled that it's done yet.
        //
        // Return Value:
        //     Synchronous channels always return true for this function.  It can't timeout.
        //
 
        internal void Enqueue(T item)
        {
            Contract.Assert(m_queue != null);
#if DEBUG
            Contract.Assert(!m_done, "trying to enqueue into the queue after production is done");
#endif
 
            m_queue.Enqueue(item);
        }
 
        //-----------------------------------------------------------------------------------
        // Dequeue the next item in the queue.
        //
        // Return Value:
        //     The item removed from the queue.
        //
        // Assumptions:
        //     The producer must be done producing. This queue is meant for synchronous
        //     production/consumption, therefore it's unsafe for the consumer to try and
        //     dequeue an item while a producer might be enqueueing one.
        //
 
        internal T Dequeue()
        {
            Contract.Assert(m_queue != null);
#if DEBUG
            Contract.Assert(m_done, "trying to dequeue before production is done -- this is not safe");
#endif
            return m_queue.Dequeue();
        }
 
        //-----------------------------------------------------------------------------------
        // Signals that a producer will no longer be enqueueing items.
        //
 
        internal void SetDone()
        {
#if DEBUG
    // We only track this in DEBUG builds to aid in debugging. This ensures we
    // can assert dequeue-before-done and enqueue-after-done invariants above.
            m_done = true;
#endif
        }
 
        //-----------------------------------------------------------------------------------
        // Copies the internal contents of this channel to an array.
        //
 
        internal void CopyTo(T[] array, int arrayIndex)
        {
            Contract.Assert(array != null);
#if DEBUG
            Contract.Assert(m_done, "Can only copy from the channel after it's done being added to");
#endif
            m_queue.CopyTo(array, arrayIndex);
        }
 
        //-----------------------------------------------------------------------------------
        // Retrieves the current count of items in the queue.
        //
 
        internal int Count
        {
            get
            {
                Contract.Assert(m_queue != null);
                return m_queue.Count;
            }
        }
    }
}