File: Hosting\DefaultWorkflowSchedulerService.cs
Project: ndp\cdf\src\WF\RunTime\System.Workflow.Runtime.csproj (System.Workflow.Runtime)
using System;
using System.Collections;
using System.Collections.ObjectModel;
using System.Collections.Generic;
using System.Threading;
using System.Collections.Specialized;
using System.Diagnostics;
using System.Workflow.Runtime;
using System.Globalization;
 
namespace System.Workflow.Runtime.Hosting
{
    [Obsolete("The System.Workflow.* types are deprecated.  Instead, please use the new types from System.Activities.*")]
    public class DefaultWorkflowSchedulerService : WorkflowSchedulerService
    {
        // next two fields controlled by locking the timerQueue
        private KeyedPriorityQueue<Guid, CallbackInfo, DateTime> timerQueue = new KeyedPriorityQueue<Guid, CallbackInfo, DateTime>();
        private Timer callbackTimer;
 
        private TimerCallback timerCallback;
        private const string MAX_SIMULTANEOUS_WORKFLOWS_KEY = "maxSimultaneousWorkflows";
        private const int DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS = 5;
        private static TimeSpan infinite = new TimeSpan(Timeout.Infinite);
        private readonly int maxSimultaneousWorkflows;       // Maximum number of work items allowed in ThreadPool queue
        private static TimeSpan fiveMinutes = new TimeSpan(0, 5, 0);
 
        // next three fields controlled by locking the waitingQueue
        private int numCurrentWorkers;
        private Queue<WorkItem> waitingQueue;       // Queue for extra items waiting to be allowed into thread pool
        private volatile bool running = false;
 
        private IList<PerformanceCounter> queueCounters;    // expose internal queue length
 
        private static int DefaultThreadCount
        {
            get
            {
                return Environment.ProcessorCount == 1
                    ? DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS
                    : (int)(DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS * Environment.ProcessorCount * .8);
            }
        }
 
        public DefaultWorkflowSchedulerService()
            : this(DefaultThreadCount)
        {
        }
 
 
        public DefaultWorkflowSchedulerService(int maxSimultaneousWorkflows)
            : base()
        {
            if (maxSimultaneousWorkflows < 1)
                throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty);
            this.maxSimultaneousWorkflows = maxSimultaneousWorkflows;
            init();
        }
 
        public DefaultWorkflowSchedulerService(NameValueCollection parameters)
            : base()
        {
            if (parameters == null)
                throw new ArgumentNullException("parameters");
 
            maxSimultaneousWorkflows = DefaultThreadCount;
            foreach (string key in parameters.Keys)
            {
                if (key == null)
                    throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, "null"));
                string p = parameters[key];
                if (!key.Equals(MAX_SIMULTANEOUS_WORKFLOWS_KEY, StringComparison.OrdinalIgnoreCase))
                    throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, key));
                if (!int.TryParse(p, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.CurrentCulture, out maxSimultaneousWorkflows))
                    throw new FormatException(MAX_SIMULTANEOUS_WORKFLOWS_KEY);
            }
 
            if (maxSimultaneousWorkflows < 1)
                throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty);
 
            init();
        }
 
        private void init()
        {
            timerCallback = new TimerCallback(OnTimerCallback);
            timerQueue.FirstElementChanged += OnFirstElementChanged;
            waitingQueue = new Queue<WorkItem>();
        }
 
 
        public int MaxSimultaneousWorkflows
        {
            get { return maxSimultaneousWorkflows; }
        }
 
        internal protected override void Schedule(WaitCallback callback, Guid workflowInstanceId)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0}", workflowInstanceId);
 
            if (callback == null)
                throw new ArgumentNullException("callback");
            if (workflowInstanceId == Guid.Empty)
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId"));
 
            // Add the work item to our internal queue and signal the ProcessQueue thread
            EnqueueWorkItem(new WorkItem(callback, workflowInstanceId));
        }
 
        internal protected override void Schedule(WaitCallback callback, Guid workflowInstanceId, DateTime whenUtc, Guid timerId)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0} on timer ID {1} in {2}", workflowInstanceId, timerId, (whenUtc - DateTime.UtcNow));
 
            if (callback == null)
                throw new ArgumentNullException("callback");
            if (timerId == Guid.Empty)
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId"));
            if (workflowInstanceId == Guid.Empty)
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId"));
 
            CallbackInfo ci = new CallbackInfo(this, callback, workflowInstanceId, whenUtc);
 
            lock (timerQueue)
            {
                timerQueue.Enqueue(timerId, ci, whenUtc);
            }
        }
 
        internal protected override void Cancel(Guid timerId)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Cancelling work with timer ID {0}", timerId);
 
            if (timerId == Guid.Empty)
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId"), "timerId");
 
            lock (timerQueue)
            {
                timerQueue.Remove(timerId);
            }
        }
 
        override protected void OnStarted()
        {
            lock (timerQueue)
            {
                base.OnStarted();
                CallbackInfo ci = timerQueue.Peek();
                if (ci != null)
                    callbackTimer = CreateTimerCallback(ci);
                running = true;
            }
            lock (waitingQueue)
            {
                int nToStart = Math.Min(maxSimultaneousWorkflows, waitingQueue.Count);
                for (int i = 0; i < nToStart; i++)
                {
                    if (ThreadPool.QueueUserWorkItem(QueueWorkerProcess))
                    {
                        numCurrentWorkers++;
                    }
                }
            }
            if (queueCounters == null && this.Runtime.PerformanceCounterManager != null)
            {
                queueCounters = this.Runtime.PerformanceCounterManager.CreateCounters(ExecutionStringManager.PerformanceCounterWorkflowsWaitingName);
            }
        }
 
        protected internal override void Stop()
        {
            lock (timerQueue)
            {
                base.Stop();
                if (callbackTimer != null)
                {
                    callbackTimer.Dispose();
                    callbackTimer = null;
                }
                running = false;
            }
            lock (waitingQueue)
            {
                while (numCurrentWorkers > 0)
                {
                    Monitor.Wait(waitingQueue);
                }
            }
        }
 
        private void OnFirstElementChanged(object source, KeyedPriorityQueueHeadChangedEventArgs<CallbackInfo> e)
        {
            // timerQueue must have been locked by operation that caused this event to fire
 
            if (callbackTimer != null)
            {
                callbackTimer.Dispose();
                callbackTimer = null;
            }
            if (e.NewFirstElement != null && this.State == WorkflowRuntimeServiceState.Started)
            {
                callbackTimer = CreateTimerCallback(e.NewFirstElement);
            }
        }
 
        private void OnTimerCallback(object ignored)
        {
            //Make sure activity ID comes out of Threadpool are initialized to null.
            Trace.CorrelationManager.ActivityId = Guid.Empty;
 
            CallbackInfo ci = null;
            bool fire = false;
            try
            {
                lock (timerQueue)
                {
                    if (State == WorkflowRuntimeServiceState.Started)
                    {
                        ci = timerQueue.Peek();
                        if (ci != null)
                        {
                            if (ci.IsExpired)
                            {
                                WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Timeout occured for timer for instance {0}", ci.State);
                                timerQueue.Dequeue();
                                fire = true;
                            }
                            else
                            {
                                callbackTimer = CreateTimerCallback(ci);
                            }
                        }
                    }
                }
                if (fire && ci != null)
                    ci.Callback(ci.State);
            }
            // Ignore cases where the workflow has been stolen out from under us
            catch (WorkflowOwnershipException)
            { }
            catch (ThreadAbortException e)
            {
                WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message);
                RaiseServicesExceptionNotHandledEvent(e, (Guid)ci.State);
                throw;
            }
            catch (Exception e)
            {
                WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message);
                RaiseServicesExceptionNotHandledEvent(e, (Guid)ci.State);
            }
        }
 
        private Timer CreateTimerCallback(CallbackInfo info)
        {
            DateTime now = DateTime.UtcNow;
            TimeSpan span = (info.When > now) ? info.When - now : TimeSpan.Zero;
            if (span > fiveMinutes) // never let more than five minutes go by without checking
                span = fiveMinutes;
            return new Timer(timerCallback, info.State, span, infinite);
        }
 
        private void EnqueueWorkItem(WorkItem workItem)
        {
            lock (waitingQueue)
            {
                waitingQueue.Enqueue(workItem);
                if (running && numCurrentWorkers < maxSimultaneousWorkflows)
                {
                    if (ThreadPool.QueueUserWorkItem(this.QueueWorkerProcess))
                    {
                        numCurrentWorkers++;
                    }
                }
            }
            if (queueCounters != null)
            {
                foreach (PerformanceCounter p in queueCounters)
                {
                    p.RawValue = waitingQueue.Count;
                }
            }
        }
 
        private void QueueWorkerProcess(object state /*unused*/)
        {
            //Make sure activity ID comes out of Threadpool are initialized to null.
            Trace.CorrelationManager.ActivityId = Guid.Empty;
 
            while (true)
            {
                WorkItem workItem;
                lock (waitingQueue)
                {
                    if (waitingQueue.Count == 0 || !running)
                    {
                        numCurrentWorkers--;
                        Monitor.Pulse(waitingQueue);
                        return;
                    }
                    workItem = waitingQueue.Dequeue();
                }
                if (queueCounters != null)
                {
                    foreach (PerformanceCounter p in queueCounters)
                    {
                        p.RawValue = waitingQueue.Count;
                    }
                }
                workItem.Invoke(this);
            }
        }
 
 
        internal class WorkItem
        {
            private WaitCallback callback;
            private object state;
 
            public WorkItem(WaitCallback callback, object state)
            {
                this.callback = callback;
                this.state = state;
            }
 
            public WaitCallback Callback
            {
                get { return callback; }
            }
 
            public void Invoke(WorkflowSchedulerService service)
            {
                try
                {
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Running workflow {0}", state);
                    Callback(state);
                }
                catch (Exception e)
                {
                    if (WorkflowExecutor.IsIrrecoverableException(e))
                    {
                        throw;
                    }
                    else
                    {
                        service.RaiseExceptionNotHandledEvent(e, (Guid)state);
                    }
                }
            }
        }
 
        internal class CallbackInfo
        {
            WaitCallback callback;
            object state;
            DateTime when;
            WorkflowSchedulerService service;
 
            public CallbackInfo(WorkflowSchedulerService service, WaitCallback callback, object state, DateTime when)
            {
                this.service = service;
                this.callback = callback;
                this.state = state;
                this.when = when;
            }
 
            public DateTime When
            {
                get { return when; }
            }
 
            public bool IsExpired
            {
                get { return DateTime.UtcNow >= when; }
            }
 
            public object State
            {
                get { return state; }
            }
 
            public WaitCallback Callback
            {
                get { return callback; }
            }
        }
    }
}