Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify the issue of premature thread exit caused by Monitor.Wait(). #95

Merged
merged 11 commits into from
May 22, 2023

Conversation

lausannel
Copy link
Collaborator

@lausannel lausannel commented May 22, 2023

#93 Try to fix this issue, this occurs because multiple threads are waiting simultaneously, but only one of them will be awakened in the end, others will continue waiting, making the timeout period not the same as the time set by the user.

@lausannel
Copy link
Collaborator Author

Fixed, compare the following 2 pieces of code

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;

class Program
{
    static readonly object _locker = new object();

    public class Client{
        public int i;
    }

    public static ConcurrentClientQueue clientQueue = new ConcurrentClientQueue();

     public class ConcurrentClientQueue
    {
        public ConcurrentQueue<Client> ClientQueue { get; }

        public ConcurrentClientQueue(List<Client> clients)
        {
            ClientQueue = new ConcurrentQueue<Client>(clients);
        }
        public ConcurrentClientQueue()
        {
            ClientQueue = new ConcurrentQueue<Client>();
        }
        public void Add(Client client) => Return(client);

        public void Return(Client client)
        {
            Monitor.Enter(ClientQueue);
            ClientQueue.Enqueue(client);
            Console.WriteLine("Return client {0} Time {1}", client.i, DateTime.Now.ToString("HH:mm:ss.fff.ffff"));
            Monitor.Pulse(ClientQueue);
            Monitor.Exit(ClientQueue);
            Thread.Sleep(0);
        }
        int _ref = 0;
        public void AddRef()
        {
            lock (this)
            {
                _ref++;
            }
        }
        public int GetRef()
        {
            return _ref;
        }
        public void RemoveRef()
        {
            lock (this)
            {
                _ref--;
            }
        }
        public int Timeout { get; set; } = 10;
        public Client Take()
        {
            Client client = null;
            Monitor.Enter(ClientQueue);
            if (ClientQueue.IsEmpty)
            {
                Console.WriteLine("Queue empty, Thread {0} wait, time {1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.fff.ffff"));
                Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
            }
            Console.WriteLine("Thread {0} wake up, time {1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.fff.ffff"));
            if (!ClientQueue.TryDequeue(out client))
            {
                Console.WriteLine("Thread {0} get client failed, time {1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.fff.ffff"));
            }
            else
            {
                Console.WriteLine("Thread {0} get client {1} success, time {2}", Thread.CurrentThread.ManagedThreadId, client.i, DateTime.Now.ToString("HH:mm:ss.fff.ffff"));
            }
            Monitor.Exit(ClientQueue);
            if (client == null)
            {
                throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!");
            }
            return client;

    static void Main()
    {
        for(int i = 0; i < 1; i++){
            clientQueue.Add(new Client(){i = i});
        }

        Thread t1 = new Thread(() => ThreadWork(1));
        Thread t2 = new Thread(() => ThreadWork(2));
        Thread t3 = new Thread(() => ThreadWork(3));
        t1.Start();
        t2.Start();
        t3.Start();
        t1.Join();
        t2.Join();
        t3.Join();
    }

    static void ThreadWork(int threadNumber)
    {
        var client = clientQueue.Take();
        Console.WriteLine("Thread {0} took client {1} Time {2}", threadNumber, client.i, DateTime.Now.ToString("HH:mm:ss.fff.ffff"));
        long useless = 0;
        for(long i = 0; i < 4000000000;i++){
            useless++;
        }
        clientQueue.Add(client);
    }
}

The following error occurs during the execution of the code because both Thread 2 and Thread 3 are waiting for Thread 1 to complete its task. However, Monitor.Pulse will only wake up one of the threads, allowing either Thread 2 or Thread 3 to acquire the resource and continue execution while the other thread continues to wait. As a result, when the total waiting time reaches 10 seconds, the waiting thread exits prematurely.

Return client 0 Time 17:12:52.415.4157
Thread 4 wake up, time 17:12:52.465.4650
Thread 4 get client 0 success, time 17:12:52.465.4651
Thread 1 took client 0 Time 17:12:52.465.4652
Queue empty, Thread 5 wait, time 17:12:52.465.4652
Queue empty, Thread 6 wait, time 17:12:52.465.4659
Return client 0 Time 17:13:00.631.6310
Thread 5 wake up, time 17:13:00.631.6313
Thread 5 get client 0 success, time 17:13:00.631.6314
Thread 2 took client 0 Time 17:13:00.631.6314
Thread 6 wake up, time 17:13:02.466.4662
Thread 6 get client failed, time 17:13:02.466.4663
Unhandled exception. System.TimeoutException: Connection pool is empty and wait time out(10s)!
   at ...

In the second piece of code, we replace the Taken function with the following definition and replace Pulse with PulseAll

public Client Take()
{
    Client client = null;
    Monitor.Enter(ClientQueue);
    while(true){
        bool timeout = false;
        if (ClientQueue.IsEmpty)
        {
            Console.WriteLine("Queue empty, Thread {0} wait, time {1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.fff.ffff"));
            timeout = !Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
        }
        Console.WriteLine("Thread {0} wake up, time {1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.fff.ffff"));
        if (!ClientQueue.TryDequeue(out client))
        {
            Console.WriteLine("Thread {0} get client failed, time {1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.fff.ffff"));
        }
        else
        {
            Console.WriteLine("Thread {0} get client {1} success, time {2}", Thread.CurrentThread.ManagedThreadId, client.i, DateTime.Now.ToString("HH:mm:ss.fff.ffff"));
        }
        
        if(client != null || timeout){
            break;
        }
    }
    Monitor.Exit(ClientQueue);
    if (client == null)
    {
        throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!");
    }
    return client;
}

This time, when the client returns, using Monitor.PulseAll will wake up all threads. When two threads compete for the client through ConcurrentClientQueue, the winner will exit while the loser refreshes its waiting time and continues to wait. Therefore, the error situation described earlier will not occur.

Return client 0 Time 17:14:26.078.0782
Thread 4 wake up, time 17:14:26.126.1260
Thread 4 get client 0 success, time 17:14:26.126.1261
Thread 1 took client 0 Time 17:14:26.126.1261
Queue empty, Thread 5 wait, time 17:14:26.126.1262
Queue empty, Thread 6 wait, time 17:14:26.126.1267
Return client 0 Time 17:14:34.248.2489
Thread 6 wake up, time 17:14:34.249.2491
Thread 6 get client 0 success, time 17:14:34.249.2492
Thread 3 took client 0 Time 17:14:34.249.2492
Thread 5 wake up, time 17:14:34.249.2492
Thread 5 get client failed, time 17:14:34.249.2497
Queue empty, Thread 5 wait, time 17:14:34.249.2497
Return client 0 Time 17:14:42.390.3900
Thread 5 wake up, time 17:14:42.390.3902
Thread 5 get client 0 success, time 17:14:42.390.3902
Thread 2 took client 0 Time 17:14:42.390.3905
Return client 0 Time 17:14:50.435.4352

@lausannel lausannel requested a review from Aiemu May 22, 2023 10:19
Copy link
Collaborator

@Aiemu Aiemu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@lausannel lausannel merged commit d325338 into main May 22, 2023
2 checks passed
@lausannel lausannel deleted the fix/concurrency branch May 22, 2023 14:16
@MoeGuoH
Copy link

MoeGuoH commented May 23, 2023

👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants