Thread Pool Principles
- Threads are the basic unit of CPU scheduling. Creating and destroying a thread has a cost, and doing so repeatedly adds significant overhead to the system. A thread pool keeps a fixed set of threads alive and reuses them across tasks, which avoids that repeated creation and destruction.
- When too many threads are running, the cost of switching between them becomes high enough to hurt multithreaded performance, so the number of running threads should be kept bounded.
- Thread pools are typically used to execute time-consuming tasks asynchronously (without occupying core threads) while making full use of multiple cores. Typical workloads are time-consuming I/O tasks such as network requests and file reads and writes (pool size typically 2 × the number of CPU cores) and time-consuming compute-intensive tasks (pool size typically 1 × the number of CPU cores).
- Thread pools generally follow the producer-consumer model: producer threads push tasks into a queue, and consumer threads take tasks from the queue and execute them. When the queue is empty, the consumer threads block.
- Thread pools provide a unified interface for managing threads, simplifying multithreaded programming.
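The sizing rule of thumb above can be sketched as a small helper. This is only an illustration: `TaskKind` and `SuggestedThreadCount` are hypothetical names that do not appear in the original code, and the 2× and 1× multipliers are the heuristics from the list, not hard limits.

```cpp
#include <thread>

// Hypothetical classification of the work a pool will run.
enum class TaskKind { kIoBound, kComputeBound };

// Returns a suggested pool size: 2 * cores for I/O-bound work and
// 1 * cores for compute-bound work. `cores` defaults to the value reported
// by the standard library, which may be 0 when it cannot be determined.
inline unsigned SuggestedThreadCount(
    TaskKind kind, unsigned cores = std::thread::hardware_concurrency()) {
  if (cores == 0) {
    cores = 1;  // Fall back to a single core when the count is unknown.
  }
  return kind == TaskKind::kIoBound ? 2 * cores : cores;
}
```

The result could then be passed to a pool constructor, for example `ThreadPool pool(static_cast<int>(SuggestedThreadCount(TaskKind::kIoBound)));`. Real workloads usually need measurement to settle on a final number.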
Thread Pool Implementation
#pragma once
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
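
// A mutex-protected FIFO queue whose Pop() blocks while the queue is empty.
template <typename T>
class BlockingQueue {
public:
    BlockingQueue(bool nonblock = false) : nonblock_(nonblock) {}

    // Enqueues a value and wakes one waiting consumer.
    void Push(const T &value) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(value);
        not_empty_.notify_one();
    }

    // Blocks until a value is available or the queue is in non-blocking mode.
    // Returns false only when the queue is empty in non-blocking mode, so
    // values pushed before Cancel() are still handed out.
    bool Pop(T &value) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this]() { return !queue_.empty() || nonblock_; });
        if (queue_.empty()) {
            return false;
        }
        value = queue_.front();
        queue_.pop();
        return true;
    }

    // Switches to non-blocking mode and wakes every waiting consumer.
    void Cancel() {
        std::lock_guard<std::mutex> lock(mutex_);
        nonblock_ = true;
        not_empty_.notify_all();
    }

private:
    bool nonblock_;  // true when non-blocking is needed
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable not_empty_;
};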
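
class ThreadPool {
public:
    // Starts thread_nums worker threads that consume tasks from the queue.
    explicit ThreadPool(int thread_nums) {
        for (int i = 0; i < thread_nums; ++i) {
            workers_.emplace_back([this]() -> void { Worker(); });
        }
    }

    // Cancels the queue, then waits for every worker to exit. Workers finish
    // the tasks that are already queued before they stop.
    ~ThreadPool() {
        task_queue_.Cancel();
        for (auto &worker : workers_) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

    // Submits a task for asynchronous execution on one of the workers.
    void Post(std::function<void()> task) {
        task_queue_.Push(task);
    }

private:
    // Worker loop: run tasks until Pop() reports that the queue was cancelled.
    void Worker() {
        while (true) {
            std::function<void()> task;
            if (!task_queue_.Pop(task)) {
                break;
            }
            task();
        }
    }

    // Declared before workers_ so the queue exists before any thread starts.
    BlockingQueue<std::function<void()>> task_queue_;
    std::vector<std::thread> workers_;
};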
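
`Post` accepts only `std::function<void()>`, so a task has no direct way to hand a result back to the caller. One common way to layer that on top is sketched below, under the assumption that `Post` simply enqueues the task for a worker. `PostWithResult` and `InlineExecutor` are hypothetical names introduced for this illustration. `InlineExecutor` is a stand-in that runs tasks on the calling thread so the sketch compiles on its own.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <utility>

// Hypothetical helper (not part of the original code): posts `fn` to any pool
// that exposes Post(std::function<void()>) and returns a future for its result.
template <typename Pool, typename Fn>
auto PostWithResult(Pool &pool, Fn fn) -> std::future<decltype(fn())> {
  using Result = decltype(fn());
  // std::function requires a copyable callable, but std::packaged_task is
  // move-only, so the task is shared through a std::shared_ptr.
  auto task = std::make_shared<std::packaged_task<Result()>>(std::move(fn));
  std::future<Result> result = task->get_future();
  pool.Post([task]() { (*task)(); });
  return result;
}

// Stand-in executor used only to keep this sketch self-contained: it runs
// each task immediately on the calling thread.
struct InlineExecutor {
  void Post(std::function<void()> task) { task(); }
};
```

With the pool above, a call would look like `auto f = PostWithResult(pool, [] { return 6 * 7; });`, and `f.get()` blocks until a worker has run the task.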
Optimization Plan
The code above schedules tasks through a single blocking queue, so every producer and consumer thread competes for the same mutex. When there are many tasks, that competition can become excessive. One optimization is to use two queues: a producer queue and a consumer queue.
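
One possible shape for the two-queue idea is sketched below. It is an assumption about how such a design could look, not the author's implementation, and `DoubleBufferedQueue` and `SwapQueues` are hypothetical names. Producers lock only the producer queue. Consumers lock only the consumer queue and touch the producer lock just when their own queue runs dry, at which point the two queues are swapped in one step.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical two-queue variant of BlockingQueue (a sketch only).
template <typename T>
class DoubleBufferedQueue {
 public:
  // Producers contend only on producer_mutex_.
  void Push(const T &value) {
    std::lock_guard<std::mutex> lock(producer_mutex_);
    producer_queue_.push(value);
    not_empty_.notify_one();
  }

  // Consumers contend on consumer_mutex_ and refill from the producer side
  // only when the consumer queue is empty. Returns false once the queue has
  // been cancelled and no items remain.
  bool Pop(T &value) {
    std::lock_guard<std::mutex> lock(consumer_mutex_);
    if (consumer_queue_.empty() && SwapQueues() == 0) {
      return false;
    }
    value = consumer_queue_.front();
    consumer_queue_.pop();
    return true;
  }

  // Switches to non-blocking mode and wakes the consumer waiting for items.
  void Cancel() {
    std::lock_guard<std::mutex> lock(producer_mutex_);
    nonblock_ = true;
    not_empty_.notify_all();
  }

 private:
  // Called with consumer_mutex_ held and consumer_queue_ empty. Waits until
  // items were produced or the queue was cancelled, then takes everything
  // produced so far. Returns the number of items now on the consumer side.
  std::size_t SwapQueues() {
    std::unique_lock<std::mutex> lock(producer_mutex_);
    not_empty_.wait(lock,
                    [this]() { return !producer_queue_.empty() || nonblock_; });
    std::swap(producer_queue_, consumer_queue_);
    return consumer_queue_.size();
  }

  bool nonblock_ = false;
  std::queue<T> producer_queue_;
  std::queue<T> consumer_queue_;
  std::mutex producer_mutex_;
  std::mutex consumer_mutex_;
  std::condition_variable not_empty_;
};
```

The locks are always taken in the order consumer then producer, and producers never take the consumer lock, so this arrangement should not deadlock. Whether it actually outperforms the single queue depends on the workload and is worth measuring.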