File: System\ServiceModel\MsmqIntegration\MsmqIntegrationOutputChannel.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//------------------------------------------------------------  
// Copyright (c) Microsoft Corporation.  All rights reserved.   
//------------------------------------------------------------  
 
namespace System.ServiceModel.MsmqIntegration
{
    using System.IO;
    using System.Runtime;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Security.Tokens;
 
    sealed class MsmqIntegrationOutputChannel : TransportOutputChannel
    {
        MsmqQueue msmqQueue;
        MsmqTransactionMode transactionMode;
        MsmqIntegrationChannelFactory factory;
        SecurityTokenProviderContainer certificateTokenProvider;
 
        public MsmqIntegrationOutputChannel(MsmqIntegrationChannelFactory factory, EndpointAddress to, Uri via, bool manualAddressing)
            : base(factory, to, via, manualAddressing, factory.MessageVersion)
        {
            this.factory = factory;
            if (factory.IsMsmqX509SecurityConfigured)
            {
                this.certificateTokenProvider = factory.CreateX509TokenProvider(to, via);
            }
        }
        
        void CloseQueue()
        {
            if (null != this.msmqQueue)
                this.msmqQueue.Dispose();
            this.msmqQueue = null;
        }
 
        void OnCloseCore(bool isAborting, TimeSpan timeout)
        {
            this.CloseQueue();
            if (this.certificateTokenProvider != null)
            {
                if (isAborting)
                    this.certificateTokenProvider.Abort();
                else
                    this.certificateTokenProvider.Close(timeout);
            }
        }
 
        protected override void OnAbort()
        {
            this.OnCloseCore(true, TimeSpan.Zero);
        }
 
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            this.OnCloseCore(false, timeout);
            return new CompletedAsyncResult(callback, state);
        }
 
        protected override void OnEndClose(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }
 
        protected override void OnClose(TimeSpan timeout)
        {
            this.OnCloseCore(false, timeout);
        }
 
        void OpenQueue()
        {
            try
            {
                this.msmqQueue = new MsmqQueue(this.factory.AddressTranslator.UriToFormatName(this.RemoteAddress.Uri), UnsafeNativeMethods.MQ_SEND_ACCESS);
            }
            catch (MsmqException ex)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ex.Normalized);
            }
            if (this.factory.ExactlyOnce)
            {
                this.transactionMode = MsmqTransactionMode.CurrentOrSingle;
            }
            else
            {
                this.transactionMode = MsmqTransactionMode.None;
            }
        }
 
        void OnOpenCore(TimeSpan timeout)
        {
            OpenQueue();
            if (this.certificateTokenProvider != null)
            {
                this.certificateTokenProvider.Open(timeout);
            }
        }
 
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            OnOpenCore(timeout);
            return new CompletedAsyncResult(callback, state);
        }
 
        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }
 
        protected override void OnOpen(TimeSpan timeout)
        {
            OnOpenCore(timeout);
        }
 
        protected override IAsyncResult OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            OnSend(message, timeout);
            return new CompletedAsyncResult(callback, state);
        }
 
        protected override void OnEndSend(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }
 
        protected override void OnSend(Message message, TimeSpan timeout)
        {
            MessageProperties properties = message.Properties;
            Stream stream = null;
            
            MsmqIntegrationMessageProperty property = MsmqIntegrationMessageProperty.Get(message);
            if (null == property)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationException(SR.GetString(SR.MsmqMessageDoesntHaveIntegrationProperty)));                
            if (null != property.Body)
                stream = this.factory.Serialize(property);
 
            int size;
            if (stream == null)
            {
                size = 0;
            }
            else
            {
                if (stream.Length > int.MaxValue)
                {
                    throw TraceUtility.ThrowHelperError(new ProtocolException(SR.GetString(SR.MessageSizeMustBeInIntegerRange)), message);
                }
 
                size = (int)stream.Length;
            }
 
            using (MsmqIntegrationOutputMessage msmqMessage = new MsmqIntegrationOutputMessage(this.factory, size, this.RemoteAddress, property))
            {
                msmqMessage.ApplyCertificateIfNeeded(this.certificateTokenProvider, this.factory.MsmqTransportSecurity.MsmqAuthenticationMode, timeout);
 
                if (stream != null)
                {
                    stream.Position = 0;
                    for (int bytesRemaining = size; bytesRemaining > 0; )
                    {
                        int bytesRead = stream.Read(msmqMessage.Body.Buffer, 0, bytesRemaining);
                        bytesRemaining -= bytesRead;
                    }
                }
 
                bool lockHeld = false;
                try
                {
                    Msmq.EnterXPSendLock(out lockHeld, this.factory.MsmqTransportSecurity.MsmqProtectionLevel);
                    this.msmqQueue.Send(msmqMessage, this.transactionMode);
                    MsmqDiagnostics.DatagramSent(msmqMessage.MessageId, message);
                    property.Id = MsmqMessageId.ToString(msmqMessage.MessageId.Buffer);
                }
                catch (MsmqException ex)
                {
                    if (ex.FaultSender)
                        this.Fault();
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ex.Normalized);
                }
                finally
                {
                    if (lockHeld)
                    {
                        Msmq.LeaveXPSendLock();
                    }
                }
            }
        }
        class MsmqIntegrationOutputMessage : MsmqOutputMessage<IOutputChannel>
        {
            ByteProperty acknowledge;
            StringProperty adminQueue;
            IntProperty appSpecific;
            BufferProperty correlationId;
            BufferProperty extension;
            StringProperty label;
            ByteProperty priority;
            StringProperty responseQueue;
 
 
            public MsmqIntegrationOutputMessage(
                MsmqChannelFactoryBase<IOutputChannel> factory, 
                int bodySize, 
                EndpointAddress remoteAddress, 
                MsmqIntegrationMessageProperty property)
                : base(factory, bodySize, remoteAddress, 8)
            {
                if (null == property)
                {
                    Fx.Assert("MsmqIntegrationMessageProperty expected");
                }
 
                if (property.AcknowledgeType.HasValue)
                    EnsureAcknowledgeProperty((byte)property.AcknowledgeType.Value);
 
                if (null != property.AdministrationQueue)
                    EnsureAdminQueueProperty(property.AdministrationQueue, false);
 
                if (property.AppSpecific.HasValue)
                    this.appSpecific = new IntProperty(this, UnsafeNativeMethods.PROPID_M_APPSPECIFIC, property.AppSpecific.Value);
 
                if (property.BodyType.HasValue)
                    EnsureBodyTypeProperty(property.BodyType.Value);
 
                if (null != property.CorrelationId)
                    this.correlationId = new BufferProperty(this, UnsafeNativeMethods.PROPID_M_CORRELATIONID, MsmqMessageId.FromString(property.CorrelationId));
 
                if (null != property.Extension)
                    this.extension = new BufferProperty(this, UnsafeNativeMethods.PROPID_M_EXTENSION, property.Extension);
 
                if (null != property.Label)
                    this.label = new StringProperty(this, UnsafeNativeMethods.PROPID_M_LABEL, property.Label);
 
                if (property.Priority.HasValue)
                    this.priority = new ByteProperty(this, UnsafeNativeMethods.PROPID_M_PRIORITY, (byte)property.Priority.Value);
 
                if (null != property.ResponseQueue)
                    EnsureResponseQueueProperty(property.ResponseQueue);
 
                if (property.TimeToReachQueue.HasValue)
                    EnsureTimeToReachQueueProperty(MsmqDuration.FromTimeSpan(property.TimeToReachQueue.Value));
            }
 
            void EnsureAcknowledgeProperty(byte value)
            {
                if (this.acknowledge == null)
                {
                    this.acknowledge = new ByteProperty(this, UnsafeNativeMethods.PROPID_M_ACKNOWLEDGE);
                }
                this.acknowledge.Value = value;
            }
 
            void EnsureAdminQueueProperty(Uri value, bool useNetMsmqTranslator)
            {
                if (null != value)
                {
                    string queueName = useNetMsmqTranslator ?
                        MsmqUri.NetMsmqAddressTranslator.UriToFormatName(value) : 
                        MsmqUri.FormatNameAddressTranslator.UriToFormatName(value);
 
                    if (this.adminQueue == null)
                    {
                        this.adminQueue = new StringProperty(this, UnsafeNativeMethods.PROPID_M_ADMIN_QUEUE, queueName);
                    }
                    else
                    {
                        this.adminQueue.SetValue(queueName);
                    }
                }
            }
 
            void EnsureResponseQueueProperty(Uri value)
            {
                if (null != value)
                {
                    string queueName = MsmqUri.FormatNameAddressTranslator.UriToFormatName(value);
                    if (this.responseQueue == null)
                    {
                        this.responseQueue = new StringProperty(this, UnsafeNativeMethods.PROPID_M_RESP_FORMAT_NAME, queueName);
                    }
                    else
                    {
                        this.responseQueue.SetValue(queueName);
                    }
                }
            }
        }
    }
}