|
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ExchangeUtilities.cs
//
// <OWNER>Microsoft</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
#if SILVERLIGHT
using System.Core; // for System.Core.SR
#endif
namespace System.Linq.Parallel
{
/// <summary>
/// ExchangeUtilities is a static class that contains helper functions to partition and merge
/// streams.
/// </summary>
internal static class ExchangeUtilities
{
//-----------------------------------------------------------------------------------
// A factory method to construct a partitioned stream over a data source.
//
// Arguments:
// source - the data source to be partitioned
// partitionCount - the number of partitions desired
// useOrdinalOrderPreservation - whether ordinal position must be tracked
// useStriping - whether striped partitioning should be used instead of range partitioning
//
internal static PartitionedStream<T, int> PartitionDataSource<T>(IEnumerable<T> source, int partitionCount, bool useStriping)
{
// The partitioned stream to return.
PartitionedStream<T, int> returnValue;
IParallelPartitionable<T> sourceAsPartitionable = source as IParallelPartitionable<T>;
if (sourceAsPartitionable != null)
{
// The type overrides the partitioning algorithm, so we will use it instead of the default.
// The returned enumerator must be the same size that we requested, otherwise we throw.
QueryOperatorEnumerator<T, int>[] enumerators = sourceAsPartitionable.GetPartitions(partitionCount);
if (enumerators == null)
{
throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_NullReturn));
}
else if (enumerators.Length != partitionCount)
{
throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_IncorretElementCount));
}
// Now just copy the enumerators into the stream, validating that the result is non-null.
PartitionedStream<T, int> stream =
new PartitionedStream<T, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);
for (int i = 0; i < partitionCount; i++)
{
QueryOperatorEnumerator<T, int> currentEnumerator = enumerators[i];
if (currentEnumerator == null)
{
throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_NullElement));
}
stream[i] = currentEnumerator;
}
returnValue = stream;
}
else
{
returnValue = new PartitionedDataSource<T>(source, partitionCount, useStriping);
}
Contract.Assert(returnValue.PartitionCount == partitionCount);
return returnValue;
}
//-----------------------------------------------------------------------------------
// Converts an enumerator or a partitioned stream into a hash-partitioned stream. In the resulting
// partitioning, all elements with the same hash code are guaranteed to be in the same partition.
//
// Arguments:
// source - the data to be hash-partitioned. If it is a partitioned stream, it
// must have partitionCount partitions
// partitionCount - the desired number of partitions
// useOrdinalOrderPreservation - whether ordinal order preservation is required
// keySelector - function to obtain the key given an element
// keyComparer - equality comparer for the keys
//
internal static PartitionedStream<Pair<TElement, THashKey>, int> HashRepartition<TElement, THashKey, TIgnoreKey>(
PartitionedStream<TElement, TIgnoreKey> source, Func<TElement, THashKey> keySelector, IEqualityComparer<THashKey> keyComparer,
IEqualityComparer<TElement> elementComparer, CancellationToken cancellationToken)
{
TraceHelpers.TraceInfo("PartitionStream<..>.HashRepartitionStream(..):: creating **RE**partitioned stream for nested operator");
return new UnorderedHashRepartitionStream<TElement, THashKey, TIgnoreKey>(source, keySelector, keyComparer, elementComparer, cancellationToken);
}
internal static PartitionedStream<Pair<TElement, THashKey>, TOrderKey> HashRepartitionOrdered<TElement, THashKey, TOrderKey>(
PartitionedStream<TElement, TOrderKey> source, Func<TElement, THashKey> keySelector, IEqualityComparer<THashKey> keyComparer,
IEqualityComparer<TElement> elementComparer, CancellationToken cancellationToken)
{
TraceHelpers.TraceInfo("PartitionStream<..>.HashRepartitionStream(..):: creating **RE**partitioned stream for nested operator");
return new OrderedHashRepartitionStream<TElement, THashKey, TOrderKey>(source, keySelector, keyComparer, elementComparer, cancellationToken);
}
//---------------------------------------------------------------------------------------
// A helper method that given two OrdinalIndexState values return the "worse" one. For
// example, if state1 is valid and state2 is increasing, we will return
// OrdinalIndexState.Increasing.
//
internal static OrdinalIndexState Worse(this OrdinalIndexState state1, OrdinalIndexState state2)
{
return state1 > state2 ? state1 : state2;
}
internal static bool IsWorseThan(this OrdinalIndexState state1, OrdinalIndexState state2)
{
return state1 > state2;
}
}
/// <summary>
/// Used during hash partitioning, when the keys being memoized are not used for anything.
/// </summary>
internal struct NoKeyMemoizationRequired { }
}
|