File: System\Data\SqlClient\SqlSequentialTextReader.cs
Project: ndp\fx\src\data\System.Data.csproj (System.Data)
using System;
using System.Data.Common;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.Data.SqlClient
{
    sealed internal class SqlSequentialTextReader : System.IO.TextReader
    {
        private SqlDataReader _reader;  // The SqlDataReader that we are reading data from
        private int _columnIndex;       // The index of out column in the table
        private Encoding _encoding;     // Encoding for this character stream
        private Decoder _decoder;       // Decoder based on the encoding (NOTE: Decoders are stateful as they are designed to process streams of data)
        private byte[] _leftOverBytes;  // Bytes leftover from the last Read() operation - this can be null if there were no bytes leftover (Possible optimization: re-use the same array?)
        private int _peekedChar;        // The last character that we peeked at (or -1 if we haven't peeked at anything)
        private Task _currentTask;      // The current async task
        private CancellationTokenSource _disposalTokenSource;    // Used to indicate that a cancellation is requested due to disposal
 
        internal SqlSequentialTextReader(SqlDataReader reader, int columnIndex, Encoding encoding)
        {
            Debug.Assert(reader != null, "Null reader when creating sequential textreader");
            Debug.Assert(columnIndex >= 0, "Invalid column index when creating sequential textreader");
            Debug.Assert(encoding != null, "Null encoding when creating sequential textreader");
 
            _reader = reader;
            _columnIndex = columnIndex;
            _encoding = encoding;
            _decoder = encoding.GetDecoder();
            _leftOverBytes = null;
            _peekedChar = -1;
            _currentTask = null;
            _disposalTokenSource = new CancellationTokenSource();
        }
 
        internal int ColumnIndex
        {
            get { return _columnIndex; }
        }
 
        public override int Peek()
        {
            if (_currentTask != null)
            {
                throw ADP.AsyncOperationPending();
            }
            if (IsClosed)
            {
                throw ADP.ObjectDisposed(this);
            }
 
            if (!HasPeekedChar)
            {
                _peekedChar = Read();
            }
 
            Debug.Assert(_peekedChar == -1 || ((_peekedChar >= char.MinValue) && (_peekedChar <= char.MaxValue)), string.Format("Bad peeked character: {0}", _peekedChar));
            return _peekedChar;
        }
 
        public override int Read()
        {
            if (_currentTask != null)
            {
                throw ADP.AsyncOperationPending();
            }
            if (IsClosed)
            {
                throw ADP.ObjectDisposed(this);
            }
 
            int readChar = -1;
 
            // If there is already a peeked char, then return it
            if (HasPeekedChar)
            {
                readChar = _peekedChar;
                _peekedChar = -1;
            }
            // If there is data available try to read a char
            else
            {
                char[] tempBuffer = new char[1];
                int charsRead = InternalRead(tempBuffer, 0, 1);
                if (charsRead == 1)
                {
                    readChar = tempBuffer[0];
                }
            }
 
            Debug.Assert(readChar == -1 || ((readChar >= char.MinValue) && (readChar <= char.MaxValue)), string.Format("Bad read character: {0}", readChar));
            return readChar;
        }
 
        public override int Read(char[] buffer, int index, int count)
        {
            ValidateReadParameters(buffer, index, count);
 
            if (IsClosed)
            {
                throw ADP.ObjectDisposed(this);
            }
            if (_currentTask != null)
            {
                throw ADP.AsyncOperationPending();
            }
 
            int charsRead = 0;
            int charsNeeded = count;
            // Load in peeked char
            if ((charsNeeded > 0) && (HasPeekedChar))
            {
                Debug.Assert((_peekedChar >= char.MinValue) && (_peekedChar <= char.MaxValue), string.Format("Bad peeked character: {0}", _peekedChar));
                buffer[index + charsRead] = (char)_peekedChar;
                charsRead++;
                charsNeeded--;
                _peekedChar = -1;
            }
 
            // If we need more data and there is data avaiable, read
            charsRead += InternalRead(buffer, index + charsRead, charsNeeded);
 
            return charsRead;
        }
 
        public override Task<int> ReadAsync(char[] buffer, int index, int count)
        {
            ValidateReadParameters(buffer, index, count);
            TaskCompletionSource<int> completion = new TaskCompletionSource<int>();
 
            if (IsClosed)
            {
                completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
            }
            else
            {
                try
                {
                    Task original = Interlocked.CompareExchange<Task>(ref _currentTask, completion.Task, null);
                    if (original != null)
                    {
                        completion.SetException(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending()));
                    }
                    else
                    {
                        bool completedSynchronously = true;
                        int charsRead = 0;
                        int adjustedIndex = index;
                        int charsNeeded = count;
 
                        // Load in peeked char
                        if ((HasPeekedChar) && (charsNeeded > 0))
                        {
                            // Take a copy of _peekedChar in case it is cleared during close
                            int peekedChar = _peekedChar;
                            if (peekedChar >= char.MinValue)
                            {
                                Debug.Assert((_peekedChar >= char.MinValue) && (_peekedChar <= char.MaxValue), string.Format("Bad peeked character: {0}", _peekedChar));
                                buffer[adjustedIndex] = (char)peekedChar;
                                adjustedIndex++;
                                charsRead++;
                                charsNeeded--;
                                _peekedChar = -1;
                            }
                        }
 
                        int byteBufferUsed;
                        byte[] byteBuffer = PrepareByteBuffer(charsNeeded, out byteBufferUsed);
 
                        // Permit a 0 byte read in order to advance the reader to the correct column
                        if ((byteBufferUsed < byteBuffer.Length) || (byteBuffer.Length == 0))
                        {
                            int bytesRead;
                            var reader = _reader;
                            if (reader != null) 
                            {
                                Task<int> getBytesTask = reader.GetBytesAsync(_columnIndex, byteBuffer, byteBufferUsed, byteBuffer.Length - byteBufferUsed, Timeout.Infinite, _disposalTokenSource.Token, out bytesRead);
                                if (getBytesTask == null) {
                                    byteBufferUsed += bytesRead;
                                }
                                else {
                                    // We need more data - setup the callback, and mark this as not completed sync
                                    completedSynchronously = false;
                                    getBytesTask.ContinueWith((t) =>
                                    {
                                        _currentTask = null;
                                        // If we completed but the textreader is closed, then report cancellation
                                        if ((t.Status == TaskStatus.RanToCompletion) && (!IsClosed))
                                        {
                                            try
                                            {
                                                int bytesReadFromStream = t.Result;
                                                byteBufferUsed += bytesReadFromStream;
                                                if (byteBufferUsed > 0)
                                                {
                                                    charsRead += DecodeBytesToChars(byteBuffer, byteBufferUsed, buffer, adjustedIndex, charsNeeded);
                                                }
                                                completion.SetResult(charsRead);
                                            }
                                            catch (Exception ex)
                                            {
                                                completion.SetException(ex);
                                            }
                                        }
                                        else if (IsClosed)
                                        {
                                            completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
                                        }
                                        else if (t.Status == TaskStatus.Faulted)
                                        {
                                            if (t.Exception.InnerException is SqlException)
                                            {
                                                // ReadAsync can't throw a SqlException, so wrap it in an IOException
                                                completion.SetException(ADP.ExceptionWithStackTrace(ADP.ErrorReadingFromStream(t.Exception.InnerException)));
                                            }
                                            else
                                            {
                                                completion.SetException(t.Exception.InnerException);
                                            }
                                        }
                                        else
                                        {
                                            completion.SetCanceled();
                                        }
                                    }, TaskScheduler.Default);
                                }
                            
 
                                if ((completedSynchronously) && (byteBufferUsed > 0))
                                {
                                    // No more data needed, decode what we have
                                    charsRead += DecodeBytesToChars(byteBuffer, byteBufferUsed, buffer, adjustedIndex, charsNeeded);
                                }
                            }
                            else
                            {
                                // Reader is null, close must of happened in the middle of this read
                                completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
                            }
                        }
                    
 
                        if (completedSynchronously)
                        {
                            _currentTask = null;
                            if (IsClosed)
                            {
                                completion.SetCanceled();
                            }
                            else
                            {
                                completion.SetResult(charsRead);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    // In case of any errors, ensure that the completion is completed and the task is set back to null if we switched it
                    completion.TrySetException(ex);
                    Interlocked.CompareExchange(ref _currentTask, null, completion.Task);
                    throw;
                }
            }
 
            return completion.Task;
        }
 
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                // Set the textreader as closed
                SetClosed();
            }
 
            base.Dispose(disposing);
        }
 
        /// <summary>
        /// Forces the TextReader to act as if it was closed
        /// This does not actually close the stream, read off the rest of the data or dispose this
        /// </summary>
        internal void SetClosed()
        {
            _disposalTokenSource.Cancel();
            _reader = null;
            _peekedChar = -1;
 
            // Wait for pending task
            var currentTask = _currentTask;
            if (currentTask != null) 
            {
                ((IAsyncResult)currentTask).AsyncWaitHandle.WaitOne();
            }
        }
 
        /// <summary>
        /// Performs the actual reading and converting
        /// NOTE: This assumes that buffer, index and count are all valid, we're not closed (!IsClosed) and that there is data left (IsDataLeft())
        /// </summary>
        /// <param name="buffer"></param>
        /// <param name="index"></param>
        /// <param name="count"></param>
        /// <returns></returns>
        private int InternalRead(char[] buffer, int index, int count)
        {
            Debug.Assert(buffer != null, "Null output buffer");
            Debug.Assert((index >= 0) && (count >= 0) && (index + count <= buffer.Length), string.Format("Bad count: {0} or index: {1}", count, index));
            Debug.Assert(!IsClosed, "Can't read while textreader is closed");
 
            try
            {
                int byteBufferUsed;
                byte[] byteBuffer = PrepareByteBuffer(count, out byteBufferUsed);
                byteBufferUsed += _reader.GetBytesInternalSequential(_columnIndex, byteBuffer, byteBufferUsed, byteBuffer.Length - byteBufferUsed);
 
                if (byteBufferUsed > 0) {
                    return DecodeBytesToChars(byteBuffer, byteBufferUsed, buffer, index, count);
                }
                else {
                    // Nothing to read, or nothing read
                    return 0;
                }
            }
            catch (SqlException ex)
            {
                // Read can't throw a SqlException - so wrap it in an IOException
                throw ADP.ErrorReadingFromStream(ex);
            }
        }
 
        /// <summary>
        /// Creates a byte array large enough to store all bytes for the characters in the current encoding, then fills it with any leftover bytes
        /// </summary>
        /// <param name="numberOfChars">Number of characters that are to be read</param>
        /// <param name="byteBufferUsed">Number of bytes pre-filled by the leftover bytes</param>
        /// <returns>A byte array of the correct size, pre-filled with leftover bytes</returns>
        private byte[] PrepareByteBuffer(int numberOfChars, out int byteBufferUsed)
        {
            Debug.Assert(numberOfChars >= 0, "Can't prepare a byte buffer for negative characters");
 
            byte[] byteBuffer;
 
            if (numberOfChars == 0)
            {
                byteBuffer = new byte[0];
                byteBufferUsed = 0;
            }
            else 
            {
                int byteBufferSize = _encoding.GetMaxByteCount(numberOfChars);
 
                if (_leftOverBytes != null)
                {
                    // If we have more leftover bytes than we need for this conversion, then just re-use the leftover buffer
                    if (_leftOverBytes.Length > byteBufferSize)
                    {
                        byteBuffer = _leftOverBytes;
                        byteBufferUsed = byteBuffer.Length;
                    }
                    else
                    {
                        // Otherwise, copy over the leftover buffer
                        byteBuffer = new byte[byteBufferSize];
                        Array.Copy(_leftOverBytes, byteBuffer, _leftOverBytes.Length);
                        byteBufferUsed = _leftOverBytes.Length;
                    }
                }
                else
                {
                    byteBuffer = new byte[byteBufferSize];
                    byteBufferUsed = 0;
                }
            }
 
            return byteBuffer;
        }
 
        /// <summary>
        /// Decodes the given bytes into characters, and stores the leftover bytes for later use
        /// </summary>
        /// <param name="inBuffer">Buffer of bytes to decode</param>
        /// <param name="inBufferCount">Number of bytes to decode from the inBuffer</param>
        /// <param name="outBuffer">Buffer to write the characters to</param>
        /// <param name="outBufferOffset">Offset to start writing to outBuffer at</param>
        /// <param name="outBufferCount">Maximum number of characters to decode</param>
        /// <returns>The actual number of characters decoded</returns>
        private int DecodeBytesToChars(byte[] inBuffer, int inBufferCount, char[] outBuffer, int outBufferOffset, int outBufferCount)
        {
            Debug.Assert(inBuffer != null, "Null input buffer");
            Debug.Assert((inBufferCount > 0) && (inBufferCount <= inBuffer.Length), string.Format("Bad inBufferCount: {0}", inBufferCount));
            Debug.Assert(outBuffer != null, "Null output buffer");
            Debug.Assert((outBufferOffset >= 0) && (outBufferCount > 0) && (outBufferOffset + outBufferCount <= outBuffer.Length), string.Format("Bad outBufferCount: {0} or outBufferOffset: {1}", outBufferCount, outBufferOffset));
 
            int charsRead;
            int bytesUsed;
            bool completed;
            _decoder.Convert(inBuffer, 0, inBufferCount, outBuffer, outBufferOffset, outBufferCount, false, out bytesUsed, out charsRead, out completed);
            
            // completed may be false and there is no spare bytes if the Decoder has stored bytes to use later
            if ((!completed) && (bytesUsed < inBufferCount))
            {
                _leftOverBytes = new byte[inBufferCount - bytesUsed];
                Array.Copy(inBuffer, bytesUsed, _leftOverBytes, 0, _leftOverBytes.Length);
            }
            else
            {
                // If Convert() sets completed to true, then it must have used all of the bytes we gave it
                Debug.Assert(bytesUsed >= inBufferCount, "Converted completed, but not all bytes were used");
                _leftOverBytes = null;
            }
 
            Debug.Assert(((_reader == null) || (_reader.ColumnDataBytesRemaining() > 0) || (!completed) || (_leftOverBytes == null)), "Stream has run out of data and the decoder finished, but there are leftover bytes");
            Debug.Assert(charsRead > 0, "Converted no chars. Bad encoding?");
 
            return charsRead;
        }
 
        /// <summary>
        /// True if this TextReader is supposed to be closed
        /// </summary>
        private bool IsClosed
        {
            get { return (_reader == null); } 
        }
 
        /// <summary>
        /// True if there is data left to read
        /// </summary>
        /// <returns></returns>
        private bool IsDataLeft
        {
            get { return ((_leftOverBytes != null) || (_reader.ColumnDataBytesRemaining() > 0)); }
        }
 
        /// <summary>
        /// True if there is a peeked character available
        /// </summary>
        private bool HasPeekedChar
        {
            get { return (_peekedChar >= char.MinValue); }
        }
 
        /// <summary>
        /// Checks the the parameters passed into a Read() method are valid
        /// </summary>
        /// <param name="buffer"></param>
        /// <param name="index"></param>
        /// <param name="count"></param>
        internal static void ValidateReadParameters(char[] buffer, int index, int count) 
        {
            if (buffer == null)
            {
                throw ADP.ArgumentNull(ADP.ParameterBuffer);
            }
			if (index < 0)
            {
                throw ADP.ArgumentOutOfRange(ADP.ParameterIndex);
            }
			if (count < 0)
            {
                throw ADP.ArgumentOutOfRange(ADP.ParameterCount);
            }
            try
            {
                if (checked(index + count) > buffer.Length)
                {
                    throw ExceptionBuilder.InvalidOffsetLength();
                }
            }
            catch (OverflowException)
            {
                // If we've overflowed when adding index and count, then they never would have fit into buffer anyway
                throw ExceptionBuilder.InvalidOffsetLength();
            }
        }
    }
 
    sealed internal class SqlUnicodeEncoding : UnicodeEncoding
    {
        private static SqlUnicodeEncoding _singletonEncoding = new SqlUnicodeEncoding();
 
        private SqlUnicodeEncoding() : base(bigEndian: false, byteOrderMark: false, throwOnInvalidBytes: false)
        {}
 
        public override Decoder GetDecoder()
        {
            return new SqlUnicodeDecoder();
        }
 
        public override int GetMaxByteCount(int charCount)
        {
            // SQL Server never sends a BOM, so we can assume that its 2 bytes per char
            return charCount * 2;
        }
 
        public static Encoding SqlUnicodeEncodingInstance
        {
            get { return _singletonEncoding; }
        }
 
        sealed private class SqlUnicodeDecoder : Decoder
        {
            public override int GetCharCount(byte[] bytes, int index, int count)
            {
                // SQL Server never sends a BOM, so we can assume that its 2 bytes per char
                return count / 2;
            }
 
            public override int GetChars(byte[] bytes, int byteIndex, int byteCount, char[] chars, int charIndex)
            {
                // This method is required - simply call Convert()
                int bytesUsed;
                int charsUsed;
                bool completed;
                Convert(bytes, byteIndex, byteCount, chars, charIndex, chars.Length - charIndex, true, out bytesUsed, out charsUsed, out completed);
                return charsUsed;
            }
 
            public override void Convert(byte[] bytes, int byteIndex, int byteCount, char[] chars, int charIndex, int charCount, bool flush, out int bytesUsed, out int charsUsed, out bool completed)
            {
                // Assume 2 bytes per char and no BOM
                charsUsed = Math.Min(charCount, byteCount / 2);
                bytesUsed = charsUsed * 2;
                completed = (bytesUsed == byteCount);
 
                // BlockCopy uses offsets\length measured in bytes, not the actual array index
                Buffer.BlockCopy(bytes, byteIndex, chars, charIndex * 2, bytesUsed);
            }
        }
    }    
}