|
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Channels
{
using System.Runtime;
using System.Threading;
using System.ServiceModel.Diagnostics.Application;
sealed class Msmq4PoisonHandler : IPoisonHandlingStrategy
{
MsmqQueue mainQueue;
MsmqQueue mainQueueForMove;
MsmqQueue retryQueueForPeek;
MsmqQueue retryQueueForMove;
MsmqQueue poisonQueue;
MsmqQueue lockQueueForReceive;
IOThreadTimer timer;
MsmqReceiveHelper receiver;
bool disposed;
string poisonQueueName;
string retryQueueName;
string mainQueueName;
MsmqRetryQueueMessage retryQueueMessage;
static Action<object> onStartPeek = new Action<object>(StartPeek);
static AsyncCallback onPeekCompleted = Fx.ThunkCallback(OnPeekCompleted);
public Msmq4PoisonHandler(MsmqReceiveHelper receiver)
{
this.receiver = receiver;
this.timer = new IOThreadTimer(new Action<object>(OnTimer), null, false);
this.disposed = false;
this.mainQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(this.ListenUri);
this.poisonQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";poison"));
this.retryQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";retry"));
}
MsmqReceiveParameters ReceiveParameters
{
get { return this.receiver.MsmqReceiveParameters; }
}
Uri ListenUri
{
get { return this.receiver.ListenUri; }
}
public void Open()
{
if (this.ReceiveParameters.ReceiveContextSettings.Enabled)
{
Fx.Assert(this.receiver.Queue is MsmqSubqueueLockingQueue, "Queue must be MsmqSubqueueLockingQueue");
this.lockQueueForReceive = ((MsmqSubqueueLockingQueue)this.receiver.Queue).LockQueueForReceive;
}
this.mainQueue = this.receiver.Queue;
this.mainQueueForMove = new MsmqQueue(this.mainQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);
// Open up the poison queue (for handling poison messages).
this.poisonQueue = new MsmqQueue(this.poisonQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);
this.retryQueueForMove = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);
this.retryQueueForPeek = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_RECEIVE_ACCESS);
this.retryQueueMessage = new MsmqRetryQueueMessage();
if (Thread.CurrentThread.IsThreadPoolThread)
{
StartPeek(this);
}
else
{
ActionItem.Schedule(Msmq4PoisonHandler.onStartPeek, this);
}
}
static void StartPeek(object state)
{
Msmq4PoisonHandler handler = state as Msmq4PoisonHandler;
lock (handler)
{
if (!handler.disposed)
{
handler.retryQueueForPeek.BeginPeek(handler.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, handler);
}
}
}
public bool CheckAndHandlePoisonMessage(MsmqMessageProperty messageProperty)
{
if (this.ReceiveParameters.ReceiveContextSettings.Enabled)
{
return ReceiveContextPoisonHandling(messageProperty);
}
else
{
return NonReceiveContextPoisonHandling(messageProperty);
}
}
public bool ReceiveContextPoisonHandling(MsmqMessageProperty messageProperty)
{
// The basic idea is to use the message move count to get the number of retry attempts the message has been through
// The computation of the retry count and retry cycle count is slightly involved due to fact that the message being processed
// could have been recycled message. (Recycled message is the message that moves from lock queue to retry queue to main queue
// and back to lock queue
//
// Count to tally message recycling (lock queue to retry queue to main queue adds move count of 2 to the message)
const int retryMoveCount = 2;
// Actual number of times message is received before recycling to retry queue
int actualReceiveRetryCount = this.ReceiveParameters.ReceiveRetryCount + 1;
// The message is recycled these many number of times
int maxRetryCycles = this.ReceiveParameters.MaxRetryCycles;
// Max change in message move count between recycling
int maxMovePerCycle = (2 * actualReceiveRetryCount) + 1;
// Number of recycles the message has been through
int messageCyclesCompleted = messageProperty.MoveCount / (maxMovePerCycle + retryMoveCount);
// Total number of moves on the message at the end of the last recycle
int messageMoveCountForCyclesCompleted = messageCyclesCompleted * (maxMovePerCycle + retryMoveCount);
// The differential move count for the current cycle
int messageMoveCountForCurrentCycle = messageProperty.MoveCount - messageMoveCountForCyclesCompleted;
lock (this)
{
if (this.disposed)
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(this.GetType().ToString()));
// Check if the message has already completed its max recycle count (MaxRetryCycles)
// and the disposed the message first. Such a message was previously disposed using the ReceiveErrorHandling method
// and the channel/listener would immediately fault
//
if (messageCyclesCompleted > maxRetryCycles)
{
FinalDisposition(messageProperty);
return true;
}
// Check if the message is eligible for recycling/disposition
if (messageMoveCountForCurrentCycle >= maxMovePerCycle)
{
if (TD.ReceiveRetryCountReachedIsEnabled())
{
TD.ReceiveRetryCountReached(messageProperty.MessageId);
}
if (messageCyclesCompleted < maxRetryCycles)
{
// The message is eligible for recycling, move the message the message to retry queue
MsmqReceiveHelper.MoveReceivedMessage(this.lockQueueForReceive, this.retryQueueForMove, messageProperty.LookupId);
MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, false, this.receiver.InstanceId);
}
else
{
if (TD.MaxRetryCyclesExceededMsmqIsEnabled())
{
TD.MaxRetryCyclesExceededMsmq(messageProperty.MessageId);
}
// Dispose the message using ReceiveErrorHandling
FinalDisposition(messageProperty);
}
return true;
}
else
{
return false;
}
}
}
public bool NonReceiveContextPoisonHandling(MsmqMessageProperty messageProperty)
{
if (messageProperty.AbortCount <= this.ReceiveParameters.ReceiveRetryCount)
return false;
int retryCycle = messageProperty.MoveCount / 2;
lock (this)
{
if (this.disposed)
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(this.GetType().ToString()));
if (retryCycle >= this.ReceiveParameters.MaxRetryCycles)
{
if (TD.MaxRetryCyclesExceededMsmqIsEnabled())
{
TD.MaxRetryCyclesExceededMsmq(messageProperty.MessageId);
}
FinalDisposition(messageProperty);
}
else
{
MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.retryQueueForMove, messageProperty.LookupId);
MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, false, this.receiver.InstanceId);
}
}
return true;
}
public void FinalDisposition(MsmqMessageProperty messageProperty)
{
if (this.ReceiveParameters.ReceiveContextSettings.Enabled)
{
this.InternalFinalDisposition(this.lockQueueForReceive, messageProperty);
}
else
{
this.InternalFinalDisposition(this.mainQueue, messageProperty);
}
}
private void InternalFinalDisposition(MsmqQueue disposeFromQueue, MsmqMessageProperty messageProperty)
{
switch (this.ReceiveParameters.ReceiveErrorHandling)
{
case ReceiveErrorHandling.Drop:
this.receiver.DropOrRejectReceivedMessage(disposeFromQueue, messageProperty, false);
break;
case ReceiveErrorHandling.Fault:
MsmqReceiveHelper.TryAbortTransactionCurrent();
if (null != this.receiver.ChannelListener)
this.receiver.ChannelListener.FaultListener();
if (null != this.receiver.Channel)
this.receiver.Channel.FaultChannel();
break;
case ReceiveErrorHandling.Reject:
this.receiver.DropOrRejectReceivedMessage(disposeFromQueue, messageProperty, true);
MsmqDiagnostics.PoisonMessageRejected(messageProperty.MessageId, this.receiver.InstanceId);
break;
case ReceiveErrorHandling.Move:
MsmqReceiveHelper.MoveReceivedMessage(disposeFromQueue, this.poisonQueue, messageProperty.LookupId);
MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, true, this.receiver.InstanceId);
break;
default:
Fx.Assert("System.ServiceModel.Channels.Msmq4PoisonHandler.FinalDisposition(): (unexpected ReceiveErrorHandling)");
break;
}
}
public void Dispose()
{
lock (this)
{
if (!this.disposed)
{
this.disposed = true;
this.timer.Cancel();
if (null != this.retryQueueForPeek)
this.retryQueueForPeek.Dispose();
if (null != this.retryQueueForMove)
this.retryQueueForMove.Dispose();
if (null != this.poisonQueue)
this.poisonQueue.Dispose();
if (null != this.mainQueueForMove)
this.mainQueueForMove.Dispose();
}
}
}
static void OnPeekCompleted(IAsyncResult result)
{
Msmq4PoisonHandler handler = result.AsyncState as Msmq4PoisonHandler;
MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown;
try
{
receiveResult = handler.retryQueueForPeek.EndPeek(result);
}
catch (MsmqException ex)
{
MsmqDiagnostics.ExpectedException(ex);
}
if (MsmqQueue.ReceiveResult.MessageReceived == receiveResult)
{
lock (handler)
{
if (!handler.disposed)
{
// Check the time - move it, and begin peeking again
// if necessary, or wait for the timeout.
DateTime lastMoveTime = MsmqDateTime.ToDateTime(handler.retryQueueMessage.LastMoveTime.Value);
TimeSpan waitTime = lastMoveTime + handler.ReceiveParameters.RetryCycleDelay - DateTime.UtcNow;
if (waitTime < TimeSpan.Zero)
handler.OnTimer(handler);
else
handler.timer.Set(waitTime);
}
}
}
}
void OnTimer(object state)
{
lock (this)
{
if (!this.disposed)
{
try
{
this.retryQueueForPeek.TryMoveMessage(this.retryQueueMessage.LookupId.Value, this.mainQueueForMove, MsmqTransactionMode.Single);
}
catch (MsmqException ex)
{
MsmqDiagnostics.ExpectedException(ex);
}
this.retryQueueForPeek.BeginPeek(this.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, this);
}
}
}
class MsmqRetryQueueMessage : NativeMsmqMessage
{
LongProperty lookupId;
IntProperty lastMoveTime;
public MsmqRetryQueueMessage()
: base(2)
{
this.lookupId = new LongProperty(this, UnsafeNativeMethods.PROPID_M_LOOKUPID);
this.lastMoveTime = new IntProperty(this, UnsafeNativeMethods.PROPID_M_LAST_MOVE_TIME);
}
public LongProperty LookupId
{
get { return this.lookupId; }
}
public IntProperty LastMoveTime
{
get { return this.lastMoveTime; }
}
}
}
}
|