File: System\Linq\Parallel\QueryOperators\Unary\ForAllOperator.cs
Project: ndp\fx\src\Core\System.Core.csproj (System.Core)
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
// 
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ForAllQueryOperator.cs
//
// <OWNER>Microsoft</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics.Contracts;
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// A forall operator just enables an action to be placed at the "top" of a query tree
    /// instead of yielding an enumerator that some consumer can walk. We execute the
    /// query for effect instead of yielding a data result. 
    /// </summary>
    /// <typeparam name="TInput"></typeparam>
    internal sealed class ForAllOperator<TInput> : UnaryQueryOperator<TInput, TInput>
    {
 
        // The per-element action to be invoked.
        private readonly Action<TInput> m_elementAction;
 
        //---------------------------------------------------------------------------------------
        // Constructs a new forall operator.
        //
 
        internal ForAllOperator(IEnumerable<TInput> child, Action<TInput> elementAction)
            :base(child)
        {
            Contract.Assert(child != null, "child data source cannot be null");
            Contract.Assert(elementAction != null, "need a function");
 
            m_elementAction = elementAction;
        }
 
        //---------------------------------------------------------------------------------------
        // This invokes the entire query tree, invoking the per-element action for each result.
        //
 
        internal void RunSynchronously()
        {
            Contract.Assert(m_elementAction != null);
            
            // Get the enumerator w/out using pipelining. By the time this returns, the query
            // has been executed and we are done. We expect the return to be null.
            Shared<bool> dummyTopLevelDisposeFlag = new Shared<bool>(false);
 
            CancellationTokenSource dummyInternalCancellationTokenSource = new CancellationTokenSource();
 
            // stuff in appropriate defaults for unspecified options.
            QuerySettings settingsWithDefaults = SpecifiedQuerySettings
                .WithPerExecutionSettings(dummyInternalCancellationTokenSource, dummyTopLevelDisposeFlag)
                .WithDefaults();
 
            QueryLifecycle.LogicalQueryExecutionBegin(settingsWithDefaults.QueryId);
 
            IEnumerator<TInput> enumerator = GetOpenedEnumerator(ParallelMergeOptions.FullyBuffered, true, true,
                settingsWithDefaults);
            settingsWithDefaults.CleanStateAtQueryEnd();
            Contract.Assert(enumerator == null);
 
            QueryLifecycle.LogicalQueryExecutionEnd(settingsWithDefaults.QueryId);
        }
 
        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        //
 
        internal override QueryResults<TInput> Open(
            QuerySettings settings, bool preferStriping)
        {
            // We just open the child operator.
            QueryResults<TInput> childQueryResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }
 
        internal override void  WrapPartitionedStream<TKey>(
            PartitionedStream<TInput,TKey> inputStream, IPartitionedStreamRecipient<TInput> recipient, bool preferStriping, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;
            PartitionedStream<TInput, int> outputStream = new PartitionedStream<TInput, int>(
                partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);
            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new ForAllEnumerator<TKey>(
                    inputStream[i], m_elementAction, settings.CancellationState.MergedCancellationToken);
            }
 
            recipient.Receive(outputStream);
        }
 
        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
 
        internal override IEnumerable<TInput> AsSequentialQuery(CancellationToken token)
        {
            Contract.Assert(false, "AsSequentialQuery is not supported on ForAllOperator");
            throw new InvalidOperationException();
        }
 
        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).
        //
 
        internal override bool LimitsParallelism
        {
            get { return false; }
        }
 
        //---------------------------------------------------------------------------------------
        // The executable form of a forall operator. When it is enumerated, the entire underlying
        // partition is walked, invoking the per-element action for each item.
        //
 
        private class ForAllEnumerator<TKey> : QueryOperatorEnumerator<TInput, int>
        {
            private readonly QueryOperatorEnumerator<TInput, TKey> m_source; // The data source.
            private readonly Action<TInput> m_elementAction; // Forall operator being executed.
            private CancellationToken m_cancellationToken; // Token used to cancel this operator.
 
            //---------------------------------------------------------------------------------------
            // Constructs a new forall enumerator object.
            //
 
            internal ForAllEnumerator(QueryOperatorEnumerator<TInput, TKey> source, Action<TInput> elementAction, CancellationToken cancellationToken)
            {
                Contract.Assert(source != null);
                Contract.Assert(elementAction != null);
 
                m_source = source;
                m_elementAction = elementAction;
                m_cancellationToken = cancellationToken;
            }
 
            //---------------------------------------------------------------------------------------
            // Just walks the entire data source upon its first invocation, performing the per-
            // element action for each element.
            //
 
            internal override bool MoveNext(ref TInput currentElement, ref int currentKey)
            {
                Contract.Assert(m_elementAction != null, "expected a compiled operator");
 
                // We just scroll through the enumerator and execute the action. Because we execute
                // "in place", we actually never even produce a single value.
                
                // Cancellation testing must be performed here as full enumeration occurs within this method.
                // We only need to throw a simple exception here.. marshalling logic handled via QueryTaskGroupState.QueryEnd (called by ForAllSpoolingTask)
                TInput element = default(TInput);
                TKey keyUnused = default(TKey);
                int i = 0;
                while (m_source.MoveNext(ref element, ref keyUnused))
                {
                    if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        CancellationState.ThrowIfCanceled(m_cancellationToken);
                    m_elementAction(element);
                }
 
                
                return false;
            }
 
            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_source != null);
                m_source.Dispose();
            }
        }
    }
}