File: System\ServiceModel\Discovery\ProbeDuplexAsyncResult.cs
Project: ndp\cdf\src\NetFx40\System.ServiceModel.Discovery\System.ServiceModel.Discovery.csproj (System.ServiceModel.Discovery)
//----------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//----------------------------------------------------------------
 
namespace System.ServiceModel.Discovery
{
    using System.Collections.ObjectModel;
    using System.Diagnostics.CodeAnalysis;
    using System.Runtime;
    
    abstract class ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel> : AsyncResult
    {
        readonly IDiscoveryServiceImplementation discoveryServiceImpl;
        readonly IMulticastSuppressionImplementation multicastSuppressionImpl;
        readonly DuplexFindContext findRequest;
        readonly DiscoveryOperationContext context;
        readonly TimeoutHelper timeoutHelper;        
 
        static AsyncCompletion onShouldRedirectFindCompletedCallback = new AsyncCompletion(OnShouldRedirectFindCompleted);
        static AsyncCompletion onSendProxyAnnouncementsCompletedCallback = new AsyncCompletion(OnSendProxyAnnouncementsCompleted);
        static AsyncCallback onFindCompletedCallback = Fx.ThunkCallback(new AsyncCallback(OnFindCompleted));
        static AsyncCompletion onSendFindResponsesCompletedCallback = new AsyncCompletion(OnSendFindResponsesCompleted);
 
        bool isFindCompleted;
        
        [Fx.Tag.SynchronizationObject]
        object findCompletedLock;
 
        TResponseChannel responseChannel;
        Exception findException;
 
        [SuppressMessage("Microsoft.Usage", "CA2214:DoNotCallOverridableMethodsInConstructors")]
        protected ProbeDuplexAsyncResult(TProbeMessage probeMessage,
            IDiscoveryServiceImplementation discoveryServiceImpl,
            IMulticastSuppressionImplementation multicastSuppressionImpl,
            AsyncCallback callback,
            object state)
            : base(callback, state)
        {
            Fx.Assert(probeMessage != null, "The probeMessage must be non null.");
            Fx.Assert(discoveryServiceImpl != null, "The discoveryServiceImpl must be non null.");
 
            this.discoveryServiceImpl = discoveryServiceImpl;
            this.multicastSuppressionImpl = multicastSuppressionImpl;
            this.findCompletedLock = new object();
 
            if (!this.Validate(probeMessage))
            {
                this.Complete(true);
                return;
            }
            else
            {
                this.context = new DiscoveryOperationContext(OperationContext.Current);
                this.findRequest = new DuplexFindContext(this.GetFindCriteria(probeMessage), this);
                this.timeoutHelper = new TimeoutHelper(this.findRequest.Criteria.Duration);
                this.timeoutHelper.RemainingTime();                
                this.Process();
            }
        }
 
        protected DiscoveryOperationContext Context
        {
            get
            {
                return this.context;
            }
        }
 
        TResponseChannel ResponseChannel
        {
            get
            {
                if (this.responseChannel == null)
                {
                    this.responseChannel = this.context.GetCallbackChannel<TResponseChannel>();
                }
 
                return this.responseChannel;
            }
        }
 
        protected virtual bool Validate(TProbeMessage probeMessage)
        {
            return (DiscoveryService.EnsureMessageId() &&
                DiscoveryService.EnsureReplyTo() &&
                this.ValidateContent(probeMessage) &&
                this.EnsureNotDuplicate());
        }
 
        protected abstract bool ValidateContent(TProbeMessage probeMessage);
 
        protected abstract FindCriteria GetFindCriteria(TProbeMessage probeMessage);
 
        protected abstract IAsyncResult BeginSendFindResponse(
            TResponseChannel responseChannel,
            DiscoveryMessageSequence discoveryMessageSequence,
            EndpointDiscoveryMetadata matchingEndpoint,
            AsyncCallback callback,
            object state);
        protected abstract void EndSendFindResponse(TResponseChannel responseChannel, IAsyncResult result);
 
        protected abstract IAsyncResult BeginSendProxyAnnouncement(
            TResponseChannel responseChannel,
            DiscoveryMessageSequence discoveryMessageSequence,
            EndpointDiscoveryMetadata proxyEndpointDiscoveryMetadata,
            AsyncCallback callback,
            object state);
        protected abstract void EndSendProxyAnnouncement(TResponseChannel responseChannel, IAsyncResult result);
 
        static bool OnShouldRedirectFindCompleted(IAsyncResult result)
        {
            Collection<EndpointDiscoveryMetadata> redirectionEndpoints = null;
 
            ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel> thisPtr =
                (ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel>)result.AsyncState;
 
            if (thisPtr.multicastSuppressionImpl.EndShouldRedirectFind(result, out redirectionEndpoints))
            {
                return thisPtr.SendProxyAnnouncements(redirectionEndpoints);
            }
            else
            {
                return thisPtr.ProcessFindRequest();
            }
        }
 
        static bool OnSendProxyAnnouncementsCompleted(IAsyncResult result)
        {
            ProxyAnnouncementsSendAsyncResult.End(result);
            return true;
        }
 
        static void OnFindCompleted(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }
            else
            {
                ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel> thisPtr =
                    (ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel>)result.AsyncState;
                thisPtr.FinishFind(result);
            }
        }
 
        static bool OnSendFindResponsesCompleted(IAsyncResult result)
        {
            FindResponsesSendAsyncResult.End(result);
 
            ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel> thisPtr =
                (ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel>)result.AsyncState;
            if (thisPtr.findException != null)
            {
                throw FxTrace.Exception.AsError(thisPtr.findException);
            }
 
            return true;
        }
 
        void FinishFind(IAsyncResult result)
        {
            try
            {
                lock (this.findCompletedLock)
                {
                    this.isFindCompleted = true;
                }
                this.discoveryServiceImpl.EndFind(result);
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }
 
                this.findException = e;
            }
            finally
            {
                this.findRequest.MatchingEndpoints.Shutdown();
            }
        }
 
        void Process()
        {
            if ((this.multicastSuppressionImpl != null) && (this.context.DiscoveryMode == ServiceDiscoveryMode.Adhoc))
            {
                if (this.SuppressFindRequest())
                {
                    this.Complete(true);
                    return;
                }
            }
            else
            {
                if (this.ProcessFindRequest())
                {
                    this.Complete(true);
                    return;
                }
            }
        }
 
        bool SuppressFindRequest()
        {
            IAsyncResult result = this.multicastSuppressionImpl.BeginShouldRedirectFind(
                this.findRequest.Criteria,
                this.PrepareAsyncCompletion(onShouldRedirectFindCompletedCallback),
                this);
 
            return (result.CompletedSynchronously && OnShouldRedirectFindCompleted(result));
        }
 
        bool SendProxyAnnouncements(Collection<EndpointDiscoveryMetadata> redirectionEndpoints)
        {
            if ((redirectionEndpoints == null) || (redirectionEndpoints.Count == 0))
            {
                return true;
            }
 
            IAsyncResult result = new ProxyAnnouncementsSendAsyncResult(
                this,
                redirectionEndpoints,
                this.PrepareAsyncCompletion(onSendProxyAnnouncementsCompletedCallback),
                this);
 
            return (result.CompletedSynchronously && OnSendProxyAnnouncementsCompleted(result));
        }
 
        bool ProcessFindRequest()
        {
            IAsyncResult result = this.discoveryServiceImpl.BeginFind(
                findRequest,
                onFindCompletedCallback,
                this);
 
            if (result.CompletedSynchronously)
            {
                this.FinishFind(result);
            }
 
            return this.SendFindResponses();
        }
 
        bool SendFindResponses()
        {
            IAsyncResult result = new FindResponsesSendAsyncResult(
                this,
                this.PrepareAsyncCompletion(onSendFindResponsesCompletedCallback),
                this);
 
            return (result.CompletedSynchronously && OnSendFindResponsesCompleted(result));
        }
 
        bool EnsureNotDuplicate()
        {
            bool isDuplicate = this.discoveryServiceImpl.IsDuplicate(OperationContext.Current.IncomingMessageHeaders.MessageId);
 
            if (isDuplicate && TD.DuplicateDiscoveryMessageIsEnabled())
            {
                TD.DuplicateDiscoveryMessage(
                    this.context.EventTraceActivity,
                    ProtocolStrings.TracingStrings.Probe,
                    OperationContext.Current.IncomingMessageHeaders.MessageId.ToString());
            }
 
            return !isDuplicate;
        }
 
        IAsyncResult BeginSendFindResponse(
            EndpointDiscoveryMetadata matchingEndpoint, 
            TimeSpan timeout,
            AsyncCallback callback,
            object state)
        {
            IAsyncResult result;
            IContextChannel contextChannel = (IContextChannel)this.ResponseChannel;
            using (new OperationContextScope(contextChannel))
            {
                this.context.AddressDuplexResponseMessage(OperationContext.Current);
 
                contextChannel.OperationTimeout = timeout;
 
                result = this.BeginSendFindResponse(
                    this.ResponseChannel,
                    this.discoveryServiceImpl.GetNextMessageSequence(),
                    matchingEndpoint,
                    callback,
                    state);
            }
 
            return result;
        }
 
        void EndSendFindResponse(IAsyncResult result)
        {
            this.EndSendFindResponse(this.ResponseChannel, result);
        }
 
        IAsyncResult BeginSendProxyAnnouncement(
            EndpointDiscoveryMetadata proxyEndpoint, 
            TimeSpan timeout, 
            AsyncCallback callback, 
            object state)
        {
            IAsyncResult result;
            IContextChannel contextChannel = (IContextChannel)this.ResponseChannel;
            using (new OperationContextScope(contextChannel))
            {
                this.context.AddressDuplexResponseMessage(OperationContext.Current);
 
                contextChannel.OperationTimeout = timeout;
 
                result = this.BeginSendProxyAnnouncement(
                    this.ResponseChannel,
                    this.discoveryServiceImpl.GetNextMessageSequence(),
                    proxyEndpoint,
                    callback,
                    state);
            }
 
            return result;
        }
 
        void EndSendProxyAnnouncement(IAsyncResult result)
        {
            this.EndSendProxyAnnouncement(this.ResponseChannel, result);
        }
 
        class ProxyAnnouncementsSendAsyncResult : RandomDelaySendsAsyncResult
        {
            ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel> probeDuplexAsyncResult;
            Collection<EndpointDiscoveryMetadata> redirectionEndpoints;
 
            public ProxyAnnouncementsSendAsyncResult(
                ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel> probeDuplexAsyncResult,
                Collection<EndpointDiscoveryMetadata> redirectionEndpoints,
                AsyncCallback callback,
                object state)
                : base(
                redirectionEndpoints.Count,
                probeDuplexAsyncResult.context.MaxResponseDelay,
                callback,
                state)
            {
                this.probeDuplexAsyncResult = probeDuplexAsyncResult;
                this.redirectionEndpoints = redirectionEndpoints;
                this.Start(this.probeDuplexAsyncResult.timeoutHelper.RemainingTime());
            }
 
            public static void End(IAsyncResult result)
            {
                AsyncResult.End<ProxyAnnouncementsSendAsyncResult>(result);
            }
 
            protected override IAsyncResult OnBeginSend(int index, TimeSpan timeout, AsyncCallback callback, object state)
            {
                return this.probeDuplexAsyncResult.BeginSendProxyAnnouncement(
                    this.redirectionEndpoints[index],
                    timeout,
                    callback,
                    state);
            }
 
            protected override void OnEndSend(IAsyncResult result)
            {
                this.probeDuplexAsyncResult.EndSendProxyAnnouncement(result);
            }
        }
 
        class FindResponsesSendAsyncResult : RandomDelayQueuedSendsAsyncResult<EndpointDiscoveryMetadata>
        {
            readonly ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel> probeDuplexAsyncResult;
 
            public FindResponsesSendAsyncResult(
                ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel> probeDuplexAsyncResult,
                AsyncCallback callback,
                object state)
                : base(
                probeDuplexAsyncResult.context.MaxResponseDelay,
                probeDuplexAsyncResult.findRequest.MatchingEndpoints,                
                callback,
                state)
            {
                this.probeDuplexAsyncResult = probeDuplexAsyncResult;
                this.Start(this.probeDuplexAsyncResult.timeoutHelper.RemainingTime());
            }
 
            public static void End(IAsyncResult result)
            {
                AsyncResult.End<FindResponsesSendAsyncResult>(result);
            }
 
            protected override IAsyncResult OnBeginSendItem(
                EndpointDiscoveryMetadata item, 
                TimeSpan timeout,
                AsyncCallback callback, 
                object state)
            {
                return this.probeDuplexAsyncResult.BeginSendFindResponse(
                    item,
                    timeout,
                    callback,
                    state);
            }
 
            protected override void OnEndSendItem(IAsyncResult result)
            {
                this.probeDuplexAsyncResult.EndSendFindResponse(result);
            }
        }
 
        class DuplexFindContext : FindRequestContext
        {
            readonly InputQueue<EndpointDiscoveryMetadata> matchingEndpoints;
            readonly ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel> probeDuplexAsyncResult;
 
            public DuplexFindContext(FindCriteria criteria, ProbeDuplexAsyncResult<TProbeMessage, TResponseChannel> probeDuplexAsyncResult)
                : base(criteria)
            {
                this.matchingEndpoints = new InputQueue<EndpointDiscoveryMetadata>();
                this.probeDuplexAsyncResult = probeDuplexAsyncResult;
            }
 
            public InputQueue<EndpointDiscoveryMetadata> MatchingEndpoints
            {
                get
                {
                    return this.matchingEndpoints;
                }
            }
 
            protected override void OnAddMatchingEndpoint(EndpointDiscoveryMetadata matchingEndpoint)
            {                
                lock (this.probeDuplexAsyncResult.findCompletedLock)
                {
                    if (this.probeDuplexAsyncResult.isFindCompleted)
                    {
                        throw FxTrace.Exception.AsError(
                            new InvalidOperationException(SR.DiscoveryCannotAddMatchingEndpoint));
                    }
                    else
                    {
                        this.matchingEndpoints.EnqueueAndDispatch(matchingEndpoint, null, false);
                    }
                }                
            }
        }
    }
}