File: System\ServiceModel\Dispatcher\MultipleReceiveBinder.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
 
namespace System.ServiceModel.Dispatcher
{
 
    using System;
    using System.Collections.Generic;
    using System.ServiceModel.Diagnostics;
    using System.Runtime;
    using System.ServiceModel.Channels;
    using System.Threading;
 
    class MultipleReceiveBinder : IChannelBinder
    {
        internal static class MultipleReceiveDefaults
        {
            internal const int MaxPendingReceives = 1;
        }
 
        static AsyncCallback onInnerReceiveCompleted = Fx.ThunkCallback(OnInnerReceiveCompleted);
 
        MultipleReceiveAsyncResult outstanding;
        IChannelBinder channelBinder;
        ReceiveScopeQueue pendingResults;
        bool ordered;
 
        public MultipleReceiveBinder(IChannelBinder channelBinder, int size, bool ordered)
        {
            this.ordered = ordered;
            this.channelBinder = channelBinder;
            this.pendingResults = new ReceiveScopeQueue(size);
        }
 
        public IChannel Channel
        {
            get { return this.channelBinder.Channel; }
        }
 
        public bool HasSession
        {
            get { return this.channelBinder.HasSession; }
        }
 
        public Uri ListenUri
        {
            get { return this.channelBinder.ListenUri; }
        }
 
        public EndpointAddress LocalAddress
        {
            get { return this.channelBinder.LocalAddress; }
        }
 
        public EndpointAddress RemoteAddress
        {
            get { return this.channelBinder.RemoteAddress; }
        }
 
        public void Abort()
        {
            this.channelBinder.Abort();
        }
 
        public void CloseAfterFault(TimeSpan timeout)
        {
            this.channelBinder.CloseAfterFault(timeout);
        }
 
        public bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
        {
            return this.channelBinder.TryReceive(timeout, out requestContext);
        }
 
        public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            // At anytime there can be only one thread in BeginTryReceive and the 
            // outstanding AsyncResult should have completed before the next one.
            // There should be no pending oustanding result here.
            Fx.AssertAndThrow(this.outstanding == null, "BeginTryReceive should not have a pending result.");
 
            MultipleReceiveAsyncResult multipleReceiveResult = new MultipleReceiveAsyncResult(callback, state);
            this.outstanding = multipleReceiveResult;
            EnsurePump(timeout);
            IAsyncResult innerResult;
            if (this.pendingResults.TryDequeueHead(out innerResult))
            {
                HandleReceiveRequestComplete(innerResult, true);
            }
 
            return multipleReceiveResult;
        }
 
        void EnsurePump(TimeSpan timeout)
        {
            // ensure we're running at full throttle, the BeginTryReceive calls we make below on the
            // IChannelBinder will typically complete future calls to BeginTryReceive made by CannelHandler
            // corollary to that is that most times these calls will be completed sycnhronously
            while (!this.pendingResults.IsFull)
            {
                ReceiveScopeSignalGate receiveScope = new ReceiveScopeSignalGate(this);
 
                // Enqueue the result without locks since this is the pump. 
                // BeginTryReceive can be called only from one thread and 
                // the head is not yet unlocked so no items can proceed.
                this.pendingResults.Enqueue(receiveScope);
                IAsyncResult result = this.channelBinder.BeginTryReceive(timeout, onInnerReceiveCompleted, receiveScope);
                if (result.CompletedSynchronously)
                {
                    this.SignalReceiveCompleted(result);
                }
            }
        }
 
        static void OnInnerReceiveCompleted(IAsyncResult nestedResult)
        {
            if (nestedResult.CompletedSynchronously)
            {
                return;
            }
 
            ReceiveScopeSignalGate thisPtr = nestedResult.AsyncState as ReceiveScopeSignalGate;
            thisPtr.Binder.HandleReceiveAndSignalCompletion(nestedResult, false);
        }
 
        void HandleReceiveAndSignalCompletion(IAsyncResult nestedResult, bool completedSynchronosly)
        {
            if (SignalReceiveCompleted(nestedResult))
            {
                HandleReceiveRequestComplete(nestedResult, completedSynchronosly);
            }
        }
 
        private bool SignalReceiveCompleted(IAsyncResult nestedResult)
        {
            if (this.ordered)
            {
                // Ordered recevies can proceed only if its own gate has 
                // been unlocked. Head is the only gate unlocked and only the 
                // result that owns the is the gate at the head can proceed.
                return this.pendingResults.TrySignal((ReceiveScopeSignalGate)nestedResult.AsyncState, nestedResult);
            }
            else
            {
                // Unordered receives can proceed with any gate. If the is head 
                // is not unlocked by BeginTryReceive then the result will 
                // be put on the last pending gate.
                return this.pendingResults.TrySignalPending(nestedResult);
            }
        }
 
        void HandleReceiveRequestComplete(IAsyncResult innerResult, bool completedSynchronously)
        {
            MultipleReceiveAsyncResult receiveResult = this.outstanding;
            Exception completionException = null;
 
            try
            {
                Fx.AssertAndThrow(receiveResult != null, "HandleReceive invoked without an outstanding result");
                // Cleanup states
                this.outstanding = null;
 
                // set the context on the outer result for the ChannelHandler.
                RequestContext context;
                receiveResult.Valid = this.channelBinder.EndTryReceive(innerResult, out context);
                receiveResult.RequestContext = context;
            }
            catch (Exception ex)
            {
                if (Fx.IsFatal(ex))
                {
                    throw;
                }
 
                completionException = ex;
            }
 
            receiveResult.Complete(completedSynchronously, completionException);
        }
 
        public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
        {
            return MultipleReceiveAsyncResult.End(result, out requestContext);
        }
 
        public RequestContext CreateRequestContext(Message message)
        {
            return this.channelBinder.CreateRequestContext(message);
        }
 
        public void Send(Message message, TimeSpan timeout)
        {
            this.channelBinder.Send(message, timeout);
        }
 
        public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channelBinder.BeginSend(message, timeout, callback, state);
        }
 
        public void EndSend(IAsyncResult result)
        {
            this.channelBinder.EndSend(result);
        }
 
        public Message Request(Message message, TimeSpan timeout)
        {
            return this.channelBinder.Request(message, timeout);
        }
 
        public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channelBinder.BeginRequest(message, timeout, callback, state);
        }
 
        public Message EndRequest(IAsyncResult result)
        {
            return this.channelBinder.EndRequest(result);
        }
 
        public bool WaitForMessage(TimeSpan timeout)
        {
            return this.channelBinder.WaitForMessage(timeout);
        }
 
        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channelBinder.BeginWaitForMessage(timeout, callback, state);
        }
 
        public bool EndWaitForMessage(IAsyncResult result)
        {
            return this.channelBinder.EndWaitForMessage(result);
        }
 
        class MultipleReceiveAsyncResult : AsyncResult
        {
            public MultipleReceiveAsyncResult(AsyncCallback callback, object state)
                : base(callback, state)
            {
            }
 
            public bool Valid
            {
                get;
                set;
            }
 
            public RequestContext RequestContext
            {
                get;
                set;
            }
 
            public new void Complete(bool completedSynchronously, Exception completionException)
            {
                base.Complete(completedSynchronously, completionException);
            }
 
            public static bool End(IAsyncResult result, out RequestContext context)
            {
                MultipleReceiveAsyncResult thisPtr = AsyncResult.End<MultipleReceiveAsyncResult>(result);
                context = thisPtr.RequestContext;
                return thisPtr.Valid;
            }
        }
 
        class ReceiveScopeSignalGate : SignalGate<IAsyncResult>
        {
            public ReceiveScopeSignalGate(MultipleReceiveBinder binder)
            {
                this.Binder = binder;
            }
 
            public MultipleReceiveBinder Binder
            {
                get;
                private set;
            }
        }
 
        class ReceiveScopeQueue
        {
            // This class is a circular queue with 2 pointers for pending items and head.
            // Ordered Receives : The head is unlocked by BeginTryReceive. The ReceiveGate can signal only the 
            // the gate that it owns. If the gate is the head then it will proceed.
            // Unordered Receives:  Any pending item can be signalled. The pending index keeps track 
            // of results that haven't  been completed. If the head is unlocked then it will proceed.
 
            int pending;
            int head;
            int count;
            readonly int size;
            ReceiveScopeSignalGate[] items;
 
            public ReceiveScopeQueue(int size)
            {
                this.size = size;
                this.head = 0;
                this.count = 0;
                this.pending = 0;
                items = new ReceiveScopeSignalGate[size];
            }
 
            internal bool IsFull
            {
                get { return this.count == this.size; }
            }
 
            internal void Enqueue(ReceiveScopeSignalGate receiveScope)
            {
                // This should only be called from EnsurePump which itself should only be 
                // BeginTryReceive. This makes sure that we don't need locks to enqueue an item.
                Fx.AssertAndThrow(this.count < this.size, "Cannot Enqueue into a full queue.");
                this.items[(this.head + this.count) % this.size] = receiveScope;
                count++;
            }
 
            void Dequeue()
            {
                // Dequeue should not be called outside a signal/unlock boundary.
                // There are no locks as this boundary ensures that only one thread 
                // Tries to dequeu an item either in the unlock or Signal thread.
                Fx.AssertAndThrow(this.count > 0, "Cannot Dequeue and empty queue.");
                this.items[head] = null;
                this.head = (head + 1) % this.size;
                this.count--;
            }
 
            internal bool TryDequeueHead(out IAsyncResult result)
            {
                // Invoked only from BeginTryReceive as only the main thread can 
                // dequeue the head and is  Successful only if it's already been signaled and completed.
                Fx.AssertAndThrow(this.count > 0, "Cannot unlock item when queue is empty");
                if (this.items[head].Unlock(out result))
                {
                    this.Dequeue();
                    return true;
                }
 
                return false;
            }
 
            public bool TrySignal(ReceiveScopeSignalGate scope, IAsyncResult nestedResult)
            {
                // Ordered receives can only signal the gate that the AsyncResult owns.
                // If the head has already been unlocked then it can proceed.
                if (scope.Signal(nestedResult))
                {
                    Dequeue();
                    return true;
                }
 
                return false;
            }
 
            public bool TrySignalPending(IAsyncResult result)
            {
                // free index will wrap around and always return the next free index;
                // Only the head of the queue can proceed as the head would be unlocked by
                // BeginTryReceive. All other requests will just submit their completed result.
                int nextPending = GetNextPending();
                if (this.items[nextPending].Signal(result))
                {
                    Dequeue();
                    return true;
                }
 
                return false;
            }
 
            int GetNextPending()
            {
                int slot = this.pending;
                while (true)
                {
                    if (slot == (slot = Interlocked.CompareExchange(ref this.pending, (slot + 1) % this.size, slot)))
                    {
                        return slot;
                    }
                }
            }
        }
    }
}