File: system\io\stream.cs
Project: ndp\clr\src\bcl\mscorlib.csproj (mscorlib)
// ==++==
// 
//   Copyright (c) Microsoft Corporation.  All rights reserved.
// 
// ==--==
/*============================================================
**
** Class:  Stream
** 
** <OWNER>gpaperin</OWNER>
**
**
** Purpose: Abstract base class for all Streams.  Provides
** default implementations of asynchronous reads & writes, in
** terms of the synchronous reads & writes (and vice versa).
**
**
===========================================================*/
using System;
using System.Threading;
#if FEATURE_ASYNC_IO
using System.Threading.Tasks;
#endif
 
using System.Runtime;
using System.Runtime.InteropServices;
#if NEW_EXPERIMENTAL_ASYNC_IO
using System.Runtime.CompilerServices;
#endif
using System.Runtime.ExceptionServices;
using System.Security;
using System.Security.Permissions;
using System.Diagnostics.Contracts;
using System.Reflection;
 
namespace System.IO {
    [Serializable]
    [ComVisible(true)]
#if CONTRACTS_FULL
    [ContractClass(typeof(StreamContract))]
#endif
#if FEATURE_REMOTING
    public abstract class Stream : MarshalByRefObject, IDisposable {
#else // FEATURE_REMOTING
    public abstract class Stream : IDisposable {
#endif // FEATURE_REMOTING
 
        // Shared, read-only Stream instance created from NullStream (a type declared
        // outside this section of the file).
        public static readonly Stream Null = new NullStream();
 
        // We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
        // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
        // improvement in Copy performance.
        // Used by the CopyTo(Stream) and CopyToAsync(Stream) overloads that take no explicit buffer size.
        private const int _DefaultCopyBufferSize = 81920;
 
#if NEW_EXPERIMENTAL_ASYNC_IO
        // To implement Async IO operations on streams that don't support async IO
 
        // The ReadWriteTask currently in flight.  Assigned by RunReadWriteTask and
        // reset to null by EndRead / EndWrite, which also use it to validate the
        // IAsyncResult they are handed.
        [NonSerialized]
        private ReadWriteTask _activeReadWriteTask;
        // Semaphore created lazily (initial and maximum count of 1) by
        // EnsureAsyncActiveSemaphoreInitialized.  BeginReadInternal / BeginWriteInternal
        // wait on it and EndRead / EndWrite release it.
        [NonSerialized]
        private SemaphoreSlim _asyncActiveSemaphore;
 
        // Returns the semaphore stored in _asyncActiveSemaphore, creating it
        // (initial count 1, maximum count 1) the first time it is requested.
        internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized()
        {
            // The SemaphoreSlim's WaitHandle is never accessed, so there is no
            // need to worry about disposing the instance.
            SemaphoreSlim semaphore = LazyInitializer.EnsureInitialized(
                ref _asyncActiveSemaphore, () => new SemaphoreSlim(1, 1));
            return semaphore;
        }
#endif
 
        // Whether the stream can be read from; supplied by subclasses.
        // Within this class, CopyTo / CopyToAsync throw NotSupportedException and
        // BeginReadInternal calls __Error.ReadNotSupported() when this is false.
        // When both CanRead and CanWrite are false, the copy methods treat the
        // stream as closed and throw ObjectDisposedException.
        public abstract bool CanRead {
            [Pure]
            get;
        }
 
        // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
        // Supplied by subclasses to report whether the seek-related members
        // (Position, Seek, Length, SetLength) are usable.
        public abstract bool CanSeek {
            [Pure]
            get;
        }
 
        // Whether the stream supports timeouts.  This base implementation always
        // reports false; the ReadTimeout / WriteTimeout defaults in this class throw
        // InvalidOperationException.
        [ComVisible(false)]
        public virtual bool CanTimeout {
            [Pure]
            get {
                return false;
            }
        }
        
        // Whether the stream can be written to; supplied by subclasses.
        // Within this class, CopyTo / CopyToAsync throw NotSupportedException when the
        // destination's CanWrite is false, and BeginWriteInternal calls
        // __Error.WriteNotSupported() when this is false.
        public abstract bool CanWrite {
            [Pure]
            get;
        }
 
        // Length of the stream; supplied by subclasses.  Per the note on CanSeek,
        // implementations should throw when CanSeek is false.
        public abstract long Length {
            get;
        }
 
        // Current position within the stream; supplied by subclasses.  Per the note
        // on CanSeek, implementations should throw when CanSeek is false.
        public abstract long Position {
            get;
            set;
        }
 
        // Read timeout for the stream.  The base implementation does not support
        // timeouts: both accessors throw InvalidOperationException.
        [ComVisible(false)]
        public virtual int ReadTimeout {
            get {
                Contract.Ensures(Contract.Result<int>() >= 0);
                string message = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
                throw new InvalidOperationException(message);
            }
            set {
                string message = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
                throw new InvalidOperationException(message);
            }
        }
 
        // Write timeout for the stream.  The base implementation does not support
        // timeouts: both accessors throw InvalidOperationException.
        [ComVisible(false)]
        public virtual int WriteTimeout {
            get {
                Contract.Ensures(Contract.Result<int>() >= 0);
                string message = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
                throw new InvalidOperationException(message);
            }
            set {
                string message = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
                throw new InvalidOperationException(message);
            }
        }
 
#if FEATURE_ASYNC_IO
        // Asynchronous copy to destination using the default buffer size
        // (_DefaultCopyBufferSize).  Forwards to the (Stream, Int32) overload.
        [HostProtection(ExternalThreading = true)]
        [ComVisible(false)]
        public Task CopyToAsync(Stream destination)
        {
            Task copyTask = CopyToAsync(destination, _DefaultCopyBufferSize);
            return copyTask;
        }
 
        // Asynchronous copy to destination with a caller-chosen buffer size and no
        // cancellation.  Forwards to the overload that accepts a CancellationToken.
        [HostProtection(ExternalThreading = true)]
        [ComVisible(false)]
        public Task CopyToAsync(Stream destination, Int32 bufferSize)
        {
            CancellationToken noCancellation = CancellationToken.None;
            return CopyToAsync(destination, bufferSize, noCancellation);
        }
 
        // Validates the arguments and both streams, then starts the asynchronous copy.
        // Throws, in this order of precedence:
        //   ArgumentNullException       - destination is null
        //   ArgumentOutOfRangeException - bufferSize is not positive
        //   ObjectDisposedException     - this stream, or destination, reports neither
        //                                 CanRead nor CanWrite (treated as closed)
        //   NotSupportedException       - this stream is unreadable, or destination is unwritable
        [HostProtection(ExternalThreading = true)]
        [ComVisible(false)]
        public virtual Task CopyToAsync(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
        {
            if (destination == null)
                throw new ArgumentNullException("destination");
            if (bufferSize < 1)
                throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
            if (!(CanRead || CanWrite))
                throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
            if (!(destination.CanRead || destination.CanWrite))
                throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
            if (!CanRead)
                throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
            if (!destination.CanWrite)
                throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
            Contract.EndContractBlock();

            // Validation is done synchronously above; the copy loop itself lives in
            // the async helper.
            Task copyTask = CopyToAsyncInternal(destination, bufferSize, cancellationToken);
            return copyTask;
        }
 
        // Copy loop behind CopyToAsync: repeatedly reads into a bufferSize-byte
        // array with ReadAsync and writes what was read to destination with
        // WriteAsync, until a read reports 0 bytes.  The cancellation token is passed
        // to every read and write; awaits do not capture the current context.
        private async Task CopyToAsyncInternal(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
        {
            Contract.Requires(destination != null);
            Contract.Requires(bufferSize > 0);
            Contract.Requires(CanRead);
            Contract.Requires(destination.CanWrite);

            byte[] chunk = new byte[bufferSize];
            for (;;)
            {
                int count = await ReadAsync(chunk, 0, chunk.Length, cancellationToken).ConfigureAwait(false);
                if (count == 0)
                    break; // a zero-byte read ends the copy

                await destination.WriteAsync(chunk, 0, count, cancellationToken).ConfigureAwait(false);
            }
        }
#endif // FEATURE_ASYNC_IO
 
        // Reads the bytes from the current stream and writes the bytes to
        // the destination stream until all bytes are read, starting at
        // the current position.
        public void CopyTo(Stream destination)
        {
            // Argument and state validation.  The order matters: it decides which
            // exception a caller sees when several conditions fail at once.
            if (destination == null)
                throw new ArgumentNullException("destination");
            // A stream that can neither read nor write is treated as closed.
            if (!(CanRead || CanWrite))
                throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
            if (!(destination.CanRead || destination.CanWrite))
                throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
            if (!CanRead)
                throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
            if (!destination.CanWrite)
                throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
            Contract.EndContractBlock();

            // No buffer size given: use the class-wide default.
            int bufferSize = _DefaultCopyBufferSize;
            InternalCopyTo(destination, bufferSize);
        }
 
        // Synchronously copies from this stream to destination using a buffer of
        // bufferSize bytes.  Throws, in this order of precedence:
        //   ArgumentNullException       - destination is null
        //   ArgumentOutOfRangeException - bufferSize is not positive
        //   ObjectDisposedException     - this stream, or destination, reports neither
        //                                 CanRead nor CanWrite (treated as closed)
        //   NotSupportedException       - this stream is unreadable, or destination is unwritable
        public void CopyTo(Stream destination, int bufferSize)
        {
            if (destination == null)
                throw new ArgumentNullException("destination");
            if (bufferSize < 1)
                throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
            if (!(CanRead || CanWrite))
                throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
            if (!(destination.CanRead || destination.CanWrite))
                throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
            if (!CanRead)
                throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
            if (!destination.CanWrite)
                throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
            Contract.EndContractBlock();

            InternalCopyTo(destination, bufferSize);
        }
 
        // Copy loop shared by the CopyTo overloads, which validate their arguments
        // before calling it: reads into a bufferSize-byte array and writes
        // each chunk to destination until Read returns 0.
        private void InternalCopyTo(Stream destination, int bufferSize)
        {
            Contract.Requires(destination != null);
            Contract.Requires(CanRead);
            Contract.Requires(destination.CanWrite);
            Contract.Requires(bufferSize > 0);

            byte[] chunk = new byte[bufferSize];
            int count = Read(chunk, 0, chunk.Length);
            while (count != 0)
            {
                destination.Write(chunk, 0, count);
                count = Read(chunk, 0, chunk.Length);
            }
        }
 
 
        // Stream used to require that all cleanup logic went into Close(),
        // which was thought up before we invented IDisposable.  However, we
        // need to follow the IDisposable pattern so that users can write 
        // sensible subclasses without needing to inspect all their base 
        // classes, and without worrying about version brittleness, from a
        // base class switching to the Dispose pattern.  We're moving
        // Stream to the Dispose(bool) pattern - that's where all subclasses 
        // should put their cleanup starting in V2.
        public virtual void Close()
        {
            /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
            Contract.Ensures(CanRead == false);
            Contract.Ensures(CanWrite == false);
            Contract.Ensures(CanSeek == false);
            */
 
            // Run the Dispose(bool) cleanup path with disposing == true, then tell
            // the GC that this instance no longer needs finalization.
            Dispose(true);
            GC.SuppressFinalize(this);
        }
 
        // IDisposable entry point.  Non-virtual; it only forwards to the virtual
        // Close(), which in turn calls Dispose(true).
        public void Dispose()
        {
            /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
            Contract.Ensures(CanRead == false);
            Contract.Ensures(CanWrite == false);
            Contract.Ensures(CanSeek == false);
            */
 
            Close();
        }
 
 
        // Cleanup hook of the Dispose(bool) pattern.  The base implementation is
        // intentionally empty; Close() calls it with disposing == true.
        protected virtual void Dispose(bool disposing)
        {
            // Note: Never change this to call other virtual methods on Stream
            // like Write, since the state on subclasses has already been 
            // torn down.  This is the last code to run on cleanup for a stream.
        }
 
        // Supplied by subclasses.  The default FlushAsync implementation in this
        // class invokes this method from a task on TaskScheduler.Default.
        public abstract void Flush();
 
#if FEATURE_ASYNC_IO
        // Asynchronous flush without cancellation; forwards to the overload that
        // accepts a CancellationToken.
        [HostProtection(ExternalThreading=true)]
        [ComVisible(false)]
        public Task FlushAsync()
        {
            CancellationToken noCancellation = CancellationToken.None;
            return FlushAsync(noCancellation);
        }
 
        // Default asynchronous flush: starts a task on TaskScheduler.Default that
        // calls the synchronous Flush() on this stream.  The stream is passed
        // as the task's state object so the delegate does not capture 'this'.
        [HostProtection(ExternalThreading=true)]
        [ComVisible(false)]
        public virtual Task FlushAsync(CancellationToken cancellationToken)
        {
            Action<object> flushWork = state => ((Stream)state).Flush();
            return Task.Factory.StartNew(
                flushWork,
                this,
                cancellationToken,
                TaskCreationOptions.DenyChildAttach,
                TaskScheduler.Default);
        }
#endif // FEATURE_ASYNC_IO
 
        // Obsolete factory for a wait handle.  The base implementation hands back a
        // new, unsignaled ManualResetEvent and never returns null.
        [Obsolete("CreateWaitHandle will be removed eventually.  Please use \"new ManualResetEvent(false)\" instead.")]
        protected virtual WaitHandle CreateWaitHandle()
        {
            Contract.Ensures(Contract.Result<WaitHandle>() != null);
            WaitHandle handle = new ManualResetEvent(false);
            return handle;
        }
 
        // Public entry point for an APM-style read.  Delegates to BeginReadInternal
        // with serializeAsynchronously == false.
        [HostProtection(ExternalThreading=true)]
        public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
        {
            Contract.Ensures(Contract.Result<IAsyncResult>() != null);
            IAsyncResult pendingRead = BeginReadInternal(buffer, offset, count, callback, state, false /*serializeAsynchronously*/);
            return pendingRead;
        }
 
        // Shared implementation behind BeginRead.
        //   buffer / offset / count - forwarded unchanged to Read (or BlockingBeginRead).
        //   callback / state        - user callback and state object for the IAsyncResult.
        //   serializeAsynchronously - true: wait for the semaphore with WaitAsync and
        //                             schedule the read once that wait completes;
        //                             false: block the caller in semaphore.Wait().
        // Calls __Error.ReadNotSupported() when CanRead is false.  The semaphore
        // acquired here is released in EndRead.
        [HostProtection(ExternalThreading = true)]
        internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
        {
            Contract.Ensures(Contract.Result<IAsyncResult>() != null);
            if (!CanRead) __Error.ReadNotSupported();
 
#if !NEW_EXPERIMENTAL_ASYNC_IO
            return BlockingBeginRead(buffer, offset, count, callback, state);
#else
 
            // Mango did not do Async IO.
            if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
            {
                return BlockingBeginRead(buffer, offset, count, callback, state);
            }
 
            // To avoid a race with a stream's position pointer & generating race 
            // conditions with internal buffer indexes in our own streams that 
            // don't natively support async IO operations when there are multiple 
            // async requests outstanding, we will block the application's main
            // thread if it does a second IO request until the first one completes.
            var semaphore = EnsureAsyncActiveSemaphoreInitialized();
            Task semaphoreTask = null;
            if (serializeAsynchronously)
            {
                semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
            }
            else
            {
                semaphore.Wait(); // synchronously wait here
            }
 
            // Create the task to asynchronously do a Read.  This task serves both
            // as the asynchronous work item and as the IAsyncResult returned to the user.
            var asyncResult = new ReadWriteTask(true /*isRead*/, delegate
            {
                // The ReadWriteTask stores all of the parameters to pass to Read.
                // As we're currently inside of it, we can get the current task
                // and grab the parameters from it.
                var thisTask = Task.InternalCurrent as ReadWriteTask;
                Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
 
                // Do the Read and return the number of bytes read
                var bytesRead = thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
                thisTask.ClearBeginState(); // just to help alleviate some memory pressure
                return bytesRead;
            }, state, this, buffer, offset, count, callback);
 
            // Schedule it: immediately when the semaphore was taken synchronously,
            // otherwise once the asynchronous wait has completed.
            if (semaphoreTask != null)
                RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
            else
                RunReadWriteTask(asyncResult);
 
            
            return asyncResult; // return it
#endif
        }
 
        // Completes a read started with BeginRead and returns the number of bytes read.
        // Throws ArgumentNullException when asyncResult is null.  On the task-based
        // path it additionally throws:
        //   ArgumentException         - no operation is active, or the active one is a write
        //   InvalidOperationException - asyncResult is not the active operation
        // and rethrows any exception raised by the read itself.
        public virtual int EndRead(IAsyncResult asyncResult)
        {
            if (asyncResult == null)
                throw new ArgumentNullException("asyncResult");
            Contract.Ensures(Contract.Result<int>() >= 0);
            Contract.EndContractBlock();
 
#if !NEW_EXPERIMENTAL_ASYNC_IO
            return BlockingEndRead(asyncResult);
#else
            // Mango did not do async IO.
            if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
                return BlockingEndRead(asyncResult);
 
            ReadWriteTask activeTask = _activeReadWriteTask;
 
            // Each guard throws, so they are evaluated strictly in this order.
            if (activeTask == null)
                throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
            if (activeTask != asyncResult)
                throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
            if (!activeTask._isRead)
                throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
 
            try
            {
                // Block until completion, then get the result / propagate any exception.
                int bytesRead = activeTask.GetAwaiter().GetResult();
                return bytesRead;
            }
            finally
            {
                // Whether the read succeeded or failed, clear the active operation
                // and let the next Begin* call through.
                _activeReadWriteTask = null;
                Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
                _asyncActiveSemaphore.Release();
            }
#endif
        }
 
#if FEATURE_ASYNC_IO
        // Asynchronous read without cancellation; forwards to the overload that
        // accepts a CancellationToken.
        [HostProtection(ExternalThreading = true)]
        [ComVisible(false)]
        public Task<int> ReadAsync(Byte[] buffer, int offset, int count)
        {
            CancellationToken noCancellation = CancellationToken.None;
            return ReadAsync(buffer, offset, count, noCancellation);
        }
 
        // Default asynchronous read.  The token is only inspected up front: when
        // cancellation has already been requested a canceled task is returned,
        // otherwise the read is expressed through BeginRead / EndRead and the
        // token is not passed any further.
        [HostProtection(ExternalThreading = true)]
        [ComVisible(false)]
        public virtual Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
                return Task.FromCancellation<int>(cancellationToken);
 
            return BeginEndReadAsync(buffer, offset, count);
        }
 
        // Wraps this stream's BeginRead / EndRead pair in a Task<Int32> via
        // TaskFactory<Int32>.FromAsyncTrim.  The read arguments travel in a
        // ReadWriteParameters value so that neither delegate has to capture them.
        private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
        {
            var readArgs = new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count };
 
            return TaskFactory<Int32>.FromAsyncTrim(
                        this, readArgs,
                        (target, args, callback, state) => target.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
                        (target, result) => target.EndRead(result)); // cached by compiler
        }
 
        // Bundles the (buffer, offset, count) triple so BeginEndReadAsync and
        // BeginEndWriteAsync can pass it to FromAsyncTrim as a single argument value.
        private struct ReadWriteParameters // struct for arguments to Read and Write calls
        {
            internal byte[] Buffer; // array passed to BeginRead / BeginWrite
            internal int Offset;    // offset argument passed alongside Buffer
            internal int Count;     // count argument passed alongside Buffer
        }
#endif //FEATURE_ASYNC_IO
 
 
 
        // Public entry point for an APM-style write.  Delegates to BeginWriteInternal
        // with serializeAsynchronously == false.
        [HostProtection(ExternalThreading=true)]
        public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
        {
            Contract.Ensures(Contract.Result<IAsyncResult>() != null);
            IAsyncResult pendingWrite = BeginWriteInternal(buffer, offset, count, callback, state, false /*serializeAsynchronously*/);
            return pendingWrite;
        }
 
        // Shared implementation behind BeginWrite.
        //   buffer / offset / count - forwarded unchanged to Write (or BlockingBeginWrite).
        //   callback / state        - user callback and state object for the IAsyncResult.
        //   serializeAsynchronously - true: wait for the semaphore with WaitAsync and
        //                             schedule the write once that wait completes;
        //                             false: block the caller in semaphore.Wait().
        // Calls __Error.WriteNotSupported() when CanWrite is false.  The semaphore
        // acquired here is released in EndWrite.
        [HostProtection(ExternalThreading = true)]
        internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
        {
            Contract.Ensures(Contract.Result<IAsyncResult>() != null);
            if (!CanWrite) __Error.WriteNotSupported();
#if !NEW_EXPERIMENTAL_ASYNC_IO
            return BlockingBeginWrite(buffer, offset, count, callback, state);
#else
 
            // Mango did not do Async IO.
            if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
            {
                return BlockingBeginWrite(buffer, offset, count, callback, state);
            }
 
            // To avoid a race with a stream's position pointer & generating race 
            // conditions with internal buffer indexes in our own streams that 
            // don't natively support async IO operations when there are multiple 
            // async requests outstanding, we will block the application's main
            // thread if it does a second IO request until the first one completes.
            var semaphore = EnsureAsyncActiveSemaphoreInitialized();
            Task semaphoreTask = null;
            if (serializeAsynchronously)
            {
                semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
            }
            else
            {
                semaphore.Wait(); // synchronously wait here
            }
 
            // Create the task to asynchronously do a Write.  This task serves both
            // as the asynchronous work item and as the IAsyncResult returned to the user.
            var asyncResult = new ReadWriteTask(false /*isRead*/, delegate
            {
                // The ReadWriteTask stores all of the parameters to pass to Write.
                // As we're currently inside of it, we can get the current task
                // and grab the parameters from it.
                var thisTask = Task.InternalCurrent as ReadWriteTask;
                Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
 
                // Do the Write
                thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);  
                thisTask.ClearBeginState(); // just to help alleviate some memory pressure
                return 0; // not used, but signature requires a value be returned
            }, state, this, buffer, offset, count, callback);
 
            // Schedule it: immediately when the semaphore was taken synchronously,
            // otherwise once the asynchronous wait has completed.
            if (semaphoreTask != null)
                RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
            else
                RunReadWriteTask(asyncResult);
 
            return asyncResult; // return it
#endif
        }
 
#if NEW_EXPERIMENTAL_ASYNC_IO
        // Starts readWriteTask once asyncWaiter (the pending semaphore wait) has
        // finished: right away when the wait is already complete, otherwise from a
        // continuation attached to the wait.
        private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
        {
            // Contract.Assert rather than Contract.Requires: the original note cites
            // CCRewrite doing a poor job with preconditions in async methods that
            // await (10/6/2011, bug 290222).
            Contract.Assert(readWriteTask != null);
            Contract.Assert(asyncWaiter != null);
 
            if (!asyncWaiter.IsCompleted)
            {
                // Not our turn yet.  The stream and the task are passed through the
                // continuation's state object so the lambda captures nothing.
                Tuple<Stream, ReadWriteTask> pending = Tuple.Create<Stream,ReadWriteTask>(this, readWriteTask);
                asyncWaiter.ContinueWith((waiter, boxed) =>
                    {
                        Contract.Assert(waiter.IsRanToCompletion, "The semaphore wait should always complete successfully.");
                        var pair = (Tuple<Stream,ReadWriteTask>)boxed;
                        pair.Item1.RunReadWriteTask(pair.Item2);
                    },
                    pending,
                    default(CancellationToken),
                    TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default);
                return;
            }
 
            // The wait has already completed, so run the task now.
            Contract.Assert(asyncWaiter.IsRanToCompletion, "The semaphore wait should always complete successfully.");
            RunReadWriteTask(readWriteTask);
        }
 
        // Records readWriteTask as the stream's active operation and queues it on
        // TaskScheduler.Default.  Asserts that no other operation is active.
        private void RunReadWriteTask(ReadWriteTask readWriteTask)
        {
            Contract.Requires(readWriteTask != null);
            Contract.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");
 
            // Schedule the task.  ScheduleAndStart must happen after the write to _activeReadWriteTask to avoid a race.
            // Internally, we're able to directly call ScheduleAndStart rather than Start, avoiding
            // two interlocked operations.  However, if ReadWriteTask is ever changed to use
            // a cancellation token, this should be changed to use Start.
            _activeReadWriteTask = readWriteTask; // store the task so that EndXx can validate it's given the right one
            readWriteTask.m_taskScheduler = TaskScheduler.Default;
            readWriteTask.ScheduleAndStart(needsProtection: false);
        }
#endif
 
        // Completes a write started with BeginWrite.
        // Throws ArgumentNullException when asyncResult is null.  On the task-based
        // path it additionally throws:
        //   ArgumentException         - no operation is active, or the active one is a read
        //   InvalidOperationException - asyncResult is not the active operation
        // and rethrows any exception raised by the write itself.
        public virtual void EndWrite(IAsyncResult asyncResult)
        {
            if (asyncResult == null)
                throw new ArgumentNullException("asyncResult");
            Contract.EndContractBlock();
 
#if !NEW_EXPERIMENTAL_ASYNC_IO
            BlockingEndWrite(asyncResult);
#else
 
            // Mango did not do Async IO.
            if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
            {
                BlockingEndWrite(asyncResult);
                return;
            }
 
            ReadWriteTask activeTask = _activeReadWriteTask;
 
            // Each guard throws, so they are evaluated strictly in this order.
            if (activeTask == null)
                throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
            if (activeTask != asyncResult)
                throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
            if (activeTask._isRead)
                throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
 
            try
            {
                // Block until completion, then propagate any exception.
                activeTask.GetAwaiter().GetResult();
                Contract.Assert(activeTask.Status == TaskStatus.RanToCompletion);
            }
            finally
            {
                // Whether the write succeeded or failed, clear the active operation
                // and let the next Begin* call through.
                _activeReadWriteTask = null;
                Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
                _asyncActiveSemaphore.Release();
            }
#endif
        }
 
#if NEW_EXPERIMENTAL_ASYNC_IO
        // Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
        // A single instance of this task serves four purposes:
        // 1. The work item scheduled to run the Read / Write operation
        // 2. The state holding the arguments to be passed to Read / Write
        // 3. The IAsyncResult returned from BeginRead / BeginWrite
        // 4. The completion action that runs to invoke the user-provided callback.
        // This last item is a bit tricky.  Before the AsyncCallback is invoked, the
        // IAsyncResult must have completed, so we can't just invoke the handler
        // from within the task, since it is the IAsyncResult, and thus it's not
        // yet completed.  Instead, we use AddCompletionAction to install this
        // task as its own completion handler.  That saves the need to allocate
        // a separate completion handler, it guarantees that the task will
        // have completed by the time the handler is invoked, and it allows
        // the handler to be invoked synchronously upon the completion of the
        // task.  This all enables BeginRead / BeginWrite to be implemented
        // with a single allocation.
        private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
        {
            // true for a task created by BeginReadInternal, false for BeginWriteInternal;
            // EndRead / EndWrite check it to reject a mismatched operation.
            internal readonly bool _isRead;
            // Arguments for the Read / Write call made from inside the task body.
            internal Stream _stream;
            internal byte [] _buffer;
            internal int _offset;
            internal int _count;
            // User callback, and the ExecutionContext captured for invoking it.
            // Both are only set when a callback was supplied to the constructor.
            private AsyncCallback _callback;
            private ExecutionContext _context;
 
            internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
            {
                _stream = null;
                _buffer = null;
            }
 
            // Creates the task.  function / state are passed to the Task<int> base
            // constructor (no cancellation token, DenyChildAttach); the remaining
            // arguments are stored for the Read / Write call.  When callback is not
            // null, an ExecutionContext is captured and the task registers itself as
            // its own completion action.
            [SecuritySafeCritical] // necessary for EC.Capture
            [MethodImpl(MethodImplOptions.NoInlining)]
            public ReadWriteTask(
                bool isRead,
                Func<object,int> function, object state,
                Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
                base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
            {
                Contract.Requires(function != null);
                Contract.Requires(stream != null);
                Contract.Requires(buffer != null);
                Contract.EndContractBlock();
 
                StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
 
                // Store the arguments
                _isRead = isRead;
                _stream = stream;
                _buffer = buffer;
                _offset = offset;
                _count = count;
 
                // If a callback was provided, we need to:
                // - Store the user-provided handler
                // - Capture an ExecutionContext under which to invoke the handler
                // - Add this task as its own completion handler so that the Invoke method
                //   will run the callback when this task completes.
                if (callback != null)
                {
                    _callback = callback;
                    _context = ExecutionContext.Capture(ref stackMark, 
                        ExecutionContext.CaptureOptions.OptimizeDefaultCase | ExecutionContext.CaptureOptions.IgnoreSyncCtx);
                    base.AddCompletionAction(this);
                }
            }
 
            // ContextCallback target used with ExecutionContext.Run: takes the
            // completed ReadWriteTask, clears its stored callback, and invokes that
            // callback with the task as the IAsyncResult.
            [SecurityCritical] // necessary for CoreCLR
            private static void InvokeAsyncCallback(object completedTask)
            {
                var rwc = (ReadWriteTask)completedTask;
                var callback = rwc._callback;
                rwc._callback = null;
                callback(rwc);
            }
 
            // Cached delegate for InvokeAsyncCallback; assigned on first use in Invoke.
            [SecurityCritical] // necessary for CoreCLR
            private static ContextCallback s_invokeAsyncCallback;
            
            // Completion action registered by the constructor when a callback was
            // supplied; runs the user callback once the task has completed.
            [SecuritySafeCritical] // necessary for ExecutionContext.Run
            void ITaskCompletionAction.Invoke(Task completingTask)
            {
                // Get the ExecutionContext.  If there is none, just run the callback
                // directly, passing in the completed task as the IAsyncResult.
                // If there is one, process it with ExecutionContext.Run.
                var context = _context;
                if (context == null) 
                {
                    var callback = _callback;
                    _callback = null;
                    callback(completingTask);
                }
                else 
                {
                    _context = null;
        
                    var invokeAsyncCallback = s_invokeAsyncCallback;
                    if (invokeAsyncCallback == null) s_invokeAsyncCallback = invokeAsyncCallback = InvokeAsyncCallback; // benign race
 
                    using(context) ExecutionContext.Run(context, invokeAsyncCallback, this, true);
                }
            }
        }
#endif
 
#if !FEATURE_PAL && FEATURE_ASYNC_IO
        // Asynchronous write without cancellation; forwards to the overload that
        // accepts a CancellationToken.
        [HostProtection(ExternalThreading = true)]
        [ComVisible(false)]
        public Task WriteAsync(Byte[] buffer, int offset, int count)
        {
            CancellationToken noCancellation = CancellationToken.None;
            return WriteAsync(buffer, offset, count, noCancellation);
        }
 
        // Default asynchronous write.  The token is only inspected up front: when
        // cancellation has already been requested a canceled task is returned,
        // otherwise the write is expressed through BeginWrite / EndWrite and the
        // token is not passed any further.
        [HostProtection(ExternalThreading = true)]
        [ComVisible(false)]
        public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
                return Task.FromCancellation(cancellationToken);
 
            return BeginEndWriteAsync(buffer, offset, count);
        }
 
 
        // Wraps this stream's BeginWrite / EndWrite pair in a Task via
        // TaskFactory<VoidTaskResult>.FromAsyncTrim.  The write arguments travel in a
        // ReadWriteParameters value so that neither delegate has to capture them.
        private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
        {
            var writeArgs = new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count };
 
            return TaskFactory<VoidTaskResult>.FromAsyncTrim(
                        this, writeArgs,
                        (target, args, callback, state) => target.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
                        (target, result) => // cached by compiler
                        {
                            // EndWrite returns nothing; hand back the placeholder result.
                            target.EndWrite(result);
                            return default(VoidTaskResult);
                        });
        }
#endif // !FEATURE_PAL && FEATURE_ASYNC_IO
 
        // Abstract: every concrete stream supplies its own seek logic.
        // NOTE(review): presumably moves the current position by 'offset' relative to 'origin'
        // and returns the new position - confirm against the implementations.  The
        // CONTRACTS_FULL contract class in this file only requires a result >= 0.
        public abstract long Seek(long offset, SeekOrigin origin);
 
        // Abstract: every concrete stream supplies its own implementation.
        // NOTE(review): presumably sets the stream's length to 'value' - confirm against the
        // implementations; nothing in this file constrains the behaviour further.
        public abstract void SetLength(long value);
 
        // Abstract: every concrete stream supplies its own read logic.
        // The default ReadByte in this class treats a return value of 0 as end of stream, and
        // the CONTRACTS_FULL contract class in this file requires 0 <= result <= count.
        // NOTE(review): presumably the result is the number of bytes stored into 'buffer'
        // starting at 'offset' - confirm against the implementations.
        public abstract int Read([In, Out] byte[] buffer, int offset, int count);
 
        // Reads a single byte by delegating to Read(byte[], int, int).
        // Returns the byte widened to an int, or -1 when Read reports that no
        // bytes were read (end of stream).
        // A fresh one-element byte[] is allocated on every call, so this default
        // is slow; subclasses that keep an internal buffer should override it to
        // speed up callers that read one byte at a time.
        public virtual int ReadByte()
        {
            Contract.Ensures(Contract.Result<int>() >= -1);
            Contract.Ensures(Contract.Result<int>() < 256);

            byte[] scratch = new byte[1];
            int bytesRead = Read(scratch, 0, 1);

            // Zero bytes read means there was nothing left to read.
            return bytesRead == 0 ? -1 : scratch[0];
        }
 
        // Abstract: every concrete stream supplies its own write logic.
        // NOTE(review): presumably writes 'count' bytes from 'buffer' starting at 'offset' -
        // confirm against the implementations.  The default WriteByte in this class calls this
        // with a one-element array, offset 0 and count 1.
        public abstract void Write(byte[] buffer, int offset, int count);
 
        // Writes a single byte by delegating to Write(byte[], int, int).
        // A fresh one-element byte[] is allocated on every call, so this default
        // is slow; subclasses that keep an internal buffer should override it to
        // speed up callers that write one byte at a time.
        public virtual void WriteByte(byte value)
        {
            // Wrap the value in a temporary array so the array-based Write can be reused.
            byte[] scratch = new byte[1] { value };
            Write(scratch, 0, 1);
        }
 
        // Returns a stream whose operations are serialized through a SyncStream wrapper.
        // A stream that already is a SyncStream is returned unchanged rather than being
        // wrapped a second time.  Throws ArgumentNullException when 'stream' is null.
        [HostProtection(Synchronization=true)]
        public static Stream Synchronized(Stream stream) 
        {
            if (stream==null)
                throw new ArgumentNullException("stream");
            Contract.Ensures(Contract.Result<Stream>() != null);
            Contract.EndContractBlock();

            // Reuse an existing wrapper; otherwise create a new one around the caller's stream.
            SyncStream alreadySynchronized = stream as SyncStream;
            return alreadySynchronized ?? new SyncStream(stream);
        }
 
#if !FEATURE_PAL  // This method shouldn't have been exposed in Dev10 (we revised object invariants after locking down).
        // Intentionally empty.  The note on the enclosing #if says this method should not
        // have been exposed; it is marked [Obsolete] so that code calling or overriding it
        // gets a compiler diagnostic carrying the message below.
        [Obsolete("Do not call or override this method.")]
        protected virtual void ObjectInvariant() 
        {
        }
#endif
 
        // Services a BeginRead request by performing the read synchronously on the calling
        // thread and returning an already-completed SynchronousAsyncResult.
        // An IOException thrown by Read is captured in the result (to be rethrown from
        // BlockingEndRead); any other exception propagates directly to the caller.
        // The callback, when supplied, is invoked before this method returns.
        internal IAsyncResult BlockingBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
        {
            Contract.Ensures(Contract.Result<IAsyncResult>() != null);

            // Streams that do not natively support async IO could race on their position
            // pointer and internal buffer indexes if several async requests were outstanding
            // at once.  To avoid that, the IO is simply done synchronously right here.
            // This can't perform well - use a different approach.
            SynchronousAsyncResult completedResult;
            try {
                completedResult = new SynchronousAsyncResult(Read(buffer, offset, count), state);
            }
            catch (IOException ioError) {
                completedResult = new SynchronousAsyncResult(ioError, state, isWrite: false);
            }

            if (callback == null)
                return completedResult;

            callback(completedResult);
            return completedResult;
        }
 
        // Completes an operation started by BlockingBeginRead.  Delegates to
        // SynchronousAsyncResult.EndRead, which rejects results that are not read results,
        // rejects a second End call on the same result, rethrows any captured exception,
        // and otherwise returns the stored byte count.
        internal static int BlockingEndRead(IAsyncResult asyncResult)
        {
            Contract.Ensures(Contract.Result<int>() >= 0);
 
            return SynchronousAsyncResult.EndRead(asyncResult);
        }
 
        // Services a BeginWrite request by performing the write synchronously on the calling
        // thread and returning an already-completed SynchronousAsyncResult.
        // An IOException thrown by Write is captured in the result (to be rethrown from
        // BlockingEndWrite); any other exception propagates directly to the caller.
        // The callback, when supplied, is invoked before this method returns.
        internal IAsyncResult BlockingBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
        {
            Contract.Ensures(Contract.Result<IAsyncResult>() != null);

            // Streams that do not natively support async IO could race on their position
            // pointer and internal buffer indexes if several async requests were outstanding
            // at once.  To avoid that, the IO is simply done synchronously right here.
            // This can't perform well - use a different approach.
            SynchronousAsyncResult completedResult;
            try {
                Write(buffer, offset, count);
                completedResult = new SynchronousAsyncResult(state);
            }
            catch (IOException ioError) {
                completedResult = new SynchronousAsyncResult(ioError, state, isWrite: true);
            }

            if (callback == null)
                return completedResult;

            callback(completedResult);
            return completedResult;
        }
 
        // Completes an operation started by BlockingBeginWrite.  Delegates to
        // SynchronousAsyncResult.EndWrite, which rejects results that are not write results,
        // rejects a second End call on the same result, and rethrows any captured exception.
        internal static void BlockingEndWrite(IAsyncResult asyncResult)
        {
            SynchronousAsyncResult.EndWrite(asyncResult);
        }
 
        // Stream with no backing store; the instance behind Stream.Null.
        // Reads always report end of stream, writes discard their data, and
        // Seek / SetLength / Position changes have no effect.  Length and Position are always 0.
        [Serializable]
        private sealed class NullStream : Stream
        {
            internal NullStream() {}

            public override bool CanRead {
                [Pure]
                get { return true; }
            }

            public override bool CanWrite {
                [Pure]
                get { return true; }
            }

            public override bool CanSeek {
                [Pure]
                get { return true; }
            }

            public override long Length {
                get { return 0; }
            }

            // Position is pinned at 0; the setter accepts and ignores any value.
            public override long Position {
                get { return 0; }
                set {}
            }

            protected override void Dispose(bool disposing)
            {
                // Do nothing - we don't want NullStream singleton (static) to be closable
            }

            // Nothing is ever buffered, so there is nothing to flush.
            public override void Flush()
            {
            }

#if FEATURE_ASYNC_IO
            // Completes immediately: canceled if the token is already canceled, otherwise successful.
            [ComVisible(false)]
            public override Task FlushAsync(CancellationToken cancellationToken)
            {
                return cancellationToken.IsCancellationRequested ?
                    Task.FromCancellation(cancellationToken) :
                    Task.CompletedTask;
            }
#endif // FEATURE_ASYNC_IO

            // Begin/End reads are serviced synchronously through the Blocking* helpers of Stream.
            [HostProtection(ExternalThreading = true)]
            public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
            {
                if (!CanRead) __Error.ReadNotSupported();

                return BlockingBeginRead(buffer, offset, count, callback, state);
            }

            public override int EndRead(IAsyncResult asyncResult)
            {
                if (asyncResult == null)
                    throw new ArgumentNullException("asyncResult");
                Contract.EndContractBlock();

                return BlockingEndRead(asyncResult);
            }

            // Begin/End writes are serviced synchronously through the Blocking* helpers of Stream.
            [HostProtection(ExternalThreading = true)]
            public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
            {
                if (!CanWrite) __Error.WriteNotSupported();

                return BlockingBeginWrite(buffer, offset, count, callback, state);
            }

            public override void EndWrite(IAsyncResult asyncResult)
            {
                if (asyncResult == null)
                    throw new ArgumentNullException("asyncResult");
                Contract.EndContractBlock();

                BlockingEndWrite(asyncResult);
            }

            // Always reports zero bytes read, i.e. end of stream; the buffer is left untouched.
            public override int Read([In, Out] byte[] buffer, int offset, int count)
            {
                return 0;
            }

#if FEATURE_ASYNC_IO
            // Completes immediately.  When cancellation has already been requested the returned
            // task is canceled, matching FlushAsync and WriteAsync on this class; otherwise a
            // shared, pre-built task is returned so no allocation happens per call.
            [ComVisible(false)]
            public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
            {
                // Honour an already-canceled token the same way the sibling async overrides do.
                if (cancellationToken.IsCancellationRequested)
                    return Task.FromCancellation<int>(cancellationToken);

                // NOTE(review): the internal Task<int> constructor arguments are presumably
                // (canceled: false, result: 0, ...) - confirm against Task<TResult>.
                var nullReadTask = s_nullReadTask;
                if (nullReadTask == null) 
                    s_nullReadTask = nullReadTask = new Task<int>(false, 0, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, CancellationToken.None); // benign race: worst case two equivalent tasks get created
                return nullReadTask;
            }
            // Lazily created task shared by all successful ReadAsync calls.
            private static Task<int> s_nullReadTask;
#endif //FEATURE_ASYNC_IO

            // Always reports end of stream.
            public override int ReadByte()
            {
                return -1;
            }

            // Discards the data.
            public override void Write(byte[] buffer, int offset, int count)
            {
            }

#if FEATURE_ASYNC_IO
            // Completes immediately: canceled if the token is already canceled, otherwise successful.
            [ComVisible(false)]
            public override Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
            {
                return cancellationToken.IsCancellationRequested ?
                    Task.FromCancellation(cancellationToken) :
                    Task.CompletedTask;
            }
#endif // FEATURE_ASYNC_IO

            // Discards the byte.
            public override void WriteByte(byte value)
            {
            }

            // Seeking has no effect; the reported position is always 0.
            public override long Seek(long offset, SeekOrigin origin)
            {
                return 0;
            }

            // Resizing has no effect; Length stays 0.
            public override void SetLength(long length)
            {
            }
        }
 
        
        /// <summary>Used as the IAsyncResult object when using asynchronous IO methods on the base Stream class.</summary>
        internal sealed class SynchronousAsyncResult : IAsyncResult {
            
            // Caller-supplied state object, surfaced unchanged through AsyncState.
            private readonly Object _stateObject;            
            // True when this result belongs to a write; EndRead/EndWrite use it to reject mismatched results.
            private readonly bool _isWrite;
            // Created on first access of AsyncWaitHandle.
            private ManualResetEvent _waitHandle;
            // Exception captured from the synchronous operation, or null when it succeeded.
            private ExceptionDispatchInfo _exceptionInfo;

            // Set by the first EndRead/EndWrite so that a second call can be rejected.
            private bool _endXxxCalled;
            private Int32 _bytesRead;

            // Result of a successful read.
            internal SynchronousAsyncResult(Int32 bytesRead, Object asyncStateObject) {
                // _isWrite is left at its default (false): this is a read result.
                _stateObject = asyncStateObject;
                _bytesRead = bytesRead;
            }

            // Result of a successful write.
            internal SynchronousAsyncResult(Object asyncStateObject) {
                _isWrite = true;
                _stateObject = asyncStateObject;
            }

            // Result of a failed read or write; the exception is rethrown from EndRead/EndWrite.
            internal SynchronousAsyncResult(Exception ex, Object asyncStateObject, bool isWrite) {
                _isWrite = isWrite;
                _stateObject = asyncStateObject;
                _exceptionInfo = ExceptionDispatchInfo.Capture(ex);
            }

            public bool IsCompleted {
                // We never hand out objects of this type to the user before the synchronous IO completed:
                get { return true; }
            }

            public WaitHandle AsyncWaitHandle {
                get {
                    // The handle is created already signaled (initialState: true), on demand only.
                    return LazyInitializer.EnsureInitialized(ref _waitHandle, () => new ManualResetEvent(true));
                }
            }

            public Object AsyncState {
                get { return _stateObject; }
            }

            public bool CompletedSynchronously {
                get { return true; }
            }

            // Rethrows the captured exception, if there is one.
            internal void ThrowIfError() {
                if (_exceptionInfo == null)
                    return;

                _exceptionInfo.Throw();
            }

            // Validates the result, marks it as consumed, then rethrows the captured
            // exception or returns the stored byte count.
            internal static Int32 EndRead(IAsyncResult asyncResult) {

                SynchronousAsyncResult readResult = asyncResult as SynchronousAsyncResult;
                bool isReadResult = readResult != null && !readResult._isWrite;
                if (!isReadResult)
                    __Error.WrongAsyncResult();

                if (readResult._endXxxCalled)
                    __Error.EndReadCalledTwice();

                readResult._endXxxCalled = true;

                readResult.ThrowIfError();
                return readResult._bytesRead;
            }

            // Validates the result, marks it as consumed, then rethrows the captured exception if any.
            internal static void EndWrite(IAsyncResult asyncResult) {

                SynchronousAsyncResult writeResult = asyncResult as SynchronousAsyncResult;
                bool isWriteResult = writeResult != null && writeResult._isWrite;
                if (!isWriteResult)
                    __Error.WrongAsyncResult();

                if (writeResult._endXxxCalled)
                    __Error.EndWriteCalledTwice();

                writeResult._endXxxCalled = true;

                writeResult.ThrowIfError();
            }
        }   // class SynchronousAsyncResult
 
 
        // SyncStream is a wrapper around a stream that takes a lock on the wrapped
        // stream for every data, position and lifetime operation, making those
        // operations thread safe.  The capability and timeout properties
        // (CanRead, CanWrite, CanSeek, CanTimeout, ReadTimeout, WriteTimeout)
        // are forwarded without taking the lock.
        [Serializable]
        internal sealed class SyncStream : Stream, IDisposable
        {
            private Stream _stream;
            // Lazily computed answers to "does the wrapped stream override BeginRead / BeginWrite?".
            // A bool? is a two-field struct whose reads and writes are not guaranteed to be atomic,
            // so both fields are only read and written while holding the lock on _stream.
            [NonSerialized]
            private bool? _overridesBeginRead;
            [NonSerialized]
            private bool? _overridesBeginWrite;

            // Wraps 'stream'; throws ArgumentNullException when it is null.
            internal SyncStream(Stream stream)
            {
                if (stream == null)
                    throw new ArgumentNullException("stream");
                Contract.EndContractBlock();
                _stream = stream;
            }
        
            public override bool CanRead {
                [Pure]
                get { return _stream.CanRead; }
            }
        
            public override bool CanWrite {
                [Pure]
                get { return _stream.CanWrite; }
            }
        
            public override bool CanSeek {
                [Pure]
                get { return _stream.CanSeek; }
            }
        
            [ComVisible(false)]
            public override bool CanTimeout {
                [Pure]
                get {
                    return _stream.CanTimeout;
                }
            }

            public override long Length {
                get {
                    lock(_stream) {
                        return _stream.Length;
                    }
                }
            }
        
            public override long Position {
                get {
                    lock(_stream) {
                        return _stream.Position;
                    }
                }
                set {
                    lock(_stream) {
                        _stream.Position = value;
                    }
                }
            }

            [ComVisible(false)]
            public override int ReadTimeout {
                get {
                    return _stream.ReadTimeout;
                }
                set {
                    _stream.ReadTimeout = value;
                }
            }

            [ComVisible(false)]
            public override int WriteTimeout {
                get {
                    return _stream.WriteTimeout;
                }
                set {
                    _stream.WriteTimeout = value;
                }
            }

            // In the off chance that some wrapped stream has different 
            // semantics for Close vs. Dispose, let's preserve that.
            public override void Close()
            {
                lock(_stream) {
                    try {
                        _stream.Close();
                    }
                    finally {
                        // Run the base disposal even if closing the wrapped stream throws.
                        base.Dispose(true);
                    }
                }
            }
        
            protected override void Dispose(bool disposing)
            {
                lock(_stream) {
                    try {
                        // Explicitly pick up a potentially methodimpl'ed Dispose
                        if (disposing)
                            ((IDisposable)_stream).Dispose();
                    }
                    finally {
                        // Run the base disposal even if disposing the wrapped stream throws.
                        base.Dispose(disposing);
                    }
                }
            }
        
            public override void Flush()
            {
                lock(_stream)
                    _stream.Flush();
            }
        
            public override int Read([In, Out]byte[] bytes, int offset, int count)
            {
                lock(_stream)
                    return _stream.Read(bytes, offset, count);
            }
        
            public override int ReadByte()
            {
                lock(_stream)
                    return _stream.ReadByte();
            }

            // Reports whether the runtime type of 'stream' overrides the named Begin method
            // ("BeginRead" or "BeginWrite") instead of inheriting it from the base Stream type.
            private static bool OverridesBeginMethod(Stream stream, string methodName)
            {
                Contract.Requires(stream != null, "Expected a non-null stream.");
                Contract.Requires(methodName == "BeginRead" || methodName == "BeginWrite",
                    "Expected BeginRead or BeginWrite as the method name to check.");

                // Get all of the methods on the underlying stream
                var methods = stream.GetType().GetMethods(BindingFlags.Public | BindingFlags.Instance);

                // If any of the methods have the desired name and are defined on the base Stream
                // Type, then the method was not overridden.  If none of them were defined on the
                // base Stream, then it must have been overridden.
                foreach (var method in methods)
                {
                    if (method.DeclaringType == typeof(Stream) &&
                        method.Name == methodName)
                    {
                        return false;
                    }
                }
                return true;
            }
        
            [HostProtection(ExternalThreading=true)]
            public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
            {
                lock (_stream)
                {
                    // Lazily-initialize whether the wrapped stream overrides BeginRead.  Done under the
                    // lock so that concurrent callers never observe a partially written bool? value.
                    if (_overridesBeginRead == null)
                    {
                        _overridesBeginRead = OverridesBeginMethod(_stream, "BeginRead");
                    }

                    // If the Stream does have its own BeginRead implementation, then we must use that override.
                    // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
                    // which ensures only one asynchronous operation does so with an asynchronous wait rather
                    // than a synchronous wait.  A synchronous wait will result in a deadlock condition, because
                    // the EndXx method for the outstanding async operation won't be able to acquire the lock on
                    // _stream due to this call blocked while holding the lock.
                    return _overridesBeginRead.Value ?
                        _stream.BeginRead(buffer, offset, count, callback, state) :
                        _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
                }
            }
        
            public override int EndRead(IAsyncResult asyncResult)
            {
                if (asyncResult == null)
                    throw new ArgumentNullException("asyncResult");
                Contract.Ensures(Contract.Result<int>() >= 0);
                Contract.EndContractBlock();

                lock(_stream)
                    return _stream.EndRead(asyncResult);
            }
        
            public override long Seek(long offset, SeekOrigin origin)
            {
                lock(_stream)
                    return _stream.Seek(offset, origin);
            }
        
            public override void SetLength(long length)
            {
                lock(_stream)
                    _stream.SetLength(length);
            }
        
            public override void Write(byte[] bytes, int offset, int count)
            {
                lock(_stream)
                    _stream.Write(bytes, offset, count);
            }
        
            public override void WriteByte(byte b)
            {
                lock(_stream)
                    _stream.WriteByte(b);
            }
        
            [HostProtection(ExternalThreading=true)]
            public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
            {
                lock (_stream)
                {
                    // Lazily-initialize whether the wrapped stream overrides BeginWrite.  Done under the
                    // lock so that concurrent callers never observe a partially written bool? value.
                    if (_overridesBeginWrite == null)
                    {
                        _overridesBeginWrite = OverridesBeginMethod(_stream, "BeginWrite");
                    }

                    // If the Stream does have its own BeginWrite implementation, then we must use that override.
                    // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
                    // which ensures only one asynchronous operation does so with an asynchronous wait rather
                    // than a synchronous wait.  A synchronous wait will result in a deadlock condition, because
                    // the EndXx method for the outstanding async operation won't be able to acquire the lock on
                    // _stream due to this call blocked while holding the lock.
                    return _overridesBeginWrite.Value ?
                        _stream.BeginWrite(buffer, offset, count, callback, state) :
                        _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
                }
            }
            
            public override void EndWrite(IAsyncResult asyncResult)
            {
                if (asyncResult == null)
                    throw new ArgumentNullException("asyncResult");
                Contract.EndContractBlock();

                lock(_stream)
                    _stream.EndWrite(asyncResult);
            }
        }
    }
 
#if CONTRACTS_FULL
    // Contract class for Stream, compiled only when CONTRACTS_FULL is defined.
    // It exists solely to carry the Contract.Ensures postconditions for Stream's abstract
    // members; every body throws NotImplementedException after stating its contracts, and
    // members without postconditions just throw.
    [ContractClassFor(typeof(Stream))]
    internal abstract class StreamContract : Stream
    {
        // Postcondition: the returned value is never negative.
        public override long Seek(long offset, SeekOrigin origin)
        {
            Contract.Ensures(Contract.Result<long>() >= 0);
            throw new NotImplementedException();
        }

        // No contract stated.
        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

        // Postcondition: 0 <= result <= count.
        public override int Read(byte[] buffer, int offset, int count)
        {
            Contract.Ensures(Contract.Result<int>() >= 0);
            Contract.Ensures(Contract.Result<int>() <= count);
            throw new NotImplementedException();
        }

        // No contract stated.
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }

        // Postcondition on the getter: the position is never negative.  The setter states no contract.
        public override long Position {
            get {
                Contract.Ensures(Contract.Result<long>() >= 0);
                throw new NotImplementedException();
            }
            set {
                throw new NotImplementedException();
            }
        }

        // No contract stated.
        public override void Flush()
        {
            throw new NotImplementedException();
        }

        // The capability properties state no contracts.
        public override bool CanRead {
            get { throw new NotImplementedException(); }
        }

        public override bool CanWrite {
            get { throw new NotImplementedException(); }
        }

        public override bool CanSeek {
            get { throw new NotImplementedException(); }
        }

        // Postcondition: the length is never negative.
        public override long Length
        {
            get {
                Contract.Ensures(Contract.Result<long>() >= 0);
                throw new NotImplementedException();
            }
        }
    }
#endif  // CONTRACTS_FULL
}