File: System\ServiceModel\PeerResolvers\CustomPeerResolverService.cs
Project: ndp\cdf\src\WCF\ServiceModel\System.ServiceModel.csproj (System.ServiceModel)
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------
namespace System.ServiceModel.PeerResolvers
{
    using System;
    using System.Collections.Generic;
    using System.Runtime;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.Threading;
 
 
    [ObsoleteAttribute ("PeerChannel feature is obsolete and will be removed in the future.", false)]
    [ServiceBehavior(UseSynchronizationContext = false, InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class CustomPeerResolverService : IPeerResolverContract
    {
        internal enum RegistrationState
        {
            OK, Deleted
        }
 
        internal class RegistrationEntry
        {
            Guid clientId;
            Guid registrationId;
            string meshId;
            DateTime expires;
            PeerNodeAddress address;
            RegistrationState state;
 
 
            public RegistrationEntry(Guid clientId, Guid registrationId, string meshId, DateTime expires, PeerNodeAddress address)
            {
                this.ClientId = clientId;
                this.RegistrationId = registrationId;
                this.MeshId = meshId;
                this.Expires = expires;
                this.Address = address;
                this.State = RegistrationState.OK;
            }
 
            public Guid ClientId
            {
                get { return clientId; }
                set { clientId = value; }
            }
 
            public Guid RegistrationId
            {
                get { return registrationId; }
                set { registrationId = value; }
            }
 
            public string MeshId
            {
                get { return meshId; }
                set { meshId = value; }
            }
 
            public DateTime Expires
            {
                get { return expires; }
                set { expires = value; }
            }
 
            public PeerNodeAddress Address
            {
                get { return address; }
                set { address = value; }
            }
 
            public RegistrationState State
            {
                get { return state; }
                set { state = value; }
            }
 
        }
 
        internal class LiteLock
        {
            bool forWrite;
            bool upgraded;
            ReaderWriterLock locker;
            TimeSpan timeout = TimeSpan.FromMinutes(1);
            LockCookie lc;
 
            LiteLock(ReaderWriterLock locker, bool forWrite)
            {
                this.locker = locker;
                this.forWrite = forWrite;
            }
 
            public static void Acquire(out LiteLock liteLock, ReaderWriterLock locker)
            {
                Acquire(out liteLock, locker, false);
            }
 
            public static void Acquire(out LiteLock liteLock, ReaderWriterLock locker, bool forWrite)
            {
                LiteLock theLock = new LiteLock(locker, forWrite);
                try { }
                finally
                {
                    if (forWrite)
                    {
                        locker.AcquireWriterLock(theLock.timeout);
                    }
                    else
                    {
                        locker.AcquireReaderLock(theLock.timeout);
                    }
                    liteLock = theLock;
                }
            }
 
            public static void Release(LiteLock liteLock)
            {
                if (liteLock == null)
                {
                    return;
                }
 
                if (liteLock.forWrite)
                {
                    liteLock.locker.ReleaseWriterLock();
                }
                else
                {
                    Fx.Assert(!liteLock.upgraded, "Can't release while upgraded!");
                    liteLock.locker.ReleaseReaderLock();
                }
            }
            public void UpgradeToWriterLock()
            {
                Fx.Assert(!forWrite, "Invalid call to Upgrade!!");
                Fx.Assert(!upgraded, "Already upgraded!");
                try { }
                finally
                {
                    lc = locker.UpgradeToWriterLock(timeout);
                    upgraded = true;
                }
            }
            public void DowngradeFromWriterLock()
            {
                Fx.Assert(!forWrite, "Invalid call to Downgrade!!");
                if (upgraded)
                {
                    locker.DowngradeFromWriterLock(ref lc);
                    upgraded = false;
                }
            }
        }
 
        internal class MeshEntry
        {
            Dictionary<Guid, RegistrationEntry> entryTable;
            Dictionary<string, RegistrationEntry> service2EntryTable;
            List<RegistrationEntry> entryList;
            ReaderWriterLock gate;
 
            internal MeshEntry()
            {
                EntryTable = new Dictionary<Guid, RegistrationEntry>();
                Service2EntryTable = new Dictionary<string, RegistrationEntry>();
                EntryList = new List<RegistrationEntry>();
                Gate = new ReaderWriterLock();
            }
 
            public Dictionary<Guid, RegistrationEntry> EntryTable
            {
                get { return entryTable; }
                set { entryTable = value; }
            }
 
            public Dictionary<string, RegistrationEntry> Service2EntryTable
            {
                get { return service2EntryTable; }
                set { service2EntryTable = value; }
            }
 
            public List<RegistrationEntry> EntryList
            {
                get { return entryList; }
                set { entryList = value; }
            }
 
            public ReaderWriterLock Gate
            {
                get { return gate; }
                set { gate = value; }
            }
        }
 
        Dictionary<string, MeshEntry> meshId2Entry = new Dictionary<string, MeshEntry>();
        ReaderWriterLock gate;
 
        TimeSpan timeout = TimeSpan.FromMinutes(1);
        TimeSpan cleanupInterval = TimeSpan.FromMinutes(1);
        TimeSpan refreshInterval = TimeSpan.FromMinutes(10);
        bool controlShape;
        bool isCleaning;
        IOThreadTimer timer;
        object thisLock = new object();
        bool opened;
        TimeSpan LockWait = TimeSpan.FromSeconds(5);
 
        public CustomPeerResolverService()
        {
            isCleaning = false;
            gate = new ReaderWriterLock();
        }
 
        public TimeSpan CleanupInterval
        {
            get
            {
                return cleanupInterval;
            }
            set
            {
                if (value < TimeSpan.Zero)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
                        SR.GetString(SR.SFxTimeoutOutOfRange0)));
                }
 
                if (TimeoutHelper.IsTooLarge(value))
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
                        SR.GetString(SR.SFxTimeoutOutOfRangeTooBig)));
                }
 
                lock (ThisLock)
                {
                    ThrowIfOpened("Set CleanupInterval");
                    this.cleanupInterval = value;
                }
            }
        }
 
        public TimeSpan RefreshInterval
        {
            get
            {
                return refreshInterval;
            }
            set
            {
                if (value < TimeSpan.Zero)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
                        SR.GetString(SR.SFxTimeoutOutOfRange0)));
                }
 
                if (TimeoutHelper.IsTooLarge(value))
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
                        SR.GetString(SR.SFxTimeoutOutOfRangeTooBig)));
                }
 
                lock (ThisLock)
                {
                    ThrowIfOpened("Set RefreshInterval");
                    this.refreshInterval = value;
                }
            }
        }
 
        public bool ControlShape
        {
            get
            {
                return this.controlShape;
            }
            set
            {
                lock (ThisLock)
                {
                    ThrowIfOpened("Set ControlShape");
                    this.controlShape = value;
                }
            }
        }
 
        MeshEntry GetMeshEntry(string meshId) { return GetMeshEntry(meshId, true); }
        MeshEntry GetMeshEntry(string meshId, bool createIfNotExists)
        {
            MeshEntry meshEntry = null;
            LiteLock ll = null;
            try
            {
                LiteLock.Acquire(out ll, gate);
                if (!this.meshId2Entry.TryGetValue(meshId, out meshEntry) && createIfNotExists)
                {
                    meshEntry = new MeshEntry();
                    try
                    {
                        ll.UpgradeToWriterLock();
                        meshId2Entry.Add(meshId, meshEntry);
                    }
                    finally
                    {
                        ll.DowngradeFromWriterLock();
                    }
                }
            }
            finally
            {
                LiteLock.Release(ll);
            }
            Fx.Assert(meshEntry != null || !createIfNotExists, "GetMeshEntry failed to get an entry!");
            return meshEntry;
        }
 
        public virtual RegisterResponseInfo Register(Guid clientId, string meshId, PeerNodeAddress address)
        {
            Guid registrationId = Guid.NewGuid();
            DateTime expiry = DateTime.UtcNow + RefreshInterval;
 
            RegistrationEntry entry = null;
            MeshEntry meshEntry = null;
 
            lock (ThisLock)
            {
                entry = new RegistrationEntry(clientId, registrationId, meshId, expiry, address);
                meshEntry = GetMeshEntry(meshId);
                if (meshEntry.Service2EntryTable.ContainsKey(address.ServicePath))
                    PeerExceptionHelper.ThrowInvalidOperation_DuplicatePeerRegistration(address.ServicePath);
                LiteLock ll = null;
 
                try
                {
                    // meshEntry.gate can be held by this thread for write if this is coming from update
                    // else MUST not be held at all.
                    if (!meshEntry.Gate.IsWriterLockHeld)
                    {
                        LiteLock.Acquire(out ll, meshEntry.Gate, true);
                    }
                    meshEntry.EntryTable.Add(registrationId, entry);
                    meshEntry.EntryList.Add(entry);
                    meshEntry.Service2EntryTable.Add(address.ServicePath, entry);
                }
                finally
                {
                    if (ll != null)
                        LiteLock.Release(ll);
                }
            }
            return new RegisterResponseInfo(registrationId, RefreshInterval);
        }
 
        public virtual RegisterResponseInfo Register(RegisterInfo registerInfo)
        {
            if (registerInfo == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("registerInfo", SR.GetString(SR.PeerNullRegistrationInfo));
            }
 
            ThrowIfClosed("Register");
 
            if (!registerInfo.HasBody() || String.IsNullOrEmpty(registerInfo.MeshId))
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("registerInfo", SR.GetString(SR.PeerInvalidMessageBody, registerInfo));
            }
            return Register(registerInfo.ClientId, registerInfo.MeshId, registerInfo.NodeAddress);
        }
 
        public virtual RegisterResponseInfo Update(UpdateInfo updateInfo)
        {
            if (updateInfo == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("updateInfo", SR.GetString(SR.PeerNullRegistrationInfo));
            }
 
            ThrowIfClosed("Update");
 
            if (!updateInfo.HasBody() || String.IsNullOrEmpty(updateInfo.MeshId) || updateInfo.NodeAddress == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("updateInfo", SR.GetString(SR.PeerInvalidMessageBody, updateInfo));
            }
 
            Guid registrationId = updateInfo.RegistrationId;
            RegistrationEntry entry;
 
            MeshEntry meshEntry = GetMeshEntry(updateInfo.MeshId);
            LiteLock ll = null;
 
            //handle cases when Update ----s with Register.
            if (updateInfo.RegistrationId == Guid.Empty || meshEntry == null)
                return Register(updateInfo.ClientId, updateInfo.MeshId, updateInfo.NodeAddress);
            //
            // preserve locking order between ThisLock and the LiteLock.
            lock (ThisLock)
            {
                try
                {
                    LiteLock.Acquire(out ll, meshEntry.Gate);
                    if (!meshEntry.EntryTable.TryGetValue(updateInfo.RegistrationId, out entry))
                    {
                        try
                        {
                            // upgrade to writer lock
                            ll.UpgradeToWriterLock();
                            return Register(updateInfo.ClientId, updateInfo.MeshId, updateInfo.NodeAddress);
                        }
                        finally
                        {
                            ll.DowngradeFromWriterLock();
                        }
                    }
                    lock (entry)
                    {
                        entry.Address = updateInfo.NodeAddress;
                        entry.Expires = DateTime.UtcNow + this.RefreshInterval;
                    }
                }
                finally
                {
                    LiteLock.Release(ll);
                }
            }
            return new RegisterResponseInfo(registrationId, RefreshInterval);
        }
 
        public virtual ResolveResponseInfo Resolve(ResolveInfo resolveInfo)
        {
            if (resolveInfo == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("resolveInfo", SR.GetString(SR.PeerNullResolveInfo));
            }
 
            ThrowIfClosed("Resolve");
 
            if (!resolveInfo.HasBody())
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("resolveInfo", SR.GetString(SR.PeerInvalidMessageBody, resolveInfo));
            }
 
            int currentCount = 0;
            int index = 0;
            int maxEntries = resolveInfo.MaxAddresses;
            ResolveResponseInfo response = new ResolveResponseInfo();
            List<PeerNodeAddress> results = new List<PeerNodeAddress>();
            List<RegistrationEntry> entries = null;
            PeerNodeAddress address;
            RegistrationEntry entry;
            MeshEntry meshEntry = GetMeshEntry(resolveInfo.MeshId, false);
            if (meshEntry != null)
            {
                LiteLock ll = null;
                try
                {
                    LiteLock.Acquire(out ll, meshEntry.Gate);
                    entries = meshEntry.EntryList;
                    if (entries.Count <= maxEntries)
                    {
                        foreach (RegistrationEntry e in entries)
                        {
                            results.Add(e.Address);
                        }
                    }
                    else
                    {
                        Random random = new Random();
                        while (currentCount < maxEntries)
                        {
                            index = random.Next(entries.Count);
                            entry = entries[index];
                            Fx.Assert(entry.State == RegistrationState.OK, "A deleted registration is still around!");
                            address = entry.Address;
                            if (!results.Contains(address))
                                results.Add(address);
                            currentCount++;
                        }
                    }
                }
                finally
                {
                    LiteLock.Release(ll);
                }
            }
            response.Addresses = results.ToArray();
            return response;
        }
 
        public virtual void Unregister(UnregisterInfo unregisterInfo)
        {
            if (unregisterInfo == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("unregisterinfo", SR.GetString(SR.PeerNullRegistrationInfo));
            }
 
            ThrowIfClosed("Unregister");
 
            if (!unregisterInfo.HasBody() || String.IsNullOrEmpty(unregisterInfo.MeshId) || unregisterInfo.RegistrationId == Guid.Empty)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("unregisterInfo", SR.GetString(SR.PeerInvalidMessageBody, unregisterInfo));
            }
 
            RegistrationEntry registration = null;
            MeshEntry meshEntry = GetMeshEntry(unregisterInfo.MeshId, false);
            //there could be a ---- that two different threads could be working on the same entry
            //we wont optimize for that case.
            LiteLock ll = null;
            try
            {
                LiteLock.Acquire(out ll, meshEntry.Gate, true);
                if (!meshEntry.EntryTable.TryGetValue(unregisterInfo.RegistrationId, out registration))
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("unregisterInfo", SR.GetString(SR.PeerInvalidMessageBody, unregisterInfo));
                meshEntry.EntryTable.Remove(unregisterInfo.RegistrationId);
                meshEntry.EntryList.Remove(registration);
                meshEntry.Service2EntryTable.Remove(registration.Address.ServicePath);
                registration.State = RegistrationState.Deleted;
            }
            finally
            {
                LiteLock.Release(ll);
            }
        }
 
        public virtual RefreshResponseInfo Refresh(RefreshInfo refreshInfo)
        {
            if (refreshInfo == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("refreshInfo", SR.GetString(SR.PeerNullRefreshInfo));
            }
 
            ThrowIfClosed("Refresh");
 
            if (!refreshInfo.HasBody() || String.IsNullOrEmpty(refreshInfo.MeshId) || refreshInfo.RegistrationId == Guid.Empty)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("refreshInfo", SR.GetString(SR.PeerInvalidMessageBody, refreshInfo));
            }
            RefreshResult result = RefreshResult.RegistrationNotFound;
            RegistrationEntry entry = null;
            MeshEntry meshEntry = GetMeshEntry(refreshInfo.MeshId, false);
            LiteLock ll = null;
 
            if (meshEntry != null)
            {
                try
                {
                    LiteLock.Acquire(out ll, meshEntry.Gate);
                    if (!meshEntry.EntryTable.TryGetValue(refreshInfo.RegistrationId, out entry))
                        return new RefreshResponseInfo(RefreshInterval, result);
                    lock (entry)
                    {
                        if (entry.State == RegistrationState.OK)
                        {
                            entry.Expires = DateTime.UtcNow + RefreshInterval;
                            result = RefreshResult.Success;
                        }
                    }
                }
                finally
                {
                    LiteLock.Release(ll);
                }
            }
            return new RefreshResponseInfo(RefreshInterval, result);
        }
 
        public virtual ServiceSettingsResponseInfo GetServiceSettings()
        {
            ThrowIfClosed("GetServiceSettings");
            ServiceSettingsResponseInfo info = new ServiceSettingsResponseInfo(this.ControlShape);
            return info;
        }
 
        public virtual void Open()
        {
            ThrowIfOpened("Open");
            if (this.refreshInterval <= TimeSpan.Zero)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("RefreshInterval", SR.GetString(SR.RefreshIntervalMustBeGreaterThanZero, this.refreshInterval));
            if (this.CleanupInterval <= TimeSpan.Zero)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("CleanupInterval", SR.GetString(SR.CleanupIntervalMustBeGreaterThanZero, this.cleanupInterval));
 
            //check that we are good to open
            timer = new IOThreadTimer(new Action<object>(CleanupActivity), null, false);
            timer.Set(CleanupInterval);
            opened = true;
        }
 
        public virtual void Close()
        {
            ThrowIfClosed("Close");
            timer.Cancel();
            opened = false;
        }
 
        internal virtual void CleanupActivity(object state)
        {
            if (!opened)
                return;
 
            if (!isCleaning)
            {
                lock (ThisLock)
                {
                    if (!isCleaning)
                    {
                        isCleaning = true;
                        try
                        {
                            MeshEntry meshEntry = null;
                            //acquire a write lock.  from the reader/writer lock can we postpone until no contention?
                            ICollection<string> keys = null;
                            LiteLock ll = null;
                            try
                            {
                                LiteLock.Acquire(out ll, gate);
                                keys = meshId2Entry.Keys;
                            }
                            finally
                            {
                                LiteLock.Release(ll);
                            }
                            foreach (string meshId in keys)
                            {
                                meshEntry = GetMeshEntry(meshId);
                                CleanupMeshEntry(meshEntry);
                            }
                        }
                        finally
                        {
                            isCleaning = false;
                            if (opened)
                                timer.Set(this.CleanupInterval);
                        }
                    }
                }
            }
        }
 
        //always call this from a readlock
        void CleanupMeshEntry(MeshEntry meshEntry)
        {
            List<Guid> remove = new List<Guid>();
            if (!opened)
                return;
            LiteLock ll = null;
            try
            {
                LiteLock.Acquire(out ll, meshEntry.Gate, true);
                foreach (KeyValuePair<Guid, RegistrationEntry> item in meshEntry.EntryTable)
                {
                    if ((item.Value.Expires <= DateTime.UtcNow) || (item.Value.State == RegistrationState.Deleted))
                    {
                        remove.Add(item.Key);
                        meshEntry.EntryList.Remove(item.Value);
                        meshEntry.Service2EntryTable.Remove(item.Value.Address.ServicePath);
                    }
                }
                foreach (Guid id in remove)
                {
                    meshEntry.EntryTable.Remove(id);
                }
            }
            finally
            {
                LiteLock.Release(ll);
            }
        }
 
        object ThisLock
        {
            get
            {
                return this.thisLock;
            }
        }
        void ThrowIfOpened(string operation)
        {
            if (opened)
                PeerExceptionHelper.ThrowInvalidOperation_NotValidWhenOpen(operation);
        }
        void ThrowIfClosed(string operation)
        {
            if (!opened)
                PeerExceptionHelper.ThrowInvalidOperation_NotValidWhenClosed(operation);
 
        }
    }
}