File: System\IO\Log\FileLogRecordStream.cs
Project: ndp\cdf\src\NetFx35\System.IO.Log\System.IO.Log.csproj (System.IO.Log)
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Text;
using System.Diagnostics;
 
namespace System.IO.Log
{
 
    // This class is not thread safe.
    // FileLogRecordStream lets the user read log records in the form of a stream.
    internal class FileLogRecordStream : System.IO.Stream
    {
        // The stream initialy reads the first 4K bytes of the log record.  
        // When need arises, it reads the entire record and creates a new Memory stream with the record data. 
        const int INITIAL_READ = 4096;
 
        // The memory stream is created from the record data.  It handles all stream operations.
        MemoryStream stream;
        SequenceNumber recordSequenceNumber;
        SimpleFileLog log;
        FileLogRecordHeader header;
 
        // Sequence numbers of the previous and next records.  Returned from ReadRecord and ReadRecordPrefix.
        SequenceNumber prevSeqNum;
        SequenceNumber nextSeqNum;
        int recordSize;
        bool entireRecordRead;
        object streamLock;
 
        internal FileLogRecordStream(SimpleFileLog log, SequenceNumber recordSequenceNumber)
        {
            this.log = log;
            this.recordSequenceNumber = recordSequenceNumber;
            this.stream = null;
            this.recordSize = 0;
            this.entireRecordRead = false;
            this.streamLock = new object();
 
            CreateMemoryStream();
        }
 
        // Creates a memory stream with the first 4K bytes of the record.
        private void CreateMemoryStream()
        {
            byte[] pData;
            int cbData = INITIAL_READ;
            int totalSize;
 
            log.ReadRecordPrefix(this.recordSequenceNumber, out pData, ref cbData, out totalSize, out this.prevSeqNum, out this.nextSeqNum);
 
            if (cbData < FileLogRecordHeader.Size)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(Error.LogCorrupt());
 
            this.header = new FileLogRecordHeader(pData);
            this.recordSize = totalSize - FileLogRecordHeader.Size;
 
            int streamSize = Math.Min(this.recordSize,
                                      INITIAL_READ - FileLogRecordHeader.Size);
 
            this.stream = new MemoryStream(
                pData,
                FileLogRecordHeader.Size,
                streamSize,
                false);
 
            if (totalSize <= INITIAL_READ)
            {
                // Record is smaller than 4K.  We have read the entire record.
                this.entireRecordRead = true;
            }
        }
 
        internal FileLogRecordHeader Header
        {
            get { return this.header; }
        }
 
        internal SequenceNumber RecordSequenceNumber
        {
            get { return this.recordSequenceNumber; }
        }
 
        internal SequenceNumber PrevLsn
        {
            get { return this.prevSeqNum; }
        }
 
        internal SequenceNumber NextLsn
        {
            get { return this.nextSeqNum; }
        }
 
        private MemoryStream Stream
        {
            get { return this.stream; }
        }
 
        // Multiple threads in Read can result in corrupt data.
        public override int Read(byte[] buffer, int offset, int count)
        {
            int bytesRead = this.Stream.Read(buffer, offset, count);
 
            if (bytesRead == count || this.entireRecordRead)
            {
                return bytesRead;
            }
            else
            {
                // We have reached the end of the stream.  Read the entire record and create a new stream.
                ReadEntireRecord();
 
                return this.Stream.Read(buffer, offset + bytesRead, count - bytesRead) + bytesRead;
            }
        }
 
        // The method reads the entire record and creates a new memory stream.  
        private void ReadEntireRecord()
        {
            lock (this.streamLock)
            {
                if (this.entireRecordRead)
                    return;
 
                // Check if the stream is open.
                if (!this.stream.CanRead)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(Error.ObjectDisposed());
                }
 
                byte[] data;
                int size;
                SequenceNumber prev;
                SequenceNumber next;
 
                log.ReadRecord(this.recordSequenceNumber, out data, out size, out prev, out next);
                if (!(size == (this.recordSize + FileLogRecordHeader.Size)
                   && next == this.nextSeqNum))
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(Error.LogCorrupt());
                }
 
                long position = this.Stream.Position;
 
                // Create a new stream
                this.stream = new MemoryStream(
                    data,
                    FileLogRecordHeader.Size,
                    data.Length - FileLogRecordHeader.Size,
                    false);
 
                this.Stream.Position = position;
                this.entireRecordRead = true;
            }
        }
 
        public override bool CanRead
        {
            get { return this.stream.CanRead; }
        }
 
        public override bool CanSeek
        {
            get { return this.stream.CanSeek; }
        }
 
        public override bool CanWrite
        {
            get { return this.stream.CanWrite; }
        }
 
        public override void Close()
        {
            lock (this.streamLock)
            {
                this.stream.Close();
            }
        }
 
        public override void Flush()
        {
            this.Stream.Flush();
        }
 
        public override long Length
        {
            get
            {
                if (!this.stream.CanRead)
                {
#pragma warning suppress 56503
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(Error.ObjectDisposed());
                }
 
                return this.recordSize;
            }
        }
 
        public override long Position
        {
            get
            {
                return this.Stream.Position;
            }
 
            set
            {
                this.Stream.Position = value;
            }
        }
 
        public override long Seek(long offset, SeekOrigin origin)
        {
            if (origin == SeekOrigin.End &&
                !this.entireRecordRead)
            {
                // The stream we have was created with the first 4K bytes of the record.  
                offset += this.recordSize;
                origin = SeekOrigin.Begin;
            }
 
            return this.Stream.Seek(offset, origin);
        }
 
        public override void SetLength(long value)
        {
            this.stream.SetLength(value);
        }
 
        public override void Write(byte[] buffer, int offset, int count)
        {
            if (!this.stream.CanRead)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(Error.ObjectDisposed());
            }
            else
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(Error.NotSupported());
            }
        }
 
        public override void WriteByte(byte value)
        {
            if (!this.stream.CanRead)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(Error.ObjectDisposed());
            }
            else
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(Error.NotSupported());
            }
        }
 
    }
}