|
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace System.Runtime
{
using System;
using System.Collections.Generic;
using System.Threading;
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.PrivatePrimitive, SupportsAsync = true, ReleaseMethod = "Dispatch")]
sealed class InputQueue<T> : IDisposable where T : class
{
static Action<object> completeOutstandingReadersCallback;
static Action<object> completeWaitersFalseCallback;
static Action<object> completeWaitersTrueCallback;
static Action<object> onDispatchCallback;
static Action<object> onInvokeDequeuedCallback;
QueueState queueState;
[Fx.Tag.SynchronizationObject(Blocking = false, Kind = Fx.Tag.SynchronizationKind.LockStatement)]
ItemQueue itemQueue;
[Fx.Tag.SynchronizationObject]
Queue<IQueueReader> readerQueue;
[Fx.Tag.SynchronizationObject]
List<IQueueWaiter> waiterList;
public InputQueue()
{
this.itemQueue = new ItemQueue();
this.readerQueue = new Queue<IQueueReader>();
this.waiterList = new List<IQueueWaiter>();
this.queueState = QueueState.Open;
}
public InputQueue(Func<Action<AsyncCallback, IAsyncResult>> asyncCallbackGenerator)
: this()
{
Fx.Assert(asyncCallbackGenerator != null, "use default ctor if you don't have a generator");
AsyncCallbackGenerator = asyncCallbackGenerator;
}
public int PendingCount
{
get
{
lock (ThisLock)
{
return this.itemQueue.ItemCount;
}
}
}
// Users like ServiceModel can hook this abort ICommunicationObject or handle other non-IDisposable objects
public Action<T> DisposeItemCallback
{
get;
set;
}
// Users like ServiceModel can hook this to wrap the AsyncQueueReader callback functionality for tracing, etc
Func<Action<AsyncCallback, IAsyncResult>> AsyncCallbackGenerator
{
get;
set;
}
object ThisLock
{
get { return this.itemQueue; }
}
public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
{
Item item = default(Item);
lock (ThisLock)
{
if (queueState == QueueState.Open)
{
if (itemQueue.HasAvailableItem)
{
item = itemQueue.DequeueAvailableItem();
}
else
{
AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
readerQueue.Enqueue(reader);
return reader;
}
}
else if (queueState == QueueState.Shutdown)
{
if (itemQueue.HasAvailableItem)
{
item = itemQueue.DequeueAvailableItem();
}
else if (itemQueue.HasAnyItem)
{
AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
readerQueue.Enqueue(reader);
return reader;
}
}
}
InvokeDequeuedCallback(item.DequeuedCallback);
return new CompletedAsyncResult<T>(item.GetValue(), callback, state);
}
public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
{
lock (ThisLock)
{
if (queueState == QueueState.Open)
{
if (!itemQueue.HasAvailableItem)
{
AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
waiterList.Add(waiter);
return waiter;
}
}
else if (queueState == QueueState.Shutdown)
{
if (!itemQueue.HasAvailableItem && itemQueue.HasAnyItem)
{
AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
waiterList.Add(waiter);
return waiter;
}
}
}
return new CompletedAsyncResult<bool>(true, callback, state);
}
public void Close()
{
Dispose();
}
[Fx.Tag.Blocking(CancelMethod = "Close")]
public T Dequeue(TimeSpan timeout)
{
T value;
if (!this.Dequeue(timeout, out value))
{
throw Fx.Exception.AsError(new TimeoutException(InternalSR.TimeoutInputQueueDequeue(timeout)));
}
return value;
}
[Fx.Tag.Blocking(CancelMethod = "Close")]
public bool Dequeue(TimeSpan timeout, out T value)
{
WaitQueueReader reader = null;
Item item = new Item();
lock (ThisLock)
{
if (queueState == QueueState.Open)
{
if (itemQueue.HasAvailableItem)
{
item = itemQueue.DequeueAvailableItem();
}
else
{
reader = new WaitQueueReader(this);
readerQueue.Enqueue(reader);
}
}
else if (queueState == QueueState.Shutdown)
{
if (itemQueue.HasAvailableItem)
{
item = itemQueue.DequeueAvailableItem();
}
else if (itemQueue.HasAnyItem)
{
reader = new WaitQueueReader(this);
readerQueue.Enqueue(reader);
}
else
{
value = default(T);
return true;
}
}
else // queueState == QueueState.Closed
{
value = default(T);
return true;
}
}
if (reader != null)
{
return reader.Wait(timeout, out value);
}
else
{
InvokeDequeuedCallback(item.DequeuedCallback);
value = item.GetValue();
return true;
}
}
public void Dispatch()
{
IQueueReader reader = null;
Item item = new Item();
IQueueReader[] outstandingReaders = null;
IQueueWaiter[] waiters = null;
bool itemAvailable = true;
lock (ThisLock)
{
itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
this.GetWaiters(out waiters);
if (queueState != QueueState.Closed)
{
itemQueue.MakePendingItemAvailable();
if (readerQueue.Count > 0)
{
item = itemQueue.DequeueAvailableItem();
reader = readerQueue.Dequeue();
if (queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
{
outstandingReaders = new IQueueReader[readerQueue.Count];
readerQueue.CopyTo(outstandingReaders, 0);
readerQueue.Clear();
itemAvailable = false;
}
}
}
}
if (outstandingReaders != null)
{
if (completeOutstandingReadersCallback == null)
{
completeOutstandingReadersCallback = new Action<object>(CompleteOutstandingReadersCallback);
}
ActionItem.Schedule(completeOutstandingReadersCallback, outstandingReaders);
}
if (waiters != null)
{
CompleteWaitersLater(itemAvailable, waiters);
}
if (reader != null)
{
InvokeDequeuedCallback(item.DequeuedCallback);
reader.Set(item);
}
}
[Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
public bool EndDequeue(IAsyncResult result, out T value)
{
CompletedAsyncResult<T> typedResult = result as CompletedAsyncResult<T>;
if (typedResult != null)
{
value = CompletedAsyncResult<T>.End(result);
return true;
}
return AsyncQueueReader.End(result, out value);
}
[Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
public T EndDequeue(IAsyncResult result)
{
T value;
if (!this.EndDequeue(result, out value))
{
throw Fx.Exception.AsError(new TimeoutException());
}
return value;
}
[Fx.Tag.Blocking(CancelMethod = "Dispatch", Conditional = "!result.IsCompleted")]
public bool EndWaitForItem(IAsyncResult result)
{
CompletedAsyncResult<bool> typedResult = result as CompletedAsyncResult<bool>;
if (typedResult != null)
{
return CompletedAsyncResult<bool>.End(result);
}
return AsyncQueueWaiter.End(result);
}
public void EnqueueAndDispatch(T item)
{
EnqueueAndDispatch(item, null);
}
// dequeuedCallback is called as an item is dequeued from the InputQueue. The
// InputQueue lock is not held during the callback. However, the user code will
// not be notified of the item being available until the callback returns. If you
// are not sure if the callback will block for a long time, then first call
// IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
public void EnqueueAndDispatch(T item, Action dequeuedCallback)
{
EnqueueAndDispatch(item, dequeuedCallback, true);
}
public void EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
{
Fx.Assert(exception != null, "EnqueueAndDispatch: exception parameter should not be null");
EnqueueAndDispatch(new Item(exception, dequeuedCallback), canDispatchOnThisThread);
}
public void EnqueueAndDispatch(T item, Action dequeuedCallback, bool canDispatchOnThisThread)
{
Fx.Assert(item != null, "EnqueueAndDispatch: item parameter should not be null");
EnqueueAndDispatch(new Item(item, dequeuedCallback), canDispatchOnThisThread);
}
public bool EnqueueWithoutDispatch(T item, Action dequeuedCallback)
{
Fx.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");
return EnqueueWithoutDispatch(new Item(item, dequeuedCallback));
}
public bool EnqueueWithoutDispatch(Exception exception, Action dequeuedCallback)
{
Fx.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");
return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback));
}
public void Shutdown()
{
this.Shutdown(null);
}
// Don't let any more items in. Differs from Close in that we keep around
// existing items in our itemQueue for possible future calls to Dequeue
public void Shutdown(Func<Exception> pendingExceptionGenerator)
{
IQueueReader[] outstandingReaders = null;
lock (ThisLock)
{
if (queueState == QueueState.Shutdown)
{
return;
}
if (queueState == QueueState.Closed)
{
return;
}
this.queueState = QueueState.Shutdown;
if (readerQueue.Count > 0 && this.itemQueue.ItemCount == 0)
{
outstandingReaders = new IQueueReader[readerQueue.Count];
readerQueue.CopyTo(outstandingReaders, 0);
readerQueue.Clear();
}
}
if (outstandingReaders != null)
{
for (int i = 0; i < outstandingReaders.Length; i++)
{
Exception exception = (pendingExceptionGenerator != null) ? pendingExceptionGenerator() : null;
outstandingReaders[i].Set(new Item(exception, null));
}
}
}
[Fx.Tag.Blocking(CancelMethod = "Dispatch")]
public bool WaitForItem(TimeSpan timeout)
{
WaitQueueWaiter waiter = null;
bool itemAvailable = false;
lock (ThisLock)
{
if (queueState == QueueState.Open)
{
if (itemQueue.HasAvailableItem)
{
itemAvailable = true;
}
else
{
waiter = new WaitQueueWaiter();
waiterList.Add(waiter);
}
}
else if (queueState == QueueState.Shutdown)
{
if (itemQueue.HasAvailableItem)
{
itemAvailable = true;
}
else if (itemQueue.HasAnyItem)
{
waiter = new WaitQueueWaiter();
waiterList.Add(waiter);
}
else
{
return true;
}
}
else // queueState == QueueState.Closed
{
return true;
}
}
if (waiter != null)
{
return waiter.Wait(timeout);
}
else
{
return itemAvailable;
}
}
public void Dispose()
{
bool dispose = false;
lock (ThisLock)
{
if (queueState != QueueState.Closed)
{
queueState = QueueState.Closed;
dispose = true;
}
}
if (dispose)
{
while (readerQueue.Count > 0)
{
IQueueReader reader = readerQueue.Dequeue();
reader.Set(default(Item));
}
while (itemQueue.HasAnyItem)
{
Item item = itemQueue.DequeueAnyItem();
DisposeItem(item);
InvokeDequeuedCallback(item.DequeuedCallback);
}
}
}
void DisposeItem(Item item)
{
T value = item.Value;
if (value != null)
{
if (value is IDisposable)
{
((IDisposable)value).Dispose();
}
else
{
Action<T> disposeItemCallback = this.DisposeItemCallback;
if (disposeItemCallback != null)
{
disposeItemCallback(value);
}
}
}
}
static void CompleteOutstandingReadersCallback(object state)
{
IQueueReader[] outstandingReaders = (IQueueReader[])state;
for (int i = 0; i < outstandingReaders.Length; i++)
{
outstandingReaders[i].Set(default(Item));
}
}
static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
{
for (int i = 0; i < waiters.Length; i++)
{
waiters[i].Set(itemAvailable);
}
}
static void CompleteWaitersFalseCallback(object state)
{
CompleteWaiters(false, (IQueueWaiter[])state);
}
static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
{
if (itemAvailable)
{
if (completeWaitersTrueCallback == null)
{
completeWaitersTrueCallback = new Action<object>(CompleteWaitersTrueCallback);
}
ActionItem.Schedule(completeWaitersTrueCallback, waiters);
}
else
{
if (completeWaitersFalseCallback == null)
{
completeWaitersFalseCallback = new Action<object>(CompleteWaitersFalseCallback);
}
ActionItem.Schedule(completeWaitersFalseCallback, waiters);
}
}
static void CompleteWaitersTrueCallback(object state)
{
CompleteWaiters(true, (IQueueWaiter[])state);
}
static void InvokeDequeuedCallback(Action dequeuedCallback)
{
if (dequeuedCallback != null)
{
dequeuedCallback();
}
}
static void InvokeDequeuedCallbackLater(Action dequeuedCallback)
{
if (dequeuedCallback != null)
{
if (onInvokeDequeuedCallback == null)
{
onInvokeDequeuedCallback = new Action<object>(OnInvokeDequeuedCallback);
}
ActionItem.Schedule(onInvokeDequeuedCallback, dequeuedCallback);
}
}
static void OnDispatchCallback(object state)
{
((InputQueue<T>)state).Dispatch();
}
static void OnInvokeDequeuedCallback(object state)
{
Fx.Assert(state != null, "InputQueue.OnInvokeDequeuedCallback: (state != null)");
Action dequeuedCallback = (Action)state;
dequeuedCallback();
}
void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
{
bool disposeItem = false;
IQueueReader reader = null;
bool dispatchLater = false;
IQueueWaiter[] waiters = null;
bool itemAvailable = true;
lock (ThisLock)
{
itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
this.GetWaiters(out waiters);
if (queueState == QueueState.Open)
{
if (canDispatchOnThisThread)
{
if (readerQueue.Count == 0)
{
itemQueue.EnqueueAvailableItem(item);
}
else
{
reader = readerQueue.Dequeue();
}
}
else
{
if (readerQueue.Count == 0)
{
itemQueue.EnqueueAvailableItem(item);
}
else
{
itemQueue.EnqueuePendingItem(item);
dispatchLater = true;
}
}
}
else // queueState == QueueState.Closed || queueState == QueueState.Shutdown
{
disposeItem = true;
}
}
if (waiters != null)
{
if (canDispatchOnThisThread)
{
CompleteWaiters(itemAvailable, waiters);
}
else
{
CompleteWaitersLater(itemAvailable, waiters);
}
}
if (reader != null)
{
InvokeDequeuedCallback(item.DequeuedCallback);
reader.Set(item);
}
if (dispatchLater)
{
if (onDispatchCallback == null)
{
onDispatchCallback = new Action<object>(OnDispatchCallback);
}
ActionItem.Schedule(onDispatchCallback, this);
}
else if (disposeItem)
{
InvokeDequeuedCallback(item.DequeuedCallback);
DisposeItem(item);
}
}
// This will not block, however, Dispatch() must be called later if this function
// returns true.
bool EnqueueWithoutDispatch(Item item)
{
lock (ThisLock)
{
// Open
if (queueState != QueueState.Closed && queueState != QueueState.Shutdown)
{
if (readerQueue.Count == 0 && waiterList.Count == 0)
{
itemQueue.EnqueueAvailableItem(item);
return false;
}
else
{
itemQueue.EnqueuePendingItem(item);
return true;
}
}
}
DisposeItem(item);
InvokeDequeuedCallbackLater(item.DequeuedCallback);
return false;
}
void GetWaiters(out IQueueWaiter[] waiters)
{
if (waiterList.Count > 0)
{
waiters = waiterList.ToArray();
waiterList.Clear();
}
else
{
waiters = null;
}
}
// Used for timeouts. The InputQueue must remove readers from its reader queue to prevent
// dispatching items to timed out readers.
bool RemoveReader(IQueueReader reader)
{
Fx.Assert(reader != null, "InputQueue.RemoveReader: (reader != null)");
lock (ThisLock)
{
if (queueState == QueueState.Open || queueState == QueueState.Shutdown)
{
bool removed = false;
for (int i = readerQueue.Count; i > 0; i--)
{
IQueueReader temp = readerQueue.Dequeue();
if (object.ReferenceEquals(temp, reader))
{
removed = true;
}
else
{
readerQueue.Enqueue(temp);
}
}
return removed;
}
}
return false;
}
enum QueueState
{
Open,
Shutdown,
Closed
}
interface IQueueReader
{
void Set(Item item);
}
interface IQueueWaiter
{
void Set(bool itemAvailable);
}
struct Item
{
Action dequeuedCallback;
Exception exception;
T value;
public Item(T value, Action dequeuedCallback)
: this(value, null, dequeuedCallback)
{
}
public Item(Exception exception, Action dequeuedCallback)
: this(null, exception, dequeuedCallback)
{
}
Item(T value, Exception exception, Action dequeuedCallback)
{
this.value = value;
this.exception = exception;
this.dequeuedCallback = dequeuedCallback;
}
public Action DequeuedCallback
{
get { return this.dequeuedCallback; }
}
public Exception Exception
{
get { return this.exception; }
}
public T Value
{
get { return this.value; }
}
public T GetValue()
{
if (this.exception != null)
{
throw Fx.Exception.AsError(this.exception);
}
return this.value;
}
}
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
class AsyncQueueReader : AsyncResult, IQueueReader
{
static Action<object> timerCallback = new Action<object>(AsyncQueueReader.TimerCallback);
bool expired;
InputQueue<T> inputQueue;
T item;
IOThreadTimer timer;
public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
: base(callback, state)
{
if (inputQueue.AsyncCallbackGenerator != null)
{
base.VirtualCallback = inputQueue.AsyncCallbackGenerator();
}
this.inputQueue = inputQueue;
if (timeout != TimeSpan.MaxValue)
{
this.timer = new IOThreadTimer(timerCallback, this, false);
this.timer.Set(timeout);
}
}
[Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
public static bool End(IAsyncResult result, out T value)
{
AsyncQueueReader readerResult = AsyncResult.End<AsyncQueueReader>(result);
if (readerResult.expired)
{
value = default(T);
return false;
}
else
{
value = readerResult.item;
return true;
}
}
public void Set(Item item)
{
this.item = item.Value;
if (this.timer != null)
{
this.timer.Cancel();
}
Complete(false, item.Exception);
}
static void TimerCallback(object state)
{
AsyncQueueReader thisPtr = (AsyncQueueReader)state;
if (thisPtr.inputQueue.RemoveReader(thisPtr))
{
thisPtr.expired = true;
thisPtr.Complete(false);
}
}
}
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
class AsyncQueueWaiter : AsyncResult, IQueueWaiter
{
static Action<object> timerCallback = new Action<object>(AsyncQueueWaiter.TimerCallback);
bool itemAvailable;
[Fx.Tag.SynchronizationObject(Blocking = false)]
object thisLock = new object();
IOThreadTimer timer;
public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state)
{
if (timeout != TimeSpan.MaxValue)
{
this.timer = new IOThreadTimer(timerCallback, this, false);
this.timer.Set(timeout);
}
}
object ThisLock
{
get
{
return this.thisLock;
}
}
[Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
public static bool End(IAsyncResult result)
{
AsyncQueueWaiter waiterResult = AsyncResult.End<AsyncQueueWaiter>(result);
return waiterResult.itemAvailable;
}
public void Set(bool itemAvailable)
{
bool timely;
lock (ThisLock)
{
timely = (this.timer == null) || this.timer.Cancel();
this.itemAvailable = itemAvailable;
}
if (timely)
{
Complete(false);
}
}
static void TimerCallback(object state)
{
AsyncQueueWaiter thisPtr = (AsyncQueueWaiter)state;
thisPtr.Complete(false);
}
}
class ItemQueue
{
int head;
Item[] items;
int pendingCount;
int totalCount;
public ItemQueue()
{
this.items = new Item[1];
}
public bool HasAnyItem
{
get { return this.totalCount > 0; }
}
public bool HasAvailableItem
{
get { return this.totalCount > this.pendingCount; }
}
public int ItemCount
{
get { return this.totalCount; }
}
public Item DequeueAnyItem()
{
if (this.pendingCount == this.totalCount)
{
this.pendingCount--;
}
return DequeueItemCore();
}
public Item DequeueAvailableItem()
{
Fx.AssertAndThrow(this.totalCount != this.pendingCount, "ItemQueue does not contain any available items");
return DequeueItemCore();
}
public void EnqueueAvailableItem(Item item)
{
EnqueueItemCore(item);
}
public void EnqueuePendingItem(Item item)
{
EnqueueItemCore(item);
this.pendingCount++;
}
public void MakePendingItemAvailable()
{
Fx.AssertAndThrow(this.pendingCount != 0, "ItemQueue does not contain any pending items");
this.pendingCount--;
}
Item DequeueItemCore()
{
Fx.AssertAndThrow(totalCount != 0, "ItemQueue does not contain any items");
Item item = this.items[this.head];
this.items[this.head] = new Item();
this.totalCount--;
this.head = (this.head + 1) % this.items.Length;
return item;
}
void EnqueueItemCore(Item item)
{
if (this.totalCount == this.items.Length)
{
Item[] newItems = new Item[this.items.Length * 2];
for (int i = 0; i < this.totalCount; i++)
{
newItems[i] = this.items[(head + i) % this.items.Length];
}
this.head = 0;
this.items = newItems;
}
int tail = (this.head + this.totalCount) % this.items.Length;
this.items[tail] = item;
this.totalCount++;
}
}
[Fx.Tag.SynchronizationObject(Blocking = false)]
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
class WaitQueueReader : IQueueReader
{
Exception exception;
InputQueue<T> inputQueue;
T item;
[Fx.Tag.SynchronizationObject]
ManualResetEvent waitEvent;
public WaitQueueReader(InputQueue<T> inputQueue)
{
this.inputQueue = inputQueue;
waitEvent = new ManualResetEvent(false);
}
public void Set(Item item)
{
lock (this)
{
Fx.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
Fx.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
this.exception = item.Exception;
this.item = item.Value;
waitEvent.Set();
}
}
[Fx.Tag.Blocking(CancelMethod = "Set")]
public bool Wait(TimeSpan timeout, out T value)
{
bool isSafeToClose = false;
try
{
if (!TimeoutHelper.WaitOne(waitEvent, timeout))
{
if (this.inputQueue.RemoveReader(this))
{
value = default(T);
isSafeToClose = true;
return false;
}
else
{
waitEvent.WaitOne();
}
}
isSafeToClose = true;
}
finally
{
if (isSafeToClose)
{
waitEvent.Close();
}
}
if (this.exception != null)
{
throw Fx.Exception.AsError(this.exception);
}
value = item;
return true;
}
}
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
class WaitQueueWaiter : IQueueWaiter
{
bool itemAvailable;
[Fx.Tag.SynchronizationObject]
ManualResetEvent waitEvent;
public WaitQueueWaiter()
{
waitEvent = new ManualResetEvent(false);
}
public void Set(bool itemAvailable)
{
lock (this)
{
this.itemAvailable = itemAvailable;
waitEvent.Set();
}
}
[Fx.Tag.Blocking(CancelMethod = "Set")]
public bool Wait(TimeSpan timeout)
{
if (!TimeoutHelper.WaitOne(waitEvent, timeout))
{
return false;
}
return this.itemAvailable;
}
}
}
}
|