// File: System\Linq\Parallel\QueryOperators\PartitionerQueryOperator.cs
// Project: ndp\fx\src\Core\System.Core.csproj (System.Core)
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
// 
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// PartitionerQueryOperator.cs
//
// <OWNER>Microsoft</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Linq.Parallel;
using System.Diagnostics.Contracts;
using System.Threading;
#if SILVERLIGHT
using System.Core; // for System.Core.SR
#endif
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// A QueryOperator that represents the output of the query partitioner.AsParallel().
    /// </summary>
    internal class PartitionerQueryOperator<TElement> : QueryOperator<TElement>
    {
        // The partitioner to use as data source. Assigned once by the constructor and never replaced.
        private readonly Partitioner<TElement> m_partitioner;
 
        /// <summary>
        /// Creates a query operator whose data source is the given partitioner.
        /// </summary>
        /// <param name="partitioner">The partitioner that supplies the elements of the query.</param>
        internal PartitionerQueryOperator(Partitioner<TElement> partitioner)
            // NOTE(review): the first base argument presumably marks the operator as "not ordered";
            // confirm against the QueryOperator constructor.
            : base(false, QuerySettings.Empty)
        {
            m_partitioner = partitioner;
        }
 
        /// <summary>
        /// Gets whether the underlying partitioner is an OrderablePartitioner.
        /// </summary>
        internal bool Orderable
        {
            get
            {
                OrderablePartitioner<TElement> orderable = m_partitioner as OrderablePartitioner<TElement>;
                return orderable != null;
            }
        }
 
        /// <summary>
        /// Opens the operator by pairing the partitioner with the supplied settings in a results object.
        /// </summary>
        /// <param name="settings">The query settings handed on to the results object.</param>
        /// <param name="preferStriping">Ignored; see the remark in the body.</param>
        /// <returns>A results object over the partitioner.</returns>
        internal override QueryResults<TElement> Open(QuerySettings settings, bool preferStriping)
        {
            // preferStriping is deliberately unused: Partitioner<T> does not support
            // striped partitioning.
            QueryResults<TElement> results = new PartitionerQueryOperatorResults(m_partitioner, settings);
            return results;
        }
 
        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
        // A single partition is requested from the partitioner and enumerated. Because this
        // method is an iterator, nothing happens until the result is first enumerated.
        //
        // The partitioner's answer is validated the same way GivePartitionedStream validates it,
        // so a misbehaving partitioner raises InvalidOperationException on both paths rather
        // than a NullReferenceException or ArgumentOutOfRangeException here, and a partitioner
        // that hands back more partitions than requested no longer has the extras silently ignored.
        //
        // The token argument is not consulted by this method.
        //
 
        internal override IEnumerable<TElement> AsSequentialQuery(CancellationToken token)
        {
            IList<IEnumerator<TElement>> partitionerPartitions = m_partitioner.GetPartitions(1);
 
            if (partitionerPartitions == null)
            {
                throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartitionList));
            }
 
            if (partitionerPartitions.Count != 1)
            {
                throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_WrongNumberOfPartitions));
            }
 
            IEnumerator<TElement> partition = partitionerPartitions[0];
            if (partition == null)
            {
                throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartition));
            }
 
            // The using block disposes the partition when enumeration finishes or is abandoned.
            using (partition)
            {
                while (partition.MoveNext())
                {
                    yield return partition.Current;
                }
            }
        }
 
        //---------------------------------------------------------------------------------------
        // The state of the order index of the results returned by this operator, as computed
        // by GetOrdinalIndexState for the underlying partitioner.
        //
 
        internal override OrdinalIndexState OrdinalIndexState
        {
            get
            {
                return PartitionerQueryOperator<TElement>.GetOrdinalIndexState(m_partitioner);
            }
        }
 
        /// <summary>
        /// Maps the ordering properties of a partitioner to an OrdinalIndexState.
        /// </summary>
        /// <param name="partitioner">The partitioner to inspect.</param>
        /// <returns>
        /// Shuffled when the partitioner is not an OrderablePartitioner or when its keys are not
        /// ordered in each partition; Correct when the keys are ordered in each partition and
        /// normalized; Increasing when they are ordered in each partition but not normalized.
        /// </returns>
        internal static OrdinalIndexState GetOrdinalIndexState(Partitioner<TElement> partitioner)
        {
            OrderablePartitioner<TElement> orderable = partitioner as OrderablePartitioner<TElement>;
 
            // KeysNormalized is only consulted once the keys are known to be ordered per partition.
            if (orderable == null || !orderable.KeysOrderedInEachPartition)
            {
                return OrdinalIndexState.Shuffled;
            }
 
            return orderable.KeysNormalized
                ? OrdinalIndexState.Correct
                : OrdinalIndexState.Increasing;
        }
 
 
        //---------------------------------------------------------------------------------------
        // Reports whether this operator performs a premature merge that a similar sequential
        // operation (i.e., in LINQ to Objects) would not perform. This operator always
        // reports false.
        //
 
        internal override bool LimitsParallelism
        {
            get
            {
                return false;
            }
        }
 
 
        /// <summary>
        /// QueryResults for a PartitionerQueryOperator. Requests one partition per degree of
        /// parallelism from the partitioner and hands them to the recipient as a partitioned stream.
        /// </summary>
        private class PartitionerQueryOperatorResults : QueryResults<TElement>
        {
            private readonly Partitioner<TElement> m_partitioner; // The data source for the query
 
            private QuerySettings m_settings; // Settings collected from the query
 
            /// <summary>
            /// Captures the partitioner and the settings for later use by GivePartitionedStream.
            /// </summary>
            internal PartitionerQueryOperatorResults(Partitioner<TElement> partitioner, QuerySettings settings)
            {
                m_partitioner = partitioner;
                m_settings = settings;
            }
 
            /// <summary>
            /// Builds a partitioned stream from the partitioner and passes it to the recipient.
            /// </summary>
            /// <param name="recipient">Receives the partitioned stream.</param>
            /// <exception cref="InvalidOperationException">
            /// The partitioner returned a null list, a list whose count differs from the requested
            /// partition count, or a list containing a null partition.
            /// </exception>
            internal override void GivePartitionedStream(IPartitionedStreamRecipient<TElement> recipient)
            {
                Contract.Assert(m_settings.DegreeOfParallelism.HasValue);
                int partitionCount = m_settings.DegreeOfParallelism.Value;
 
                OrderablePartitioner<TElement> orderablePartitioner = m_partitioner as OrderablePartitioner<TElement>;
 
                // GetOrdinalIndexState already reports Shuffled for a partitioner that is not
                // orderable. Such a partitioner yields zeros as order keys, so the order index
                // state is irrelevant in that case.
                OrdinalIndexState indexState = GetOrdinalIndexState(m_partitioner);
 
                PartitionedStream<TElement, int> partitions = new PartitionedStream<TElement, int>(
                    partitionCount,
                    Util.GetDefaultComparer<int>(),
                    indexState);
 
                if (orderablePartitioner != null)
                {
                    IList<IEnumerator<KeyValuePair<long, TElement>>> partitionerPartitions =
                        orderablePartitioner.GetOrderablePartitions(partitionCount);
                    ValidatePartitionList(partitionerPartitions, partitionCount);
 
                    for (int i = 0; i < partitionCount; i++)
                    {
                        partitions[i] = new OrderablePartitionerEnumerator(GetPartition(partitionerPartitions, i));
                    }
                }
                else
                {
                    IList<IEnumerator<TElement>> partitionerPartitions =
                        m_partitioner.GetPartitions(partitionCount);
                    ValidatePartitionList(partitionerPartitions, partitionCount);
 
                    for (int i = 0; i < partitionCount; i++)
                    {
                        partitions[i] = new PartitionerEnumerator(GetPartition(partitionerPartitions, i));
                    }
                }
 
                recipient.Receive<int>(partitions);
            }
 
            // Throws InvalidOperationException when the list returned by the partitioner is null
            // or does not contain exactly the requested number of partitions.
            private static void ValidatePartitionList<TPartition>(IList<TPartition> partitionList, int expectedCount)
            {
                if (partitionList == null)
                {
                    throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartitionList));
                }
 
                if (partitionList.Count != expectedCount)
                {
                    throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_WrongNumberOfPartitions));
                }
            }
 
            // Returns the partition at the given index, throwing InvalidOperationException when
            // the partitioner supplied a null entry there.
            private static TPartition GetPartition<TPartition>(IList<TPartition> partitionList, int index)
                where TPartition : class
            {
                TPartition partition = partitionList[index];
                if (partition == null)
                {
                    throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartition));
                }
 
                return partition;
            }
 
        }
 
        /// <summary>
        /// Enumerator that converts an enumerator over key-value pairs exposed by a partitioner
        /// to a QueryOperatorEnumerator used by PLINQ internally.
        /// </summary>
        private class OrderablePartitionerEnumerator : QueryOperatorEnumerator<TElement, int>
        {
            // The wrapped partition. Assigned once by the constructor; disposed by Dispose.
            private readonly IEnumerator<KeyValuePair<long, TElement>> m_sourceEnumerator;
 
            /// <summary>
            /// Wraps the given key-value pair enumerator.
            /// </summary>
            internal OrderablePartitionerEnumerator(IEnumerator<KeyValuePair<long, TElement>> sourceEnumerator)
            {
                m_sourceEnumerator = sourceEnumerator;
            }
 
            /// <summary>
            /// Advances the wrapped enumerator and, on success, reports the pair's value as the
            /// element and the pair's key as the order key.
            /// </summary>
            /// <returns>false when the wrapped enumerator is exhausted; otherwise true.</returns>
            /// <exception cref="OverflowException">
            /// The pair's 64-bit key does not fit in a 32-bit integer.
            /// </exception>
            internal override bool MoveNext(ref TElement currentElement, ref int currentKey)
            {
                if (!m_sourceEnumerator.MoveNext())
                {
                    return false;
                }
 
                KeyValuePair<long, TElement> current = m_sourceEnumerator.Current;
                currentElement = current.Value;
 
                // The partitioner's keys are 64-bit but the order key used here is 32-bit; the
                // conversion is checked so an out-of-range key throws rather than wrapping.
                checked
                {
                    currentKey = (int)current.Key;
                }
 
                return true;
            }
 
            /// <summary>
            /// Disposes the wrapped enumerator.
            /// </summary>
            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_sourceEnumerator != null);
                m_sourceEnumerator.Dispose();
            }
        }
 
        /// <summary>
        /// Enumerator that converts an enumerator over plain elements exposed by a partitioner
        /// to a QueryOperatorEnumerator used by PLINQ internally. Every element is reported
        /// with an order key of zero.
        /// </summary>
        private class PartitionerEnumerator : QueryOperatorEnumerator<TElement, int>
        {
            // The wrapped partition. Assigned once by the constructor; disposed by Dispose.
            private readonly IEnumerator<TElement> m_sourceEnumerator;
 
            /// <summary>
            /// Wraps the given element enumerator.
            /// </summary>
            internal PartitionerEnumerator(IEnumerator<TElement> sourceEnumerator)
            {
                m_sourceEnumerator = sourceEnumerator;
            }
 
            /// <summary>
            /// Advances the wrapped enumerator and, on success, reports its current element
            /// together with an order key of zero.
            /// </summary>
            /// <returns>false when the wrapped enumerator is exhausted; otherwise true.</returns>
            internal override bool MoveNext(ref TElement currentElement, ref int currentKey)
            {
                if (!m_sourceEnumerator.MoveNext())
                {
                    return false;
                }
 
                currentElement = m_sourceEnumerator.Current;
                currentKey = 0;
 
                return true;
            }
 
            /// <summary>
            /// Disposes the wrapped enumerator.
            /// </summary>
            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_sourceEnumerator != null);
                m_sourceEnumerator.Dispose();
            }
        }
    }
 
}