Replace pointers by values or unique_ptr for better leak-safety

This commit is contained in:
Christoph Hertzberg 2018-08-23 19:41:59 +02:00
parent 39335cf51e
commit a709c8efb4

View File

@ -58,11 +58,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
coprimes_.push_back(i);
}
}
queues_.resize(num_threads_);
for (int i = 0; i < num_threads_; i++) {
queues_.push_back(new Queue());
}
for (int i = 0; i < num_threads_; i++) {
threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
threads_.emplace_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
}
}
@ -78,13 +76,12 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// Since we were cancelled, there might be entries in the queues.
// Empty them to prevent their destructor from asserting.
for (size_t i = 0; i < queues_.size(); i++) {
queues_[i]->Flush();
queues_[i].Flush();
}
}
// Join threads explicitly to avoid destruction order issues.
for (size_t i = 0; i < num_threads_; i++) delete threads_[i];
for (size_t i = 0; i < num_threads_; i++) delete queues_[i];
threads_.resize(0);
}
void Schedule(std::function<void()> fn) {
@ -92,13 +89,13 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
PerThread* pt = GetPerThread();
if (pt->pool == this) {
// Worker thread of this pool, push onto the thread's queue.
Queue* q = queues_[pt->thread_id];
t = q->PushFront(std::move(t));
Queue& q = queues_[pt->thread_id];
t = q.PushFront(std::move(t));
} else {
// A free-standing thread (or worker of another pool), push onto a random
// queue.
Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
t = q->PushBack(std::move(t));
Queue& q = queues_[Rand(&pt->rand) % queues_.size()];
t = q.PushBack(std::move(t));
}
// Note: below we touch this after making w available to worker threads.
// Strictly speaking, this can lead to a racy-use-after-free. Consider that
@ -157,8 +154,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
Environment env_;
const int num_threads_;
const bool allow_spinning_;
MaxSizeVector<Thread*> threads_;
MaxSizeVector<Queue*> queues_;
MaxSizeVector<std::unique_ptr<Thread> > threads_;
MaxSizeVector<Queue> queues_;
MaxSizeVector<unsigned> coprimes_;
MaxSizeVector<EventCount::Waiter> waiters_;
std::atomic<unsigned> blocked_;
@ -173,7 +170,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
pt->pool = this;
pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
pt->thread_id = thread_id;
Queue* q = queues_[thread_id];
Queue& q = queues_[thread_id];
EventCount::Waiter* waiter = &waiters_[thread_id];
// TODO(dvyukov,rmlarsen): The time spent in Steal() is proportional
// to num_threads_ and we assume that new work is scheduled at a
@ -189,10 +186,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// counter-productive for the types of I/O workloads the single thread
// pools tend to be used for.
while (!cancelled_) {
Task t = q->PopFront();
Task t = q.PopFront();
for (int i = 0; i < spin_count && !t.f; i++) {
if (!cancelled_.load(std::memory_order_relaxed)) {
t = q->PopFront();
t = q.PopFront();
}
}
if (!t.f) {
@ -206,7 +203,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
}
} else {
while (!cancelled_) {
Task t = q->PopFront();
Task t = q.PopFront();
if (!t.f) {
t = Steal();
if (!t.f) {
@ -243,7 +240,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
unsigned inc = coprimes_[r % coprimes_.size()];
unsigned victim = r % size;
for (unsigned i = 0; i < size; i++) {
Task t = queues_[victim]->PopBack();
Task t = queues_[victim].PopBack();
if (t.f) {
return t;
}
@ -270,7 +267,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
if (cancelled_) {
return false;
} else {
*t = queues_[victim]->PopBack();
*t = queues_[victim].PopBack();
return true;
}
}
@ -278,7 +275,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// If we are shutting down and all worker threads blocked without work,
// that's we are done.
blocked_++;
if (done_ && blocked_ == num_threads_) {
// TODO is blocked_ required to be unsigned?
if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
ec_.CancelWait(waiter);
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
@ -311,7 +309,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
unsigned inc = coprimes_[r % coprimes_.size()];
unsigned victim = r % size;
for (unsigned i = 0; i < size; i++) {
if (!queues_[victim]->Empty()) {
if (!queues_[victim].Empty()) {
return victim;
}
victim += inc;