File: System\ServiceModel\Activities\InternalSendMessage.cs
Project: ndp\cdf\src\NetFx40\System.ServiceModel.Activities\System.ServiceModel.Activities.csproj (System.ServiceModel.Activities)
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
 
namespace System.ServiceModel.Activities
{
    using System;
    using System.Activities;
    using System.Activities.Statements;
    using System.Collections.Generic;
    using System.Collections.ObjectModel;
    using System.Diagnostics;
    using System.Diagnostics.CodeAnalysis;
    using System.Linq;
    using System.Runtime;
    using System.Runtime.Collections;
    using System.Runtime.Diagnostics;
    using System.Security.Principal;
    using System.ServiceModel;
    using System.Runtime.Serialization;
    using System.ServiceModel.Activities.Description;
    using System.ServiceModel.Activities.Dispatcher;
    using System.ServiceModel.Activities.Tracking;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Description;
    using System.ServiceModel.Diagnostics;
    using System.Transactions;
    using System.Xaml;
    using System.Xml.Linq;
    using System.Runtime.DurableInstancing;
    using System.Security;
 
    // InternalSendMessage encapsulates both the server and client send.  For the server
    // send it provides the ability to persist after correlations have been initialized
    // but before the send has actually been completed by the channel stack.  This is not
    // supported by client send.
 
    class InternalSendMessage : NativeActivity
    {
        static string runtimeTransactionHandlePropertyName = typeof(RuntimeTransactionHandle).FullName;
 
        // Explicit correlation OM
        Collection<CorrelationInitializer> correlationInitializers;
        Collection<CorrelationQuery> replyCorrelationQueries;
 
        ICollection<CorrelationQuery> correlationQueries;
 
        MessageVersion messageVersion;
 
        ContractDescription cachedContract;
        ServiceEndpoint cachedServiceEndpoint;
        AddressHeaderCollection cachedEndpointHeaderCollection;
        FactoryCacheKey cachedFactoryCacheKey;
        bool isConfigSettingsSecure;
        bool configVerified;
 
        KeyValuePair<ObjectCacheItem<ChannelFactoryReference>, SendMessageChannelCache> lastUsedFactoryCacheItem;
 
 
        // this will be scheduled if ShouldPersistBeforeSend is set to true
        Activity persist;
 
        WaitOnChannelCorrelation channelCorrelationCompletionWaiter;
        Variable<VolatileSendMessageInstance> sendMessageInstance;
        Variable<NoPersistHandle> noPersistHandle;
        Variable<Bookmark> extensionSendCompleteBookmark;
        Variable<Guid> e2eActivityId;
 
        OpenChannelFactory openChannelFactory;
        OpenChannelAndSendMessage openChannelAndSendMessage;
 
        FaultCallback onSendFailure;
 
        public InternalSendMessage()
        {
            this.TokenImpersonationLevel = TokenImpersonationLevel.Identification;
 
            this.sendMessageInstance = new Variable<VolatileSendMessageInstance>();
            this.channelCorrelationCompletionWaiter = new WaitOnChannelCorrelation { Instance = this.sendMessageInstance };
 
            this.noPersistHandle = new Variable<NoPersistHandle>();
            this.extensionSendCompleteBookmark = new Variable<Bookmark>();
            this.e2eActivityId = new Variable<Guid>();
 
            this.openChannelFactory = new OpenChannelFactory { Instance = this.sendMessageInstance };
            this.openChannelAndSendMessage = new OpenChannelAndSendMessage { Instance = this.sendMessageInstance, InternalSendMessage = this, };
        }
 
        public TokenImpersonationLevel TokenImpersonationLevel
        {
            get;
            set;
        }
 
        // Endpoint defines the service to talk to, and endpointAddress is used to set 
        // the Uri at the runtime, such as the duplex scenario.
        public Endpoint Endpoint
        {
            get;
            set;
        }
 
        public string EndpointConfigurationName
        {
            get;
            set;
        }
 
        // This is needed for the callback case
        public InArgument<Uri> EndpointAddress
        {
            get;
            set;
        }
 
        public InArgument<CorrelationHandle> CorrelatesWith
        {
            get;
            set;
        }
        
        public string OperationName
        {
            get;
            set;
        }
 
        public string Action
        {
            get;
            set;
        }
 
        // cache for internal implementation. This should be set by the Send<T>
        // Should only be used in initating send. 
        // Should use this instead of OperationContract.IsOneWay
        public bool IsOneWay
        {
            get;
            set;
        }
 
        protected override bool CanInduceIdle
        {
            get
            {
                return true;
            }
        }
 
        // this flag is for Send/SendReply to indicate if we are client-side send or receive-side sendreply
        // 
        internal bool IsSendReply
        {
            get;
            set;
        }
 
        // Used for cleaning up the Message variable
        internal OutArgument<Message> MessageOut
        {
            get;
            set;
        }
 
        // should be used to decide whether persist before sending the message
        internal bool ShouldPersistBeforeSend { get; set; }
 
        internal string OwnerDisplayName { get; set; }
 
        public Collection<CorrelationInitializer> CorrelationInitializers
        {
            get
            {
                if (this.correlationInitializers == null)
                {
                    this.correlationInitializers = new Collection<CorrelationInitializer>();
                }
                return this.correlationInitializers;
            }
        }
 
        // This will be passed in from the parent Send activity
        public CorrelationQuery CorrelationQuery
        {
            get;
            set;
        }
 
        // This needs to be set by the ReceiveReply, we assume that this is unique
        internal ICollection<CorrelationQuery> ReplyCorrelationQueries
        {
            get
            {
                if (this.replyCorrelationQueries == null)
                {
                    this.replyCorrelationQueries = new Collection<CorrelationQuery>();
                }
 
                return this.replyCorrelationQueries;
            }
        }
 
        // on the serverside, the ContractName is set during ContractInference and is used for retrieving the
        // correct CorrelationQueryBehavior. ContractName on the Serverside can thus be different from what is
        // set on the OM
        public XName ServiceContractName
        {
            get;
            set;
        }
 
        public InArgument<Message> Message
        {
            get;
            set;
        }
 
        internal Send Parent
        {
            get;
            set;
        }
 
        internal static Guid TraceCorrelationActivityId
        {
            [Fx.Tag.SecurityNote(Critical = "Critical because Trace.CorrelationManager has a Link demand for UnmanagedCode.",
                Safe = "Safe because we aren't leaking a critical resource.")]
            [SecuritySafeCritical]
            get
            {
                return Trace.CorrelationManager.ActivityId;
            }
        }
 
        // we cache the ServiceEndpoint for perf reasons so that we can retrieve endpointaddress, contract etc without
        // creating a new ServiceEndpoint each time
        // Note that we should not pass the cachedServiceEndpoint to the ChannelFactory, as we need to have a 
        // distinct instance per-Factory.
        ServiceEndpoint GetCachedServiceEndpoint()
        {
            if (this.cachedServiceEndpoint == null)
            {
                this.cachedServiceEndpoint = CreateServiceEndpoint();
            }
            return this.cachedServiceEndpoint;
        }
 
        AddressHeaderCollection GetCachedEndpointHeaders()
        {
            Fx.Assert(this.Endpoint != null, "Endpoint should not be null");
            if (this.cachedEndpointHeaderCollection == null)
            {
                this.cachedEndpointHeaderCollection = new AddressHeaderCollection(this.Endpoint.Headers);
            }
            return this.cachedEndpointHeaderCollection;
        }
 
        void InitializeEndpoint(ref ServiceEndpoint serviceEndpoint, string configurationName)
        {
            ServiceEndpoint serviceEndpointFromConfig = null;
 
            if (configurationName != null)
            {
                // load the standard endpoint from the config
                serviceEndpointFromConfig = ConfigLoader.LookupEndpoint(configurationName, null, serviceEndpoint.Contract);
            }
 
            if (serviceEndpointFromConfig != null)
            {
                // standard endpoint case: it can completely override the endpoint
                serviceEndpoint = serviceEndpointFromConfig;
            }
            else
            {
                // normal endpoint case
                if (!serviceEndpoint.IsFullyConfigured)
                {
                    new ConfigLoader().LoadChannelBehaviors(serviceEndpoint, configurationName);
                }
            }
        }
 
        // used to create ChannelFactoryReference instances. We don't cache the serviceEndpoint 
        // directly, as we need to have a distinct instance per-Factory. So it's cached behind the 
        // scenes as part of the ChannelFactoryReference
        ServiceEndpoint CreateServiceEndpoint()
        {
            ContractDescription contract = null;
            bool ensureTransactionFlow = false;
            if (this.cachedContract == null)
            {
                contract = this.GetContractDescription();
                ensureTransactionFlow = true;
            }
            else
            {
                contract = this.cachedContract;
            }
            ServiceEndpoint result = new ServiceEndpoint(contract);
            if (this.Endpoint != null)
            {
                result.Binding = this.Endpoint.Binding;
                if (this.Endpoint.AddressUri != null)
                {
                    result.Address = new EndpointAddress(this.Endpoint.AddressUri, this.Endpoint.Identity, this.GetCachedEndpointHeaders());
                }
            }
            // Get ServiceEndpoint will be called only on the client side, hence if endpoint is null, we will try to load the config with 
            // endpointConfigurationName. 
            // endpointConfigurationName = null will be translated to endpointConfigurationName = String.Empty
            else
            {
                // we are loading the binding & the behaviors from config
                if (this.ServiceContractName != null)
                {
                    result.Contract.ConfigurationName = this.ServiceContractName.LocalName;
                }
                InitializeEndpoint(ref result, this.EndpointConfigurationName ?? string.Empty);
            }
 
            // if the cachedContract is null, verify if TransactionFlow is accounted for in the contract
            // if cachedContract is not null, we can skip this since the contract should be fixed for the workflow definition 
            if (ensureTransactionFlow)
            {
                EnsureTransactionFlowOnContract(ref result);
                this.cachedContract = result.Contract;
            }
            EnsureCorrelationQueryBehavior(result);
 
            return result;
        }
 
        void EnsureCorrelationQueryBehavior(ServiceEndpoint serviceEndpoint)
        {
            CorrelationQueryBehavior correlationQueryBehavior = serviceEndpoint.Behaviors.Find<CorrelationQueryBehavior>();
            if (correlationQueryBehavior == null)
            {
                // Add CorrelationQueryBehavior if either Binding has queries or if either Send or ReceiveReplies 
                // have correlation query associated with them
                if (CorrelationQueryBehavior.BindingHasDefaultQueries(serviceEndpoint.Binding)
                    || this.CorrelationQuery != null
                    || this.ReplyCorrelationQueries.Count > 0)
                {
                    correlationQueryBehavior = new CorrelationQueryBehavior(new Collection<CorrelationQuery>());
                    serviceEndpoint.Behaviors.Add(correlationQueryBehavior);
                }
            }
            if (correlationQueryBehavior != null)
            {
                // add CorrelationQuery from Send
                if (this.CorrelationQuery != null && !correlationQueryBehavior.CorrelationQueries.Contains(this.CorrelationQuery))
                {
                    correlationQueryBehavior.CorrelationQueries.Add(this.CorrelationQuery);
                }
 
                //add ReplyCorrelationQueries from ReceiveReply (there could be multiple ReceiveReplies for a Send and hence the collection
                foreach (CorrelationQuery query in this.ReplyCorrelationQueries)
                {
                    // Filter out duplicate CorrelationQueries in the collection.
                    // Currently, we only do reference comparison and Where message filter comparison.
                    if (!correlationQueryBehavior.CorrelationQueries.Contains(query))
                    {
                        correlationQueryBehavior.CorrelationQueries.Add(query);
                    }
                    else
                    {
                        if (TD.DuplicateCorrelationQueryIsEnabled())
                        {
                            TD.DuplicateCorrelationQuery(query.Where.ToString());
                        }
                    }
                }
 
                this.correlationQueries = correlationQueryBehavior.CorrelationQueries;
            }
        }
 
        static void EnsureCorrelationBehaviorScopeName(ActivityContext context, CorrelationQueryBehavior correlationBehavior)
        {
            Fx.Assert(correlationBehavior != null, "caller must verify");
            if (correlationBehavior.ScopeName == null)
            {
                CorrelationExtension extension = context.GetExtension<CorrelationExtension>();
                if (extension != null)
                {
                    correlationBehavior.ScopeName = extension.ScopeName;
                }
            }
        }
 
        void EnsureTransactionFlowOnContract(ref ServiceEndpoint serviceEndpoint)
        {
            if (!this.IsOneWay)
            {
                BindingElementCollection elementCollection = serviceEndpoint.Binding.CreateBindingElements();
                TransactionFlowBindingElement bindingElement = elementCollection.Find<TransactionFlowBindingElement>();
                bool flowTransaction = ((bindingElement != null) && (bindingElement.Transactions));
                if (flowTransaction)
                {
                    ContractInferenceHelper.EnsureTransactionFlowOnContract(ref serviceEndpoint,
                        this.ServiceContractName, this.OperationName, this.Action, this.Parent.ProtectionLevel);
                }
            }
        }
 
        internal MessageVersion GetMessageVersion()
        {
            if (this.messageVersion == null)
            {
                ServiceEndpoint endpoint = this.GetCachedServiceEndpoint();
                this.messageVersion = (endpoint != null && endpoint.Binding != null) ? endpoint.Binding.MessageVersion : null;
            }
            return this.messageVersion;
        }
 
        ContractDescription GetContractDescription()
        {
            ContractDescription cd;
 
            // When channel cache is disabled or when operation uses message contract,
            // we use the fully inferred description; otherwise, we use a fixed description to increase channel cache hits
 
            if (!this.Parent.ChannelCacheEnabled || this.Parent.OperationUsesMessageContract)
            {
                // If this is one-way send untyped message, this.OperationDescription would still be null
                if (this.Parent.OperationDescription == null)
                {
                    Fx.Assert(this.IsOneWay, "We can only reach here when we are one-way send Message!");
                    this.Parent.OperationDescription = ContractInferenceHelper.CreateOneWayOperationDescription(this.Parent);
                }
 
                cd = ContractInferenceHelper.CreateContractFromOperation(this.ServiceContractName, this.Parent.OperationDescription);
            }
            else
            {
                // Create ContractDescription using Fixed MessageIn/MessageOut contract
                // If IOutputChannel, we create a Contract with name IOutputChannel and OperationDescription "Send"
                // else, Contract name is IRequestChannel with OperationDescription "Request"
 
                if (this.IsOneWay)
                {
                    cd = ContractInferenceHelper.CreateOutputChannelContractDescription(this.ServiceContractName, this.Parent.ProtectionLevel);
                }
                else
                {
                    cd = ContractInferenceHelper.CreateRequestChannelContractDescription(this.ServiceContractName, this.Parent.ProtectionLevel);
                }
            }
 
            if (this.ServiceContractName != null)
            {
                cd.ConfigurationName = this.ServiceContractName.LocalName;
            }
            return cd;
        }
 
        EndpointAddress CreateEndpointAddress(NativeActivityContext context)
        {
            ServiceEndpoint endpoint = this.GetCachedServiceEndpoint();
            Uri endpointAddressUri = (this.EndpointAddress != null) ? this.EndpointAddress.Get(context) : null;
 
            if (endpoint != null && endpoint.Address != null)
            {
                return endpointAddressUri == null ?
                    endpoint.Address :
                    (new EndpointAddressBuilder(endpoint.Address) { Uri = endpointAddressUri }).ToEndpointAddress();
            }
            else if (this.Endpoint != null)
            {
                return endpointAddressUri == null ?
                    this.Endpoint.GetAddress() :
                    new EndpointAddress(endpointAddressUri, this.Endpoint.Identity, this.GetCachedEndpointHeaders());
            }
            else
            {
                return null;
            }
        }
 
        EndpointAddress CreateEndpointAddressFromCallback(EndpointAddress CallbackAddress)
        {
            Fx.Assert(CallbackAddress != null, "CallbackAddress cannot be null");
 
            EndpointIdentity endpointIdentity = null;
            AddressHeaderCollection headers = null;
            EndpointAddress endpointAddress;
 
            if (this.Endpoint != null)
            {
                // we honor Identity and Headers on the Endpoint OM even when the AddressUri is null
                endpointIdentity = this.Endpoint.Identity;
                headers = this.GetCachedEndpointHeaders();
            }
            else
            {
                // this could be from config
                ServiceEndpoint endpoint = this.GetCachedServiceEndpoint();
                Fx.Assert(endpoint != null, " endpoint cannot be null");
                if (endpoint.Address != null)
                {
                    endpointIdentity = endpoint.Address.Identity;
                    headers = endpoint.Address.Headers;
                }
            }
 
            if (endpointIdentity != null || headers != null)
            {
                Uri callbackUri = CallbackAddress.Uri;
                endpointAddress = new EndpointAddress(callbackUri, endpointIdentity, headers);
            }
            else
            {
                endpointAddress = CallbackAddress;
            }
            return endpointAddress;
        }
 
 
        bool IsEndpointSettingsSafeForCache()
        {
            if (!this.configVerified)
            {
 
                // let's set isConfigSettingsSecure flag to false if we use endpointConfiguration, 
                // this is used to decide if we cache factory or not
 
                this.isConfigSettingsSecure = this.Endpoint != null ? true : false;
                this.configVerified = true;
            }
            return this.isConfigSettingsSecure;
        }
 
        protected override void CacheMetadata(NativeActivityMetadata metadata)
        {
            if (ShouldPersistBeforeSend)
            {
                if (this.persist == null)
                {
                    this.persist = new Persist();
                }
                metadata.AddImplementationChild(this.persist);
            }
 
            RuntimeArgument endpointAddressArgument = new RuntimeArgument(Constants.EndpointAddress, Constants.UriType, ArgumentDirection.In);
            if (this.EndpointAddress == null)
            {
                this.EndpointAddress = new InArgument<Uri>();
            }
            metadata.Bind(this.EndpointAddress, endpointAddressArgument);
            metadata.AddArgument(endpointAddressArgument);
 
            RuntimeArgument correlatesWithArgument = new RuntimeArgument(Constants.CorrelatesWith, Constants.CorrelationHandleType, ArgumentDirection.In);
            if (this.CorrelatesWith == null)
            {
                this.CorrelatesWith = new InArgument<CorrelationHandle>();
            }
            metadata.Bind(this.CorrelatesWith, correlatesWithArgument);
            metadata.AddArgument(correlatesWithArgument);
            
            if (this.correlationInitializers != null)
            {
                int count = 0;
                foreach (CorrelationInitializer correlation in this.correlationInitializers)
                {
                    if (correlation.CorrelationHandle != null)
                    {
                        RuntimeArgument argument = new RuntimeArgument(Constants.Parameter + count,
                            correlation.CorrelationHandle.ArgumentType, correlation.CorrelationHandle.Direction, true);
                        metadata.Bind(correlation.CorrelationHandle, argument);
                        metadata.AddArgument(argument);
                        count++;
                    }
                }
            }
 
            RuntimeArgument requestMessageArgument = new RuntimeArgument(Constants.RequestMessage, Constants.MessageType, ArgumentDirection.In);
            if (this.Message == null)
            {
                this.Message = new InArgument<Message>();
            }
            metadata.Bind(this.Message, requestMessageArgument);
            metadata.AddArgument(requestMessageArgument);
 
            if (this.MessageOut != null)
            {
                RuntimeArgument requestMessageReference = new RuntimeArgument("MessageReference", Constants.MessageType, ArgumentDirection.Out);
                metadata.Bind(this.MessageOut, requestMessageReference);
                metadata.AddArgument(requestMessageReference);
            }
 
            metadata.AddImplementationVariable(this.sendMessageInstance);
            metadata.AddImplementationVariable(this.noPersistHandle);
            metadata.AddImplementationVariable(this.extensionSendCompleteBookmark);
            metadata.AddImplementationVariable(this.e2eActivityId);
 
            metadata.AddImplementationChild(this.channelCorrelationCompletionWaiter);
            metadata.AddImplementationChild(this.openChannelFactory);
            metadata.AddImplementationChild(this.openChannelAndSendMessage);
 
            metadata.AddDefaultExtensionProvider(SendMessageChannelCache.DefaultExtensionProvider);
        }
 
        protected override void Cancel(NativeActivityContext context)
        {
            SendReceiveExtension sendReceiveExtension = context.GetExtension<SendReceiveExtension>();
            if (sendReceiveExtension != null)
            {
                Bookmark pendingBookmark = this.extensionSendCompleteBookmark.Get(context);
                if (pendingBookmark != null)
                {
                    sendReceiveExtension.Cancel(pendingBookmark);
                    context.RemoveBookmark(pendingBookmark);
                }
                context.MarkCanceled();
            }
            else
            {
                // Do nothing.  InternalSendMessage cannot be canceled since
                // the individual parts of the process cannot be canceled.
            }
        }
 
        protected override void Abort(NativeActivityAbortContext context)
        {
            SendReceiveExtension sendReceiveExtension = context.GetExtension<SendReceiveExtension>();
            if (sendReceiveExtension != null)
            {
                Bookmark pendingBookmark = this.extensionSendCompleteBookmark.Get(context);
                if (pendingBookmark != null)
                {
                    sendReceiveExtension.Cancel(pendingBookmark);
                }
                base.Abort(context);
            }
            else
            {
 
                VolatileSendMessageInstance volatileInstance = this.sendMessageInstance.Get(context);
 
                if (volatileInstance != null)
                {
                    CleanupResources(volatileInstance.Instance);
                }
            }
        }
 
        void CleanupResources(SendMessageInstance instance)
        {
            if (instance != null)
            {
                instance.Dispose();
            }
        }
 
        // A separate code-path for extension based execution least impacts 
        // the existing workflow hosts. In the future we will add an extension from 
        // workflowservicehost and always use the extension.
        protected override void Execute(NativeActivityContext context)
        {
            SendReceiveExtension sendReceiveExtension = context.GetExtension<SendReceiveExtension>();
            if (sendReceiveExtension != null)
            {
                this.ExecuteUsingExtension(sendReceiveExtension, context);
            }
            else
            {
                // 
 
 
 
                // The entire InternalSendMessage runs in a no persist zone
                NoPersistHandle noPersistHandle = this.noPersistHandle.Get(context);
                noPersistHandle.Enter(context);
 
                // Set up the SendMessageInstance, which will 
                // setup an AsyncOperationBlock under the hood and thus block persistence 
                // until the message has been sent and we return to the workflow thread
                SendMessageInstance instance = new SendMessageInstance(this, context);
                SetSendMessageInstance(context, instance);
 
                if (instance.RequestContext != null)
                {
                    ExecuteClientRequest(context, instance);
                }
                else
                {
                    ExecuteServerResponse(context, instance);
                }
            }
        }
 
        void ExecuteUsingExtension(SendReceiveExtension sendReceiveExtension, NativeActivityContext context)
        {
            CorrelationHandle correlatesWith = null;
            if (this.TryGetCorrelatesWithHandle(context, out correlatesWith) && !correlatesWith.IsInitalized())
            {
                throw FxTrace.Exception.AsError(new ValidationException(SR.SendWithUninitializedCorrelatesWith(this.OperationName ?? string.Empty)));
            }
 
            CorrelationHandle ambientHandle = CorrelationHandle.GetAmbientCorrelation(context);
            if (correlatesWith == null)
            {
                correlatesWith = ambientHandle;
            }
 
            Guid e2eTracingId;
            SendSettings sendSettings;
 
            if (this.IsSendReply)
            {
                if (correlatesWith == null || !correlatesWith.IsInitalized())
                {
                    throw FxTrace.Exception.AsError(new ValidationException(SR.SendWithUninitializedCorrelatesWith(this.OperationName ?? string.Empty)));
                }
 
                e2eTracingId = correlatesWith.E2ETraceId;
                sendSettings = GetSettingsForSendReply();
            }
            else
            {
                CorrelationHandle requestReplyCorrelationHandle;
                this.correlationInitializers.TryGetRequestReplyCorrelationHandle(context, out requestReplyCorrelationHandle);
 
                // validate correlation configuration
                if (this.IsOneWay)
                {
                    if (requestReplyCorrelationHandle != null)
                    {
                        // this is a one-way send , we should not have a RequestReply Correlation initializer
                        throw FxTrace.Exception.AsError(new InvalidOperationException(SR.RequestReplyHandleShouldNotBePresentForOneWay));
                    }
                }
                else
                {
                    if (requestReplyCorrelationHandle == null && ambientHandle == null)
                    {
                        // we neither have a requestReply nor an ambientHandle
                        throw FxTrace.Exception.AsError(new InvalidOperationException(
                            SR.SendMessageNeedsToPairWithReceiveMessageForTwoWayContract(this.OperationName ?? string.Empty)));
                    }
                }
 
                e2eTracingId = InternalSendMessage.TraceCorrelationActivityId;
                if (e2eTracingId == Guid.Empty)
                {
                    e2eTracingId = Guid.NewGuid();
                }
                sendSettings = GetSettingsForSend(context);
            }
 
            this.SendToExtension(sendReceiveExtension, context, sendSettings, e2eTracingId, correlatesWith);
        }
 
        void SendToExtension(SendReceiveExtension sendReceiveExtension, NativeActivityContext context, SendSettings sendSettings, Guid e2eTracingId, CorrelationHandle correlatesWith)
        {
            Message message = this.Message.Get(context);
 
            // add a transient correlation if necessary
            if (!IsOneWay && !IsSendReply)
            {
                CorrelationMessageProperty correlationMessageProperty;
                if (!message.Properties.TryGetValue(CorrelationMessageProperty.Name, out correlationMessageProperty))
                {
                    InstanceKey requestReplyCorrelationKey = new InstanceKey(Guid.NewGuid(),
                            new Dictionary<XName, InstanceValue>
                            {
                                { WorkflowServiceNamespace.RequestReplyCorrelation, new InstanceValue(true) }
                            });
 
                    List<InstanceKey> transientCorrelations = new List<InstanceKey>();
                    transientCorrelations.Add(requestReplyCorrelationKey);
                    correlationMessageProperty = new CorrelationMessageProperty(InstanceKey.InvalidKey, new List<InstanceKey>(0), transientCorrelations);
                    message.Properties[CorrelationMessageProperty.Name] = correlationMessageProperty;
                }
                else
                {
                    InstanceKey requestReplyCorrelationKey;
                    // if requestReplyCorrelationKey does not exist, clone correlationMessageProperty and
                    // replace it in the message with one that has the key.
                    if (!this.TryGetRequestReplyCorrelationInstanceKey(correlationMessageProperty, out requestReplyCorrelationKey))
                    {
                        requestReplyCorrelationKey = new InstanceKey(Guid.NewGuid(),
                            new Dictionary<XName, InstanceValue>
                            {
                                { WorkflowServiceNamespace.RequestReplyCorrelation, new InstanceValue(true) }
                            });
                        List<InstanceKey> transientCorrelations = new List<InstanceKey>(correlationMessageProperty.TransientCorrelations);
                        transientCorrelations.Add(requestReplyCorrelationKey);
                        CorrelationMessageProperty newProperty = new CorrelationMessageProperty(
                                correlationMessageProperty.CorrelationKey,
                                correlationMessageProperty.AdditionalKeys,
                                transientCorrelations);
                        message.Properties[CorrelationMessageProperty.Name] = newProperty;
                    }
                }
            }
 
            MessageContext messageContext = new MessageContext(message) { EndToEndTracingId = e2eTracingId };
            Bookmark sendCompleteBookmark = context.CreateBookmark(SendCompleteOnExtension);
            this.extensionSendCompleteBookmark.Set(context, sendCompleteBookmark);
            this.e2eActivityId.Set(context, e2eTracingId);
            this.ProcessSendMessageTrace(context, e2eTracingId, true);
            sendReceiveExtension.Send(
                messageContext, 
                sendSettings, 
                (correlatesWith == null) ? null : correlatesWith.InstanceKey, 
                sendCompleteBookmark);
 
            if (this.MessageOut != null)
            {
                this.MessageOut.Set(context, null);
            }
 
            this.Message.Set(context, null);
        }
 
        SendSettings GetSettingsForSendReply()
        {
            return new SendSettings
            {
                RequirePersistBeforeSend = this.ShouldPersistBeforeSend,
                OwnerDisplayName = this.OwnerDisplayName
            };
        }
 
        SendSettings GetSettingsForSend(NativeActivityContext context)
        {
            SendSettings settings = new SendSettings
            {
                IsOneWay = this.IsOneWay,
                EndpointConfigurationName = this.EndpointConfigurationName,
                TokenImpersonationLevel = this.TokenImpersonationLevel,
                ProtectionLevel = this.Parent.ProtectionLevel,
                OwnerDisplayName = this.OwnerDisplayName
            };
 
            if (this.EndpointAddress != null)
            {
                settings.EndpointAddress = this.EndpointAddress.Get(context);
            }
 
            if (this.Endpoint != null)
            {
                settings.Endpoint = XamlServices.Parse(XamlServices.Save(this.Endpoint)) as Endpoint;
            }
 
            return settings;
        }
 
        void SendCompleteOnExtension(NativeActivityContext context, Bookmark bookmark, object state)
        {
            // Now that the bookmark has been resumed, clear out the workflow variable holding 
            // its value.
            this.extensionSendCompleteBookmark.Set(context, null);
 
            Exception fault = state as Exception;
            if (fault != null)
            {
                throw FxTrace.Exception.AsError(fault);
            }
 
            CorrelationMessageProperty correlationMessageProperty = state as CorrelationMessageProperty;
 
            if (state != null && correlationMessageProperty == null)
            {
                throw FxTrace.Exception.AsError(new InvalidOperationException(SR.InvalidDataFromSendBookmarkState(this.OperationName ?? string.Empty)));
            }
 
            if (correlationMessageProperty != null)
            {
                this.InitializeCorrelationHandles(context, correlationMessageProperty);
            }
 
            Guid e2eActivityId = this.e2eActivityId.Get(context);
            this.ProcessSendMessageCompleteTrace(context, e2eActivityId);
        }
 
        void InitializeCorrelationHandles(NativeActivityContext context, CorrelationMessageProperty correlationMessageProperty)
        {
            CorrelationHandle ambientCorrelationHandle = CorrelationHandle.GetAmbientCorrelation(context);
 
            if (this.IsSendReply)
            {
                // Check for ContextCorrelationInitializer handle
                CorrelationHandle contextCorrelationHandle = CorrelationHandle.GetExplicitContextCorrelation(context, this.correlationInitializers);
                MessagingActivityHelper.InitializeCorrelationHandles(context, contextCorrelationHandle, ambientCorrelationHandle, this.correlationInitializers, correlationMessageProperty.CorrelationKey, correlationMessageProperty.AdditionalKeys);
            }
            else
            {
                // Check for CallbackCorrelationInitializer handle
                CorrelationHandle callbackCorrelationHandle = CorrelationHandle.GetExplicitCallbackCorrelation(context, this.correlationInitializers);
                MessagingActivityHelper.InitializeCorrelationHandles(context, callbackCorrelationHandle, ambientCorrelationHandle, this.correlationInitializers, correlationMessageProperty.CorrelationKey, correlationMessageProperty.AdditionalKeys);
 
                InstanceKey requestReplyInstanceKey;
                if (this.TryGetRequestReplyCorrelationInstanceKey(correlationMessageProperty, out requestReplyInstanceKey))
                {
                    CorrelationHandle requestReplyCorrelationHandle = CorrelationHandle.GetExplicitRequestReplyCorrelation(context, this.correlationInitializers);
                    if (requestReplyCorrelationHandle != null)
                    {
                        requestReplyCorrelationHandle.TransientInstanceKey = requestReplyInstanceKey;
                    }
                    else if (ambientCorrelationHandle != null)
                    {
                        ambientCorrelationHandle.TransientInstanceKey = requestReplyInstanceKey;
                    }
                }
            }
        }
 
        bool TryGetRequestReplyCorrelationInstanceKey(CorrelationMessageProperty correlationMessageProperty, out InstanceKey instanceKey)
        {
            instanceKey = null;
 
            foreach (InstanceKey key in correlationMessageProperty.TransientCorrelations)
            {
                InstanceValue value;
                if (key.Metadata.TryGetValue(WorkflowServiceNamespace.RequestReplyCorrelation, out value))
                {
                    instanceKey = key;
                    break;
                }
            }
 
            return instanceKey != null;
        }
 
        bool TryGetCorrelatesWithHandle(NativeActivityContext context, out CorrelationHandle correlationHandle)
        {
            correlationHandle = null;
            if (this.CorrelatesWith != null)
            {
                correlationHandle = this.CorrelatesWith.Get(context);
            }
 
            return correlationHandle != null;
        }
 
        void SetSendMessageInstance(NativeActivityContext context, SendMessageInstance instance)
        {
            VolatileSendMessageInstance volatileInstance = new VolatileSendMessageInstance { Instance = instance };
            this.sendMessageInstance.Set(context, volatileInstance);
        }
 
        SendMessageInstance GetSendMessageInstance(ActivityContext context)
        {
            VolatileSendMessageInstance volatileInstance = this.sendMessageInstance.Get(context);
 
            Fx.Assert(volatileInstance != null, "This should never be null.");
 
            return volatileInstance.Instance;
        }
 
        // Used for server-side send (replies). We don't have any async code here since the
        // Dispatcher handles any completions
        void ExecuteServerResponse(NativeActivityContext context, SendMessageInstance instance)
        {
            Fx.Assert(instance.ResponseContext != null, "only valid for responses");
            Fx.Assert(instance.ResponseContext.WorkflowOperationContext != null, "The WorkflowOperationContext is required on the CorrelationResponseContext");
            instance.OperationContext = instance.ResponseContext.WorkflowOperationContext.OperationContext;
 
            // now that we have our op-context, invoke the callback that user might have added in the AEC in the previous activity 
            // e.g. distributed compensation activity will add this so that they can convert an execution property 
            // to an message properties, as will Transaction Flow
            instance.ProcessMessagePropertyCallbacks();
 
            ProcessSendMessageTrace(context, instance, false);
 
            // retrieve the correct CorrelationQueryBehavior from the ChannelExtensions collection
            CorrelationQueryBehavior correlationBehavior = null;
            Collection<CorrelationQueryBehavior> correlationQueryBehaviors = instance.OperationContext.Channel.Extensions.FindAll<CorrelationQueryBehavior>();
            foreach (CorrelationQueryBehavior cqb in correlationQueryBehaviors)
            {
                if (cqb.ServiceContractName == this.ServiceContractName)
                {
                    correlationBehavior = cqb;
                    break;
                }
            }
 
            //set the reply
            instance.RequestOrReply = this.Message.Get(context);
 
            if (correlationBehavior != null)
            {
                EnsureCorrelationBehaviorScopeName(context, correlationBehavior);
                instance.RegisterCorrelationBehavior(correlationBehavior);
 
                if (instance.CorrelationKeyCalculator != null)
                {
                    if (correlationBehavior.SendNames != null && correlationBehavior.SendNames.Count > 0)
                    {
                        if (correlationBehavior.SendNames.Count == 1 && (correlationBehavior.SendNames.Contains(ContextExchangeCorrelationHelper.CorrelationName)))
                        {
                            // Contextchannel is the only channel participating in correlation
                            // Since we already have the instance id, we don't have to wait for the context channel to call us back to initialize 
                            // the correlation - InstanceId can be retrieved directly from ContextMessageProperty through Operation context.
                            ContextMessageProperty contextProperties = null;
                            if (ContextMessageProperty.TryGet(instance.OperationContext.OutgoingMessageProperties, out contextProperties))
                            {
                                // 
 
                                CorrelationDataMessageProperty.AddData(instance.RequestOrReply, ContextExchangeCorrelationHelper.CorrelationName, () => ContextExchangeCorrelationHelper.GetContextCorrelationData(instance.OperationContext));
                            }
                            // Initialize correlations right away without waiting for the context channel to call us back
                            InitializeCorrelations(context, instance);
                        }
                        else
                        {
                            // Initialize correlations through channel callback
                            instance.OperationContext.OutgoingMessageProperties.Add(CorrelationCallbackMessageProperty.Name,
                                new MessageCorrelationCallbackMessageProperty(correlationBehavior.SendNames ?? new string[0], instance));
                            instance.CorrelationSynchronizer = new CorrelationSynchronizer();
                        }
                    }
                    else
                    {
                        // there are no channel based queries, we can initialize correlations right away
                        InitializeCorrelations(context, instance);
                    }
                }
            }
 
            // For exception case: Always call WorkflowOperationContext.SendFault to either send back the fault in the request/reply case 
            // or make sure the error handler extension gets a chance to handle this fault;
            if (instance.ResponseContext.Exception != null)
            {
                try
                {
                    instance.ResponseContext.WorkflowOperationContext.SendFault(instance.ResponseContext.Exception);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    instance.ResponseContext.Exception = e;
                }
            }
            else
            {
                try
                {
                    instance.ResponseContext.WorkflowOperationContext.SendReply(instance.RequestOrReply);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    instance.ResponseContext.Exception = e;
                }
            }
 
            if (TraceUtility.ActivityTracing)
            {
                if (instance.AmbientActivityId != InternalSendMessage.TraceCorrelationActivityId)
                {
                    if (TD.StopSignpostEventIsEnabled())
                    {
                        TD.StopSignpostEvent(new DictionaryTraceRecord(new Dictionary<string, string>(3) {
                                                    { MessagingActivityHelper.ActivityName, this.DisplayName },
                                                    { MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution },
                                                    { MessagingActivityHelper.ActivityInstanceId, context.ActivityInstanceId }
                            }));
                    }
                    FxTrace.Trace.SetAndTraceTransfer(instance.AmbientActivityId, true);
                    instance.AmbientActivityId = Guid.Empty;
                }
            }
 
            if (instance.CorrelationSynchronizer == null)
            {
                // We aren't doing any correlation work so we just
                // finalize the send.
                context.SetValue(this.Message, null);
                context.SetValue(this.MessageOut, null);
 
                if (ShouldPersistBeforeSend)
                {
                    // Need to allow persistence.
                    NoPersistHandle noPersistHandle = this.noPersistHandle.Get(context);
                    noPersistHandle.Exit(context);
 
                    // 
                    context.ScheduleActivity(this.persist, new CompletionCallback(OnPersistCompleted));
                }
                else
                {
                    FinalizeSendMessageCore(instance);
                }
            }
            else
            {
                // We're doing correlation.  Either the work is already
                // done or we need to synchronize with the channel stack.
                if (instance.CorrelationSynchronizer.IsChannelWorkComplete)
                {
                    // No need to schedule our completion waiter
                    OnChannelCorrelationCompleteCore(context, instance);
                }
                else
                {
                    context.ScheduleActivity(this.channelCorrelationCompletionWaiter, OnChannelCorrelationComplete, null);
                }
 
                // We notify that we're done with the send.  If the
                // correlation processing has already completed then
                // we'll finalize the send.
                if (instance.CorrelationSynchronizer.NotifySendComplete())
                {
                    FinalizeSendMessageCore(instance);
                }
            }
        }
 
        void ProcessSendMessageTrace(NativeActivityContext context, SendMessageInstance instance, bool isClient)
        {
            if (TraceUtility.MessageFlowTracing)
            {
                if (TraceUtility.ActivityTracing)
                {
                    instance.AmbientActivityId = InternalSendMessage.TraceCorrelationActivityId;
                }
 
                if (isClient)
                {
                    //We need to emit a transfer from WF instance ID to the id set in the TLS
                    instance.E2EActivityId = InternalSendMessage.TraceCorrelationActivityId;
                    if (instance.E2EActivityId == Guid.Empty)
                    {
                        instance.E2EActivityId = Guid.NewGuid();
                    }
                }
                else
                {
                    instance.E2EActivityId = instance.ResponseContext.WorkflowOperationContext.E2EActivityId;
                }
 
                this.ProcessSendMessageTrace(context, instance.E2EActivityId, isClient);
            }
        }
 
        void ProcessSendMessageTrace(NativeActivityContext context, Guid e2eActivityId, bool isClient)
        {
            if (TraceUtility.MessageFlowTracing)
            {
                try
                {
                    if (isClient)
                    {
                        if (context.WorkflowInstanceId != e2eActivityId)
                        {
                            DiagnosticTraceBase.ActivityId = context.WorkflowInstanceId;
                            FxTrace.Trace.SetAndTraceTransfer(e2eActivityId, true);
                        }
                    }
                    else
                    {
                        DiagnosticTraceBase.ActivityId = context.WorkflowInstanceId;
                    }
 
                    context.Track(
                        new SendMessageRecord(MessagingActivityHelper.MessageCorrelationSendRecord)
                        {
                            E2EActivityId = e2eActivityId
                        });
 
                    if (TraceUtility.ActivityTracing)
                    {
                        if (TD.StartSignpostEventIsEnabled())
                        {
                            TD.StartSignpostEvent(new DictionaryTraceRecord(new Dictionary<string, string>(3) {
                                                    { MessagingActivityHelper.ActivityName, this.DisplayName },
                                                    { MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution },
                                                    { MessagingActivityHelper.ActivityInstanceId, context.ActivityInstanceId }
                            }));
                        }
                    }
                }
                catch (Exception ex)
                {
                    if (Fx.IsFatal(ex))
                    {
                        throw;
                    }
                    FxTrace.Exception.AsInformation(ex);
                }
            }
        }
 
        void ProcessSendMessageCompleteTrace(NativeActivityContext context, Guid e2eActivityId)
        {
            Guid ambientActivityId = InternalSendMessage.TraceCorrelationActivityId;
            if (TraceUtility.ActivityTracing)
            {
                if (TD.StopSignpostEventIsEnabled())
                {
                    TD.StopSignpostEvent(new DictionaryTraceRecord(new Dictionary<string, string>(3) {
                                                    { MessagingActivityHelper.ActivityName, this.DisplayName },
                                                    { MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution },
                                                    { MessagingActivityHelper.ActivityInstanceId, context.ActivityInstanceId }
                                }));
                }
                FxTrace.Trace.SetAndTraceTransfer(ambientActivityId, true);
            }
            if (TD.WfMessageSentIsEnabled())
            {
                // 
                EventTraceActivity eta = new EventTraceActivity();
                if (e2eActivityId != Guid.Empty)
                {
                    eta.SetActivityId(e2eActivityId);
                }
                TD.WfMessageSent(eta, ambientActivityId);
            }
        }
 
        void OnChannelCorrelationComplete(NativeActivityContext context, ActivityInstance completedInstance)
        {
            SendMessageInstance instance = GetSendMessageInstance(context);
            Fx.Assert(instance != null, "The instance cannot be null here.");
 
            OnChannelCorrelationCompleteCore(context, instance);
        }
 
        void OnChannelCorrelationCompleteCore(NativeActivityContext context, SendMessageInstance instance)
        {
            Message message = InitializeCorrelations(context, instance);
            instance.CorrelationSynchronizer.NotifyMessageUpdatedByWorkflow(message);
 
            context.SetValue(this.Message, null);
            context.SetValue(this.MessageOut, null);
 
            if (this.ShouldPersistBeforeSend && instance.RequestContext == null)
            {
                // Need to allow persistence.
                NoPersistHandle noPersistHandle = this.noPersistHandle.Get(context);
                noPersistHandle.Exit(context);
 
                // 
                context.ScheduleActivity(this.persist, new CompletionCallback(OnPersistCompleted));
            }
            else
            {
                // Create a bookmark to complete the callback, this is to ensure that the InstanceKey does get saved in the PPD 
                // by the time the bookmark is resumed. The instancekey is not getting  saved in the PPD  till workflow gets to 
                // the next idle state. 
                // 
                Bookmark completeCorrelationBookmark = context.CreateBookmark(CompleteCorrelationCallback, BookmarkOptions.NonBlocking);
                context.ResumeBookmark(completeCorrelationBookmark, null);
            }
        }
 
        void CompleteCorrelationCallback(NativeActivityContext context, Bookmark bookmark, object value)
        {
            SendMessageInstance instance = GetSendMessageInstance(context);
            Fx.Assert(instance != null, "The instance cannot be null here.");
 
            if (instance.CorrelationSynchronizer.NotifyWorkflowCorrelationProcessingComplete())
            {
                // The send complete notification has already occurred
                // so it is up to us to finalize the send.
                FinalizeSendMessageCore(instance);
            }
        }
 
        void OnPersistCompleted(NativeActivityContext context, ActivityInstance completedInstance)
        {
            // We can reenter no persist now
            NoPersistHandle noPersistHandle = this.noPersistHandle.Get(context);
            noPersistHandle.Enter(context);
 
            // We might get back a null here because we've allowed persistence.
            // If that is the case we'll just ignore it ... we don't have any more
            // meaningful work to do.
            SendMessageInstance instance = GetSendMessageInstance(context);
 
            if (instance != null)
            {
                // Do it with or without correlation
                if (instance.CorrelationSynchronizer == null || instance.CorrelationSynchronizer.NotifyWorkflowCorrelationProcessingComplete())
                {
                    // The send complete notification has already occurred
                    // so it is up to us to finalize the send.
                    FinalizeSendMessageCore(instance);
                }
            }
        }
 
        void ExecuteClientRequest(NativeActivityContext context, SendMessageInstance instance)
        {
            // This is the client send request: we need to figure out the channel and request message first
 
            // Get the Extension for the ChannelSettings
            instance.CacheExtension = context.GetExtension<SendMessageChannelCache>();
            Fx.Assert(instance.CacheExtension != null, "channelCacheExtension must exist.");
 
 
            // Send.ChannelCacheEnabled must be set before we call CreateEndpointAddress
            // because CreateEndpointAddress will cache description and description resolution depends on the value of ChannelCacheEnabled
            this.Parent.InitializeChannelCacheEnabledSetting(instance.CacheExtension);
 
            // if there is a correlatesWith handle with callbackcontext(Durable Duplex case), use the callback address and context from
            // there. The handle could be an explicit 'CorrelatesWith' handle or an ambient handle.
            if (instance.CorrelatesWith != null)
            {
                if (instance.CorrelatesWith.CallbackContext != null)
                {
                    instance.CorrelationCallbackContext = instance.CorrelatesWith.CallbackContext;
 
                    // construct EndpointAdress based on the ListenAddress from callback and the identity and headers from Endpoint or from Config
                    instance.EndpointAddress = CreateEndpointAddressFromCallback(instance.CorrelationCallbackContext.ListenAddress.ToEndpointAddress());
                }
 
                if (instance.CorrelatesWith.Context != null)
                {
                    instance.CorrelationContext = instance.CorrelatesWith.Context;
                }
            }
            // Request  is always of Type Message. Message Argument will be set by Send<T> using the appropriate formatter
            instance.RequestOrReply = this.Message.Get(context);
 
            if (instance.EndpointAddress == null)
            {
                //try to get it from endpoint or config
                instance.EndpointAddress = CreateEndpointAddress(context);
            }
 
            if (instance.EndpointAddress == null)
            {
                throw FxTrace.Exception.AsError(new ValidationException(SR.EndpointAddressNotSetInEndpoint(this.OperationName)));
            }
 
            // Configname to be used for the FactoryCacheKey, 
            // if endpoint is defined, we use the settings from endpoint and ignore the endpointConfigurationName
            // if endpoint is not defined we use the endpointConfigurationName
            string configName = (this.Endpoint != null) ? null : this.EndpointConfigurationName;
 
            ProcessSendMessageTrace(context, instance, true);
 
            // Get ChannelFactory from the cache
            ObjectCache<FactoryCacheKey, ChannelFactoryReference> channelFactoryCache = null;
            ObjectCacheItem<ChannelFactoryReference> cacheItem = null;
            ChannelCacheSettings channelCacheSettings;                        
            
            // retrieve the FactoryCacheKey and cache it so that we could use it later.  
            if (this.cachedFactoryCacheKey == null)
            {
                ServiceEndpoint targetEndpoint = this.GetCachedServiceEndpoint();
                this.cachedFactoryCacheKey = new FactoryCacheKey(this.Endpoint, configName, this.IsOneWay, this.TokenImpersonationLevel,
                    targetEndpoint.Contract, this.correlationQueries);
            }
            
            // let's decide if we can share the cache from the extension
            // cache can be share if AllowUnsafeSharing is true or it is safe to share
            if (instance.CacheExtension.AllowUnsafeCaching || this.IsEndpointSettingsSafeForCache())
            {
                channelFactoryCache = instance.CacheExtension.GetFactoryCache();
                Fx.Assert(channelFactoryCache != null, "factory cache should be initialized either from the extension or from the globalcache");
 
                channelCacheSettings = instance.CacheExtension.ChannelSettings;
 
                // Get a ChannelFactoryReference (either cached or brand new)
                KeyValuePair<ObjectCacheItem<ChannelFactoryReference>, SendMessageChannelCache> localLastUsedCacheItem = this.lastUsedFactoryCacheItem;
                if (object.ReferenceEquals(localLastUsedCacheItem.Value, instance.CacheExtension))
                {
                    if (localLastUsedCacheItem.Key != null && localLastUsedCacheItem.Key.TryAddReference())
                    {
                        cacheItem = localLastUsedCacheItem.Key;
                    }
                    else
                    {
                        // the item is invalid
                        this.lastUsedFactoryCacheItem = new KeyValuePair<ObjectCacheItem<ChannelFactoryReference>, SendMessageChannelCache>(null, null);
                    }
                }
 
                if (cacheItem == null)
                {
                    // try retrieving the factoryreference directly from the factory cache 
                    cacheItem = channelFactoryCache.Take(this.cachedFactoryCacheKey);
                }
                if (cacheItem == null && TD.SendMessageChannelCacheMissIsEnabled())
                {
                    TD.SendMessageChannelCacheMiss();
                }
            }
            else
            {
                // not safe to share cache, do not cache anything
                channelCacheSettings = ChannelCacheSettings.EmptyCacheSettings;
            }
 
            ChannelFactoryReference newFactoryReference = null;
            if (cacheItem == null)
            {
                // nothing in our cache, we'll have to setup a new factory reference, which ClientSendAsyncResult will open asynchronously
                ServiceEndpoint targetEndpoint = this.CreateServiceEndpoint();
                // create a new ChannelFactoryReference that holds the channelfactory and a cache for its channels, 
                // cache settings are based on the channelcachesettings provided through the extension
                newFactoryReference = new ChannelFactoryReference(this.cachedFactoryCacheKey, targetEndpoint, channelCacheSettings);
            }
 
            instance.SetupFactoryReference(cacheItem, newFactoryReference, channelFactoryCache);
 
            if (this.onSendFailure == null)
            {
                this.onSendFailure = new FaultCallback(OnSendFailure);
            }
 
            if (instance.FactoryReference.NeedsOpen)
            {
                context.ScheduleActivity(this.openChannelFactory, OnChannelFactoryOpened, this.onSendFailure);
            }
            else
            {
                OnChannelFactoryOpenedCore(context, instance);
            }
        }
 
        void OnSendFailure(NativeActivityFaultContext context, Exception propagatedException, ActivityInstance propagatedFrom)
        {
            // We throw the exception because we want this activity to abort
            // as well.  The abort path will take care of performing resource
            // clean-up (see Abort(NativeActivityAbortContext)).
            throw FxTrace.Exception.AsError(propagatedException);
        }
 
        void OnChannelFactoryOpened(NativeActivityContext context, ActivityInstance completedInstance)
        {
            SendMessageInstance instance = GetSendMessageInstance(context);
            Fx.Assert(instance != null, "Must have a SendMessageInstance here.");
 
            OnChannelFactoryOpenedCore(context, instance);
        }
 
        void OnChannelFactoryOpenedCore(NativeActivityContext context, SendMessageInstance instance)
        {
            // now that we know the factory is open, setup our client channel and pool reference
            instance.PopulateClientChannel();
 
            IContextChannel contextChannel = instance.ClientSendChannel as IContextChannel;
            instance.OperationContext = (contextChannel == null) ? null : new OperationContext(contextChannel);
            
            // Retrieve the CorrelationQueryBehavior from the serviceEndpoint that we used for ChannelFactoryCreation
            // we later look for CorrelationQueryBehavior.SendNames which actually gets initialized during ChannelFactory creation
            // 
            CorrelationQueryBehavior correlationQueryBehavior = instance.FactoryReference.CorrelationQueryBehavior;
 
            if (correlationQueryBehavior != null)
            {
                EnsureCorrelationBehaviorScopeName(context, correlationQueryBehavior);
                instance.RegisterCorrelationBehavior(correlationQueryBehavior);
            }
 
            // now that we have our op-context, invoke the callback that user might have added in the AEC in the previous activity 
            // e.g. distributed compensation activity will add this so that they can convert an execution property 
            // to an message properties, as will Transaction Flow
            instance.ProcessMessagePropertyCallbacks();
 
            // Add the ContextMessage Property if either CallBackContextMessageProperty or ContextMessageProperty is set
            // if both are set validate that the context is the same in both of them
            ContextMessageProperty contextMessageProperty = null;
            if (instance.CorrelationCallbackContext != null && instance.CorrelationContext != null)
            {
                // validate if the context is equivalent
                if (MessagingActivityHelper.CompareContextEquality(instance.CorrelationCallbackContext.Context, instance.CorrelationContext.Context))
                {
                    contextMessageProperty = new ContextMessageProperty(instance.CorrelationCallbackContext.Context);
                }
                else
                {
                    throw FxTrace.Exception.AsError(new InvalidOperationException(SR.ContextMismatchInContextAndCallBackContext));
                }
            }
            else if (instance.CorrelationCallbackContext != null)
            {
                contextMessageProperty = new ContextMessageProperty(instance.CorrelationCallbackContext.Context);
            }
            else if (instance.CorrelationContext != null)
            {
                contextMessageProperty = new ContextMessageProperty(instance.CorrelationContext.Context);
            }
 
            if (contextMessageProperty != null)
            {
                contextMessageProperty.AddOrReplaceInMessage(instance.RequestOrReply);
            }
 
            // Add callback context Message property with instance id.
            // If binding contains ContextBindingElement with listenaddress set, the callback context message property will flow on the wire
 
            // Pull the instanceId from the CorrelationHandle, if it is already initialized, else create a new GUID.
            // we want to send the callback context only for the first message and when there is a FollowingContextCorrelation defined ( i.e., we are expecting a 
            // receive message back) or when there is an ambienthandle and the handle is not initalized. We will never use CorrelatesWith handle to initialize 
            // FollowingContext, since CorrelatesWith on the client side should always be used for a following correlation
            String contextValue;
            CorrelationHandle followingContextHandle = instance.ContextBasedCorrelationHandle != null ? instance.ContextBasedCorrelationHandle : instance.AmbientHandle;
 
            if (followingContextHandle != null && (followingContextHandle.Scope == null || followingContextHandle.Scope.IsInitialized == false))
            {
                // we are creating a new GUID for the context. As a practice,we don't want to send the WorkflowInstanceId over the wire
                contextValue = Guid.NewGuid().ToString();
                Dictionary<string, string> contextValues = new Dictionary<string, string>(1)
                    {
                        { ContextMessageProperty.InstanceIdKey, contextValue }
                    };
                new CallbackContextMessageProperty(contextValues).AddOrReplaceInMessage(instance.RequestOrReply);
            }
 
            // verify if we can complete Correlation intialization now
            if (instance.CorrelationSendNames != null)
            {
                // we're going to initialize request correlations later
                instance.RequestOrReply.Properties.Add(CorrelationCallbackMessageProperty.Name,
                    new MessageCorrelationCallbackMessageProperty(instance.CorrelationSendNames, instance));
 
                instance.CorrelationSynchronizer = new CorrelationSynchronizer();
            }
            else
            {
                InitializeCorrelations(context, instance);
            }
 
            if (instance.CorrelationSynchronizer != null)
            {
                context.ScheduleActivity(this.channelCorrelationCompletionWaiter, OnChannelCorrelationComplete, this.onSendFailure);
            }
 
            context.ScheduleActivity(this.openChannelAndSendMessage, OnClientSendComplete, this.onSendFailure);
        }
 
        void OnClientSendComplete(NativeActivityContext context, ActivityInstance completedInstance)
        {
            SendMessageInstance instance = GetSendMessageInstance(context);
 
            if (instance.CorrelationSynchronizer == null || instance.CorrelationSynchronizer.NotifySendComplete())
            {
                // Either there was no correlation or the send completed
                // after the correlation processing so we need to do the
                // finalize
                FinalizeSendMessageCore(instance);
            }
        }
 
        Message InitializeCorrelations(NativeActivityContext context, SendMessageInstance instance)
        {
            if (instance.CorrelationKeyCalculator != null)
            {
                // first setup the key-based correlations, pass in the Correlation Initialiers and the AmbientHandle 
                // for associating the keys. 
                // For content based correlation, we will never initalize correlation with a selectHandle.It has to be either specified in a CorrelationInitalizer 
                // or should be an ambient handle
                // For contextbased correlation, selecthandle will be callbackHandle in case of Send and contextHandle in case of sendReply
                instance.RequestOrReply = MessagingActivityHelper.InitializeCorrelationHandles(context,
                    instance.ContextBasedCorrelationHandle, instance.AmbientHandle, this.correlationInitializers,
                    instance.CorrelationKeyCalculator, instance.RequestOrReply);
            }
 
            // then setup any channel based correlations as necessary
            // 
            if (instance.RequestContext != null)
            {
                // first check for an explicit association
                CorrelationHandle requestReplyCorrelationHandle = instance.GetExplicitRequestReplyCorrelationHandle(context, this.correlationInitializers);
                if (requestReplyCorrelationHandle != null)
                {
                    if (!requestReplyCorrelationHandle.TryRegisterRequestContext(context, instance.RequestContext))
                    {
                        throw FxTrace.Exception.AsError(new InvalidOperationException(SR.TryRegisterRequestContextFailed));
                    }
                }
                else // if that fails, use the ambient handle. We do not use the CorrelatesWith handle for RequestReply correlation
                {
                    if (!this.IsOneWay)
                    {
                        // we have already validated this in SendMessageInstanceConstructor, just assert here
                        Fx.Assert(instance.AmbientHandle != null, "For two way send we need to have either a RequestReply correlation handle or an ambient handle");
                        if (!instance.AmbientHandle.TryRegisterRequestContext(context, instance.RequestContext))
                        {
                            throw FxTrace.Exception.AsError(new InvalidOperationException(SR.TryRegisterRequestContextFailed));
                        }
                    }
                }
            }
 
            return instance.RequestOrReply;
        }
 
        void FinalizeSendMessageCore(SendMessageInstance instance)
        {
            Exception completionException = instance.GetCompletionException();
 
            if (completionException != null)
            {
                throw FxTrace.Exception.AsError(completionException);
            }
        }
 
        class OpenChannelFactory : AsyncCodeActivity
        {
            public OpenChannelFactory()
            {
            }
 
            public InArgument<VolatileSendMessageInstance> Instance
            {
                get;
                set;
            }
 
            protected override void CacheMetadata(CodeActivityMetadata metadata)
            {
                RuntimeArgument instanceArgument = new RuntimeArgument("Instance", typeof(VolatileSendMessageInstance), ArgumentDirection.In);
                if (this.Instance == null)
                {
                    this.Instance = new InArgument<VolatileSendMessageInstance>();
                }
                metadata.Bind(this.Instance, instanceArgument);
 
                metadata.SetArgumentsCollection(
                    new Collection<RuntimeArgument>
                {
                    instanceArgument
                });
            }
 
            protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
            {
                VolatileSendMessageInstance volatileInstance = this.Instance.Get(context);
 
                return new OpenChannelFactoryAsyncResult(volatileInstance.Instance, callback, state);
            }
 
            protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
            {
                OpenChannelFactoryAsyncResult.End(result);
            }
 
            class OpenChannelFactoryAsyncResult : AsyncResult
            {
                static AsyncCompletion channelFactoryOpenCompletion = new AsyncCompletion(ChannelFactoryOpenCompletion);
 
                SendMessageInstance instance;
 
                public OpenChannelFactoryAsyncResult(SendMessageInstance instance, AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    this.instance = instance;
                    bool completeSelf = false;
 
                    if (this.instance.FactoryReference.NeedsOpen)
                    {
                        IAsyncResult result = this.instance.FactoryReference.BeginOpen(PrepareAsyncCompletion(channelFactoryOpenCompletion), this);
                        if (result.CompletedSynchronously)
                        {
                            completeSelf = OnNewChannelFactoryOpened(result);
                        }
                    }
                    else
                    {
                        completeSelf = true;
                    }
 
                    if (completeSelf)
                    {
                        Complete(true);
                    }
                }
 
                public static void End(IAsyncResult result)
                {
                    AsyncResult.End<OpenChannelFactoryAsyncResult>(result);
                }
 
                static bool ChannelFactoryOpenCompletion(IAsyncResult result)
                {
                    OpenChannelFactoryAsyncResult thisPtr = (OpenChannelFactoryAsyncResult)result.AsyncState;
                    return thisPtr.OnNewChannelFactoryOpened(result);
                }
 
                bool OnNewChannelFactoryOpened(IAsyncResult result)
                {
                    ObjectCacheItem<ChannelFactoryReference> newCacheItem =
                        this.instance.FactoryReference.EndOpen(result, this.instance.FactoryCache);
                    this.instance.RegisterNewCacheItem(newCacheItem);
 
                    return true;
                }
 
            }
        }
 
        class OpenChannelAndSendMessage : AsyncCodeActivity
        {
            public OpenChannelAndSendMessage()
            {
            }
 
            public InArgument<VolatileSendMessageInstance> Instance
            {
                get;
                set;
            }
 
            public InternalSendMessage InternalSendMessage
            {
                get;
                set;
            }
 
            protected override void CacheMetadata(CodeActivityMetadata metadata)
            {
                RuntimeArgument instanceArgument = new RuntimeArgument("Instance", typeof(VolatileSendMessageInstance), ArgumentDirection.In);
                if (this.Instance == null)
                {
                    this.Instance = new InArgument<VolatileSendMessageInstance>();
                }
                metadata.Bind(this.Instance, instanceArgument);
                metadata.AddArgument(instanceArgument);
            }
 
            protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
            {
                VolatileSendMessageInstance volatileInstance = this.Instance.Get(context);
                Transaction transaction = null;
 
                RuntimeTransactionHandle handle = context.GetProperty<RuntimeTransactionHandle>();
                if (handle != null)
                {
                    transaction = handle.GetCurrentTransaction(context);
                }
 
                return new OpenChannelAndSendMessageAsyncResult(InternalSendMessage, volatileInstance.Instance, transaction, callback, state);
            }
 
            protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
            {
                OpenChannelAndSendMessageAsyncResult.End(result);
            }
 
            class OpenChannelAndSendMessageAsyncResult : TransactedAsyncResult
            {
                static AsyncCompletion onChannelOpened = new AsyncCompletion(OnChannelOpened);
                static AsyncCompletion onChannelSendComplete = new AsyncCompletion(OnChannelSendComplete);
                static AsyncCallback onChannelReceiveReplyCompleted = Fx.ThunkCallback(OnChannelReceiveReplyComplete);
 
                SendMessageInstance instance;
                InternalSendMessage internalSendMessage;
                IChannel channel;
                Transaction currentTransactionContext;
                Guid ambientActivityId;
 
                //This is used to create a blocking dependent clone to synchronize the transaction commit processing with the completion of the aborting clone
                //that is created in this async result.
                DependentTransaction dependentClone;
 
                public OpenChannelAndSendMessageAsyncResult(InternalSendMessage internalSendMessage, SendMessageInstance instance, Transaction currentTransactionContext, AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    this.internalSendMessage = internalSendMessage;
                    this.instance = instance;
                    this.channel = this.instance.ClientSendChannel;
                    this.currentTransactionContext = currentTransactionContext;
                    
                    bool completeSelf = false;
 
                    //channel is still in created state, we need to open it
                    if (this.channel.State == CommunicationState.Created)
                    {
                        // Disable ContextManager before channel is opened
                        IContextManager contextManager = this.channel.GetProperty<IContextManager>();
                        if (contextManager != null)
                        {
                            contextManager.Enabled = false;
                        }
 
                        IAsyncResult result = this.channel.BeginOpen(PrepareAsyncCompletion(onChannelOpened), this);
                        if (result.CompletedSynchronously)
                        {
                            completeSelf = OnChannelOpened(result);
                        }
                    }
                    else
                    {
                        // channel already opened & retrieved from cache
                        // we don't have to do anything with ChannelOpen
                        completeSelf = BeginSendMessage();
                    }
 
                    if (completeSelf)
                    {
                        Complete(true);
                    }
                }
 
                public static void End(IAsyncResult result)
                {
                    AsyncResult.End<OpenChannelAndSendMessageAsyncResult>(result);
                }
 
                static bool OnChannelOpened(IAsyncResult result)
                {
                    OpenChannelAndSendMessageAsyncResult thisPtr = (OpenChannelAndSendMessageAsyncResult)result.AsyncState;
                    thisPtr.channel.EndOpen(result);
                    return thisPtr.BeginSendMessage();
                }
 
                bool BeginSendMessage()
                {
                    IAsyncResult result = null;
                    bool requestSucceeded = false;
                    OperationContext oldContext = OperationContext.Current;
                    bool asyncSend = !this.internalSendMessage.IsOneWay;
 
                    try
                    {
                        OperationContext.Current = this.instance.OperationContext;
 
                        if (TraceUtility.MessageFlowTracingOnly)
                        {
                            //set the E2E activity ID
                            DiagnosticTraceBase.ActivityId = this.instance.E2EActivityId;
                        }
 
                        using (PrepareTransactionalCall(this.currentTransactionContext))
                        {
                            if (asyncSend)
                            {
                                //If there is a transaction that we could be flowing out then we create this blocking clone to sync with the commit processing.
                                if (this.currentTransactionContext != null)
                                {
                                    this.dependentClone = this.currentTransactionContext.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                                }
 
                                this.instance.RequestContext.EnsureAsyncWaitHandle();
 
                                result = ((IRequestChannel)this.channel).BeginRequest(this.instance.RequestOrReply, onChannelReceiveReplyCompleted, this);
                                if (result.CompletedSynchronously)
                                {
                                    Message reply = ((IRequestChannel)this.channel).EndRequest(result);
                                    this.instance.RequestContext.ReceiveReply(this.instance.OperationContext, reply);
                                }
                            }
                            else
                            {
                                result = ((IOutputChannel)this.channel).BeginSend(this.instance.RequestOrReply, PrepareAsyncCompletion(onChannelSendComplete), this);
                                if (result.CompletedSynchronously)
                                {
                                    ((IOutputChannel)this.channel).EndSend(result);
                                }
                            }
 
                            requestSucceeded = true;
                        }
                    }
                    finally
                    {
                        OperationContext.Current = oldContext;
 
                        if (!requestSucceeded)
                        {
                            //if we did not succeed, complete the blocking clone anyway if we created it
                            if (this.dependentClone != null)
                            {
                                this.dependentClone.Complete();
                                this.dependentClone = null;
                            }
                            this.channel.Abort();
                        }
 
                        if (result != null && result.CompletedSynchronously)
                        {
                            //if we are done synchronously, we need to complete a blocking dependent clone if we created one (asyncSend case)
                            if (this.dependentClone != null)
                            {
                                this.dependentClone.Complete();
                                this.dependentClone = null;
                            }
                            this.internalSendMessage.CleanupResources(this.instance);
                        }
                    }
 
                    if (asyncSend)
                    {
                        return true;
                    }
                    else
                    {
                        return SyncContinue(result);
                    }
                }
 
                static void OnChannelReceiveReplyComplete(IAsyncResult result)
                {
                    if (result.CompletedSynchronously)
                    {
                        return;
                    }
 
                    OpenChannelAndSendMessageAsyncResult thisPtr = (OpenChannelAndSendMessageAsyncResult)result.AsyncState;
 
                    OperationContext oldContext = OperationContext.Current;
 
                    Message reply = null;
                    bool requestSucceeded = false;
 
                    try
                    {
                        OperationContext.Current = thisPtr.instance.OperationContext;
 
                        thisPtr.TraceActivityData();
 
                        System.Transactions.TransactionScope scope = TransactionHelper.CreateTransactionScope(thisPtr.currentTransactionContext);
                        try
                        {
                            Fx.Assert(thisPtr.channel is IRequestChannel, "Channel must be of IRequestChannel type!");
 
                            reply = ((IRequestChannel)thisPtr.channel).EndRequest(result);
 
                            //
                            thisPtr.instance.RequestContext.ReceiveAsyncReply(thisPtr.instance.OperationContext, reply, null);
 
                            requestSucceeded = true;
                        }
                        finally
                        {
                            TransactionHelper.CompleteTransactionScope(ref scope);
                        }
                    }
                    catch (Exception exception)
                    {
                        if (Fx.IsFatal(exception))
                        {
                            throw;
                        }
 
                        thisPtr.instance.RequestContext.ReceiveAsyncReply(thisPtr.instance.OperationContext, null, exception);
                    }
                    finally
                    {
                        //Complete the blocking dependent clone created before the async call was made.
                        if (thisPtr.dependentClone != null)
                        {
                            thisPtr.dependentClone.Complete();
                            thisPtr.dependentClone = null;
                        }
 
                        OperationContext.Current = oldContext;
 
                        if (!requestSucceeded)
                        {
                            thisPtr.channel.Abort();
                        }
 
                        thisPtr.internalSendMessage.CleanupResources(thisPtr.instance);
                    }
                }
 
                static bool OnChannelSendComplete(IAsyncResult result)
                {
                    if (result.CompletedSynchronously)
                    {
                        return true;
                    }
 
                    OpenChannelAndSendMessageAsyncResult thisPtr = (OpenChannelAndSendMessageAsyncResult)result.AsyncState;
 
                    OperationContext oldContext = OperationContext.Current;
 
                    try
                    {
                        OperationContext.Current = thisPtr.instance.OperationContext;
 
                        thisPtr.TraceActivityData();
 
                        System.Transactions.TransactionScope scope = TransactionHelper.CreateTransactionScope(thisPtr.currentTransactionContext);
                        try
                        {
                            Fx.Assert(thisPtr.channel is IOutputChannel, "Channel must be of IOutputChannel type!");
 
                            ((IOutputChannel)thisPtr.channel).EndSend(result);
                        }
                        finally
                        {
                            TransactionHelper.CompleteTransactionScope(ref scope);
                        }
                    }
                    catch (Exception exception)
                    {
                        if (Fx.IsFatal(exception))
                        {
                            throw;
                        }
 
                        // stash away the exception to be retrieved in FinalizeSendMessageCore
                        thisPtr.instance.RequestContext.Exception = exception;
                    }
                    finally
                    {
                        OperationContext.Current = oldContext;
                        thisPtr.internalSendMessage.CleanupResources(thisPtr.instance);
                    }
 
                    return true;
                }
 
                void TraceActivityData()
                {
                    if (TraceUtility.ActivityTracing)
                    {
                        if (TD.StopSignpostEventIsEnabled())
                        {
                            TD.StopSignpostEvent(new DictionaryTraceRecord(new Dictionary<string, string>(3) {
                                                    { MessagingActivityHelper.ActivityName, this.instance.Activity.DisplayName },
                                                    { MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution },
                                                    { MessagingActivityHelper.ActivityInstanceId, this.instance.ActivityInstanceId }
                                }));
                        }
                        FxTrace.Trace.SetAndTraceTransfer(this.ambientActivityId, true);
                        this.ambientActivityId = Guid.Empty;
                    }
                    if (TD.WfMessageSentIsEnabled())
                    {
                        // 
                        EventTraceActivity eta = new EventTraceActivity();
                        if (this.instance.E2EActivityId != Guid.Empty)
                        {
                            eta.SetActivityId(this.instance.E2EActivityId);
                        }
                        TD.WfMessageSent(eta, this.ambientActivityId);
                    }
                }
            }
        }
 
        class WaitOnChannelCorrelation : AsyncCodeActivity
        {
            public WaitOnChannelCorrelation()
            {
            }
 
            public InArgument<VolatileSendMessageInstance> Instance
            {
                get;
                set;
            }
 
            protected override void CacheMetadata(CodeActivityMetadata metadata)
            {
                RuntimeArgument instanceArgument = new RuntimeArgument("Instance", typeof(VolatileSendMessageInstance), ArgumentDirection.In);
                if (this.Instance == null)
                {
                    this.Instance = new InArgument<VolatileSendMessageInstance>();
                }
                metadata.Bind(this.Instance, instanceArgument);
 
                metadata.SetArgumentsCollection(
                    new Collection<RuntimeArgument>
                {
                    instanceArgument
                });
            }
 
            protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
            {
                VolatileSendMessageInstance volatileInstance = this.Instance.Get(context);
 
                Fx.Assert(volatileInstance.Instance != null, "This should not have gone through a persistence episode yet.");
 
                return new WaitOnChannelCorrelationAsyncResult(volatileInstance.Instance.CorrelationSynchronizer, callback, state);
            }
 
            protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
            {
                WaitOnChannelCorrelationAsyncResult.End(result);
            }
 
            class WaitOnChannelCorrelationAsyncResult : AsyncResult
            {
                CorrelationSynchronizer synchronizer;
 
                public WaitOnChannelCorrelationAsyncResult(CorrelationSynchronizer synchronizer, AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    this.synchronizer = synchronizer;
 
                    if (synchronizer.IsChannelWorkComplete)
                    {
                        Complete(true);
                    }
                    else
                    {
                        if (synchronizer.SetWorkflowNotificationCallback(new Action(OnChannelCorrelationComplete)))
                        {
                            // The bool flipped just before we set the action so
                            // we're actually complete.  The contract is that the
                            // action will never be raised if Set returns true.
                            Complete(true);
                        }
                    }
                }
 
                public static void End(IAsyncResult result)
                {
                    AsyncResult.End<WaitOnChannelCorrelationAsyncResult>(result);
                }
 
                void OnChannelCorrelationComplete()
                {
                    Complete(false);
                }
            }
        }
 
       internal class CorrelationSynchronizer
        {
            Action onRequestSetByChannel;
            Action<Message> onWorkflowCorrelationProcessingComplete;
            object thisLock;
            Completion completion;
 
            public CorrelationSynchronizer()
            {
                this.thisLock = new object();
            }
 
            public bool IsChannelWorkComplete
            {
                get;
                private set;
            }
 
            public Message UpdatedMessage
            {
                get;
                private set;
            }
 
            public void NotifyRequestSetByChannel(Action<Message> onWorkflowCorrelationProcessingComplete)
            {
                Fx.Assert(onWorkflowCorrelationProcessingComplete != null, "Must have a non-null callback.");
                Action toCall = null;
 
                lock (this.thisLock)
                {
                    this.IsChannelWorkComplete = true;
                    this.onWorkflowCorrelationProcessingComplete = onWorkflowCorrelationProcessingComplete;
 
                    toCall = this.onRequestSetByChannel;
                }
 
                if (toCall != null)
                {
                    toCall();
                }
            }
 
            public void NotifyMessageUpdatedByWorkflow(Message message)
            {
                this.UpdatedMessage = message;
            }
 
            public bool NotifyWorkflowCorrelationProcessingComplete()
            {
                Fx.Assert(this.onWorkflowCorrelationProcessingComplete != null, "This must be set before this can be called.");
 
                bool result = false;
 
                lock (this.thisLock)
                {
                    if (this.completion == Completion.SendComplete)
                    {
                        // The send has already completed so we are responsible for
                        // making sure FinalizeSendMessage is called.
                        result = true;
                    }
                    else
                    {
                        Fx.Assert(this.completion == Completion.None, "We should be the first one to complete.");
 
                        this.completion = Completion.CorrelationComplete;
                    }
                }
 
                this.onWorkflowCorrelationProcessingComplete(this.UpdatedMessage);
 
                return result;
            }
 
            public bool NotifySendComplete()
            {
                bool result = false;
                lock (this.thisLock)
                {
                    if (this.completion == Completion.CorrelationComplete)
                    {
                        // The correlation has already finished so we are responsible for
                        // making sure that FinalizeSendMessage is called.
                        result = true;
                    }
                    else
                    {
                        Fx.Assert(this.completion == Completion.None, "We should be the first one to complete.");
 
                        this.completion = Completion.SendComplete;
                    }
                }
 
                return result;
            }
 
            // Returns true if the channel work is actually done.  If this
            // returns true then the passed in Action will never be called.
            public bool SetWorkflowNotificationCallback(Action onRequestSetByChannel)
            {
                Fx.Assert(onRequestSetByChannel != null, "Must have a non-null callback.");
 
                bool result = false;
                lock (this.thisLock)
                {
                    result = this.IsChannelWorkComplete;
                    this.onRequestSetByChannel = onRequestSetByChannel;
                }
 
                return result;
            }
 
            // This three state enum allows us to determine whether
            // we are the first or second code path.  The second
            // code path needs finalize the send.
            enum Completion
            {
                None,
                SendComplete,
                CorrelationComplete
            }
        }
 
        // This class defines the instance data that used to store intermediate states
        // during the volatile async operation of sending a message.
        internal class SendMessageInstance
        {
            CorrelationHandle explicitChannelCorrelationHandle;
            IList<ISendMessageCallback> sendMessageCallbacks;
            ChannelFactoryReference factoryReference;
            ObjectCacheItem<ChannelFactoryReference> cacheItem;
            ObjectCache<FactoryCacheKey, ChannelFactoryReference> factoryCache;
            
            readonly InternalSendMessage parent;
            bool isUsingCacheFromExtension;
 
            // needed so that we can return our ClientSendChannel to the pool under Dispose
            ObjectCacheItem<Pool<IChannel>> clientChannelPool;
 
            public SendMessageInstance(InternalSendMessage parent, NativeActivityContext context)
            {
                this.parent = parent;
 
                // setup both our following state as well as any anonymous response information
                CorrelationHandle correlatesWith = (parent.CorrelatesWith == null) ? null : parent.CorrelatesWith.Get(context);
                if (correlatesWith != null && !correlatesWith.IsInitalized())
                {
                    // if send or sendReply has a correlatesWith, it should always be initialized with either content or with callbackcontext, context or 
                    // ResponseContext
                    throw FxTrace.Exception.AsError(new ValidationException(SR.SendWithUninitializedCorrelatesWith(this.parent.OperationName ?? string.Empty)));
                }
 
                if (correlatesWith == null)
                {
                    this.AmbientHandle = context.Properties.Find(CorrelationHandle.StaticExecutionPropertyName) as CorrelationHandle;
                    correlatesWith = this.AmbientHandle;
                }
 
                this.CorrelatesWith = correlatesWith;
 
                if (!parent.IsSendReply)
                {
                    // we're a client-side request
 
                    // Validate correlation handle
                    CorrelationHandle requestReplyCorrelationHandle = GetExplicitRequestReplyCorrelationHandle(context, parent.correlationInitializers);
                    if (parent.IsOneWay)
                    {
                        if (requestReplyCorrelationHandle != null)
                        {
                            // this is a one-way send , we should not have a RequestReply Correlation initializer
                            throw FxTrace.Exception.AsError(new InvalidOperationException(SR.RequestReplyHandleShouldNotBePresentForOneWay));
 
                        }
                    }
                    else // two-way send
                    {
                        if (requestReplyCorrelationHandle == null && this.AmbientHandle == null)
                        {
                            this.AmbientHandle = context.Properties.Find(CorrelationHandle.StaticExecutionPropertyName) as CorrelationHandle;
                            if (this.AmbientHandle == null)
                            {
                                // we neither have a channelHandle nor an ambientHandle
                                throw FxTrace.Exception.AsError(new InvalidOperationException(
                                    SR.SendMessageNeedsToPairWithReceiveMessageForTwoWayContract(parent.OperationName ?? string.Empty)));
                            }
                        }
                    }
 
                    // Formatter and OperationContract should be  removed from CorrelationRequestContext
                    // This will be done when SendMessage/ReceiveMessage is completely removed from the code base
                    this.RequestContext = new CorrelationRequestContext();
 
                    // callback correlationHandle is used for initalizing context based correlation 
                    this.ContextBasedCorrelationHandle = CorrelationHandle.GetExplicitCallbackCorrelation(context, parent.correlationInitializers);
 
                    // by default we use the channel factory cache from the extension
                    isUsingCacheFromExtension = true;
                }
                else
                {
                    // we are a server-side following send
                    CorrelationResponseContext responseContext;
                    if (correlatesWith == null || !correlatesWith.TryAcquireResponseContext(context, out responseContext))
                    {
                        throw FxTrace.Exception.AsError(new InvalidOperationException(SR.CorrelatedContextRequiredForAnonymousSend));
                    }
 
                    // Contract inference logic should validate that the Receive and Following send do not have conflicting data(e.g., OperationName)
 
                    this.ResponseContext = responseContext;
 
                    // in case of Context based correlation, we use context handle to initialize correlation
                    this.ContextBasedCorrelationHandle = CorrelationHandle.GetExplicitContextCorrelation(context, parent.correlationInitializers);
                }
 
                this.sendMessageCallbacks = MessagingActivityHelper.GetCallbacks<ISendMessageCallback>(context.Properties);
 
                if (TraceUtility.MessageFlowTracing)
                {
                    this.ActivityInstanceId = context.ActivityInstanceId;
                }
            }
 
            public InternalSendMessage Activity
            {
                get
                {
                    return this.parent;
                }
            }
 
            public CorrelationHandle CorrelatesWith
            {
                get;
                private set;
            }
 
            public CorrelationHandle AmbientHandle
            {
                get;
                private set;
            }
 
            public CorrelationHandle ContextBasedCorrelationHandle
            {
                get;
                private set;
            }
 
            public EndpointAddress EndpointAddress
            {
                get;
                set;
            }
 
            public IChannel ClientSendChannel
            {
                get;
                private set;
            }
 
            public CorrelationSynchronizer CorrelationSynchronizer
            {
                get;
                set;
            }
 
            public Message RequestOrReply
            {
                get;
                set;
            }
 
            public OperationContext OperationContext
            {
                get;
                set;
            }
 
            public CorrelationRequestContext RequestContext
            {
                get;
                private set;
            }
 
            // This is required for setting adding the ChannelFactory to the cache once it is opened
            public ObjectCache<FactoryCacheKey, ChannelFactoryReference> FactoryCache
            {
                get
                {
                    return this.factoryCache;
                }
            }
 
            // This is required for setting adding the ChannelFactory to the cache once it is opened
            public SendMessageChannelCache CacheExtension
            {
                get;
                set;
            }
 
            //This is required for returning it to the cache after use
            public ChannelFactoryReference FactoryReference
            {
                get
                {
                    return this.factoryReference;
                }
            }
 
            public CorrelationResponseContext ResponseContext
            {
                get;
                private set;
            }
 
            public CorrelationKeyCalculator CorrelationKeyCalculator
            {
                get;
                private set;
            }
 
            public CorrelationCallbackContext CorrelationCallbackContext
            {
                get;
                set;
            }
 
            public CorrelationContext CorrelationContext
            {
                get;
                set;
            }
 
            public Guid AmbientActivityId
            {
                get;
                set;
            }
 
            public ICollection<string> CorrelationSendNames
            {
                get;
                private set;
            }
 
            public Guid E2EActivityId
            {
                get;
                set;
            }
 
            public string ActivityInstanceId
            {
                get;
                private set;
            }
 
            public bool IsCorrelationInitialized
            {
                get;
                set;
            }
 
            public void SetupFactoryReference(ObjectCacheItem<ChannelFactoryReference> cacheItem, ChannelFactoryReference newFactoryReference, ObjectCache<FactoryCacheKey, ChannelFactoryReference> factoryCache)
            {
                this.factoryCache = factoryCache;
                if (this.factoryCache == null)
                {
                    isUsingCacheFromExtension = false;
                }
                if (cacheItem != null)
                {
                    // we found the item in our cache
                    Fx.Assert(newFactoryReference == null, "need one of cacheItem or newFactoryReference");
                    Fx.Assert(cacheItem.Value != null, "should have valid value");
                    this.cacheItem = cacheItem;
                    this.factoryReference = cacheItem.Value;
                }
                else
                {
                    Fx.Assert(newFactoryReference != null, "need one of cacheItem or newFactoryReference");
                    this.factoryReference = newFactoryReference;
                }
            }
 
            public void RegisterNewCacheItem(ObjectCacheItem<ChannelFactoryReference> newCacheItem)
            {
                Fx.Assert(this.cacheItem == null, "should only be called for new cache items");
                this.cacheItem = newCacheItem;
            }
 
            public CorrelationHandle GetExplicitRequestReplyCorrelationHandle(NativeActivityContext context, Collection<CorrelationInitializer> additionalCorrelations)
            {
                if (this.explicitChannelCorrelationHandle == null)
                {
                    this.explicitChannelCorrelationHandle = CorrelationHandle.GetExplicitRequestReplyCorrelation(context, additionalCorrelations);
                }
                return this.explicitChannelCorrelationHandle;
            }
 
            public void RegisterCorrelationBehavior(CorrelationQueryBehavior correlationBehavior)
            {
                Fx.Assert(correlationBehavior != null, "caller must verify");
                if (correlationBehavior.ScopeName != null)
                {
                    CorrelationKeyCalculator keyCalculator = correlationBehavior.GetKeyCalculator();
                    if (keyCalculator != null)
                    {
                        this.CorrelationKeyCalculator = keyCalculator;
                        if (this.RequestContext != null)
                        {
                            this.RequestContext.CorrelationKeyCalculator = keyCalculator;
                            // for requests, determine if we should be using the correlation callback
                            if (correlationBehavior.SendNames != null && correlationBehavior.SendNames.Count > 0)
                            {
                                this.CorrelationSendNames = correlationBehavior.SendNames;
                            }
                        }
                    }
                }
            }
 
            public void ProcessMessagePropertyCallbacks()
            {
                if (this.sendMessageCallbacks != null)
                {
                    foreach (ISendMessageCallback sendMessageCallback in this.sendMessageCallbacks)
                    {
                        sendMessageCallback.OnSendMessage(this.OperationContext);
                    }
                }
            }
 
            public void PopulateClientChannel()
            {
                Fx.Assert(this.ClientSendChannel == null && this.clientChannelPool == null, "should only be called once per instance");
                this.ClientSendChannel = this.FactoryReference.TakeChannel(this.EndpointAddress, out this.clientChannelPool);
            }
 
            public void Dispose()
            {
                if (this.ClientSendChannel != null)
                {
                    Fx.Assert(this.FactoryReference != null, "Must have a factory reference.");
                    this.FactoryReference.ReturnChannel(this.ClientSendChannel, this.clientChannelPool);
                    this.ClientSendChannel = null;
                    this.clientChannelPool = null;
                }
 
                if (this.cacheItem != null)
                {
                    this.cacheItem.ReleaseReference();
 
                    // if we are using the FactoryCache from the extension, store the last used cacheItem and extension
                    if (this.isUsingCacheFromExtension)
                    {
                        this.parent.lastUsedFactoryCacheItem = new KeyValuePair<ObjectCacheItem<ChannelFactoryReference>, SendMessageChannelCache>(this.cacheItem, this.CacheExtension);
                    }
                    this.cacheItem = null;
                }
            }
 
            public Exception GetCompletionException()
            {
                if (this.RequestContext != null)
                {
                    // We got an exception trying to send message or receive a reply
                    // Scenario: ContractFilterMismatch at serverside if the message action is not matched correctly
                    return this.RequestContext.Exception;
                }
                else
                {
                    return this.ResponseContext.Exception;
                }
            }
        }
 
        class MessageCorrelationCallbackMessageProperty : CorrelationCallbackMessageProperty
        {
            public MessageCorrelationCallbackMessageProperty(ICollection<string> neededData, SendMessageInstance instance)
                : base(neededData)
            {
                this.Instance = instance;
            }
 
            protected MessageCorrelationCallbackMessageProperty(MessageCorrelationCallbackMessageProperty callback)
                : base(callback)
            {
                this.Instance = callback.Instance;
            }
 
            public SendMessageInstance Instance
            {
                get;
                private set;
            }
 
            public override IMessageProperty CreateCopy()
            {
                return new MessageCorrelationCallbackMessageProperty(this);
            }
 
            protected override IAsyncResult OnBeginFinalizeCorrelation(Message message, TimeSpan timeout, AsyncCallback callback, object state)
            {
                return new FinalizeCorrelationAsyncResult(this, message, callback, state);
            }
 
            protected override Message OnEndFinalizeCorrelation(IAsyncResult result)
            {
                return FinalizeCorrelationAsyncResult.End(result);
            }
 
            protected override Message OnFinalizeCorrelation(Message message, TimeSpan timeout)
            {
                return OnEndFinalizeCorrelation(OnBeginFinalizeCorrelation(message, timeout, null, null));
            }
 
            class FinalizeCorrelationAsyncResult : AsyncResult
            {
                Message message;
                Completion completion;
 
                object thisLock;
 
                public FinalizeCorrelationAsyncResult(MessageCorrelationCallbackMessageProperty property, Message message,
                    AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    bool completeSelf = false;
                    if (property.Instance.IsCorrelationInitialized)
                    {
                        // we do not modify the message since correlation is not calculated again
                        this.message = message;
                        completeSelf = true;
                    }
                    else
                    {
                        property.Instance.IsCorrelationInitialized = true;
                        this.thisLock = new object();
 
                        property.Instance.RequestOrReply = message;
 
                        property.Instance.CorrelationSynchronizer.NotifyRequestSetByChannel(new Action<Message>(OnWorkflowCorrelationProcessingComplete));
 
                        // We have to do this dance with the lock because
                        // we aren't sure if we've been running sync or not.
                        // NOTE: It is possible for us to go async and
                        // still decide we're completing sync.  This is fine
                        // as it does not violate the async pattern since
                        // the work is done by the time Begin completes.
                        completeSelf = false;
 
                        lock (this.thisLock)
                        {
                            if (completion == Completion.WorkflowCorrelationProcessingComplete)
                            {
                                completeSelf = true;
                            }
                            else
                            {
                                Fx.Assert(this.completion == Completion.None, "We must be not ready then.");
 
                                this.completion = Completion.ConstructorComplete;
                            }
                        }
                    }
                    if (completeSelf)
                    {
                        Complete(true);
                    }
 
                }
 
                void OnWorkflowCorrelationProcessingComplete(Message updatedMessage)
                {
                    this.message = updatedMessage;
 
                    // We have to do this dance with the lock because
                    // we aren't sure if we've been running sync or not.
                    // NOTE: It is possible for us to go async and
                    // still decide we're completing sync.  This is fine
                    // as it does not violate the async pattern since
                    // the work is done by the time Begin completes.
                    bool completeSelf = false;
 
                    lock (this.thisLock)
                    {
                        if (this.completion == Completion.ConstructorComplete)
                        {
                            completeSelf = true;
                        }
                        else
                        {
                            Fx.Assert(this.completion == Completion.None, "We must be not ready then.");
 
                            this.completion = Completion.WorkflowCorrelationProcessingComplete;
                        }
                    }
 
                    if (completeSelf)
                    {
                        Complete(false);
                    }
                }
 
                public static Message End(IAsyncResult result)
                {
                    FinalizeCorrelationAsyncResult thisPtr = AsyncResult.End<FinalizeCorrelationAsyncResult>(result);
                    return thisPtr.message;
                }
 
                // This three state enum allows us to determine whether
                // we are the first or second code path.  The second
                // code path needs to complete the async result.
                enum Completion
                {
                    None,
                    ConstructorComplete,
                    WorkflowCorrelationProcessingComplete
                }
            }
        }
 
        [DataContract]
        internal class VolatileSendMessageInstance
        {
            public VolatileSendMessageInstance()
            {
            }
 
            // Note that we do not mark this DataMember since we don’t want it to be serialized
            public SendMessageInstance Instance { get; set; }
        }
 
        // Represents an item in our object cache. Stores a ChannelFactory and an associated pool of channels
        internal sealed class ChannelFactoryReference : IDisposable
        {
            static AsyncCallback onDisposeCommunicationObject = Fx.ThunkCallback(new AsyncCallback(OnDisposeCommunicationObject));
            Action<Pool<IChannel>> disposeChannelPool;
            readonly FactoryCacheKey factoryKey;
            readonly ServiceEndpoint targetEndpoint;
            ChannelFactory channelFactory;
            ObjectCache<EndpointAddress, Pool<IChannel>> channelCache;
            CorrelationQueryBehavior correlationQueryBehavior;
            Func<Pool<IChannel>> createChannelCacheItem;
 
            // Aborting a channel that is in the middle of closing can cause an ObjectDisposedException in the Close.
            // We need to prevent DisposeCommunicationObject(ChannelFactory) from racing with a call to 
            // DisposeCommunicationObject()on an individual channel.
            // This lock will be used to synchronize calls into DisposeCommunicationObject method.
            object disposeLock = new object();
 
            public ChannelFactoryReference(FactoryCacheKey factoryKey, ServiceEndpoint targetEndpoint, ChannelCacheSettings channelCacheSettings)
            {
                Fx.Assert(channelCacheSettings != null, " channelCacheSettings should not be null");
                Fx.Assert(factoryKey != null, " factoryKey should not be null");
                Fx.Assert(targetEndpoint != null, " targetEndpoint should not be null");
 
                this.factoryKey = factoryKey;
                this.targetEndpoint = targetEndpoint;
                                
                if (factoryKey.IsOperationContractOneWay)
                {
                    this.channelFactory = new ChannelFactory<IOutputChannel>(targetEndpoint);
                }
                else
                {
                    this.channelFactory = new ChannelFactory<IRequestChannel>(targetEndpoint);
                }
 
                this.channelFactory.UseActiveAutoClose = true;
                this.channelFactory.Credentials.Windows.AllowedImpersonationLevel = factoryKey.TokenImpersonationLevel;
 
                ObjectCacheSettings channelSettings = new ObjectCacheSettings
                {
                    CacheLimit = channelCacheSettings.MaxItemsInCache,
                    IdleTimeout = channelCacheSettings.IdleTimeout,
                    LeaseTimeout = channelCacheSettings.LeaseTimeout
                };
 
                this.disposeChannelPool = new Action<Pool<IChannel>>(this.DisposeChannelPool);
 
                // our channel cache is keyed solely on endpoint since we don't allow the via to be dynamic
                // for a ChannelFactoryReference
                this.channelCache = new ObjectCache<EndpointAddress, Pool<IChannel>>(channelSettings)
                {
                    DisposeItemCallback = this.disposeChannelPool
                };
                this.createChannelCacheItem = () => new Pool<IChannel>(channelCacheSettings.MaxItemsInCache);
            }
 
            public CorrelationQueryBehavior CorrelationQueryBehavior
            {
                get
                {
                    if (this.correlationQueryBehavior == null)
                    {
                        this.correlationQueryBehavior = this.targetEndpoint.Behaviors.Find<CorrelationQueryBehavior>();
                    }
 
                    return this.correlationQueryBehavior;
                }
            }
            
            // As a perf optimization, we provide this property to avoid async result/callback creations
            public bool NeedsOpen
            {
                get
                {
                    return this.channelFactory.State == CommunicationState.Created;
                }
            }
 
            public IAsyncResult BeginOpen(AsyncCallback callback, object state)
            {
                Fx.Assert(NeedsOpen, "caller should check NeedsOpen first");
                return this.channelFactory.BeginOpen(callback, state);
            }
 
            // after open we should be added to a cache if one is provided
            public ObjectCacheItem<ChannelFactoryReference> EndOpen(IAsyncResult result, ObjectCache<FactoryCacheKey, ChannelFactoryReference> factoryCache)
            {
                this.channelFactory.EndOpen(result);
 
                ObjectCacheItem<ChannelFactoryReference> cacheItem = null;
                if (factoryCache != null)
                {
                    cacheItem = factoryCache.Add(this.factoryKey, this);
                }
 
                return cacheItem;
            }
 
            [SuppressMessage(FxCop.Category.Usage, FxCop.Rule.DisposableFieldsShouldBeDisposed,
                Justification = "disposable field is being disposed using DisposeCommunicationObject")]
            public void Dispose()
            {
                lock (this.disposeLock)
                {
                    DisposeCommunicationObject(this.channelFactory);
                }
            }
 
            public IChannel TakeChannel(EndpointAddress endpointAddress, out ObjectCacheItem<Pool<IChannel>> channelPool)
            {
                channelPool = this.channelCache.Take(endpointAddress, this.createChannelCacheItem);
                Fx.Assert(channelPool != null, "Take with delegate should always return a valid Item");
 
                IChannel result = null;
 
                lock (channelPool.Value)
                {
                    result = channelPool.Value.Take();
                }
 
                // make an effort to kill stale channels
                ServiceChannel serviceChannel = result as ServiceChannel;
                if (result != null && (result.State != CommunicationState.Opened || (serviceChannel != null && serviceChannel.Binder.Channel.State != CommunicationState.Opened)))
                {
                    result.Abort();
                    result = null;
                }
 
                if (result == null)
                {
                    Uri via = null;
 
                    // service endpoint always sets the ListenUri, which will break default callback-context behavior
                    if (this.targetEndpoint.Address != null && this.targetEndpoint.Address.Uri != this.targetEndpoint.ListenUri)
                    {
                        via = this.targetEndpoint.ListenUri;
                    }
 
                    if (this.factoryKey.IsOperationContractOneWay)
                    {
                        result = ((ChannelFactory<IOutputChannel>)this.channelFactory).CreateChannel(endpointAddress, via);
                    }
                    else
                    {
                        result = ((ChannelFactory<IRequestChannel>)this.channelFactory).CreateChannel(endpointAddress, via);
                    }
                }
 
                if (!(result is ServiceChannel))
                {
                    result = ServiceChannelFactory.GetServiceChannel(result);
                }
 
                return result;
            }
 
            public void ReturnChannel(IChannel channel, ObjectCacheItem<Pool<IChannel>> channelPool)
            {
                bool shouldDispose = channel.State != CommunicationState.Opened;
 
                // channel is in open state, try returning it to the pool
                if (!shouldDispose)
                {
                    lock (channelPool.Value)
                    {
                        shouldDispose = !channelPool.Value.Return(channel);
                    }
                }
 
                if (shouldDispose)
                {
                    lock (this.disposeLock)
                    {
                        if (this.channelFactory.State != CommunicationState.Closed &&
                            this.channelFactory.State != CommunicationState.Closing)
                        {
                            // not caching the channel, so we need to close it
                            DisposeCommunicationObject(channel);
                        }
                    }
                }
 
                // and return our cache item
                channelPool.ReleaseReference();
            }
 
            public void DisposeChannelPool(Pool<IChannel> channelPool)
            {
                IChannel channel;
 
                // we don't need to lock the Take from the Pool here since no one will be accessing this anymore
                // Dispose will be called under a lock from the ObjectCacheItem
                while ((channel = channelPool.Take()) != null)
                {
                    lock (this.disposeLock)
                    {
                        if (this.channelFactory.State != CommunicationState.Closed &&
                            this.channelFactory.State != CommunicationState.Closing)
                        {
                            DisposeCommunicationObject(channel);
                        }
                    }
                }
            }
 
            static void DisposeCommunicationObject(ICommunicationObject communicationObject)
            {
                bool success = false;
                try
                {
                    if (communicationObject.State == CommunicationState.Opened)
                    {
                        IAsyncResult result = communicationObject.BeginClose(ServiceDefaults.CloseTimeout, onDisposeCommunicationObject, communicationObject);
                        if (result.CompletedSynchronously)
                        {
                            communicationObject.EndClose(result);
                        }
                        success = true;
                    }
                }
                catch (CommunicationException)
                {
                    // expected, we'll abort
                }
                catch (TimeoutException)
                {
                    // expected, we'll abort
                }
                finally
                {
                    if (!success)
                    {
                        communicationObject.Abort();
                    }
                }
            }
 
            static void OnDisposeCommunicationObject(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }
                ICommunicationObject communicationObject = (ICommunicationObject)result.AsyncState;
 
                bool success = false;
                try
                {
                    communicationObject.EndClose(result);
                    success = true;
                }
                catch (CommunicationException)
                {
                    // expected, we'll abort
                }
                catch (TimeoutException)
                {
                    // expected, we'll abort
                }
                catch (ObjectDisposedException)
                {
                    // expected,
                    // ObjectDisposedException may be thrown if you try to abort ClientSecurityDuplexSessionChannel that is in the middle of closing.
                    // we'll abort
                }
                finally
                {
                    if (!success)
                    {
                        communicationObject.Abort();
                    }
                }
            }
        }
 
        internal class FactoryCacheKey : IEquatable<FactoryCacheKey>
        {
            Endpoint endpoint;
            bool isOperationContractOneWay;
            
            TokenImpersonationLevel tokenImpersonationLevel;
            ContractDescription contract;
            Collection<CorrelationQuery> correlationQueries;
            string endpointConfigurationName;
 
            public FactoryCacheKey(Endpoint endpoint, string endpointConfigurationName, bool isOperationOneway,
                TokenImpersonationLevel tokenImpersonationLevel, ContractDescription contractDescription,
                ICollection<CorrelationQuery> correlationQueries)
            {
                this.endpoint = endpoint;
                this.endpointConfigurationName = endpointConfigurationName;
                this.isOperationContractOneWay = isOperationOneway;
                this.tokenImpersonationLevel = tokenImpersonationLevel;
                this.contract = contractDescription;
 
                if (correlationQueries != null)
                {
                    this.correlationQueries = new Collection<CorrelationQuery>();
                    foreach (CorrelationQuery query in correlationQueries)
                    {
                        this.correlationQueries.Add(query);
                    }
                }
            }
 
            public bool IsOperationContractOneWay
            {
                get
                {
                    return this.isOperationContractOneWay;
                }
            }
 
            public TokenImpersonationLevel TokenImpersonationLevel
            {
                get
                {
                    return this.tokenImpersonationLevel;
                }
            }
 
            public bool Equals(FactoryCacheKey other)
            {
                if (object.ReferenceEquals(this, other))
                {
                    return true;
                }
 
                if (other == null)
                {
                    // this means only one of them is null
                    return false;
                }
 
                // 1) Compare Endpoint/EndpointConfigurationName
                if ((this.endpoint == null && other.endpoint != null) ||
                    (other.endpoint == null && this.endpoint != null))
                {
                    return false;
                }
 
                // if endpoint is not null we compare the endpoint, else we compare the endpointconfiguration
                if (this.endpoint != null)
                {
                    if (!object.ReferenceEquals(this.endpoint, other.endpoint))
                    {
                        // Binding -
                        // We are comparing by ref here, can we compare binding elements instead
                        if (this.endpoint.Binding != other.endpoint.Binding)
                        {
                            return false;
                        }
                    }
                }
                else if (this.endpointConfigurationName != other.endpointConfigurationName)
                {
                    return false;
                }
 
                // (2) TokenImpersonationlevel
                if (this.TokenImpersonationLevel != other.TokenImpersonationLevel)
                {
                    return false;
                }
 
                // (3) OperationContract.IsOneWay to decide if the ChannelFactory needs to be of type RequestChannel or OutputChannel
                if (this.IsOperationContractOneWay != other.IsOperationContractOneWay)
                {
                    return false;
                }
 
                // (4) Verify if the ContractDescriptions are equivalent
                if (!ContractDescriptionComparerHelper.IsContractDescriptionEquivalent(this.contract, other.contract))
                {
                    return false;
                }
 
                // (5) Verify the correlationquery collection
                //  For now, we verify each query by ref, so that loop scenarios would work
                //  Can we do a value comparison here?  
                if (!ContractDescriptionComparerHelper.EqualsUnordered(this.correlationQueries, other.correlationQueries))
                {
                    return false;
                }
                
                return true;
            }
            
            public override int GetHashCode()
            {
                int hashCode = 0;
 
                if (this.contract != null && this.contract.Name != null)
                {
                    //using ContractName as the hashcode
                    hashCode ^= this.contract.Name.GetHashCode();
                }
 
                if (this.endpoint != null && this.endpoint.Binding != null)
                {
                    //we compare binding by ref
                    hashCode ^= this.endpoint.Binding.GetHashCode();
                }
 
                return hashCode;
            }
        }
        
        static class ContractDescriptionComparerHelper
        {
            public static bool EqualsUnordered<T>(Collection<T> left, Collection<T> right) where T : class
            {
                return EqualsUnordered(left, right, (t1, t2) => t1 == t2);
            }
 
            public static bool IsContractDescriptionEquivalent(ContractDescription c1, ContractDescription c2)
            {
                if (c1 == c2)
                {
                    return true;
                }
 
                // if the contract is not one of the default contracts that we use, we only do a byref comparison
                // fully inferred contracts always have null ContractType
                if (c1.ContractType == null || c2.ContractType == null)
                {
                    return false;
                }
 
                //compare contractname
                return (c1 != null &&
                        c2 != null &&
                        c1.Name == c2.Name &&
                        c1.Namespace == c2.Namespace &&
                        c1.ConfigurationName == c2.ConfigurationName &&
                        c1.ProtectionLevel == c2.ProtectionLevel &&
                        c1.SessionMode == c2.SessionMode &&
                        c1.ContractType == c2.ContractType &&
                        c1.Behaviors.Count == c2.Behaviors.Count && //we have no way to verify each one
                        EqualsUnordered<OperationDescription>(c1.Operations, c2.Operations, (o1, o2) => IsOperationDescriptionEquivalent(o1, o2)));
            }
 
            static bool EqualsOrdered<T>(IList<T> left, IList<T> right, Func<T, T, bool> equals)
            {
                if (left == null)
                {
                    return (right == null || right.Count == 0);
                }
                else if (right == null)
                {
                    return left.Count == 0;
                }
                if (left.Count != right.Count)
                {
                    return false;
                }
                for (int i = 0; i < left.Count; i++)
                {
                    if (!equals(left[i], right[i]))
                    {
                        return false;
                    }
                }
                return true;
            }
 
            static bool EqualsUnordered<T>(Collection<T> left, Collection<T> right, Func<T, T, bool> equals)
            {
                if (left == null)
                {
                    return (right == null || right.Count == 0);
                }
                else if (right == null)
                {
                    return left.Count == 0;
                }
                // This check ensures that the lists have the same contents, but does not verify that they have the same
                // quantity of each item if they are duplicates.
                return left.Count == right.Count &&
                    left.All(leftItem => right.Any(rightItem => equals(leftItem, rightItem))) &&
                    right.All(rightItem => left.Any(leftItem => equals(leftItem, rightItem)));
            }
 
            static bool IsOperationDescriptionEquivalent(OperationDescription o1, OperationDescription o2)
            {
                if (o1 == o2)
                {
                    return true;
                }
 
                return (o1.Name == o2.Name &&
                        o1.ProtectionLevel == o2.ProtectionLevel &&
                        o1.IsOneWay == o2.IsOneWay &&
                        IsTransactionBehaviorEquivalent(o1, o2) && //we are verifying only the TransactionFlowBehavior
                        EqualsOrdered(o1.Messages, o2.Messages, (m1, m2) => IsMessageDescriptionEquivalent(m1, m2)));
            }
 
            static bool IsMessageDescriptionEquivalent(MessageDescription m1, MessageDescription m2)
            {
                if (m1 == m2)
                {
                    return true;
                }
 
                //we are comparing only action and direction
                return (m1.Action == m2.Action && m1.Direction == m2.Direction);
            }
 
            static bool IsTransactionBehaviorEquivalent(OperationDescription o1, OperationDescription o2)
            {
                if ((o1 == null || o2 == null) && o1 == o2)
                {
                    return true;
                }
                if (o1.Behaviors.Count == o2.Behaviors.Count)
                {
                    //we are only going to check the TransactionFlowAttribute
                    TransactionFlowAttribute t1 = o1.Behaviors.Find<TransactionFlowAttribute>();
                    TransactionFlowAttribute t2 = o2.Behaviors.Find<TransactionFlowAttribute>();
                    if ((t1 == null && t2 != null) || (t2 == null && t1 != null))
                    {
                        return false;
 
                    }
                    //verify if both have the same value for TransactionFlowOption
                    if ((t1 != null) && (t1.Transactions != t2.Transactions))
                    {
                        return false;
                    }
                    else
                    {
                        return true;
                    }
                }
                else
                {
                    return false;
                }
            }
        }
    }
}