File: System\IO\Log\LogReserveAndAppendState.cs
Project: ndp\cdf\src\NetFx35\System.IO.Log\System.IO.Log.csproj (System.IO.Log)
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
namespace System.IO.Log
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Runtime.InteropServices;
    using System.Threading;
 
 
    internal struct LogReserveAndAppendState
    {
        // Fixed slot numbers within the pinnedObjects and handles arrays
        // built by Pin(). Slots from FirstData onward hold one entry per
        // data segment.
        //
        enum PinIndex
        {
            ReturnLsn = 0,      // The boxed result LSN.
            Reservations = 1,   // The alignedReservations array (only pinned when it is non-null).
            Header = 2,         // The headerBits array.
            Padding = 3,        // Slot is never filled by Pin() in this file.
            FirstData = 4,      // First data segment; also the number of fixed slots.
        }
 
        // Steps of the reserve-and-append state machine.
        //
        enum State
        {
            Unprepared = 0,         // Default value; Prepare() has not run yet.
            Prepared,               // Set at the end of Prepare().
            AwaitCompletion,        // Set by Start() just before the first ReserveAndAppend() call.
            AwaitPolicyCompletion,  // Set just before InvokePolicyEngine() is called.
            AwaitSecondCompletion,  // Set just before ReserveAndAppend() is retried.
            Completed               // Set by Complete(); exceptionResult then holds the outcome.
        }
 
        //============================================================
        // Inputs
        //============================================================
        // Record payload. Pin() pins each segment's array and builds one
        // write entry per segment.
        IList<ArraySegment<byte>> data;
        // Size compared against the matching reservation in Prepare() to
        // compute the padding.
        long totalRecordSize;
        // Passed by reference to the native ReserveAndAppendLog call.
        ulong userLsn;
        // Passed by reference to the native ReserveAndAppendLog call.
        ulong previousLsn;
        // Queried for a matching reservation in Prepare(); receives
        // reservations in Complete(). May be null.
        LogReservationCollection reservationCollection;
        // Sizes of new reservations to make. Prepare() replaces this with a
        // copy whose entries are increased by the header size.
        long[] reservations;
        // Translated into CLFS flags by Prepare().
        RecordAppendOptions recordAppendOptions;
        // Supplies MarshalContext, RetryAppend and LogStore.LogManagement.
        LogRecordSequence recordSequence;
        // When null, the native call is made with a null overlapped pointer
        // and Start() throws any captured error itself.
        LogAppendAsyncResult asyncResult;
 
        //============================================================
        // Internal State
        //============================================================
        // Current position in the state machine.
        State currentState;
        // Error captured by Complete(), or null on success.
        Exception exceptionResult;
 
        // Clone of the header-adjusted reservations; this is the array that
        // is pinned and handed to the native call.
        // NOTE(review): presumably CLFS writes aligned sizes into it - confirm.
        long[] alignedReservations;
        // Boxed ulong created in Prepare(). Its pinned address is given to
        // the native call, and it is also used as the lock object.
        object boxedResultLsn;
        // Record header bytes followed by any padding bytes.
        byte[] headerBits;
        // Size of the matching reservation found in Prepare(); a value
        // greater than zero turns on CLFS_FLAG_USE_RESERVATION.
        long reservationSize;
        // CLFS flags computed in Prepare().
        int flags;
 
        //============================================================
        // Pinned data
        //============================================================
        // Objects pinned by Pin(); passed to asyncResult.Pack().
        object[] pinnedObjects;
        // Pinning handles, indexed in parallel with pinnedObjects.
        GCHandle[] handles;
        // Header entry followed by one entry per data segment.
        CLFS_WRITE_ENTRY[] writeEntries;
 
        //============================================================
        // Parameters
        //============================================================        
        // Write-only. Supplies the async result that provides the native
        // overlapped structure and receives the completion notification.
        //
        public LogAppendAsyncResult AsyncResult
        {
            set
            {
                this.asyncResult = value;
            }
        }
 
        // Write-only. Supplies the data segments of the record; Pin()
        // pins each segment's array and builds one write entry for it.
        //
        public IList<ArraySegment<byte>> Data
        {
            set
            {
                this.data = value;
            }
        }
 
        // Write-only. Supplies the previous LSN, which is passed by
        // reference to the native ReserveAndAppendLog call.
        //
        public ulong PreviousLsn
        {
            set
            {
                this.previousLsn = value;
            }
        }
 
        // Gets or sets the sizes of the new reservations to make. When a
        // reservation collection is present, Prepare() swaps this array
        // for a copy whose entries are increased by the header size, so
        // the getter returns the adjusted sizes from then on.
        //
        public long[] Reservations
        {
            get
            {
                return this.reservations;
            }
            set
            {
                this.reservations = value;
            }
        }
 
        // Write-only. Supplies the record sequence from which the marshal
        // context, the RetryAppend setting and the log management object
        // are read.
        //
        public LogRecordSequence RecordSequence
        {
            set
            {
                this.recordSequence = value;
            }
        }
 
        // Write-only. Supplies the reservation collection that Prepare()
        // queries for a matching reservation and that Complete() adds
        // reservations to.
        //
        public LogReservationCollection ReservationCollection
        {
            set
            {
                this.reservationCollection = value;
            }
        }
 
        // Write-only. Supplies the record size that Prepare() uses to
        // find a matching reservation and to compute the padding.
        //
        public long TotalRecordSize
        {
            set
            {
                this.totalRecordSize = value;
            }
        }
 
        // Write-only. Supplies the user LSN, which is passed by reference
        // to the native ReserveAndAppendLog call.
        //
        public ulong UserLsn
        {
            set
            {
                this.userLsn = value;
            }
        }
 
        // Write-only. Supplies the append options; Prepare() maps the
        // ForceAppend and ForceFlush bits onto CLFS flags.
        //
        public RecordAppendOptions RecordAppendOptions
        {
            set
            {
                this.recordAppendOptions = value;
            }
        }
 
        //============================================================
        // Results
        //============================================================        
        // Unboxes and returns the value held in the boxed result LSN.
        // The box is created by Prepare(), and its pinned address is
        // handed to the native append call in ReserveAndAppend().
        //
        public ulong ResultLsn
        {
            get
            {
                object boxedLsn = this.boxedResultLsn;
                return (ulong)boxedLsn;
            }
        }
 
        //================================================================
        // Execution: State Machine
        //================================================================
        // Runs the operation from the beginning: prepares it, pins the
        // buffers and issues the first native append. Returns right away
        // if that append is pending. Otherwise, once the state machine
        // has completed, the outcome is reported through the async
        // result, or thrown when there is no async result.
        //
        public void Start()
        {
            if (this.currentState != State.Unprepared)
            {
                // An internal consistency check has failed. This indicates a bug in IO.Log's
                // internal processing. Rather than proceeding with non-deterministic execution
                // and risking the loss or corruption of log records, we failfast the process.
                DiagnosticUtility.FailFast("Calling Start twice (reserve n' append)");
            }
            Prepare();

            bool finished = false;
            try
            {
                // The boxed LSN doubles as the sync root, since nobody
                // else is ever going to see it.
                //
                lock (this.boxedResultLsn)
                {
                    Pin();

                    this.currentState = State.AwaitCompletion;
                    uint appendResult = ReserveAndAppend();
                    if (appendResult == Error.ERROR_IO_PENDING)
                    {
                        return;
                    }

                    AwaitCompletion_Complete(appendResult);

                    // Sample the state BEFORE leaving the lock; once the
                    // lock is released, IO completion could otherwise
                    // lead to a double-complete.
                    //
                    finished = (State.Completed == this.currentState);
                }
            }
#pragma warning suppress 56500 // We will be terminating the process with any exception in this call
            catch (Exception e)
            {
                // The code in the try block should not throw any exceptions.
                // If one is caught here, IO.Log may be in an unknown state, and
                // we prefer to failfast rather than risk log corruption.
                // Any client code using IO.Log must have a recovery model that
                // can deal with appdomain and process failures.
                DiagnosticUtility.InvokeFinalHandler(e);
            }

            if (!finished)
            {
                if (this.asyncResult == null)
                {
                    // An internal consistency check has failed. This indicates a bug in IO.Log's
                    // internal processing. Rather than proceeding with non-deterministic execution
                    // and risking the loss or corruption of log records, we failfast the process.
                    DiagnosticUtility.FailFast("What am I doing returning early without an AsyncResult?");
                }

                return;
            }

            if (this.asyncResult != null)
            {
                this.asyncResult.Complete(true, this.exceptionResult);
                return;
            }

            if (this.exceptionResult != null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.exceptionResult);
            }
        }
 
        // Advances the state machine when a pending operation finishes.
        // errorCode is the result of that operation; it is routed to the
        // completion handler that matches the current state. If the state
        // machine reaches Completed, the async result is completed.
        //
        public void IOComplete(uint errorCode)
        {
            if (this.asyncResult == null)
            {
                // An internal consistency check has failed. This indicates a bug in IO.Log's
                // internal processing. Rather than proceeding with non-deterministic execution
                // and risking the loss or corruption of log records, we failfast the process.
                DiagnosticUtility.FailFast("What am I doing in IO completion without an AsyncResult?");
            }

            try
            {
                bool finished = false;

                // The boxed LSN doubles as the sync root, since nobody
                // else is ever going to see it.
                //
                lock (this.boxedResultLsn)
                {
                    State stateAtCompletion = this.currentState;
                    if (stateAtCompletion == State.AwaitCompletion)
                    {
                        AwaitCompletion_Complete(errorCode);
                    }
                    else if (stateAtCompletion == State.AwaitPolicyCompletion)
                    {
                        AwaitPolicyCompletion_Complete(errorCode);
                    }
                    else if (stateAtCompletion == State.AwaitSecondCompletion)
                    {
                        AwaitSecondCompletion_Complete(errorCode);
                    }
                    else
                    {
                        // An internal consistency check has failed. This indicates a bug in IO.Log's
                        // internal processing. Rather than proceeding with non-deterministic execution
                        // and risking the loss or corruption of log records, we failfast the process.
                        DiagnosticUtility.FailFast("Invalid state for IO completion");
                    }

                    // Sample the state BEFORE leaving the lock; once the
                    // lock is released, another IO completion could
                    // otherwise lead to a double-complete.
                    //
                    finished = (State.Completed == this.currentState);
                }

                if (finished)
                {
                    this.asyncResult.Complete(false, this.exceptionResult);
                }
            }
#pragma warning suppress 56500 // We will be terminating the process with any exception in this call
            catch (Exception e)
            {
                // The code in the try block should not throw any exceptions.
                // If one is caught here, IO.Log may be in an unknown state, and
                // we prefer to failfast rather than risk log corruption.
                // Any client code using IO.Log must have a recovery model that
                // can deal with appdomain and process failures.
                DiagnosticUtility.InvokeFinalHandler(e);
            }
        }
 
        // Handles the result of the first append attempt. The buffers are
        // unpinned first. Success completes the operation. If the error
        // is ERROR_NO_SYSTEM_RESOURCES or ERROR_LOG_FULL and the record
        // sequence has RetryAppend set, the policy engine is invoked.
        // Any other error completes the operation with the exception
        // produced by ReserveAndAppendLogFilter.
        //
        void AwaitCompletion_Complete(uint errorCode)
        {
            Unpin();

            if (errorCode == Error.ERROR_SUCCESS)
            {
                Complete(null);
                return;
            }

            bool outOfSpace = (errorCode == Error.ERROR_NO_SYSTEM_RESOURCES)
                              || (errorCode == Error.ERROR_LOG_FULL);

            // RetryAppend is only consulted for the out-of-space errors.
            if (!(outOfSpace && this.recordSequence.RetryAppend))
            {
                Complete(
                    UnsafeNativeMethods.ReserveAndAppendLogFilter(errorCode));
                return;
            }

            this.currentState = State.AwaitPolicyCompletion;
            uint policyResult = InvokePolicyEngine();
            if (policyResult != Error.ERROR_IO_PENDING)
            {
                AwaitPolicyCompletion_Complete(policyResult);
            }
        }
 
        // Handles the result of the policy engine. On failure the
        // operation completes with the exception produced by
        // HandleLogFullFilter. On success the buffers are pinned again
        // and the append is retried.
        //
        void AwaitPolicyCompletion_Complete(uint errorCode)
        {
            if (errorCode != Error.ERROR_SUCCESS)
            {
                Complete(
                    UnsafeNativeMethods.HandleLogFullFilter(errorCode));
                return;
            }

            Pin();

            this.currentState = State.AwaitSecondCompletion;
            uint retryResult = ReserveAndAppend();
            if (retryResult != Error.ERROR_IO_PENDING)
            {
                AwaitSecondCompletion_Complete(retryResult);
            }
        }
 
        // Handles the result of the retried append. The buffers are
        // unpinned, then the operation completes: with no error on
        // success, otherwise with the exception produced by
        // ReserveAndAppendLogFilter. There is no further retry.
        //
        void AwaitSecondCompletion_Complete(uint errorCode)
        {
            Unpin();

            Exception failure = null;
            if (errorCode != Error.ERROR_SUCCESS)
            {
                failure = UnsafeNativeMethods.ReserveAndAppendLogFilter(errorCode);
            }

            Complete(failure);
        }
 
        // Get everything ready to run: box the result LSN, work out the
        // reservation and padding sizes, build the record header, and
        // translate the append options into CLFS flags. Only call this
        // once, unlike Pin.
        //
        // Throws OverflowException if the padding does not fit in an int
        // or if adding the header size to a requested reservation
        // overflows a long.
        //
        void Prepare()
        {
            if (this.currentState != State.Unprepared)
            {
                // An internal consistency check has failed. This indicates a bug in IO.Log's
                // internal processing. Rather than proceeding with non-deterministic execution
                // and risking the loss or corruption of log records, we failfast the process.
                DiagnosticUtility.FailFast("Calling Prepare Twice");
            }

            // Box the result LSN.
            //
            ulong resultLsn = 0;
            this.boxedResultLsn = (object)(resultLsn);

            int paddingSize = 0;
            if (this.reservationCollection != null)
            {
                // Determine if we are writing from reservation.
                // Get the best matching reservation if so. Allocate padding
                // if necessary.
                //
                if (this.reservations == null)
                {
                    this.reservationSize = this.reservationCollection.GetMatchingReservation(this.totalRecordSize);
                    if (this.reservationSize < this.totalRecordSize)
                    {
                        // An internal consistency check has failed. This indicates a bug in IO.Log's
                        // internal processing. Rather than proceeding with non-deterministic execution
                        // and risking the loss or corruption of log records, we failfast the process.
                        DiagnosticUtility.FailFast("Somehow got a smaller reservation");
                    }
                    if (this.reservationSize <= 0)
                    {
                        // An internal consistency check has failed. This indicates a bug in IO.Log's
                        // internal processing. Rather than proceeding with non-deterministic execution
                        // and risking the loss or corruption of log records, we failfast the process.
                        DiagnosticUtility.FailFast("Reservation size must be bigger than zero");
                    }

                    // The padding fills the gap between the record and
                    // the reservation it is written into.
                    paddingSize = checked((int)(this.reservationSize -
                                                this.totalRecordSize));
                }
                else
                {
                    // Otherwise, we are making new reservations
                    // (ReserveAndAppend). The reservations we make
                    // must be adjusted by the header size.
                    //
                    long[] adjustedReservations = new long[this.reservations.Length];
                    for (int i = 0; i < adjustedReservations.Length; i++)
                    {
                        // Checked, like the padding computation above: a
                        // silently wrapped (negative) size must never be
                        // pinned and handed to the native call.
                        adjustedReservations[i] = checked(this.reservations[i] +
                                                          LogLogRecordHeader.Size);
                    }
                    this.reservations = adjustedReservations;

                    // The native call gets its own copy, so that
                    // this.reservations keeps the adjusted sizes.
                    this.alignedReservations = (long[])this.reservations.Clone();
                }
            }

            // Allocate a new LogLogRecordHeader and set it up
            // correctly. The padding bytes, if any, follow the header in
            // the same buffer.
            //
            this.headerBits = new byte[LogLogRecordHeader.Size + paddingSize];
            LogLogRecordHeader header = new LogLogRecordHeader(this.headerBits);
            header.MajorVersion = LogLogRecordHeader.CurrentMajorVersion;
            header.MinorVersion = LogLogRecordHeader.CurrentMinorVersion;
            header.Padding = (paddingSize != 0);
            if (paddingSize != 0)
            {
                LogLogRecordHeader.EncodePaddingSize(this.headerBits,
                                                     LogLogRecordHeader.Size,
                                                     paddingSize);
            }

            // Translate write flags to CLFS flags.
            //
            this.flags = 0;
            if ((this.recordAppendOptions & RecordAppendOptions.ForceAppend) != 0)
            {
                this.flags |= Const.CLFS_FLAG_FORCE_APPEND;
            }
            if ((this.recordAppendOptions & RecordAppendOptions.ForceFlush) != 0)
            {
                this.flags |= Const.CLFS_FLAG_FORCE_FLUSH;
            }
            if (this.reservationSize > 0)
            {
                this.flags |= Const.CLFS_FLAG_USE_RESERVATION;
            }

            this.currentState = State.Prepared;
        }
 
        // Builds the pinned-object, handle and write-entry arrays in one
        // pass. The result LSN box, the aligned reservations (if any),
        // the header and every data segment's array are pinned. The
        // header and each data segment also get a write entry that
        // points at the pinned memory.
        //
        // (Everything pinned here stays pinned until Unpin is called.)
        //
        void Pin()
        {
            if (this.handles != null)
            {
                // An internal consistency check has failed. This indicates a bug in IO.Log's
                // internal processing. Rather than proceeding with non-deterministic execution
                // and risking the loss or corruption of log records, we failfast the process.
                DiagnosticUtility.FailFast("Already pinned");
            }

            // Fixed slots first, then one slot per data segment.
            int slotCount = (int)PinIndex.FirstData + this.data.Count;
            this.pinnedObjects = new object[slotCount];
            this.handles = new GCHandle[slotCount];

            // One write entry for the header plus one per data segment.
            this.writeEntries = new CLFS_WRITE_ENTRY[1 + this.data.Count];

            // Return LSN
            //
            PinSlot((int)PinIndex.ReturnLsn, this.boxedResultLsn);

            // Reservations
            //
            if (this.alignedReservations != null)
            {
                PinSlot((int)PinIndex.Reservations, this.alignedReservations);
            }

            // Header: always the first write entry.
            //
            PinSlot((int)PinIndex.Header, this.headerBits);
            this.writeEntries[0].Buffer = Marshal.UnsafeAddrOfPinnedArrayElement(this.headerBits, 0);
            this.writeEntries[0].ByteLength = this.headerBits.Length;
            int filledEntries = 1;

            // Data bits
            //
            for (int segmentIndex = 0; segmentIndex < this.data.Count; segmentIndex++)
            {
                ArraySegment<byte> segment = this.data[segmentIndex];
                byte[] backingArray = segment.Array;

                PinSlot((int)PinIndex.FirstData + segmentIndex, backingArray);
                this.writeEntries[filledEntries].Buffer = Marshal.UnsafeAddrOfPinnedArrayElement(backingArray, segment.Offset);
                this.writeEntries[filledEntries].ByteLength = segment.Count;
                filledEntries++;
            }

            if (filledEntries != this.writeEntries.Length)
            {
                // An internal consistency check has failed. This indicates a bug in IO.Log's
                // internal processing. Rather than proceeding with non-deterministic execution
                // and risking the loss or corruption of log records, we failfast the process.
                DiagnosticUtility.FailFast("Entry counts do not agree");
            }
        }

        // Records target in the given slot and pins it.
        //
        void PinSlot(int slot, object target)
        {
            this.pinnedObjects[slot] = target;
            this.handles[slot] = GCHandle.Alloc(target, GCHandleType.Pinned);
        }
 
        // Releases every allocated pinning handle and drops the handle,
        // pinned-object and write-entry arrays. Safe to call when
        // nothing is pinned.
        //
        void Unpin()
        {
            GCHandle[] allocated = this.handles;
            if (allocated != null)
            {
                int slot = 0;
                while (slot < allocated.Length)
                {
                    // Slots that were never filled hold a default
                    // (unallocated) handle and are skipped.
                    if (allocated[slot].IsAllocated)
                    {
                        allocated[slot].Free();
                    }

                    slot++;
                }

                this.handles = null;
            }

            this.pinnedObjects = null;
            this.writeEntries = null;
        }
 
        // Finishes the operation. The reservation collection is updated
        // for both outcomes, the error (null on success) is captured, and
        // the state becomes Completed. The buffers must already be
        // unpinned.
        //
        void Complete(Exception error)
        {
            if (this.handles != null)
            {
                // An internal consistency check has failed. This indicates a bug in IO.Log's
                // internal processing. Rather than proceeding with non-deterministic execution
                // and risking the loss or corruption of log records, we failfast the process.
                DiagnosticUtility.FailFast("Must unpin before completing");
            }
            if (this.currentState == State.Completed)
            {
                // An internal consistency check has failed. This indicates a bug in IO.Log's
                // internal processing. Rather than proceeding with non-deterministic execution
                // and risking the loss or corruption of log records, we failfast the process.
                DiagnosticUtility.FailFast("Called complete twice");
            }

            if (error != null)
            {
                // Failure: the reservation size found in Prepare() is
                // added to the collection (presumably restoring the
                // reservation taken there - confirm against
                // GetMatchingReservation).
                if (this.reservationSize > 0)
                {
                    this.reservationCollection.InternalAddReservation(this.reservationSize);
                }
            }
            else if (this.reservations != null)
            {
                // Success: record every new reservation that was made.
                if (this.reservationCollection == null)
                {
                    // An internal consistency check has failed. This indicates a bug in IO.Log's
                    // internal processing. Rather than proceeding with non-deterministic execution
                    // and risking the loss or corruption of log records, we failfast the process.
                    DiagnosticUtility.FailFast("How did I get reservations sans reservationCollection?");
                }

                long[] made = this.reservations;
                for (int i = 0; i < made.Length; i++)
                {
                    this.reservationCollection.InternalAddReservation(made[i]);
                }
            }

            this.exceptionResult = error;
            this.currentState = State.Completed;
        }
 
        // Issues the native ReserveAndAppendLog call using the pinned
        // buffers and returns its error code. When an async result is
        // present it is packed first and its native overlapped pointer
        // is passed along. The async result is freed again unless the
        // call reports ERROR_IO_PENDING.
        //
        uint ReserveAndAppend()
        {
            uint errorCode = Error.ERROR_SUCCESS;

            // Ensure we are packed now
            //
            if (this.asyncResult != null)
            {
                this.asyncResult.Pack(this.pinnedObjects);
            }

            try
            {
                unsafe
                {
                    // Stays null when there is no async result.
                    NativeOverlapped* nativeOverlapped = null;
                    if (this.asyncResult != null)
                    {
                        nativeOverlapped = this.asyncResult.NativeOverlapped;
                        if (nativeOverlapped == null)
                        {
                            // An internal consistency check has failed. This indicates a bug in IO.Log's
                            // internal processing. Rather than proceeding with non-deterministic execution
                            // and risking the loss or corruption of log records, we failfast the process.
                            DiagnosticUtility.FailFast("Should have packed the async result already");
                        }
                    }

                    int reservationCount = (this.alignedReservations == null)
                        ? 0
                        : this.alignedReservations.Length;

                    errorCode = UnsafeNativeMethods.ReserveAndAppendLog(
                        this.recordSequence.MarshalContext,
                        this.writeEntries,
                        this.writeEntries.Length,
                        ref this.userLsn,
                        ref this.previousLsn,
                        reservationCount,
                        this.alignedReservations,
                        this.flags,
                        this.handles[(int)PinIndex.ReturnLsn].AddrOfPinnedObject(),
                        nativeOverlapped);
                }
            }
            finally
            {
                // If the native call threw, errorCode still holds
                // ERROR_SUCCESS, so the async result is freed then too.
                bool ioPending = (errorCode == Error.ERROR_IO_PENDING);
                if (!ioPending && this.asyncResult != null)
                {
                    this.asyncResult.Free();
                }
            }

            return errorCode;
        }
 
        // Asks the log store's log management object to handle the
        // log-full condition and returns its error code. When an async
        // result is present, its IOCompleted method is passed as the
        // callback; otherwise the callback is null.
        //
        uint InvokePolicyEngine()
        {
            HandleLogFullCallback onHandled = (this.asyncResult == null)
                ? null
                : new HandleLogFullCallback(this.asyncResult.IOCompleted);

            LogManagementAsyncResult management = this.recordSequence.LogStore.LogManagement;
            return management.HandleLogFull(onHandled);
        }
    }
}