Showing posts with label Async Request Throttling. Show all posts
Showing posts with label Async Request Throttling. Show all posts

Tuesday, May 8, 2018

Asynchronously Throttling EPiFind Requests to Stay Under the EPiFind Limit on Azure

ThrottledSemaphore


/// <summary>
    /// Throttled semaphore that limits how many requests may run concurrently per time period.
    /// </summary>
    /// <remarks>
    ///     <para>
    ///         Callers acquire a slot by awaiting <see cref="TryLock"/> inside a <c>using</c> statement.
    ///         <see cref="TryLock"/> always returns a disposable <see cref="ThrottledLock"/>; its
    ///         <see cref="ThrottledLock.IsLocked"/> property tells the caller whether a slot was actually taken.
    ///         Disposing a lock that was taken records the completion time and frees the slot.
    ///     </para>
    ///     <para>
    ///         <code>
    ///            private static ThrottledSemaphore throttledSemaphore = new ThrottledSemaphore(TimeSpan.FromSeconds(1));
    ///            using (var @lock = await throttledSemaphore.TryLock().ConfigureAwait(false))
    ///                   {
    ///                     if (@lock.IsLocked)
    ///                        return query.Track().Skip((page - 1) * count).Take(count).GetResult();
    ///                     throw new HttpResponseException((HttpStatusCode)429);
    ///                   }
    ///         </code>
    ///     </para>
    /// </remarks>
    public class ThrottledSemaphore
    {
        /// <summary>
        /// Completion timestamps (UTC) of released locks, oldest first.
        /// </summary>
        private readonly ConcurrentQueue<DateTime> times;

        /// <summary>
        /// Hands out the concurrent slots; sized once from the constructor argument.
        /// </summary>
        private readonly SemaphoreSlim semaphore;

        /// <summary>
        /// Serialises the "record timestamp + release slot" pair in <see cref="ReleaseLock"/>.
        /// </summary>
        private readonly object releaseGate = new object();

        /// <summary>
        /// Concurrency limit used by the throttling bookkeeping.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the underlying <see cref="SemaphoreSlim"/> is created once in the constructor,
        /// so assigning this field afterwards changes only the time-window bookkeeping, not the number
        /// of slots the semaphore hands out.
        /// </remarks>
        public int MaxConcurrency;

        /// <summary>
        /// Length of the throttling window measured from a recorded completion time.
        /// </summary>
        private readonly TimeSpan Period;

        /// <summary>
        /// Creates a throttle that allows <paramref name="maxConcurrency"/> slots per <paramref name="period"/>.
        /// </summary>
        /// <param name="period">Length of the throttling window.</param>
        /// <param name="maxConcurrency">Number of slots; passed straight to <see cref="SemaphoreSlim"/> as initial and maximum count.</param>
        public ThrottledSemaphore(TimeSpan period, int maxConcurrency = 20)
        {
            Period = period;
            MaxConcurrency = maxConcurrency;
            semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
            times = new ConcurrentQueue<DateTime>();
        }

        /// <summary>
        /// Tries to take a slot, waiting at most <paramref name="timeoutMilliseconds"/> for one to become free.
        /// </summary>
        /// <param name="timeoutMilliseconds">Maximum time to wait on the semaphore, in milliseconds.</param>
        /// <returns>
        /// A <see cref="ThrottledLock"/> whose <see cref="ThrottledLock.IsLocked"/> is <see langword="true"/>
        /// when a slot was taken (dispose it to free the slot), or <see langword="false"/> when the wait timed out.
        /// </returns>
        public async Task<ThrottledLock> TryLock(int timeoutMilliseconds = 500)
        {
            if (!await semaphore.WaitAsync(timeoutMilliseconds).ConfigureAwait(false))
            {
                return new ThrottledLock(null);
            }

            try
            {
                await Wait().ConfigureAwait(false);
            }
            catch
            {
                // The slot is already held at this point; hand it back before propagating,
                // otherwise a failure in the delay would shrink the capacity permanently.
                semaphore.Release();
                throw;
            }

            return new ThrottledLock(ReleaseLock) { IsLocked = true };
        }

        /// <summary>
        /// Dispose callback of a taken lock: records the completion time and frees the slot.
        /// </summary>
        private void ReleaseLock()
        {
            lock (releaseGate)
            {
                times.Enqueue(DateTime.UtcNow);
                semaphore.Release();
            }
        }

        /// <summary>
        /// Delays the caller, when necessary, until the throttling window of the oldest recorded completion has passed.
        /// </summary>
        private async Task Wait()
        {
            var now = DateTime.UtcNow;
            var lastTime = DateTime.MinValue;

            // Read the public field once so both checks below see the same limit.
            var limit = MaxConcurrency;

            // Once enough completions are recorded, the oldest one marks the start of the window.
            if (times.Count >= limit)
                times.TryDequeue(out lastTime);

            var until = lastTime.Add(Period);

            // Slots in use plus the remaining recorded completions have reached the limit
            // and the window has not elapsed yet: hold the caller back until it has.
            if (limit - semaphore.CurrentCount + times.Count >= limit && until > now)
            {
                await Task.Delay(until - now).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// The disposable releaser tasked with releasing the semaphore.
        /// </summary>
        public sealed class ThrottledLock : IDisposable
        {
            /// <summary>
            /// 0 until <see cref="Dispose"/> has run, 1 afterwards. Kept as an <see cref="int"/> so the
            /// transition can be made atomically and the callback runs at most once.
            /// </summary>
            private int isDisposed;

            /// <summary>
            /// Gets or sets a value indicating whether a slot was taken for this lock.
            /// </summary>
            public bool IsLocked { get; set; }

            /// <summary>
            /// Signature of the callback invoked when the lock is disposed.
            /// </summary>
            public delegate void TaskDisposeCallBack();

            /// <summary>
            /// Callback run on dispose to release whatever this lock holds; may be <see langword="null"/>.
            /// </summary>
            private readonly TaskDisposeCallBack taskDisposeCallBack;

            /// <summary>
            /// Initializes a new instance of the <see cref="ThrottledLock"/> class.
            /// </summary>
            /// <param name="callBack">Callback to invoke on dispose, or <see langword="null"/> when nothing is held.</param>
            public ThrottledLock(TaskDisposeCallBack callBack)
            {
                taskDisposeCallBack = callBack;
            }

            /// <summary>
            /// Invokes the dispose callback, if any. Only the first call has an effect.
            /// </summary>
            public void Dispose()
            {
                // Atomic test-and-set: concurrent or repeated calls must not run the callback twice.
                if (Interlocked.Exchange(ref isDisposed, 1) != 0)
                {
                    return;
                }

                // No finalizer is needed: the type is sealed and holds only managed state.
                taskDisposeCallBack?.Invoke();
            }
        }
    }



Content Search Service/Usage


public static class ContentSearchService
    {
        private static readonly IClient client = SearchClient.Instance;

        private static readonly ThrottledSemaphore searchThrottle;

        /// <summary>
        /// Creates the shared search throttle and keeps its concurrency limit in step with the server count.
        /// </summary>
        static ContentSearchService()
        {
            // Throttle the user search requests to avoid exceeding our allowed requests/sec to Episerver Find
            searchThrottle = new ThrottledSemaphore(
                period: TimeSpan.FromSeconds(1),
                maxConcurrency: CalculateSearchConcurrency(ServerStateEvent.GetServerCount()));
            ServerStateEvent.OnServerCountChanged += count => searchThrottle.MaxConcurrency = CalculateSearchConcurrency(count);
        }

        /// <summary>
        /// Runs a throttled unified search for <paramref name="keyword"/> and returns one page of results.
        /// </summary>
        /// <param name="keyword">Search phrase; also used to boost title and section matches when sorting by relevance.</param>
        /// <param name="types">Optional exact content types to restrict the search to; ignored when null or empty.</param>
        /// <param name="sort">Result ordering; any value other than Earliest/Latest uses relevance boosting.</param>
        /// <param name="before">Optional upper bound on the publish date, rounded down to the minute.</param>
        /// <param name="page">1-based page number.</param>
        /// <param name="count">Number of results per page.</param>
        /// <returns>The requested page of search results.</returns>
        /// <exception cref="HttpResponseException">Status 429 when no throttle slot became available in time.</exception>
        public static async Task<UnifiedSearchResults> GetSearchResults(string keyword, IList<Type> types = null, SortBy sort = SortBy.Relevance, DateTime? before = null, int page = 1, int count = 24)
        {
            var query = client.UnifiedSearch().For(keyword) as ITypeSearch<ISearchContent>;

            // Filter for specific page types
            if (!types.IsNullOrEmpty())
                query = query.FilterByExactTypes(types);

            // Filter before a specific date time
            if (before.HasValue)
                query = query.Filter(t => t.SearchPublishDate.Before(before.Value.RoundToMinute(RoundingDirection.Floor)));

            switch (sort)
            {
                case SortBy.Earliest:
                    query = query.OrderBy(o => o.SearchPublishDate);
                    break;
                case SortBy.Latest:
                    query = query.OrderByDescending(o => o.SearchPublishDate);
                    break;
                default:
                    query = query
                        .BoostMatching(s => s.SearchTitle.Match(keyword), 2)
                        .BoostMatching(s => s.SearchSection.Match(keyword), 1.5);
                    break;
            }

            // The query is only executed while a throttle slot is held; otherwise the caller is told to back off.
            using (var @lock = await searchThrottle.TryLock().ConfigureAwait(false))
            {
                if (@lock.IsLocked)
                    return query.Track().Skip((page - 1) * count).Take(count).GetResult();

                throw new HttpResponseException((HttpStatusCode)429);
            }
        }

        /// <summary>
        /// Works out this server's share of the configured maximum concurrent searches.
        /// </summary>
        /// <param name="serverCount">Number of active servers, including the CMS admin server.</param>
        /// <returns>The per-server concurrency limit; never less than 1.</returns>
        private static int CalculateSearchConcurrency(int serverCount)
        {
            if (serverCount <= 1)
                return 1;

            // Divide the maximum concurrent user searches between the active servers (excluding the CMS admin server).
            // Integer division yields 0 when there are more servers than allowed searches; a limit of 0
            // would make the SemaphoreSlim constructor throw, so never go below one slot.
            return Math.Max(1, Settings.Search.MaxConcurrency / (serverCount - 1));
        }
    }



Monitoring Azure Servers Starting Up and Shutting Down


 /// <summary>
    /// Tracks the number of active server instances and notifies subscribers when it changes.
    /// </summary>
    public static class ServerStateEvent
    {
        // HACK: Guid copied from EPiServer.Events.Clients.Internal.ServerStateService.StateEventId and may be subject to change without notice
        private static readonly Guid EventId = new Guid("{51da5053-6af8-4a10-9bd4-8417e48f38bd}");

        private static readonly Guid RaiserId = Guid.NewGuid();

        private static Event serverEvent;

        private static readonly ILogger logger = LogManager.GetLogger(typeof(ServerStateEvent));

        private static readonly IServerStateService stateService = LicensingServices.Instance.GetService<IServerStateService>();

        // Cached result of stateService.ActiveServers(); null until first requested or first state event.
        private static int? serverCount = null;

        /// <summary>
        /// Event occurs when there has been a change to the number of server instances running in the application
        /// </summary>
        public static event Action<int> OnServerCountChanged;

        /// <summary>
        /// Subscribes to the server state event. Safe to call more than once.
        /// </summary>
        public static void Register()
        {
            var registry = ServiceLocator.Current.GetInstance<IEventRegistry>();
            serverEvent = registry.Get(EventId);

            // Remove any earlier subscription first so repeated registration does not make
            // every state message fire the handler (and the change event) multiple times.
            serverEvent.Raised -= OnRaised;
            serverEvent.Raised += OnRaised;
        }

        /// <summary>
        /// Gets the number of active servers, querying the state service on first use and caching the result.
        /// </summary>
        /// <returns>The cached active server count.</returns>
        public static int GetServerCount()
        {
            if (serverCount == null)
                serverCount = stateService.ActiveServers();

            return serverCount.Value;
        }

        /// <summary>
        /// Handles a raised state event: on a Hello or Bye message the server count is refreshed,
        /// subscribers are notified and the change is logged. Other messages are ignored.
        /// </summary>
        private static void OnRaised(object sender, EventNotificationEventArgs args)
        {
            if (args.Param is StateMessage message && (message.Type == StateMessageType.Hello || message.Type == StateMessageType.Bye))
            {
                // TODO: Remove when logging is removed
                var serverCountBefore = serverCount;

                // Calculate the new server count and fire the change event
                serverCount = stateService.ActiveServers();
                OnServerCountChanged?.Invoke(serverCount.Value);

                // TODO: Remove logging when confident that event is firing when needed
                logger.Information($"A ServerStateEvent was raised. Application: {message.ApplicationName} | Type: {message.Type} | Servers Before: {serverCountBefore} | Servers After: {serverCount}");
            }
        }
    }




 /// <summary>
    /// Initialization module that hooks <see cref="ServerStateEvent"/> up once the events system is available.
    /// </summary>
    [InitializableModule]
    [ModuleDependency(typeof(EventsInitialization))]
    public class ServerStateConfig : IInitializableModule
    {
        /// <summary>
        /// Registers the server state event handler.
        /// </summary>
        /// <param name="context">The initialization engine; not used.</param>
        public void Initialize(InitializationEngine context) => ServerStateEvent.Register();

        /// <summary>
        /// Nothing to tear down.
        /// </summary>
        /// <param name="context">The initialization engine; not used.</param>
        public void Uninitialize(InitializationEngine context)
        {
        }
    }