rknfish

rknfish

i need more love
github

面接のクラシック - 手書きスレッドプール

スレッドプールの原理#

  1. スレッドは CPU スケジューリングの基本単位であり、CPU がスレッドを作成・破棄するには一定のコストがかかります。スレッドを頻繁に作成・破棄すると、システムに多くの無駄なオーバーヘッドを引き起こす可能性があります。したがって、一定数のスレッドを維持してタスクを処理することで頻繁なスレッドの作成と破棄を避けます
  2. スレッドが多すぎると、スレッド切り替えのコストが大きくなります。この場合、マルチスレッドはパフォーマンスに影響を与えるため、現在実行中のスレッドを一定の数に維持する必要があります。
  3. スレッドプールは通常、非同期に時間のかかるタスクを実行します(コアスレッドを占有しない)、マルチコアの性能を十分に活用します。通常、時間のかかる IO タスク(通常は 2*cpu コア数)を実行するために使用され、ネットワークリクエスト、ファイルの読み書きなど、また時間のかかる計算集約型タスク(通常は 1*cpu コア数)にも使用されます。
  4. スレッドプールは通常、生産消費モデルを採用しています。生産者スレッドがキューにタスクをプッシュし、消費者スレッドがキューからタスクを取り出して実行します。キューが空のときは、消費者スレッドがブロックされます。
  5. スレッドプールはスレッドを統一的に管理するインターフェースを提供し、マルチスレッドプログラミングを簡素化します。

スレッドプールの実装#

#pragma once

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

template <typename T> 
class BlockingQueue {
public:
  BlockingQueue(bool nonblock = false) : nonblock_(nonblock) {}

  void Push(const T &value) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(value);
    not_empty_.notify_one();
  }

  bool Pop(T &value) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this]() { return !queue_.empty() || nonblock_; });
    if (queue_.empty()) {
      return false;
    }

    value = queue_.front();
    queue_.pop();

    return true;
  }

  void Cancel() {
    std::lock_guard<std::mutex> lock(mutex_);
    nonblock_ = true;
    not_empty_.notify_all();
  }

private:
  bool nonblock_; // ブロックが必要ない場合はtrue
  std::queue<T> queue_;
  std::mutex mutex_;
  std::condition_variable not_empty_;
};

class ThreadPool {

public:
  explicit ThreadPool(int thread_nums) {
    for (size_t i = 0; i < thread_nums; ++i) {
      workers_.emplace_back([this]() -> void { Worker(); });
    }
  }

  ~ThreadPool() {
    task_queue_.Cancel();
    for (auto &worker: workers_) {
      if(worker.joinable()) {
        worker.join();
      } 
    }
  }

  void Post(std::function<void()> task);

private:
  void Worker() {
    while (true) {
      std::function<void()> task;
      if (!task_queue_.Pop(task)) {
        break;
      }
      task();
    }
  }

  BlockingQueue<std::function<void()>> task_queue_;
  std::vector<std::thread> workers_;
};

最適化案#

上記のコードは、単一のブロッキングキューの形式でタスクスケジューリングを行っていますが、タスクが多すぎると生産者と消費者スレッドの競争が激しくなる可能性があります。したがって、2 つのキューの形式に最適化することができます:生産者キューと消費者キューに分けることです。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。