|
//------------------------------------------------------------------------------
// <copyright file="WebEventBuffer.cs" company="Microsoft">
// Copyright (c) Microsoft Corporation. All rights reserved.
// </copyright>
//------------------------------------------------------------------------------
namespace System.Web.Management {
using System.Configuration;
using System.Web.Configuration;
using System.Configuration.Provider;
using System.Collections.Specialized;
using System.Collections;
using System.Web.Util;
using System.Web.Mail;
using System.Globalization;
using System.Xml;
using System.Threading;
using System.Web.Hosting;
using System.Security.Permissions;
public enum EventNotificationType
{
// regularly scheduled notification
Regular,
// urgent notification
Urgent,
// notification triggered by a user requested flush
Flush,
// Notification fired when buffer=false
Unbuffered,
}
internal enum FlushCallReason {
UrgentFlushThresholdExceeded,
Timer,
StaticFlush
}
public sealed class WebEventBufferFlushInfo {
WebBaseEventCollection _events;
DateTime _lastNotification;
int _eventsDiscardedSinceLastNotification;
int _eventsInBuffer;
int _notificationSequence;
EventNotificationType _notificationType;
internal WebEventBufferFlushInfo( WebBaseEventCollection events,
EventNotificationType notificationType,
int notificationSequence,
DateTime lastNotification,
int eventsDiscardedSinceLastNotification,
int eventsInBuffer) {
_events = events;
_notificationType = notificationType;
_notificationSequence = notificationSequence;
_lastNotification = lastNotification;
_eventsDiscardedSinceLastNotification = eventsDiscardedSinceLastNotification;
_eventsInBuffer = eventsInBuffer;
}
public WebBaseEventCollection Events {
get { return _events; }
}
public DateTime LastNotificationUtc {
get { return _lastNotification; }
}
public int EventsDiscardedSinceLastNotification {
get { return _eventsDiscardedSinceLastNotification; }
}
public int EventsInBuffer {
get { return _eventsInBuffer; }
}
public int NotificationSequence {
get { return _notificationSequence; }
}
public EventNotificationType NotificationType {
get { return _notificationType; }
}
}
internal delegate void WebEventBufferFlushCallback(WebEventBufferFlushInfo flushInfo);
internal sealed class WebEventBuffer {
static long Infinite = Int64.MaxValue;
long _burstWaitTimeMs = 2 * 1000;
BufferedWebEventProvider _provider;
long _regularFlushIntervalMs;
int _urgentFlushThreshold;
int _maxBufferSize;
int _maxFlushSize;
long _urgentFlushIntervalMs;
int _maxBufferThreads;
Queue _buffer = null;
Timer _timer;
DateTime _lastFlushTime = DateTime.MinValue;
DateTime _lastScheduledFlushTime = DateTime.MinValue;
DateTime _lastAdd = DateTime.MinValue;
DateTime _startTime = DateTime.MinValue;
bool _urgentFlushScheduled;
int _discardedSinceLastFlush = 0;
int _threadsInFlush = 0;
int _notificationSequence = 0;
bool _regularTimeoutUsed;
#if DBG
DateTime _nextFlush = DateTime.MinValue;
DateTime _lastRegularFlush = DateTime.MinValue;
DateTime _lastUrgentFlush = DateTime.MinValue;
int _totalAdded = 0;
int _totalFlushed = 0;
int _totalAbandoned = 0;
#endif
WebEventBufferFlushCallback _flushCallback;
internal WebEventBuffer(BufferedWebEventProvider provider, string bufferMode,
WebEventBufferFlushCallback callback) {
Debug.Assert(callback != null, "callback != null");
_provider = provider;
HealthMonitoringSection section = RuntimeConfig.GetAppLKGConfig().HealthMonitoring;
BufferModesCollection bufferModes = section.BufferModes;
BufferModeSettings bufferModeInfo = bufferModes[bufferMode];
if (bufferModeInfo == null) {
throw new ConfigurationErrorsException(SR.GetString(SR.Health_mon_buffer_mode_not_found, bufferMode));
}
if (bufferModeInfo.RegularFlushInterval == TimeSpan.MaxValue) {
_regularFlushIntervalMs = Infinite;
}
else {
try {
_regularFlushIntervalMs = (long)bufferModeInfo.RegularFlushInterval.TotalMilliseconds;
}
catch (OverflowException) {
_regularFlushIntervalMs = Infinite;
}
}
if (bufferModeInfo.UrgentFlushInterval == TimeSpan.MaxValue) {
_urgentFlushIntervalMs = Infinite;
}
else {
try {
_urgentFlushIntervalMs = (long)bufferModeInfo.UrgentFlushInterval.TotalMilliseconds;
}
catch (OverflowException) {
_urgentFlushIntervalMs = Infinite;
}
}
_urgentFlushThreshold = bufferModeInfo.UrgentFlushThreshold;
_maxBufferSize = bufferModeInfo.MaxBufferSize;
_maxFlushSize = bufferModeInfo.MaxFlushSize;
_maxBufferThreads = bufferModeInfo.MaxBufferThreads;
_burstWaitTimeMs = Math.Min(_burstWaitTimeMs, _urgentFlushIntervalMs);
_flushCallback = callback;
_buffer = new Queue();
if (_regularFlushIntervalMs != Infinite) {
_startTime = DateTime.UtcNow;
_regularTimeoutUsed = true;
_urgentFlushScheduled = false;
SetTimer(GetNextRegularFlushDueTimeInMs());
}
Debug.Trace("WebEventBuffer",
"\n_regularFlushIntervalMs=" + _regularFlushIntervalMs +
"\n_urgentFlushThreshold=" + _urgentFlushThreshold +
"\n_maxBufferSize=" + _maxBufferSize +
"\n_maxFlushSize=" + _maxFlushSize +
"\n_urgentFlushIntervalMs=" + _urgentFlushIntervalMs);
}
void FlushTimerCallback(object state) {
Flush(_maxFlushSize, FlushCallReason.Timer);
}
//
// If we're in notification mode, meaning urgentFlushThreshold="1", we'll flush
// as soon as there's an event in the buffer.
//
// For example, if bufferMode == "notification", we have this setting:
// <add name="Notification" maxBufferSize="300" maxFlushSize="20"
// urgentFlushThreshold="1" regularFlushInterval="Infinite"
// urgentFlushInterval="00:01:00" maxBufferThreads="1" />
//
// The ideal situation is that we have events coming in regularly,
// and we flush (max 20 events at a time), wait for _urgentFlushIntervalMs (1 minute),
// then flush the buffer, then wait 1 minute, then flush, and so on and on.
//
// However, there is a scenario where there's been no event coming in, and suddenly
// a burst of events (e.g. 20) arrive. If we flush immediately when the 1st event comes in,
// we then have to wait for 1 minute before we can flush the remaining 19 events.
//
// To solve this problem, we demand that if we're in notification mode, and
// we just added an event to an empty buffer, then we may anticipate a burst
// by waiting _burstWaitTimeMs amount of time (2s).
//
// But how long does a buffer needs to be empty before we consider
// waiting for a burst? We cannot come up with a good formula, and thus
// pick this:
// ((now - _lastAdd).TotalMilliseconds) >= _urgentFlushIntervalMs
//
bool AnticipateBurst(DateTime now) {
// Please note this is called while we're within the lock held in AddEvent.
return _urgentFlushThreshold == 1 && // we're in notification mode
_buffer.Count == 1 && // we just added an event to an empty buffer
((now - _lastAdd).TotalMilliseconds) >= _urgentFlushIntervalMs;
}
long GetNextRegularFlushDueTimeInMs() {
long nextRegularFlushFromStartTime;
long nowFromStartTime;
long regularFlushIntervalms = _regularFlushIntervalMs;
// Need to calculate in milliseconds in order to avoid time shift due to round-down
if (_regularFlushIntervalMs == Infinite) {
return Infinite;
}
DateTime now = DateTime.UtcNow;
nowFromStartTime = (long)((now - _startTime).TotalMilliseconds);
// For some unknown reason the Timer may fire prematurely (usually less than 50ms). This will bring
// us into a situation where the timer fired just tens of milliseconds before the originally planned
// fire time, and this method will return a due time == tens of milliseconds.
// To workaround this problem, I added 499 ms when doing the calculation to compensate for a
// premature firing.
nextRegularFlushFromStartTime = ((nowFromStartTime + regularFlushIntervalms + 499) / regularFlushIntervalms) * regularFlushIntervalms;
Debug.Assert(nextRegularFlushFromStartTime >= nowFromStartTime);
return nextRegularFlushFromStartTime - nowFromStartTime;
}
void SetTimer(long waitTimeMs) {
if (_timer == null) {
_timer = new System.Threading.Timer(new TimerCallback(this.FlushTimerCallback),
null, waitTimeMs, Timeout.Infinite);
}
else {
_timer.Change(waitTimeMs, Timeout.Infinite);
}
#if DBG
_nextFlush = DateTime.UtcNow.AddMilliseconds(waitTimeMs);
#endif
}
// This method can be called by the timer, or by AddEvent.
//
// Basic design:
// - We have one timer, and one buffer.
// - We flush periodically every _regularFlushIntervalMs ms
// - But if # of items in buffer has reached _urgentFlushThreshold, we will flush more frequently,
// but at most once every _urgentFlushIntervalMs ms. However, these urgent flushes will not
// prevent the regular flush from happening.
// - We never flush synchronously, meaning if we're called by AddEvent and decide to flush
// because we've reached the _urgentFlushThreshold, we will still use the timer thread
// to flush the buffer.
// - At any point only a maximum of _maxBufferThreads threads can be flushing. If exceeded,
// we will delay a flush.
//
//
// For example, let's say we have this setting:
// "1 minute urgentFlushInterval and 5 minute regularFlushInterval"
//
// Assume regular flush timer starts at 10:00am. It means regular
// flush will happen at 10:05am, 10:10am, 10:15am, and so on,
// regardless of when urgent flush happens.
//
// An "urgent flush" happens whenever urgentFlushThreshold is reached.
// However, when we schedule an "urgent flush", we ensure that the time
// between an urgent flush and the last flush (no matter it's urgent or
// regular) will be at least urgentFlushInterval.
//
// One interesting case here. Assume at 10:49:30 we had an urgent
// flush, but the # of events left is still above urgentFlushThreshold.
// You may think we'll schedule the next urgent flush at 10:50:30
// (urgentFlushInterval == 1 min). However, because we know we will
// have a regular flush at 10:50:00, we won't schedule the next urgent
// flush. Instead, during the regular flush at 10:50:00 happens, we'll
// check if there're still too many events; and if so, we will schedule
// the next urgent flush at 10:51:00.
//
internal void Flush(int max, FlushCallReason reason) {
WebBaseEvent[] events = null;
DateTime nowUtc = DateTime.UtcNow;
long waitTime = 0;
DateTime lastFlushTime = DateTime.MaxValue;
int discardedSinceLastFlush = -1;
int eventsInBuffer = -1;
int toFlush = 0;
EventNotificationType notificationType = EventNotificationType.Regular;
// By default, this call will flush, but will not schedule the next flush.
bool flushThisTime = true;
bool scheduleNextFlush = false;
bool nextFlushIsUrgent = false;
lock(_buffer) {
Debug.Assert(max > 0, "max > 0");
if (_buffer.Count == 0) {
// We have nothing in the buffer. Don't flush this time.
Debug.Trace("WebEventBufferExtended", "Flush: buffer is empty, don't flush");
flushThisTime = false;
}
switch (reason) {
case FlushCallReason.StaticFlush:
// It means somebody calls provider.Flush()
break;
case FlushCallReason.Timer:
// It's a callback from a timer. We will schedule the next regular flush if needed.
if (_regularFlushIntervalMs != Infinite) {
scheduleNextFlush = true;
waitTime = GetNextRegularFlushDueTimeInMs();
}
break;
case FlushCallReason.UrgentFlushThresholdExceeded:
// It means this method is called by AddEvent because the urgent flush threshold is reached.
// If an urgent flush has already been scheduled by someone else, we don't need to duplicate the
// effort. Just return.
if (_urgentFlushScheduled) {
return;
}
// Flush triggered by AddEvent isn't synchronous, so we won't flush this time, but will
// schedule an urgent flush instead.
flushThisTime = false;
scheduleNextFlush = true;
nextFlushIsUrgent = true;
// Calculate how long we have to wait when scheduling the flush
if (AnticipateBurst(nowUtc)) {
Debug.Trace("WebEventBuffer", "Flush: Called by AddEvent. Waiting for burst");
waitTime = _burstWaitTimeMs;
}
else {
Debug.Trace("WebEventBuffer", "Flush: Called by AddEvent. Schedule an immediate flush");
waitTime = 0;
}
// Have to wait longer because of _urgentFlushIntervalMs
long msSinceLastScheduledFlush = (long)(nowUtc - _lastScheduledFlushTime).TotalMilliseconds;
if (msSinceLastScheduledFlush + waitTime < _urgentFlushIntervalMs ) {
Debug.Trace("WebEventBuffer", "Flush: Called by AddEvent. Have to wait longer because of _urgentFlushIntervalMs.");
waitTime = _urgentFlushIntervalMs - msSinceLastScheduledFlush;
}
Debug.Trace("WebEventBuffer", "Wait time=" + waitTime +
"; nowUtc=" + PrintTime(nowUtc) +
"; _lastScheduledFlushTime=" + PrintTime(_lastScheduledFlushTime) +
"; _urgentFlushIntervalMs=" + _urgentFlushIntervalMs);
break;
}
Debug.Trace("WebEventBuffer", "Flush called: max=" + max +
"; reason=" + reason);
if (flushThisTime) {
// Check if we've exceeded the # of flushing threads. If so,
// don't flush this time.
if (_threadsInFlush >= _maxBufferThreads) {
// Won't set flushThisTime to false because we depend on
// the logic inside the next "if" block to schedule the
// next urgent flush as needed.
toFlush = 0;
}
else {
toFlush = Math.Min(_buffer.Count, max);
}
}
#if DBG
DebugUpdateStats(flushThisTime, nowUtc, toFlush, reason);
#endif
if (flushThisTime) {
Debug.Assert(reason != FlushCallReason.UrgentFlushThresholdExceeded, "reason != FlushCallReason.UrgentFlushThresholdExceeded");
if (toFlush > 0) {
// Move the to-be-flushed events to an array
events = new WebBaseEvent[toFlush];
for (int i = 0; i < toFlush; i++) {
events[i] = (WebBaseEvent)_buffer.Dequeue();
}
lastFlushTime = _lastFlushTime;
// Update _lastFlushTime and _lastScheduledFlushTime.
// These information are used when Flush is called the next time.
_lastFlushTime = nowUtc;
if (reason == FlushCallReason.Timer) {
_lastScheduledFlushTime = nowUtc;
}
discardedSinceLastFlush = _discardedSinceLastFlush;
_discardedSinceLastFlush = 0;
if (reason == FlushCallReason.StaticFlush) {
notificationType = EventNotificationType.Flush;
}
else {
Debug.Assert(!(!_regularTimeoutUsed && !_urgentFlushScheduled),
"It's impossible to have a non-regular flush and yet the flush isn't urgent");
notificationType = _regularTimeoutUsed ?
EventNotificationType.Regular :
EventNotificationType.Urgent;
}
}
eventsInBuffer = _buffer.Count;
// If we still have at least _urgentFlushThreshold left, set timer
// to flush asap.
if (eventsInBuffer >= _urgentFlushThreshold) {
Debug.Trace("WebEventBuffer", "Flush: going to flush " + toFlush + " events, but still have at least _urgentFlushThreshold left. Schedule a flush");
scheduleNextFlush = true;
nextFlushIsUrgent = true;
waitTime = _urgentFlushIntervalMs;
}
else {
Debug.Trace("WebEventBuffer", "Flush: going to flush " + toFlush + " events");
}
}
// We are done moving the flushed events to the 'events' array.
// Now schedule the next flush if needed.
_urgentFlushScheduled = false;
if (scheduleNextFlush) {
if (nextFlushIsUrgent) {
long nextRegular = GetNextRegularFlushDueTimeInMs();
// If next regular flush is closer than next urgent flush,
// use regular flush instead.
if (nextRegular < waitTime) {
Debug.Trace("WebEventBuffer", "Switch to use regular timeout");
waitTime = nextRegular;
_regularTimeoutUsed = true;
}
else {
_regularTimeoutUsed = false;
}
}
else {
_regularTimeoutUsed = true;
}
SetTimer(waitTime);
_urgentFlushScheduled = nextFlushIsUrgent;
#if DBG
Debug.Trace("WebEventBuffer", "Flush: Registered for a flush. Waittime = " + waitTime + "ms" +
"; _nextFlush=" + PrintTime(_nextFlush) +
"; _urgentFlushScheduled=" + _urgentFlushScheduled);
#endif
}
// Cleanup. If we are called by a timer callback, but we haven't scheduled for the next
// one (can only happen if _regularFlushIntervalMs == Infinite), we should dispose the timer
if (reason == FlushCallReason.Timer && !scheduleNextFlush) {
Debug.Trace("WebEventBuffer", "Flush: Disposing the timer");
Debug.Assert(_regularFlushIntervalMs == Infinite, "We can dispose the timer only if _regularFlushIntervalMs == Infinite");
((IDisposable)_timer).Dispose();
_timer = null;
_urgentFlushScheduled = false;
}
// We want to increment the thread count within the lock to ensure we don't let too many threads in
if (events != null) {
Interlocked.Increment(ref _threadsInFlush);
}
} // Release lock
// Now call the providers to flush the events
if (events != null) {
Debug.Assert(lastFlushTime != DateTime.MaxValue, "lastFlushTime != DateTime.MaxValue");
Debug.Assert(discardedSinceLastFlush != -1, "discardedSinceLastFlush != -1");
Debug.Assert(eventsInBuffer != -1, "eventsInBuffer != -1");
Debug.Trace("WebEventBufferSummary", "_threadsInFlush=" + _threadsInFlush);
using (new ApplicationImpersonationContext()) {
try {
WebEventBufferFlushInfo flushInfo = new WebEventBufferFlushInfo(
new WebBaseEventCollection(events),
notificationType,
Interlocked.Increment(ref _notificationSequence),
lastFlushTime,
discardedSinceLastFlush,
eventsInBuffer);
_flushCallback(flushInfo);
}
catch (Exception e) {
try {
_provider.LogException(e);
}
catch {
// Ignore all errors
}
}
#pragma warning disable 1058
catch { // non compliant exceptions are caught and logged as Unknown
try {
_provider.LogException(new Exception(SR.GetString(SR.Provider_Error)));
}
catch {
// Ignore all errors
}
}
#pragma warning restore 1058
}
Interlocked.Decrement(ref _threadsInFlush);
}
}
internal void AddEvent(WebBaseEvent webEvent) {
lock(_buffer) {
#if DBG
_totalAdded++;
#endif
// If we have filled up the buffer, remove items using FIFO order.
if (_buffer.Count == _maxBufferSize) {
Debug.Trace("WebEventBuffer", "Buffer is full. Need to remove one from the tail");
_buffer.Dequeue();
_discardedSinceLastFlush++;
#if DBG
_totalAbandoned++;
#endif
}
_buffer.Enqueue(webEvent);
// If we have at least _urgentFlushThreshold, flush. Please note the flush is async.
if (_buffer.Count >= _urgentFlushThreshold) {
Flush(_maxFlushSize, FlushCallReason.UrgentFlushThresholdExceeded);
}
// Note that Flush uses _lastAdd, which is the time an event (not including this one)
// was last added. That's why we call it after calling Flush.
_lastAdd = DateTime.UtcNow;
} // Release the lock
}
internal void Shutdown() {
if (_timer != null) {
_timer.Dispose();
_timer = null;
}
}
string PrintTime(DateTime t) {
return t.ToString("T", DateTimeFormatInfo.InvariantInfo) + "." + t.Millisecond.ToString("d03", CultureInfo.InvariantCulture);
}
#if DBG
void DebugUpdateStats(bool flushThisTime, DateTime now, int toFlush, FlushCallReason reason) {
Debug.Assert(_totalAdded == _totalAbandoned + _totalFlushed + _buffer.Count,
"_totalAdded == _totalAbandoned + _totalFlushed + _buffer.Count");
_totalFlushed += toFlush;
if (reason != FlushCallReason.Timer) {
return;
}
Debug.Trace("WebEventBufferSummary",
"_Added=" + _totalAdded + "; deleted=" + _totalAbandoned +
"; Flushed=" + _totalFlushed + "; buffer=" + _buffer.Count +
"; toFlush=" + toFlush +
"; lastFlush=" + PrintTime(_lastRegularFlush) +
"; lastUrgentFlush=" + PrintTime(_lastUrgentFlush) +
"; GetRegFlushDueTime=" + GetNextRegularFlushDueTimeInMs() +
"; toFlush=" + toFlush +
"; now=" + PrintTime(now));
if (!_regularTimeoutUsed) {
if (!flushThisTime) {
return;
}
Debug.Assert((now - _lastUrgentFlush).TotalMilliseconds + 499 > _urgentFlushIntervalMs,
"(now - _lastUrgentFlush).TotalMilliseconds + 499 > _urgentFlushIntervalMs" +
"\n_lastUrgentFlush=" + PrintTime(_lastUrgentFlush) +
"\nnow=" + PrintTime(now) +
"\n_urgentFlushIntervalMs=" + _urgentFlushIntervalMs);
_lastUrgentFlush = now;
}
else {
/*
// It's a regular callback
if (_lastRegularFlush != DateTime.MinValue) {
Debug.Assert(Math.Abs((now - _lastRegularFlush).TotalMilliseconds - _regularFlushIntervalMs) < 2000,
"Math.Abs((now - _lastRegularFlush).TotalMilliseconds - _regularFlushIntervalMs) < 2000" +
"\n_lastRegularFlush=" + PrintTime(_lastRegularFlush) +
"\nnow=" + PrintTime(now) +
"\n_regularFlushIntervalMs=" + _regularFlushIntervalMs);
}
*/
_lastRegularFlush = now;
}
}
#endif
}
}
|