线程池原理#
- 線程是 CPU 調度的基本單位,CPU 創建線程銷毀線程需要一定的代價,當不斷的創建和銷毀線程時可能對系統造成較多的無效開銷。因此,通過維持一定數量的線程來處理任務避免頻繁創建與銷毀線程。
- 線程過多時,線程切換的代價過大,此時多線程會影響性能,因此我們需要將當前運行的線程維持在一定的數量。
- 線程池通常去異步執行耗時任務(不佔用核心線程),充分利用多核性能。通常用於執行耗時的 IO 任務 (通常為
2*cpu核心數),如網絡請求、文件讀寫等,以及耗時的計算密集型任務 (通常為1*cpu核心數)。 - 線程池通常採用 生產消費模型 。由生產者線程向隊列中 push 任務,消費者線程從隊列中取出任務並執行,當隊列為空時,阻塞消費者線程。
- 線程池提供了統一管理線程的接口,簡化了多線程編程。
线程池实现#
#pragma once
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
template <typename T>
class BlockingQueue {
public:
BlockingQueue(bool nonblock = false) : nonblock_(nonblock) {}
void Push(const T &value) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(value);
not_empty_.notify_one();
}
bool Pop(T &value) {
std::unique_lock<std::mutex> lock(mutex_);
not_empty_.wait(lock, [this]() { return !queue_.empty() || nonblock_; });
if (queue_.empty()) {
return false;
}
value = queue_.front();
queue_.pop();
return true;
}
void Cancel() {
std::lock_guard<std::mutex> lock(mutex_);
nonblock_ = true;
not_empty_.notify_all();
}
private:
bool nonblock_; // 當不需要阻塞時為true
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable not_empty_;
};
class ThreadPool {
public:
explicit ThreadPool(int thread_nums) {
for (size_t i = 0; i < thread_nums; ++i) {
workers_.emplace_back([this]() -> void { Worker(); });
}
}
~ThreadPool() {
task_queue_.Cancel();
for (auto &worker: workers_) {
if(worker.joinable()) {
worker.join();
}
}
}
void Post(std::function<void()> task);
private:
void Worker() {
while (true) {
std::function<void()> task;
if (!task_queue_.Pop(task)) {
break;
}
task();
}
}
BlockingQueue<std::function<void()>> task_queue_;
std::vector<std::thread> workers_;
};
优化方案#
上述代碼採用了單個阻塞隊列的形式進行任務調度,當任務過多時生產者消費者線程可能存在過多競爭,因此可以採用兩個隊列的形式進行優化:將兩個隊列優化為生產者隊列,消費者隊列。