File: net\System\Net\connectionpool.cs
Project: ndp\fx\src\System.csproj (System)
//------------------------------------------------------------------------------
// <copyright file="ConnectionPool.cs" company="Microsoft">
//     Copyright (c) Microsoft Corporation.  All rights reserved.
// </copyright>
//------------------------------------------------------------------------------
 
namespace System.Net {
 
    using System;
    using System.Net.Sockets;
    using System.Collections;
    using System.Diagnostics;
    using System.Globalization;
    using System.Runtime.InteropServices;
    using System.Security;
    using System.Security.Permissions;
    using System.Threading;
 
    internal delegate void GeneralAsyncDelegate(object request, object state);
    internal delegate PooledStream CreateConnectionDelegate(ConnectionPool pool);
 
    /// <devdoc>
    /// <para>
    ///     Impliments basic ConnectionPooling by pooling PooledStreams
    /// </para>
    /// </devdoc>
    internal class ConnectionPool {
        private enum State {
            Initializing,
            Running,
            ShuttingDown,
        }
 
        private static TimerThread.Callback s_CleanupCallback = new TimerThread.Callback(CleanupCallbackWrapper);
        private static TimerThread.Callback s_CancelErrorCallback = new TimerThread.Callback(CancelErrorCallbackWrapper);
        private static TimerThread.Queue s_CancelErrorQueue = TimerThread.GetOrCreateQueue(ErrorWait);
 
        private const int MaxQueueSize    = (int)0x00100000;
 
        // The order of these is important; we want the WaitAny call to be signaled
        // for a free object before a creation signal.  Only the index first signaled
        // object is returned from the WaitAny call.
        private const int SemaphoreHandleIndex = (int)0x0;
        private const int ErrorHandleIndex     = (int)0x1;
        private const int CreationHandleIndex  = (int)0x2;
 
        private const int WaitTimeout   = (int)0x102;
        private const int WaitAbandoned = (int)0x80;
 
        private const int ErrorWait     = 5 * 1000; // 5 seconds
 
        private readonly TimerThread.Queue m_CleanupQueue;
 
        private State                    m_State;
        private InterlockedStack         m_StackOld;
        private InterlockedStack         m_StackNew;
 
        private int                      m_WaitCount;
        private WaitHandle[]             m_WaitHandles;
 
        private Exception                m_ResError;
        private volatile bool            m_ErrorOccured;
 
        private TimerThread.Timer        m_ErrorTimer;
 
        private ArrayList                m_ObjectList;
        private int                      m_TotalObjects;
 
        private Queue                    m_QueuedRequests;
        private Thread                   m_AsyncThread;
 
        private int                      m_MaxPoolSize;
        private int                      m_MinPoolSize;
        private ServicePoint             m_ServicePoint;
        private CreateConnectionDelegate m_CreateConnectionCallback;
 
        private Mutex CreationMutex {
            get {
                return (Mutex) m_WaitHandles[CreationHandleIndex];
            }
        }
 
        private ManualResetEvent ErrorEvent {
            get {
                return (ManualResetEvent) m_WaitHandles[ErrorHandleIndex];
            }
        }
 
        private Semaphore Semaphore {
            get {
                return (Semaphore) m_WaitHandles[SemaphoreHandleIndex];
            }
        }
 
        /// <summary>
        ///    <para>Constructor - binds pool with a servicePoint and sets up a cleanup Timer to remove Idle Connections</para>
        /// </summary>
        internal ConnectionPool(ServicePoint servicePoint, int maxPoolSize, int minPoolSize, int idleTimeout, CreateConnectionDelegate createConnectionCallback) : base() {
            m_State                = State.Initializing;
 
            m_CreateConnectionCallback = createConnectionCallback;
            m_MaxPoolSize = maxPoolSize;
            m_MinPoolSize = minPoolSize;
            m_ServicePoint = servicePoint;
 
            Initialize();
 
            if (idleTimeout > 0) {
                // special case: if the timeout value is 1 then the timer thread should have a duration
                // of 1 to avoid having the timer callback run constantly
                m_CleanupQueue = TimerThread.GetOrCreateQueue(idleTimeout == 1 ? 1 : (idleTimeout / 2));
                m_CleanupQueue.CreateTimer(s_CleanupCallback, this);
            }
        }
 
        /// <summary>
        ///    <para>Internal init stuff, creates stacks, queue, wait handles etc</para>
        /// </summary>
        private void Initialize() {
            m_StackOld          = new InterlockedStack();
            m_StackNew          = new InterlockedStack();
 
            m_QueuedRequests = new Queue();
 
            m_WaitHandles     = new WaitHandle[3];
            m_WaitHandles[SemaphoreHandleIndex] = new Semaphore(0, MaxQueueSize);
            m_WaitHandles[ErrorHandleIndex]     = new ManualResetEvent(false);
            m_WaitHandles[CreationHandleIndex]  = new Mutex();
 
            m_ErrorTimer         = null;  // No error yet.
 
            m_ObjectList            = new ArrayList();
            m_State = State.Running;
        }
 
 
        /// <summary>
        ///    <para>Async state object, used for storing state on async calls</para>
        /// </summary>
        private class AsyncConnectionPoolRequest {
            public AsyncConnectionPoolRequest(ConnectionPool pool, object owningObject, GeneralAsyncDelegate asyncCallback, int creationTimeout) {
                Pool = pool;
                OwningObject = owningObject;
                AsyncCallback = asyncCallback;
                CreationTimeout = creationTimeout;
            }
            public object OwningObject;
            public GeneralAsyncDelegate AsyncCallback;
            public bool Completed;
            public ConnectionPool Pool;
            public int CreationTimeout;
        }
 
        /// <summary>
        ///    <para>Queues a AsyncConnectionPoolRequest to our queue of requests needing
        ///     a pooled stream. If an AsyncThread is not created, we create one,
        ///     and let it process the queued items</para>
        /// </summary>
        private void QueueRequest(AsyncConnectionPoolRequest asyncRequest) {
            lock(m_QueuedRequests) {
                m_QueuedRequests.Enqueue(asyncRequest);
                if (m_AsyncThread == null) {
                    m_AsyncThread = new Thread(new ThreadStart(AsyncThread));
                    m_AsyncThread.IsBackground = true;
                    m_AsyncThread.Start();
                }
            }
        }
 
        /// <summary>
        ///    <para>Processes async queued requests that are blocked on needing a free pooled stream
        ///         works as follows:
        ///         1. while there are blocked requests, take one out of the queue
        ///         2. Wait for a free connection, when one becomes avail, then notify the request that its there
        ///         3. repeat 1 until there are no more queued requests
        ///         4. if there are no more requests waiting to for a free stream, then close down this thread
        ///</para>
        /// </summary>
        private void AsyncThread() {
            do {
                while (m_QueuedRequests.Count > 0) {
                    bool continueLoop = true;
                    AsyncConnectionPoolRequest asyncState = null;
                    lock (m_QueuedRequests) {
                        asyncState = (AsyncConnectionPoolRequest) m_QueuedRequests.Dequeue();
                    }
 
                    WaitHandle [] localWaitHandles = m_WaitHandles;
                    PooledStream PooledStream = null;
                    try {
                        while ((PooledStream == null) && continueLoop) {
                            int result = WaitHandle.WaitAny(localWaitHandles, asyncState.CreationTimeout, false);
                            PooledStream =
                                Get(asyncState.OwningObject, result, ref continueLoop, ref localWaitHandles);
                        }
 
                        PooledStream.Activate(asyncState.OwningObject, asyncState.AsyncCallback);
                    } catch (Exception e) {
                        if(PooledStream != null){
                            PutConnection(PooledStream, asyncState.OwningObject, asyncState.CreationTimeout, false);
                        }
                        asyncState.AsyncCallback(asyncState.OwningObject, e);
                    }
                }
                Thread.Sleep(500);
                lock(m_QueuedRequests) {
                    if (m_QueuedRequests.Count == 0) {
                        m_AsyncThread = null;
                        break;
                    }
                }
            } while (true);
        }
 
        /// <summary>
        ///    <para>Count of total pooled streams associated with this pool, including streams that are being used</para>
        /// </summary>
        internal int Count {
            get { return(m_TotalObjects); }
        }
 
        /// <summary>
        ///    <para>Our ServicePoint, used for IP resolution</para>
        /// </summary>
        internal ServicePoint ServicePoint {
            get {
                return m_ServicePoint;
            }
        }
 
        /// <summary>
        ///    <para>Our Max Size of outstanding pooled streams</para>
        /// </summary>
        internal int MaxPoolSize {
            get {
                return m_MaxPoolSize;
            }
        }
 
        /// <summary>
        ///    <para>Our Min Size of the pool to remove idled items down to</para>
        /// </summary>
        internal int MinPoolSize {
            get {
                return m_MinPoolSize;
            }
        }
 
        /// <summary>
        ///    <para>An Error occurred usually due to an abort</para>
        /// </summary>
        private bool ErrorOccurred {
            get { return m_ErrorOccured; }
        }
 
        private static void CleanupCallbackWrapper(TimerThread.Timer timer, int timeNoticed, object context)
        {
            ConnectionPool pThis = (ConnectionPool) context;
 
            try
            {
                pThis.CleanupCallback();
            }
            finally
            {
                pThis.m_CleanupQueue.CreateTimer(s_CleanupCallback, context);
            }
        }
 
        /// <summary>
        /// Cleans up everything in both the old and new stack.  If a connection is in use
        /// then it will be on neither stack and it is the responsibility of the object 
        /// using that connection to clean it up when it is finished using it.  This does 
        /// not clean up the ConnectionPool object and new connections can still be 
        /// created if needed in the future should this ConnectionPool object be reused
        /// 
        /// preconditions: none
        /// 
        /// postconditions: any connections not currently in use by an object will be 
        /// gracefully terminated and purged from this connection pool
        /// </summary>
        internal void ForceCleanup()
        {
            if (Logging.On) {
                Logging.Enter(Logging.Web, "ConnectionPool::ForceCleanup");
            }
                      
            // If WaitOne returns false, all connections in the pool are in use 
            // so no cleanup should be performed. The last object owning
            // a connection from the pool will perform final cleanup.
            while (Count > 0) {
                if (Semaphore.WaitOne(0, false)) {
                    // Try to clean up from new stack first, if there isn't anything on new
                    // then try old.  When we lock the Semaphore, it gives us a license to 
                    // remove only one connection from the pool but it can be from either
                    // stack since if the Semaphore is locked by another thread it means that
                    // there must have been more than one connection available in either stack
                    PooledStream pooledStream = (PooledStream)m_StackNew.Pop();
 
                    // no streams in stack new, there must therefore be one in stack old since we
                    // were able to acquire the semaphore
                    if(pooledStream == null) {
                        pooledStream = (PooledStream)m_StackOld.Pop();
                    }
 
                    Debug.Assert(pooledStream != null, "Acquired Semaphore with no connections in either stack");
                    Destroy(pooledStream);   
                }
                else {
                    // couldn't get semaphore, nothing to do here
                    break;
                }
            }    
                   
            if (Logging.On) {
                Logging.Exit(Logging.Web, "ConnectionPool::ForceCleanup");
            }
        }
 
        /// <summary>
        ///    <para>This is called by a timer, to check for needed cleanup of idle pooled streams</para>
        /// </summary>
        private void CleanupCallback()
        {
            // Called when the cleanup-timer ticks over.
            //
            // This is the automatic prunning method.  Every period, we will perform a two-step
            // process.  First, for the objects above MinPool, we will obtain the semaphore for
            // the object and then destroy it if it was on the old stack.  We will continue this
            // until we either reach MinPool size, or we are unable to obtain a free object, or
            // until we have exhausted all the objects on the old stack.  After that, push all
            // objects on the new stack to the old stack.  So, every period the objects on the
            // old stack are destroyed and the objects on the new stack are pushed to the old
            // stack.  All objects that are currently out and in use are not on either stack.
            // With this logic, a object is prunned if unused for at least one period but not
            // more than two periods.
 
            // Destroy free objects above MinPool size from old stack.
            while(Count > MinPoolSize) { // While above MinPoolSize...
 
                // acquiring the Semaphore gives us a license to remove one and only
                // one connection from the pool
                if (Semaphore.WaitOne(0, false) ) { // != WaitTimeout
                    // We obtained a objects from the semaphore.
                    PooledStream pooledStream = (PooledStream) m_StackOld.Pop();
 
                    if (null != pooledStream) {
                        // If we obtained one from the old stack, destroy it.
                        Destroy(pooledStream);
                    }
                    else {
                        // Else we exhausted the old stack, so break
                        // and release the Semaphore to indicate that 
                        // no connection was actually removed so whatever
                        // we had locked is still available.
                        Semaphore.ReleaseSemaphore();
                        break;
                    }
                }
                else break;
            }
 
            // Push to the old-stack.  For each free object, move object from new stack
            // to old stack.  The Semaphore guarantees that we are allowed to handle
            // one connection at a time so moving a connection between stacks is safe since
            // one connection is reserved for the duration of this loop and we only touch
            // one connection at a time on the new stack
            if(Semaphore.WaitOne(0, false)) { //  != WaitTimeout
                for(;;) {
                    PooledStream pooledStream = (PooledStream) m_StackNew.Pop();
 
                    if (null == pooledStream)
                        break;
 
                    GlobalLog.Assert(!pooledStream.IsEmancipated, "Pooled object not in pool.");
                    GlobalLog.Assert(pooledStream.CanBePooled, "Pooled object is not poolable.");
 
                    m_StackOld.Push(pooledStream);
                }
                // no connections were actually destroyed so signal that a connection is now
                // available since we are no longer reserving a connection by holding the 
                // Semaphore
                Semaphore.ReleaseSemaphore();
            }
        }
 
        /// <summary>
        ///    <para>Creates a new PooledStream, performs checks as well on the new stream</para>
        /// </summary>
        private PooledStream Create(CreateConnectionDelegate createConnectionCallback) {
            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::Create");
            PooledStream newObj = null;
 
            try {
                newObj = createConnectionCallback(this);
 
                if (null == newObj)
                    throw new InternalException();    // Create succeeded, but null object
 
                if (!newObj.CanBePooled)
                    throw new InternalException();    // Create succeeded, but non-poolable object
 
                newObj.PrePush(null);
 
                lock (m_ObjectList.SyncRoot) {
                    m_ObjectList.Add(newObj);
                    m_TotalObjects = m_ObjectList.Count;
                }
 
                GlobalLog.Print("Create pooledStream#"+ValidationHelper.HashString(newObj));
            }
            catch(Exception e)  {
                GlobalLog.Print("Pool Exception: " + e.Message);
 
                newObj = null; // set to null, so we do not return bad new object
                // Failed to create instance
                m_ResError = e;
                Abort();
            }            
            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Create",ValidationHelper.HashString(newObj));
            return newObj;
        }
 
 
        /// <summary>
        ///    <para>Destroys a pooled stream from the pool</para>
        /// </summary>
        private void Destroy(PooledStream pooledStream) {
            GlobalLog.Print("Destroy pooledStream#"+ValidationHelper.HashString(pooledStream));
 
            if (null != pooledStream) {
                try
                {
                    lock (m_ObjectList.SyncRoot) {
                        m_ObjectList.Remove(pooledStream);
                        m_TotalObjects = m_ObjectList.Count;
                    }
                }
                finally
                {
                    pooledStream.Dispose();
                }
            }
        }
 
        private static void CancelErrorCallbackWrapper(TimerThread.Timer timer, int timeNoticed, object context)
        {
            ((ConnectionPool) context).CancelErrorCallback();
        }
 
        /// <summary>
        ///    <para>Called on error, after we waited a set amount of time from aborting</para>
        /// </summary>
        private void CancelErrorCallback()
        {
            TimerThread.Timer timer = m_ErrorTimer;
            if (timer != null && timer.Cancel())
            {
                m_ErrorOccured = false;
                ErrorEvent.Reset();
                m_ErrorTimer = null;
                m_ResError = null;
            }
        }
 
        /// <summary>
        ///    <para>Retrieves a pooled stream from the pool proper
        ///     this work by first attemting to find something in the pool on the New stack
        ///     and then trying the Old stack if something is not there availble </para>
        /// </summary>
        private PooledStream GetFromPool(object owningObject) {
            PooledStream res = null;
            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetFromPool");
            res = (PooledStream) m_StackNew.Pop();
            if (null == res) {
                res = (PooledStream) m_StackOld.Pop();
            }
 
            // The semaphore guaranteed that a connection was available so if res is
            // null it means that this contract has been violated somewhere
            GlobalLog.Assert(res != null, "GetFromPool called with nothing in the pool!");
 
            if (null != res) {
                res.PostPop(owningObject);
                GlobalLog.Print("GetFromGeneralPool pooledStream#"+ValidationHelper.HashString(res));
            }
 
            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetFromPool",ValidationHelper.HashString(res));
            return(res);
        }
 
        /// <summary>
        ///    <para>Retrieves the pooled stream out of the pool, does this by using the result
        ///    of a WaitAny as input, and then based on whether it has a mutex, event, semaphore,
        ///     or timeout decides what action to take</para>
        /// </summary>
        private PooledStream Get(object owningObject, int result, ref bool continueLoop, ref WaitHandle [] waitHandles) {
                PooledStream pooledStream = null;
                GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get", result.ToString());
 
 
                // From the WaitAny docs: "If more than one object became signaled during
                // the call, this is the array index of the signaled object with the
                // smallest index value of all the signaled objects."  This is important
                // so that the free object signal will be returned before a creation
                // signal.
 
                switch (result) {
                case WaitTimeout:
                    Interlocked.Decrement(ref m_WaitCount);
                    continueLoop = false;
                    GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get","throw Timeout WebException");
                    throw new WebException(NetRes.GetWebStatusString("net_timeout", WebExceptionStatus.ConnectFailure), WebExceptionStatus.Timeout);
 
 
                case ErrorHandleIndex:
                    // Throw the error that PoolCreateRequest stashed.
                    int newWaitCount = Interlocked.Decrement(ref m_WaitCount);
                    continueLoop = false;
                    Exception exceptionToThrow = m_ResError;
                    if (newWaitCount == 0) {
                        CancelErrorCallback();
                    }
                    throw exceptionToThrow;
 
                // The creation mutex signaled, which means no connections are available in 
                // the connection pool.  This means you might be able to create a connection.
                case CreationHandleIndex:
                    try {
                        continueLoop = true;
                        // try creating a new connection
                        pooledStream = UserCreateRequest();
 
                        if (null != pooledStream) {
                            pooledStream.PostPop(owningObject);
                            Interlocked.Decrement(ref m_WaitCount);
                            continueLoop = false;
 
                        }
                        else {
                            // If we were not able to create an object, check to see if
                            // we reached MaxPoolSize.  If so, we will no longer wait on
                            // the CreationHandle, but instead wait for a free object or
                            // the timeout.
 
                            // Consider changing: if we receive the CreationHandle midway into the wait
                            // period and re-wait, we will be waiting on the full period
                            if (Count >= MaxPoolSize && 0 != MaxPoolSize) {
                                if (!ReclaimEmancipatedObjects()) {
                                    // modify handle array not to wait on creation mutex anymore
                                    waitHandles    = new WaitHandle[2];
                                    waitHandles[0] = m_WaitHandles[0];
                                    waitHandles[1] = m_WaitHandles[1];
                                }
                            }
 
                        }
                    }
                    finally {
                        CreationMutex.ReleaseMutex();
                    }
                    break;
 
                default:
                    // the semaphore was signaled which can only happen
                    // when a connection has been placed in the pool
                    // so there is guaranteed available inventory
                    Interlocked.Decrement(ref m_WaitCount);
                    pooledStream = GetFromPool(owningObject);
                    continueLoop = false;
                    break;
                }
                GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get",ValidationHelper.HashString(pooledStream));
                return pooledStream;
        }
 
        /// <devdoc>
        ///    <para>Aborts the queued requests to the pool</para>
        /// </devdoc>
        internal void Abort() {
            if (m_ResError == null) {
                m_ResError = new WebException(
                        NetRes.GetWebStatusString("net_requestaborted", WebExceptionStatus.RequestCanceled),
                        WebExceptionStatus.RequestCanceled);
            }
            ErrorEvent.Set();
            m_ErrorOccured = true;
            m_ErrorTimer = s_CancelErrorQueue.CreateTimer(s_CancelErrorCallback, this);
        }
 
        /// <devdoc>
        ///    <para>Attempts to create a PooledStream, by trying to get a pooled Connection,
        ///         or by creating its own new one</para>
        /// </devdoc>
        internal PooledStream GetConnection(object owningObject, 
                                            GeneralAsyncDelegate asyncCallback, 
                                            int creationTimeout) {
            int result;
            PooledStream stream = null;
            bool continueLoop = true;
            bool async = (asyncCallback != null) ? true : false;
 
            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetConnection");
 
            if(m_State != State.Running) {
                throw new InternalException();
            }
 
            Interlocked.Increment(ref m_WaitCount);
            WaitHandle[] localWaitHandles = m_WaitHandles;
 
            if (async) {
                result = WaitHandle.WaitAny(localWaitHandles, 0, false);
                if (result != WaitTimeout) {
                    stream = Get(owningObject, result, ref continueLoop, ref localWaitHandles);
                }
                if (stream == null) {
                    GlobalLog.Print("GetConnection:"+ValidationHelper.HashString(this)+" going async");
                    AsyncConnectionPoolRequest asyncState = new AsyncConnectionPoolRequest(this, owningObject, asyncCallback, creationTimeout);
                    QueueRequest(asyncState);
                }
            } else {
                // loop while we don't have an error/timeout and we haven't gotten a stream yet
                while ((stream == null) && continueLoop) {
                    result = WaitHandle.WaitAny(localWaitHandles, creationTimeout, false);
                    stream = Get(owningObject, result, ref continueLoop, ref localWaitHandles);
                }
            }
 
            if (null != stream) {
                // if there is already a stream, then we're not going async
                if (!stream.IsInitalizing) {
                    asyncCallback = null;
                }
 
                try{
                    // If activate returns false, it is going to finish asynchronously 
                    // and therefore the stream will be returned in a callback and
                    // we should not return it here (return null)
                    if (stream.Activate(owningObject, asyncCallback) == false)
                        stream = null;
                }
                catch{
                    PutConnection(stream,owningObject,creationTimeout, false);
                    throw;
                }
            } else if (!async) {
                throw new InternalException();
            }
 
            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetConnection", ValidationHelper.HashString(stream));
            return(stream);
        }
 
        /// <devdoc>
        ///     <para>
        ///     Attempts to return a PooledStream to the pool.  Default is that it can be reused if it can 
        ///     also be pooled.
        ///     </para>
        /// </devdoc>
        internal void PutConnection(PooledStream pooledStream, object owningObject, int creationTimeout)
        {
            // ok to reuse
            PutConnection(pooledStream, owningObject, creationTimeout, true);
        }
 
        /// <devdoc>
        ///    <para>
        ///    Attempts to return a PooledStream to the pool.  If canReuse is false, then the 
        ///    connection will be destroyed even if it is marked as reusable and a new conneciton will
        ///    be created.  If it is true, then the connection will still be checked to ensure that
        ///    it can be pooled and will be cleaned up if it can not for another reason.
        ///    </para>
        /// </devdoc>
        internal void PutConnection(PooledStream pooledStream, object owningObject, int creationTimeout, bool canReuse) {
            GlobalLog.Print("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutConnection");
            if (pooledStream == null) {
                throw new ArgumentNullException("pooledStream");
            }
 
            pooledStream.PrePush(owningObject);
 
            if (m_State != State.ShuttingDown) {
                pooledStream.Deactivate();
 
                // cancel our error status, if we have no new requests waiting anymore
                if (m_WaitCount == 0) {
                    CancelErrorCallback();
                }
 
                if (canReuse && pooledStream.CanBePooled) {
                    PutNew(pooledStream);
                }
                else {
                    try {
                        Destroy(pooledStream);
                    } finally { // Make sure to release the mutex even under error conditions.
                        // Make sure we recreate a new pooled stream, if there are requests for a stream
                        // at this point
                        if (m_WaitCount > 0) {
                            if (!CreationMutex.WaitOne(creationTimeout, false)) {
                                Abort();
                            } else {
                                try {
                                    pooledStream = UserCreateRequest();
                                    if (null != pooledStream) {
                                        PutNew(pooledStream);
                                    }
                                } finally {
                                    CreationMutex.ReleaseMutex();
                                }
                            }
                        }
                    }
                }
            }
            else {
                // If we're shutting down, we destroy the object.
                Destroy(pooledStream);
            }
        }
 
 
        /// <devdoc>
        ///    <para>Places a new/reusable stream in the new stack of the pool</para>
        /// </devdoc>
        private void PutNew(PooledStream pooledStream) {
            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutNew", "#"+ValidationHelper.HashString(pooledStream));
 
            GlobalLog.Assert(null != pooledStream, "Why are we adding a null object to the pool?");
            GlobalLog.Assert(pooledStream.CanBePooled, "Non-poolable object in pool.");
 
            m_StackNew.Push(pooledStream);
            // ensure that the semaphore's count is incremented to signal an available connection is in
            // the pool
            Semaphore.ReleaseSemaphore();
            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutNew");
        }
 
 
        /// <devdoc>
        ///    <para>Reclaim any pooled Streams that have seen their users/WebRequests GCed away</para>
        /// </devdoc>
        private bool ReclaimEmancipatedObjects() {
            bool emancipatedObjectFound = false;
            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::ReclaimEmancipatedObjects");
 
            lock(m_ObjectList.SyncRoot) {
 
                object[] objectList = m_ObjectList.ToArray();
                if (null != objectList) {
 
                    for (int i = 0; i < objectList.Length; ++i) {
                        PooledStream pooledStream = (PooledStream) objectList[i];
 
                        if (null != pooledStream) {
                            bool locked = false;
 
                            try {
                                Monitor.TryEnter(pooledStream, ref locked);
 
                                if (locked) {
                                    if (pooledStream.IsEmancipated) {
 
                                        GlobalLog.Print("EmancipatedObject pooledStream#"+ValidationHelper.HashString(pooledStream));
                                        PutConnection(pooledStream, null, Timeout.Infinite);
                                        emancipatedObjectFound = true;
                                    }
                                }
                            }
                            finally {
                                if (locked)
                                    Monitor.Exit(pooledStream);
                            }
                        }
                    }
                }
            }
            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::ReclaimEmancipatedObjects",emancipatedObjectFound.ToString());
            return emancipatedObjectFound;
        }
 
        /// <devdoc>
        ///    <para>Creates a new PooledStream is allowable</para>
        /// </devdoc>
        private PooledStream UserCreateRequest() {
            // called by user when they were not able to obtain a free object but
            // instead obtained creation mutex
            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::UserCreateRequest");
 
            PooledStream pooledStream = null;
 
            if (!ErrorOccurred) {
                 if (Count < MaxPoolSize || 0 == MaxPoolSize) {
                    // If we have an odd number of total objects, reclaim any dead objects.
                    // If we did not find any objects to reclaim, create a new one.
 
                    // 
                     if ((Count & 0x1) == 0x1 || !ReclaimEmancipatedObjects())
                        pooledStream = Create(m_CreateConnectionCallback);
                }
            }
            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::UserCreateRequest", ValidationHelper.HashString(pooledStream));
            return pooledStream;
        }
    }
 
 
    /// <devdoc>
    ///    <para>Used to Pool streams in a thread safe manner</para>
    /// </devdoc>
    sealed internal class InterlockedStack {
        private readonly Stack _stack = new Stack();
        private int _count;
 
#if DEBUG
        private readonly Hashtable doublepush = new Hashtable();
#endif
 
        internal InterlockedStack() {
        }
 
        internal void Push(Object pooledStream) {
            GlobalLog.Assert(null != pooledStream, "push null");
            if (null == pooledStream) { throw new ArgumentNullException("pooledStream"); }
            lock(_stack.SyncRoot) {
#if DEBUG
                GlobalLog.Assert(null == doublepush[pooledStream], "object already in stack");
                doublepush[pooledStream] = _stack.Count;
#endif
                _stack.Push(pooledStream);
#if DEBUG
                GlobalLog.Assert(_count+1 == _stack.Count, "push count mishandle");
#endif
                _count = _stack.Count;
            }
        }
 
        internal Object Pop() {
            lock(_stack.SyncRoot) {
                object pooledStream = null;
                if (0 <_stack.Count) {
                    pooledStream = _stack.Pop();
#if DEBUG
                    GlobalLog.Assert(_count-1 == _stack.Count, "pop count mishandle");
                    doublepush.Remove(pooledStream);
#endif
                    _count = _stack.Count;
                }
                return pooledStream;
            }
        }
    }
 
}