|
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Dispatcher
{
using System;
using System.Diagnostics;
using System.Globalization;
using System.Runtime;
using System.Runtime.CompilerServices;
using System.Runtime.Diagnostics;
using System.ServiceModel;
using System.ServiceModel.Activation;
using System.ServiceModel.Channels;
using System.ServiceModel.Description;
using System.ServiceModel.Diagnostics;
using System.ServiceModel.Diagnostics.Application;
using System.Threading;
using System.Transactions;
using System.Xml;
using SessionIdleManager = System.ServiceModel.Channels.ServiceChannel.SessionIdleManager;
class ChannelHandler
{
public static readonly TimeSpan CloseAfterFaultTimeout = TimeSpan.FromSeconds(10);
public const string MessageBufferPropertyName = "_RequestMessageBuffer_";
readonly IChannelBinder binder;
readonly DuplexChannelBinder duplexBinder;
readonly ServiceHostBase host;
readonly bool incrementedActivityCountInConstructor;
readonly bool isCallback;
readonly ListenerHandler listener;
readonly ServiceThrottle throttle;
readonly bool wasChannelThrottled;
readonly SessionIdleManager idleManager;
readonly bool sendAsynchronously;
static AsyncCallback onAsyncReplyComplete = Fx.ThunkCallback(new AsyncCallback(ChannelHandler.OnAsyncReplyComplete));
static AsyncCallback onAsyncReceiveComplete = Fx.ThunkCallback(new AsyncCallback(ChannelHandler.OnAsyncReceiveComplete));
static Action<object> onContinueAsyncReceive = new Action<object>(ChannelHandler.OnContinueAsyncReceive);
static Action<object> onStartSyncMessagePump = new Action<object>(ChannelHandler.OnStartSyncMessagePump);
static Action<object> onStartAsyncMessagePump = new Action<object>(ChannelHandler.OnStartAsyncMessagePump);
static Action<object> onStartSingleTransactedBatch = new Action<object>(ChannelHandler.OnStartSingleTransactedBatch);
static Action<object> openAndEnsurePump = new Action<object>(ChannelHandler.OpenAndEnsurePump);
RequestInfo requestInfo;
ServiceChannel channel;
bool doneReceiving;
bool hasRegisterBeenCalled;
bool hasSession;
int isPumpAcquired;
bool isChannelTerminated;
bool isConcurrent;
bool isManualAddressing;
MessageVersion messageVersion;
ErrorHandlingReceiver receiver;
bool receiveSynchronously;
bool receiveWithTransaction;
RequestContext replied;
RequestContext requestWaitingForThrottle;
WrappedTransaction acceptTransaction;
ServiceThrottle instanceContextThrottle;
SharedTransactedBatchContext sharedTransactedBatchContext;
TransactedBatchContext transactedBatchContext;
bool isMainTransactedBatchHandler;
EventTraceActivity eventTraceActivity;
SessionOpenNotification sessionOpenNotification;
bool needToCreateSessionOpenNotificationMessage;
bool shouldRejectMessageWithOnOpenActionHeader;
internal ChannelHandler(MessageVersion messageVersion, IChannelBinder binder, ServiceChannel channel)
{
ClientRuntime clientRuntime = channel.ClientRuntime;
this.messageVersion = messageVersion;
this.isManualAddressing = clientRuntime.ManualAddressing;
this.binder = binder;
this.channel = channel;
this.isConcurrent = true;
this.duplexBinder = binder as DuplexChannelBinder;
this.hasSession = binder.HasSession;
this.isCallback = true;
DispatchRuntime dispatchRuntime = clientRuntime.DispatchRuntime;
if (dispatchRuntime == null)
{
this.receiver = new ErrorHandlingReceiver(binder, null);
}
else
{
this.receiver = new ErrorHandlingReceiver(binder, dispatchRuntime.ChannelDispatcher);
}
this.requestInfo = new RequestInfo(this);
}
internal ChannelHandler(MessageVersion messageVersion, IChannelBinder binder, ServiceThrottle throttle,
ListenerHandler listener, bool wasChannelThrottled, WrappedTransaction acceptTransaction, SessionIdleManager idleManager)
{
ChannelDispatcher channelDispatcher = listener.ChannelDispatcher;
this.messageVersion = messageVersion;
this.isManualAddressing = channelDispatcher.ManualAddressing;
this.binder = binder;
this.throttle = throttle;
this.listener = listener;
this.wasChannelThrottled = wasChannelThrottled;
this.host = listener.Host;
this.receiveSynchronously = channelDispatcher.ReceiveSynchronously;
this.sendAsynchronously = channelDispatcher.SendAsynchronously;
this.duplexBinder = binder as DuplexChannelBinder;
this.hasSession = binder.HasSession;
this.isConcurrent = ConcurrencyBehavior.IsConcurrent(channelDispatcher, this.hasSession);
if (channelDispatcher.MaxPendingReceives > 1)
{
// We need to preserve order if the ChannelHandler is not concurrent.
this.binder = new MultipleReceiveBinder(
this.binder,
channelDispatcher.MaxPendingReceives,
!this.isConcurrent);
}
if (channelDispatcher.BufferedReceiveEnabled)
{
this.binder = new BufferedReceiveBinder(this.binder);
}
this.receiver = new ErrorHandlingReceiver(this.binder, channelDispatcher);
this.idleManager = idleManager;
Fx.Assert((this.idleManager != null) == (this.binder.HasSession && this.listener.ChannelDispatcher.DefaultCommunicationTimeouts.ReceiveTimeout != TimeSpan.MaxValue), "idle manager is present only when there is a session with a finite receive timeout");
if (channelDispatcher.IsTransactedReceive && !channelDispatcher.ReceiveContextEnabled)
{
receiveSynchronously = true;
receiveWithTransaction = true;
if (channelDispatcher.MaxTransactedBatchSize > 0)
{
int maxConcurrentBatches = 1;
if (null != throttle && throttle.MaxConcurrentCalls > 1)
{
maxConcurrentBatches = throttle.MaxConcurrentCalls;
foreach (EndpointDispatcher endpointDispatcher in channelDispatcher.Endpoints)
{
if (ConcurrencyMode.Multiple != endpointDispatcher.DispatchRuntime.ConcurrencyMode)
{
maxConcurrentBatches = 1;
break;
}
}
}
this.sharedTransactedBatchContext = new SharedTransactedBatchContext(this, channelDispatcher, maxConcurrentBatches);
this.isMainTransactedBatchHandler = true;
this.throttle = null;
}
}
else if (channelDispatcher.IsTransactedReceive && channelDispatcher.ReceiveContextEnabled && channelDispatcher.MaxTransactedBatchSize > 0)
{
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.IncompatibleBehaviors)));
}
if (this.binder.HasSession)
{
this.sessionOpenNotification = this.binder.Channel.GetProperty<SessionOpenNotification>();
this.needToCreateSessionOpenNotificationMessage = this.sessionOpenNotification != null && this.sessionOpenNotification.IsEnabled;
}
this.acceptTransaction = acceptTransaction;
this.requestInfo = new RequestInfo(this);
if (this.listener.State == CommunicationState.Opened)
{
this.listener.ChannelDispatcher.Channels.IncrementActivityCount();
this.incrementedActivityCountInConstructor = true;
}
}
internal ChannelHandler(ChannelHandler handler, TransactedBatchContext context)
{
this.messageVersion = handler.messageVersion;
this.isManualAddressing = handler.isManualAddressing;
this.binder = handler.binder;
this.listener = handler.listener;
this.wasChannelThrottled = handler.wasChannelThrottled;
this.host = handler.host;
this.receiveSynchronously = true;
this.receiveWithTransaction = true;
this.duplexBinder = handler.duplexBinder;
this.hasSession = handler.hasSession;
this.isConcurrent = handler.isConcurrent;
this.receiver = handler.receiver;
this.sharedTransactedBatchContext = context.Shared;
this.transactedBatchContext = context;
this.requestInfo = new RequestInfo(this);
this.sendAsynchronously = handler.sendAsynchronously;
this.sessionOpenNotification = handler.sessionOpenNotification;
this.needToCreateSessionOpenNotificationMessage = handler.needToCreateSessionOpenNotificationMessage;
this.shouldRejectMessageWithOnOpenActionHeader = handler.shouldRejectMessageWithOnOpenActionHeader;
}
internal IChannelBinder Binder
{
get { return this.binder; }
}
internal ServiceChannel Channel
{
get { return this.channel; }
}
internal bool HasRegisterBeenCalled
{
get { return this.hasRegisterBeenCalled; }
}
internal InstanceContext InstanceContext
{
get { return (this.channel != null) ? this.channel.InstanceContext : null; }
}
internal ServiceThrottle InstanceContextServiceThrottle
{
get
{
return this.instanceContextThrottle;
}
set
{
this.instanceContextThrottle = value;
}
}
bool IsOpen
{
get { return this.binder.Channel.State == CommunicationState.Opened; }
}
EndpointAddress LocalAddress
{
get
{
if (this.binder != null)
{
IInputChannel input = this.binder.Channel as IInputChannel;
if (input != null)
{
return input.LocalAddress;
}
IReplyChannel reply = this.binder.Channel as IReplyChannel;
if (reply != null)
{
return reply.LocalAddress;
}
}
return null;
}
}
object ThisLock
{
get { return this; }
}
EventTraceActivity EventTraceActivity
{
get
{
if (this.eventTraceActivity == null)
{
this.eventTraceActivity = new EventTraceActivity();
}
return this.eventTraceActivity;
}
}
internal static void Register(ChannelHandler handler)
{
handler.Register();
}
internal static void Register(ChannelHandler handler, RequestContext request)
{
BufferedReceiveBinder bufferedBinder = handler.Binder as BufferedReceiveBinder;
Fx.Assert(bufferedBinder != null, "ChannelHandler.Binder is not a BufferedReceiveBinder");
bufferedBinder.InjectRequest(request);
handler.Register();
}
void Register()
{
this.hasRegisterBeenCalled = true;
if (this.binder.Channel.State == CommunicationState.Created)
{
ActionItem.Schedule(openAndEnsurePump, this);
}
else
{
this.EnsurePump();
}
}
void AsyncMessagePump()
{
IAsyncResult result = this.BeginTryReceive();
if ((result != null) && result.CompletedSynchronously)
{
this.AsyncMessagePump(result);
}
}
void AsyncMessagePump(IAsyncResult result)
{
if (TD.ChannelReceiveStopIsEnabled())
{
TD.ChannelReceiveStop(this.EventTraceActivity, this.GetHashCode());
}
for (;;)
{
RequestContext request;
while (!this.EndTryReceive(result, out request))
{
result = this.BeginTryReceive();
if ((result == null) || !result.CompletedSynchronously)
{
return;
}
}
if (!HandleRequest(request, null))
{
break;
}
if (!TryAcquirePump())
{
break;
}
result = this.BeginTryReceive();
if (result == null || !result.CompletedSynchronously)
{
break;
}
}
}
IAsyncResult BeginTryReceive()
{
this.requestInfo.Cleanup();
if (TD.ChannelReceiveStartIsEnabled())
{
TD.ChannelReceiveStart(this.EventTraceActivity, this.GetHashCode());
}
this.shouldRejectMessageWithOnOpenActionHeader = !this.needToCreateSessionOpenNotificationMessage;
if (this.needToCreateSessionOpenNotificationMessage)
{
return new CompletedAsyncResult(ChannelHandler.onAsyncReceiveComplete, this);
}
return this.receiver.BeginTryReceive(TimeSpan.MaxValue, ChannelHandler.onAsyncReceiveComplete, this);
}
bool DispatchAndReleasePump(RequestContext request, bool cleanThread, OperationContext currentOperationContext)
{
ServiceChannel channel = this.requestInfo.Channel;
EndpointDispatcher endpoint = this.requestInfo.Endpoint;
bool releasedPump = false;
try
{
DispatchRuntime dispatchBehavior = this.requestInfo.DispatchRuntime;
if (channel == null || dispatchBehavior == null)
{
Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.Dispatch(): (channel == null || dispatchBehavior == null)");
return true;
}
MessageBuffer buffer = null;
Message message;
EventTraceActivity eventTraceActivity = TraceDispatchMessageStart(request.RequestMessage);
AspNetEnvironment.Current.PrepareMessageForDispatch(request.RequestMessage);
if (dispatchBehavior.PreserveMessage)
{
object previousBuffer = null;
if (request.RequestMessage.Properties.TryGetValue(MessageBufferPropertyName, out previousBuffer))
{
buffer = (MessageBuffer)previousBuffer;
message = buffer.CreateMessage();
}
else
{
//
buffer = request.RequestMessage.CreateBufferedCopy(int.MaxValue);
message = buffer.CreateMessage();
}
}
else
{
message = request.RequestMessage;
}
DispatchOperationRuntime operation = dispatchBehavior.GetOperation(ref message);
if (operation == null)
{
Fx.Assert("ChannelHandler.Dispatch (operation == null)");
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(String.Format(CultureInfo.InvariantCulture, "No DispatchOperationRuntime found to process message.")));
}
if (this.shouldRejectMessageWithOnOpenActionHeader && message.Headers.Action == OperationDescription.SessionOpenedAction)
{
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.SFxNoEndpointMatchingAddressForConnectionOpeningMessage, message.Headers.Action, "Open")));
}
if (MessageLogger.LoggingEnabled)
{
MessageLogger.LogMessage(ref message, (operation.IsOneWay ? MessageLoggingSource.ServiceLevelReceiveDatagram : MessageLoggingSource.ServiceLevelReceiveRequest) | MessageLoggingSource.LastChance);
}
if (operation.IsTerminating && this.hasSession)
{
this.isChannelTerminated = true;
}
bool hasOperationContextBeenSet;
if (currentOperationContext != null)
{
hasOperationContextBeenSet = true;
currentOperationContext.ReInit(request, message, channel);
}
else
{
hasOperationContextBeenSet = false;
currentOperationContext = new OperationContext(request, message, channel, this.host);
}
if (dispatchBehavior.PreserveMessage)
{
currentOperationContext.IncomingMessageProperties.Add(MessageBufferPropertyName, buffer);
}
if (currentOperationContext.EndpointDispatcher == null && this.listener != null)
{
currentOperationContext.EndpointDispatcher = endpoint;
}
MessageRpc rpc = new MessageRpc(request, message, operation, channel, this.host,
this, cleanThread, currentOperationContext, this.requestInfo.ExistingInstanceContext, eventTraceActivity);
TraceUtility.MessageFlowAtMessageReceived(message, currentOperationContext, eventTraceActivity, true);
rpc.TransactedBatchContext = this.transactedBatchContext;
// passing responsibility for call throttle to MessageRpc
// (MessageRpc implicitly owns this throttle once it's created)
this.requestInfo.ChannelHandlerOwnsCallThrottle = false;
// explicitly passing responsibility for instance throttle to MessageRpc
rpc.MessageRpcOwnsInstanceContextThrottle = this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle;
this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = false;
// These need to happen before Dispatch but after accessing any ChannelHandler
// state, because we go multi-threaded after this until we reacquire pump mutex.
this.ReleasePump();
releasedPump = true;
return operation.Parent.Dispatch(ref rpc, hasOperationContextBeenSet);
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
return this.HandleError(e, request, channel);
}
finally
{
if (!releasedPump)
{
this.ReleasePump();
}
}
}
internal void DispatchDone()
{
if (this.throttle != null)
{
this.throttle.DeactivateCall();
}
}
RequestContext GetSessionOpenNotificationRequestContext()
{
Fx.Assert(this.sessionOpenNotification != null, "this.sessionOpenNotification should not be null.");
Message message = Message.CreateMessage(this.Binder.Channel.GetProperty<MessageVersion>(), OperationDescription.SessionOpenedAction);
Fx.Assert(this.LocalAddress != null, "this.LocalAddress should not be null.");
message.Headers.To = this.LocalAddress.Uri;
this.sessionOpenNotification.UpdateMessageProperties(message.Properties);
return this.Binder.CreateRequestContext(message);
}
bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
{
bool valid;
if (this.needToCreateSessionOpenNotificationMessage)
{
this.needToCreateSessionOpenNotificationMessage = false;
Fx.Assert(result is CompletedAsyncResult, "result must be CompletedAsyncResult");
CompletedAsyncResult.End(result);
requestContext = this.GetSessionOpenNotificationRequestContext();
valid = true;
}
else
{
valid = this.receiver.EndTryReceive(result, out requestContext);
}
if (valid)
{
this.HandleReceiveComplete(requestContext);
}
return valid;
}
void EnsureChannelAndEndpoint(RequestContext request)
{
this.requestInfo.Channel = this.channel;
if (this.requestInfo.Channel == null)
{
bool addressMatched;
if (this.hasSession)
{
this.requestInfo.Channel = this.GetSessionChannel(request.RequestMessage, out this.requestInfo.Endpoint, out addressMatched);
}
else
{
this.requestInfo.Channel = this.GetDatagramChannel(request.RequestMessage, out this.requestInfo.Endpoint, out addressMatched);
}
if (this.requestInfo.Channel == null)
{
this.host.RaiseUnknownMessageReceived(request.RequestMessage);
if (addressMatched)
{
this.ReplyContractFilterDidNotMatch(request);
}
else
{
this.ReplyAddressFilterDidNotMatch(request);
}
}
}
else
{
this.requestInfo.Endpoint = this.requestInfo.Channel.EndpointDispatcher;
//For sessionful contracts, the InstanceContext throttle is not copied over to the channel
//as we create the channel before acquiring the lock
if (this.InstanceContextServiceThrottle != null && this.requestInfo.Channel.InstanceContextServiceThrottle == null)
{
this.requestInfo.Channel.InstanceContextServiceThrottle = this.InstanceContextServiceThrottle;
}
}
this.requestInfo.EndpointLookupDone = true;
if (this.requestInfo.Channel == null)
{
// SFx drops a message here
TraceUtility.TraceDroppedMessage(request.RequestMessage, this.requestInfo.Endpoint);
request.Close();
return;
}
if (this.requestInfo.Channel.HasSession || this.isCallback)
{
this.requestInfo.DispatchRuntime = this.requestInfo.Channel.DispatchRuntime;
}
else
{
this.requestInfo.DispatchRuntime = this.requestInfo.Endpoint.DispatchRuntime;
}
}
void EnsurePump()
{
if (null == this.sharedTransactedBatchContext || this.isMainTransactedBatchHandler)
{
if (TryAcquirePump())
{
if (this.receiveSynchronously)
{
ActionItem.Schedule(ChannelHandler.onStartSyncMessagePump, this);
}
else
{
if (Thread.CurrentThread.IsThreadPoolThread)
{
IAsyncResult result = this.BeginTryReceive();
if ((result != null) && result.CompletedSynchronously)
{
ActionItem.Schedule(ChannelHandler.onContinueAsyncReceive, result);
}
}
else
{
// Since this is not a threadpool thread, we don't know if this thread will exit
// while the IO is still pending (which would cancel the IO), so we have to get
// over to a threadpool thread which we know will not exit while there is pending IO.
ActionItem.Schedule(ChannelHandler.onStartAsyncMessagePump, this);
}
}
}
}
else
{
ActionItem.Schedule(ChannelHandler.onStartSingleTransactedBatch, this);
}
}
ServiceChannel GetDatagramChannel(Message message, out EndpointDispatcher endpoint, out bool addressMatched)
{
addressMatched = false;
endpoint = this.GetEndpointDispatcher(message, out addressMatched);
if (endpoint == null)
{
return null;
}
if (endpoint.DatagramChannel == null)
{
lock (this.listener.ThisLock)
{
if (endpoint.DatagramChannel == null)
{
endpoint.DatagramChannel = new ServiceChannel(this.binder, endpoint, this.listener.ChannelDispatcher, this.idleManager);
this.InitializeServiceChannel(endpoint.DatagramChannel);
}
}
}
return endpoint.DatagramChannel;
}
EndpointDispatcher GetEndpointDispatcher(Message message, out bool addressMatched)
{
return this.listener.Endpoints.Lookup(message, out addressMatched);
}
ServiceChannel GetSessionChannel(Message message, out EndpointDispatcher endpoint, out bool addressMatched)
{
addressMatched = false;
if (this.channel == null)
{
lock (this.ThisLock)
{
if (this.channel == null)
{
endpoint = this.GetEndpointDispatcher(message, out addressMatched);
if (endpoint != null)
{
this.channel = new ServiceChannel(this.binder, endpoint, this.listener.ChannelDispatcher, this.idleManager);
this.InitializeServiceChannel(this.channel);
}
}
}
}
if (this.channel == null)
{
endpoint = null;
}
else
{
endpoint = this.channel.EndpointDispatcher;
}
return this.channel;
}
void InitializeServiceChannel(ServiceChannel channel)
{
if (this.wasChannelThrottled)
{
// TFS#500703, when the idle timeout was hit, the constructor of ServiceChannel will abort itself directly. So
// the session throttle will not be released and thus lead to a service unavailablity.
// Note that if the channel is already aborted, the next line "channel.ServiceThrottle = this.throttle;" will throw an exception,
// so we are not going to do any more work inside this method.
// Ideally we should do a thorough refactoring work for this throttling issue. However, it's too risky as a QFE. We should consider
// this in a whole release.
// Note that the "wasChannelThrottled" boolean will only be true if we aquired the session throttle. So we don't have to check HasSession
// again here.
if (channel.Aborted && this.throttle != null)
{
// This line will release the "session" throttle.
this.throttle.DeactivateChannel();
}
channel.ServiceThrottle = this.throttle;
}
if (this.InstanceContextServiceThrottle != null)
{
channel.InstanceContextServiceThrottle = this.InstanceContextServiceThrottle;
}
ClientRuntime clientRuntime = channel.ClientRuntime;
if (clientRuntime != null)
{
Type contractType = clientRuntime.ContractClientType;
Type callbackType = clientRuntime.CallbackClientType;
if (contractType != null)
{
channel.Proxy = ServiceChannelFactory.CreateProxy(contractType, callbackType, MessageDirection.Output, channel);
}
}
if (this.listener != null)
{
this.listener.ChannelDispatcher.InitializeChannel((IClientChannel)channel.Proxy);
}
((IChannel)channel).Open();
}
void ProvideFault(Exception e, ref ErrorHandlerFaultInfo faultInfo)
{
if (this.listener != null)
{
this.listener.ChannelDispatcher.ProvideFault(e, this.requestInfo.Channel == null ? this.binder.Channel.GetProperty<FaultConverter>() : this.requestInfo.Channel.GetProperty<FaultConverter>(), ref faultInfo);
}
else if (this.channel != null)
{
DispatchRuntime dispatchBehavior = this.channel.ClientRuntime.CallbackDispatchRuntime;
dispatchBehavior.ChannelDispatcher.ProvideFault(e, this.channel.GetProperty<FaultConverter>(), ref faultInfo);
}
}
internal bool HandleError(Exception e)
{
ErrorHandlerFaultInfo dummy = new ErrorHandlerFaultInfo();
return this.HandleError(e, ref dummy);
}
bool HandleError(Exception e, ref ErrorHandlerFaultInfo faultInfo)
{
if (e == null)
{
Fx.Assert(SR.GetString(SR.GetString(SR.SFxNonExceptionThrown)));
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.GetString(SR.SFxNonExceptionThrown))));
}
if (this.listener != null)
{
return listener.ChannelDispatcher.HandleError(e, ref faultInfo);
}
else if (this.channel != null)
{
return this.channel.ClientRuntime.CallbackDispatchRuntime.ChannelDispatcher.HandleError(e, ref faultInfo);
}
else
{
return false;
}
}
bool HandleError(Exception e, RequestContext request, ServiceChannel channel)
{
ErrorHandlerFaultInfo faultInfo = new ErrorHandlerFaultInfo(this.messageVersion.Addressing.DefaultFaultAction);
bool replied, replySentAsync;
ProvideFaultAndReplyFailure(request, e, ref faultInfo, out replied, out replySentAsync);
if (!replySentAsync)
{
return this.HandleErrorContinuation(e, request, channel, ref faultInfo, replied);
}
else
{
return false;
}
}
bool HandleErrorContinuation(Exception e, RequestContext request, ServiceChannel channel, ref ErrorHandlerFaultInfo faultInfo, bool replied)
{
if (replied)
{
try
{
request.Close();
}
catch (Exception e1)
{
if (Fx.IsFatal(e1))
{
throw;
}
this.HandleError(e1);
}
}
else
{
request.Abort();
}
if (!this.HandleError(e, ref faultInfo) && this.hasSession)
{
if (channel != null)
{
if (replied)
{
TimeoutHelper timeoutHelper = new TimeoutHelper(CloseAfterFaultTimeout);
try
{
channel.Close(timeoutHelper.RemainingTime());
}
catch (Exception e2)
{
if (Fx.IsFatal(e2))
{
throw;
}
this.HandleError(e2);
}
try
{
this.binder.CloseAfterFault(timeoutHelper.RemainingTime());
}
catch (Exception e3)
{
if (Fx.IsFatal(e3))
{
throw;
}
this.HandleError(e3);
}
}
else
{
channel.Abort();
this.binder.Abort();
}
}
else
{
if (replied)
{
try
{
this.binder.CloseAfterFault(CloseAfterFaultTimeout);
}
catch (Exception e4)
{
if (Fx.IsFatal(e4))
{
throw;
}
this.HandleError(e4);
}
}
else
{
this.binder.Abort();
}
}
}
return true;
}
void HandleReceiveComplete(RequestContext context)
{
try
{
if (this.channel != null)
{
this.channel.HandleReceiveComplete(context);
}
else
{
if (context == null && this.hasSession)
{
bool close;
lock (this.ThisLock)
{
close = !this.doneReceiving;
this.doneReceiving = true;
}
if (close)
{
this.receiver.Close();
if (this.idleManager != null)
{
this.idleManager.CancelTimer();
}
ServiceThrottle throttle = this.throttle;
if (throttle != null)
{
throttle.DeactivateChannel();
}
}
}
}
}
finally
{
if ((context == null) && this.incrementedActivityCountInConstructor)
{
this.listener.ChannelDispatcher.Channels.DecrementActivityCount();
}
}
}
bool HandleRequest(RequestContext request, OperationContext currentOperationContext)
{
if (request == null)
{
// channel EOF, stop receiving
return false;
}
ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ? TraceUtility.ExtractActivity(request) : null;
using (ServiceModelActivity.BoundOperation(activity))
{
if (this.HandleRequestAsReply(request))
{
this.ReleasePump();
return true;
}
if (this.isChannelTerminated)
{
this.ReleasePump();
this.ReplyChannelTerminated(request);
return true;
}
if (this.requestInfo.RequestContext != null)
{
Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.RequestContext != null");
}
this.requestInfo.RequestContext = request;
if (!this.TryAcquireCallThrottle(request))
{
if (DS.ServiceThrottleIsEnabled())
{
DS.CallThrottleWaiting(request.RequestMessage);
}
// this.ThrottleAcquiredForCall will be called to continue
return false;
}
// NOTE: from here on down, ensure that this code is the same as ThrottleAcquiredForCall (see 55460)
if (this.requestInfo.ChannelHandlerOwnsCallThrottle)
{
Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.ChannelHandlerOwnsCallThrottle");
}
this.requestInfo.ChannelHandlerOwnsCallThrottle = true;
if (!this.TryRetrievingInstanceContext(request))
{
//Would have replied and close the request.
return true;
}
this.requestInfo.Channel.CompletedIOOperation();
//Only acquire InstanceContext throttle if one doesnt already exist.
if (!this.TryAcquireThrottle(request, (this.requestInfo.ExistingInstanceContext == null)))
{
// this.ThrottleAcquired will be called to continue
return false;
}
if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
{
Fx.Assert("ChannelHandler.HandleRequest: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
}
this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);
if (!this.DispatchAndReleasePump(request, true, currentOperationContext))
{
// this.DispatchDone will be called to continue
return false;
}
}
return true;
}
bool HandleRequestAsReply(RequestContext request)
{
if (this.duplexBinder != null)
{
if (this.duplexBinder.HandleRequestAsReply(request.RequestMessage))
{
return true;
}
}
return false;
}
static void OnStartAsyncMessagePump(object state)
{
((ChannelHandler)state).AsyncMessagePump();
}
static void OnStartSyncMessagePump(object state)
{
ChannelHandler handler = state as ChannelHandler;
if (TD.ChannelReceiveStopIsEnabled())
{
TD.ChannelReceiveStop(handler.EventTraceActivity, state.GetHashCode());
}
if (handler.receiveWithTransaction)
{
handler.SyncTransactionalMessagePump();
}
else
{
handler.SyncMessagePump();
}
}
static void OnStartSingleTransactedBatch(object state)
{
ChannelHandler handler = state as ChannelHandler;
handler.TransactedBatchLoop();
}
static void OnAsyncReceiveComplete(IAsyncResult result)
{
if (!result.CompletedSynchronously)
{
((ChannelHandler)result.AsyncState).AsyncMessagePump(result);
}
}
static void OnContinueAsyncReceive(object state)
{
IAsyncResult result = (IAsyncResult)state;
((ChannelHandler)result.AsyncState).AsyncMessagePump(result);
}
static void OpenAndEnsurePump(object state)
{
((ChannelHandler)state).OpenAndEnsurePump();
}
void OpenAndEnsurePump()
{
Exception exception = null;
try
{
this.binder.Channel.Open();
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
exception = e;
}
if (exception != null)
{
if (DiagnosticUtility.ShouldTraceWarning)
{
TraceUtility.TraceEvent(System.Diagnostics.TraceEventType.Warning,
TraceCode.FailedToOpenIncomingChannel,
SR.GetString(SR.TraceCodeFailedToOpenIncomingChannel));
}
SessionIdleManager idleManager = this.idleManager;
if (idleManager != null)
{
idleManager.CancelTimer();
}
if ((this.throttle != null) && this.hasSession)
{
this.throttle.DeactivateChannel();
}
bool errorHandled = this.HandleError(exception);
if (this.incrementedActivityCountInConstructor)
{
this.listener.ChannelDispatcher.Channels.DecrementActivityCount();
}
if (!errorHandled)
{
this.binder.Channel.Abort();
}
}
else
{
this.EnsurePump();
}
}
bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
{
this.shouldRejectMessageWithOnOpenActionHeader = !this.needToCreateSessionOpenNotificationMessage;
bool valid;
if (this.needToCreateSessionOpenNotificationMessage)
{
this.needToCreateSessionOpenNotificationMessage = false;
requestContext = this.GetSessionOpenNotificationRequestContext();
valid = true;
}
else
{
valid = this.receiver.TryReceive(timeout, out requestContext);
}
if (valid)
{
this.HandleReceiveComplete(requestContext);
}
return valid;
}
void ReplyAddressFilterDidNotMatch(RequestContext request)
{
FaultCode code = FaultCode.CreateSenderFaultCode(AddressingStrings.DestinationUnreachable,
this.messageVersion.Addressing.Namespace);
string reason = SR.GetString(SR.SFxNoEndpointMatchingAddress, request.RequestMessage.Headers.To);
ReplyFailure(request, code, reason);
}
void ReplyContractFilterDidNotMatch(RequestContext request)
{
// By default, the contract filter is just a filter over the set of initiating actions in
// the contract, so we do error messages accordingly
AddressingVersion addressingVersion = this.messageVersion.Addressing;
if (addressingVersion != AddressingVersion.None && request.RequestMessage.Headers.Action == null)
{
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
new MessageHeaderException(
SR.GetString(SR.SFxMissingActionHeader, addressingVersion.Namespace), AddressingStrings.Action, addressingVersion.Namespace));
}
else
{
// some of this code is duplicated in DispatchRuntime.UnhandledActionInvoker
// ideally both places would use FaultConverter and ActionNotSupportedException
FaultCode code = FaultCode.CreateSenderFaultCode(AddressingStrings.ActionNotSupported,
this.messageVersion.Addressing.Namespace);
string reason = SR.GetString(SR.SFxNoEndpointMatchingContract, request.RequestMessage.Headers.Action);
ReplyFailure(request, code, reason, this.messageVersion.Addressing.FaultAction);
}
}
void ReplyChannelTerminated(RequestContext request)
{
FaultCode code = FaultCode.CreateSenderFaultCode(FaultCodeConstants.Codes.SessionTerminated,
FaultCodeConstants.Namespaces.NetDispatch);
string reason = SR.GetString(SR.SFxChannelTerminated0);
string action = FaultCodeConstants.Actions.NetDispatcher;
Message fault = Message.CreateMessage(this.messageVersion, code, reason, action);
ReplyFailure(request, fault, action, reason, code);
}
void ReplyFailure(RequestContext request, FaultCode code, string reason)
{
string action = this.messageVersion.Addressing.DefaultFaultAction;
ReplyFailure(request, code, reason, action);
}
void ReplyFailure(RequestContext request, FaultCode code, string reason, string action)
{
Message fault = Message.CreateMessage(this.messageVersion, code, reason, action);
ReplyFailure(request, fault, action, reason, code);
}
void ReplyFailure(RequestContext request, Message fault, string action, string reason, FaultCode code)
{
FaultException exception = new FaultException(reason, code);
ErrorBehavior.ThrowAndCatch(exception);
ErrorHandlerFaultInfo faultInfo = new ErrorHandlerFaultInfo(action);
faultInfo.Fault = fault;
bool replied, replySentAsync;
ProvideFaultAndReplyFailure(request, exception, ref faultInfo, out replied, out replySentAsync);
this.HandleError(exception, ref faultInfo);
}
void ProvideFaultAndReplyFailure(RequestContext request, Exception exception, ref ErrorHandlerFaultInfo faultInfo, out bool replied, out bool replySentAsync)
{
replied = false;
replySentAsync = false;
bool requestMessageIsFault = false;
try
{
requestMessageIsFault = request.RequestMessage.IsFault;
}
#pragma warning suppress 56500 // covered by FxCOP
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
// ---- it
}
bool enableFaults = false;
if (this.listener != null)
{
enableFaults = this.listener.ChannelDispatcher.EnableFaults;
}
else if (this.channel != null && this.channel.IsClient)
{
enableFaults = this.channel.ClientRuntime.EnableFaults;
}
if ((!requestMessageIsFault) && enableFaults)
{
this.ProvideFault(exception, ref faultInfo);
if (faultInfo.Fault != null)
{
Message reply = faultInfo.Fault;
try
{
try
{
if (this.PrepareReply(request, reply))
{
if (this.sendAsynchronously)
{
var state = new ContinuationState { ChannelHandler = this, Channel = channel, Exception = exception, FaultInfo = faultInfo, Request = request, Reply = reply };
var result = request.BeginReply(reply, ChannelHandler.onAsyncReplyComplete, state);
if (result.CompletedSynchronously)
{
ChannelHandler.AsyncReplyComplete(result, state);
replied = true;
}
else
{
replySentAsync = true;
}
}
else
{
request.Reply(reply);
replied = true;
}
}
}
finally
{
if (!replySentAsync)
{
reply.Close();
}
}
}
#pragma warning suppress 56500 // covered by FxCOP
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
this.HandleError(e);
}
}
}
}
/// <summary>
/// Prepares a reply that can either be sent asynchronously or synchronously depending on the value of
/// sendAsynchronously
/// </summary>
/// <param name="request">The request context to prepare</param>
/// <param name="reply">The reply to prepare</param>
/// <returns>True if channel is open and prepared reply should be sent; otherwise false.</returns>
bool PrepareReply(RequestContext request, Message reply)
{
// Ensure we only reply once (we may hit the same error multiple times)
if (this.replied == request)
{
return false;
}
this.replied = request;
bool canSendReply = true;
Message requestMessage = null;
try
{
requestMessage = request.RequestMessage;
}
#pragma warning suppress 56500 // covered by FxCOP
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
// ---- it
}
if (!object.ReferenceEquals(requestMessage, null))
{
UniqueId requestID = null;
try
{
requestID = requestMessage.Headers.MessageId;
}
catch (MessageHeaderException)
{
// ---- it - we don't need to correlate the reply if the MessageId header is bad
}
if (!object.ReferenceEquals(requestID, null) && !this.isManualAddressing)
{
System.ServiceModel.Channels.RequestReplyCorrelator.PrepareReply(reply, requestID);
}
if (!this.hasSession && !this.isManualAddressing)
{
try
{
canSendReply = System.ServiceModel.Channels.RequestReplyCorrelator.AddressReply(reply, requestMessage);
}
catch (MessageHeaderException)
{
// ---- it - we don't need to address the reply if the FaultTo header is bad
}
}
}
// ObjectDisposeException can happen
// if the channel is closed in a different
// thread. 99% this check will avoid false
// exceptions.
return this.IsOpen && canSendReply;
}
static void AsyncReplyComplete(IAsyncResult result, ContinuationState state)
{
try
{
state.Request.EndReply(result);
}
catch (Exception e)
{
DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
if (Fx.IsFatal(e))
{
throw;
}
state.ChannelHandler.HandleError(e);
}
try
{
state.Reply.Close();
}
catch (Exception e)
{
DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
if (Fx.IsFatal(e))
{
throw;
}
state.ChannelHandler.HandleError(e);
}
try
{
state.ChannelHandler.HandleErrorContinuation(state.Exception, state.Request, state.Channel, ref state.FaultInfo, true);
}
catch (Exception e)
{
DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
if (Fx.IsFatal(e))
{
throw;
}
state.ChannelHandler.HandleError(e);
}
state.ChannelHandler.EnsurePump();
}
static void OnAsyncReplyComplete(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
try
{
var state = (ContinuationState)result.AsyncState;
ChannelHandler.AsyncReplyComplete(result, state);
}
catch (Exception e)
{
DiagnosticUtility.TraceHandledException(e, System.Diagnostics.TraceEventType.Error);
if (Fx.IsFatal(e))
{
throw;
}
}
}
void ReleasePump()
{
if (this.isConcurrent)
{
Interlocked.Exchange(ref this.isPumpAcquired, 0);
}
}
void SyncMessagePump()
{
OperationContext existingOperationContext = OperationContext.Current;
try
{
OperationContext currentOperationContext = new OperationContext(this.host);
OperationContext.Current = currentOperationContext;
for (;;)
{
RequestContext request;
this.requestInfo.Cleanup();
while (!TryReceive(TimeSpan.MaxValue, out request))
{
}
if (!HandleRequest(request, currentOperationContext))
{
break;
}
if (!TryAcquirePump())
{
break;
}
currentOperationContext.Recycle();
}
}
finally
{
OperationContext.Current = existingOperationContext;
}
}
[MethodImpl(MethodImplOptions.NoInlining)]
void SyncTransactionalMessagePump()
{
for (;;)
{
bool completedSynchronously;
if (null == sharedTransactedBatchContext)
{
completedSynchronously = TransactedLoop();
}
else
{
completedSynchronously = TransactedBatchLoop();
}
if (!completedSynchronously)
{
return;
}
}
}
bool TransactedLoop()
{
try
{
this.receiver.WaitForMessage();
}
catch (Exception ex)
{
if (Fx.IsFatal(ex))
{
throw;
}
if (!this.HandleError(ex))
{
throw;
}
}
RequestContext request;
Transaction tx = CreateOrGetAttachedTransaction();
OperationContext existingOperationContext = OperationContext.Current;
try
{
OperationContext currentOperationContext = new OperationContext(this.host);
OperationContext.Current = currentOperationContext;
for (;;)
{
this.requestInfo.Cleanup();
bool received = TryTransactionalReceive(tx, out request);
if (!received)
{
return IsOpen;
}
if (null == request)
{
return false;
}
TransactionMessageProperty.Set(tx, request.RequestMessage);
if (!HandleRequest(request, currentOperationContext))
{
return false;
}
if (!TryAcquirePump())
{
return false;
}
tx = CreateOrGetAttachedTransaction();
currentOperationContext.Recycle();
}
}
finally
{
OperationContext.Current = existingOperationContext;
}
}
bool TransactedBatchLoop()
{
if (null != this.transactedBatchContext)
{
if (this.transactedBatchContext.InDispatch)
{
this.transactedBatchContext.ForceRollback();
this.transactedBatchContext.InDispatch = false;
}
if (!this.transactedBatchContext.IsActive)
{
if (!this.isMainTransactedBatchHandler)
{
return false;
}
this.transactedBatchContext = null;
}
}
if (null == this.transactedBatchContext)
{
try
{
this.receiver.WaitForMessage();
}
catch (Exception ex)
{
if (Fx.IsFatal(ex))
{
throw;
}
if (!this.HandleError(ex))
{
throw;
}
}
this.transactedBatchContext = this.sharedTransactedBatchContext.CreateTransactedBatchContext();
}
OperationContext existingOperationContext = OperationContext.Current;
try
{
OperationContext currentOperationContext = new OperationContext(this.host);
OperationContext.Current = currentOperationContext;
RequestContext request;
while (this.transactedBatchContext.IsActive)
{
this.requestInfo.Cleanup();
bool valid = TryTransactionalReceive(this.transactedBatchContext.Transaction, out request);
if (!valid)
{
if (this.IsOpen)
{
this.transactedBatchContext.ForceCommit();
return true;
}
else
{
this.transactedBatchContext.ForceRollback();
return false;
}
}
if (null == request)
{
this.transactedBatchContext.ForceRollback();
return false;
}
TransactionMessageProperty.Set(this.transactedBatchContext.Transaction, request.RequestMessage);
this.transactedBatchContext.InDispatch = true;
if (!HandleRequest(request, currentOperationContext))
{
return false;
}
if (this.transactedBatchContext.InDispatch)
{
this.transactedBatchContext.ForceRollback();
this.transactedBatchContext.InDispatch = false;
return true;
}
if (!TryAcquirePump())
{
Fx.Assert("System.ServiceModel.Dispatcher.ChannelHandler.TransactedBatchLoop(): (TryAcquiredPump returned false)");
return false;
}
currentOperationContext.Recycle();
}
}
finally
{
OperationContext.Current = existingOperationContext;
}
return true;
}
Transaction CreateOrGetAttachedTransaction()
{
if (null != this.acceptTransaction)
{
lock (ThisLock)
{
if (null != this.acceptTransaction)
{
Transaction tx = this.acceptTransaction.Transaction;
this.acceptTransaction = null;
return tx;
}
}
}
if (null != this.InstanceContext && this.InstanceContext.HasTransaction)
{
return InstanceContext.Transaction.Attached;
}
else
{
return TransactionBehavior.CreateTransaction(
this.listener.ChannelDispatcher.TransactionIsolationLevel,
TransactionBehavior.NormalizeTimeout(this.listener.ChannelDispatcher.TransactionTimeout));
}
}
// calls receive on the channel; returns false if no message during the "short timeout"
bool TryTransactionalReceive(Transaction tx, out RequestContext request)
{
request = null;
bool received = false;
try
{
using (TransactionScope scope = new TransactionScope(tx))
{
if (null != this.sharedTransactedBatchContext)
{
lock (this.sharedTransactedBatchContext.ReceiveLock)
{
if (this.transactedBatchContext.AboutToExpire)
{
return false;
}
received = this.receiver.TryReceive(TimeSpan.Zero, out request);
}
}
else
{
TimeSpan receiveTimeout = TimeoutHelper.Min(this.listener.ChannelDispatcher.TransactionTimeout, this.listener.ChannelDispatcher.DefaultCommunicationTimeouts.ReceiveTimeout);
received = this.receiver.TryReceive(TransactionBehavior.NormalizeTimeout(receiveTimeout), out request);
}
scope.Complete();
}
if (received)
{
this.HandleReceiveComplete(request);
}
}
catch (ObjectDisposedException ex) // thrown from the transaction
{
this.HandleError(ex);
request = null;
return false;
}
catch (TransactionException ex)
{
this.HandleError(ex);
request = null;
return false;
}
catch (Exception ex)
{
if (Fx.IsFatal(ex))
{
throw;
}
if (!this.HandleError(ex))
{
throw;
}
}
return received;
}
// This callback always occurs async and always on a dirty thread
internal void ThrottleAcquiredForCall()
{
RequestContext request = this.requestWaitingForThrottle;
if (DS.ServiceThrottleIsEnabled())
{
DS.CallThrottleAcquired(request.RequestMessage);
}
this.requestWaitingForThrottle = null;
if (this.requestInfo.ChannelHandlerOwnsCallThrottle)
{
Fx.Assert("ChannelHandler.ThrottleAcquiredForCall: this.requestInfo.ChannelHandlerOwnsCallThrottle");
}
this.requestInfo.ChannelHandlerOwnsCallThrottle = true;
if (!this.TryRetrievingInstanceContext(request))
{
//Should reply/close request and also close the pump
this.EnsurePump();
return;
}
this.requestInfo.Channel.CompletedIOOperation();
if (this.TryAcquireThrottle(request, (this.requestInfo.ExistingInstanceContext == null)))
{
if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
{
Fx.Assert("ChannelHandler.ThrottleAcquiredForCall: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
}
this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);
if (this.DispatchAndReleasePump(request, false, null))
{
this.EnsurePump();
}
}
}
bool TryRetrievingInstanceContext(RequestContext request)
{
try
{
return TryRetrievingInstanceContextCore(request);
}
catch (Exception ex)
{
if (Fx.IsFatal(ex))
{
throw;
}
DiagnosticUtility.TraceHandledException(ex, TraceEventType.Error);
try
{
request.Close();
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
request.Abort();
}
return false;
}
}
//Return: False denotes failure, Caller should discard the request.
// : True denotes operation is sucessful.
bool TryRetrievingInstanceContextCore(RequestContext request)
{
bool releasePump = true;
try
{
if (!this.requestInfo.EndpointLookupDone)
{
this.EnsureChannelAndEndpoint(request);
}
if (this.requestInfo.Channel == null)
{
return false;
}
if (this.requestInfo.DispatchRuntime != null)
{
IContextChannel transparentProxy = this.requestInfo.Channel.Proxy as IContextChannel;
try
{
this.requestInfo.ExistingInstanceContext = this.requestInfo.DispatchRuntime.InstanceContextProvider.GetExistingInstanceContext(request.RequestMessage, transparentProxy);
releasePump = false;
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
this.requestInfo.Channel = null;
this.HandleError(e, request, channel);
return false;
}
}
else
{
// This can happen if we are pumping for an async client,
// and we receive a bogus reply. In that case, there is no
// DispatchRuntime, because we are only expecting replies.
//
// One possible fix for this would be in DuplexChannelBinder
// to drop all messages with a RelatesTo that do not match a
// pending request.
//
// However, that would not fix:
// (a) we could get a valid request message with a
// RelatesTo that we should try to process.
// (b) we could get a reply message that does not have
// a RelatesTo.
//
// So we do the null check here.
//
// SFx drops a message here
TraceUtility.TraceDroppedMessage(request.RequestMessage, this.requestInfo.Endpoint);
request.Close();
return false;
}
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
this.HandleError(e, request, channel);
return false;
}
finally
{
if (releasePump)
{
this.ReleasePump();
}
}
return true;
}
// This callback always occurs async and always on a dirty thread
internal void ThrottleAcquired()
{
RequestContext request = this.requestWaitingForThrottle;
if (DS.ServiceThrottleIsEnabled())
{
DS.InstanceThrottleAcquired(request.RequestMessage);
}
this.requestWaitingForThrottle = null;
if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
{
Fx.Assert("ChannelHandler.ThrottleAcquired: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
}
this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);
if (this.DispatchAndReleasePump(request, false, null))
{
this.EnsurePump();
}
}
bool TryAcquireThrottle(RequestContext request, bool acquireInstanceContextThrottle)
{
ServiceThrottle throttle = this.throttle;
if ((throttle != null) && (throttle.IsActive))
{
this.requestWaitingForThrottle = request;
if (throttle.AcquireInstanceContextAndDynamic(this, acquireInstanceContextThrottle))
{
this.requestWaitingForThrottle = null;
return true;
}
else
{
if (DS.ServiceThrottleIsEnabled())
{
DS.InstanceThrottleWaiting(request.RequestMessage);
}
return false;
}
}
else
{
return true;
}
}
bool TryAcquireCallThrottle(RequestContext request)
{
ServiceThrottle throttle = this.throttle;
if ((throttle != null) && (throttle.IsActive))
{
this.requestWaitingForThrottle = request;
if (throttle.AcquireCall(this))
{
this.requestWaitingForThrottle = null;
return true;
}
else
{
return false;
}
}
else
{
return true;
}
}
bool TryAcquirePump()
{
if (this.isConcurrent)
{
return Interlocked.CompareExchange(ref this.isPumpAcquired, 1, 0) == 0;
}
return true;
}
struct RequestInfo
{
public EndpointDispatcher Endpoint;
public InstanceContext ExistingInstanceContext;
public ServiceChannel Channel;
public bool EndpointLookupDone;
public DispatchRuntime DispatchRuntime;
public RequestContext RequestContext;
public ChannelHandler ChannelHandler;
public bool ChannelHandlerOwnsCallThrottle; // if true, we are responsible for call throttle
public bool ChannelHandlerOwnsInstanceContextThrottle; // if true, we are responsible for instance/dynamic throttle
public RequestInfo(ChannelHandler channelHandler)
{
this.Endpoint = null;
this.ExistingInstanceContext = null;
this.Channel = null;
this.EndpointLookupDone = false;
this.DispatchRuntime = null;
this.RequestContext = null;
this.ChannelHandler = channelHandler;
this.ChannelHandlerOwnsCallThrottle = false;
this.ChannelHandlerOwnsInstanceContextThrottle = false;
}
public void Cleanup()
{
if (this.ChannelHandlerOwnsInstanceContextThrottle)
{
this.ChannelHandler.throttle.DeactivateInstanceContext();
this.ChannelHandlerOwnsInstanceContextThrottle = false;
}
this.Endpoint = null;
this.ExistingInstanceContext = null;
this.Channel = null;
this.EndpointLookupDone = false;
this.RequestContext = null;
if (this.ChannelHandlerOwnsCallThrottle)
{
this.ChannelHandler.DispatchDone();
this.ChannelHandlerOwnsCallThrottle = false;
}
}
}
EventTraceActivity TraceDispatchMessageStart(Message message)
{
if (FxTrace.Trace.IsEnd2EndActivityTracingEnabled && message != null)
{
EventTraceActivity eventTraceActivity = EventTraceActivityHelper.TryExtractActivity(message);
if (TD.DispatchMessageStartIsEnabled())
{
TD.DispatchMessageStart(eventTraceActivity);
}
return eventTraceActivity;
}
return null;
}
/// <summary>
/// Data structure used to carry state for asynchronous replies
/// </summary>
struct ContinuationState
{
public ChannelHandler ChannelHandler;
public Exception Exception;
public RequestContext Request;
public Message Reply;
public ServiceChannel Channel;
public ErrorHandlerFaultInfo FaultInfo;
}
}
}
|