File: System\ServiceModel\Activities\Dispatcher\BufferedReceiveManager.cs
Project: ndp\cdf\src\NetFx40\System.ServiceModel.Activities\System.ServiceModel.Activities.csproj (System.ServiceModel.Activities)
//----------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//----------------------------------------------------------------
namespace System.ServiceModel.Activities.Dispatcher
{
    using System.Activities.Hosting;
    using System.Collections.Generic;
    using System.Collections.ObjectModel;
    using System.Runtime;
    using System.Runtime.DurableInstancing;
    using System.ServiceModel.Channels;
    using System.Threading;
 
    sealed class BufferedReceiveManager : IExtension<ServiceHostBase>
    {
        static AsyncCallback onEndAbandon;
        Dictionary<InstanceKey, List<BufferedReceiveMessageProperty>> bufferedProperties;
        PendingMessageThrottle throttle;
        WorkflowServiceHost host;
 
        int initialized;
 
        [Fx.Tag.SynchronizationObject(Blocking = false)]
        object thisLock;
 
        public BufferedReceiveManager(int maxPendingMessagesPerChannel)
        {
            this.throttle = new PendingMessageThrottle(maxPendingMessagesPerChannel);
            this.thisLock = new object();
        }
 
        public bool BufferReceive(OperationContext operationContext, ReceiveContext receiveContext, string bookmarkName, BufferedReceiveState state, bool retry)
        {
            Fx.Assert(receiveContext != null, "ReceiveContext must be present in order to perform buffering");
 
            bool success = false;
 
            BufferedReceiveMessageProperty property = null;
            if (BufferedReceiveMessageProperty.TryGet(operationContext.IncomingMessageProperties, out property))
            {
                CorrelationMessageProperty correlation = null;
                if (CorrelationMessageProperty.TryGet(operationContext.IncomingMessageProperties, out correlation))
                {
                    InstanceKey instanceKey = correlation.CorrelationKey;
                    int channelKey = operationContext.Channel.GetHashCode();
                    if (this.throttle.Acquire(channelKey))
                    {
                        try
                        {
                            // Tag the property with identifying data to be used during later processing
                            if (UpdateProperty(property, receiveContext, channelKey, bookmarkName, state))
                            {
                                // Cleanup if we are notified the ReceiveContext faulted underneath us
                                receiveContext.Faulted += delegate(object sender, EventArgs e)
                                {
                                    lock (this.thisLock)
                                    {
                                        if (this.bufferedProperties.ContainsKey(instanceKey))
                                        {
                                            if (this.bufferedProperties[instanceKey].Remove(property))
                                            {
                                                try
                                                {
                                                    property.RequestContext.DelayClose(false);
                                                    property.RequestContext.Abort();
                                                }
                                                catch (Exception exception)
                                                {
                                                    if (Fx.IsFatal(exception))
                                                    {
                                                        throw;
                                                    }
 
                                                    // ---- these exceptions as we are already on the error path
                                                }
 
                                                this.throttle.Release(channelKey);
                                            }
                                        }
                                    }
                                };
 
                                // Actual Buffering
                                lock (this.thisLock)
                                {
                                    // Optimistic state check in case we just raced with the receiveContext
                                    // faulting. If the receiveContext still faults after the state check, the above
                                    // cleanup routine will handle things correctly. In both cases, a double-release
                                    // of the throttle is protected.
                                    if (receiveContext.State == ReceiveContextState.Received)
                                    {
                                        bool found = false;
                                        // if the exception indicates retry-able (such as RetryException),
                                        // we will simply retry.  This happens when racing with abort and 
                                        // WF informing the client to retry (BufferedReceiveManager is a
                                        // client in this case).
                                        if (retry)
                                        {
                                            property.RequestContext.DelayClose(true);
                                            property.RegisterForReplay(operationContext);
                                            property.ReplayRequest();
                                            property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext);
                                            found = true;
                                        }
                                        else
                                        {
                                            ReadOnlyCollection<BookmarkInfo> bookmarks = this.host.DurableInstanceManager.PersistenceProviderDirectory.GetBookmarksForInstance(instanceKey);
                                            // Retry in case match the existing bookmark
                                            if (bookmarks != null)
                                            {
                                                for (int i = 0; i < bookmarks.Count; ++i)
                                                {
                                                    BookmarkInfo bookmark = bookmarks[i];
                                                    if (bookmark.BookmarkName == bookmarkName)
                                                    {
                                                        // Found it so retry...
                                                        property.RequestContext.DelayClose(true);
                                                        property.RegisterForReplay(operationContext);
                                                        property.ReplayRequest();
                                                        property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext);
                                                        found = true;
                                                        break;
                                                    }
                                                }
                                            }
                                        }
 
                                        if (!found)
                                        {
                                            List<BufferedReceiveMessageProperty> properties;
                                            if (!this.bufferedProperties.TryGetValue(instanceKey, out properties))
                                            {
                                                properties = new List<BufferedReceiveMessageProperty>();
                                                this.bufferedProperties.Add(instanceKey, properties);
                                            }
                                            property.RequestContext.DelayClose(true);
                                            property.RegisterForReplay(operationContext);
                                            properties.Add(property);
                                        }
                                        else
                                        {
                                            this.throttle.Release(channelKey);
                                        }
                                        success = true;
                                    }
                                }
                            }
                        }
                        finally
                        {
                            if (!success)
                            {
                                this.throttle.Release(channelKey);
                            }
                        }
                    }
                }
            }
 
            return success;
        }
 
        public void Retry(HashSet<InstanceKey> associatedInstances, ReadOnlyCollection<BookmarkInfo> availableBookmarks)
        {
            List<BookmarkInfo> bookmarks = new List<BookmarkInfo>(availableBookmarks);
            foreach (InstanceKey instanceKey in associatedInstances)
            {
                lock (this.thisLock)
                {
                    if (this.bufferedProperties.ContainsKey(instanceKey))
                    {
                        List<BufferedReceiveMessageProperty> properties = this.bufferedProperties[instanceKey];
                        int index = 0;
 
                        while (index < properties.Count && bookmarks.Count > 0)
                        {
                            BufferedReceiveMessageProperty property = properties[index];
 
                            // Determine if this property is now ready to be processed
                            int channelKey = 0;
                            bool found = false;
                            for (int i = 0; i < bookmarks.Count; ++i)
                            {
                                BookmarkInfo bookmark = (BookmarkInfo)bookmarks[i];
                                PropertyData data = (PropertyData)property.UserState;
                                if (bookmark.BookmarkName == data.BookmarkName)
                                {
                                    // Found it so retry...
                                    bookmarks.RemoveAt(i);
                                    channelKey = data.ChannelKey;
                                    property.ReplayRequest();
                                    property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext);
                                    found = true;
                                    break;
                                }
                            }
 
                            if (!found)
                            {
                                index++;
                            }
                            else
                            {
                                properties.RemoveAt(index);
                                this.throttle.Release(channelKey);
                            }
                        }
                    }
                }
 
                if (bookmarks.Count == 0)
                {
                    break;
                }
            }
        }
 
        public void AbandonBufferedReceives(HashSet<InstanceKey> associatedInstances)
        {
            foreach (InstanceKey instanceKey in associatedInstances)
            {
                lock (this.thisLock)
                {
                    if (this.bufferedProperties.ContainsKey(instanceKey))
                    {
                        foreach (BufferedReceiveMessageProperty property in this.bufferedProperties[instanceKey])
                        {
                            PropertyData data = (PropertyData)property.UserState;
                            AbandonReceiveContext(data.ReceiveContext);
                            this.throttle.Release(data.ChannelKey);
                        }
 
                        this.bufferedProperties.Remove(instanceKey);
                    }
                }
            }
        }
 
        // clean up any remaining buffered receives as part of ServiceHost close.
        internal void AbandonBufferedReceives()
        {
            lock (this.thisLock)
            {
                foreach (List<BufferedReceiveMessageProperty> value in this.bufferedProperties.Values)
                {
                    foreach (BufferedReceiveMessageProperty property in value)
                    {
                        PropertyData data = (PropertyData)property.UserState;
                        AbandonReceiveContext(data.ReceiveContext);
                        this.throttle.Release(data.ChannelKey);
                    }
                }
                this.bufferedProperties.Clear();
            }
        }
 
        // Best-effort to abandon the receiveContext
        internal static void AbandonReceiveContext(ReceiveContext receiveContext)
        {
            if (receiveContext != null)
            {
                if (onEndAbandon == null)
                {
                    onEndAbandon = Fx.ThunkCallback(new AsyncCallback(OnEndAbandon));
                }
 
                try
                {
                    IAsyncResult result = receiveContext.BeginAbandon(
                        TimeSpan.MaxValue, onEndAbandon, receiveContext);
                    if (result.CompletedSynchronously)
                    {
                        HandleEndAbandon(result);
                    }
                }
                catch (Exception exception)
                {
                    if (Fx.IsFatal(exception))
                    {
                        throw;
                    }
 
                    // We ---- any Abandon exception - best effort.
                    FxTrace.Exception.AsWarning(exception);
                }
            }
        }
 
        static bool HandleEndAbandon(IAsyncResult result)
        {
            ReceiveContext receiveContext = (ReceiveContext)result.AsyncState;
            receiveContext.EndAbandon(result);
            return true;
        }
 
        static void OnEndAbandon(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }
 
            try
            {
                HandleEndAbandon(result);
            }
            catch (Exception exception)
            {
                if (Fx.IsFatal(exception))
                {
                    throw;
                }
 
                // We ---- any Abandon exception - best effort.
                FxTrace.Exception.AsWarning(exception);
            }
        }
 
        void IExtension<ServiceHostBase>.Attach(ServiceHostBase owner)
        {
            if (owner == null)
            {
                throw FxTrace.Exception.AsError(new ArgumentNullException("owner"));
            }
 
            if (Interlocked.CompareExchange(ref this.initialized, 1, 0) != 0)
            {
                throw FxTrace.Exception.AsError(
                    new InvalidOperationException(SR.BufferedReceiveBehaviorMultipleUse));
            }
 
            owner.ThrowIfClosedOrOpened();
 
            Fx.Assert(owner is WorkflowServiceHost, "owner must be of WorkflowServiceHost type!");
            this.host = (WorkflowServiceHost)owner;
            Initialize();
        }
 
        void IExtension<ServiceHostBase>.Detach(ServiceHostBase owner)
        {
        }
 
        bool UpdateProperty(BufferedReceiveMessageProperty property, ReceiveContext receiveContext, int channelKey, string bookmarkName, BufferedReceiveState state)
        {
            // If there's data already there make sure the state is allowed
            if (property.UserState == null)
            {
                property.UserState = new PropertyData()
                {
                    ReceiveContext = receiveContext,
                    ChannelKey = channelKey,
                    BookmarkName = bookmarkName,
                    State = state
                };
            }
            else
            {
                PropertyData data = (PropertyData)property.UserState;
 
                // We should not buffer twice at the same state
                if (data.State == state)
                {
                    return false;
                }
 
                data.State = state;
            }
 
            return true;
        }
 
        void Initialize()
        {
            this.bufferedProperties = new Dictionary<InstanceKey, List<BufferedReceiveMessageProperty>>();
        }
 
        class PendingMessageThrottle
        {
            [Fx.Tag.SynchronizationObject(Blocking = false)]
            Dictionary<int, ThrottleEntry> pendingMessages;
 
            int maxPendingMessagesPerChannel;
            int warningRestoreLimit;
 
            public PendingMessageThrottle(int maxPendingMessagesPerChannel)
            {
                this.maxPendingMessagesPerChannel = maxPendingMessagesPerChannel;
                this.warningRestoreLimit = (int)Math.Floor(0.7 * (double)maxPendingMessagesPerChannel);
                this.pendingMessages = new Dictionary<int, ThrottleEntry>();
            }
 
            public bool Acquire(int channelKey)
            {
                lock (this.pendingMessages)
                {
                    if (!this.pendingMessages.ContainsKey(channelKey))
                    {
                        this.pendingMessages.Add(channelKey, new ThrottleEntry());
                    }
 
                    ThrottleEntry entry = this.pendingMessages[channelKey];
                    if (entry.Count < this.maxPendingMessagesPerChannel)
                    {
                        entry.Count++;
                        if (TD.PendingMessagesPerChannelRatioIsEnabled())
                        {
                            TD.PendingMessagesPerChannelRatio(entry.Count, this.maxPendingMessagesPerChannel);
                        }
                        return true;
                    }
                    else
                    {
                        if (TD.MaxPendingMessagesPerChannelExceededIsEnabled())
                        {
                            if (!entry.WarningIssued)
                            {
                                TD.MaxPendingMessagesPerChannelExceeded(this.maxPendingMessagesPerChannel);
                                entry.WarningIssued = true;
                            }
                        }
 
                        return false;
                    }
                }
            }
 
            public void Release(int channelKey)
            {
                lock (this.pendingMessages)
                {
                    ThrottleEntry entry = this.pendingMessages[channelKey];
                    Fx.Assert(entry.Count > 0, "The pending message throttle was released too many times");
 
                    entry.Count--;
                    if (TD.PendingMessagesPerChannelRatioIsEnabled())
                    {
                        TD.PendingMessagesPerChannelRatio(entry.Count, this.maxPendingMessagesPerChannel);
                    }
                    if (entry.Count == 0)
                    {
                        this.pendingMessages.Remove(channelKey);
                    }
                    else if (entry.Count < this.warningRestoreLimit)
                    {
                        entry.WarningIssued = false;
                    }
                }
            }
 
            class ThrottleEntry
            {
                public ThrottleEntry()
                {
                }
 
                public bool WarningIssued
                {
                    get;
                    set;
                }
 
                public int Count
                {
                    get;
                    set;
                }
            }
        }
 
        class PropertyData
        {
            public PropertyData()
            {
            }
 
            public ReceiveContext ReceiveContext
            {
                get;
                set;
            }
 
            public int ChannelKey
            {
                get;
                set;
            }
 
            public string BookmarkName
            {
                get;
                set;
            }
 
            public BufferedReceiveState State
            {
                get;
                set;
            }
        }
    }
}