|
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SelectManyQueryOperator.cs
//
// <OWNER>Microsoft</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
namespace System.Linq.Parallel
{
/// <summary>
/// SelectMany is effectively a nested loops join. It is given an outer (left) data source and
/// a selector that produces an inner (right) sequence for each outer element; we walk the outer
/// source and, for each outer element, walk the entire inner enumerator produced for it. There
/// is an optional result selector function which can transform the output before yielding it
/// as a result element.
///
/// Notes:
/// Although select many involves two enumerable objects, it appears to the query analysis
/// infrastructure as a unary operator. That's because it works a little differently than the
/// other binary operators: it has to re-open the right child every time an outer element is
/// walked. The right child is NOT partitioned.
///
/// Each output element is keyed by a Pair of (left element's key, index of the element within
/// the inner sequence produced for that left element).
/// </summary>
/// <typeparam name="TLeftInput">Element type of the outer (left) data source.</typeparam>
/// <typeparam name="TRightInput">Element type of the inner sequences produced by the child selector.</typeparam>
/// <typeparam name="TOutput">Element type of the results; must equal TRightInput when no result selector is supplied.</typeparam>
internal sealed class SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> : UnaryQueryOperator<TLeftInput, TOutput>
{
    // Non-indexed selector that produces the right child for a left element (null when the indexed selector is used).
    private readonly Func<TLeftInput, IEnumerable<TRightInput>> m_rightChildSelector;
    // Indexed selector that also receives the left element's index (null when the non-indexed selector is used).
    private readonly Func<TLeftInput, int, IEnumerable<TRightInput>> m_indexedRightChildSelector;
    // An optional result selection function; when null, TRightInput and TOutput are the same type.
    private readonly Func<TLeftInput, TRightInput, TOutput> m_resultSelector;
    // Whether to prematurely merge the input of this operator (decided in InitOrderIndex).
    private bool m_prematureMerge = false;
    // Whether the premature merge limits parallelism, i.e. it is a merge that a similar sequential
    // operation (LINQ to Objects) would not perform. Exposed through LimitsParallelism.
    private bool m_limitsParallelism = false;

    //---------------------------------------------------------------------------------------
    // Initializes a new select-many operator.
    //
    // Arguments:
    //     leftChild                 - the left data source from which to pull data.
    //     rightChildSelector        - non-indexed function that generates a new right child for
    //                                 every left element (null if the indexed selector is used).
    //     indexedRightChildSelector - indexed function that generates a new right child from a left
    //                                 element and its index (null if the non-indexed selector is used).
    //     resultSelector            - optional selection function for creating output elements; when
    //                                 null, TRightInput and TOutput must be the same type.
    //
    // Exactly one of the two child selectors must be supplied.
    //
    internal SelectManyQueryOperator(IEnumerable<TLeftInput> leftChild,
                                     Func<TLeftInput, IEnumerable<TRightInput>> rightChildSelector,
                                     Func<TLeftInput, int, IEnumerable<TRightInput>> indexedRightChildSelector,
                                     Func<TLeftInput, TRightInput, TOutput> resultSelector)
        : base(leftChild)
    {
        Contract.Assert(leftChild != null, "left child data source cannot be null");
        Contract.Assert(rightChildSelector != null || indexedRightChildSelector != null,
                        "either right child data or selector must be supplied");
        Contract.Assert(rightChildSelector == null || indexedRightChildSelector == null,
                        "either indexed- or non-indexed child selector must be supplied (not both)");
        Contract.Assert(typeof(TRightInput) == typeof(TOutput) || resultSelector != null,
                        "right input and output must be the same types, otherwise the result selector may not be null");

        m_rightChildSelector = rightChildSelector;
        m_indexedRightChildSelector = indexedRightChildSelector;
        m_resultSelector = resultSelector;

        // If the SelectMany is indexed, elements must be returned in the order in which
        // indices were assigned.
        m_outputOrdered = Child.OutputOrdered || indexedRightChildSelector != null;

        InitOrderIndex();
    }

    //---------------------------------------------------------------------------------------
    // Decides, from the child's ordinal index state, whether the input must be merged
    // prematurely (and whether that merge limits parallelism). This operator's own output
    // index state is always declared Increasing.
    //
    private void InitOrderIndex()
    {
        OrdinalIndexState childIndexState = Child.OrdinalIndexState;

        if (m_indexedRightChildSelector != null)
        {
            // If this is an indexed SelectMany, we need the order keys to be Correct, so that we can pass them
            // into the user delegate.
            m_prematureMerge = ExchangeUtilities.IsWorseThan(childIndexState, OrdinalIndexState.Correct);
            m_limitsParallelism = m_prematureMerge && childIndexState != OrdinalIndexState.Shuffled;
        }
        else if (OutputOrdered)
        {
            // If the output of this SelectMany is ordered, the input keys must be at least increasing. The
            // SelectMany algorithm assumes that there will be no duplicate order keys, so if the order keys
            // are Shuffled, we need to merge prematurely.
            m_prematureMerge = ExchangeUtilities.IsWorseThan(childIndexState, OrdinalIndexState.Increasing);
        }

        SetOrdinalIndexState(OrdinalIndexState.Increasing);
    }

    //---------------------------------------------------------------------------------------
    // Wraps each partition of the input stream in a SelectMany enumerator and hands the
    // resulting stream to the recipient, merging the input first when InitOrderIndex decided
    // that a premature merge is required.
    //
    internal override void WrapPartitionedStream<TLeftKey>(
        PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient, bool preferStriping, QuerySettings settings)
    {
        int partitionCount = inputStream.PartitionCount;

        if (m_indexedRightChildSelector != null)
        {
            // The indexed variant passes the left element's int index into the user delegate.
            PartitionedStream<TLeftInput, int> inputStreamInt;

            // If the index is not correct, we need to reindex.
            if (m_prematureMerge)
            {
                ListQueryResults<TLeftInput> listResults =
                    QueryOperator<TLeftInput>.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings);
                inputStreamInt = listResults.GetPartitionedStream();
            }
            else
            {
                // No merge means the child's index state is already Correct; the code relies on
                // the keys being int in that case.
                inputStreamInt = (PartitionedStream<TLeftInput, int>)(object)inputStream;
            }

            WrapPartitionedStreamIndexed(inputStreamInt, recipient, settings);
            return;
        }

        // Non-indexed variant: if the child's keys are not good enough for ordered output (see
        // InitOrderIndex), materialize the input first; the merged results carry int keys.
        if (m_prematureMerge)
        {
            PartitionedStream<TLeftInput, int> inputStreamInt =
                QueryOperator<TLeftInput>.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings)
                .GetPartitionedStream();
            WrapPartitionedStreamNotIndexed(inputStreamInt, recipient, settings);
        }
        else
        {
            WrapPartitionedStreamNotIndexed(inputStream, recipient, settings);
        }
    }

    /// <summary>
    /// A helper method for WrapPartitionedStream. We use the helper to reuse a block of code twice, but with
    /// a different order key type. (If premature merge occurred, the order key type will be "int". Otherwise,
    /// it will be the same type as "TLeftKey" in WrapPartitionedStream.)
    /// </summary>
    private void WrapPartitionedStreamNotIndexed<TLeftKey>(
        PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient, QuerySettings settings)
    {
        int partitionCount = inputStream.PartitionCount;
        var keyComparer = new PairComparer<TLeftKey, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());
        var outputStream = new PartitionedStream<TOutput, Pair<TLeftKey, int>>(partitionCount, keyComparer, OrdinalIndexState);
        for (int i = 0; i < partitionCount; i++)
        {
            outputStream[i] = new SelectManyQueryOperatorEnumerator<TLeftKey>(inputStream[i], this, settings.CancellationState.MergedCancellationToken);
        }

        recipient.Receive(outputStream);
    }

    /// <summary>
    /// Similar helper method to WrapPartitionedStreamNotIndexed, except that this one is for the indexed variant
    /// of SelectMany (i.e., the SelectMany that passes indices into the user sequence-generating delegate)
    /// </summary>
    private void WrapPartitionedStreamIndexed(
        PartitionedStream<TLeftInput, int> inputStream, IPartitionedStreamRecipient<TOutput> recipient, QuerySettings settings)
    {
        var keyComparer = new PairComparer<int, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());
        var outputStream = new PartitionedStream<TOutput, Pair<int, int>>(inputStream.PartitionCount, keyComparer, OrdinalIndexState);
        for (int i = 0; i < inputStream.PartitionCount; i++)
        {
            outputStream[i] = new IndexedSelectManyQueryOperatorEnumerator(inputStream[i], this, settings.CancellationState.MergedCancellationToken);
        }

        recipient.Receive(outputStream);
    }

    //---------------------------------------------------------------------------------------
    // Just opens the current operator, including opening the left child and wrapping with a
    // partition if needed. The right child is not opened yet -- this is always done on demand
    // as the outer elements are enumerated.
    //
    internal override QueryResults<TOutput> Open(QuerySettings settings, bool preferStriping)
    {
        QueryResults<TLeftInput> childQueryResults = Child.Open(settings, preferStriping);
        return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
    }

    //---------------------------------------------------------------------------------------
    // Returns an enumerable that represents the query executing sequentially, using the
    // LINQ to Objects SelectMany overload that matches the selectors this operator was built with.
    //
    internal override IEnumerable<TOutput> AsSequentialQuery(CancellationToken token)
    {
        var source = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);

        if (m_rightChildSelector != null)
        {
            if (m_resultSelector != null)
            {
                return source.SelectMany(m_rightChildSelector, m_resultSelector);
            }

            // Without a result selector TRightInput == TOutput (asserted in the constructor).
            return (IEnumerable<TOutput>)(object)(source.SelectMany(m_rightChildSelector));
        }

        Contract.Assert(m_indexedRightChildSelector != null);
        if (m_resultSelector != null)
        {
            return source.SelectMany(m_indexedRightChildSelector, m_resultSelector);
        }

        return (IEnumerable<TOutput>)(object)(source.SelectMany(m_indexedRightChildSelector));
    }

    //---------------------------------------------------------------------------------------
    // Whether this operator performs a premature merge that would not be performed in
    // a similar sequential operation (i.e., in LINQ to Objects).
    //
    internal override bool LimitsParallelism
    {
        get { return m_limitsParallelism; }
    }

    //---------------------------------------------------------------------------------------
    // The enumerator type responsible for executing the indexed SelectMany logic. Left
    // elements arrive with int keys, which are passed to the indexed child selector.
    //
    private sealed class IndexedSelectManyQueryOperatorEnumerator : QueryOperatorEnumerator<TOutput, Pair<int, int>>
    {
        private readonly QueryOperatorEnumerator<TLeftInput, int> m_leftSource; // The left data source to enumerate.
        private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
        private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
        private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
        private Mutables m_mutables; // bag of frequently mutated value types [allocated once, lazily, in MoveNext to avoid false-sharing]
        private readonly CancellationToken m_cancellationToken;

        private sealed class Mutables
        {
            internal int m_currentRightSourceIndex = -1; // The index for the right data source (reset per left element).
            internal TLeftInput m_currentLeftElement; // The current element in the left data source.
            internal int m_currentLeftSourceIndex; // The current key in the left data source.
            internal int m_lhsCount; // Counts the number of lhs elements enumerated. used for cancellation testing.
        }

        //---------------------------------------------------------------------------------------
        // Instantiates a new select-many enumerator. Notice that the right data source is an
        // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
        // data source.
        //
        internal IndexedSelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, int> leftSource,
                                                          SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                                          CancellationToken cancellationToken)
        {
            Contract.Assert(leftSource != null);
            Contract.Assert(selectManyOperator != null);

            m_leftSource = leftSource;
            m_selectManyOperator = selectManyOperator;
            m_cancellationToken = cancellationToken;
        }

        //---------------------------------------------------------------------------------------
        // Produces the next output element and its (left index, right index) key. Returns false
        // once the left source is exhausted.
        //
        internal override bool MoveNext(ref TOutput currentElement, ref Pair<int, int> currentKey)
        {
            // Allocate the mutable state once, on first use, rather than in the constructor (false
            // sharing) or per left element (which would also reset the cancellation poll counter).
            if (m_mutables == null)
            {
                m_mutables = new Mutables();
            }

            while (true)
            {
                if (m_currentRightSource == null)
                {
                    // Check cancellation every few lhs-enumerations in case none of them are producing
                    // any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks.
                    if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                    {
                        CancellationState.ThrowIfCanceled(m_cancellationToken);
                    }

                    // We don't have a "current" right enumerator to use. We have to fetch the next
                    // one. If the left has run out of elements, however, we're done and just return
                    // false right away.
                    if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftSourceIndex))
                    {
                        return false;
                    }

                    // Right-hand indices restart for every left element.
                    m_mutables.m_currentRightSourceIndex = -1;

                    // Use the source selection routine to create a right child.
                    IEnumerable<TRightInput> rightChild =
                        m_selectManyOperator.m_indexedRightChildSelector(m_mutables.m_currentLeftElement, m_mutables.m_currentLeftSourceIndex);

                    Contract.Assert(rightChild != null);
                    m_currentRightSource = rightChild.GetEnumerator();
                    Contract.Assert(m_currentRightSource != null);

                    // If we have no result selector, we will need to access the Current element of the right
                    // data source as though it is a TOutput. Unfortunately, we know that TRightInput must
                    // equal TOutput (we check it during operator construction), but the type system doesn't.
                    // Thus we would have to cast the result of invoking Current from type TRightInput to
                    // TOutput. This is no good, since the results could be value types. Instead, we save the
                    // enumerator object as an IEnumerator<TOutput> and access that later on.
                    if (m_selectManyOperator.m_resultSelector == null)
                    {
                        m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                        Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                        "these must be equal, otherwise the surrounding logic will be broken");
                    }
                }

                if (m_currentRightSource.MoveNext())
                {
                    m_mutables.m_currentRightSourceIndex++;

                    // If the inner data source has an element, we can yield it.
                    if (m_selectManyOperator.m_resultSelector != null)
                    {
                        // In the case of a selection function, use that to yield the next element.
                        currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                    }
                    else
                    {
                        // Otherwise, the right input and output types must be the same. We use the
                        // casted copy of the current right source and just return its current element.
                        Contract.Assert(m_currentRightSourceAsOutput != null);
                        currentElement = m_currentRightSourceAsOutput.Current;
                    }

                    currentKey = new Pair<int, int>(m_mutables.m_currentLeftSourceIndex, m_mutables.m_currentRightSourceIndex);
                    return true;
                }

                // Otherwise, we have exhausted the right data source. Loop back around and try
                // to get the next left element, then its right, and so on.
                m_currentRightSource.Dispose();
                m_currentRightSource = null;
                m_currentRightSourceAsOutput = null;
            }
        }

        //---------------------------------------------------------------------------------------
        // Disposes the left source and any right enumerator that is still open.
        //
        protected override void Dispose(bool disposing)
        {
            m_leftSource.Dispose();
            if (m_currentRightSource != null)
            {
                m_currentRightSource.Dispose();
                m_currentRightSource = null;
                m_currentRightSourceAsOutput = null;
            }
        }
    }

    //---------------------------------------------------------------------------------------
    // The enumerator type responsible for executing the non-indexed SelectMany logic. Left
    // keys of any type TLeftKey are carried through into the output keys.
    //
    private sealed class SelectManyQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TOutput, Pair<TLeftKey, int>>
    {
        private readonly QueryOperatorEnumerator<TLeftInput, TLeftKey> m_leftSource; // The left data source to enumerate.
        private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
        private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
        private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
        private Mutables m_mutables; // bag of frequently mutated value types [allocated once, lazily, in MoveNext to avoid false-sharing]
        private readonly CancellationToken m_cancellationToken;

        private sealed class Mutables
        {
            internal int m_currentRightSourceIndex = -1; // The index for the right data source (reset per left element).
            internal TLeftInput m_currentLeftElement; // The current element in the left data source.
            internal TLeftKey m_currentLeftKey; // The current key in the left data source.
            internal int m_lhsCount; // Counts the number of lhs elements enumerated. used for cancellation testing.
        }

        //---------------------------------------------------------------------------------------
        // Instantiates a new select-many enumerator. Notice that the right data source is an
        // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
        // data source.
        //
        internal SelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, TLeftKey> leftSource,
                                                   SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                                   CancellationToken cancellationToken)
        {
            Contract.Assert(leftSource != null);
            Contract.Assert(selectManyOperator != null);

            m_leftSource = leftSource;
            m_selectManyOperator = selectManyOperator;
            m_cancellationToken = cancellationToken;
        }

        //---------------------------------------------------------------------------------------
        // Produces the next output element and its (left key, right index) key. Returns false
        // once the left source is exhausted.
        //
        internal override bool MoveNext(ref TOutput currentElement, ref Pair<TLeftKey, int> currentKey)
        {
            // Allocate the mutable state once, on first use, rather than in the constructor (false
            // sharing) or per left element (which would also reset the cancellation poll counter).
            if (m_mutables == null)
            {
                m_mutables = new Mutables();
            }

            while (true)
            {
                if (m_currentRightSource == null)
                {
                    // Check cancellation every few lhs-enumerations in case none of them are producing
                    // any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks.
                    if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                    {
                        CancellationState.ThrowIfCanceled(m_cancellationToken);
                    }

                    // We don't have a "current" right enumerator to use. We have to fetch the next
                    // one. If the left has run out of elements, however, we're done and just return
                    // false right away.
                    if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftKey))
                    {
                        return false;
                    }

                    // Right-hand indices restart for every left element.
                    m_mutables.m_currentRightSourceIndex = -1;

                    // Use the source selection routine to create a right child.
                    IEnumerable<TRightInput> rightChild = m_selectManyOperator.m_rightChildSelector(m_mutables.m_currentLeftElement);

                    Contract.Assert(rightChild != null);
                    m_currentRightSource = rightChild.GetEnumerator();
                    Contract.Assert(m_currentRightSource != null);

                    // If we have no result selector, we will need to access the Current element of the right
                    // data source as though it is a TOutput. Unfortunately, we know that TRightInput must
                    // equal TOutput (we check it during operator construction), but the type system doesn't.
                    // Thus we would have to cast the result of invoking Current from type TRightInput to
                    // TOutput. This is no good, since the results could be value types. Instead, we save the
                    // enumerator object as an IEnumerator<TOutput> and access that later on.
                    if (m_selectManyOperator.m_resultSelector == null)
                    {
                        m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                        Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                        "these must be equal, otherwise the surrounding logic will be broken");
                    }
                }

                if (m_currentRightSource.MoveNext())
                {
                    m_mutables.m_currentRightSourceIndex++;

                    // If the inner data source has an element, we can yield it.
                    if (m_selectManyOperator.m_resultSelector != null)
                    {
                        // In the case of a selection function, use that to yield the next element.
                        currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                    }
                    else
                    {
                        // Otherwise, the right input and output types must be the same. We use the
                        // casted copy of the current right source and just return its current element.
                        Contract.Assert(m_currentRightSourceAsOutput != null);
                        currentElement = m_currentRightSourceAsOutput.Current;
                    }

                    currentKey = new Pair<TLeftKey, int>(m_mutables.m_currentLeftKey, m_mutables.m_currentRightSourceIndex);
                    return true;
                }

                // Otherwise, we have exhausted the right data source. Loop back around and try
                // to get the next left element, then its right, and so on.
                m_currentRightSource.Dispose();
                m_currentRightSource = null;
                m_currentRightSourceAsOutput = null;
            }
        }

        //---------------------------------------------------------------------------------------
        // Disposes the left source and any right enumerator that is still open.
        //
        protected override void Dispose(bool disposing)
        {
            m_leftSource.Dispose();
            if (m_currentRightSource != null)
            {
                m_currentRightSource.Dispose();
                m_currentRightSource = null;
                m_currentRightSourceAsOutput = null;
            }
        }
    }
}
}
|