File: Scheduler.cs
Project: ndp\cdf\src\WF\RunTime\System.Workflow.Runtime.csproj (System.Workflow.Runtime)
using System;
using System.Globalization;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Transactions;
using System.Workflow.ComponentModel;
 
namespace System.Workflow.Runtime
{
    #region Scheduler
 
    // Only one instance of this type is used for a workflow instance.
    //
    class Scheduler
    {
        #region data
 
        // state to be persisted for the scheduler
        internal static DependencyProperty HighPriorityEntriesQueueProperty = DependencyProperty.RegisterAttached("HighPriorityEntriesQueue", typeof(Queue<SchedulableItem>), typeof(Scheduler));
        internal static DependencyProperty NormalPriorityEntriesQueueProperty = DependencyProperty.RegisterAttached("NormalPriorityEntriesQueue", typeof(Queue<SchedulableItem>), typeof(Scheduler));
        Queue<SchedulableItem> highPriorityEntriesQueue;
        Queue<SchedulableItem> normalPriorityEntriesQueue;
 
        // non-persisted state for the scheduler
        WorkflowExecutor rootWorkflowExecutor;
        bool empty;
        bool canRun;
        bool threadRequested;
        bool abortOrTerminateRequested;
        Queue<SchedulableItem> transactedEntries;
        object syncObject = new object();
 
        #endregion data
 
        #region ctors
 
        // loading with some state
        public Scheduler(WorkflowExecutor rootExec, bool canRun)
        {
            this.rootWorkflowExecutor = rootExec;
            this.threadRequested = false;
 
            // canRun is true if normal creation
            // false if loading from a persisted state. Will be set to true later at ResumeOnIdle
            this.canRun = canRun;
 
            this.highPriorityEntriesQueue = (Queue<SchedulableItem>)rootExec.RootActivity.GetValue(Scheduler.HighPriorityEntriesQueueProperty);
            this.normalPriorityEntriesQueue = (Queue<SchedulableItem>)rootExec.RootActivity.GetValue(Scheduler.NormalPriorityEntriesQueueProperty);
            if (this.highPriorityEntriesQueue == null)
            {
                this.highPriorityEntriesQueue = new Queue<SchedulableItem>();
                rootExec.RootActivity.SetValue(Scheduler.HighPriorityEntriesQueueProperty, this.highPriorityEntriesQueue);
            }
            if (this.normalPriorityEntriesQueue == null)
            {
                this.normalPriorityEntriesQueue = new Queue<SchedulableItem>();
                rootExec.RootActivity.SetValue(Scheduler.NormalPriorityEntriesQueueProperty, this.normalPriorityEntriesQueue);
            }
 
            this.empty = ((this.normalPriorityEntriesQueue.Count == 0) && (this.highPriorityEntriesQueue.Count == 0));
        }
 
        #endregion ctors
 
        #region Misc properties
 
        public override string ToString()
        {
            return "Scheduler('" + ((Activity)this.RootWorkflowExecutor.WorkflowDefinition).QualifiedName + "')";
        }
 
        protected WorkflowExecutor RootWorkflowExecutor
        {
            get { return this.rootWorkflowExecutor; }
        }
 
        public bool IsStalledNow
        {
            get
            {
                return empty;
            }
        }
 
        public bool CanRun
        {
            get
            {
                return canRun;
            }
 
            set
            {
                canRun = value;
            }
        }
 
        internal bool AbortOrTerminateRequested
        {
            get
            {
                return abortOrTerminateRequested;
            }
            set
            {
                abortOrTerminateRequested = value;
            }
        }
 
        #endregion Misc properties
 
        #region Run work
 
        public void Run()
        {
            do
            {
                this.RootWorkflowExecutor.ProcessQueuedEvents();
                // Get item to run
                SchedulableItem item = GetItemToRun();
                bool runningItem = false;
 
                // no ready work to run... go away
                if (item == null)
                    break;
 
                Activity itemActivity = null;
                Exception exp = null;
 
                TransactionalProperties transactionalProperties = null;
                int contextId = item.ContextId;
 
                // This function gets the root or enclosing while-loop activity
                Activity contextActivity = this.RootWorkflowExecutor.GetContextActivityForId(contextId);
                if (contextActivity == null)
                    throw new InvalidOperationException(ExecutionStringManager.InvalidExecutionContext);
 
                // This is the activity corresponding to the item's ActivityId
                itemActivity = contextActivity.GetActivityByName(item.ActivityId);
                using (new ServiceEnvironment(itemActivity))
                {
                    exp = null;
                    bool ignoreFinallyBlock = false;
 
                    try
                    {
                        // item preamble 
                        // set up the item transactional context if necessary
                        //
                        Debug.Assert(itemActivity != null, "null itemActivity");
                        if (itemActivity == null)
                            throw new InvalidOperationException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.InvalidActivityName, item.ActivityId));
 
                        Activity atomicActivity = null;
                        if (this.RootWorkflowExecutor.IsActivityInAtomicContext(itemActivity, out atomicActivity))
                        {
                            transactionalProperties = (TransactionalProperties)atomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty);
                            // If we've aborted for any reason stop now!
                            // If we attempt to enter a new TransactionScope the com+ context will get corrupted
                            // See windows se bug 137267
                            if (!WorkflowExecutor.CheckAndProcessTransactionAborted(transactionalProperties))
                            {
                                if (transactionalProperties.TransactionScope == null)
                                {
                                    // Use TimeSpan.Zero so scope will not create timeout independent of the transaction
                                    // Use EnterpriseServicesInteropOption.Full to flow transaction to COM+
                                    transactionalProperties.TransactionScope =
                                        new TransactionScope(transactionalProperties.Transaction, TimeSpan.Zero, EnterpriseServicesInteropOption.Full);
 
                                    WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 0,
                                        "Workflow Runtime: Scheduler: instanceId: " + this.RootWorkflowExecutor.InstanceIdString +
                                        "Entered into TransactionScope, Current atomic acitivity " + atomicActivity.Name);
                                }
                            }
                        }
 
                        // Run the item
                        //
                        runningItem = true;
                        WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Running scheduled entry: {1}", this.RootWorkflowExecutor.InstanceIdString, item.ToString());
 
                        // running any entry implicitly changes some state of the workflow instance                    
                        this.RootWorkflowExecutor.stateChangedSincePersistence = true;
 
                        item.Run(this.RootWorkflowExecutor);
                    }
                    catch (Exception e)
                    {
                        if (WorkflowExecutor.IsIrrecoverableException(e))
                        {
                            ignoreFinallyBlock = true;
                            throw;
                        }
                        else
                        {
                            if (transactionalProperties != null)
                                transactionalProperties.TransactionState = TransactionProcessState.AbortProcessed;
                            exp = e;
                        }
                    }
                    finally
                    {
                        if (!ignoreFinallyBlock)
                        {
                            if (runningItem)
                                WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Done with running scheduled entry: {1}", this.RootWorkflowExecutor.InstanceIdString, item.ToString());
 
                            // Process exception
                            //
                            if (exp != null)
                            {
                                // 
                                this.RootWorkflowExecutor.ExceptionOccured(exp, itemActivity == null ? contextActivity : itemActivity, null);
                                exp = null;
                            }
                        }
                    }
                }
            } while (true);
        }
 
        private SchedulableItem GetItemToRun()
        {
            SchedulableItem ret = null;
 
            lock (this.syncObject)
            {
                bool workToDo = false;
                if ((this.highPriorityEntriesQueue.Count > 0) || (this.normalPriorityEntriesQueue.Count > 0))
                {
                    workToDo = true;
 
                    // If an abort or termination of the workflow has been requested,
                    // then the workflow should try to terminate ASAP. Even transaction scopes
                    // in progress shouldn't be executed to completion. (Ref: 16534)
                    if (this.AbortOrTerminateRequested)
                    {
                        ret = null;
                    }
                    // got work to do in the scheduler
                    else if ((this.highPriorityEntriesQueue.Count > 0))
                    {
                        ret = this.highPriorityEntriesQueue.Dequeue();
                    }
                    else if (this.CanRun)
                    {
                        // the scheduler can run right now
                        //
 
                        // pick an entry to run
                        //
                        if (((IWorkflowCoreRuntime)this.RootWorkflowExecutor).CurrentAtomicActivity == null &&
                            (this.normalPriorityEntriesQueue.Count > 0))
                            ret = this.normalPriorityEntriesQueue.Dequeue();
                    }
                    else
                    {
                        // scheduler can't run right now.. even though there is ready work
                        // do nothing in the scheduler
                        ret = null;
                    }
                }
 
                if (!workToDo)
                {
                    // no ready work to do in the scheduler...
                    // we are gonna return the thread back
                    this.empty = true;
                }
 
                // set it to true only iff there is something to run
                this.threadRequested = (ret != null);
            }
            return ret;
        }
 
        // This method should be called only after we have determined that
        // this instance can start running now
        public void Resume()
        {
            canRun = true;
 
            if (!empty)
            {
                // There is scheduled work
                // ask the threadprovider for a thread
                this.RootWorkflowExecutor.ScheduleForWork();
            }
        }
 
        // This method should be called only after we have determined that
        // this instance can start running now
        public void ResumeIfRunnable()
        {
            if (!canRun)
                return;
 
            if (!empty)
            {
                // There is scheduled work
                // ask the threadprovider for a thread
                this.RootWorkflowExecutor.ScheduleForWork();
            }
        }
        #endregion Run work
 
        #region Schedule work
 
        public void ScheduleItem(SchedulableItem s, bool isInAtomicTransaction, bool transacted)
        {
            lock (this.syncObject)
            {
                WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Scheduling entry: {1}", this.RootWorkflowExecutor.InstanceIdString, s.ToString());
                // SchedulableItems in AtomicTransaction has higher priority
                Queue<SchedulableItem> q = isInAtomicTransaction ? this.highPriorityEntriesQueue : this.normalPriorityEntriesQueue;
                q.Enqueue(s);
 
                if (transacted)
                {
                    if (transactedEntries == null)
                        transactedEntries = new Queue<SchedulableItem>();
                    transactedEntries.Enqueue(s);
                }
 
                if (!this.threadRequested)
                {
                    if (this.CanRun)
                    {
                        this.RootWorkflowExecutor.ScheduleForWork();
                        this.threadRequested = true;
                    }
                }
                this.empty = false;
            }
        }
 
        #endregion Schedule work
 
        #region psuedo-transacted support
 
        public void PostPersist()
        {
            transactedEntries = null;
        }
 
        public void Rollback()
        {
            if (transactedEntries != null && transactedEntries.Count > 0)
            {
                // make a list of non-transacted entries
                // @undone: bmalhi: transacted entries only on priority-0
 
                IEnumerator<SchedulableItem> e = this.normalPriorityEntriesQueue.GetEnumerator();
                Queue<SchedulableItem> newScheduled = new Queue<SchedulableItem>();
                while (e.MoveNext())
                {
                    if (!transactedEntries.Contains(e.Current))
                        newScheduled.Enqueue(e.Current);
                }
 
                // clear the scheduled items
                this.normalPriorityEntriesQueue.Clear();
 
                // schedule the non-transacted items back
                e = newScheduled.GetEnumerator();
                while (e.MoveNext())
                    this.normalPriorityEntriesQueue.Enqueue(e.Current);
 
                transactedEntries = null;
            }
        }
 
        #endregion psuedo-transacted support
    }
 
    #endregion Scheduler
}