线程池原理#
- 线程是 CPU 调度的基本单位,CPU 创建线程销毁线程需要一定的代价,当不断的创建和销毁线程时可能对系统造成较多的无效开销。因此,通过维持一定数量的线程来处理任务避免频繁创建与销毁线程。
- 线程过多时,线程切换的代价过大,此时多线程会影响性能,因此我们需要将当前运行的线程维持在一定的数量。
- 线程池通常去异步执行耗时任务(不占用核心线程),充分利用多核性能。通常用于执行耗时的 IO 任务 (通常为
2*cpu核心数),如网络请求、文件读写等,以及耗时的计算密集型任务 (通常为1*cpu核心数)。 - 线程池通常采用 生产消费模型 。由生产者线程向队列中 push 任务,消费者线程从队列中取出任务并执行,当队列为空时,阻塞消费者线程。
- 线程池提供了统一管理线程的接口,简化了多线程编程。
线程池实现#
#pragma once
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
template <typename T>
class BlockingQueue {
public:
BlockingQueue(bool nonblock = false) : nonblock_(nonblock) {}
void Push(const T &value) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(value);
not_empty_.notify_one();
}
bool Pop(T &value) {
std::unique_lock<std::mutex> lock(mutex_);
not_empty_.wait(lock, [this]() { return !queue_.empty() || nonblock_; });
if (queue_.empty()) {
return false;
}
value = queue_.front();
queue_.pop();
return true;
}
void Cancel() {
std::lock_guard<std::mutex> lock(mutex_);
nonblock_ = true;
not_empty_.notify_all();
}
private:
bool nonblock_; // 当不需要阻塞时为true
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable not_empty_;
};
class ThreadPool {
public:
explicit ThreadPool(int thread_nums) {
for (size_t i = 0; i < thread_nums; ++i) {
workers_.emplace_back([this]() -> void { Worker(); });
}
}
~ThreadPool() {
task_queue_.Cancel();
for (auto &worker: workers_) {
if(worker.joinable()) {
worker.join();
}
}
}
void Post(std::function<void()> task);
private:
void Worker() {
while (true) {
std::function<void()> task;
if (!task_queue_.Pop(task)) {
break;
}
task();
}
}
BlockingQueue<std::function<void()>> task_queue_;
std::vector<std::thread> workers_;
};
优化方案#
上述代码采用了单个阻塞队列的形式进行任务调度,当任务过多时候生产者消费者线程可能存在过多竞争,因此可以采用两个队列的形式进行优化:将两个队列优化为生产者队列,消费者队列。