|
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Text;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Data.Common;
using System.Data;
using System.Data.SqlClient;
using System.Timers;
using System.Diagnostics;
using System.Reflection;
using System.Workflow.Runtime;
using System.Workflow.ComponentModel;
using System.Workflow.Runtime.Hosting;
using System.Text.RegularExpressions;
using System.Threading;
using System.Transactions;
using System.Globalization;
using System.Workflow.ComponentModel.Serialization;
using System.ComponentModel.Design.Serialization;
using System.Xml;
using System.Configuration;
namespace System.Workflow.Runtime.Tracking
{
[Obsolete("The System.Workflow.* types are deprecated. Instead, please use the new types from System.Activities.*")]
public sealed class SqlTrackingService : TrackingService, IProfileNotification
{
#region Private/Protected Members
// Backing field for IsTransactional; defaults to true.
private bool _isTrans = true;
// Backing field for PartitionOnCompletion; defaults to false.
private bool _partition = false;
// Backing field for UseDefaultProfile; defaults to true.
private bool _defaultProfile = true;
// Backing field for EnableRetries; handed to DbRetry by the Execute*Retried helpers.
private bool _enableRetries = false;
// Set to true once EnableRetries has been given explicitly (property setter or
// constructor parameter); Start() then skips the runtime's common parameters.
private bool _ignoreCommonEnableRetries = false;
// Value sent as @LastCheckDateTime to GetUpdatedTrackingProfiles; initialized in Start().
private DateTime _lastProfileCheck;
// Timer whose Elapsed event drives CheckProfileChanges; configured and started in Start().
private System.Timers.Timer _timer = new System.Timers.Timer();
// Backing field for ProfileChangeCheckInterval, in milliseconds.
private double _interval = 60000;
//private static int _deadlock = 1205;
// NOTE(review): _types and _typeCacheLock are not referenced in the visible part of this file - confirm usage.
private TypeKeyedCollection _types = new TypeKeyedCollection();
private object _typeCacheLock = new object();
// Resolved from the runtime in Start(); exposed to SqlTrackingChannel for enlisting in batch commits.
private WorkflowCommitWorkBatchService _transactionService;
// Created in Start(); source of the commands, parameters and connections used by this service.
private DbResourceAllocator _dbResourceAllocator;
// Sentinel version meaning "no specific profile version requested".
private static Version UnknownProfileVersionId = new Version(0, 0);
// Saved from constructor input to be used in service start initialization
private NameValueCollection _parameters;
// Connection string as supplied by the caller; it is passed to DbResourceAllocator in Start().
string _unvalidatedConnectionString;
// Signature of the work items accepted by ExecuteRetried.
private delegate void ExecuteRetriedDelegate(object param);
#endregion
#region Configuration Properties
/// <summary>
/// Gets the connection string exactly as it was supplied to the constructor.
/// </summary>
public string ConnectionString
{
    get
    {
        return this._unvalidatedConnectionString;
    }
}
/// <summary>
/// Gets or sets whether tracking data is held and written to the database
/// transactionally at persistence points.
/// </summary>
/// <value>Defaults to <c>true</c>.</value>
public bool IsTransactional
{
    get
    {
        return this._isTrans;
    }
    set
    {
        this._isTrans = value;
    }
}
/// <summary>
/// Gets or sets whether records should be moved from the active instance tables to the
/// appropriate partition tables when the instance completes.
/// </summary>
/// <value>Defaults to <c>false</c>.</value>
public bool PartitionOnCompletion
{
    get
    {
        return this._partition;
    }
    set
    {
        this._partition = value;
    }
}
/// <summary>
/// Gets or sets whether the default profile is used for workflow types that do not
/// have a profile specified for them.
/// </summary>
/// <value>Defaults to <c>true</c>.</value>
public bool UseDefaultProfile
{
    get
    {
        return this._defaultProfile;
    }
    set
    {
        this._defaultProfile = value;
    }
}
/// <summary>
/// Gets or sets the interval, in milliseconds, at which the database is checked for
/// profile changes. Default is 60000.
/// </summary>
/// <remarks>
/// Assigning a value also assigns the timer's interval, so the next check occurs the
/// specified number of milliseconds after the property is set.
/// </remarks>
/// <exception cref="ArgumentException">The value is less than or equal to zero.</exception>
public double ProfileChangeCheckInterval
{
    get
    {
        return this._interval;
    }
    set
    {
        if (value <= 0)
        {
            throw new ArgumentException(ExecutionStringManager.InvalidProfileCheckValue);
        }

        this._interval = value;

        // Pushing the new value into the timer resets it.
        this._timer.Interval = this._interval;
    }
}
/// <summary>
/// Gets or sets whether database operations issued by this service are retried.
/// </summary>
/// <remarks>
/// Setting this property marks the value as explicit, so Start() no longer looks for
/// an "EnableRetries" entry in the runtime's common parameters.
/// </remarks>
public bool EnableRetries
{
    get
    {
        return this._enableRetries;
    }
    set
    {
        this._ignoreCommonEnableRetries = true;
        this._enableRetries = value;
    }
}
/// <summary>
/// Gets the resource allocator created in Start(); null until the service has been started.
/// </summary>
internal DbResourceAllocator DbResourceAllocator
{
    get
    {
        return _dbResourceAllocator;
    }
}
#endregion
#region Construction
/// <summary>
/// Creates the service from a connection string. The string is only stored here;
/// it is handed to the resource allocator when the service starts.
/// </summary>
/// <param name="connectionString">Connection string for the tracking database.</param>
/// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null or empty.</exception>
public SqlTrackingService(string connectionString)
{
    bool missing = String.IsNullOrEmpty(connectionString);
    if (missing)
    {
        throw new ArgumentNullException("connectionString", ExecutionStringManager.MissingConnectionString);
    }

    this._unvalidatedConnectionString = connectionString;
}
/// <summary>
/// Creates the service from a set of named configuration values. Recognized keys
/// (matched case-insensitively) are IsTransactional, UseDefaultProfile,
/// PartitionOnCompletion, ProfileChangeCheckInterval, ConnectionString and
/// EnableRetries; any other key is ignored here.
/// </summary>
/// <param name="parameters">Configuration values; kept for use when the service starts.</param>
/// <exception cref="ArgumentNullException"><paramref name="parameters"/> is null.</exception>
/// <exception cref="ArgumentException">ProfileChangeCheckInterval is less than or equal to zero.</exception>
public SqlTrackingService(NameValueCollection parameters)
{
    if (parameters == null)
    {
        throw new ArgumentNullException("parameters", ExecutionStringManager.MissingParameters);
    }

    // Enumerating an empty key set is a no-op, so no explicit Count check is required.
    foreach (string name in parameters.Keys)
    {
        if (string.Equals("IsTransactional", name, StringComparison.OrdinalIgnoreCase))
        {
            _isTrans = bool.Parse(parameters[name]);
        }
        else if (string.Equals("UseDefaultProfile", name, StringComparison.OrdinalIgnoreCase))
        {
            _defaultProfile = bool.Parse(parameters[name]);
        }
        else if (string.Equals("PartitionOnCompletion", name, StringComparison.OrdinalIgnoreCase))
        {
            _partition = bool.Parse(parameters[name]);
        }
        else if (string.Equals("ProfileChangeCheckInterval", name, StringComparison.OrdinalIgnoreCase))
        {
            // The interval is always parsed with the invariant culture.
            _interval = double.Parse(parameters[name], NumberFormatInfo.InvariantInfo);
            if (_interval <= 0)
            {
                throw new ArgumentException(ExecutionStringManager.InvalidProfileCheckValue);
            }
        }
        else if (string.Equals("ConnectionString", name, StringComparison.OrdinalIgnoreCase))
        {
            _unvalidatedConnectionString = parameters[name];
        }
        else if (string.Equals("EnableRetries", name, StringComparison.OrdinalIgnoreCase))
        {
            _enableRetries = bool.Parse(parameters[name]);
            // An explicit local value wins over the common configuration section.
            _ignoreCommonEnableRetries = true;
        }
    }

    _parameters = parameters;
}
#endregion
#region WorkflowRuntimeService
/// <summary>
/// Starts the service: creates the database resource allocator, resolves the commit
/// work batch service, picks up a common "EnableRetries" setting when none was given
/// locally, and starts the timer that polls for profile changes.
/// </summary>
override protected internal void Start()
{
    _lastProfileCheck = DateTime.UtcNow;

    _dbResourceAllocator = new DbResourceAllocator(this.Runtime, _parameters, _unvalidatedConnectionString);

    // Check connection string mismatch if using SharedConnectionWorkflowTransactionService
    _transactionService = this.Runtime.GetService<WorkflowCommitWorkBatchService>();
    _dbResourceAllocator.DetectSharedConnectionConflict(_transactionService);

    //
    // If we didn't find a local value for enable retries
    // check in the common section
    if ((!_ignoreCommonEnableRetries) && (null != base.Runtime))
    {
        NameValueConfigurationCollection commonConfigurationParameters = base.Runtime.CommonParameters;
        if (commonConfigurationParameters != null)
        {
            // Scan the common configuration parameters section for an EnableRetries entry
            foreach (string key in commonConfigurationParameters.AllKeys)
            {
                if (string.Compare("EnableRetries", key, StringComparison.OrdinalIgnoreCase) == 0)
                {
                    _enableRetries = bool.Parse(commonConfigurationParameters[key].Value);
                    break;
                }
            }
        }
    }

    _timer.Interval = _interval;
    _timer.AutoReset = false; // ensure that only one timer thread is checking for profile changes at a time

    // The timer instance lives as long as the service, so a second Start() would
    // otherwise register the handler twice and run duplicate, concurrent profile checks.
    // Removing a handler that is not registered is a no-op.
    ElapsedEventHandler profileCheckHandler = new ElapsedEventHandler(CheckProfileChanges);
    _timer.Elapsed -= profileCheckHandler;
    _timer.Elapsed += profileCheckHandler;
    _timer.Start();

    base.Start();
}
#endregion WorkflowRuntimeService
#region IProfileNotification Implementation
/// <summary>
/// Creates a new SQL tracking channel bound to this service for the given instance parameters.
/// </summary>
/// <param name="parameters">Tracking parameters handed to the new channel.</param>
/// <returns>A new <c>SqlTrackingChannel</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="parameters"/> is null.</exception>
protected internal override TrackingChannel GetTrackingChannel(TrackingParameters parameters)
{
    if (parameters == null)
    {
        throw new ArgumentNullException("parameters");
    }

    // Each call yields a fresh channel that keeps both the parameters and this service.
    TrackingChannel channel = new SqlTrackingChannel(parameters, this);
    return channel;
}
/// <summary>
/// Raised by CheckProfileChanges for each row returned by GetUpdatedTrackingProfiles
/// that carries a serialized profile.
/// </summary>
public event EventHandler<ProfileUpdatedEventArgs> ProfileUpdated;
/// <summary>
/// Raised by CheckProfileChanges for each row returned by GetUpdatedTrackingProfiles
/// whose profile column is not a string (e.g. null).
/// </summary>
public event EventHandler<ProfileRemovedEventArgs> ProfileRemoved;
/// <summary>
/// Looks up a specific version of the tracking profile for a workflow type.
/// </summary>
/// <param name="workflowType">Workflow type whose profile is requested.</param>
/// <param name="profileVersion">Profile version to look up.</param>
/// <returns>The profile returned by the database lookup, or null when none was returned.</returns>
/// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
protected internal override TrackingProfile GetProfile(Type workflowType, Version profileVersion)
{
    if (workflowType == null)
    {
        throw new ArgumentNullException("workflowType");
    }

    // A specific version that has already been running with an instance is wanted,
    // so never ask the lookup to create a default profile.
    const bool wantToCreateDefault = false;
    return GetProfileByScheduleType(workflowType, profileVersion, wantToCreateDefault);
}
/// <summary>
/// Attempts to get the tracking profile for a workflow type without requesting a
/// particular version; a default profile is requested when UseDefaultProfile is set.
/// </summary>
/// <param name="workflowType">Workflow type whose profile is requested.</param>
/// <param name="profile">Receives the profile, or null when none was found.</param>
/// <returns><c>true</c> when a profile was found; otherwise <c>false</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
protected internal override bool TryGetProfile(Type workflowType, out TrackingProfile profile)
{
    if (workflowType == null)
    {
        throw new ArgumentNullException("workflowType");
    }

    profile = GetProfileByScheduleType(workflowType, SqlTrackingService.UnknownProfileVersionId, _defaultProfile);
    return profile != null;
}
/// <summary>
/// Gets the tracking profile stored for a specific workflow instance.
/// </summary>
/// <param name="scheduleInstanceId">Id of the workflow instance.</param>
/// <returns>The instance's profile, or null when the lookup produced none.</returns>
protected internal override TrackingProfile GetProfile(Guid scheduleInstanceId)
{
    TrackingProfile instanceProfile;
    // The boolean "found" result is not needed here; null already signals "none".
    this.GetProfile(scheduleInstanceId, out instanceProfile);
    return instanceProfile;
}
/// <summary>
/// Runs [dbo].[GetInstanceTrackingProfile] for an instance and deserializes the profile
/// found in the first column of the first row.
/// </summary>
/// <param name="scheduleInstanceId">Id of the workflow instance.</param>
/// <param name="profile">
/// Receives the deserialized profile; null when no row was returned or the column is DBNull.
/// </param>
/// <returns>
/// <c>true</c> when a row was read (even if its profile column is DBNull);
/// <c>false</c> when the procedure returned no row.
/// </returns>
private bool GetProfile(Guid scheduleInstanceId, out TrackingProfile profile)
{
    profile = null;

    DbCommand command = this._dbResourceAllocator.NewCommand();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[GetInstanceTrackingProfile]";
    command.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@InstanceId", scheduleInstanceId));

    DbDataReader rows = null;
    try
    {
        rows = ExecuteReaderRetried(command, CommandBehavior.CloseConnection);

        // No specific profile exists for this instance; the reader is closed in finally.
        if (!rows.HasRows || !rows.Read())
        {
            return false;
        }

        if (!rows.IsDBNull(0))
        {
            string serializedProfile = rows.GetString(0);
            TrackingProfileSerializer serializer = new TrackingProfileSerializer();
            using (StringReader profileText = new StringReader(serializedProfile))
            {
                profile = serializer.Deserialize(profileText);
            }
        }

        return true;
    }
    finally
    {
        if ((null != rows) && (!rows.IsClosed))
        {
            rows.Close();
        }

        if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
        {
            command.Connection.Close();
        }
    }
}
/// <summary>
/// Attempts to reload the tracking profile stored for a specific workflow instance.
/// </summary>
/// <param name="workflowType">Workflow type of the instance; only validated for null here.</param>
/// <param name="scheduleInstanceId">Id of the workflow instance.</param>
/// <param name="profile">Receives the profile; always null when the method returns <c>false</c>.</param>
/// <returns><c>true</c> when the instance lookup returned a row; otherwise <c>false</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
protected internal override bool TryReloadProfile(Type workflowType, Guid scheduleInstanceId, out TrackingProfile profile)
{
    if (workflowType == null)
    {
        throw new ArgumentNullException("workflowType");
    }

    if (!GetProfile(scheduleInstanceId, out profile))
    {
        profile = null;
        return false;
    }

    return true;
}
#endregion
#region Profile Management Methods
/// <summary>
/// Timer callback. Runs GetUpdatedTrackingProfiles for everything changed since the last
/// check and raises ProfileUpdated / ProfileRemoved for each returned workflow type that
/// can be resolved. The timer is always restarted when the callback finishes.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Timer event data (unused).</param>
private void CheckProfileChanges(object sender, ElapsedEventArgs e)
{
    DbCommand cmd = null;
    DbDataReader reader = null;
    // Kept separately so the finally block never has to index into the parameter
    // collection, which would throw if the parameter was never added.
    DbParameter maxCheckParam = null;
    try
    {
        if ((null == ProfileUpdated) && (null == ProfileRemoved))
            return; // no one to notify

        Debug.WriteLine("Checking for updated profiles...");

        cmd = this._dbResourceAllocator.NewCommand();
        cmd.CommandText = "GetUpdatedTrackingProfiles";
        cmd.CommandType = CommandType.StoredProcedure;
        cmd.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@LastCheckDateTime", _lastProfileCheck));

        DbParameter param = this._dbResourceAllocator.NewDbParameter();
        param.ParameterName = "@MaxCheckDateTime";
        param.DbType = DbType.DateTime;
        param.Direction = System.Data.ParameterDirection.Output;
        cmd.Parameters.Add(param);
        maxCheckParam = param;

        reader = ExecuteReaderRetried(cmd, CommandBehavior.CloseConnection);
        //
        // No changes
        if (!reader.HasRows)
            return;

        while (reader.Read())
        {
            // Column 0: type full name, column 1: assembly name, column 2: serialized profile.
            Type t = Assembly.Load(reader[1] as string).GetType(reader[0] as string);
            if (null == t)
                continue;

            string tmp = reader[2] as string;
            if (null == tmp)
            {
                // Snapshot the delegate so an unsubscribe on another thread between the
                // null check and the invocation cannot cause a NullReferenceException.
                EventHandler<ProfileRemovedEventArgs> removed = ProfileRemoved;
                if (null != removed)
                    removed(this, new ProfileRemovedEventArgs(t));
            }
            else
            {
                TrackingProfile profile = null;
                TrackingProfileSerializer serializer = new TrackingProfileSerializer();
                StringReader pReader = null;
                try
                {
                    pReader = new StringReader(tmp);
                    profile = serializer.Deserialize(pReader);
                }
                finally
                {
                    if (null != pReader)
                        pReader.Close();
                }

                EventHandler<ProfileUpdatedEventArgs> updated = ProfileUpdated;
                if (null != updated)
                    updated(this, new ProfileUpdatedEventArgs(t, profile));
            }
            Debug.WriteLine(ExecutionStringManager.UpdatedProfile + t.FullName);
        }
    }
    finally
    {
        try
        {
            // The reader is closed before the output parameter is read.
            if ((null != reader) && (!reader.IsClosed))
                reader.Close();
            //
            // Only advance the checkpoint when the procedure actually produced a DateTime.
            // A null or DBNull value means the call did not complete normally, so the
            // previous checkpoint is kept for the next check instead of throwing here.
            if ((null != maxCheckParam) && (maxCheckParam.Value is DateTime))
                _lastProfileCheck = (DateTime)maxCheckParam.Value;

            if ((null != cmd) && (null != cmd.Connection) && (ConnectionState.Closed != cmd.Connection.State))
                cmd.Connection.Close();
        }
        finally
        {
            //
            // Start the timer again (autoreset is false to avoid multiple threads checking for profile changes).
            // This sits in its own finally so that a failure during cleanup cannot stop polling for good.
            _timer.Start();
        }
    }
}
#endregion
#region Private Methods
/// <summary>
/// Invokes a work item, re-invoking it after a failure for as long as the retry
/// policy (DbRetry, configured from EnableRetries) allows; otherwise the exception is rethrown.
/// </summary>
/// <param name="executeRetried">Work item to run.</param>
/// <param name="param">Argument forwarded to the work item.</param>
private void ExecuteRetried(ExecuteRetriedDelegate executeRetried, object param)
{
    short attempts = 0;
    DbRetry retryPolicy = new DbRetry(_enableRetries);

    for (; ; )
    {
        try
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));

            executeRetried(param);

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            return;
        }
        catch (Exception failure)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteRetried caught exception: " + failure.ToString());

            // Out of retries (or retries disabled): surface the original exception.
            if (!retryPolicy.TryDoRetry(ref attempts))
            {
                throw;
            }

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " retrying.");
        }
    }
}
/// <summary>
/// Executes a command as a reader, retrying after a failure for as long as the retry
/// policy allows. The command's connection is re-established before each attempt if it
/// is not open.
/// </summary>
/// <param name="command">Command to execute.</param>
/// <param name="behavior">Behavior flags passed to ExecuteReader.</param>
/// <returns>The open data reader.</returns>
private DbDataReader ExecuteReaderRetried(DbCommand command, CommandBehavior behavior)
{
    short attempts = 0;
    DbRetry retryPolicy = new DbRetry(_enableRetries);

    for (; ; )
    {
        try
        {
            ResetConnectionForCommand(command);

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried ExecuteReader start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            DbDataReader result = command.ExecuteReader(behavior);
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried ExecuteReader end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));

            return result;
        }
        catch (Exception failure)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteReaderRetried caught exception from ExecuteReader: " + failure.ToString());

            if (!retryPolicy.TryDoRetry(ref attempts))
            {
                throw;
            }

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried retrying.");
        }
    }
}
/// <summary>
/// Executes a non-query command, retrying after a failure for as long as the retry
/// policy allows. The command's connection is re-established before each attempt if it
/// is not open.
/// </summary>
/// <param name="command">Command to execute.</param>
private void ExecuteNonQueryRetried(DbCommand command)
{
    short attempts = 0;
    DbRetry retryPolicy = new DbRetry(_enableRetries);

    for (; ; )
    {
        try
        {
            ResetConnectionForCommand(command);

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            command.ExecuteNonQuery();
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));

            return;
        }
        catch (Exception failure)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryRetried caught exception from ExecuteNonQuery: " + failure.ToString());

            if (!retryPolicy.TryDoRetry(ref attempts))
            {
                throw;
            }

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried retrying.");
        }
    }
}
/// <summary>
/// Executes a non-query command inside its own local transaction, retrying the whole
/// begin/execute/commit sequence after a failure for as long as the retry policy allows.
/// The command's connection is closed when the method exits, whatever the outcome.
/// </summary>
/// <param name="command">Command to execute.</param>
private void ExecuteNonQueryWithTxRetried(DbCommand command)
{
    try
    {
        short attempts = 0;
        DbRetry retryPolicy = new DbRetry(_enableRetries);

        for (; ; )
        {
            try
            {
                ResetConnectionForCommand(command);

                WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
                command.Transaction = command.Connection.BeginTransaction();
                command.ExecuteNonQuery();
                command.Transaction.Commit();
                WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));

                return;
            }
            catch (Exception failure)
            {
                WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried caught exception from ExecuteNonQuery: " + failure.ToString());

                try
                {
                    if (null != command.Transaction)
                    {
                        command.Transaction.Rollback();
                    }
                }
                catch
                {
                    //
                    // Rollback() can throw; deliberately swallow that failure
                    // so that the original exception is not lost.
                }

                if (!retryPolicy.TryDoRetry(ref attempts))
                {
                    throw;
                }

                WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried retrying.");
            }
        }
    }
    finally
    {
        if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
        {
            command.Connection.Close();
        }
    }
}
/// <summary>
/// Ensures a command that already has a connection is attached to an open one: when the
/// current connection is not open it is closed (if needed), disposed and replaced with a
/// new connection from the resource allocator. A null command, or a command without a
/// connection, is left untouched.
/// </summary>
/// <param name="command">Command whose connection should be checked.</param>
private void ResetConnectionForCommand(DbCommand command)
{
    if (null == command)
    {
        return;
    }

    DbConnection current = command.Connection;
    if (null == current)
    {
        return;
    }

    if (ConnectionState.Open == current.State)
    {
        return;
    }

    // Anything other than Closed (e.g. a broken connection) is closed before disposal.
    if (ConnectionState.Closed != current.State)
    {
        current.Close();
    }

    current.Dispose();
    command.Connection = _dbResourceAllocator.OpenNewConnectionNoEnlist();
}
/// <summary>
/// Creates an XML writer over a text writer that indents with tab characters, omits the
/// XML declaration and closes the underlying text writer when the XML writer is closed.
/// </summary>
/// <param name="output">Text writer that receives the XML.</param>
/// <returns>The configured <see cref="XmlWriter"/>.</returns>
internal static XmlWriter CreateXmlWriter(TextWriter output)
{
    XmlWriterSettings writerSettings = new XmlWriterSettings
    {
        Indent = true,
        IndentChars = "\t",
        OmitXmlDeclaration = true,
        CloseOutput = true
    };

    return XmlWriter.Create(output, writerSettings);
}
/// <summary>
/// Runs dbo.GetTrackingProfile for a workflow type and deserializes the profile found in
/// the first column of the first row.
/// </summary>
/// <param name="workflowType">Workflow type; its full name and assembly full name are sent.</param>
/// <param name="profileVersionId">
/// Version to look up. The @Version parameter is omitted when this equals the
/// unknown-version sentinel.
/// </param>
/// <param name="wantToCreateDefault">Value sent as @CreateDefault.</param>
/// <returns>The deserialized profile, or null when no row or no string value was returned.</returns>
private TrackingProfile GetProfileByScheduleType(Type workflowType, Version profileVersionId, bool wantToCreateDefault)
{
    TrackingProfile result = null;
    DbDataReader rows = null;

    DbCommand command = this._dbResourceAllocator.NewCommand();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "dbo.GetTrackingProfile";

    command.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@TypeFullName", workflowType.FullName));
    command.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@AssemblyFullName", workflowType.Assembly.FullName));

    bool specificVersionRequested = profileVersionId != SqlTrackingService.UnknownProfileVersionId;
    if (specificVersionRequested)
    {
        command.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@Version", profileVersionId.ToString()));
    }

    command.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@CreateDefault", wantToCreateDefault));

    try
    {
        rows = ExecuteReaderRetried(command, CommandBehavior.CloseConnection);

        if (rows.Read())
        {
            string serializedProfile = rows[0] as string;
            if (null != serializedProfile)
            {
                TrackingProfileSerializer serializer = new TrackingProfileSerializer();
                using (StringReader profileText = new StringReader(serializedProfile))
                {
                    result = serializer.Deserialize(profileText);
                }
            }
        }
    }
    finally
    {
        if ((null != rows) && (!rows.IsClosed))
        {
            rows.Close();
        }

        if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
        {
            command.Connection.Close();
        }
    }

    return result;
}
#endregion
#region Private Classes
/// <summary>
/// Collection of <see cref="Type"/> objects keyed by their assembly-qualified name.
/// </summary>
private class TypeKeyedCollection : KeyedCollection<string, Type>
{
// The key of an item is its AssemblyQualifiedName.
protected override string GetKeyForItem(Type item)
{
return item.AssemblyQualifiedName;
}
}
/// <summary>
/// A tracking data item extended with fields for a serialized form of its data.
/// </summary>
// NOTE(review): the code that populates these fields is outside the visible part of the file;
// the field meanings below are inferred from their names and types - confirm against the serializer.
private class SerializedDataItem : TrackingDataItem
{
// Presumably the runtime type of the data value.
public Type Type;
// Presumably a string rendering of the data value.
public string StringData;
// Presumably the binary-serialized data value.
public byte[] SerializedData;
// Presumably set when the data value could not be serialized.
public bool NonSerializable;
}
/// <summary>
/// Event arguments carrying a type and a serialized byte payload.
/// </summary>
// NOTE(review): producers and consumers are outside the visible part of the file;
// field meanings are inferred from names - confirm.
private class SerializedEventArgs : EventArgs
{
// Presumably the runtime type of the original event args.
public Type Type;
// Presumably the binary-serialized original event args.
public byte[] SerializedArgs;
}
/// <summary>
/// Describes one activity added by a workflow change; held in
/// SerializedWorkflowChangedEventArgs.AddedActivities.
/// </summary>
// NOTE(review): field meanings are inferred from names; the code that fills them is not visible here.
private struct AddedActivity
{
public string ActivityTypeFullName;
public string ActivityTypeAssemblyFullName;
public string QualifiedName;
public string ParentQualifiedName;
// Presumably the XOML markup of the add action - confirm.
public string AddedActivityActionXoml;
public int Order;
}
/// <summary>
/// Describes one activity removed by a workflow change; held in
/// SerializedWorkflowChangedEventArgs.RemovedActivities.
/// </summary>
// NOTE(review): field meanings are inferred from names; the code that fills them is not visible here.
private struct RemovedActivity
{
public string QualifiedName;
public string ParentQualifiedName;
// Presumably the XOML markup of the remove action - confirm.
public string RemovedActivityActionXoml;
public int Order;
}
/// <summary>
/// Serialized event arguments for a workflow change, extended with the lists of added
/// and removed activities. Both lists start out empty.
/// </summary>
private class SerializedWorkflowChangedEventArgs : SerializedEventArgs
{
public IList<AddedActivity> AddedActivities = new List<AddedActivity>();
public IList<RemovedActivity> RemovedActivities = new List<RemovedActivity>();
}
#endregion Private Classes
internal class SqlTrackingChannel : TrackingChannel, IPendingWork
{
#region Private Members
// Owning service; supplies the resource allocator, retry helper and configuration flags.
private SqlTrackingService _service = null;
// _callPathKey is sent as @CallPath when inserting the instance record.
// NOTE(review): both keys are presumably assigned by GetCallPathKeys (not visible here) - confirm.
private string _callPathKey = null, _parentCallPathKey = null;
// Copied from the service's IsTransactional in the constructor.
private bool _isTrans = false;
// Database internal id of the workflow instance (sent as @WorkflowInstanceInternalId); -1 until known.
private long _internalId = -1;
// Internal id read inside a pending batch; promoted to _internalId in Complete() on success.
private long _tmpInternalId = -1;
// Activity instance ids confirmed by a successful commit.
private Dictionary<string, long> _activityInstanceId = new Dictionary<string, long>(32);
// Activity instance ids collected during a pending batch; merged into _activityInstanceId in Complete().
private Dictionary<string, long> _tmpActivityInstanceId = new Dictionary<string, long>(10);
// Tracking parameters supplied to the constructor.
private TrackingParameters _parameters = null;
// When set, Complete() partitions the instance after a successful commit.
private bool _pendingArchive = false;
// When set, the next Commit() stamps the instance end date.
private bool _completedTerminated = false;
// Number of activity records accumulated in Commit() before they are written as one batch.
private static int _activityEventBatchSize = 5;
// NOTE(review): the remaining batch sizes are not used in the visible part of the file - confirm usage.
private static int _dataItemBatchSize = 5;
private static int _dataItemAnnotationBatchSize = 5;
private static int _eventAnnotationBatchSize = 5;
#endregion
#region Construction
/// <summary>
/// Parameterless constructor; leaves every field at its declared default
/// (no service, no tracking parameters).
/// </summary>
protected SqlTrackingChannel()
{
}
/// <summary>
/// Creates a channel for one workflow instance. In non-transactional mode the instance
/// record is looked up or inserted immediately; in transactional mode that is deferred to
/// the first IPendingWork.Commit().
/// </summary>
/// <param name="parameters">Tracking parameters for the instance.</param>
/// <param name="service">Owning service. When null the constructor returns without initializing anything.</param>
public SqlTrackingChannel(TrackingParameters parameters, SqlTrackingService service)
{
    if (service == null)
    {
        return;
    }

    this._service = service;
    this._parameters = parameters;
    this._isTrans = service.IsTransactional;

    GetCallPathKeys(parameters.CallPath);

    if (this._isTrans)
    {
        // Transactional channels resolve the instance id in the first batch commit.
        return;
    }

    //
    // Look up instance id or insert if new instance
    this._service.ExecuteRetried(ExecuteInsertWorkflowInstance, null);
}
#endregion
#region Public Properties
/// <summary>
/// Gets the owning service's database resource allocator.
/// </summary>
private DbResourceAllocator DbResourceAllocator
{
    get
    {
        return this._service.DbResourceAllocator;
    }
}
/// <summary>
/// Gets the commit work batch service that the owning service resolved when it started.
/// </summary>
private WorkflowCommitWorkBatchService WorkflowCommitWorkBatchService
{
    get
    {
        return this._service._transactionService;
    }
}
#endregion
#region TrackingChannel
/// <summary>
/// Called when the instance completes or terminates. In non-transactional mode the end
/// date is written (and the instance partitioned, if configured) right away; in
/// transactional mode flags are set so the next batch commit does that work.
/// </summary>
protected internal override void InstanceCompletedOrTerminated()
{
    bool partitionWanted;

    if (!_isTrans)
    {
        _service.ExecuteRetried(ExecuteSetEndDate, null);

        partitionWanted = _service.PartitionOnCompletion;
        if (partitionWanted)
        {
            _service.ExecuteRetried(PartitionInstance, null);
        }

        return;
    }

    //
    // Indicate that at the next batch commit we should stamp the enddate
    _completedTerminated = true;

    //
    // Indicate that when the next batch commit completes successfully we should partition this instance
    partitionWanted = _service.PartitionOnCompletion;
    if (partitionWanted)
    {
        _pendingArchive = true;
    }
}
/// <summary>
/// Runs [dbo].[PartitionWorkflowInstance] for this channel's instance on a new connection,
/// which is always closed before returning.
/// </summary>
/// <param name="param">Unused; present to match the ExecuteRetried work item signature.</param>
private void PartitionInstance(object param)
{
    // The connection is tracked on its own so that it is still closed if creating
    // the command fails after the connection has been opened.
    DbConnection connection = null;
    try
    {
        //
        // Allow enlisting if there is an ambient tx
        // This can only happen on a host initiated terminate in V1.
        connection = DbResourceAllocator.OpenNewConnection(false);
        DbCommand command = DbResourceAllocator.NewCommand(connection);
        command.CommandText = "[dbo].[PartitionWorkflowInstance]";
        command.CommandType = CommandType.StoredProcedure;
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", _internalId));
        command.ExecuteNonQuery();
    }
    finally
    {
        if ((null != connection) && (ConnectionState.Closed != connection.State))
            connection.Close();
    }
}
/// <summary>
/// Stamps the end date of this channel's instance using a new connection, which is
/// always closed before returning.
/// </summary>
/// <param name="param">Unused; present to match the ExecuteRetried work item signature.</param>
private void ExecuteSetEndDate(object param)
{
    // The connection is tracked on its own so that it is still closed if creating
    // the command fails after the connection has been opened.
    DbConnection connection = null;
    try
    {
        //
        // Allow enlisting if there is an ambient tx
        // This can only happen on a host initiated terminate in V1.
        connection = DbResourceAllocator.OpenNewConnection(false);
        DbCommand command = DbResourceAllocator.NewCommand(connection);
        ExecuteSetEndDate(_internalId, command);
    }
    finally
    {
        if ((null != connection) && (ConnectionState.Closed != connection.State))
            connection.Close();
    }
}
/// <summary>
/// Reconfigures the given command to call [dbo].[SetWorkflowInstanceEndDateTime] with the
/// current UTC time and executes it. Any existing parameters on the command are cleared.
/// </summary>
/// <param name="internalId">Database internal id of the workflow instance.</param>
/// <param name="command">Command to reuse; its connection and transaction are left as they are.</param>
/// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
private void ExecuteSetEndDate(long internalId, DbCommand command)
{
    if (command == null)
    {
        throw new ArgumentNullException("command");
    }

    DbParameterCollection arguments = command.Parameters;
    arguments.Clear();

    command.CommandText = "[dbo].[SetWorkflowInstanceEndDateTime]";
    command.CommandType = CommandType.StoredProcedure;

    arguments.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    arguments.Add(DbResourceAllocator.NewDbParameter("@EndDateTime", DateTime.UtcNow));

    command.ExecuteNonQuery();
}
/// <summary>
/// Accepts a tracking record. In transactional mode the serialized record is added to the
/// current work batch; otherwise it is written to the database immediately (with retries).
/// Records that are not activity, workflow or user tracking records are ignored.
/// </summary>
/// <param name="record">Record to track.</param>
/// <exception cref="ArgumentException">
/// The channel's instance id is empty or <paramref name="record"/> is null.
/// </exception>
protected internal override void Send(TrackingRecord record)
{
    if ((Guid.Empty == _parameters.InstanceId) || (null == record))
    {
        throw new ArgumentException(ExecutionStringManager.MissingParametersTrack);
    }

    ActivityTrackingRecord activityRecord = record as ActivityTrackingRecord;
    if (null != activityRecord)
    {
        if (_isTrans)
        {
            WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(activityRecord));
        }
        else
        {
            _service.ExecuteRetried(ExecuteInsertActivityStatusInstance, SerializeRecord(activityRecord));
        }
        return;
    }

    WorkflowTrackingRecord workflowRecord = record as WorkflowTrackingRecord;
    if (null != workflowRecord)
    {
        if (_isTrans)
        {
            WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(workflowRecord));
        }
        else if (TrackingWorkflowEvent.Changed == workflowRecord.TrackingWorkflowEvent)
        {
            //
            // Dynamic updates are inserted in the WorkflowInstanceEvent table
            // and then the arg (workflowchanges) is normalized into xoml
            // and the added/removed activities tables
            _service.ExecuteRetried(ExecuteInsertWorkflowChange, SerializeRecord(workflowRecord));
        }
        else
        {
            _service.ExecuteRetried(ExecuteInsertWorkflowInstanceEvent, SerializeRecord(workflowRecord));
        }
        return;
    }

    UserTrackingRecord userRecord = record as UserTrackingRecord;
    if (null != userRecord)
    {
        if (_isTrans)
        {
            WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(userRecord));
        }
        else
        {
            _service.ExecuteRetried(ExecuteInsertUserEvent, SerializeRecord(userRecord));
        }
    }
}
#endregion
#region IPendingWork Members
/// <summary>
/// Reports whether the pending items must be committed right away. This implementation
/// always returns <c>false</c>.
/// </summary>
/// <param name="items">Pending work items (not inspected).</param>
/// <returns>Always <c>false</c>.</returns>
public bool MustCommit(ICollection items)
{
//
// Never force a persist - this is a balancing act but the V1
// decision is to err on the side of persisting only when the workflow
// requires it based on its model. If the workflow uses persistence points
// wisely this is great. If it goes a long time between persists with lots
// of events the persists will take a long time as the batch can be huge.
return false;
}
/// <summary>
/// Writes a batch of tracking records to the database in the order they appear in
/// <paramref name="items"/>. Consecutive activity records are grouped into batches of
/// _activityEventBatchSize, and consecutive non-Changed workflow records are written in
/// pairs. If the instance's internal id is not yet known, the instance record is
/// inserted or looked up first. When _completedTerminated is set the end date is stamped
/// as the last statement.
/// </summary>
/// <param name="transaction">Transaction the batch is committed under.</param>
/// <param name="items">Pending work items; entries that are not TrackingRecord are skipped.</param>
/// <remarks>
/// A local transaction is begun, and committed here, only when the resource allocator
/// does not supply one. Rollback is attempted only for DbException and only for a
/// transaction this method began.
/// NOTE(review): other exception types leave a locally-begun transaction neither committed
/// nor rolled back by this method - confirm that connection disposal covers that case.
/// </remarks>
public void Commit(System.Transactions.Transaction transaction, ICollection items)
{
// Nothing to write.
if ((null == items) || (0 == items.Count))
return;
DbCommand command = null;
DbConnection connection = null;
bool needToCloseConnection = false;
DbTransaction localTransaction = null;
// True only when this method began localTransaction and therefore owns commit/rollback.
bool commitTx = false;
try
{
//
// Get the connection and transaction
// The connection might be shared or local
// The tx is shared and may be either a DTC or a local sql tx
connection = DbResourceAllocator.GetEnlistedConnection(
this.WorkflowCommitWorkBatchService, transaction, out needToCloseConnection);
localTransaction = DbResourceAllocator.GetLocalTransaction(
this.WorkflowCommitWorkBatchService, transaction);
if (null == localTransaction)
{
localTransaction = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
commitTx = true;
}
command = DbResourceAllocator.NewCommand(connection);
command.Transaction = localTransaction;
//
// If we don't have the internal id for the instance this is the first batch
// for this channel instance. If this is a new instance the following will insert
// a new instance record in the db and set _tmpInternalId. If this is a reload of
// an existing instance it will just do a lookup and set _tmpInternalId
// In Complete we will assign _tmpInternalId to _internalId if the batch is successful.
long internalId = -1;
if (_internalId <= 0)
{
ExecuteInsertWorkflowInstance(command);
internalId = _tmpInternalId;
}
else
internalId = _internalId;
// Activity records accumulated for the next batched insert.
IList<ActivityTrackingRecord> activities = new List<ActivityTrackingRecord>(5);
// A single non-Changed workflow record held back so it can be written together with the next one.
WorkflowTrackingRecord workflow = null;
//
// Build the batch statement
foreach (object o in items)
{
if (!(o is TrackingRecord))
continue;
if (o is ActivityTrackingRecord)
{
//
// If we have a cached workflow tracking record send it
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
ActivityTrackingRecord activity = (ActivityTrackingRecord)o;
//
// Add this event to the list and send to the db if we've hit our limit
activities.Add(activity);
if (_activityEventBatchSize == activities.Count)
{
ExecuteInsertActivityStatusInstance(internalId, activities, command);
activities = new List<ActivityTrackingRecord>(5);
}
}
else if (o is UserTrackingRecord)
{
//
// If we have cached activity or workflow tracking records send them
if (activities.Count > 0)
{
ExecuteInsertActivityStatusInstance(internalId, activities, command);
activities.Clear();
}
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
ExecuteInsertUserEvent(internalId, (UserTrackingRecord)o, command);
}
else if (o is WorkflowTrackingRecord)
{
//
// If we have cached activity tracking records send them
if (activities.Count > 0)
{
ExecuteInsertActivityStatusInstance(internalId, activities, command);
activities.Clear();
}
WorkflowTrackingRecord record = (WorkflowTrackingRecord)o;
if (TrackingWorkflowEvent.Changed == record.TrackingWorkflowEvent)
{
//
// Changed records are never cached: first flush any workflow tracking record
// we are holding (on its own), then write the change itself
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
ExecuteInsertWorkflowChange(internalId, record, command);
}
else
{
//
// If we're already holding a workflow tracking record send both to the db
// else cache it and wait for the next workflow tracking record
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, record, command);
workflow = null;
}
else
{
workflow = record;
}
}
}
}
//
// If we ended up with any activities event send them.
if (activities.Count > 0)
ExecuteInsertActivityStatusInstance(internalId, activities, command);
// Likewise flush a workflow record that is still being held.
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
// Stamp the end date last, after every record in the batch has been written.
if (_completedTerminated)
ExecuteSetEndDate(internalId, command);
if (commitTx)
localTransaction.Commit();
}
catch (DbException e)
{
// Only roll back a transaction this method began.
if (commitTx)
localTransaction.Rollback();
WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Error writing tracking data to database: " + e);
throw;
}
finally
{
// Dispose the connection only when the allocator said this method is responsible for closing it.
if (needToCloseConnection)
{
connection.Dispose();
}
}
return;
}
/// <summary>
/// Called after a batch commit. On failure all per-batch state is reset. On success the
/// temporary instance id and activity instance ids are promoted to their permanent
/// members and, when partitioning is pending, the instance is partitioned.
/// </summary>
/// <param name="succeeded">Whether the batch commit succeeded.</param>
/// <param name="items">Work items that were part of the batch (not inspected).</param>
public void Complete(bool succeeded, ICollection items)
{
    if (!succeeded)
    {
        //
        // The commit failed - reset all flags and discard per-batch state
        _completedTerminated = false;
        _pendingArchive = false;
        _tmpInternalId = -1;
        _tmpActivityInstanceId.Clear();
        return;
    }

    //
    // Commit succeeded - move the tmp internalId to the real internalId member
    bool idStillUnknown = (-1 == _internalId);
    if (idStillUnknown && _tmpInternalId > 0)
    {
        _internalId = _tmpInternalId;
    }

    //
    // Move the tmp activity instance ids to the real activity instance id member,
    // keeping any value that is already present for a key
    if (null != _tmpActivityInstanceId && _tmpActivityInstanceId.Count > 0)
    {
        foreach (KeyValuePair<string, long> pending in _tmpActivityInstanceId)
        {
            if (_activityInstanceId.ContainsKey(pending.Key))
            {
                continue;
            }

            _activityInstanceId.Add(pending.Key, pending.Value);
        }

        _tmpActivityInstanceId.Clear();
    }

    if (!_pendingArchive)
    {
        return;
    }

    try
    {
        _service.ExecuteRetried(PartitionInstance, null);
    }
    catch (Exception failure)
    {
        //
        // Swallow exceptions here, do not fail the instance.
        // Partition logic can be re-run to clean up on failure
        WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, string.Format(System.Globalization.CultureInfo.InvariantCulture, "Error partitioning instance {0}: {1}", _parameters.InstanceId, failure.ToString()));
    }
}
#endregion
#region Sql Commands - InsertWorkflowInstance
/// <summary>
/// Inserts (or looks up) the workflow instance record on a new connection inside a local
/// ReadCommitted transaction and stores the resulting id in _internalId. The connection
/// is always closed before returning.
/// </summary>
/// <param name="param">Unused; present to match the ExecuteRetried work item signature.</param>
private void ExecuteInsertWorkflowInstance(object param)
{
    DbConnection conn = null;
    DbTransaction tx = null;
    try
    {
        // Opening the connection and creating the command happen inside the try so
        // that the connection is closed even if command creation fails.
        conn = DbResourceAllocator.OpenNewConnection();
        DbCommand command = DbResourceAllocator.NewCommand(conn);

        tx = conn.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
        command.Connection = conn;
        command.Transaction = tx;
        _internalId = ExecuteInsertWorkflowInstance(command);
        tx.Commit();
    }
    catch (Exception)
    {
        try
        {
            if (null != tx)
                tx.Rollback();
        }
        catch (Exception)
        {
            //
            // Rollback can throw - ignore these exceptions
            // so we don't lose the original exception
        }
        //
        // Re-throw original exception
        throw;
    }
    finally
    {
        if ((null != conn) && (ConnectionState.Closed != conn.State))
            conn.Close();
    }
    return;
}
/// <summary>
/// Writes the workflow definition and then the instance record using the supplied command,
/// storing the id read from the first column of the result in _tmpInternalId.
/// </summary>
/// <param name="command">Command bound to an open connection.</param>
/// <returns>
/// The value of _tmpInternalId after the call; it is left unchanged when the instance
/// insert returns no row.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
/// <exception cref="ArgumentException">The command has no connection or the connection is not open.</exception>
private long ExecuteInsertWorkflowInstance(DbCommand command)
{
    if (command == null)
    {
        throw new ArgumentNullException("command");
    }

    bool connectionUsable = (null != command.Connection) && (ConnectionState.Open == command.Connection.State);
    if (!connectionUsable)
    {
        throw new ArgumentException(ExecutionStringManager.InvalidCommandBadConnection, "command");
    }

    //
    // Write the type and the workflow definition.
    // A workflow created from markup is written without a workflow type.
    string markup = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string;
    if (string.IsNullOrEmpty(markup))
    {
        InsertWorkflow(command, _parameters.InstanceId, _parameters.WorkflowType, _parameters.RootActivity);
    }
    else
    {
        InsertWorkflow(command, _parameters.InstanceId, null, _parameters.RootActivity);
    }

    //
    // Write the instance record
    BuildInsertWorkflowInstanceParameters(command);

    DbDataReader result = null;
    try
    {
        result = command.ExecuteReader();
        if (result.Read())
        {
            _tmpInternalId = result.GetInt64(0);
        }

        return _tmpInternalId;
    }
    finally
    {
        if (null != result)
        {
            result.Close();
        }
    }
}
/// <summary>
/// Configures <paramref name="command"/> to call [dbo].[InsertWorkflowInstance]
/// with parameters taken from <c>_parameters</c>.
/// </summary>
/// <param name="command">Command to configure; its existing parameters are cleared.</param>
private void BuildInsertWorkflowInstanceParameters(DbCommand command)
{
    Debug.Assert((command != null), "Null command");
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertWorkflowInstance]";
    command.Parameters.Clear();

    // When the root activity carries markup, the instance id string stands in
    // for both the type name and the assembly name.
    string markup = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string;
    bool isMarkupInstance = !string.IsNullOrEmpty(markup);

    DbParameterCollection parameters = command.Parameters;
    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceId", _parameters.InstanceId));

    string typeFullName;
    if (isMarkupInstance)
        typeFullName = _parameters.InstanceId.ToString();
    else
        typeFullName = _parameters.WorkflowType.FullName;
    parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", typeFullName));

    string assemblyFullName;
    if (isMarkupInstance)
        assemblyFullName = _parameters.InstanceId.ToString();
    else
        assemblyFullName = _parameters.WorkflowType.Assembly.FullName;
    parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", assemblyFullName));

    parameters.Add(DbResourceAllocator.NewDbParameter("@ContextGuid", _parameters.ContextGuid));

    // Caller details are only sent when a caller instance id is present.
    bool hasCaller = (_parameters.CallerInstanceId != Guid.Empty);
    if (hasCaller)
    {
        parameters.Add(DbResourceAllocator.NewDbParameter("@CallerInstanceId", _parameters.CallerInstanceId));
        parameters.Add(DbResourceAllocator.NewDbParameter("@CallPath", _callPathKey));
        parameters.Add(DbResourceAllocator.NewDbParameter("@CallerContextGuid", _parameters.CallerContextGuid));
        parameters.Add(DbResourceAllocator.NewDbParameter("@CallerParentContextGuid", _parameters.CallerParentContextGuid));
    }

    parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime", this.GetSqlDateTimeString(DateTime.UtcNow)));
}
/// <summary>
/// Stores a workflow definition through [dbo].[InsertWorkflow], skipping the call
/// when <paramref name="workflowType"/> is already in the service's type cache.
/// </summary>
/// <param name="command">Command used to run the stored procedure; its parameters are replaced.</param>
/// <param name="workflowInstanceId">Used as type and assembly name when <paramref name="workflowType"/> is null.</param>
/// <param name="workflowType">Workflow type, or null for a markup-only workflow.</param>
/// <param name="rootActivity">Root activity; serialized for the definition and cast to <c>CompositeActivity</c> for the activities XML.</param>
private void InsertWorkflow(DbCommand command, Guid workflowInstanceId, Type workflowType, Activity rootActivity)
{
    bool hasWorkflowType = (workflowType != null);
    string definitionXoml;

    // The cache lookup and the document generation both happen under the type cache lock.
    lock (_service._typeCacheLock)
    {
        if (hasWorkflowType && _service._types.Contains(workflowType.AssemblyQualifiedName))
        {
            // This type has been stored before; nothing to do.
            return;
        }
        definitionXoml = GetXomlDocument(rootActivity);
    }

    // Two callers can reach this insert for the same type at the same time. Per the
    // original author's note, the primary key ignores duplicate keys, which was judged
    // preferable to holding a lock around the whole method.
    DbParameterCollection parameters = command.Parameters;
    parameters.Clear();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertWorkflow]";
    parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", (hasWorkflowType ? workflowType.FullName : workflowInstanceId.ToString())));
    parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", (hasWorkflowType ? workflowType.Assembly.FullName : workflowInstanceId.ToString())));
    parameters.Add(DbResourceAllocator.NewDbParameter("@IsInstanceType", !hasWorkflowType));
    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowDefinition", definitionXoml));
    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowId", DbType.Int32, System.Data.ParameterDirection.Output));
    parameters.Add(DbResourceAllocator.NewDbParameter("@Exists", DbType.Boolean, System.Data.ParameterDirection.Output));
    parameters.Add(DbResourceAllocator.NewDbParameter("@Activities", GetActivitiesXml((CompositeActivity)rootActivity)));
    command.ExecuteNonQuery();

    // Remember the type so later instances skip the serialization and the db round trip.
    // The cache is only touched while holding its lock.
    if (hasWorkflowType)
    {
        lock (_service._typeCacheLock)
        {
            bool alreadyCached = _service._types.Contains(workflowType.AssemblyQualifiedName);
            if (!alreadyCached)
            {
                _service._types.Add(workflowType);
            }
        }
    }
}
#endregion
#region Sql Commands - InsertWorkflowInstanceEvent
/// <summary>
/// Writes a single workflow instance event on a newly opened connection, inside
/// its own ReadCommitted transaction.
/// </summary>
/// <param name="param">The <c>WorkflowTrackingRecord</c> to write.</param>
/// <exception cref="ArgumentException"><paramref name="param"/> is not a <c>WorkflowTrackingRecord</c>.</exception>
private void ExecuteInsertWorkflowInstanceEvent(object param)
{
    WorkflowTrackingRecord workflowRecord = param as WorkflowTrackingRecord;
    if (workflowRecord == null)
    {
        throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param");
    }

    DbConnection connection = DbResourceAllocator.OpenNewConnection();
    DbCommand eventCommand = DbResourceAllocator.NewCommand(connection);
    DbTransaction transaction = null;
    try
    {
        transaction = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
        eventCommand.Connection = connection;
        eventCommand.Transaction = transaction;
        ExecuteInsertWorkflowInstanceEvent(_internalId, workflowRecord, null, eventCommand);
        transaction.Commit();
    }
    catch (Exception)
    {
        if (transaction != null)
        {
            try
            {
                transaction.Rollback();
            }
            catch (Exception)
            {
                // A failing rollback must not replace the exception re-thrown below.
            }
        }
        // Propagate the original failure.
        throw;
    }
    finally
    {
        // Close the connection unless it is already closed.
        if (connection != null && connection.State != ConnectionState.Closed)
        {
            connection.Close();
        }
    }
}
/// <summary>
/// Inserts one or two workflow instance events with a single procedure call, then
/// writes the annotations of both records against the returned event ids.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance.</param>
/// <param name="record1">First record (required).</param>
/// <param name="record2">Optional second record; may be null.</param>
/// <param name="command">Command bound to an open connection.</param>
/// <exception cref="ArgumentException">The command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertWorkflowInstanceEvent(long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command)
{
    bool commandUsable = (command != null)
        && (command.Connection != null)
        && (command.Connection.State == ConnectionState.Open);
    if (!commandUsable)
    {
        throw new ArgumentException();
    }

    BuildInsertWorkflowInstanceEventParameters(internalId, record1, record2, command);
    command.ExecuteNonQuery();

    // Event ids come back through output parameters.
    long firstEventId = (long)command.Parameters["@WorkflowInstanceEventId1"].Value;
    Debug.Assert(firstEventId > 0, "Invalid eventId1");

    bool hasSecondRecord = (record2 != null);
    long secondEventId = -1;
    if (hasSecondRecord)
    {
        secondEventId = (long)command.Parameters["@WorkflowInstanceEventId2"].Value;
        Debug.Assert(secondEventId > 0, "Invalid eventId2");
    }

    // Pair every annotation with the id of the event it belongs to.
    int capacity = record1.Annotations.Count;
    if (hasSecondRecord)
    {
        capacity += record2.Annotations.Count;
    }
    List<KeyValuePair<long, string>> eventAnnotations = new List<KeyValuePair<long, string>>(capacity);
    foreach (string text in record1.Annotations)
    {
        eventAnnotations.Add(new KeyValuePair<long, string>(firstEventId, text));
    }
    if (hasSecondRecord)
    {
        foreach (string text in record2.Annotations)
        {
            eventAnnotations.Add(new KeyValuePair<long, string>(secondEventId, text));
        }
    }

    BatchExecuteInsertEventAnnotation(internalId, 'w', eventAnnotations, command);
}
/// <summary>
/// Configures <paramref name="command"/> to call [dbo].[InsertWorkflowInstanceEvent]
/// for one or two workflow tracking records.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance.</param>
/// <param name="record1">First record (required); parameter names carry the suffix "1".</param>
/// <param name="record2">Optional second record; when non-null its parameters carry the suffix "2".</param>
/// <param name="command">Command to configure; its existing parameters are cleared.</param>
/// <exception cref="ArgumentNullException"><paramref name="record1"/> or <paramref name="command"/> is null.</exception>
private void BuildInsertWorkflowInstanceEventParameters(long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command)
{
    // The exception must name the actual parameter ("record1"), not a non-existent "record".
    if (null == record1)
        throw new ArgumentNullException("record1");
    if (null == command)
        throw new ArgumentNullException("command");
    Debug.Assert(internalId != -1, "Invalid internalId");
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertWorkflowInstanceEvent]";
    command.Parameters.Clear();
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    //
    // First record
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingWorkflowEventId1", (int)record1.TrackingWorkflowEvent));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime1", record1.EventDateTime));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder1", record1.EventOrder));
    if (null != record1.EventArgs)
    {
        //
        // Capture the args type before the record is (possibly) replaced by its
        // serialized form, so the type written is the one seen on entry.
        Type t = record1.EventArgs.GetType();
        Byte[] data = null;
        if (!(record1.EventArgs is SerializedEventArgs))
            record1 = SerializeRecord(record1);
        SerializedEventArgs sargs = record1.EventArgs as SerializedEventArgs;
        data = sargs.SerializedArgs;
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArgTypeFullName1", t.FullName));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArgAssemblyFullName1", t.Assembly.FullName));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArg1", data));
    }
    //
    // Output parameter that receives the id of the first event
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId1", DbType.Int64, ParameterDirection.Output));
    if (null != record2)
    {
        //
        // Second record - same layout as the first with the "2" suffix
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingWorkflowEventId2", (int)record2.TrackingWorkflowEvent));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime2", record2.EventDateTime));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder2", record2.EventOrder));
        if (null != record2.EventArgs)
        {
            Type t = record2.EventArgs.GetType();
            Byte[] data = null;
            if (!(record2.EventArgs is SerializedEventArgs))
                record2 = SerializeRecord(record2);
            SerializedEventArgs sargs = record2.EventArgs as SerializedEventArgs;
            data = sargs.SerializedArgs;
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArgTypeFullName2", t.FullName));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArgAssemblyFullName2", t.Assembly.FullName));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArg2", data));
        }
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId2", DbType.Int64, ParameterDirection.Output));
    }
}
#endregion
#region Sql Commands - InsertActivityStatusInstance
/// <summary>
/// Writes a single activity status record on a newly opened connection, inside
/// its own ReadCommitted transaction.
/// </summary>
/// <param name="param">The <c>ActivityTrackingRecord</c> to write.</param>
/// <exception cref="ArgumentException"><paramref name="param"/> is not an <c>ActivityTrackingRecord</c>.</exception>
private void ExecuteInsertActivityStatusInstance(object param)
{
    ActivityTrackingRecord activityRecord = param as ActivityTrackingRecord;
    if (activityRecord == null)
    {
        throw new ArgumentException(ExecutionStringManager.InvalidActivityTrackingRecordParameter, "param");
    }

    DbConnection connection = DbResourceAllocator.OpenNewConnection();
    DbTransaction transaction = null;
    try
    {
        transaction = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
        DbCommand statusCommand = connection.CreateCommand();
        statusCommand.Transaction = transaction;

        // The batch overload takes a list, so wrap the one record.
        IList<ActivityTrackingRecord> singleRecord = new List<ActivityTrackingRecord>(1) { activityRecord };
        ExecuteInsertActivityStatusInstance(_internalId, singleRecord, statusCommand);
        transaction.Commit();
    }
    catch (Exception)
    {
        if (transaction != null)
        {
            try
            {
                transaction.Rollback();
            }
            catch (Exception)
            {
                // A failing rollback must not replace the exception re-thrown below.
            }
        }
        // Propagate the original failure.
        throw;
    }
    finally
    {
        // Close the connection unless it is already closed.
        if (connection != null && connection.State != ConnectionState.Closed)
        {
            connection.Close();
        }
    }
}
/// <summary>
/// Writes a batch of activity status events through
/// [dbo].[InsertActivityExecutionStatusEventMultiple], updates the activity instance
/// id cache from the output parameters, then writes the records' annotations and
/// data items.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance; only sent when greater than zero.</param>
/// <param name="activities">Records to write; a null or empty list is a no-op.</param>
/// <param name="command">Command bound to an open connection.</param>
/// <exception cref="ArgumentOutOfRangeException">More records than <c>_activityEventBatchSize</c>.</exception>
/// <exception cref="ArgumentException">The command is null, has no connection, or the connection is not open.</exception>
/// <exception cref="InvalidOperationException">An event id read back from the procedure is not positive.</exception>
private void ExecuteInsertActivityStatusInstance(long internalId, IList<ActivityTrackingRecord> activities, DbCommand command)
{
    if (null == activities || activities.Count <= 0)
        return;
    if (activities.Count > _activityEventBatchSize)
        throw new ArgumentOutOfRangeException("activities");
    if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State))
        throw new ArgumentException();
    int count = activities.Count;
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertActivityExecutionStatusEventMultiple]";
    //
    // Add the common parameters
    command.Parameters.Clear();
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceId", _parameters.InstanceId));
    //
    // If we have the workflow's internal id use it to avoid the look up in the db
    DbParameter param = DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", DbType.Int64, System.Data.ParameterDirection.InputOutput);
    command.Parameters.Add(param);
    if (internalId > 0)
        param.Value = internalId;
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceContextGuid", _parameters.ContextGuid));
    //
    // Cache keys built from QName, context and parent context for each record.
    // Kept so they don't have to be recomputed when updating the cache below.
    // Sized from the batch itself so the array can never be smaller than a batch
    // that passed the _activityEventBatchSize guard above.
    string[] ids = new string[count];
    for (int i = 0; i < count; i++)
    {
        ActivityTrackingRecord record = activities[i];
        long aid = -1;
        ids[i] = BuildQualifiedNameVarName(record.QualifiedName, record.ContextGuid, record.ParentContextGuid);
        TryGetActivityInstanceId(ids[i], out aid);
        // Parameter name suffixes are 1-based
        BuildInsertActivityStatusEventParameters(internalId, aid, i + 1, record, command);
    }
    command.ExecuteNonQuery();
    //
    // Get all the output ids
    long[] eventIds = new long[count];
    for (int i = 0; i < count; i++)
    {
        string index = (i + 1).ToString(CultureInfo.InvariantCulture);
        //
        // ActivityInstanceId
        long aId = (long)command.Parameters["@ActivityInstanceId" + index].Value;
        Debug.Assert(aId > 0, "Invalid @ActivityInstanceId output parameter value");
        //
        // For every status other than "Closed" record the id in the instance cache.
        // For "Closed" remove the entry so the cache does not keep growing; per the
        // original author's note a later lookup falls back to the db, which is
        // uncommon and inexpensive.
        if (ActivityExecutionStatus.Closed != activities[i].ExecutionStatus)
            SetActivityInstanceId(ids[i], aId);
        else
            RemoveActivityInstanceId(ids[i]);
        //
        // ActivityExecutionStatusEventId
        long aeseId = (long)command.Parameters["@ActivityExecutionStatusEventId" + index].Value;
        Debug.Assert(aeseId > 0, "Invalid @ActivityExecutionStatusEventId output parameter value");
        eventIds[i] = aeseId;
    }
    //
    // Pair each annotation and data item with the id of the event it belongs to
    List<KeyValuePair<long, string>> annotations = new List<KeyValuePair<long, string>>(10);
    List<KeyValuePair<long, TrackingDataItem>> items = new List<KeyValuePair<long, TrackingDataItem>>(10);
    for (int i = 0; i < count; i++)
    {
        ActivityTrackingRecord record = activities[i];
        //
        // Get the ActivityExecutionStatusEventId
        long eventId = eventIds[i];
        if (eventId <= 0)
            throw new InvalidOperationException();
        foreach (string s in record.Annotations)
            annotations.Add(new KeyValuePair<long, string>(eventId, s));
        foreach (TrackingDataItem item in record.Body)
            items.Add(new KeyValuePair<long, TrackingDataItem>(eventId, item));
    }
    BatchExecuteInsertEventAnnotation(internalId, 'a', annotations, command);
    BatchExecuteInsertTrackingDataItems(internalId, 'a', items, command);
}
/// <summary>
/// Appends the parameters for one activity status event to <paramref name="command"/>.
/// Every parameter name is suffixed with <paramref name="parameterId"/>.
/// </summary>
/// <param name="internalId">Not read by this method.</param>
/// <param name="activityInstanceId">Known activity instance id; only sent when greater than zero.</param>
/// <param name="parameterId">Number appended to each parameter name.</param>
/// <param name="record">Record supplying the parameter values.</param>
/// <param name="command">Command receiving the parameters (existing parameters are kept).</param>
private void BuildInsertActivityStatusEventParameters(long internalId, long activityInstanceId, int parameterId, ActivityTrackingRecord record, DbCommand command)
{
    string suffix = parameterId.ToString(CultureInfo.InvariantCulture);
    DbParameterCollection parameters = command.Parameters;

    // In/out id: supplied when already known, otherwise left unset.
    DbParameter instanceIdParameter = DbResourceAllocator.NewDbParameter("@ActivityInstanceId" + suffix, DbType.Int64, System.Data.ParameterDirection.InputOutput);
    parameters.Add(instanceIdParameter);
    bool idKnown = activityInstanceId > 0;
    if (idKnown)
    {
        instanceIdParameter.Value = activityInstanceId;
    }

    parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName" + suffix, record.QualifiedName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@ContextGuid" + suffix, record.ContextGuid));
    parameters.Add(DbResourceAllocator.NewDbParameter("@ParentContextGuid" + suffix, record.ParentContextGuid));
    parameters.Add(DbResourceAllocator.NewDbParameter("@ExecutionStatusId" + suffix, (int)record.ExecutionStatus));
    parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime" + suffix, record.EventDateTime));
    parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder" + suffix, record.EventOrder));

    // Output parameter that receives the id of the status event.
    parameters.Add(DbResourceAllocator.NewDbParameter("@ActivityExecutionStatusEventId" + suffix, DbType.Int64, ParameterDirection.Output));
}
#endregion
#region Sql Commands - InsertUserEvent
/// <summary>
/// Writes a single user tracking record on a newly opened connection, inside its
/// own ReadCommitted transaction.
/// </summary>
/// <param name="param">The <c>UserTrackingRecord</c> to write.</param>
/// <exception cref="ArgumentException"><paramref name="param"/> is not a <c>UserTrackingRecord</c>.</exception>
private void ExecuteInsertUserEvent(object param)
{
    UserTrackingRecord userRecord = param as UserTrackingRecord;
    if (userRecord == null)
    {
        throw new ArgumentException(ExecutionStringManager.InvalidUserTrackingRecordParameter, "param");
    }

    DbConnection connection = DbResourceAllocator.OpenNewConnection();
    DbTransaction transaction = null;
    try
    {
        transaction = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
        DbCommand userEventCommand = connection.CreateCommand();
        userEventCommand.Transaction = transaction;
        ExecuteInsertUserEvent(_internalId, userRecord, userEventCommand);
        transaction.Commit();
    }
    catch (Exception)
    {
        if (transaction != null)
        {
            try
            {
                transaction.Rollback();
            }
            catch (Exception)
            {
                // A failing rollback must not replace the exception re-thrown below.
            }
        }
        // Propagate the original failure.
        throw;
    }
    finally
    {
        // Close the connection unless it is already closed.
        if (connection != null && connection.State != ConnectionState.Closed)
        {
            connection.Close();
        }
    }
}
/// <summary>
/// Inserts a user event with the supplied command, caches the activity instance id
/// when it was not cached already, then writes the record's annotations and data items.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance.</param>
/// <param name="record">User tracking record to write.</param>
/// <param name="command">Command bound to an open connection.</param>
/// <exception cref="ArgumentException">The command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertUserEvent(long internalId, UserTrackingRecord record, DbCommand command)
{
    bool commandUsable = (command != null)
        && (command.Connection != null)
        && (command.Connection.State == ConnectionState.Open);
    if (!commandUsable)
    {
        throw new ArgumentException();
    }

    // Look for a cached activity instance id so the procedure can skip its own lookup.
    string cacheKey = BuildQualifiedNameVarName(record.QualifiedName, record.ContextGuid, record.ParentContextGuid);
    long activityInstanceId = -1;
    bool wasCached = TryGetActivityInstanceId(cacheKey, out activityInstanceId);

    BuildInsertUserEventParameters(internalId, activityInstanceId, record, command);
    command.ExecuteNonQuery();

    // On a cache miss the id comes back through the in/out parameter; remember it.
    if (!wasCached)
    {
        SetActivityInstanceId(cacheKey, (long)command.Parameters["@ActivityInstanceId"].Value);
    }

    long userEventId = (long)command.Parameters["@UserEventId"].Value;

    // Pair annotations and data items with the id of the new event.
    List<KeyValuePair<long, string>> eventAnnotations = new List<KeyValuePair<long, string>>(10);
    List<KeyValuePair<long, TrackingDataItem>> dataItems = new List<KeyValuePair<long, TrackingDataItem>>(10);
    foreach (string text in record.Annotations)
    {
        eventAnnotations.Add(new KeyValuePair<long, string>(userEventId, text));
    }
    foreach (TrackingDataItem dataItem in record.Body)
    {
        dataItems.Add(new KeyValuePair<long, TrackingDataItem>(userEventId, dataItem));
    }

    BatchExecuteInsertEventAnnotation(internalId, 'u', eventAnnotations, command);
    BatchExecuteInsertTrackingDataItems(internalId, 'u', dataItems, command);
}
/// <summary>
/// Configures <paramref name="command"/> to call [dbo].[InsertUserEvent] for
/// <paramref name="record"/>.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance.</param>
/// <param name="activityInstanceId">
/// Known activity instance id. When greater than zero it is sent and the activity
/// identification fields are omitted; otherwise those fields are sent instead.
/// </param>
/// <param name="record">User tracking record supplying the values.</param>
/// <param name="command">Command to configure; its existing parameters are cleared.</param>
private void BuildInsertUserEventParameters(long internalId, long activityInstanceId, UserTrackingRecord record, DbCommand command)
{
    Debug.Assert(internalId != -1, "Invalid internalId");
    Debug.Assert((command != null), "Null command passed to BuildInsertUserEventParameters");
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertUserEvent]";
    command.Parameters.Clear();
    DbParameter param = DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", DbType.Int64);
    command.Parameters.Add(param);
    param.Value = internalId;
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder", record.EventOrder));
    //
    // If we have the activity's instance id use it to avoid the look up in the db
    param = DbResourceAllocator.NewDbParameter("@ActivityInstanceId", DbType.Int64, System.Data.ParameterDirection.InputOutput);
    command.Parameters.Add(param);
    if (activityInstanceId > 0)
    {
        param.Value = activityInstanceId;
    }
    else
    {
        //
        // Keep the network traffic down - only include the fields needed
        // to insert an ActivityInstance record if we don't have the activityInstanceId
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", record.QualifiedName));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ContextGuid", record.ContextGuid));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ParentContextGuid", record.ParentContextGuid));
    }
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime", record.EventDateTime));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataKey", record.UserDataKey));
    if (null != record.UserData)
    {
        Type t = record.UserData.GetType();
        Byte[] data = null;
        bool nonSerializable = false;
        string userDataString = null;
        SerializedDataItem sItem = record.UserData as SerializedDataItem;
        if (null != sItem)
        {
            //
            // Already serialized - take the stored values as they are
            data = sItem.SerializedData;
            nonSerializable = sItem.NonSerializable;
            userDataString = sItem.StringData;
        }
        else
        {
            //
            // Not serialized yet - serialize now and use the results directly.
            // Previously the null "as" result was dereferenced here, which threw
            // NullReferenceException and discarded the values just produced.
            // NOTE(review): no string form is produced on this path, so
            // @UserData_Str is sent as null - confirm whether it should carry a
            // string rendering of the user data.
            SerializeDataItem(record.UserData, out data, out nonSerializable);
        }
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataTypeFullName", t.FullName));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataAssemblyFullName", t.Assembly.FullName));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserData_Str", userDataString));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserData_Blob", data));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataNonSerializable", nonSerializable));
    }
    //
    // Output parameter that receives the id of the user event
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserEventId", DbType.Int64, ParameterDirection.Output));
}
#endregion
#region Sql Commands - InsertTrackingDataItem
/// <summary>
/// Sends tracking data items to <c>ExecuteInsertTrackingDataItems</c> in chunks of
/// at most <c>_dataItemBatchSize</c> entries.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance.</param>
/// <param name="eventTypeId">Event type code passed through to the insert.</param>
/// <param name="items">Event id / data item pairs; a null or empty list is a no-op.</param>
/// <param name="command">Command passed through to the insert.</param>
private void BatchExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList<KeyValuePair<long, TrackingDataItem>> items, DbCommand command)
{
    if (items == null || items.Count <= 0)
    {
        return;
    }

    // Small enough to go out in one call.
    if (items.Count <= _dataItemBatchSize)
    {
        ExecuteInsertTrackingDataItems(internalId, eventTypeId, items, command);
        return;
    }

    // Otherwise fill a chunk and flush it whenever it reaches the batch size.
    List<KeyValuePair<long, TrackingDataItem>> chunk = new List<KeyValuePair<long, TrackingDataItem>>(_dataItemBatchSize);
    foreach (KeyValuePair<long, TrackingDataItem> entry in items)
    {
        chunk.Add(entry);
        if (chunk.Count != _dataItemBatchSize)
        {
            continue;
        }
        ExecuteInsertTrackingDataItems(internalId, eventTypeId, chunk, command);
        chunk.Clear();
    }

    // Flush the final, partially filled chunk.
    if (chunk.Count > 0)
    {
        ExecuteInsertTrackingDataItems(internalId, eventTypeId, chunk, command);
    }
}
/// <summary>
/// Inserts one batch of tracking data items through
/// [dbo].[InsertTrackingDataItemMultiple], then writes the items' annotations
/// against the returned data item ids in batches of <c>_dataItemAnnotationBatchSize</c>.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance.</param>
/// <param name="eventTypeId">Event type code sent as @EventTypeId.</param>
/// <param name="items">Event id / data item pairs; a null or empty list is a no-op.</param>
/// <param name="command">Command bound to an open connection.</param>
/// <exception cref="ArgumentOutOfRangeException">More items than <c>_dataItemBatchSize</c>.</exception>
/// <exception cref="ArgumentException">The command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList<KeyValuePair<long, TrackingDataItem>> items, DbCommand command)
{
    Debug.Assert(internalId != -1, "Invalid internalId");
    if (null == items || items.Count <= 0)
        return;
    //
    // Guard with the data item batch size - the same limit
    // BatchExecuteInsertTrackingDataItems uses when it splits its input -
    // rather than the annotation batch size.
    if (items.Count > _dataItemBatchSize)
        throw new ArgumentOutOfRangeException("items");
    if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State))
        throw new ArgumentException();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertTrackingDataItemMultiple]";
    command.Parameters.Clear();
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventTypeId", eventTypeId));
    int i = 1; // base 1 to match parameter names
    foreach (KeyValuePair<long, TrackingDataItem> kvp in items)
    {
        string index = (i++).ToString(CultureInfo.InvariantCulture);
        //
        // Serialize the item now unless it already arrived serialized
        SerializedDataItem sItem = kvp.Value as SerializedDataItem;
        if (null == sItem)
            sItem = SerializeDataItem(kvp.Value);
        Type t = sItem.Type;
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventId" + index, kvp.Key));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@FieldName" + index, sItem.FieldName));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName" + index, ((null == t) ? null : t.FullName)));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName" + index, ((null == t) ? null : t.Assembly.FullName)));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Str" + index, sItem.StringData));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Blob" + index, sItem.SerializedData));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@DataNonSerializable" + index, sItem.NonSerializable));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + index, DbType.Int64, System.Data.ParameterDirection.Output));
    }
    command.ExecuteNonQuery();
    //
    // Read all the out parameters holding the data item record ids up front.
    // This keeps us from repeatedly going into the parameters collection
    // below if a data item has more than one annotation.
    List<long> ids = new List<long>(items.Count);
    for (i = 0; i < items.Count; i++)
    {
        string index = (i + 1).ToString(CultureInfo.InvariantCulture);
        ids.Add((long)command.Parameters["@TrackingDataItemId" + index].Value);
    }
    //
    // Go through all the data items and send all the annotations in batches
    List<KeyValuePair<long, string>> annotations = new List<KeyValuePair<long, string>>(_dataItemAnnotationBatchSize);
    i = 0;
    foreach (KeyValuePair<long, TrackingDataItem> kvp in items)
    {
        TrackingDataItem item = kvp.Value;
        long dataItemId = ids[i++];
        foreach (string s in item.Annotations)
        {
            annotations.Add(new KeyValuePair<long, string>(dataItemId, s));
            if (annotations.Count == _dataItemAnnotationBatchSize)
            {
                ExecuteInsertAnnotation(internalId, annotations, command);
                annotations.Clear();
            }
        }
    }
    //
    // If we have anything left send them.
    if (annotations.Count > 0)
        ExecuteInsertAnnotation(internalId, annotations, command);
}
/// <summary>
/// Inserts one batch of data item annotations through
/// [dbo].[InsertTrackingDataItemAnnotationMultiple].
/// </summary>
/// <param name="internalId">Internal id of the workflow instance.</param>
/// <param name="annotations">Data item id / annotation pairs; a null or empty list is a no-op.</param>
/// <param name="command">Command bound to an open connection; its parameters are replaced.</param>
/// <exception cref="ArgumentOutOfRangeException">More annotations than <c>_dataItemAnnotationBatchSize</c>.</exception>
/// <exception cref="ArgumentNullException">The command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertAnnotation(long internalId, IList<KeyValuePair<long, string>> annotations, DbCommand command)
{
    if (annotations == null || annotations.Count <= 0)
    {
        return;
    }
    if (annotations.Count > _dataItemAnnotationBatchSize)
    {
        throw new ArgumentOutOfRangeException("annotations");
    }
    bool commandUsable = (command != null)
        && (command.Connection != null)
        && (command.Connection.State == ConnectionState.Open);
    if (!commandUsable)
    {
        throw new ArgumentNullException("command");
    }

    DbParameterCollection parameters = command.Parameters;
    parameters.Clear();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertTrackingDataItemAnnotationMultiple]";
    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));

    // Parameter name suffixes start at 1.
    int slot = 0;
    foreach (KeyValuePair<long, string> entry in annotations)
    {
        slot++;
        string suffix = slot.ToString(CultureInfo.InvariantCulture);
        parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + suffix, true));
        parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + suffix, entry.Key));
        parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation" + suffix, entry.Value));
    }

    command.ExecuteNonQuery();
}
/// <summary>
/// Sends event annotations to <c>ExecuteInsertEventAnnotation</c> in chunks of at
/// most <c>_eventAnnotationBatchSize</c> entries.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance.</param>
/// <param name="eventTypeId">Event type code passed through to the insert.</param>
/// <param name="annotations">Event id / annotation pairs; a null or empty list is a no-op.</param>
/// <param name="command">Command passed through to the insert.</param>
private void BatchExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList<KeyValuePair<long, string>> annotations, DbCommand command)
{
    if (annotations == null || annotations.Count <= 0)
    {
        return;
    }

    // Small enough to go out in one call.
    if (annotations.Count <= _eventAnnotationBatchSize)
    {
        ExecuteInsertEventAnnotation(internalId, eventTypeId, annotations, command);
        return;
    }

    // Otherwise fill a chunk and flush it whenever it reaches the batch size.
    List<KeyValuePair<long, string>> chunk = new List<KeyValuePair<long, string>>(_eventAnnotationBatchSize);
    foreach (KeyValuePair<long, string> entry in annotations)
    {
        chunk.Add(entry);
        if (chunk.Count != _eventAnnotationBatchSize)
        {
            continue;
        }
        ExecuteInsertEventAnnotation(internalId, eventTypeId, chunk, command);
        chunk.Clear();
    }

    // Flush the final, partially filled chunk.
    if (chunk.Count > 0)
    {
        ExecuteInsertEventAnnotation(internalId, eventTypeId, chunk, command);
    }
}
/// <summary>
/// Inserts one batch of event annotations through [dbo].[InsertEventAnnotationMultiple].
/// </summary>
/// <param name="internalId">Internal id of the workflow instance.</param>
/// <param name="eventTypeId">Event type code sent as @EventTypeId.</param>
/// <param name="annotations">Event id / annotation pairs; a null or empty list is a no-op.</param>
/// <param name="command">Command bound to an open connection; its parameters are replaced.</param>
/// <exception cref="ArgumentOutOfRangeException">More annotations than <c>_eventAnnotationBatchSize</c>.</exception>
/// <exception cref="ArgumentNullException">The command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList<KeyValuePair<long, string>> annotations, DbCommand command)
{
    Debug.Assert(internalId != -1, "Invalid internalId");
    if (annotations == null || annotations.Count <= 0)
    {
        return;
    }
    if (annotations.Count > _eventAnnotationBatchSize)
    {
        throw new ArgumentOutOfRangeException("annotations");
    }
    bool commandUsable = (command != null)
        && (command.Connection != null)
        && (command.Connection.State == ConnectionState.Open);
    if (!commandUsable)
    {
        throw new ArgumentNullException("command");
    }

    DbParameterCollection parameters = command.Parameters;
    parameters.Clear();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertEventAnnotationMultiple]";
    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    parameters.Add(DbResourceAllocator.NewDbParameter("@EventTypeId", eventTypeId));

    // Parameter name suffixes start at 1.
    int slot = 0;
    foreach (KeyValuePair<long, string> entry in annotations)
    {
        slot++;
        string suffix = slot.ToString(CultureInfo.InvariantCulture);
        parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + suffix, true));
        parameters.Add(DbResourceAllocator.NewDbParameter("@EventId" + suffix, entry.Key));
        parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation" + suffix, entry.Value));
    }

    command.ExecuteNonQuery();
}
#endregion
#region Workflow Change
/// <summary>
/// Writes a workflow change record using a command obtained from the resource
/// allocator, opening its connection if needed and running inside a transaction.
/// </summary>
/// <param name="param">The <c>WorkflowTrackingRecord</c> describing the change.</param>
/// <exception cref="ArgumentException"><paramref name="param"/> is not a <c>WorkflowTrackingRecord</c>.</exception>
private void ExecuteInsertWorkflowChange(object param)
{
    WorkflowTrackingRecord changeRecord = param as WorkflowTrackingRecord;
    if (changeRecord == null)
    {
        throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param");
    }

    DbCommand command = DbResourceAllocator.NewCommand();
    try
    {
        bool isOpen = (command.Connection.State == ConnectionState.Open);
        if (!isOpen)
        {
            command.Connection.Open();
        }
        command.Transaction = command.Connection.BeginTransaction();
        ExecuteInsertWorkflowChange(_internalId, changeRecord, command);
        command.Transaction.Commit();
    }
    catch (Exception)
    {
        try
        {
            // Roll back only when a transaction was attached to the command.
            if (command != null && command.Transaction != null)
            {
                command.Transaction.Rollback();
            }
        }
        catch (Exception)
        {
            // A failing rollback must not replace the exception re-thrown below.
        }
        // Propagate the original failure.
        throw;
    }
    finally
    {
        // Close the command's connection unless it is missing or already closed.
        if (command != null
            && command.Connection != null
            && command.Connection.State != ConnectionState.Closed)
        {
            command.Connection.Close();
        }
    }
}
/// <summary>
/// Inserts the workflow instance event for a workflow change, then one row per
/// added and removed activity, and finally the record's annotations.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance.</param>
/// <param name="record">Workflow tracking record whose event args describe the change.</param>
/// <param name="command">Command bound to an open connection.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="record"/> is null, or the command is null, has no connection, or the connection is not open.
/// </exception>
/// <exception cref="InvalidOperationException">The record has no event args.</exception>
private void ExecuteInsertWorkflowChange(long internalId, WorkflowTrackingRecord record, DbCommand command)
{
    if (record == null)
    {
        throw new ArgumentNullException("record");
    }
    if (record.EventArgs == null)
    {
        throw new InvalidOperationException(ExecutionStringManager.InvalidWorkflowChangeArgs);
    }
    bool commandUsable = (command != null)
        && (command.Connection != null)
        && (command.Connection.State == ConnectionState.Open);
    if (!commandUsable)
    {
        throw new ArgumentNullException("command");
    }

    // Serialize the change args now if that has not happened yet.
    bool alreadySerialized = record.EventArgs is SerializedWorkflowChangedEventArgs;
    if (!alreadySerialized)
    {
        record = SerializeRecord(record);
    }

    // Write the instance event and read its id back from the output parameter.
    BuildInsertWorkflowInstanceEventParameters(internalId, record, null, command);
    command.ExecuteNonQuery();
    long changeEventId = (long)command.Parameters["@WorkflowInstanceEventId1"].Value;

    SerializedWorkflowChangedEventArgs changeArgs = (SerializedWorkflowChangedEventArgs)record.EventArgs;

    // One row per activity that was added...
    if (changeArgs.AddedActivities != null && changeArgs.AddedActivities.Count > 0)
    {
        foreach (AddedActivity addition in changeArgs.AddedActivities)
        {
            ExecuteInsertAddedActivity(internalId, addition.QualifiedName, addition.ParentQualifiedName, addition.ActivityTypeFullName, addition.ActivityTypeAssemblyFullName, addition.AddedActivityActionXoml, changeEventId, addition.Order, command);
        }
    }

    // ...and one per activity that was removed.
    if (changeArgs.RemovedActivities != null && changeArgs.RemovedActivities.Count > 0)
    {
        foreach (RemovedActivity removal in changeArgs.RemovedActivities)
        {
            ExecuteInsertRemovedActivity(internalId, removal.QualifiedName, removal.ParentQualifiedName, removal.RemovedActivityActionXoml, changeEventId, removal.Order, command);
        }
    }

    // Annotations are attached to the change event.
    List<KeyValuePair<long, string>> eventAnnotations = new List<KeyValuePair<long, string>>(record.Annotations.Count);
    foreach (string text in record.Annotations)
    {
        eventAnnotations.Add(new KeyValuePair<long, string>(changeEventId, text));
    }
    BatchExecuteInsertEventAnnotation(internalId, 'w', eventAnnotations, command);
}
/// <summary>
/// Inserts one added-activity row through [dbo].[InsertAddedActivity].
/// </summary>
/// <param name="internalId">Internal id of the workflow instance.</param>
/// <param name="qualifiedName">Qualified name of the added activity.</param>
/// <param name="parentQualifiedName">Qualified name of its parent.</param>
/// <param name="typeFullName">Sent as @TypeFullName.</param>
/// <param name="assemblyFullName">Sent as @AssemblyFullName.</param>
/// <param name="addedActivityActionXoml">Sent as @AddedActivityAction.</param>
/// <param name="eventId">Id of the workflow instance event the row belongs to.</param>
/// <param name="order">Sent as @Order; the value -1 is sent as DBNull.</param>
/// <param name="command">Command bound to an open connection; its parameters are replaced.</param>
/// <exception cref="ArgumentNullException">The command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertAddedActivity(long internalId, string qualifiedName, string parentQualifiedName, string typeFullName, string assemblyFullName, string addedActivityActionXoml, long eventId, int order, DbCommand command)
{
    bool commandUsable = (command != null)
        && (command.Connection != null)
        && (command.Connection.State == ConnectionState.Open);
    if (!commandUsable)
    {
        throw new ArgumentNullException("command");
    }

    DbParameterCollection parameters = command.Parameters;
    parameters.Clear();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertAddedActivity]";
    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId));
    parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", typeFullName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", assemblyFullName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@AddedActivityAction", addedActivityActionXoml));

    // An order of -1 is written as a database null.
    if (order != -1)
    {
        parameters.Add(DbResourceAllocator.NewDbParameter("@Order", order));
    }
    else
    {
        parameters.Add(DbResourceAllocator.NewDbParameter("@Order", DBNull.Value));
    }

    command.ExecuteNonQuery();
}
/// <summary>
/// Executes the [dbo].[InsertRemovedActivity] stored procedure with the supplied values.
/// </summary>
/// <param name="internalId">Bound to @WorkflowInstanceInternalId.</param>
/// <param name="qualifiedName">Bound to @QualifiedName.</param>
/// <param name="parentQualifiedName">Bound to @ParentQualifiedName.</param>
/// <param name="removedActivityActionXoml">Bound to @RemovedActivityAction.</param>
/// <param name="eventId">Bound to @WorkflowInstanceEventId.</param>
/// <param name="order">Bound to @Order; the sentinel -1 is sent as DBNull.</param>
/// <param name="command">Command to reuse; its parameters, type and text are overwritten.</param>
/// <exception cref="ArgumentNullException">
/// command is null, has no connection, or its connection is not open.
/// </exception>
private void ExecuteInsertRemovedActivity(long internalId, string qualifiedName, string parentQualifiedName, string removedActivityActionXoml, long eventId, int order, DbCommand command)
{
    bool usable = (null != command)
        && (null != command.Connection)
        && (ConnectionState.Open == command.Connection.State);
    if (!usable)
    {
        throw new ArgumentNullException("command");
    }

    //
    // Reset the command so nothing from a previous call leaks into this one
    command.Parameters.Clear();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertRemovedActivity]";

    //
    // -1 means "no order"; it is stored as a database null
    object orderValue;
    if (-1 == order)
    {
        orderValue = DBNull.Value;
    }
    else
    {
        orderValue = order;
    }

    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@RemovedActivityAction", removedActivityActionXoml));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Order", orderValue));

    command.ExecuteNonQuery();
}
#endregion
#region Utility
/// <summary>
/// Looks up the id stored for <paramref name="key"/>, first in the committed cache and,
/// when running batched (_isTrans), in the cache of temp ids generated during this batch commit.
/// </summary>
/// <param name="key">Cache key to look up.</param>
/// <param name="id">Receives the id found; left at the lookup's default when nothing matches.</param>
/// <returns>true when either consulted cache holds the key; otherwise false.</returns>
private bool TryGetActivityInstanceId(string key, out long id)
{
    //
    // Committed ids win
    if (_activityInstanceId.TryGetValue(key, out id))
    {
        return true;
    }
    //
    // The temp cache is only consulted when batched; otherwise the id was simply not found
    return _isTrans && _tmpActivityInstanceId.TryGetValue(key, out id);
}
/// <summary>
/// Records <paramref name="id"/> under <paramref name="key"/> unless the target cache already has the key.
/// </summary>
/// <param name="key">Cache key.</param>
/// <param name="id">Id to remember.</param>
private void SetActivityInstanceId(string key, long id)
{
    if (!_isTrans)
    {
        //
        // Not batched - write straight to the real member
        if (!_activityInstanceId.ContainsKey(key))
        {
            _activityInstanceId.Add(key, id);
        }
        return;
    }
    //
    // Batched - park the id in the temp member.
    // If the commit is successful these are moved to the real member
    // in IPendingWork.Complete
    if (!_tmpActivityInstanceId.ContainsKey(key))
    {
        _tmpActivityInstanceId.Add(key, id);
    }
}
/// <summary>
/// Drops <paramref name="key"/> from the committed cache and, when batched (_isTrans),
/// from the temp cache as well.
/// </summary>
/// <param name="key">Cache key to remove.</param>
private void RemoveActivityInstanceId(string key)
{
    //
    // The temp cache is only touched when batched
    if (_isTrans && _tmpActivityInstanceId.ContainsKey(key))
    {
        _tmpActivityInstanceId.Remove(key);
    }
    //
    // The real cache is always cleaned
    if (_activityInstanceId.ContainsKey(key))
    {
        _activityInstanceId.Remove(key);
    }
}
/// <summary>
/// Formats a DateTime as "yyyyMMdd h:m:s:ms" using the invariant culture.
/// Month and day are zero-padded to two digits; year, hour, minute, second and
/// millisecond are written without padding.
/// </summary>
/// <param name="dateTime">Value to format.</param>
/// <returns>The formatted string, e.g. "20240105 3:4:5:7".</returns>
private string GetSqlDateTimeString(DateTime dateTime)
{
    return string.Format(
        System.Globalization.CultureInfo.InvariantCulture,
        "{0}{1:00}{2:00} {3}:{4}:{5}:{6}",
        dateTime.Year,
        dateTime.Month,
        dateTime.Day,
        dateTime.Hour,
        dateTime.Minute,
        dateTime.Second,
        dateTime.Millisecond);
}
/// <summary>
/// Converts a number to its invariant-culture string, left-padding with '0'
/// when the text is a single character.
/// </summary>
/// <param name="num">Number to format.</param>
/// <returns>The text of <paramref name="num"/>, at least two characters wide.</returns>
private string PadToDblDigit(int num)
{
    // PadLeft leaves strings of two or more characters untouched
    return num.ToString(System.Globalization.CultureInfo.InvariantCulture).PadLeft(2, '0');
}
/// <summary>
/// Builds the string that uniquely identifies each activity that should be recorded as a separate instance.
/// A separate instance is defined by the combination of QualifiedName, Context and ParentContext.
/// The result is the Guid hash of the qualified name, the context and the parent context,
/// joined by '_' and with every '-' replaced by '_'.
/// </summary>
/// <param name="qId">Qualified name; hashed to a Guid with HashHelper.HashStringToGuid.</param>
/// <param name="context">Context Guid of the activity.</param>
/// <param name="parentContext">Context Guid of the activity's parent.</param>
/// <returns>The three Guid strings joined by underscores.</returns>
private string BuildQualifiedNameVarName(string qId, Guid context, Guid parentContext)
{
    Guid nameHash = HashHelper.HashStringToGuid(qId);
    string joined = string.Concat(nameHash.ToString(), "_", context.ToString(), "_", parentContext.ToString());
    //
    // The separators are already underscores, so one pass over the whole string is enough
    return joined.Replace('-', '_');
}
/// <summary>
/// Replaces every item in the record's Body with its serialized form.
/// </summary>
/// <param name="record">Record to convert in place.</param>
/// <returns>The same record instance.</returns>
private ActivityTrackingRecord SerializeRecord(ActivityTrackingRecord record)
{
    bool hasItems = (null != record.Body) && (0 != record.Body.Count);
    if (hasItems)
    {
        for (int index = 0; index < record.Body.Count; index++)
        {
            record.Body[index] = SerializeDataItem(record.Body[index]);
        }
    }
    return record;
}
/// <summary>
/// Converts a user tracking record in place: UserData is replaced by a SerializedDataItem
/// holding its type, string form and binary form, and every Body item is replaced by its
/// serialized form.
/// </summary>
/// <param name="record">Record to convert in place.</param>
/// <returns>The same record instance.</returns>
private UserTrackingRecord SerializeRecord(UserTrackingRecord record)
{
    bool hasBody = (null != record.Body) && (0 != record.Body.Count);
    //
    // Nothing to do when there is no body, no event args and no user data
    if (!hasBody && (null == record.EventArgs) && (null == record.UserData))
        return record;
    if (null != record.UserData)
    {
        SerializedDataItem item = new SerializedDataItem();
        byte[] data = null;
        bool nonSerializable;
        SerializeDataItem(record.UserData, out data, out nonSerializable);
        item.Type = record.UserData.GetType();
        item.StringData = record.UserData.ToString();
        item.SerializedData = data;
        item.NonSerializable = nonSerializable;
        record.UserData = item;
    }
    //
    // The early-exit test above allows Body to be null, so it must not be
    // dereferenced unconditionally here (UserData alone gets us past that test).
    if (hasBody)
    {
        for (int i = 0; i < record.Body.Count; i++)
            record.Body[i] = SerializeDataItem(record.Body[i]);
    }
    return record;
}
/// <summary>
/// Replaces the record's EventArgs with a serialized representation.
/// Changed events become a SerializedWorkflowChangedEventArgs listing the added and removed
/// activities; every other event becomes a SerializedEventArgs holding the binary form of the
/// original args.
/// </summary>
/// <param name="record">Record to convert in place.</param>
/// <returns>The same record instance.</returns>
private WorkflowTrackingRecord SerializeRecord(WorkflowTrackingRecord record)
{
    //
    // Nothing to convert when the record carries no args
    if (null == record.EventArgs)
    {
        return record;
    }

    SerializedEventArgs serialized;
    if (TrackingWorkflowEvent.Changed != record.TrackingWorkflowEvent)
    {
        serialized = new SerializedEventArgs();
        byte[] payload = null;
        bool failed;
        SerializeDataItem(record.EventArgs, out payload, out failed);
        serialized.SerializedArgs = payload;
        //
        // failed will only be true for SerializationExceptions, all others bubble
        if (failed)
        {
            //
            // Something didn't serialize.
            // For an exception or terminated event it is most likely the Exception member.
            // Save the exception text - better than losing all record of the exception
            Exception inner = null;
            switch (record.TrackingWorkflowEvent)
            {
                case TrackingWorkflowEvent.Terminated:
                    inner = ((TrackingWorkflowTerminatedEventArgs)record.EventArgs).Exception;
                    break;
                case TrackingWorkflowEvent.Exception:
                    inner = ((TrackingWorkflowExceptionEventArgs)record.EventArgs).Exception;
                    break;
            }
            if (null != inner)
            {
                SerializeDataItem(inner.ToString(), out payload, out failed);
                serialized.SerializedArgs = payload;
            }
        }
    }
    else
    {
        //
        // Convert the WorkflowChanged items
        SerializedWorkflowChangedEventArgs changed = new SerializedWorkflowChangedEventArgs();
        TrackingWorkflowChangedEventArgs source = (TrackingWorkflowChangedEventArgs)record.EventArgs;
        if (null != source)
        {
            for (int position = 0; position < source.Changes.Count; position++)
            {
                WorkflowChangeAction action = source.Changes[position];
                //
                // The position of the action in the change list is recorded as its order
                RemovedActivityAction removal = action as RemovedActivityAction;
                if (null != removal)
                {
                    AddRemovedActivity(removal, position, changed.RemovedActivities);
                    continue;
                }
                AddedActivityAction addition = action as AddedActivityAction;
                if (null != addition)
                {
                    AddAddedActivity(addition, position, changed.AddedActivities);
                }
            }
        }
        serialized = changed;
    }

    //
    // Record the type of the original args, then swap in the serialized form;
    // the original args object is not needed any longer
    serialized.Type = record.EventArgs.GetType();
    record.EventArgs = serialized;
    return record;
}
/// <summary>
/// Appends an entry for the activity removed by <paramref name="removedAction"/>, including the
/// action's xoml, then appends entries for every activity it contains.
/// </summary>
/// <param name="removedAction">The remove action being recorded.</param>
/// <param name="order">Order value stored on the entry for the removed activity itself.</param>
/// <param name="activities">List that receives the entries.</param>
private void AddRemovedActivity(RemovedActivityAction removedAction, int order, IList<RemovedActivity> activities)
{
    Activity original = removedAction.OriginalRemovedActivity;

    RemovedActivity entry = new RemovedActivity();
    entry.Order = order;
    entry.QualifiedName = original.QualifiedName;
    Activity parent = original.Parent;
    if (null != parent)
    {
        entry.ParentQualifiedName = parent.QualifiedName;
    }
    //
    // Save the definition of this change
    entry.RemovedActivityActionXoml = GetXomlDocument(removedAction);
    activities.Add(entry);

    //
    // Contained activities are added recursively through the Activity overload
    CompositeActivity container = original as CompositeActivity;
    if (null == container)
    {
        return;
    }
    foreach (Activity child in container.Activities)
    {
        AddRemovedActivity(child, activities);
    }
}
/// <summary>
/// Appends an entry with Order -1 for <paramref name="removed"/>, then recurses into every
/// activity it contains.
/// </summary>
/// <param name="removed">Activity to record.</param>
/// <param name="activities">List that receives the entries.</param>
private void AddRemovedActivity(Activity removed, IList<RemovedActivity> activities)
{
    RemovedActivity entry = new RemovedActivity();
    entry.Order = -1;
    entry.QualifiedName = removed.QualifiedName;
    Activity parent = removed.Parent;
    if (null != parent)
    {
        entry.ParentQualifiedName = parent.QualifiedName;
    }
    activities.Add(entry);

    //
    // Walk down into composites so every contained activity is listed too
    CompositeActivity container = removed as CompositeActivity;
    if (null == container)
    {
        return;
    }
    foreach (Activity child in container.Activities)
    {
        AddRemovedActivity(child, activities);
    }
}
/// <summary>
/// Appends an entry for the activity added by <paramref name="addedAction"/>, including the
/// action's xoml, then appends entries for every activity it contains.
/// </summary>
/// <param name="addedAction">The add action being recorded.</param>
/// <param name="order">Order value stored on the entry for the added activity itself.</param>
/// <param name="activities">List that receives the entries.</param>
private void AddAddedActivity(AddedActivityAction addedAction, int order, IList<AddedActivity> activities)
{
    Activity source = addedAction.AddedActivity;
    Type sourceType = source.GetType();

    AddedActivity entry = new AddedActivity();
    entry.Order = order;
    entry.ActivityTypeFullName = sourceType.FullName;
    entry.ActivityTypeAssemblyFullName = sourceType.Assembly.FullName;
    entry.QualifiedName = source.QualifiedName;
    Activity parent = source.Parent;
    if (null != parent)
    {
        entry.ParentQualifiedName = parent.QualifiedName;
    }
    //
    // Save the definition of this change
    entry.AddedActivityActionXoml = GetXomlDocument(addedAction);
    activities.Add(entry);

    //
    // Contained activities are added recursively through the Activity overload
    CompositeActivity container = source as CompositeActivity;
    if (null == container)
    {
        return;
    }
    foreach (Activity child in container.Activities)
    {
        AddAddedActivity(child, activities);
    }
}
/// <summary>
/// Appends an entry with Order -1 for <paramref name="added"/>, then recurses into every
/// activity it contains.
/// </summary>
/// <param name="added">Activity to record.</param>
/// <param name="activities">List that receives the entries.</param>
private void AddAddedActivity(Activity added, IList<AddedActivity> activities)
{
    Type addedType = added.GetType();

    AddedActivity entry = new AddedActivity();
    entry.Order = -1;
    entry.ActivityTypeFullName = addedType.FullName;
    entry.ActivityTypeAssemblyFullName = addedType.Assembly.FullName;
    entry.QualifiedName = added.QualifiedName;
    Activity parent = added.Parent;
    if (null != parent)
    {
        entry.ParentQualifiedName = parent.QualifiedName;
    }
    activities.Add(entry);

    //
    // Walk down into composites so every contained activity is listed too
    CompositeActivity container = added as CompositeActivity;
    if (null == container)
    {
        return;
    }
    foreach (Activity child in container.Activities)
    {
        AddAddedActivity(child, activities);
    }
}
/// <summary>
/// Builds a SerializedDataItem from a TrackingDataItem, copying its data, annotations and
/// field name and, when data is present, adding the binary form, string form and type.
/// </summary>
/// <param name="item">Item to convert; may be null.</param>
/// <returns>The serialized item, or null when <paramref name="item"/> is null.</returns>
private SerializedDataItem SerializeDataItem(TrackingDataItem item)
{
    if (null == item)
    {
        return null;
    }

    SerializedDataItem result = new SerializedDataItem();
    result.Data = item.Data;
    result.Annotations.AddRange(item.Annotations);
    result.FieldName = item.FieldName;

    //
    // Without data there is nothing further to serialize
    if (null == item.Data)
    {
        return result;
    }

    byte[] bytes = null;
    bool failed;
    SerializeDataItem(item.Data, out bytes, out failed);
    result.SerializedData = bytes;
    result.StringData = item.Data.ToString();
    result.Type = item.Data.GetType();
    result.NonSerializable = failed;
    return result;
}
/// <summary>
/// Binary serialize an object. Used to persist trackingDataItems.
/// </summary>
/// <param name="data">Object to serialize; null yields a null <paramref name="state"/>.</param>
/// <param name="state">
/// Receives the serialized bytes, or null when <paramref name="data"/> is null or could not be serialized.
/// </param>
/// <param name="nonSerializable">
/// Set to true only when serialization failed with a SerializationException; any other
/// exception propagates to the caller.
/// </param>
private void SerializeDataItem(object data, out byte[] state, out bool nonSerializable)
{
    nonSerializable = false;
    state = null;
    if (null == data)
        return;
    // NOTE(review): BinaryFormatter is deprecated on modern .NET; kept because the stored format depends on it.
    using (MemoryStream stream = new MemoryStream(1024))
    {
        BinaryFormatter bf = new BinaryFormatter();
        try
        {
            bf.Serialize(stream, data);
            //
            // ToArray copies exactly the bytes written, independent of the stream position.
            // This replaces a manual read loop whose Int32.MaxValue guard ran only after
            // the buffer had already been allocated.
            state = stream.ToArray();
        }
        catch (SerializationException)
        {
            //
            // Report the failure through the flag; state stays null
            nonSerializable = true;
        }
    }
}
/// <summary>
/// Doubles every single quote in a string so it can sit inside a sql string literal.
/// </summary>
/// <param name="val">Text to escape; may be null.</param>
/// <returns>The escaped text, or null when <paramref name="val"/> is null.</returns>
private string SqlEscape(string val)
{
    return (null == val) ? null : val.Replace("'", "''");
}
/*
static char[] hexDigits = {
'0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
/// <summary>
/// Convert a byte array to a string of hex chars for sql image type
/// </summary>
/// <param name="bytes"></param>
/// <returns></returns>
private static string ToHexString( byte[] bytes )
{
if ( null == bytes )
return null;
if ( 0 == bytes.Length )
return null;
char[] chars = new char[bytes.Length * 2];
for ( int i = 0; i < bytes.Length; i++ )
{
int b = bytes[i];
chars[i * 2] = hexDigits[b >> 4];
chars[i * 2 + 1] = hexDigits[b & 0xF];
}
return "0x" + new string( chars );
}
*/
/// <summary>
/// Appends the call path to _callPathKey as dot-separated segments, and all segments except the
/// last to _parentCallPathKey. Each non-null key then has its first character removed and is
/// passed through SqlEscape.
/// </summary>
/// <param name="callPath">Segments of the call path; null or empty leaves both keys untouched.</param>
private void GetCallPathKeys(IList<string> callPath)
{
    if ((null == callPath) || (callPath.Count <= 0))
    {
        return;
    }

    for (int index = 0; index < callPath.Count; index++)
    {
        string segment = "." + callPath[index];
        _callPathKey = _callPathKey + segment;
        //
        // The parent key stops one segment short of the full path
        bool isLast = (index >= callPath.Count - 1);
        if (!isLast)
        {
            _parentCallPathKey = _parentCallPathKey + segment;
        }
    }

    //
    // Drop the first character of each key, then escape it
    if (null != _callPathKey)
    {
        _callPathKey = SqlEscape(_callPathKey.Substring(1));
    }
    if (null != _parentCallPathKey)
    {
        _parentCallPathKey = SqlEscape(_parentCallPathKey.Substring(1));
    }
}
/// <summary>
/// Produces an xml document with an "Activities" root element containing the activity
/// elements written by WriteActivity for <paramref name="root"/>.
/// </summary>
/// <param name="root">Top of the activity tree; may be null.</param>
/// <returns>The xml text, or null when <paramref name="root"/> is null.</returns>
private string GetActivitiesXml(CompositeActivity root)
{
    if (null == root)
    {
        return null;
    }

    StringBuilder xmlText = new StringBuilder();
    XmlWriter xml = XmlWriter.Create(xmlText);
    try
    {
        xml.WriteStartDocument();
        xml.WriteStartElement("Activities");
        WriteActivity(root, xml);
        xml.WriteEndElement();
        xml.WriteEndDocument();
    }
    finally
    {
        //
        // Always push pending output into the builder and release the writer
        xml.Flush();
        xml.Close();
    }
    return xmlText.ToString();
}
/// <summary>
/// Writes an "Activity" element describing <paramref name="activity"/> (type, assembly,
/// qualified name and, when present, parent qualified name), then does the same for each
/// activity returned by GetAllEnabledActivities when it is a composite.
/// </summary>
/// <param name="activity">Activity to write; null writes nothing.</param>
/// <param name="writer">Destination writer.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="writer"/> is null while <paramref name="activity"/> is not.
/// </exception>
private void WriteActivity(Activity activity, XmlWriter writer)
{
    if (null == activity)
    {
        return;
    }
    if (null == writer)
    {
        throw new ArgumentNullException("writer");
    }

    Type activityType = activity.GetType();
    writer.WriteStartElement("Activity");
    writer.WriteElementString("TypeFullName", activityType.FullName);
    writer.WriteElementString("AssemblyFullName", activityType.Assembly.FullName);
    writer.WriteElementString("QualifiedName", activity.QualifiedName);
    //
    // A missing element is seen by sql as a null value, so skip it when there is no parent
    if (null != activity.Parent)
    {
        writer.WriteElementString("ParentQualifiedName", activity.Parent.QualifiedName);
    }
    writer.WriteEndElement();

    //
    // Children are written as siblings after the element, not nested inside it
    CompositeActivity composite = activity as CompositeActivity;
    if (null == composite)
    {
        return;
    }
    foreach (Activity child in GetAllEnabledActivities(composite))
    {
        WriteActivity(child, writer);
    }
}
/// <summary>
/// Returns the composite's EnabledActivities followed by any of its AlternateFlowActivities
/// that are not already in the list.
/// </summary>
/// <param name="compositeActivity">Composite whose activities are collected.</param>
/// <returns>A new list holding the combined activities.</returns>
/// <exception cref="ArgumentNullException"><paramref name="compositeActivity"/> is null.</exception>
private IList<Activity> GetAllEnabledActivities(CompositeActivity compositeActivity)
{
    if (null == compositeActivity)
    {
        throw new ArgumentNullException("compositeActivity");
    }

    List<Activity> combined = new List<Activity>(compositeActivity.EnabledActivities);
    foreach (Activity alternate in ((ISupportAlternateFlow)compositeActivity).AlternateFlowActivities)
    {
        //
        // Skip anything the enabled set already contributed
        if (combined.Contains(alternate))
        {
            continue;
        }
        combined.Add(alternate);
    }
    return combined;
}
/// <summary>
/// Serializes an object to markup text with WorkflowMarkupSerializer.
/// </summary>
/// <param name="obj">Object to serialize.</param>
/// <returns>The markup text produced by the serializer.</returns>
internal string GetXomlDocument(object obj)
{
    using (StringWriter stringWriter = new StringWriter(System.Globalization.CultureInfo.InvariantCulture))
    {
        using (XmlWriter xmlWriter = CreateXmlWriter(stringWriter))
        {
            WorkflowMarkupSerializer serializer = new WorkflowMarkupSerializer();
            serializer.Serialize(xmlWriter, obj);
        }
        //
        // Read the text only after the XmlWriter has been disposed, so any output it was
        // still buffering has been flushed into the StringWriter. StringWriter.ToString
        // returns the accumulated text even if the XmlWriter closed the underlying writer.
        return stringWriter.ToString();
    }
}
#endregion
}
}
}
|