Tuesday, May 8, 2018

Asynchronously Throttling EPiFind Requests to Stay Under the EPiFind Limit on Azure

ThrottledSemaphore


/// <summary>
    /// Throttled semaphore that limits how many requests may run concurrently per time period.
    /// </summary>
    /// <remarks>
    ///     <para>
    ///         Callers acquire a slot with <see cref="TryLock"/> inside a <c>using</c> statement. TryLock always
    ///         returns a disposable <see cref="ThrottledLock"/>; its <see cref="ThrottledLock.IsLocked"/> flag tells
    ///         the caller whether a slot was actually taken. Disposing a taken lock releases the slot.
    ///     </para>
    ///     <para>
    ///         <code>
    ///            private static ThrottledSemaphore throttledSemaphore = new ThrottledSemaphore(TimeSpan.FromSeconds(1));
    ///            using (var @lock = await throttledSemaphore.TryLock().ConfigureAwait(false))
    ///                   {
    ///                     if (@lock.IsLocked)
    ///                        return query.Track().Skip((page - 1) * count).Take(count).GetResult();
    ///                     throw new HttpResponseException((HttpStatusCode)429);
    ///                   }
    ///         </code>
    ///     </para>
    /// </remarks>
    public class ThrottledSemaphore
    {
        /// <summary>
        /// UTC timestamps recorded each time a lock is released (see <see cref="ReleaseLock"/>).
        /// </summary>
        private readonly ConcurrentQueue<DateTime> times;

        /// <summary>
        /// Bounds the number of locks held at once. Its capacity is fixed at construction time.
        /// </summary>
        private readonly SemaphoreSlim semaphore;

        /// <summary>
        /// Request budget used by the time-window check in <see cref="Wait"/>.
        /// </summary>
        /// <remarks>
        /// NOTE(review): assigning this field after construction does not resize the underlying semaphore,
        /// which keeps the capacity passed to the constructor. Only the time-window check sees the new value.
        /// </remarks>
        public int MaxConcurrency;

        /// <summary>
        /// Length of the throttling window.
        /// </summary>
        private readonly TimeSpan Period;

        /// <summary>
        /// Creates a throttle allowing <paramref name="maxConcurrency"/> concurrent locks per <paramref name="period"/>.
        /// </summary>
        /// <param name="period">Length of the throttling window.</param>
        /// <param name="maxConcurrency">
        /// Initial and maximum count of the underlying <see cref="SemaphoreSlim"/>; must be greater than zero,
        /// otherwise the semaphore constructor throws <see cref="ArgumentOutOfRangeException"/>.
        /// </param>
        public ThrottledSemaphore(TimeSpan period, int maxConcurrency = 20)
        {
            Period = period;
            MaxConcurrency = maxConcurrency;
            semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
            times = new ConcurrentQueue<DateTime>();
        }

        /// <summary>
        /// Tries to take a slot, waiting at most <paramref name="timeoutMilliseconds"/> for one to become free.
        /// </summary>
        /// <param name="timeoutMilliseconds">Maximum time to wait for a free semaphore slot.</param>
        /// <returns>
        /// A <see cref="ThrottledLock"/> whose <see cref="ThrottledLock.IsLocked"/> is <c>true</c> when a slot was
        /// taken (dispose it to release the slot), or <c>false</c> when the wait timed out.
        /// </returns>
        public async Task<ThrottledLock> TryLock(int timeoutMilliseconds = 500)
        {
            if (!await semaphore.WaitAsync(timeoutMilliseconds).ConfigureAwait(false))
            {
                // Timed out: hand back a lock that holds nothing and releases nothing.
                return new ThrottledLock(null);
            }

            try
            {
                await Wait().ConfigureAwait(false);
            }
            catch
            {
                // The caller never receives a lock to dispose in this case, so the slot
                // must be handed back here or it would be lost for good.
                semaphore.Release();
                throw;
            }

            return new ThrottledLock(ReleaseLock) { IsLocked = true };
        }

        /// <summary>
        /// Records the release time and returns the slot to the semaphore.
        /// </summary>
        private void ReleaseLock()
        {
            lock (semaphore)
            {
                // Enqueue before releasing so the timestamp is visible to whoever takes the slot next.
                times.Enqueue(DateTime.UtcNow);
                semaphore.Release();
            }
        }

        /// <summary>
        /// Delays the caller until one <see cref="Period"/> has passed since the oldest recorded release,
        /// when the current window's budget is used up.
        /// </summary>
        private async Task Wait()
        {
            var now = DateTime.UtcNow;
            var lastTime = DateTime.MinValue;

            // Once a full budget of releases has been recorded, consume the oldest one;
            // otherwise lastTime stays at MinValue and no delay is applied below.
            if (times.Count >= MaxConcurrency)
                times.TryDequeue(out lastTime);

            var until = lastTime.Add(Period);

            // Slots currently held (MaxConcurrency - CurrentCount) plus the remaining recorded releases.
            var used = MaxConcurrency - semaphore.CurrentCount + times.Count;
            if (used >= MaxConcurrency && until > now)
            {
                await Task.Delay(until - now).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// The disposable releaser tasked with releasing the semaphore.
        /// </summary>
        public sealed class ThrottledLock : IDisposable
        {
            /// <summary>
            /// Set the first time <see cref="Dispose"/> runs so the release callback is invoked at most once.
            /// </summary>
            private bool isDisposed;

            /// <summary>
            /// <c>true</c> when this instance represents a slot that was actually taken.
            /// </summary>
            public bool IsLocked { get; set; }

            /// <summary>
            /// Signature of the callback invoked when the lock is disposed.
            /// </summary>
            public delegate void TaskDisposeCallBack();

            /// <summary>
            /// Callback that releases the slot; <c>null</c> when no slot was taken.
            /// </summary>
            private readonly TaskDisposeCallBack taskDisposeCallBack;

            /// <summary>
            /// Creates a lock that invokes <paramref name="callBack"/> when disposed.
            /// </summary>
            /// <param name="callBack">Release callback, or <c>null</c> if there is nothing to release.</param>
            public ThrottledLock(TaskDisposeCallBack callBack)
            {
                taskDisposeCallBack = callBack;
            }

            /// <summary>
            /// Invokes the release callback, if any. Calls after the first one do nothing.
            /// </summary>
            /// <remarks>
            /// No finalizer is declared: this type holds no unmanaged resources, and a finalizer could not
            /// safely invoke the managed release callback anyway.
            /// </remarks>
            public void Dispose()
            {
                if (isDisposed)
                {
                    return;
                }

                isDisposed = true;
                taskDisposeCallBack?.Invoke();
            }
        }
    }



Content Search Service/Usage


/// <summary>
    /// Runs unified searches against Episerver Find, throttled so this server stays within its share
    /// of the allowed concurrent requests.
    /// </summary>
public static class ContentSearchService
    {
        private static readonly IClient client = SearchClient.Instance;

        private static readonly ThrottledSemaphore searchThrottle;

        static ContentSearchService()
        {
            // Throttle the user search requests to avoid exceeding our allowed requests/sec to Episerver Find
            searchThrottle = new ThrottledSemaphore(
                period: TimeSpan.FromSeconds(1),
                maxConcurrency: CalculateSearchConcurrency(ServerStateEvent.GetServerCount()));

            // Re-divide the budget whenever servers join or leave.
            // NOTE(review): this only updates the MaxConcurrency field; the semaphore inside
            // ThrottledSemaphore is sized once, in its constructor.
            ServerStateEvent.OnServerCountChanged += count => searchThrottle.MaxConcurrency = CalculateSearchConcurrency(count);
        }

        /// <summary>
        /// Builds and executes a unified search for <paramref name="keyword"/>.
        /// </summary>
        /// <param name="keyword">Search phrase.</param>
        /// <param name="types">Optional list of exact content types to restrict the search to.</param>
        /// <param name="sort">Ordering of the results; anything other than Earliest/Latest boosts title and section matches.</param>
        /// <param name="before">Optional upper bound on the publish date (rounded down to the minute).</param>
        /// <param name="page">1-based page number.</param>
        /// <param name="count">Page size.</param>
        /// <returns>The requested page of search results.</returns>
        /// <exception cref="HttpResponseException">
        /// Thrown with status 429 when no throttle slot could be acquired.
        /// </exception>
        public static async Task<UnifiedSearchResults> GetSearchResults(string keyword, IList<Type> types = null, SortBy sort = SortBy.Relevance, DateTime? before = null, int page = 1, int count = 24)
        {
            var query = client.UnifiedSearch().For(keyword) as ITypeSearch<ISearchContent>;

            // Filter for specific page types
            if (!types.IsNullOrEmpty())
                query = query.FilterByExactTypes(types);

            // Filter before a specific date time
            if (before.HasValue)
                query = query.Filter(t => t.SearchPublishDate.Before(before.Value.RoundToMinute(RoundingDirection.Floor)));

            switch (sort)
            {
                case SortBy.Earliest:
                    query = query.OrderBy(o => o.SearchPublishDate);
                    break;
                case SortBy.Latest:
                    query = query.OrderByDescending(o => o.SearchPublishDate);
                    break;
                default:
                    query = query
                        .BoostMatching(s => s.SearchTitle.Match(keyword), 2)
                        .BoostMatching(s => s.SearchSection.Match(keyword), 1.5);
                    break;
            }

            // Only the actual request to Find is executed under the throttle.
            using (var @lock = await searchThrottle.TryLock().ConfigureAwait(false))
            {
                if (@lock.IsLocked)
                    return query.Track().Skip((page - 1) * count).Take(count).GetResult();

                throw new HttpResponseException((HttpStatusCode)429);
            }
        }

        /// <summary>
        /// Computes this server's share of the configured maximum search concurrency.
        /// </summary>
        /// <param name="serverCount">Number of active servers, including the CMS admin server.</param>
        /// <returns>The per-server concurrency, never less than 1.</returns>
        private static int CalculateSearchConcurrency(int serverCount)
        {
            if (serverCount <= 1)
                return 1;

            // Divide the maximum concurrent user searches between the active servers (excluding the CMS admin server).
            // Integer division yields 0 once there are more servers than configured slots; a zero budget
            // cannot be used to construct the throttle's semaphore, so clamp to at least one slot.
            return Math.Max(1, Settings.Search.MaxConcurrency / (serverCount - 1));
        }
    }



Monitoring Azure Servers Turning On and Shutting Down


 /// <summary>
    /// Tracks the number of active servers and notifies subscribers when a server says hello or goodbye.
    /// </summary>
    public static class ServerStateEvent
    {
        // HACK: Guid copied from EPiServer.Events.Clients.Internal.ServerStateService.StateEventId and may be subject to change without notice
        private static readonly Guid EventId = new Guid("{51da5053-6af8-4a10-9bd4-8417e48f38bd}");

        private static readonly Guid RaiserId = Guid.NewGuid();

        private static Event serverEvent;

        private static ILogger logger = LogManager.GetLogger(typeof(ServerStateEvent));

        private static IServerStateService stateService = LicensingServices.Instance.GetService<IServerStateService>();

        // Cached result of the last ActiveServers() call; null until first requested or first state event.
        private static int? serverCount = null;

        /// <summary>
        /// Raised with the new count after a Hello or Bye state message has been processed.
        /// </summary>
        public static event Action<int> OnServerCountChanged;

        /// <summary>
        /// Looks up the server state event in the event registry and subscribes to it.
        /// </summary>
        public static void Register()
        {
            serverEvent = ServiceLocator.Current.GetInstance<IEventRegistry>().Get(EventId);
            serverEvent.Raised += OnRaised;
        }

        /// <summary>
        /// Returns the cached server count, querying the state service the first time it is needed.
        /// </summary>
        public static int GetServerCount()
        {
            if (!serverCount.HasValue)
            {
                serverCount = stateService.ActiveServers();
            }

            return serverCount.Value;
        }

        /// <summary>
        /// Handles a raised state event: refreshes the cached count and notifies subscribers.
        /// </summary>
        private static void OnRaised(object sender, EventNotificationEventArgs args)
        {
            // Ignore anything that is not a state message.
            if (!(args.Param is StateMessage message))
            {
                return;
            }

            // Only servers joining or leaving are of interest.
            if (message.Type != StateMessageType.Hello && message.Type != StateMessageType.Bye)
            {
                return;
            }

            // TODO: Remove when logging is removed
            var serverCountBefore = serverCount;

            // Refresh the cached count, then tell subscribers about it.
            serverCount = stateService.ActiveServers();
            OnServerCountChanged?.Invoke(serverCount.Value);

            // TODO: Remove logging when confident that event is firing when needed
            logger.Information($"A ServerStateEvent was raised. Application: {message.ApplicationName} | Type: {message.Type} | Servers Before: {serverCountBefore} | Servers After: {serverCount}");
        }
    }




 /// <summary>
    /// Initialization module that hooks <see cref="ServerStateEvent"/> up once the events system is available.
    /// </summary>
    [InitializableModule]
    [ModuleDependency(typeof(EventsInitialization))]
    public class ServerStateConfig : IInitializableModule
    {
        /// <summary>
        /// Subscribes to the server state event.
        /// </summary>
        public void Initialize(InitializationEngine context) => ServerStateEvent.Register();

        /// <summary>
        /// Intentionally empty: this module performs no teardown.
        /// </summary>
        public void Uninitialize(InitializationEngine context)
        {
        }
    }


No comments:

Post a Comment