File: System\Activities\Runtime\Scheduler.cs
Project: ndp\cdf\src\NetFx40\System.Activities\System.Activities.csproj (System.Activities)
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
 
namespace System.Activities.Runtime
{
    using System;
    using System.Diagnostics.CodeAnalysis;
    using System.Runtime;
    using System.Runtime.Serialization;
    using System.Security;
    using System.Threading;
    using System.Runtime.Diagnostics;
 
    [DataContract(Name = XD.Runtime.Scheduler, Namespace = XD.Runtime.Namespace)]
    class Scheduler
    {
        static ContinueAction continueAction = new ContinueAction();
        static YieldSilentlyAction yieldSilentlyAction = new YieldSilentlyAction();
        static AbortAction abortAction = new AbortAction();
 
        WorkItem firstWorkItem;
 
        static SendOrPostCallback onScheduledWorkCallback = Fx.ThunkCallback(new SendOrPostCallback(OnScheduledWork));
 
        SynchronizationContext synchronizationContext;
 
        bool isPausing;
        bool isRunning;
 
        bool resumeTraceRequired;
 
        Callbacks callbacks;
 
        Quack<WorkItem> workItemQueue;
 
        public Scheduler(Callbacks callbacks)
        {
            this.Initialize(callbacks);
        }
 
        public static RequestedAction Continue
        {
            get
            {
                return continueAction;
            }
        }
 
        public static RequestedAction YieldSilently
        {
            get
            {
                return yieldSilentlyAction;
            }
        }
 
        public static RequestedAction Abort
        {
            get
            {
                return abortAction;
            }
        }
 
        public bool IsRunning
        {
            get
            {
                return this.isRunning;
            }
        }
 
        public bool IsIdle
        {
            get
            {
                return this.firstWorkItem == null;
            }
        }
 
        [DataMember(EmitDefaultValue = false, Name = "firstWorkItem")]
        internal WorkItem SerializedFirstWorkItem
        {
            get { return this.firstWorkItem; }
            set { this.firstWorkItem = value; }
        }
 
        [DataMember(EmitDefaultValue = false)]
        [SuppressMessage(FxCop.Category.Performance, FxCop.Rule.AvoidUncalledPrivateCode)]
        internal WorkItem[] SerializedWorkItemQueue
        {
            get
            {
                if (this.workItemQueue != null && this.workItemQueue.Count > 0)
                {
                    return this.workItemQueue.ToArray();
                }
                else
                {
                    return null;
                }
            }
            set
            {
                Fx.Assert(value != null, "EmitDefaultValue is false so we should never get null.");
 
                // this.firstWorkItem is serialized out separately, so don't use ScheduleWork() here
                this.workItemQueue = new Quack<WorkItem>(value);
            }
        }
 
        public void FillInstanceMap(ActivityInstanceMap instanceMap)
        {
            if (this.firstWorkItem != null)
            {
                ActivityInstanceMap.IActivityReference activityReference = this.firstWorkItem as ActivityInstanceMap.IActivityReference;
                if (activityReference != null)
                {
                    instanceMap.AddEntry(activityReference, true);
                }
 
                if (this.workItemQueue != null && this.workItemQueue.Count > 0)
                {
                    for (int i = 0; i < this.workItemQueue.Count; i++)
                    {
                        activityReference = this.workItemQueue[i] as ActivityInstanceMap.IActivityReference;
                        if (activityReference != null)
                        {
                            instanceMap.AddEntry(activityReference, true);
                        }
                    }
                }
            }
        }
 
        public static RequestedAction CreateNotifyUnhandledExceptionAction(Exception exception, ActivityInstance sourceInstance)
        {
            return new NotifyUnhandledExceptionAction(exception, sourceInstance);
        }
 
        public void ClearAllWorkItems(ActivityExecutor executor)
        {
            if (this.firstWorkItem != null)
            {
                this.firstWorkItem.Release(executor);
                this.firstWorkItem = null;
 
                if (this.workItemQueue != null)
                {
                    while (this.workItemQueue.Count > 0)
                    {
                        WorkItem item = this.workItemQueue.Dequeue();
                        item.Release(executor);
                    }
                }
            }
 
            Fx.Assert(this.workItemQueue == null || this.workItemQueue.Count == 0, "We either didn't have a first work item and therefore don't have anything in the queue, or we drained the queue.");
 
            // For consistency we set this to null even if it is empty
            this.workItemQueue = null;
        }
 
        public void OnDeserialized(Callbacks callbacks)
        {
            Initialize(callbacks);
            Fx.Assert(this.firstWorkItem != null || this.workItemQueue == null, "cannot have items in the queue unless we also have a firstWorkItem set");
        }
 
        // This method should only be called when we relinquished the thread but did not
        // complete the operation (silent yield is the current example)
        public void InternalResume(RequestedAction action)
        {
            Fx.Assert(this.isRunning, "We should still be processing work - we just don't have a thread");
 
            bool isTracingEnabled = FxTrace.ShouldTraceInformation;
            bool notifiedCompletion = false;
            bool isInstanceComplete = false;
 
            if (this.callbacks.IsAbortPending)
            {
                this.isPausing = false;
                this.isRunning = false;
 
                this.NotifyWorkCompletion();
                notifiedCompletion = true;
 
                if (isTracingEnabled)
                {
                    isInstanceComplete = this.callbacks.IsCompleted;
                }
 
                // After calling SchedulerIdle we no longer have the lock.  That means
                // that any subsequent processing in this method won't have the single
                // threaded guarantee.
                this.callbacks.SchedulerIdle();
            }
            else if (object.ReferenceEquals(action, continueAction))
            {
                ScheduleWork(false);
            }
            else
            {
                Fx.Assert(action is NotifyUnhandledExceptionAction, "This is the only other choice because we should never have YieldSilently here");
 
                NotifyUnhandledExceptionAction notifyAction = (NotifyUnhandledExceptionAction)action;
 
                // We only set isRunning back to false so that the host doesn't
                // have to treat this like a pause notification.  As an example,
                // a host could turn around and call run again in response to
                // UnhandledException without having to go through its operation
                // dispatch loop first (or request pause again).  If we reset
                // isPausing here then any outstanding operations wouldn't get
                // signaled with that type of host.
                this.isRunning = false;
 
                this.NotifyWorkCompletion();
                notifiedCompletion = true;
 
                if (isTracingEnabled)
                {
                    isInstanceComplete = this.callbacks.IsCompleted;
                }
 
                this.callbacks.NotifyUnhandledException(notifyAction.Exception, notifyAction.Source);
            }
 
            if (isTracingEnabled)
            {
                if (notifiedCompletion)
                {
                    Guid oldActivityId = Guid.Empty;
                    bool resetId = false;
 
                    if (isInstanceComplete)
                    {
                        if (TD.WorkflowActivityStopIsEnabled())
                        {
                            oldActivityId = DiagnosticTraceBase.ActivityId;
                            DiagnosticTraceBase.ActivityId = this.callbacks.WorkflowInstanceId;
                            resetId = true;
 
                            TD.WorkflowActivityStop(this.callbacks.WorkflowInstanceId);
                        }
                    }
                    else
                    {
                        if (TD.WorkflowActivitySuspendIsEnabled())
                        {
                            oldActivityId = DiagnosticTraceBase.ActivityId;
                            DiagnosticTraceBase.ActivityId = this.callbacks.WorkflowInstanceId;
                            resetId = true;
 
                            TD.WorkflowActivitySuspend(this.callbacks.WorkflowInstanceId);
                        }
                    }
 
                    if (resetId)
                    {
                        DiagnosticTraceBase.ActivityId = oldActivityId;
                    }
                }
            }
        }
 
        // called from ctor and OnDeserialized intialization paths
        void Initialize(Callbacks callbacks)
        {
            this.callbacks = callbacks;
        }
 
        public void Open(SynchronizationContext synchronizationContext)
        {
            Fx.Assert(this.synchronizationContext == null, "can only open when in the created state");
            if (synchronizationContext != null)
            {
                this.synchronizationContext = synchronizationContext;
            }
            else
            {
                this.synchronizationContext = SynchronizationContextHelper.GetDefaultSynchronizationContext();
            }
        }
 
        internal void Open(Scheduler oldScheduler)
        {
            Fx.Assert(this.synchronizationContext == null, "can only open when in the created state");
            this.synchronizationContext = SynchronizationContextHelper.CloneSynchronizationContext(oldScheduler.synchronizationContext);
        }
 
        void ScheduleWork(bool notifyStart)
        {
            if (notifyStart)
            {
                this.synchronizationContext.OperationStarted();
                this.resumeTraceRequired = true;
            }
            else
            {
                this.resumeTraceRequired = false;
            }
            this.synchronizationContext.Post(Scheduler.onScheduledWorkCallback, this);
        }
 
        void NotifyWorkCompletion()
        {
            this.synchronizationContext.OperationCompleted();
        }
 
        // signal the scheduler to stop processing work. If we are processing work
        // then we will catch this signal at our next iteration. Pause process completes
        // when idle is signalled. Can be called while we're processing work since
        // the worst thing that could happen in a ---- is that we pause one extra work item later
        public void Pause()
        {
            this.isPausing = true;
        }
 
        public void MarkRunning()
        {
            this.isRunning = true;
        }
 
        public void Resume()
        {
            Fx.Assert(this.isRunning, "This should only be called after we've been set to process work.");
 
            if (this.IsIdle || this.isPausing || this.callbacks.IsAbortPending)
            {
                this.isPausing = false;
                this.isRunning = false;
                this.callbacks.SchedulerIdle();
            }
            else
            {
                ScheduleWork(true);
            }
        }
 
        public void PushWork(WorkItem workItem)
        {
            if (this.firstWorkItem == null)
            {
                this.firstWorkItem = workItem;
            }
            else
            {
                if (this.workItemQueue == null)
                {
                    this.workItemQueue = new Quack<WorkItem>();
                }
 
                this.workItemQueue.PushFront(this.firstWorkItem);
                this.firstWorkItem = workItem;
            }
 
            // To avoid the virt call on EVERY work item we check
            // the Verbose flag.  All of our Schedule traces are
            // verbose.
            if (FxTrace.ShouldTraceVerboseToTraceSource)
            {
                workItem.TraceScheduled();
            }
        }
 
        public void EnqueueWork(WorkItem workItem)
        {
            if (this.firstWorkItem == null)
            {
                this.firstWorkItem = workItem;
            }
            else
            {
                if (this.workItemQueue == null)
                {
                    this.workItemQueue = new Quack<WorkItem>();
                }
 
                this.workItemQueue.Enqueue(workItem);
            }
 
            if (FxTrace.ShouldTraceVerboseToTraceSource)
            {
                workItem.TraceScheduled();
            }
        }
 
        static void OnScheduledWork(object state)
        {
            Scheduler thisPtr = (Scheduler)state;
 
            // We snapshot these values here so that we can
            // use them after calling OnSchedulerIdle.
            bool isTracingEnabled = FxTrace.Trace.ShouldTraceToTraceSource(TraceEventLevel.Informational);
            Guid oldActivityId = Guid.Empty;
            Guid workflowInstanceId = Guid.Empty;
 
            if (isTracingEnabled)
            {
                oldActivityId = DiagnosticTraceBase.ActivityId;
                workflowInstanceId = thisPtr.callbacks.WorkflowInstanceId;
                FxTrace.Trace.SetAndTraceTransfer(workflowInstanceId, true);
 
                if (thisPtr.resumeTraceRequired)
                {
                    if (TD.WorkflowActivityResumeIsEnabled())
                    {
                        TD.WorkflowActivityResume(workflowInstanceId);
                    }
                }
            }
 
            thisPtr.callbacks.ThreadAcquired();
 
            RequestedAction nextAction = continueAction;
            bool idleOrPaused = false;
 
            while (object.ReferenceEquals(nextAction, continueAction))
            {
                if (thisPtr.IsIdle || thisPtr.isPausing)
                {
                    idleOrPaused = true;
                    break;
                }
 
                // cycle through (queue->thisPtr.firstWorkItem->currentWorkItem)
                WorkItem currentWorkItem = thisPtr.firstWorkItem;
 
                // promote an item out of our work queue if necessary
                if (thisPtr.workItemQueue != null && thisPtr.workItemQueue.Count > 0)
                {
                    thisPtr.firstWorkItem = thisPtr.workItemQueue.Dequeue();
                }
                else
                {
                    thisPtr.firstWorkItem = null;
                }
 
                if (TD.ExecuteWorkItemStartIsEnabled())
                {
                    TD.ExecuteWorkItemStart();
                }
 
                nextAction = thisPtr.callbacks.ExecuteWorkItem(currentWorkItem);
 
                if (TD.ExecuteWorkItemStopIsEnabled())
                {
                    TD.ExecuteWorkItemStop();
                }
            }
 
            bool notifiedCompletion = false;
            bool isInstanceComplete = false;
 
            if (idleOrPaused || object.ReferenceEquals(nextAction, abortAction))
            {
                thisPtr.isPausing = false;
                thisPtr.isRunning = false;
 
                thisPtr.NotifyWorkCompletion();
                notifiedCompletion = true;
 
                if (isTracingEnabled)
                {
                    isInstanceComplete = thisPtr.callbacks.IsCompleted;
                }
 
                // After calling SchedulerIdle we no longer have the lock.  That means
                // that any subsequent processing in this method won't have the single
                // threaded guarantee.
                thisPtr.callbacks.SchedulerIdle();
            }
            else if (!object.ReferenceEquals(nextAction, yieldSilentlyAction))
            {
                Fx.Assert(nextAction is NotifyUnhandledExceptionAction, "This is the only other option");
 
                NotifyUnhandledExceptionAction notifyAction = (NotifyUnhandledExceptionAction)nextAction;
 
                // We only set isRunning back to false so that the host doesn't
                // have to treat this like a pause notification.  As an example,
                // a host could turn around and call run again in response to
                // UnhandledException without having to go through its operation
                // dispatch loop first (or request pause again).  If we reset
                // isPausing here then any outstanding operations wouldn't get
                // signaled with that type of host.
                thisPtr.isRunning = false;
 
                thisPtr.NotifyWorkCompletion();
                notifiedCompletion = true;
 
                if (isTracingEnabled)
                {
                    isInstanceComplete = thisPtr.callbacks.IsCompleted;
                }
 
                thisPtr.callbacks.NotifyUnhandledException(notifyAction.Exception, notifyAction.Source);
            }
 
            if (isTracingEnabled)
            {
                if (notifiedCompletion)
                {
                    if (isInstanceComplete)
                    {
                        if (TD.WorkflowActivityStopIsEnabled())
                        {
                            TD.WorkflowActivityStop(workflowInstanceId);
                        }
                    }
                    else
                    {
                        if (TD.WorkflowActivitySuspendIsEnabled())
                        {
                            TD.WorkflowActivitySuspend(workflowInstanceId);
                        }
                    }
                }
 
                DiagnosticTraceBase.ActivityId = oldActivityId;
            }
        }
 
        public struct Callbacks
        {
            readonly ActivityExecutor activityExecutor;
 
            public Callbacks(ActivityExecutor activityExecutor)
            {
                this.activityExecutor = activityExecutor;
            }
 
            public Guid WorkflowInstanceId
            {
                get
                {
                    return this.activityExecutor.WorkflowInstanceId;
                }
            }
 
            public bool IsAbortPending
            {
                get
                {
                    return this.activityExecutor.IsAbortPending || this.activityExecutor.IsTerminatePending;
                }
            }
 
            public bool IsCompleted
            {
                get
                {
                    return ActivityUtilities.IsCompletedState(this.activityExecutor.State);
                }
            }
 
            public RequestedAction ExecuteWorkItem(WorkItem workItem)
            {
                Fx.Assert(this.activityExecutor != null, "ActivityExecutor null in ExecuteWorkItem.");
 
                // We check the Verbose flag to avoid the 
                // virt call if possible
                if (FxTrace.ShouldTraceVerboseToTraceSource)
                {
                    workItem.TraceStarting();
                }
 
                RequestedAction action = this.activityExecutor.OnExecuteWorkItem(workItem);
 
                if (!object.ReferenceEquals(action, Scheduler.YieldSilently))
                {
                    if (this.activityExecutor.IsAbortPending || this.activityExecutor.IsTerminatePending)
                    {
                        action = Scheduler.Abort;
                    }
 
                    // if the caller yields, then the work item is still active and the callback
                    // is responsible for releasing it back to the pool                    
                    workItem.Dispose(this.activityExecutor);                    
                }
 
                return action;
            }
 
            public void SchedulerIdle()
            {
                Fx.Assert(this.activityExecutor != null, "ActivityExecutor null in SchedulerIdle.");
                this.activityExecutor.OnSchedulerIdle();
            }
 
            public void ThreadAcquired()
            {
                Fx.Assert(this.activityExecutor != null, "ActivityExecutor null in ThreadAcquired.");
                this.activityExecutor.OnSchedulerThreadAcquired();
            }
 
            public void NotifyUnhandledException(Exception exception, ActivityInstance source)
            {
                Fx.Assert(this.activityExecutor != null, "ActivityExecutor null in NotifyUnhandledException.");
                this.activityExecutor.NotifyUnhandledException(exception, source);
            }
        }
 
        internal abstract class RequestedAction
        {
            protected RequestedAction()
            {
            }
        }
 
        class ContinueAction : RequestedAction
        {
            public ContinueAction()
            {
            }
        }
 
        class YieldSilentlyAction : RequestedAction
        {
            public YieldSilentlyAction()
            {
            }
        }
 
        class AbortAction : RequestedAction
        {
            public AbortAction()
            {
            }
        }
 
        class NotifyUnhandledExceptionAction : RequestedAction
        {
            public NotifyUnhandledExceptionAction(Exception exception, ActivityInstance source)
            {
                this.Exception = exception;
                this.Source = source;
            }
 
            public Exception Exception
            {
                get;
                private set;
            }
 
            public ActivityInstance Source
            {
                get;
                private set;
            }
        }
    }
}