|
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// MergeExecutor.cs
//
// <OWNER>Microsoft</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
/// <summary>
/// Drives execution of an actual merge operation, including creating channel data
/// structures and scheduling parallel work as appropriate. The algorithms used
/// internally are parameterized based on the type of data in the partitions; e.g.
/// if an order preserved stream is found, the merge will automatically use an
/// order preserving merge, and so forth.
/// </summary>
/// <typeparam name="TInputOutput">
/// The element type of the partitions being merged, which is also the element type
/// yielded by the merged stream.
/// </typeparam>
internal class MergeExecutor<TInputOutput> : IEnumerable<TInputOutput>
{
    // Many internal algorithms are parameterized based on the data. The IMergeHelper
    // is the pluggable interface whose implementations perform those algorithms.
    private IMergeHelper<TInputOutput> m_mergeHelper;

    // Private constructor. MergeExecutor should only be constructed via the
    // MergeExecutor.Execute static method.
    private MergeExecutor()
    {
    }

    //-----------------------------------------------------------------------------------
    // Creates and executes a new merge executor object.
    //
    // Arguments:
    //     partitions        - the partitions whose data will be merged into one stream
    //     ignoreOutput      - if true, we are enumerating "for effect", and we won't actually
    //                         generate data in the output stream
    //     options           - the requested merge buffering mode. For an ordered merge,
    //                         FullyBuffered selects the stop-and-go helper; otherwise a
    //                         pipelining helper is used when the ordinal index state allows it.
    //     taskScheduler     - handed to the selected merge helper
    //     isOrdered         - whether to perform an ordering merge.
    //     cancellationState - handed to the selected merge helper
    //     queryId           - handed to the selected merge helper
    //
    // Return Value:
    //     The new executor; its merge helper has already been executed by the time
    //     this method returns.
    //
    internal static MergeExecutor<TInputOutput> Execute<TKey>(
        PartitionedStream<TInputOutput, TKey> partitions, bool ignoreOutput, ParallelMergeOptions options, TaskScheduler taskScheduler, bool isOrdered,
        CancellationState cancellationState, int queryId)
    {
        Contract.Assert(partitions != null);
        Contract.Assert(partitions.PartitionCount > 0);
        Contract.Assert(!ignoreOutput || options == ParallelMergeOptions.FullyBuffered, "@BUGBUG: pipelining w/ no output not supported -- need it?");

        MergeExecutor<TInputOutput> mergeExecutor = new MergeExecutor<TInputOutput>();

        if (isOrdered && !ignoreOutput)
        {
            if (options != ParallelMergeOptions.FullyBuffered && !partitions.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing))
            {
                Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
                bool autoBuffered = (options == ParallelMergeOptions.AutoBuffered);

                if (partitions.PartitionCount > 1)
                {
                    // We use a pipelining ordered merge
                    mergeExecutor.m_mergeHelper = new OrderPreservingPipeliningMergeHelper<TInputOutput, TKey>(
                        partitions, taskScheduler, cancellationState, autoBuffered, queryId, partitions.KeyComparer);
                }
                else
                {
                    // When DOP=1, the default merge simply returns the single producer enumerator to the consumer. This way, ordering
                    // does not add any extra overhead, and no producer task needs to be scheduled.
                    // ignoreOutput is known to be false on this path, hence the literal.
                    mergeExecutor.m_mergeHelper = new DefaultMergeHelper<TInputOutput, TKey>(
                        partitions, false, options, taskScheduler, cancellationState, queryId);
                }
            }
            else
            {
                // We use a stop-and-go ordered merge helper
                mergeExecutor.m_mergeHelper = new OrderPreservingMergeHelper<TInputOutput, TKey>(partitions, taskScheduler, cancellationState, queryId);
            }
        }
        else
        {
            // We use a default - unordered - merge helper.
            mergeExecutor.m_mergeHelper = new DefaultMergeHelper<TInputOutput, TKey>(partitions, ignoreOutput, options, taskScheduler, cancellationState, queryId);
        }

        mergeExecutor.Execute();
        return mergeExecutor;
    }

    //-----------------------------------------------------------------------------------
    // Initiates execution of the merge.
    //
    private void Execute()
    {
        Contract.Assert(m_mergeHelper != null);
        m_mergeHelper.Execute();
    }

    //-----------------------------------------------------------------------------------
    // Returns an enumerator that will yield elements from the resulting merged data
    // stream.
    //
    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<TInputOutput>)this).GetEnumerator();
    }

    public IEnumerator<TInputOutput> GetEnumerator()
    {
        Contract.Assert(m_mergeHelper != null);
        return m_mergeHelper.GetEnumerator();
    }

    //-----------------------------------------------------------------------------------
    // Returns the merged results as an array.
    //
    internal TInputOutput[] GetResultsAsArray()
    {
        // Same invariant that Execute() and GetEnumerator() check before touching the helper.
        Contract.Assert(m_mergeHelper != null);
        return m_mergeHelper.GetResultsAsArray();
    }

    //-----------------------------------------------------------------------------------
    // This internal helper method is used to generate a set of asynchronous channels.
    // The algorithm used by each channel contains the necessary synchronization to
    // ensure it is suitable for pipelined consumption.
    //
    // Arguments:
    //     partitionCount    - the number of partitions for which to create new channels.
    //     options           - the merge buffering mode; must be NotBuffered or AutoBuffered.
    //                         NotBuffered uses a chunk size of 1, AutoBuffered an automatic one.
    //     consumerEvent     - handed to every channel that is created
    //     cancellationToken - handed to every channel that is created
    //
    // Return Value:
    //     An array of asynchronous channels, one for each partition.
    //
    internal static AsynchronousChannel<TInputOutput>[] MakeAsynchronousChannels(int partitionCount, ParallelMergeOptions options, IntValueEvent consumerEvent, CancellationToken cancellationToken)
    {
        AsynchronousChannel<TInputOutput>[] channels = new AsynchronousChannel<TInputOutput>[partitionCount];

        Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
        TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} async channels in prep for pipeline", partitionCount);

        // If we are pipelining, we need a channel that contains the necessary synchronization
        // in it. We choose a bounded/blocking channel data structure: bounded so that we can
        // limit the amount of memory overhead used by the query by putting a cap on the
        // buffer size into which producers place data, and blocking so that the consumer can
        // wait for additional data to arrive in the case that it's found to be empty.
        int chunkSize = 0; // 0 means automatic chunk size
        if (options == ParallelMergeOptions.NotBuffered)
        {
            chunkSize = 1;
        }

        for (int i = 0; i < channels.Length; i++)
        {
            channels[i] = new AsynchronousChannel<TInputOutput>(i, chunkSize, cancellationToken, consumerEvent);
        }

        return channels;
    }

    //-----------------------------------------------------------------------------------
    // This internal helper method is used to generate a set of synchronous channels.
    // The channel data structure used has been optimized for sequential execution and
    // does not support pipelining.
    //
    // Arguments:
    //     partitionCount - the number of partitions for which to create new channels.
    //
    // Return Value:
    //     An array of synchronous channels, one for each partition.
    //
    internal static SynchronousChannel<TInputOutput>[] MakeSynchronousChannels(int partitionCount)
    {
        SynchronousChannel<TInputOutput>[] channels = new SynchronousChannel<TInputOutput>[partitionCount];

        TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} channels in prep for stop-and-go", partitionCount);

        // We just build up the results in memory using simple, dynamically growable FIFO queues.
        for (int i = 0; i < channels.Length; i++)
        {
            channels[i] = new SynchronousChannel<TInputOutput>();
        }

        return channels;
    }
}
}
|