// File: System\Linq\Parallel\QueryOperators\QuerySettings.cs
// Project: ndp\fx\src\Core\System.Core.csproj (System.Core)
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
// 
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// QuerySettings.cs
//
// <OWNER>Microsoft</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;
#if SILVERLIGHT
using System.Core; // for System.Core.SR
#endif
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// Holds the execution options a user has attached to a parallel query.
    /// The lifecycle of a QuerySettings value is:
    /// - while the query is being constructed, individual settings may still be
    ///   unset (null);
    /// - when the query opening phase begins, WithDefaults fills in every setting
    ///   that is still unset;
    /// - for the remainder of the query opening phase, all settings are assumed
    ///   to be initialized.
    /// </summary>
    internal struct QuerySettings
    {
        private TaskScheduler m_taskScheduler;
        private int? m_degreeOfParallelism;
        private CancellationState m_cancellationState;
        private ParallelExecutionMode? m_executionMode;
        private ParallelMergeOptions? m_mergeOptions;
        private int m_queryId;

        /// <summary>
        /// The cancellation state carried by these settings. The setter stores the
        /// value first and then asserts that it is non-null.
        /// </summary>
        internal CancellationState CancellationState
        {
            get
            {
                return m_cancellationState;
            }
            set
            {
                m_cancellationState = value;
                Contract.Assert(m_cancellationState != null);
            }
        }

        /// <summary>
        /// The task scheduler on which the query executes; null while unspecified.
        /// </summary>
        internal TaskScheduler TaskScheduler
        {
            get
            {
                return m_taskScheduler;
            }
            set
            {
                m_taskScheduler = value;
            }
        }

        /// <summary>
        /// The number of parallel tasks to use; null while unspecified.
        /// </summary>
        internal int? DegreeOfParallelism
        {
            get
            {
                return m_degreeOfParallelism;
            }
            set
            {
                m_degreeOfParallelism = value;
            }
        }

        /// <summary>
        /// The execution mode requested for the query; null while unspecified.
        /// </summary>
        internal ParallelExecutionMode? ExecutionMode
        {
            get
            {
                return m_executionMode;
            }
            set
            {
                m_executionMode = value;
            }
        }

        /// <summary>
        /// The merge options requested for the query; null while unspecified.
        /// </summary>
        internal ParallelMergeOptions? MergeOptions
        {
            get
            {
                return m_mergeOptions;
            }
            set
            {
                m_mergeOptions = value;
            }
        }

        /// <summary>
        /// The id assigned to this query. The constructor initializes it to -1;
        /// WithPerExecutionSettings replaces it with a fresh id except in
        /// PFX_LEGACY_3_5 and SILVERLIGHT builds.
        /// </summary>
        internal int QueryId
        {
            get { return m_queryId; }
        }

        /// <summary>
        /// Creates a settings value from the given options. A new CancellationState
        /// is always created around <paramref name="externalCancellationToken"/>,
        /// and the query id starts out as -1.
        /// </summary>
        internal QuerySettings(TaskScheduler taskScheduler, int? degreeOfParallelism,
            CancellationToken externalCancellationToken, ParallelExecutionMode? executionMode,
            ParallelMergeOptions? mergeOptions)
        {
            m_queryId = -1;
            m_taskScheduler = taskScheduler;
            m_degreeOfParallelism = degreeOfParallelism;
            m_executionMode = executionMode;
            m_mergeOptions = mergeOptions;
            m_cancellationState = new CancellationState(externalCancellationToken);

            Contract.Assert(m_cancellationState != null);
        }

        /// <summary>
        /// Combines this set of options with <paramref name="settings2"/>. Each option
        /// may be specified by at most one of the two operands; the specified value
        /// (if any) is carried into the result.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Both operands specify the same option (task scheduler, degree of
        /// parallelism, cancelable external token, execution mode or merge options).
        /// </exception>
        internal QuerySettings Merge(QuerySettings settings2)
        {
            // The duplicate checks run in a fixed order, so when several options
            // collide the first collision in this order decides the exception.
            if (m_taskScheduler != null && settings2.m_taskScheduler != null)
            {
                throw new InvalidOperationException(SR.GetString(SR.ParallelQuery_DuplicateTaskScheduler));
            }

            if (m_degreeOfParallelism.HasValue && settings2.m_degreeOfParallelism.HasValue)
            {
                throw new InvalidOperationException(SR.GetString(SR.ParallelQuery_DuplicateDOP));
            }

            // Only this instance's token is read eagerly; settings2's cancellation
            // state is touched lazily, exactly where it is needed.
            CancellationToken ownToken = m_cancellationState.ExternalCancellationToken;
            if (ownToken.CanBeCanceled && settings2.m_cancellationState.ExternalCancellationToken.CanBeCanceled)
            {
                throw new InvalidOperationException(SR.GetString(SR.ParallelQuery_DuplicateWithCancellation));
            }

            if (m_executionMode.HasValue && settings2.m_executionMode.HasValue)
            {
                throw new InvalidOperationException(SR.GetString(SR.ParallelQuery_DuplicateExecutionMode));
            }

            if (m_mergeOptions.HasValue && settings2.m_mergeOptions.HasValue)
            {
                throw new InvalidOperationException(SR.GetString(SR.ParallelQuery_DuplicateMergeOptions));
            }

            // For every option, prefer this instance's value and fall back to settings2.
            TaskScheduler combinedScheduler = m_taskScheduler ?? settings2.m_taskScheduler;
            int? combinedDop = m_degreeOfParallelism ?? settings2.m_degreeOfParallelism;
            CancellationToken combinedToken = ownToken.CanBeCanceled
                ? ownToken
                : settings2.m_cancellationState.ExternalCancellationToken;
            ParallelExecutionMode? combinedMode = m_executionMode ?? settings2.m_executionMode;
            ParallelMergeOptions? combinedMerge = m_mergeOptions ?? settings2.m_mergeOptions;

            return new QuerySettings(combinedScheduler, combinedDop, combinedToken, combinedMode, combinedMerge);
        }

        /// <summary>
        /// Produces per-execution settings using a brand new top-level cancellation
        /// source and a brand new (false) top-level disposed flag.
        /// </summary>
        internal QuerySettings WithPerExecutionSettings()
        {
            CancellationTokenSource freshSource = new CancellationTokenSource();
            Shared<bool> freshDisposedFlag = new Shared<bool>(false);
            return WithPerExecutionSettings(freshSource, freshDisposedFlag);
        }

        /// <summary>
        /// Produces a copy of these settings prepared for one query execution: the
        /// copy gets its own CancellationState wired to the supplied top-level
        /// cancellation source and disposed flag, plus (where supported) a new query id.
        /// </summary>
        internal QuerySettings WithPerExecutionSettings(CancellationTokenSource topLevelCancellationTokenSource, Shared<bool> topLevelDisposedFlag)
        {
            // Going through the constructor is deliberate: it creates a fresh
            // CancellationState, so the current internal cancellation source and
            // top-level disposed flag are _not_ propagated to internal query
            // executions. (This affects SelectMany execution and specifically fixes
            // bug:535510.) The fresh top-level parameters are used instead.
            QuerySettings perExecution = new QuerySettings(
                TaskScheduler,
                DegreeOfParallelism,
                CancellationState.ExternalCancellationToken,
                ExecutionMode,
                MergeOptions);

            Contract.Assert(topLevelCancellationTokenSource != null, "There should always be a top-level cancellation signal specified.");

            CancellationState state = perExecution.CancellationState;
            state.InternalCancellationTokenSource = topLevelCancellationTokenSource;

            // The merged source links the internal token with the external one.
            state.MergedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
                state.InternalCancellationTokenSource.Token,
                state.ExternalCancellationToken);

            // Record the top-level disposed flag on the new state as well.
            state.TopLevelDisposedFlag = topLevelDisposedFlag;

            Contract.Assert(state.InternalCancellationTokenSource != null);
            Contract.Assert(state.MergedCancellationToken.CanBeCanceled);
            Contract.Assert(state.TopLevelDisposedFlag != null);

            // Last step: give the settings a query id.
#if !PFX_LEGACY_3_5 && !SILVERLIGHT
            perExecution.m_queryId = PlinqEtwProvider.NextQueryId();
#endif

            return perExecution;
        }

        /// <summary>
        /// Returns a copy of these settings in which every unspecified option has
        /// been replaced by its default. A merge option of Default (or none at all)
        /// is resolved to AutoBuffered.
        /// </summary>
        internal QuerySettings WithDefaults()
        {
            QuerySettings filled = this;

            if (filled.TaskScheduler == null)
            {
                filled.TaskScheduler = TaskScheduler.Default;
            }

            if (!filled.DegreeOfParallelism.HasValue)
            {
                filled.DegreeOfParallelism = Scheduling.GetDefaultDegreeOfParallelism();
            }

            if (!filled.ExecutionMode.HasValue)
            {
                filled.ExecutionMode = ParallelExecutionMode.Default;
            }

            // An unset merge option behaves like Default, and Default means AutoBuffered.
            ParallelMergeOptions? requestedMerge = filled.MergeOptions;
            if (!requestedMerge.HasValue || requestedMerge.Value == ParallelMergeOptions.Default)
            {
                filled.MergeOptions = ParallelMergeOptions.AutoBuffered;
            }

            Contract.Assert(filled.TaskScheduler != null);
            Contract.Assert(filled.DegreeOfParallelism.HasValue);
            Contract.Assert(filled.DegreeOfParallelism.Value >= 1 && filled.DegreeOfParallelism <= Scheduling.MAX_SUPPORTED_DOP);
            Contract.Assert(filled.ExecutionMode != null);
            Contract.Assert(filled.MergeOptions != null);

            Contract.Assert(filled.MergeOptions != ParallelMergeOptions.Default);

            return filled;
        }

        /// <summary>
        /// Settings with no option specified and a non-cancelable external token.
        /// </summary>
        internal static QuerySettings Empty
        {
            get
            {
                return new QuerySettings(null, null, CancellationToken.None, null, null);
            }
        }

        /// <summary>
        /// Releases internal state once the entire query is complete by disposing the
        /// merged cancellation token source. This should not be performed after a
        /// 'premature-query' completes, as the state should live uninterrupted for
        /// the duration of the full query. Within this file, the merged source is
        /// installed by WithPerExecutionSettings.
        /// </summary>
        public void CleanStateAtQueryEnd()
        {
            CancellationState.MergedCancellationTokenSource.Dispose();
        }
    }
}