File: System\ServiceModel\Channels\MsmqOutputChannel.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//------------------------------------------------------------  
// Copyright (c) Microsoft Corporation.  All rights reserved.   
//------------------------------------------------------------  
 
namespace System.ServiceModel.Channels
{
    using System.Runtime;
    using System.ServiceModel;
    using System.ServiceModel.Security.Tokens;
 
    sealed class MsmqOutputChannel : TransportOutputChannel
    {
        MsmqQueue msmqQueue;
        MsmqTransactionMode transactionMode;
        readonly byte[] preamble; // cached .NET framing singleton preamble
        SynchronizedDisposablePool<MsmqOutputMessage<IOutputChannel>> outputMessages;
        MsmqChannelFactory<IOutputChannel> factory;
        SecurityTokenProviderContainer certificateTokenProvider;
 
        public MsmqOutputChannel(MsmqChannelFactory<IOutputChannel> factory, EndpointAddress to, Uri via, bool manualAddressing)
            : base(factory, to, via, manualAddressing, factory.MessageVersion)
        {
            // construct the .NET framing preamble used for every message
            byte[] modeBytes = ClientSingletonSizedEncoder.ModeBytes;
            EncodedVia encodedVia = new EncodedVia(this.Via.AbsoluteUri);
            EncodedContentType encodedContentType = EncodedContentType.Create(factory.MessageEncoderFactory.Encoder.ContentType);
 
            this.preamble = DiagnosticUtility.Utility.AllocateByteArray(modeBytes.Length + ClientSingletonSizedEncoder.CalcStartSize(encodedVia, encodedContentType));
 
            Buffer.BlockCopy(modeBytes, 0, this.preamble, 0, modeBytes.Length);
            ClientSingletonSizedEncoder.EncodeStart(this.preamble, modeBytes.Length, encodedVia, encodedContentType);
 
            this.outputMessages = new SynchronizedDisposablePool<MsmqOutputMessage<IOutputChannel>>(factory.MaxPoolSize);
            if (factory.IsMsmqX509SecurityConfigured)
            {
                this.certificateTokenProvider = factory.CreateX509TokenProvider(to, via);
            }
            this.factory = factory;
        }
 
        void CloseQueue()
        {
            this.outputMessages.Dispose();
            if (null != this.msmqQueue)
                this.msmqQueue.Dispose();
            this.msmqQueue = null;
        }
 
        void OnCloseCore(bool isAborting, TimeSpan timeout)
        {
            this.CloseQueue();
            this.outputMessages.Dispose();
            if (factory.IsMsmqX509SecurityConfigured)
            {
                if (isAborting)
                    this.certificateTokenProvider.Abort();
                else
                    this.certificateTokenProvider.Close(timeout);
            }
        }
 
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            OnCloseCore(false, timeout);
            return new CompletedAsyncResult(callback, state);
        }
 
        protected override void OnEndClose(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }
 
        protected override void OnClose(TimeSpan timeout)
        {
            OnCloseCore(false, timeout);
        }
 
        protected override void OnAbort()
        {
            OnCloseCore(true, TimeSpan.Zero);
        }
 
        void OnOpenCore(TimeSpan timeout)
        {
            OpenQueue();
            if (factory.IsMsmqX509SecurityConfigured)
            {
                this.certificateTokenProvider.Open(timeout);
            }
        }
 
        void OpenQueue()
        {
            try
            {
                this.msmqQueue = new MsmqQueue(this.factory.AddressTranslator.UriToFormatName(this.RemoteAddress.Uri),
                                           UnsafeNativeMethods.MQ_SEND_ACCESS);
            }
            catch (MsmqException ex)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ex.Normalized);
            }
            if (this.factory.ExactlyOnce)
            {
                this.transactionMode = MsmqTransactionMode.CurrentOrSingle;
            }
            else
            {
                this.transactionMode = MsmqTransactionMode.None;
            }
        }
 
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            OnOpenCore(timeout);
            return new CompletedAsyncResult(callback, state);
        }
 
        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }
 
        protected override void OnOpen(TimeSpan timeout)
        {
            OnOpenCore(timeout);
        }
 
        protected override IAsyncResult OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            OnSend(message, timeout);
            return new CompletedAsyncResult(callback, state);
        }
 
        protected override void OnEndSend(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }
 
        protected override void OnSend(Message message, TimeSpan timeout)
        {
            // serialize the indigo message to byte array and copy the .NET framing preamble
            ArraySegment<byte> messageData = this.factory.MessageEncoderFactory.Encoder.WriteMessage(
                message, int.MaxValue, this.factory.BufferManager, preamble.Length);
            Buffer.BlockCopy(preamble, 0, messageData.Array, messageData.Offset - preamble.Length, preamble.Length);
 
            byte[] buffer = messageData.Array;
            int offset = messageData.Offset - preamble.Length;
            int size = messageData.Count + preamble.Length;
 
            MsmqOutputMessage<IOutputChannel> msmqMessage = this.outputMessages.Take();
            if (msmqMessage == null)
            {
                msmqMessage = new MsmqOutputMessage<IOutputChannel>(this.factory, size, this.RemoteAddress);
                MsmqDiagnostics.PoolFull(this.factory.MaxPoolSize);
            }
            try
            {
                msmqMessage.ApplyCertificateIfNeeded(this.certificateTokenProvider, this.factory.MsmqTransportSecurity.MsmqAuthenticationMode, timeout);
                msmqMessage.Body.EnsureBufferLength(size);
                msmqMessage.Body.BufferLength = size;
                Buffer.BlockCopy(buffer, offset, msmqMessage.Body.Buffer, 0, size);
                this.factory.BufferManager.ReturnBuffer(buffer);
 
                bool lockHeld = false;
                try
                {
                    Msmq.EnterXPSendLock(out lockHeld, this.factory.MsmqTransportSecurity.MsmqProtectionLevel);
                    this.msmqQueue.Send(msmqMessage, this.transactionMode);
                    MsmqDiagnostics.DatagramSent(msmqMessage.MessageId, message);
                }
                catch (MsmqException ex)
                {
                    if (ex.FaultSender)
                        this.Fault();
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ex.Normalized);
                }
                finally
                {
                    if (lockHeld)
                    {
                        Msmq.LeaveXPSendLock();
                    }
                }
            }
            finally
            {
                if (!this.outputMessages.Return(msmqMessage))
                {
                    msmqMessage.Dispose();
                }
            }
        }
    }
}