File: System\ServiceModel\Activation\WorkerProcess.cs
Project: ndp\cdf\src\WCF\SMSvcHost\SMSvcHost.csproj (SMSvcHost)
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
 
namespace System.ServiceModel.Activation
{
    using System;
    using System.Diagnostics;
    using System.Globalization;
    using System.Reflection;
    using System.Runtime;
    using System.Runtime.Diagnostics;
    using System.Security.AccessControl;
    using System.ServiceModel;
    using System.ServiceModel.Activation.Diagnostics;
    using System.ServiceModel.Channels;
    using System.Threading;
 
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
    abstract class WorkerProcess : IConnectionRegister
    {
        int isUnregistered;
        int processId;
        MessageQueue queue;
        int queueId;
        IConnectionDuplicator connectionDuplicator;
        EventTraceActivity eventTraceActivity;
 
        public bool IsRegistered
        {
            get { return isUnregistered == 0; }
        }
 
        public MessageQueue Queue
        {
            get
            {
                return this.queue;
            }
 
            set
            {
                this.queue = value;
            }
        }
 
        public int ProcessId
        {
            get
            {
                return this.processId;
            }
        }
 
#if DEBUG
        public int QueueId
        {
            get
            {
                return this.queueId;
            }
        }
#endif
 
        internal void Close()
        {
            if (Interlocked.Increment(ref isUnregistered) == 1)
            {
                if (this.queue != null)
                {
                    this.queue.Unregister(this);
                }
            }
        }
 
        protected EventTraceActivity EventTraceActivity
        {
            get
            {
                if (this.eventTraceActivity == null)
                {
                    this.eventTraceActivity = EventTraceActivity.GetFromThreadOrCreate();
                }
                return this.eventTraceActivity;
            }
        }
 
        protected abstract DuplicateContext DuplicateConnection(ListenerSessionConnection session);
        protected abstract void OnDispatchSuccess();
        protected abstract TransportType TransportType { get; }
 
        internal IAsyncResult BeginDispatchSession(ListenerSessionConnection session, AsyncCallback callback, object state)
        {
            return new DispatchSessionAsyncResult(session, callback, state);
        }
 
        internal bool EndDispatchSession(IAsyncResult result)
        {
            try
            {
                DispatchSessionAsyncResult dispatchAsyncResult = DispatchSessionAsyncResult.End(result);
                if (dispatchAsyncResult.DuplicateSucceeded)
                {
                    OnDispatchSuccess();
                    return true;
                }
            }
            catch (Exception exception)
            {
                EventLogEventId logEventId = EventLogEventId.MessageQueueDuplicatedSocketLeak;
                if (this.TransportType == TransportType.NamedPipe)
                {
                    logEventId = EventLogEventId.MessageQueueDuplicatedPipeLeak;
                }
 
                Debug.Print("WorkerProcess.DispatchSession() failed sending duplicated socket to processId: " + this.ProcessId + " exception:" + exception);
                DiagnosticUtility.EventLog.LogEvent(TraceEventType.Error,
                    (ushort)EventLogCategory.SharingService,
                    (uint)logEventId,
                    this.ProcessId.ToString(NumberFormatInfo.InvariantInfo),
                    ListenerTraceUtility.CreateSourceString(this),
                    exception.ToString());
 
                if (Fx.IsFatal(exception))
                {
                    throw;
                }
                
                if (exception is FaultException && !AppSettings.FailOnConnectionDispatchFaults)
                {
                    // Something went wrong with establishing a connection from the duplicated handle on the service side
                    // The communication between SMSvcHost and the service host is still good so no need to tear down the
                    // connection because of a single bad connecting client.
                    return false;
                }
 
                Close();
 
                // make sure we close the connection to the SharedConnectionListener
                // so it knows we've unregistered it
                ((IChannel)connectionDuplicator).Abort();
 
                if (!ShouldRecoverFromProxyCall(exception))
                {
                    throw;
                }
            }
 
            return false;
        }
 
        internal IConnectionDuplicator ConnectionDuplicator
        {
            get
            {
                return this.connectionDuplicator;
            }
        }
 
        void WorkerProcess_Closed(object sender, EventArgs e)
        {
            Debug.Print("WorkerProcess.WorkerProcess_Closed() worker leaving: " + processId + " State: " + ((IDuplexContextChannel)sender).State);
            Close();
        }
 
        void WorkerProcess_Faulted(object sender, EventArgs e)
        {
            Debug.Print("WorkerProcess.WorkerProcess_Faulted() worker leaving: " + processId + " State: " + ((IDuplexContextChannel)sender).State);
            Close();
        }
 
        ListenerExceptionStatus IConnectionRegister.Register(Version version, int processId, BaseUriWithWildcard path, int queueId, Guid token, string eventName)
        {
            if (TD.MessageQueueRegisterStartIsEnabled())
            {
                TD.MessageQueueRegisterStart(this.EventTraceActivity);
            }
 
            Debug.Print("WorkerProcess.Register() version: " + version + " processId: " + processId + " path: " + path + " queueId: " + queueId + " token: " + token + " eventName: " + eventName);
 
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, ListenerTraceCode.MessageQueueRegisterCalled, SR.GetString(SR.TraceCodeMessageQueueRegisterCalled), new StringTraceRecord("Path", path.ToString()), this, null);
            }
 
            // Get the callback channel
            this.connectionDuplicator = OperationContext.Current.GetCallbackChannel<IConnectionDuplicator>();
 
            // Prevent this duplicate operation from timing out, faulting the pipe, and stopping any further communication with w3wp
            // we're gated by MaxPendingAccepts + MaxPendingConnection. see CSD Main bug 193390 for details
            ((IContextChannel)this.connectionDuplicator).OperationTimeout = TimeSpan.MaxValue;
 
            ListenerExceptionStatus status = ListenerExceptionStatus.Success;
            bool abortInstance = false;
 
            if (path == null || eventName == null)
            {
                status = ListenerExceptionStatus.InvalidArgument;
                abortInstance = true;
                goto FAILED;
            }
 
            // Vista only: validate remote process ID
            if (OSEnvironmentHelper.IsVistaOrGreater)
            {
                status = ListenerExceptionStatus.InvalidArgument;
                object property = OperationContext.Current.IncomingMessage.Properties[ConnectionMessageProperty.Name];
                Fx.Assert(property != null, "WorkerProcess.Register() ConnectionMessageProperty not found!");
 
                IConnection connection = property as IConnection;
                Fx.Assert(connection != null, "WorkerProcess.Register() ConnectionMessageProperty is not IConnection!");
 
                PipeHandle pipe = connection.GetCoreTransport() as PipeHandle;
                Fx.Assert(pipe != null, "WorkerProcess.Register() CoreTransport is not PipeHandle!");
 
                if (processId != pipe.GetClientPid())
                {
                    status = ListenerExceptionStatus.InvalidArgument;
                    abortInstance = true;
                    goto FAILED;
                }
            }
 
            // validate version
            Version ourVersion = Assembly.GetExecutingAssembly().GetName().Version;
            if (version > ourVersion)
            {
                // VERSIONING
                // in V1 we assume that we can handle earlier versions
                // this might not be true when we ship later releases.
                Debug.Print("WorkerProcess.Register() unsupported version ourVersion: " + ourVersion + " version: " + version);
                status = ListenerExceptionStatus.VersionUnsupported;
                goto FAILED;
            }
 
            if (queueId == 0 && path == null)
            {
                status = ListenerExceptionStatus.InvalidArgument;
                abortInstance = true;
                goto FAILED;
            }
 
            this.processId = processId;
            this.queueId = 0;
            if (queueId != 0)
            {
                this.queueId = queueId;
                status = ActivatedMessageQueue.Register(queueId, token, this);
            }
            else
            {
                status = MessageQueue.Register(path, this);
            }
 
            if (status == ListenerExceptionStatus.Success)
            {
                foreach (IChannel channel in OperationContext.Current.InstanceContext.IncomingChannels)
                {
                    channel.Faulted += new EventHandler(WorkerProcess_Faulted);
                    channel.Closed += new EventHandler(WorkerProcess_Closed);
                }
 
                try
                {
                    using (EventWaitHandle securityEvent = EventWaitHandle.OpenExisting(ListenerConstants.GlobalPrefix + eventName, EventWaitHandleRights.Modify))
                    {
                        securityEvent.Set();
                    }
                }
                catch (Exception exception)
                {
                    if (Fx.IsFatal(exception))
                    {
                        throw;
                    }
 
                    DiagnosticUtility.TraceHandledException(exception, TraceEventType.Error);
 
                    status = ListenerExceptionStatus.InvalidArgument;
                    abortInstance = true;
                }
            }
 
            if (status != ListenerExceptionStatus.Success)
            {
                goto FAILED;
            }
 
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, ListenerTraceCode.MessageQueueRegisterSucceeded, SR.GetString(SR.TraceCodeMessageQueueRegisterSucceeded), new StringTraceRecord("Path", path.ToString()), this, null);
            }
            if (TD.MessageQueueRegisterCompletedIsEnabled())
            {
                TD.MessageQueueRegisterCompleted(this.EventTraceActivity, path.ToString());
            }
        FAILED:
            if (abortInstance)
            {
                if (DiagnosticUtility.ShouldTraceError)
                {
                    ListenerTraceUtility.TraceEvent(TraceEventType.Error, ListenerTraceCode.MessageQueueRegisterFailed, SR.GetString(SR.TraceCodeMessageQueueRegisterFailed),
                        new StringTraceRecord("Register", SR.GetString(SR.SharingRegistrationFailedAndAbort, status.ToString())), this, null);
                }
                if (TD.MessageQueueRegisterAbortIsEnabled())
                {
                    TD.MessageQueueRegisterAbort(this.EventTraceActivity, 
                        status.ToString(),
                        (path != null) ? path.ToString() : string.Empty);
                }
 
                AbortServiceInstance();
            }
            else if (status != ListenerExceptionStatus.Success)
            {
                if (DiagnosticUtility.ShouldTraceError)
                {
                    ListenerTraceUtility.TraceEvent(TraceEventType.Error, ListenerTraceCode.MessageQueueRegisterFailed, SR.GetString(SR.TraceCodeMessageQueueRegisterFailed),
                        new StringTraceRecord("Register", SR.GetString(SR.SharingRegistrationFailed, status.ToString())), this, null);
                }
                if (TD.MessageQueueRegisterFailedIsEnabled())
                {
                    TD.MessageQueueRegisterFailed(this.EventTraceActivity,
                        (path != null) ? path.ToString() : string.Empty, 
                        status.ToString());
                }
 
                InitiateClosingServiceInstance();
            }
 
            return status;
        }
 
        bool IConnectionRegister.ValidateUriRoute(Uri uri, System.Net.IPAddress address, int port)
        {
            if (this.queue == null)
            {
                AbortServiceInstance();
                return false;
            }
 
            MessageQueue destinationQueue = RoutingTable.Lookup(uri, address, port);
            return object.ReferenceEquals(destinationQueue, this.queue);
        }
 
        void IConnectionRegister.Unregister()
        {
            Debug.Print("WorkerProcess.Unregister() processId: " + processId);
            Close();
        }
 
        static bool ShouldRecoverFromProxyCall(Exception exception)
        {
            return (
                (exception is CommunicationException) ||
                (exception is ObjectDisposedException) ||
                (exception is TimeoutException)
                );
        }
 
        void AbortServiceInstance()
        {
            OperationContext.Current.InstanceContext.Abort();
        }
 
        void InitiateClosingServiceInstance()
        {
            InstanceContext serviceInstance = OperationContext.Current.InstanceContext;
            serviceInstance.BeginClose(ListenerConstants.RegistrationCloseTimeout,
                Fx.ThunkCallback(new AsyncCallback(CloseCallback)), serviceInstance);
        }
 
        static void CloseCallback(IAsyncResult asyncResult)
        {
            InstanceContext serviceInstance = asyncResult.AsyncState as InstanceContext;
            try
            {
                serviceInstance.EndClose(asyncResult);
            }
            catch (CommunicationException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
            catch (TimeoutException e)
            {
                DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
            }
        }
 
        class DispatchSessionAsyncResult : AsyncResult
        {
            ListenerSessionConnection session;
            bool duplicateSucceeded;
            static AsyncCallback dispatchSessionCallback = Fx.ThunkCallback(new AsyncCallback(DispatchSessionCompletedCallback));
 
            public DispatchSessionAsyncResult(ListenerSessionConnection session, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.session = session;
                DuplicateContext duplicateContext = null;
                try
                {
                    duplicateContext = session.WorkerProcess.DuplicateConnection(session);
                }
                catch (ServiceActivationException e)
                {
                    int traceCode;
                    string traceDescription;
                    if (session.WorkerProcess is TcpWorkerProcess)
                    {
                        traceCode = ListenerTraceCode.MessageQueueDuplicatedSocketError;
                        traceDescription = SR.GetString(SR.TraceCodeMessageQueueDuplicatedSocketError);
                    }
                    else
                    {
                        traceCode = ListenerTraceCode.MessageQueueDuplicatedPipeError;
                        traceDescription = SR.GetString(SR.TraceCodeMessageQueueDuplicatedPipeError);
                    }
 
                    if (DiagnosticUtility.ShouldTraceError)
                    {
                        ListenerTraceUtility.TraceEvent(TraceEventType.Error, traceCode, traceDescription, this, e);
                    }
                    this.Complete(true, e);
                    return;
                }
 
                IAsyncResult result = this.session.WorkerProcess.ConnectionDuplicator.BeginDuplicate(duplicateContext,
                    dispatchSessionCallback, this);
 
                if (result.CompletedSynchronously)
                {
                    CompleteDuplicateSession(result);
                    this.Complete(true);
                }
            }
 
            static void DispatchSessionCompletedCallback(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                    return;
 
                DispatchSessionAsyncResult thisPtr = (DispatchSessionAsyncResult)result.AsyncState;
 
                Exception completeException = null;
 
                try
                {
                    thisPtr.CompleteDuplicateSession(result);
                }
#pragma warning suppress 56500 // covered by FxCOP
                catch (Exception exception)
                {
                    if (Fx.IsFatal(exception))
                    {
                        throw;
                    }
 
                    completeException = exception;
                }
 
                thisPtr.Complete(false, completeException);
            }
 
            void CompleteDuplicateSession(IAsyncResult result)
            {
                this.session.WorkerProcess.ConnectionDuplicator.EndDuplicate(result);
 
                // Successfully duplicated the session.
                duplicateSucceeded = true;
            }
 
            public bool DuplicateSucceeded
            {
                get
                {
                    return duplicateSucceeded;
                }
            }
 
            public static DispatchSessionAsyncResult End(IAsyncResult result)
            {
                return AsyncResult.End<DispatchSessionAsyncResult>(result);
            }
        }
    }
}