File: services\monitoring\system\diagnosticts\AsyncStreamReader.cs
Project: ndp\fx\src\System.csproj (System)
// ==++==
// 
//   Copyright (c) Microsoft Corporation.  All rights reserved.
// 
// ==--==
/*============================================================
**
** Class:  AsyncStreamReader
**
** Purpose: For reading text from streams using a particular 
** encoding in an asynchronous manner; used by the Process class.
**
**
===========================================================*/
 
 
namespace System.Diagnostics {    
    using System;
    using System.IO;
    using System.Text;
    using System.Runtime.InteropServices;
    using System.Threading;    
    using System.Collections;    
        
    // Callback invoked once for every line read from the stream. 'data' is the
    // line without its terminator. After the last line, a single null value is
    // delivered to signal that end of stream has been reached.
    internal delegate void UserCallBack(String data);
 
    // Reads a stream asynchronously, decodes the bytes with the given encoding,
    // splits the text into lines ("\r", "\n" or "\r\n" terminated) and hands
    // each line to a UserCallBack.
    //
    // Threading: ReadBuffer runs on whatever thread completes the async read,
    // while BeginReadLine/CancelOperation are called by the owner. The message
    // queue is the only state shared under a lock; callbacks are invoked while
    // that lock is held, so they are delivered one at a time and in order.
    internal class AsyncStreamReader : IDisposable
    {
        internal const int DefaultBufferSize = 1024;  // Byte buffer size
        private const int MinBufferSize = 128;        // Smallest byte buffer Init will allocate
    
        private Stream stream;
        private Encoding encoding;
        private Decoder decoder;      // Stateful, so multi-byte chars split across reads decode correctly
        private byte[] byteBuffer;    // Raw bytes of the most recent read
        private char[] charBuffer;    // Decoded chars of the most recent read
 
        // This is the maximum number of chars that one full byteBuffer can
        // decode to. Used to size charBuffer.
        private int _maxCharsPerBuffer;
 
        // Store a backpointer to the process class. It is only stored here;
        // nothing in this class reads it.
        private Process process;
 
        // Delegate to call user function.
        private UserCallBack userCallBack;
 
        // Internal Cancel operation. While set, dequeued lines are dropped
        // instead of being passed to the user callback.
        private bool cancelOperation;
        // Signalled once EOF has been processed and the queue has been flushed.
        private ManualResetEvent eofEvent;
        // Lines waiting to be delivered; also used as the lock object.
        private readonly Queue messageQueue;
        // Text decoded so far that has not yet been split off as a complete line.
        // Null until the first BeginReadLine call.
        private StringBuilder sb;
        // True when the previous block ended with '\r', so a '\n' that opens the
        // next block belongs to that same "\r\n" terminator.
        private bool bLastCarriageReturn;
 
        // Cache the last position scanned in sb when searching for lines.
        private int currentLinePos;
		
        // Creates a reader with the default byte buffer size.
        internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback, Encoding encoding) 
            : this(process, stream, callback, encoding, DefaultBufferSize) {
        }
        
 
        // Creates a new AsyncStreamReader for the given stream.  The 
        // character encoding is set by encoding and the size of the byte
        // buffer, in bytes, is set by bufferSize (values below MinBufferSize
        // are raised to MinBufferSize).
        // 
        internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback,  Encoding encoding, int bufferSize)
        {
            Debug.Assert (process != null && stream !=null && encoding !=null && callback != null, "Invalid arguments!");
            Debug.Assert(stream.CanRead, "Stream must be readable!");
            Debug.Assert(bufferSize > 0, "Invalid buffer size!");
 
            messageQueue = new Queue();
            Init(process, stream, callback, encoding, bufferSize);
        }
            
        // Stores the collaborators and allocates the decoder, buffers and EOF event.
        private void Init(Process process, Stream stream, UserCallBack callback, Encoding encoding, int bufferSize) {
            this.process = process;
            this.stream = stream;
            this.encoding = encoding;
            this.userCallBack = callback;
            decoder = encoding.GetDecoder();
            if (bufferSize < MinBufferSize) {
                bufferSize = MinBufferSize;
            }
            byteBuffer = new byte[bufferSize];
            // Size the char buffer for the worst case so one GetChars call always fits.
            _maxCharsPerBuffer = encoding.GetMaxCharCount(bufferSize);
            charBuffer = new char[_maxCharsPerBuffer];
            cancelOperation = false;
            eofEvent = new ManualResetEvent(false);
            sb = null;
            this.bLastCarriageReturn = false;
        }
 
        // Closes the underlying stream and releases the EOF event.
        public virtual void Close()
        {
            Dispose(true);
        }
        
        void IDisposable.Dispose() {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
 
        // Releases the stream (closing it first when disposing is true), drops
        // the buffers and closes the EOF event. Safe to call more than once.
        protected virtual void Dispose(bool disposing)
        {
            if (stream != null) {
                if (disposing) {
                    stream.Close();
                }
                stream = null;
                encoding = null;
                decoder = null;
                byteBuffer = null;
                charBuffer = null;
            }
            
            if (eofEvent != null) {
                eofEvent.Close();
                eofEvent = null;
            }
        }
        
        // The encoding passed to the constructor; null after disposal.
        public virtual Encoding CurrentEncoding {
            get { return encoding; }
        }
        
        // The stream passed to the constructor; null after disposal.
        public virtual Stream BaseStream {
            get { return stream; }
        }
 
        // User calls BeginReadLine to start the asynchronous read. The first call
        // issues the first read; later calls (after a cancel) only clear the
        // cancel flag and deliver whatever has been queued in the meantime.
        internal void BeginReadLine() {
            cancelOperation = false;
            
            if (sb == null) {
                sb = new StringBuilder(DefaultBufferSize);
                stream.BeginRead(byteBuffer, 0, byteBuffer.Length, new AsyncCallback(ReadBuffer), null);
            }
            else {
                FlushMessageQueue();
            }
        }
 
        // Stops delivery of lines to the user callback. Reading continues and
        // lines dequeued while cancelled are discarded.
        internal void CancelOperation() {
            cancelOperation = true;
        }
 
        // This is the async callback function. Only one thread could/should call this.
        private void ReadBuffer(IAsyncResult ar) {
            
            int byteLen;
            
            try {
                byteLen = stream.EndRead(ar);
            }
            catch (IOException ) {
                // We should ideally consume errors from operations getting cancelled
                // so that we don't crash the unsuspecting parent with an unhandled exc. 
                // This seems to come in 2 forms of exceptions (depending on platform and scenario), 
                // namely OperationCanceledException and IOException (for errorcode that we don't 
                // map explicitly).   
                byteLen = 0; // Treat this as EOF
            }
            catch (OperationCanceledException ) {
                // We should consume any OperationCanceledException from child read here  
                // so that we don't crash the parent with an unhandled exc
                byteLen = 0; // Treat this as EOF
            }
                
            if (byteLen == 0) { 
                // We're at EOF, we won't call this function again from here on.
                lock (messageQueue) {
                    // Whatever is left is a final line without a terminator.
                    if (sb.Length != 0) {
                        messageQueue.Enqueue(sb.ToString());
                        sb.Length = 0;
                    }
                    // A null entry tells the user callback that the stream has ended.
                    messageQueue.Enqueue(null);
                }
 
                try {
                    // UserCallback could throw, we should still set the eofEvent 
                    FlushMessageQueue();
                }
                finally {
                    eofEvent.Set();
                }
            } else {
                int charLen = decoder.GetChars(byteBuffer, 0, byteLen, charBuffer, 0);
                sb.Append(charBuffer, 0, charLen);
                GetLinesFromStringBuilder();
                // Keep the read loop going until EOF.
                stream.BeginRead(byteBuffer, 0, byteBuffer.Length, new AsyncCallback(ReadBuffer), null);
            }
        }
        
 
        // Splits the text accumulated in the StringBuilder into lines, queues
        // every complete line and delivers the queue to the user callback.
        // A line is defined as a sequence of characters followed by
        // a carriage return ('\r'), a line feed ('\n'), or a carriage return
        // immediately followed by a line feed. The queued strings do not
        // contain the terminating carriage return and/or line feed. Text after
        // the last terminator stays in the StringBuilder for the next call.
        //
 
        private void GetLinesFromStringBuilder() {
            int currentIndex = currentLinePos;
            int lineStart = 0;
            int len = sb.Length;
 
            // If the last block ended with '\r', a '\n' at the start of this
            // block is the second half of a "\r\n" pair and must be skipped.
            // The flag only describes the boundary between those two blocks,
            // so it is cleared as soon as a non-empty block has been inspected,
            // whether or not a '\n' was found. Leaving it set would swallow the
            // '\n' of an unrelated blank line at the start of a later block.
            // An empty block (less than one character decoded) keeps the flag.
            if (bLastCarriageReturn && (len > 0))
            {
                if (sb[0] == '\n')
                {
                    currentIndex = 1;
                    lineStart = 1;
                }
                bLastCarriageReturn = false;
            }
 
            while (currentIndex < len) {
                char ch = sb[currentIndex];
                // Note the following common line feed chars:
                // \n - UNIX   \r\n - DOS   \r - Mac
                if (ch == '\r' || ch == '\n') {
                    string s = sb.ToString(lineStart, currentIndex - lineStart);
                    lineStart = currentIndex + 1;
                    // skip the "\n" character following "\r" character
                    if ((ch == '\r') && (lineStart < len) && (sb[lineStart] == '\n'))
                    {
                        lineStart++;
                        currentIndex++;                
                    }
                                        
                    lock (messageQueue) {
                        messageQueue.Enqueue(s);
                    }
                }
                currentIndex++;
            }
            // Protect length as IndexOutOfRangeException was being thrown when less than a
            // character's worth of bytes was read at the beginning of a line.
            if (len > 0 && sb[len - 1] == '\r') {
                bLastCarriageReturn = true;
            }
            // Keep the remaining characters, which can't form a new line yet, in the string builder.
            if (lineStart < len) {
                if (lineStart == 0) {
                    // we found no breaklines, in this case we cache the position
                    // so next time we don't have to restart from the beginning
                    currentLinePos = currentIndex;
                }
                else {
                    sb.Remove(0, lineStart);
                    currentLinePos = 0;
                }
            }
            else {
                sb.Length = 0;
                currentLinePos = 0;
            }
 
            FlushMessageQueue();
        }
 
        // Delivers every queued line to the user callback, in order.
        private void FlushMessageQueue() {
            // When we call BeginReadLine, we also need to flush the queue,
            // so there could be a race between ReadBuffer and BeginReadLine.
            // The unlocked Count check is only a cheap early exit; the count is
            // checked again under the lock before dequeuing.
            while (messageQueue.Count > 0) {
                lock (messageQueue) {
                    if (messageQueue.Count > 0) {
                        string s = (string)messageQueue.Dequeue();
                        // Skip the callback if the read has been cancelled
                        // (this might happen inside UserCallBack). However,
                        // continue to drain the queue.
                        if (!cancelOperation)
                        {
                            userCallBack(s);
                        }
                    }
                }
            }
        }
        
        // Wait until we hit EOF. This is called from Process.WaitForExit
        // We will lose some information if we don't do this.
        // The EOF event is released afterwards, so later calls return immediately.
        internal void WaitUtilEOF() {
            if (eofEvent != null) {
                eofEvent.WaitOne();
                eofEvent.Close();
                eofEvent = null;
            }            
        }
    }
}