File: System\Linq\Parallel\Merging\OrderPreservingMergeHelper.cs
Project: ndp\fx\src\Core\System.Core.csproj (System.Core)
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
// 
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// OrderPreservingMergeHelper.cs
//
// <OWNER>Microsoft</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// The order preserving merge helper guarantees the output stream is in a specific order. This is done
    /// by comparing keys from a set of already-sorted input partitions, and coalescing output data using
    /// incremental key comparisons.
    /// </summary>
    /// <typeparam name="TInputOutput"></typeparam>
    /// <typeparam name="TKey"></typeparam>
    internal class OrderPreservingMergeHelper<TInputOutput, TKey> : IMergeHelper<TInputOutput>
    {
        private QueryTaskGroupState m_taskGroupState; // State shared among tasks.
        private PartitionedStream<TInputOutput, TKey> m_partitions; // Source partitions.
        private Shared<TInputOutput[]> m_results; // The array where results are stored.
        private TaskScheduler m_taskScheduler; // The task manager to execute the query.
 
        //-----------------------------------------------------------------------------------
        // Instantiates a new merge helper.
        //
        // Arguments:
        //     partitions   - the source partitions from which to consume data.
        //     ignoreOutput - whether we're enumerating "for effect" or for output.
        //
 
        internal OrderPreservingMergeHelper(PartitionedStream<TInputOutput, TKey> partitions, TaskScheduler taskScheduler, 
            CancellationState cancellationState, int queryId)
        {
            Contract.Assert(partitions != null);
 
            TraceHelpers.TraceInfo("KeyOrderPreservingMergeHelper::.ctor(..): creating an order preserving merge helper");
            
            m_taskGroupState = new QueryTaskGroupState(cancellationState, queryId);
            m_partitions = partitions;
            m_results = new Shared<TInputOutput[]>(null);
            m_taskScheduler = taskScheduler;
        }
 
        //-----------------------------------------------------------------------------------
        // Schedules execution of the merge itself.
        //
        // Arguments:
        //    ordinalIndexState - the state of the ordinal index of the merged partitions
        //
 
        void IMergeHelper<TInputOutput>.Execute()
        {
            OrderPreservingSpoolingTask<TInputOutput, TKey>.Spool(m_taskGroupState, m_partitions, m_results, m_taskScheduler);
        }
 
        //-----------------------------------------------------------------------------------
        // Gets the enumerator from which to enumerate output results.
        //
 
        IEnumerator<TInputOutput> IMergeHelper<TInputOutput>.GetEnumerator()
        {
            Contract.Assert(m_results.Value != null);
            return ((IEnumerable<TInputOutput>)m_results.Value).GetEnumerator();
        }
 
 
        //-----------------------------------------------------------------------------------
        // Returns the results as an array.
        //
 
        public TInputOutput[] GetResultsAsArray()
        {
            return m_results.Value;
        }
    }
}