Sunday, May 6, 2018

FIFO Semaphore

In one of our project requirement we had to built the max concurrency requirement for which we decided to use Semaphore but we had requirement to build FIFO Semaphore as by default Semaphore does not guarantee FIFO access to waiting thread.

Therefore, I have use ConcurrentQueue together with Semaphore to come with FIFO Sempahore




using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace NRL.Shared.Locking
{
    public class ThreadPoolAsyncLock
    {
        private ConcurrentQueue<(SemaphoreSlim Semaphore, DateTime Time)> queue = new ConcurrentQueue<(SemaphoreSlim, DateTime)>();

        private (DateTime start, int requests) requestTime = (DateTime.UtcNow, 0);

        private SemaphoreSlim semaphoreSlims;

        public static TimeSpan Timeout = TimeSpan.FromMilliseconds(500);

        public static int MaxRequest = 30;

        public ThreadPoolAsyncLock(int maxThread = 30)
        {
            semaphoreSlims = new SemaphoreSlim(maxThread, maxThread);
        }

        public async Task<TaskRunner<TResult>> ScheduleTask<TResult>()
        {
            /* This lock will make sure no one enter in main semaphore unless some one from main semaphore releases this */
            await AcquireOrQueue().WaitAsync(Timeout).ConfigureAwait(false);

            /* Main semaphore wait */
            if (await semaphoreSlims.WaitAsync(Timeout).ConfigureAwait(false))
               {
                 new TaskRunner<TResult>(ReleaseLock) { IsLocked = true };
               }


            return new TaskRunner<TResult>(ReleaseLock);
        }

        private SemaphoreSlim AcquireOrQueue()
        {
            SemaphoreSlim slim = null;
            lock (semaphoreSlims)
            {
                if (semaphoreSlims.CurrentCount > 0 && (DateTime.UtcNow.Subtract(requestTime.start).Seconds > 1 || requestTime.requests < MaxRequest))
                {
                    slim = new SemaphoreSlim(1, 1);
                    Interlocked.Increment(ref requestTime.requests);
                }
                else
                {
                    slim = new SemaphoreSlim(0, 1);
                    queue.Enqueue((slim, DateTime.UtcNow));
                }
            }

            return slim;
        }

        /// <summary>
        /// Dequeue the thread waiting to enter main semaphore
        /// </summary>
        /// <remarks>
        /// It only allows the dequeue if the main semaphore is not full Or Max requests hasn't been reached within last second.
        /// </remarks>
        private void DeQueue()
        {
            lock (semaphoreSlims)
            {
                if (semaphoreSlims.CurrentCount == 0 || (requestTime.requests >= MaxRequest && DateTime.UtcNow.Subtract(requestTime.start).Seconds <= 1))
                    return;

                Interlocked.Decrement(ref requestTime.requests);
                if (queue.TryDequeue(out var semaphore))
                {
                    semaphore.Semaphore.Release();
                    requestTime.start = semaphore.Time;
                }
            }
        }

        private void ReleaseLock()
        {
            lock (semaphoreSlims)
            {
                semaphoreSlims.Release();
                DeQueue();
            }
        }

        /// <summary>
        /// The disposable releaser tasked with releasing the semaphore.
        /// </summary>
        public sealed class TaskRunner<TResult> : IDisposable
        {
            /// <summary>
            /// A value indicating whether this instance of the given entity has been disposed.
            /// </summary>
            /// <value><see langword="true"/> if this instance has been disposed; otherwise, <see langword="false"/>.</value>
            /// <remarks>
            /// If the entity is disposed, it must not be disposed a second
            /// time. The isDisposed field is set the first time the entity
            /// is disposed. If the isDisposed field is true, then the Dispose()
            /// method will not dispose again. This help not to prolong the entity's
            /// life in the Garbage Collector.
            /// </remarks>
            private bool isDisposed;

            
            public bool IsLocked { get; set; }

            /// <summary>
            /// A Task to run after acquiring and locking the thread.
            /// </summary>
            /// <remarks>
            /// If the entity acquires the lock, then entity shoule call <see cref="RunAsycn" /> or <see cref="Run"/> to run the task />
            /// </remarks>
            private Task<TResult> task;

            public delegate void TaskDisposeCallBack();

            /// <summary>
            /// Task dispose call back to release or dispose any resources requried for this runner.
            /// </summary>
            private TaskDisposeCallBack taskDisposeCallBack;

            public TaskRunner(TaskDisposeCallBack callBack)
            {
                taskDisposeCallBack = callBack;
            }

            /// <summary>
            /// Finalizes an instance of the <see cref="TaskRunner{TResult}"/> class.
            /// </summary>
            ~TaskRunner()
            {
                // Do not re-create Dispose clean-up code here.
                // Calling Dispose(false) is optimal in terms of
                // readability and maintainability.
                this.Dispose(false);
            }

            /// <summary>
            /// Disposes of the resources (other than memory) used by the module that implements <see cref="T:System.Web.IHttpModule"/>.
            /// </summary>
            public void Dispose()
            {
                this.Dispose(true);

                // This object will be cleaned up by the Dispose method.
                // Therefore, you should call GC.SuppressFinalize to
                // take this object off the finalization queue
                // and prevent finalization code for this object
                // from executing a second time.
                GC.SuppressFinalize(this);
            }

            /// <summary>
            /// Disposes the object and frees resources for the Garbage Collector.
            /// </summary>
            /// <param name="disposing">
            /// If true, the object gets disposed.
            /// </param>
            private void Dispose(bool disposing)
            {
                if (this.isDisposed)
                {
                    return;
                }

                if (disposing)
                {
                    taskDisposeCallBack?.Invoke();
                }

                // Call the appropriate methods to clean up
                // unmanaged resources here.
                // Note disposing is done.
                this.isDisposed = true;
            }
        }
    }
}


Usage


    using (var @lock = await new ThreadPoolAsyncLock().ScheduledTask().ConfigureAwait(false))
            {
                if (@lock.IsLocked)
                    return query.Track().Skip((page - 1) * count).Take(count).GetResult();

                throw new HttpResponseException((HttpStatusCode)429);
            }


No comments:

Post a Comment