File: System\ServiceModel\Dispatcher\BufferedReceiveBinder.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Dispatcher
{
    using System;
    using System.Collections.Generic;
    using System.ServiceModel.Diagnostics;
    using System.Runtime;
    using System.ServiceModel.Channels;
    using System.Threading;
 
    class BufferedReceiveBinder : IChannelBinder
    {
        static Action<object> tryReceive = new Action<object>(BufferedReceiveBinder.TryReceive);
        static AsyncCallback tryReceiveCallback = Fx.ThunkCallback(new AsyncCallback(TryReceiveCallback));
 
        IChannelBinder channelBinder;
        InputQueue<RequestContextWrapper> inputQueue;
 
        [Fx.Tag.SynchronizationObject(Blocking = true, Kind = Fx.Tag.SynchronizationKind.InterlockedNoSpin)]
        int pendingOperationSemaphore;
 
        public BufferedReceiveBinder(IChannelBinder channelBinder)
        {
            this.channelBinder = channelBinder;
            this.inputQueue = new InputQueue<RequestContextWrapper>();
        }
 
        public IChannel Channel
        {
            get { return this.channelBinder.Channel; }
        }
 
        public bool HasSession
        {
            get { return this.channelBinder.HasSession; }
        }
 
        public Uri ListenUri
        {
            get { return this.channelBinder.ListenUri; }
        }
 
        public EndpointAddress LocalAddress
        {
            get { return this.channelBinder.LocalAddress; }
        }
 
        public EndpointAddress RemoteAddress
        {
            get { return this.channelBinder.RemoteAddress; }
        }
 
        public void Abort()
        {
            this.inputQueue.Close();
            this.channelBinder.Abort();
        }
 
        public void CloseAfterFault(TimeSpan timeout)
        {
            this.inputQueue.Close();
            this.channelBinder.CloseAfterFault(timeout);
        }
 
        // Locking:
        // Only 1 channelBinder operation call should be active at any given time. All future calls
        // will wait on the inputQueue. The semaphore is always released right before the Dispatch on the inputQueue.
        // This protects a new call racing with an existing operation that is just about to fully complete.
 
        public bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
        {
            if (Interlocked.CompareExchange(ref this.pendingOperationSemaphore, 1, 0) == 0)
            {
                ActionItem.Schedule(tryReceive, this);
            }
 
            RequestContextWrapper wrapper;
            bool success = this.inputQueue.Dequeue(timeout, out wrapper);
 
            if (success && wrapper != null)
            {
                requestContext = wrapper.RequestContext;
            }
            else
            {
                requestContext = null;
            }
 
            return success;
        }
 
        public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (Interlocked.CompareExchange(ref this.pendingOperationSemaphore, 1, 0) == 0)
            {
                IAsyncResult result = this.channelBinder.BeginTryReceive(timeout, tryReceiveCallback, this);
                if (result.CompletedSynchronously)
                {
                    HandleEndTryReceive(result);
                }
            }
 
            return this.inputQueue.BeginDequeue(timeout, callback, state);
        }
 
        public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
        {
            RequestContextWrapper wrapper;
            bool success = this.inputQueue.EndDequeue(result, out wrapper);
 
            if (success && wrapper != null)
            {
                requestContext = wrapper.RequestContext;
            }
            else
            {
                requestContext = null;
            }
            return success;
        }
 
        public RequestContext CreateRequestContext(Message message)
        {
            return this.channelBinder.CreateRequestContext(message);
        }
 
        public void Send(Message message, TimeSpan timeout)
        {
            this.channelBinder.Send(message, timeout);
        }
 
        public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channelBinder.BeginSend(message, timeout, callback, state);
        }
 
        public void EndSend(IAsyncResult result)
        {
            this.channelBinder.EndSend(result);
        }
 
        public Message Request(Message message, TimeSpan timeout)
        {
            return this.channelBinder.Request(message, timeout);
        }
 
        public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channelBinder.BeginRequest(message, timeout, callback, state);
        }
 
        public Message EndRequest(IAsyncResult result)
        {
            return this.channelBinder.EndRequest(result);
        }
 
        public bool WaitForMessage(TimeSpan timeout)
        {
            return this.channelBinder.WaitForMessage(timeout);
        }
 
        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channelBinder.BeginWaitForMessage(timeout, callback, state);
        }
 
        public bool EndWaitForMessage(IAsyncResult result)
        {
            return this.channelBinder.EndWaitForMessage(result);
        }
 
        internal void InjectRequest(RequestContext requestContext)
        {
            // Reuse the existing requestContext
            this.inputQueue.EnqueueAndDispatch(new RequestContextWrapper(requestContext));
        }
 
        //
        // TryReceive threads
        //
 
        static void TryReceive(object state)
        {
            BufferedReceiveBinder binder = (BufferedReceiveBinder)state;
 
            RequestContext requestContext;
            bool requiresDispatch = false;
            try
            {
                if (binder.channelBinder.TryReceive(TimeSpan.MaxValue, out requestContext))
                {
                    requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(new RequestContextWrapper(requestContext), null);
                }
            }
            catch (Exception exception)
            {
                if (Fx.IsFatal(exception))
                {
                    throw;
                }
 
                requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(exception, null);
            }
            finally
            {
                Interlocked.Exchange(ref binder.pendingOperationSemaphore, 0);
                if (requiresDispatch)
                {
                    binder.inputQueue.Dispatch();
                }
            }
        }
 
        static void TryReceiveCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }
 
            HandleEndTryReceive(result);
        }
 
        static void HandleEndTryReceive(IAsyncResult result)
        {
            BufferedReceiveBinder binder = (BufferedReceiveBinder)result.AsyncState;
 
            RequestContext requestContext;
            bool requiresDispatch = false;
            try
            {
                if (binder.channelBinder.EndTryReceive(result, out requestContext))
                {
                    requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(new RequestContextWrapper(requestContext), null);
                }
            }
            catch (Exception exception)
            {
                if (Fx.IsFatal(exception))
                {
                    throw;
                }
 
                requiresDispatch = binder.inputQueue.EnqueueWithoutDispatch(exception, null);
            }
            finally
            {
                Interlocked.Exchange(ref binder.pendingOperationSemaphore, 0);
                if (requiresDispatch)
                {
                    binder.inputQueue.Dispatch();
                }
            }
        }
 
        // A RequestContext may be 'null' (some pieces of ChannelHandler depend on this) but the InputQueue
        // will not allow null items to be enqueued. Wrap the RequestContexts in another object to
        // facilitate this semantic
        class RequestContextWrapper
        {
            public RequestContextWrapper(RequestContext requestContext)
            {
                this.RequestContext = requestContext;
            }
 
            public RequestContext RequestContext
            {
                get;
                private set;
            }
        }
    }
}