|
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// QueryOperator.cs
//
// <OWNER>Microsoft</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
/// <summary>
/// This is the abstract base class for all query operators in the system. It
/// implements the ParallelQuery{T} type so that it can be bound as the source
/// of parallel queries and so that it can be returned as the result of parallel query
/// operations. Not much is in here, although it does serve as the "entry point" for
/// opening all query operators: it will lazily analyze and cache a plan the first
/// time the tree is opened, and will open the tree upon calls to GetEnumerator.
///
/// Notes:
/// This class implements ParallelQuery so that any parallel query operator
/// can bind to the parallel query provider overloads. This allows us to string
/// together operators w/out the user always specifying AsParallel, e.g.
/// Select(Where(..., ...), ...), and so forth.
/// </summary>
/// <typeparam name="TOutput"></typeparam>
internal abstract class QueryOperator<TOutput> : ParallelQuery<TOutput>
{
protected bool m_outputOrdered;
internal QueryOperator(QuerySettings settings)
:this(false, settings)
{
}
internal QueryOperator(bool isOrdered, QuerySettings settings)
:base(settings)
{
m_outputOrdered = isOrdered;
}
//---------------------------------------------------------------------------------------
// Opening the query operator will do whatever is necessary to begin enumerating its
// results. This includes in some cases actually introducing parallelism, enumerating
// other query operators, and so on. This is abstract and left to the specific concrete
// operator classes to implement.
//
// Arguments:
// settings - various flags and settings to control query execution
// preferStriping - flag representing whether the caller prefers striped partitioning
// over range partitioning
//
// Return Values:
// Either a single enumerator, or a partition (for partition parallelism).
//
internal abstract QueryResults<TOutput> Open(QuerySettings settings, bool preferStriping);
//---------------------------------------------------------------------------------------
// The GetEnumerator method is the standard IEnumerable mechanism for walking the
// contents of a query. Note that GetEnumerator is only ever called on the root node:
// we then proceed by calling Open on all of the subsequent query nodes.
//
// Arguments:
// usePipelining - whether the returned enumerator will pipeline (i.e. return
// control to the caller when the query is spawned) or not
// (i.e. use the calling thread to execute the query). Note
// that there are some conditions during which this hint will
// be ignored -- currently, that happens only if a sort is
// found anywhere in the query graph.
// suppressOrderPreservation - whether to shut order preservation off, regardless
// of the contents of the query
//
// Return Value:
// An enumerator that retrieves elements from the query output.
//
// Notes:
// The default mode of execution is to pipeline the query execution with respect
// to the GetEnumerator caller (aka the consumer). An overload is available
// that can be used to override the default with an explicit choice.
//
public override IEnumerator<TOutput> GetEnumerator()
{
// Buffering is unspecified and order preservation is not suppressed.
return GetEnumerator(null, false);
}
public IEnumerator<TOutput> GetEnumerator(ParallelMergeOptions? mergeOptions)
{
// Pass through the value supplied for pipelining, and do not suppress
// order preservation by default.
return GetEnumerator(mergeOptions, false);
}
//---------------------------------------------------------------------------------------
// Is the output of this operator ordered?
//
internal bool OutputOrdered
{
get { return m_outputOrdered; }
}
internal virtual IEnumerator<TOutput> GetEnumerator(ParallelMergeOptions? mergeOptions, bool suppressOrderPreservation)
{
// Return a dummy enumerator that will call back GetOpenedEnumerator() on 'this' QueryOperator
// the first time the user calls MoveNext(). We do this to prevent executing the query if user
// never calls MoveNext().
return new QueryOpeningEnumerator<TOutput>(this, mergeOptions, suppressOrderPreservation);
}
//---------------------------------------------------------------------------------------
// The GetOpenedEnumerator method return an enumerator that walks the contents of a query.
// The enumerator will be "opened", which means that PLINQ will start executing the query
// immediately, even before the user calls MoveNext() for the first time.
//
internal IEnumerator<TOutput> GetOpenedEnumerator(ParallelMergeOptions? mergeOptions, bool suppressOrder, bool forEffect,
QuerySettings querySettings)
{
// If the top-level enumerator forces a premature merge, run the query sequentially.
if (querySettings.ExecutionMode.Value == ParallelExecutionMode.Default && LimitsParallelism)
{
IEnumerable<TOutput> opSequential = AsSequentialQuery(querySettings.CancellationState.ExternalCancellationToken);
return ExceptionAggregator.WrapEnumerable(opSequential, querySettings.CancellationState).GetEnumerator();
}
QueryResults<TOutput> queryResults = GetQueryResults(querySettings);
if (mergeOptions == null)
{
mergeOptions = querySettings.MergeOptions;
}
Contract.Assert(mergeOptions != null);
// Top-level pre-emptive cancellation test.
// This handles situations where cancellation has occured before execution commences
// The handling for in-execution occurs in QueryTaskGroupState.QueryEnd()
if(querySettings.CancellationState.MergedCancellationToken.IsCancellationRequested)
{
if (querySettings.CancellationState.ExternalCancellationToken.IsCancellationRequested)
throw new OperationCanceledException(querySettings.CancellationState.ExternalCancellationToken);
else
throw new OperationCanceledException();
}
bool orderedMerge = OutputOrdered && !suppressOrder;
PartitionedStreamMerger<TOutput> merger = new PartitionedStreamMerger<TOutput>(forEffect, mergeOptions.GetValueOrDefault(),
querySettings.TaskScheduler,
orderedMerge,
querySettings.CancellationState,
querySettings.QueryId);
queryResults.GivePartitionedStream(merger); // hook up the data flow between the operator-executors, starting from the merger.
if (forEffect)
{
return null;
}
return merger.MergeExecutor.GetEnumerator();
}
// This method is called only once on the 'head operator' which is the last specified operator in the query
// This method then recursively uses Open() to prepare itself and the other enumerators.
private QueryResults<TOutput> GetQueryResults(QuerySettings querySettings)
{
TraceHelpers.TraceInfo("[timing]: {0}: starting execution - QueryOperator<>::GetQueryResults", DateTime.Now.Ticks);
// All mandatory query settings must be specified
Contract.Assert(querySettings.TaskScheduler != null);
Contract.Assert(querySettings.DegreeOfParallelism.HasValue);
Contract.Assert(querySettings.ExecutionMode.HasValue);
// Now just open the query tree's root operator, supplying a specific DOP
return Open(querySettings, false);
}
//---------------------------------------------------------------------------------------
// Executes the query and returns the results in an array.
//
internal TOutput[] ExecuteAndGetResultsAsArray()
{
QuerySettings querySettings =
SpecifiedQuerySettings
.WithPerExecutionSettings()
.WithDefaults();
QueryLifecycle.LogicalQueryExecutionBegin(querySettings.QueryId);
try
{
if (querySettings.ExecutionMode.Value == ParallelExecutionMode.Default && LimitsParallelism)
{
IEnumerable<TOutput> opSequential = AsSequentialQuery(querySettings.CancellationState.ExternalCancellationToken);
IEnumerable<TOutput> opSequentialWithCancelChecks = CancellableEnumerable.Wrap(opSequential, querySettings.CancellationState.ExternalCancellationToken);
return ExceptionAggregator.WrapEnumerable(opSequentialWithCancelChecks, querySettings.CancellationState).ToArray();
}
QueryResults<TOutput> results = GetQueryResults(querySettings);
if (results.IsIndexible && OutputOrdered)
{
// The special array-based merge performs better if the output is ordered, because
// it does not have to pay for ordering. In the unordered case, we it appears that
// the stop-and-go merge performs a little better.
ArrayMergeHelper<TOutput> merger = new ArrayMergeHelper<TOutput>(SpecifiedQuerySettings, results);
merger.Execute();
TOutput[] output = merger.GetResultsAsArray();
querySettings.CleanStateAtQueryEnd();
return output;
}
else
{
PartitionedStreamMerger<TOutput> merger =
new PartitionedStreamMerger<TOutput>(false, ParallelMergeOptions.FullyBuffered, querySettings.TaskScheduler,
OutputOrdered, querySettings.CancellationState, querySettings.QueryId);
results.GivePartitionedStream(merger);
TOutput[] output = merger.MergeExecutor.GetResultsAsArray();
querySettings.CleanStateAtQueryEnd();
return output;
}
}
finally
{
QueryLifecycle.LogicalQueryExecutionEnd(querySettings.QueryId);
}
}
//---------------------------------------------------------------------------------------
// Returns an enumerable that represents the query executing sequentially.
//
// Note that iterating the returned enumerable will not wrap exceptions AggregateException.
// Before this enumerable is returned to the user, we must wrap it with an
// ExceptionAggregator.
//
internal abstract IEnumerable<TOutput> AsSequentialQuery(CancellationToken token);
//---------------------------------------------------------------------------------------
// Whether this operator performs a premature merge.
//
internal abstract bool LimitsParallelism { get; }
//---------------------------------------------------------------------------------------
// The state of the order index of the results returned by this operator.
//
internal abstract OrdinalIndexState OrdinalIndexState { get; }
//---------------------------------------------------------------------------------------
// A helper method that executes the query rooted at the openedChild operator, and returns
// the results as ListQueryResults<TSource>.
//
internal static ListQueryResults<TOutput> ExecuteAndCollectResults<TKey>(
PartitionedStream<TOutput, TKey> openedChild,
int partitionCount,
bool outputOrdered,
bool useStriping,
QuerySettings settings)
{
TaskScheduler taskScheduler = settings.TaskScheduler;
MergeExecutor<TOutput> executor = MergeExecutor<TOutput>.Execute<TKey>(
openedChild, false, ParallelMergeOptions.FullyBuffered, taskScheduler, outputOrdered,
settings.CancellationState, settings.QueryId);
return new ListQueryResults<TOutput>(executor.GetResultsAsArray(), partitionCount, useStriping);
}
//---------------------------------------------------------------------------------------
// Returns a QueryOperator<T> for any IEnumerable<T> data source. This will just do a
// cast and return a reference to the same data source if the source is another query
// operator, but will lazily allocate a scan operation and return that otherwise.
//
// Arguments:
// source - any enumerable data source to be wrapped
//
// Return Value:
// A query operator.
//
internal static QueryOperator<TOutput> AsQueryOperator(IEnumerable<TOutput> source)
{
Contract.Assert(source != null);
// Just try casting the data source to a query operator, in the case that
// our child is just another query operator.
QueryOperator<TOutput> sourceAsOperator = source as QueryOperator<TOutput>;
if (sourceAsOperator == null)
{
OrderedParallelQuery<TOutput> orderedQuery = source as OrderedParallelQuery<TOutput>;
if (orderedQuery != null)
{
// We have to handle OrderedParallelQuery<T> specially. In all other cases,
// ParallelQuery *is* the QueryOperator<T>. But, OrderedParallelQuery<T>
// is not QueryOperator<T>, it only has a reference to one. Ideally, we
// would want SortQueryOperator<T> to inherit from OrderedParallelQuery<T>,
// but that conflicts with other constraints on our class hierarchy.
sourceAsOperator = (QueryOperator<TOutput>)orderedQuery.SortOperator;
}
else
{
// If the cast failed, then the data source is a real piece of data. We
// just construct a new scan operator on top of it.
sourceAsOperator = new ScanQueryOperator<TOutput>(source);
}
}
Contract.Assert(sourceAsOperator != null);
return sourceAsOperator;
}
}
}
|