@billchen2k
Created August 30, 2024 08:57

Intro

Conditional variables (std::condition_variable, officially called "condition variables") are a synchronization primitive in C++ that allows threads to wait until a certain condition is met [1]. This is useful when you want to coordinate the execution of multiple threads. To use a conditional variable, you will need:

  • A conditional variable std::condition_variable cv
  • A mutex std::mutex mtx and a lock std::unique_lock<std::mutex> lck(mtx)
  • Appropriate wait conditions to check for
  • Appropriate notification calls from other threads

Here is a simplified structure of how to use a conditional variable:

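std::mutex mtx;
std::condition_variable cv;
bool condition = false;  // Shared state, protected by mtx

// Thread 1
{
    // Acquire the lock
    std::unique_lock<std::mutex> lck(mtx);
    // Returns immediately if condition is already true. Otherwise releases
    // the lock and suspends until notified and condition is true.
    cv.wait(lck, [] {
        return condition;
    });
}

// Thread 2
{
    // Acquire the lock
    std::unique_lock<std::mutex> lck(mtx);
    condition = true;
    cv.notify_one();
}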

Here, Thread 1 is suspended by the .wait call until condition == true (if the condition is already true, wait returns immediately). However, cv won't automatically wake up the thread as soon as the condition is met. Instead, the waiting thread re-checks the condition every time it receives a notification from another thread. This is where .notify_one() and .notify_all() come in. Other threads call either:

  • cv.notify_one() (to wake up one of the threads suspended by the cv; which one is unspecified)
  • cv.notify_all() (to wake up all threads suspended by the cv)

Each woken thread then re-acquires the lock, checks the condition, and continues only if the condition is met; otherwise it is suspended again.
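The structure above can be turned into a small self-contained sketch. The helper name wait_for_value and the published value 42 are made up for illustration; the caller plays the role of Thread 1 and a temporary std::thread plays Thread 2.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper for illustration: returns the value the waiter observed.
int wait_for_value() {
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = false;  // the "condition", protected by mtx
    int value = 0;       // data published together with the condition

    // Notifying thread: change the condition under the lock, then notify.
    std::thread producer([&] {
        {
            std::lock_guard<std::mutex> lck(mtx);
            value = 42;
            ready = true;
        }
        cv.notify_one();
    });

    int observed = 0;
    {
        // Waiting thread (here: the caller).
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [&] { return ready; });  // returns with the lock held
        observed = value;
    }

    producer.join();
    return observed;
}
```

Calling wait_for_value() should return 42 whichever thread happens to run first, because the predicate is evaluated under the same mutex that guards the write.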

Example

Here we use an example to illustrate the actual use of conditional variables. It is abstracted from a real-world scenario I have been working on recently, where we need to collect performance data for each running thread. Say we have multiple worker threads that have two stages: preparation and payload. A common requirement is to wait for all threads to finish their preparation, do something else in the main thread, and then start the actual payload for all the worker threads.

In this example, we have a Worker class that creates multiple worker threads, where each thread:

  • Sets its thread name using pthread_setname_np (Preparation Stage)
  • Sleeps for 1 second (Payload Stage)

We also have a Reporter class that reports the thread id and thread names of all worker threads, which requires all threads to finish their preparation stage before reporting. After reporting, we want to start the payload stage for all worker threads.

To achieve this, we use a conditional variable cv_ready to wait for all threads to finish their preparation stage. Its condition is num_ready == num_workers, where num_ready is the number of threads that have finished their preparation stage, and num_workers is the total number of worker threads. Each worker thread calls cv_ready.notify_one() so that the main thread re-checks the condition, and the wait returns once it holds. After preparation, the worker threads are suspended by cv, and this is when we initialize the Reporter class and call report() to report the thread information. After reporting, we call cv.notify_all() to start the payload stage for all worker threads.

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <pthread.h>
#include <string>
#include <thread>
#include <vector>

/**
 * Report information of all worker threads, only works for macOS.
 */
#include <mach/mach.h>
class Reporter {
public:
    Reporter() {
        task_threads(mach_task_self(), &threads, &thread_count);
        std::cout << "[Reporter] Reporter loaded. Found " << thread_count << " threads." << std::endl;
    }

    void report() {
        std::cout << "[Reporter] Reporting all threads:" << std::endl;
        for (mach_msg_type_number_t i = 0; i < thread_count; i++) {
            char thread_name[64] = "";  // Stays empty if the name lookup fails
            uint64_t tid = 0;
            pthread_t pthread = pthread_from_mach_thread_np(threads[i]);
            pthread_getname_np(pthread, thread_name, sizeof(thread_name));
            pthread_threadid_np(pthread, &tid);
            std::cout << "[Reporter]   - thread " << i << " name: " 
                      << thread_name << " tid: " << tid << std::endl;
        }
    }

private:
    thread_act_array_t threads;
    mach_msg_type_number_t thread_count;
};

class Worker {
public:
    Worker(int16_t n) : num_workers(n), num_ready(0), start(false) { }

    void worker_thread(std::string thread_name, uint16_t thread_id) {

        // Thread preparation
        {
            std::lock_guard<std::mutex> lck(mtx);
            pthread_setname_np(thread_name.c_str());
            std::cout << "[Worker] Thread " << thread_name << " ready to run." << std::endl;
            num_ready++;
            // Wake the main thread so it re-checks num_ready == num_workers
            cv_ready.notify_one();
        }

        // Suspend this thread until run() sets the start flag.
        // Waiting on num_ready == num_workers here would let the last worker
        // to become ready skip the wait and start its payload before run().
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [this] {
            return start;
        });

        // Allow parallel execution of worker threads
        lck.unlock();

        // Thread payload
        std::this_thread::sleep_for(std::chrono::seconds(1));

        {
            // Serialize output so lines from different threads do not interleave
            std::lock_guard<std::mutex> print_lck(mtx);
            std::cout << "[Worker] Thread " << thread_name << " finished." << std::endl;
        }
    }

    void create_workers() {
        std::string main_thread_name = "main";
        pthread_setname_np(main_thread_name.c_str());
        for (int i = 0; i < num_workers; i++) {
            std::string worker_name = "worker-" + std::to_string(i);
            m_threads.emplace_back(&Worker::worker_thread, this, worker_name, i);
        }

        // Wait for all threads to be loaded.
        // num_ready is written under mtx, so the wait must use mtx as well.
        std::unique_lock<std::mutex> lck(mtx);
        cv_ready.wait(lck, [this] {
            return num_ready == num_workers;
        });
    }

    void report() {
        Reporter reporter;
        reporter.report();
    }

    void run() {
        // Set the start flag under the lock, then notify all threads to start
        {
            std::lock_guard<std::mutex> lck(mtx);
            start = true;
        }
        cv.notify_all();
        for (auto &t: m_threads) {
            t.join();
        }
    }

private:
    int16_t num_workers;
    int16_t num_ready;
    bool start;
    std::vector<std::thread> m_threads;

    // One mutex protects num_ready and start; both cvs wait on it
    std::mutex mtx;
    std::condition_variable cv;        // Workers wait here for the start flag
    std::condition_variable cv_ready;  // Main thread waits here for all workers
};

int main() {
    // Create threads
    Worker worker(5);
    worker.create_workers();

    // Do something before actual threads execution, after all threads are created and configured.
    worker.report();

    // Start all worker threads
    worker.run();
    return 0;
}

We can get the following output:

[Worker] Thread worker-0 ready to run.
[Worker] Thread worker-3 ready to run.
[Worker] Thread worker-1 ready to run.
[Worker] Thread worker-2 ready to run.
[Worker] Thread worker-4 ready to run.
[Reporter] Reporter loaded. Found 6 threads.
[Reporter]   - thread 0 name: main tid: 4454155
[Reporter]   - thread 1 name: worker-0 tid: 4454239
[Reporter]   - thread 2 name: worker-1 tid: 4454240
[Reporter]   - thread 3 name: worker-2 tid: 4454241
[Reporter]   - thread 4 name: worker-3 tid: 4454242
[Reporter]   - thread 5 name: worker-4 tid: 4454243
[Worker] Thread worker-4 finished.
[Worker] Thread worker-3 finished.
[Worker] Thread worker-0 finished.
[Worker] Thread worker-2 finished.
[Worker] Thread worker-1 finished.

Here, the first 5 lines are the worker threads' preparation stage, and the next 7 lines are the thread information reported by the Reporter class. After reporting, the worker threads start their payload stage simultaneously and finish in random order.

If you are using Linux

Note that the code uses macOS system calls in the Reporter. If you are using a Linux system, you may use the following Reporter class instead:

/**
 * Reporter for Linux.
 */
#include <cctype>
#include <dirent.h>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
class Reporter {
public:
    Reporter() {
        DIR *dir = opendir("/proc/self/task");
        if (dir) {
            struct dirent *entry;
            while ((entry = readdir(dir)) != nullptr) {
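                // Cast to unsigned char: isdigit is undefined for negative char values
                if (entry->d_type == DT_DIR && isdigit(static_cast<unsigned char>(entry->d_name[0]))) {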
                    thread_ids.push_back(entry->d_name);
                }
            }
            closedir(dir);
        }
        thread_count = thread_ids.size();
        std::cout << "[Reporter] Reporter loaded. Found " << thread_count << " threads." << std::endl;
    }

    void report() {
        std::cout << "[Reporter] Reporting all threads:" << std::endl;
        for (int i = 0; i < thread_count; i++) {
            std::string tid = thread_ids[i];
            std::string thread_name = get_thread_name(tid);
            std::cout << "[Reporter]   - thread " << i << " name: " << thread_name
                      << " tid: " << tid << std::endl;
        }
    }

private:
    std::vector<std::string> thread_ids;
    int thread_count;

    std::string get_thread_name(const std::string &tid) {
        std::ifstream comm_file("/proc/self/task/" + tid + "/comm");
        std::string name;
        if (comm_file.is_open()) {
            std::getline(comm_file, name);
        }
        return name;
    }
};

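You also need to change the pthread_setname_np calls in the Worker class to the two-argument Linux form, for example pthread_setname_np(pthread_self(), thread_name.c_str()). Note that Linux limits thread names to 16 bytes including the terminating null byte.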
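If you want the same Worker code to build on both systems, one option is a small wrapper that hides the difference. This is only a sketch: set_current_thread_name and current_thread_name are hypothetical helper names, not part of the original example, and it assumes glibc on Linux.

```cpp
#include <pthread.h>
#include <string>

// Hypothetical wrapper: names the calling thread.
// Returns 0 on success, an error number otherwise.
int set_current_thread_name(const std::string &name) {
#if defined(__APPLE__)
    // macOS: one argument, always applies to the calling thread.
    return pthread_setname_np(name.c_str());
#else
    // Linux (glibc): the target thread is passed explicitly.
    // Names longer than 15 characters are rejected with ERANGE.
    return pthread_setname_np(pthread_self(), name.c_str());
#endif
}

// Hypothetical helper: reads back the calling thread's name,
// or returns an empty string on failure.
std::string current_thread_name() {
    char buf[64] = {0};
    if (pthread_getname_np(pthread_self(), buf, sizeof(buf)) != 0) {
        return "";
    }
    return buf;
}
```

With this in place, the worker threads could call set_current_thread_name(thread_name) instead of calling pthread_setname_np directly.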

Dive Deeper

Why do we need unique_lock and mutex?

As stated in the cpp reference:

Any thread that intends to wait on a std::condition_variable must acquire a std::unique_lock<std::mutex> on the mutex used to protect the shared variable.

But why? Intuitively, if cv is designed to suspend a thread until a condition is met, a lock does not seem necessary. If we want to ensure thread safety when evaluating conditions, why couldn't we just use atomic variables in the condition predicate?

Let's look into the details of the process:

{
    // Lock acquired
    std::unique_lock<std::mutex> lck(mtx);

    // Check the condition while holding the lock. If it is not met,
    //   release the lock, suspend the thread, and wait for notifications.
    cv.wait(lck, [] {
        return ready;
    });
    // When another thread calls notify:
    //   the waiting thread re-acquires the mutex and checks the condition again.
    //     If met, keep holding the lock and continue.
    //     If not met, **release** the lock, and suspend the thread until notified.

    // Equivalent to:
    while (!ready) {
        // Suspend the thread and release the lock
        cv.wait(lck);
        // When notified, re-acquire the lock
        //    so that the condition checking is atomic.
    }
}

The reasons for using a lock revolve around waiting and condition checking. First, without the lock, the condition could change between the time it is checked and the time the thread is suspended. The lock makes checking and suspending a single atomic step. The second reason is to avoid missed notifications, since a notification might be sent just before the thread starts waiting. Therefore, the notifying thread should also acquire the same mutex when it changes the condition; it may call notify either before or after releasing it.

In other words, the mutex protects the shared condition and the step of going to sleep on the conditional variable. It is released as soon as the thread is suspended, so that other threads can change the condition and notify the conditional variable [2].
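As a rough illustration of this rule, the sketch below has several threads update a shared counter under the mutex before notifying, while the caller waits for the counter to reach n. The function name wait_until_all_checked_in is hypothetical; the pattern mirrors the num_ready == num_workers wait in the example.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: starts n threads and waits until all have checked in.
// Returns the counter value observed by the waiter.
int wait_until_all_checked_in(int n) {
    std::mutex mtx;
    std::condition_variable cv;
    int checked_in = 0;  // shared state, only touched while holding mtx

    std::vector<std::thread> threads;
    for (int i = 0; i < n; i++) {
        threads.emplace_back([&] {
            {
                // Change the condition under the same mutex the waiter uses.
                std::lock_guard<std::mutex> lck(mtx);
                checked_in++;
            }
            cv.notify_one();
        });
    }

    int seen = 0;
    {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [&] { return checked_in == n; });
        seen = checked_in;
    }

    for (auto &t : threads) {
        t.join();
    }
    return seen;
}
```

Because every increment happens under mtx, the waiter either sees the new value when it evaluates the predicate or is already suspended when the notification arrives, so no update can slip in between the check and the suspension.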

What if the condition is met before wait/never met?

When a thread is already suspended and the cv is notified, it re-checks the condition and either wakes up or is suspended again. The first wait call behaves the same way: wait(lck, pred) checks the predicate before suspending, so if the condition is already met it returns immediately with the lock still held, and no notification is needed. Only the overload without a predicate suspends unconditionally, releasing the mutex and waiting for a notification whatever the state of the condition:

{
    std::unique_lock<std::mutex> lck(mtx);
    // After calling wait, release the lock, suspend the thread, and wait for notification.
    cv.wait(lck);
}

If the condition is never met, well, it suspends forever (as expected). In the example above, the condition for cv_ready is num_ready == num_workers. If this is never satisfied, the create_workers function will hang indefinitely, indicating that not all threads are ready.
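The already-met case is easy to check with the predicate overload. In this minimal sketch (the helper name is made up for illustration) no other thread exists and nobody ever calls notify, yet wait returns, because std::condition_variable::wait with a predicate behaves like while (!pred()) wait(lck).

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical helper: returns true if the lock is still held after wait().
bool wait_with_condition_already_met() {
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = true;  // the condition is met before anyone waits

    std::unique_lock<std::mutex> lck(mtx);
    // The predicate is evaluated first; since it is true, wait() returns
    // immediately without suspending and without any notification.
    cv.wait(lck, [&] { return ready; });
    return lck.owns_lock();
}
```

If ready were false here, the same call would suspend forever, since no thread is left to change the condition and notify.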

What if notify() is called before wait/never called?

If notify is called before wait, the notification is lost, since notifications are not queued. With the predicate-less wait(lck), the thread then hangs until another notification or a spurious wakeup [3]. With a predicate, an early notification is harmless as long as the condition was changed while holding the mutex, because wait checks the condition before suspending. The lost-notification hang typically shows up when the notifying thread changes the condition without acquiring the mutex, which underlines the importance of the mutex in cvs.

If notify is never called (even though the condition is met), it does not mean the thread will never wake up. There are two possibilities:

  • Timeout Expiry: If you use wait_for or wait_until, the thread wakes up once the timeout expires even without a notification. The predicate overloads return the final value of the predicate, so they return false when the condition is still not met and you can handle the timeout case:
if (cv.wait_for(lck, std::chrono::seconds(1), 
                [] { return ready; })) {
    // Condition met
} else {
    // Timeout
}

if (cv.wait_until(lck, std::chrono::system_clock::now() + std::chrono::seconds(1),
                  [] { return ready; })) {
    // Condition met
} else {
    // Timeout
}
  • Spurious Wakeup: By definition, a spurious wakeup is a situation where a thread waiting on the conditional variable wakes up without being notified [4]. As a phenomenon driven by platform-specific implementations and OS-level thread scheduling considerations [5], it is neither guaranteed nor predictable. Therefore, we should always check the condition after waking up (this is why the predicate parameter of the wait function is typically necessary).
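To see both outcomes of the timeout case, here is a small sketch in which a second thread sets the flag after a delay. The helper name wait_with_timeout and the concrete durations in the usage note are assumptions chosen for illustration.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: waits up to `timeout` for a flag that another thread
// sets after `delay`. Returns what wait_for returned (the predicate's value).
bool wait_with_timeout(std::chrono::milliseconds delay,
                       std::chrono::milliseconds timeout) {
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = false;

    std::thread setter([&] {
        std::this_thread::sleep_for(delay);
        {
            std::lock_guard<std::mutex> lck(mtx);
            ready = true;
        }
        cv.notify_one();
    });

    bool met = false;
    {
        std::unique_lock<std::mutex> lck(mtx);
        // true: condition met in time; false: timed out with ready still false
        met = cv.wait_for(lck, timeout, [&] { return ready; });
    }

    setter.join();
    return met;
}
```

For instance, wait_with_timeout(10 ms, 2000 ms) is expected to return true, while wait_with_timeout(500 ms, 20 ms) is expected to return false because the flag is set long after the timeout.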

Suspend vs. Block

Throughout this article, I have been using the term "suspend" rather than "block" to describe what happens to a thread when it calls cv.wait. Strictly speaking, cv.wait is a blocking call, but I prefer "suspend" because it better describes the behavior: a suspended thread does not consume any CPU time until the cv is signaled (as long as the program is built with the -pthread flag on Linux systems) [6]. The thread is descheduled and placed on a list of threads waiting on the cv. In contrast, I use "blocking" loosely for anything that prevents a thread from making progress, including busy waiting, where the thread keeps running and burns CPU while it polls the condition. For example, a do-while loop like this keeps the thread busy:

do {
    // Busy waiting for the condition: the thread keeps running and burns CPU.
    // ready must be a std::atomic<bool> here, otherwise this is a data race.
} while (!ready);

Conclusion

That's a lot to take in! I did spend some time trying to understand this topic, and I hope this article can help you get a general idea of how conditional variables work.

In short, conditional variables suspend a thread at specific locations and wait for notifications, after which the thread either wakes up (condition met) or continues to wait (condition not met).

Footnotes

  1. https://en.cppreference.com/w/cpp/thread/condition_variable

  2. https://stackoverflow.com/a/2763749/10926869

  3. https://leimao.github.io/blog/CPP-Condition-Variable

  4. https://en.wikipedia.org/wiki/Spurious_wakeup

  5. https://stackoverflow.com/a/1461956/10926869

  6. https://stackoverflow.com/a/3966781/10926869
