|
#pragma warning disable 1634, 1691
using System;
using System.Diagnostics;
using System.Transactions;
using System.Collections;
using System.Collections.Generic;
using System.Workflow.Runtime.Hosting;
namespace System.Workflow.Runtime
{
#region Runtime Batch Implementation
#region WorkBatch
internal enum WorkBatchState
{
Usable,
Merged,
Completed
}
/// <summary>
/// Summary description for Work Batching.
/// </summary>
internal sealed class WorkBatch : IWorkBatch, IDisposable
{
private PendingWorkCollection _pendingWorkCollection;
private object mutex = new object();
private WorkBatchState _state;
private WorkBatchCollection _collection = null;
private WorkBatch()
{
}
internal WorkBatch(WorkBatchCollection workBackCollection)
{
_pendingWorkCollection = new PendingWorkCollection();
_state = WorkBatchState.Usable;
_collection = workBackCollection;
}
internal int Count
{
get { return _pendingWorkCollection.WorkItems.Count; }
}
internal void SetWorkBatchCollection(WorkBatchCollection workBatchCollection)
{
_collection = workBatchCollection;
}
#region IWorkBatch Implementation
/// <summary>
/// Add Work to Batch
/// </summary>
/// <param name="work"></param>
/// <param name="workItem"></param>
void IWorkBatch.Add(IPendingWork work, object workItem)
{
if (_pendingWorkCollection == null)
throw new ObjectDisposedException("WorkBatch");
lock (this.mutex)
{
System.Diagnostics.Debug.Assert(this._state == WorkBatchState.Usable, "Trying to add to unusable batch.");
_pendingWorkCollection.Add(work, _collection.GetNextWorkItemOrderId(work), workItem);
}
}
#endregion
#region Internal Implementation
internal bool IsDirty
{
get
{
return this._pendingWorkCollection.IsDirty;
}
}
/// <summary>
/// This one commits all the pending work and its items
/// added so far in this batch.
/// </summary>
/// <param name="transaction"></param>
internal void Commit(Transaction transaction)
{
lock (this.mutex)
{
_pendingWorkCollection.Commit(transaction);
}
}
/// <summary>
/// This one completes the pending work
/// </summary>
/// <param name="succeeded"></param>
internal void Complete(bool succeeded)
{
lock (this.mutex)
{
if (this._pendingWorkCollection.IsUsable)
{
_pendingWorkCollection.Complete(succeeded);
_pendingWorkCollection.Dispose();
this._state = WorkBatchState.Completed;
}
}
}
/// <summary>
/// API for Runtime to call to do Merge Operation: Right now
/// we dont use this because we dont support incoming work collection.
/// </summary>
/// <param name="batch"></param>
internal void Merge(WorkBatch batch)
{
if (batch == null)
return; //nothing to merge
if (_pendingWorkCollection == null)
throw new ObjectDisposedException("WorkBatch");
lock (this.mutex)
{
lock (batch.mutex)
{
foreach (KeyValuePair<IPendingWork, SortedList<long, object>> item in batch._pendingWorkCollection.WorkItems)
{
//_pendingWorkCollection.AddRange(item.Key, item.Value);
SortedList<long, object> newItems = item.Value;
foreach (KeyValuePair<long, object> kvp in newItems)
_pendingWorkCollection.Add(item.Key, kvp.Key, kvp.Value);
}
}
this._state = WorkBatchState.Merged;
}
}
#endregion
#region IDisposable Members
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposing)
{
if (disposing)
{
_pendingWorkCollection.Dispose();
_pendingWorkCollection = null;
}
}
#endregion
#region PendingWorkCollection implementation
/// <summary>
/// Pending Work Implementation
/// </summary>
internal sealed class PendingWorkCollection : IDisposable
{
Dictionary<IPendingWork, SortedList<long, object>> Items;
#region Internal Implementation
internal PendingWorkCollection()
{
Items = new Dictionary<IPendingWork, SortedList<long, object>>();
}
internal Dictionary<IPendingWork, SortedList<long, object>> WorkItems
{
get
{
return Items;
}
}
internal bool IsUsable
{
get
{
return this.Items != null;
}
}
internal bool IsDirty
{
get
{
if (!this.IsUsable)
return false;
//
// Loop through all pending work items in the collection
// If any of them assert that they need to commit than the batch is dirty
foreach (KeyValuePair<IPendingWork, SortedList<long, object>> workItem in this.WorkItems)
{
try
{
IPendingWork work = workItem.Key;
if (work.MustCommit(workItem.Value))
return true;
}
catch (Exception e)
{
if (WorkflowExecutor.IsIrrecoverableException(e))
{
#pragma warning disable 56503
throw;
#pragma warning restore 56503
}
else
{
// Ignore exceptions and treat condition as false return value;
}
}
}
//
// If no one asserted that they need to commit we're not dirty
return false;
}
}
internal void Add(IPendingWork work, long orderId, object workItem)
{
SortedList<long, object> workItems = null;
if (!Items.TryGetValue(work, out workItems))
{
workItems = new SortedList<long, object>();
Items.Add(work, workItems);
}
Debug.Assert(!workItems.ContainsKey(orderId), string.Format(System.Globalization.CultureInfo.InvariantCulture, "List already contains key {0}", orderId));
workItems.Add(orderId, workItem);
WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 0, "pending work hc {0} added workItem hc {1}", work.GetHashCode(), workItem.GetHashCode());
}
//Commit All Pending Work
internal void Commit(Transaction transaction)
{
//ignore items param
foreach (KeyValuePair<IPendingWork, SortedList<long, object>> workItem in Items)
{
IPendingWork work = workItem.Key;
List<object> values = new List<object>(workItem.Value.Values);
work.Commit(transaction, values);
}
}
//Complete All Pending Work
internal void Complete(bool succeeded)
{
foreach (KeyValuePair<IPendingWork, SortedList<long, object>> workItem in Items)
{
IPendingWork work = workItem.Key;
List<object> values = new List<object>(workItem.Value.Values);
try
{
work.Complete(succeeded, values);
}
catch (Exception e)
{
if (WorkflowExecutor.IsIrrecoverableException(e))
{
throw;
}
else
{
WorkflowTrace.Runtime.TraceEvent(TraceEventType.Warning, 0, "Work Item {0} threw exception on complete notification", workItem.GetType());
}
}
}
}
#endregion
#region IDisposable Members
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposing)
{
if (disposing && Items != null)
{
Items.Clear();
Items = null;
}
}
#endregion
}
#endregion
}
#endregion
#region WorkBatchCollection
/// <summary>
/// collection of name to Batch
/// </summary>
internal sealed class WorkBatchCollection : Dictionary<object, WorkBatch>
{
object transientBatchID = new object();
private object mutex = new object();
//
// All access must be through Interlocked.* methods
private long _workItemOrderId = 0;
internal long WorkItemOrderId
{
get
{
return Threading.Interlocked.Read(ref _workItemOrderId);
}
set
{
Debug.Assert(value >= _workItemOrderId, "New value for WorkItemOrderId must be greater than the current value");
lock (mutex)
{
_workItemOrderId = value;
}
}
}
internal long GetNextWorkItemOrderId(IPendingWork pendingWork)
{
return Threading.Interlocked.Increment(ref _workItemOrderId);
}
/// <summary>
/// A new batch is created per atomic scope or any
/// required sub batches. An example of an optional sub batch
/// could be a batch created for Send activities
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
internal IWorkBatch GetBatch(object id)
{
WorkBatch batch = null;
lock (mutex)
{
if (this.TryGetValue(id, out batch))
return batch;
batch = new WorkBatch(this);
Add(id, batch);
}
return batch;
}
/// <summary>
/// Find a batch for a given id without creating it.
/// </summary>
/// <param name="id">batch key</param>
/// <returns>batch or null if not found</returns>
private WorkBatch FindBatch(object id)
{
WorkBatch batch = null;
lock (mutex)
{
TryGetValue(id, out batch);
}
return batch;
}
internal IWorkBatch GetTransientBatch()
{
return GetBatch(transientBatchID);
}
internal WorkBatch GetMergedBatch()
{
lock (mutex)
{
WorkBatch batch = new WorkBatch(this);
foreach (WorkBatch existingBatch in this.Values)
{
batch.Merge(existingBatch);
}
//Copy of all the items merged in one batch.
//Order is preserved in the same way batches are created.
return batch;
}
}
internal void RollbackBatch(object id)
{
lock (mutex)
{
WorkBatch batch = FindBatch(id);
if (batch != null)
{
batch.Complete(false);
batch.Dispose();
Remove(id);
}
}
}
// Rollback all sub batches, calling "complete(false)" on all entries.
internal void RollbackAllBatchedWork()
{
lock (mutex)
{
foreach (WorkBatch batch in this.Values)
{
batch.Complete(false);
batch.Dispose();
}
Clear(); // clear the collection
}
}
// Clear sub batches after successful commit/complete.
internal void ClearSubBatches()
{
lock (mutex)
{
foreach (WorkBatch existingBatch in this.Values)
{
existingBatch.Dispose();
}
Clear(); // clear the collection
}
}
internal void ClearTransientBatch()
{
RollbackBatch(transientBatchID);
}
}
#endregion
#endregion
}
|