File: System\ServiceModel\Channels\ByteStreamMessage.cs
Project: ndp\cdf\src\NetFx40\System.ServiceModel.Channels\System.ServiceModel.Channels.csproj (System.ServiceModel.Channels)
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------
 
namespace System.ServiceModel.Channels
{
    using System;
    using System.IO;
    using System.Net.Http;
    using System.Runtime;
    using System.Threading.Tasks;
    using System.Xml;
 
    public static class ByteStreamMessage
    {
        public static Message CreateMessage(Stream stream)
        {
            if (stream == null)
            {
                throw FxTrace.Exception.ArgumentNull("stream");
            }
            return CreateMessage(stream, XmlDictionaryReaderQuotas.Max, true); // moveBodyReaderToContent is true, for consistency with the other implementations of Message (including the Message base class itself)
        }
 
        public static Message CreateMessage(ArraySegment<byte> buffer)
        {
            return CreateMessage(buffer, null);
        }
 
        public static Message CreateMessage(ArraySegment<byte> buffer, BufferManager bufferManager)
        {
            if (buffer.Array == null)
            {
                throw FxTrace.Exception.ArgumentNull("buffer.Array", SR.ArgumentPropertyShouldNotBeNullError("buffer.Array"));
            }
 
            ByteStreamBufferedMessageData data = new ByteStreamBufferedMessageData(buffer, bufferManager);
            return CreateMessage(data, XmlDictionaryReaderQuotas.Max, true); // moveBodyReaderToContent is true, for consistency with the other implementations of Message (including the Message base class itself)
        }
 
        internal static Message CreateMessage(Stream stream, XmlDictionaryReaderQuotas quotas, bool moveBodyReaderToContent)
        {
            return new InternalByteStreamMessage(stream, quotas, moveBodyReaderToContent);
        }
 
        internal static Message CreateMessage(HttpRequestMessage httpRequestMessage, XmlDictionaryReaderQuotas quotas)
        {
            return new InternalByteStreamMessage(httpRequestMessage, quotas, true); // moveBodyReaderToContent is true, for consistency with the other implementations of Message (including the Message base class itself)
        }
 
        internal static Message CreateMessage(HttpResponseMessage httpResponseMessage, XmlDictionaryReaderQuotas quotas)
        {
            return new InternalByteStreamMessage(httpResponseMessage, quotas, true); // moveBodyReaderToContent is true, for consistency with the other implementations of Message (including the Message base class itself)
        }
 
        internal static Message CreateMessage(ByteStreamBufferedMessageData bufferedMessageData, XmlDictionaryReaderQuotas quotas, bool moveBodyReaderToContent)
        {
            return new InternalByteStreamMessage(bufferedMessageData, quotas, moveBodyReaderToContent);
        }
 
        internal static bool IsInternalByteStreamMessage(Message message)
        {
            Fx.Assert(message != null, "message should not be null");
            return message is InternalByteStreamMessage;
        }
 
        class InternalByteStreamMessage : Message
        {
            BodyWriter bodyWriter;
            MessageHeaders headers;
            MessageProperties properties;
            XmlByteStreamReader reader;
 
            /// <summary>
            /// If set to true, OnGetReaderAtBodyContents() calls MoveToContent() on the reader before returning it.
            /// If set to false, the reader is positioned on None, just before the root element of the body message.
            /// </summary>
            /// <remarks>
            /// We use this flag to preserve compatibility between .net 4.0 (or previous) and .net 4.5 (or later).
            ///
            /// In .net 4.0:
            /// - WebMessageEncodingBindingElement uses a raw encoder, different than ByteStreamMessageEncoder.
            /// - ByteStreamMessageEncodingBindingElement uses the ByteStreamMessageEncoder.
            /// - When the WebMessageEncodingBindingElement is used, the Message.GetReaderAtBodyContents() method returns 
            ///   an XmlDictionaryReader positioned initially on content (the root element of the xml); that's because MoveToContent() is called
            ///   on the reader before it's returned. 
            /// - When the ByteStreamMessageEncodingBindingElement is used, the Message.GetReaderAtBodyContents() method returns an 
            ///   XmlDictionaryReader positioned initially on None (just before the root element).
            /// 
            /// In .net 4.5:
            /// - Both WebMessageEncodingBindingElement and ByteStreamMessageEncodingBindingElement use the ByteStreamMessageEncoder.
            /// - So we need the ByteStreamMessageEncoder to call MoveToContent() when used by WebMessageEncodingBindingElement, and not do so
            ///   when used by the ByteStreamMessageEncodingBindingElement.
            /// - Preserving the compatibility with 4.0 is important especially because 4.5 is an in-place upgrade of 4.0.
            /// 
            /// See 252277 @ CSDMain for other info.
            /// </remarks>
            bool moveBodyReaderToContent;
 
            public InternalByteStreamMessage(ByteStreamBufferedMessageData bufferedMessageData, XmlDictionaryReaderQuotas quotas, bool moveBodyReaderToContent)
            {
                // Assign both writer and reader here so that we can CreateBufferedCopy without the need to
                // abstract between a streamed or buffered message. We're protected here by the state on Message
                // preventing both a read/write. 
 
                quotas = ByteStreamMessageUtility.EnsureQuotas(quotas);
 
                this.bodyWriter = new BufferedBodyWriter(bufferedMessageData);
                this.headers = new MessageHeaders(MessageVersion.None);
                this.properties = new MessageProperties();
                this.reader = new XmlBufferedByteStreamReader(bufferedMessageData, quotas);
                this.moveBodyReaderToContent = moveBodyReaderToContent;
            }
 
            public InternalByteStreamMessage(Stream stream, XmlDictionaryReaderQuotas quotas, bool moveBodyReaderToContent)
            {
                // Assign both writer and reader here so that we can CreateBufferedCopy without the need to
                // abstract between a streamed or buffered message. We're protected here by the state on Message
                // preventing both a read/write on the same stream. 
 
                quotas = ByteStreamMessageUtility.EnsureQuotas(quotas);
 
                this.bodyWriter = StreamedBodyWriter.Create(stream);
                this.headers = new MessageHeaders(MessageVersion.None);
                this.properties = new MessageProperties();
                this.reader = XmlStreamedByteStreamReader.Create(stream, quotas);
                this.moveBodyReaderToContent = moveBodyReaderToContent;
            }
 
            public InternalByteStreamMessage(HttpRequestMessage httpRequestMessage, XmlDictionaryReaderQuotas quotas, bool moveBodyReaderToContent)
            {
                Fx.Assert(httpRequestMessage != null, "The 'httpRequestMessage' parameter should not be null.");
 
                // Assign both writer and reader here so that we can CreateBufferedCopy without the need to
                // abstract between a streamed or buffered message. We're protected here by the state on Message
                // preventing both a read/write on the same stream. 
 
                quotas = ByteStreamMessageUtility.EnsureQuotas(quotas);
 
                this.bodyWriter = StreamedBodyWriter.Create(httpRequestMessage);
                this.headers = new MessageHeaders(MessageVersion.None);
                this.properties = new MessageProperties();
                this.reader = XmlStreamedByteStreamReader.Create(httpRequestMessage, quotas);
                this.moveBodyReaderToContent = moveBodyReaderToContent;
            }
 
            public InternalByteStreamMessage(HttpResponseMessage httpResponseMessage, XmlDictionaryReaderQuotas quotas, bool moveBodyReaderToContent)
            {
                Fx.Assert(httpResponseMessage != null, "The 'httpResponseMessage' parameter should not be null.");
 
                // Assign both writer and reader here so that we can CreateBufferedCopy without the need to
                // abstract between a streamed or buffered message. We're protected here by the state on Message
                // preventing both a read/write on the same stream. 
 
                quotas = ByteStreamMessageUtility.EnsureQuotas(quotas);
 
                this.bodyWriter = StreamedBodyWriter.Create(httpResponseMessage);
                this.headers = new MessageHeaders(MessageVersion.None);
                this.properties = new MessageProperties();
                this.reader = XmlStreamedByteStreamReader.Create(httpResponseMessage, quotas);
                this.moveBodyReaderToContent = moveBodyReaderToContent;
            }
 
            InternalByteStreamMessage(ByteStreamBufferedMessageData messageData, MessageHeaders headers, MessageProperties properties, XmlDictionaryReaderQuotas quotas, bool moveBodyReaderToContent)
            {
                this.headers = new MessageHeaders(headers);
                this.properties = new MessageProperties(properties);
                this.bodyWriter = new BufferedBodyWriter(messageData);
                this.reader = new XmlBufferedByteStreamReader(messageData, quotas);
                this.moveBodyReaderToContent = moveBodyReaderToContent;
            }
 
            public override MessageHeaders Headers
            {
                get
                {
                    if (this.IsDisposed)
                    {
                        throw FxTrace.Exception.ObjectDisposed(SR.ObjectDisposed("message"));
                    }
                    return this.headers;
                }
            }
 
            public override bool IsEmpty
            {
                get
                {
                    if (this.IsDisposed)
                    {
                        throw FxTrace.Exception.ObjectDisposed(SR.ObjectDisposed("message"));
                    }
                    return false;
                }
            }
 
            public override bool IsFault
            {
                get
                {
                    if (this.IsDisposed)
                    {
                        throw FxTrace.Exception.ObjectDisposed(SR.ObjectDisposed("message"));
                    }
                    return false;
                }
            }
 
            public override MessageProperties Properties
            {
                get
                {
                    if (this.IsDisposed)
                    {
                        throw FxTrace.Exception.ObjectDisposed(SR.ObjectDisposed("message"));
                    }
                    return this.properties;
                }
            }
 
            public override MessageVersion Version
            {
                get
                {
                    if (this.IsDisposed)
                    {
                        throw FxTrace.Exception.ObjectDisposed(SR.ObjectDisposed("message"));
                    }
                    return MessageVersion.None;
                }
            }
 
            protected override void OnBodyToString(XmlDictionaryWriter writer)
            {
                if (this.bodyWriter.IsBuffered)
                {
                    bodyWriter.WriteBodyContents(writer);
                }
                else
                {
                    writer.WriteString(SR.MessageBodyIsStream);
                }
            }
 
            protected override void OnClose()
            {
                Exception ex = null;
                try
                {
                    base.OnClose();
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    ex = e;
                }
 
                try
                {
                    if (properties != null)
                    {
                        properties.Dispose();
                    }
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    if (ex == null)
                    {
                        ex = e;
                    }
                }
 
                try
                {
                    if (reader != null)
                    {
                        reader.Close();
                    }
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    if (ex == null)
                    {
                        ex = e;
                    }
                }
 
                if (ex != null)
                {
                    throw FxTrace.Exception.AsError(ex);
                }
 
                this.bodyWriter = null;
            }
 
            protected override MessageBuffer OnCreateBufferedCopy(int maxBufferSize)
            {
                BufferedBodyWriter bufferedBodyWriter;
                if (this.bodyWriter.IsBuffered)
                {
                    // Can hand this off in buffered case without making a new one. 
                    bufferedBodyWriter = (BufferedBodyWriter)this.bodyWriter;
                }
                else
                {
                    bufferedBodyWriter = (BufferedBodyWriter)this.bodyWriter.CreateBufferedCopy(maxBufferSize);
                }
 
                // Protected by Message state to be called only once. 
                this.bodyWriter = null;
                return new ByteStreamMessageBuffer(bufferedBodyWriter.MessageData, this.headers, this.properties, this.reader.Quotas, this.moveBodyReaderToContent);
            }
 
            protected override T OnGetBody<T>(XmlDictionaryReader reader)
            {
                Fx.Assert(reader is XmlByteStreamReader, "reader should be XmlByteStreamReader");
                if (this.IsDisposed)
                {
                    throw FxTrace.Exception.ObjectDisposed(SR.ObjectDisposed("message"));
                }
 
                Type typeT = typeof(T);
                if (typeof(Stream) == typeT)
                {
                    Stream stream = (reader as XmlByteStreamReader).ToStream();
                    reader.Close();
                    return (T)(object)stream;
                }
                else if (typeof(byte[]) == typeT)
                {
                    byte[] buffer = (reader as XmlByteStreamReader).ToByteArray();
                    reader.Close();
                    return (T)(object)buffer;
                }
                throw FxTrace.Exception.AsError(
                    new NotSupportedException(SR.ByteStreamMessageGetTypeNotSupported(typeT.FullName)));
            }
 
            protected override XmlDictionaryReader OnGetReaderAtBodyContents()
            {
                XmlDictionaryReader r = this.reader;
                this.reader = null;
 
                if ((r != null) && this.moveBodyReaderToContent)
                {
                    r.MoveToContent();
                }
 
                return r;
            }
 
            protected override IAsyncResult OnBeginWriteMessage(XmlDictionaryWriter writer, AsyncCallback callback, object state)
            {
                WriteMessagePreamble(writer);
                return new OnWriteMessageAsyncResult(writer, this, callback, state);
            }
 
            protected override void OnEndWriteMessage(IAsyncResult result)
            {
                OnWriteMessageAsyncResult.End(result);
            }
 
            protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
            {
                this.bodyWriter.WriteBodyContents(writer);
            }
 
            protected override IAsyncResult OnBeginWriteBodyContents(XmlDictionaryWriter writer, AsyncCallback callback, object state)
            {
                return this.bodyWriter.BeginWriteBodyContents(writer, callback, state);
            }
 
            protected override void OnEndWriteBodyContents(IAsyncResult result)
            {
                this.bodyWriter.EndWriteBodyContents(result);
            }
 
            class OnWriteMessageAsyncResult : AsyncResult
            {
                InternalByteStreamMessage message;
                XmlDictionaryWriter writer;
 
                public OnWriteMessageAsyncResult(XmlDictionaryWriter writer, InternalByteStreamMessage message, AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    this.message = message;
                    this.writer = writer;
 
                    IAsyncResult result = this.message.OnBeginWriteBodyContents(this.writer, PrepareAsyncCompletion(HandleWriteBodyContents), this);
                    bool completeSelf = SyncContinue(result);
 
                    if (completeSelf)
                    {
                        this.Complete(true);
                    }
                }
 
                static bool HandleWriteBodyContents(IAsyncResult result)
                {
                    OnWriteMessageAsyncResult thisPtr = (OnWriteMessageAsyncResult)result.AsyncState;
                    thisPtr.message.OnEndWriteBodyContents(result);
                    thisPtr.message.WriteMessagePostamble(thisPtr.writer);
                    return true;
                }
 
                public static void End(IAsyncResult result)
                {
                    AsyncResult.End<OnWriteMessageAsyncResult>(result);
                }
            }
            
            class BufferedBodyWriter : BodyWriter
            {
                ByteStreamBufferedMessageData bufferedMessageData;
 
                public BufferedBodyWriter(ByteStreamBufferedMessageData bufferedMessageData)
                    : base(true)
                {
                    this.bufferedMessageData = bufferedMessageData;
                }
 
                internal ByteStreamBufferedMessageData MessageData
                {
                    get { return bufferedMessageData; }
                }
 
                protected override BodyWriter OnCreateBufferedCopy(int maxBufferSize)
                {
                    // Never called because when copying a Buffered message, we simply hand off the existing BodyWriter 
                    // to the new message.
                    Fx.Assert(false, "This is never called");
                    return null;
                }
 
                protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
                {
                    writer.WriteStartElement(ByteStreamMessageUtility.StreamElementName, string.Empty);
                    writer.WriteBase64(this.bufferedMessageData.Buffer.Array, this.bufferedMessageData.Buffer.Offset, this.bufferedMessageData.Buffer.Count);
                    writer.WriteEndElement();
                }
            }
 
            abstract class StreamedBodyWriter : BodyWriter
            {
                private StreamedBodyWriter()
                    : base(false)
                {
                }
 
                public static StreamedBodyWriter Create(Stream stream)
                {
                    return new StreamBasedStreamedBodyWriter(stream);
                }
 
                public static StreamedBodyWriter Create(HttpRequestMessage httpRequestMessage)
                {
                    return new HttpRequestMessageStreamedBodyWriter(httpRequestMessage);
                }
 
                public static StreamedBodyWriter Create(HttpResponseMessage httpResponseMessage)
                {
                    return new HttpResponseMessageStreamedBodyWriter(httpResponseMessage);
                }
 
                // OnCreateBufferedCopy / OnWriteBodyContents can only be called once - protected by state on Message (either copied or written once)
                protected override BodyWriter OnCreateBufferedCopy(int maxBufferSize)
                {
                    using (BufferManagerOutputStream bufferedStream = new BufferManagerOutputStream(SR.MaxReceivedMessageSizeExceeded("{0}"), maxBufferSize))
                    {
                        using (XmlDictionaryWriter writer = new XmlByteStreamWriter(bufferedStream, true))
                        {
                            OnWriteBodyContents(writer);
                            writer.Flush();
                            int size;
                            byte[] bytesArray = bufferedStream.ToArray(out size);
                            ByteStreamBufferedMessageData bufferedMessageData = new ByteStreamBufferedMessageData(new ArraySegment<byte>(bytesArray, 0, size));
                            return new BufferedBodyWriter(bufferedMessageData);
                        }
                    }
                }
 
                // OnCreateBufferedCopy / OnWriteBodyContents can only be called once - protected by state on Message (either copied or written once)
                protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
                {
                    writer.WriteStartElement(ByteStreamMessageUtility.StreamElementName, string.Empty);
                    writer.WriteValue(new ByteStreamStreamProvider(this.GetStream()));
                    writer.WriteEndElement();
                }
 
                protected override IAsyncResult OnBeginWriteBodyContents(XmlDictionaryWriter writer, AsyncCallback callback, object state)
                {
                    return new WriteBodyContentsAsyncResult(writer, this.GetStream(), callback, state);
                }
 
                protected override void OnEndWriteBodyContents(IAsyncResult result)
                {
                    WriteBodyContentsAsyncResult.End(result);
                }
 
                protected abstract Stream GetStream();
 
                class ByteStreamStreamProvider : IStreamProvider
                {
                    Stream stream;
 
                    internal ByteStreamStreamProvider(Stream stream)
                    {
                        this.stream = stream;
                    }
 
                    public Stream GetStream()
                    {
                        return stream;
                    }
 
                    public void ReleaseStream(Stream stream)
                    {
                        //Noop
                    }
                }
 
                class WriteBodyContentsAsyncResult : AsyncResult
                {
                    XmlDictionaryWriter writer;
 
                    public WriteBodyContentsAsyncResult(XmlDictionaryWriter writer, Stream stream, AsyncCallback callback, object state)
                        : base(callback, state)
                    {
                        this.writer = writer;
 
                        this.writer.WriteStartElement(ByteStreamMessageUtility.StreamElementName, string.Empty);
                        IAsyncResult result = this.writer.WriteValueAsync(new ByteStreamStreamProvider(stream)).AsAsyncResult(PrepareAsyncCompletion(HandleWriteBodyContents), this);
                        bool completeSelf = SyncContinue(result);
 
                        // Note:  The current task implementation hard codes the "IAsyncResult.CompletedSynchronously" property to false, so this fast path will never
                        // be hit, and we will always hop threads.  CSDMain #210220
                        if (completeSelf)
                        {
                            this.Complete(true);
                        }
                    }
 
                    static bool HandleWriteBodyContents(IAsyncResult result)
                    {
                        // If result is a task, we need to get the result so that exceptions are bubbled up in case the task is faulted.
                        Task t = result as Task;
                        if (t != null)
                        {
                            t.GetAwaiter().GetResult();
                        }
 
                        WriteBodyContentsAsyncResult thisPtr = (WriteBodyContentsAsyncResult)result.AsyncState;
                        thisPtr.writer.WriteEndElement();
                        return true;
                    }
 
                    public static void End(IAsyncResult result)
                    {
                        AsyncResult.End<WriteBodyContentsAsyncResult>(result);
                    }
                }
 
                class StreamBasedStreamedBodyWriter : StreamedBodyWriter
                {
                    private Stream stream;
 
                    public StreamBasedStreamedBodyWriter(Stream stream)
                    {
 
                        this.stream = stream;
                    }
 
                    protected override Stream GetStream()
                    {
                        return this.stream;
                    }
                }
 
                class HttpRequestMessageStreamedBodyWriter : StreamedBodyWriter
                {
                    private HttpRequestMessage httpRequestMessage;
 
                    public HttpRequestMessageStreamedBodyWriter(HttpRequestMessage httpRequestMessage)
                    {
                        Fx.Assert(httpRequestMessage != null, "The 'httpRequestMessage' parameter should not be null.");
 
                        this.httpRequestMessage = httpRequestMessage;
                    }
 
                    protected override Stream GetStream()
                    {
                        HttpContent content = this.httpRequestMessage.Content;
                        if (content != null)
                        {
                            return content.ReadAsStreamAsync().Result;
                        }
 
                        return new MemoryStream(EmptyArray<byte>.Instance);
                    }
 
                    protected override BodyWriter OnCreateBufferedCopy(int maxBufferSize)
                    {
                        HttpContent content = this.httpRequestMessage.Content;
                        if (content != null)
                        {
                            content.LoadIntoBufferAsync(maxBufferSize).Wait();
                        }
 
                        return base.OnCreateBufferedCopy(maxBufferSize);
                    }
                }
 
                class HttpResponseMessageStreamedBodyWriter : StreamedBodyWriter
                {
                    private HttpResponseMessage httpResponseMessage;
 
                    public HttpResponseMessageStreamedBodyWriter(HttpResponseMessage httpResponseMessage)
                    {
                        Fx.Assert(httpResponseMessage != null, "The 'httpResponseMessage' parameter should not be null.");
 
                        this.httpResponseMessage = httpResponseMessage;
                    }
 
                    protected override Stream GetStream()
                    {
                        HttpContent content = this.httpResponseMessage.Content;
                        if (content != null)
                        {
                            return content.ReadAsStreamAsync().Result;
                        }
 
                        return new MemoryStream(EmptyArray<byte>.Instance);
                    }
 
                    protected override BodyWriter OnCreateBufferedCopy(int maxBufferSize)
                    {
                        HttpContent content = this.httpResponseMessage.Content;
                        if (content != null)
                        {
                            content.LoadIntoBufferAsync(maxBufferSize).Wait();
                        }
 
                        return base.OnCreateBufferedCopy(maxBufferSize);
                    }
                }
            }
 
            class ByteStreamMessageBuffer : MessageBuffer
            {
                bool closed;
                MessageHeaders headers;
                ByteStreamBufferedMessageData messageData;
                MessageProperties properties;
                XmlDictionaryReaderQuotas quotas;
                bool moveBodyReaderToContent;
                object thisLock = new object();
 
                public ByteStreamMessageBuffer(ByteStreamBufferedMessageData messageData, MessageHeaders headers, MessageProperties properties, XmlDictionaryReaderQuotas quotas, bool moveBodyReaderToContent)
                    : base()
                {
                    this.messageData = messageData;
                    this.headers = new MessageHeaders(headers);
                    this.properties = new MessageProperties(properties);
                    this.quotas = new XmlDictionaryReaderQuotas();
                    quotas.CopyTo(this.quotas);
                    this.moveBodyReaderToContent = moveBodyReaderToContent;
 
                    this.messageData.Open();                
                }
 
                public override int BufferSize
                {
                    get { return this.messageData.Buffer.Count; }
                }
 
                object ThisLock
                {
                    get { return this.thisLock; }
                }
 
                public override void Close()
                {
                    lock (ThisLock)
                    {
                        if (!closed)
                        {
                            closed = true;
                            this.headers = null;
                            if (properties != null)
                            {
                                properties.Dispose();
                                properties = null;
                            }
                            this.messageData.Close();
                            this.messageData = null;
                            this.quotas = null;
                        }
                    }
                }
 
                public override Message CreateMessage()
                {
                    lock (ThisLock)
                    {
                        if (closed)
                        {
                            throw FxTrace.Exception.ObjectDisposed(SR.ObjectDisposed("message"));
                        }
 
                        return new InternalByteStreamMessage(this.messageData, this.headers, this.properties, this.quotas, this.moveBodyReaderToContent);
                    }
                }
            }
        }
    }
}