From ca824e095e9a0ead367b3f1929c49d40cc21e7e0 Mon Sep 17 00:00:00 2001 From: tamasmeszaros Date: Wed, 6 Dec 2023 17:01:40 +0100 Subject: [PATCH] Trying to fix hanging job tests on windows The issue might be that the worker's m_running flag is not set atomically. It is possible that the finalize message is pushed and is consumed before setting the running flag to false. See the loop in BoostThreadWorker::run --- src/slic3r/GUI/Jobs/BoostThreadWorker.cpp | 6 +-- src/slic3r/GUI/Jobs/BoostThreadWorker.hpp | 47 ++++++++++++++++++++++- src/slic3r/GUI/Jobs/ThreadSafeQueue.hpp | 25 ++++++------ 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/src/slic3r/GUI/Jobs/BoostThreadWorker.cpp b/src/slic3r/GUI/Jobs/BoostThreadWorker.cpp index f0ebf7baf4..2c3158e4cd 100644 --- a/src/slic3r/GUI/Jobs/BoostThreadWorker.cpp +++ b/src/slic3r/GUI/Jobs/BoostThreadWorker.cpp @@ -43,9 +43,10 @@ void BoostThreadWorker::WorkerMessage::deliver(BoostThreadWorker &runner) void BoostThreadWorker::run() { bool stop = false; + while (!stop) { m_input_queue - .consume_one(BlockingWait{0, &m_running}, [this, &stop](JobEntry &e) { + .consume_one(BlockingWait{0}, [this, &stop](JobEntry &e) { if (!e.job) stop = true; else { @@ -60,7 +61,6 @@ void BoostThreadWorker::run() e.canceled = m_canceled.load(); m_output_queue.push(std::move(e)); // finalization message } - m_running.store(false); }); }; } @@ -83,7 +83,7 @@ std::future BoostThreadWorker::call_on_main_thread(std::function BoostThreadWorker::BoostThreadWorker(std::shared_ptr pri, boost::thread::attributes &attribs, const char * name) - : m_progress(std::move(pri)), m_name{name} + : m_progress(std::move(pri)), m_input_queue{m_running}, m_output_queue{m_running}, m_name{name} { if (m_progress) m_progress->set_cancel_callback([this](){ cancel(); }); diff --git a/src/slic3r/GUI/Jobs/BoostThreadWorker.hpp b/src/slic3r/GUI/Jobs/BoostThreadWorker.hpp index 85bbac4537..823ad9adfe 100644 --- a/src/slic3r/GUI/Jobs/BoostThreadWorker.hpp +++ b/src/slic3r/GUI/Jobs/BoostThreadWorker.hpp @@ -67,8 +67,51 @@ class BoostThreadWorker : public Worker, private Job::Ctl void deliver(BoostThreadWorker &runner); }; - using JobQueue = ThreadSafeQueueSPSC; - using MessageQueue = ThreadSafeQueueSPSC; + template + class RawQueue: public std::deque { + std::atomic *m_running_ptr = nullptr; + + public: + using std::deque::deque; + explicit RawQueue(std::atomic &rflag): m_running_ptr{&rflag} {} + + void set_running() { m_running_ptr->store(true); } + void set_stopped() { m_running_ptr->store(false); } + }; + + template + class RawJobQueue: public RawQueue { + public: + using RawQueue::RawQueue; + void pop_front() + { + RawQueue::pop_front(); + this->set_running(); + } + }; + + template + class RawMsgQueue: public RawQueue { + public: + using RawQueue::RawQueue; + void push_back(El &&entry) + { + this->emplace_back(std::move(entry)); + } + + template + auto & emplace_back(EArgs&&...args) + { + auto &el = RawQueue::emplace_back(std::forward(args)...); + if (el.get_type() == WorkerMessage::Finalize) + this->set_stopped(); + + return el; + } + }; + + using JobQueue = ThreadSafeQueueSPSC; + using MessageQueue = ThreadSafeQueueSPSC; boost::thread m_thread; std::atomic m_running{false}, m_canceled{false}; diff --git a/src/slic3r/GUI/Jobs/ThreadSafeQueue.hpp b/src/slic3r/GUI/Jobs/ThreadSafeQueue.hpp index 94c63a0fd5..df1779c06f 100644 --- a/src/slic3r/GUI/Jobs/ThreadSafeQueue.hpp +++ b/src/slic3r/GUI/Jobs/ThreadSafeQueue.hpp @@ -19,11 +19,6 @@ struct BlockingWait { // Timeout to wait for the arrival of new element into the queue. unsigned timeout_ms = 0; - - // An optional atomic flag to set true if an incoming element gets - // consumed. The flag will be atomically set to true when popping the - // front of the queue. - std::atomic *pop_flag = nullptr; }; // A thread safe queue for one producer and one consumer. @@ -35,10 +30,20 @@ class ThreadSafeQueueSPSC std::queue> m_queue; mutable std::mutex m_mutex; std::condition_variable m_cond_var; + public: + template + ThreadSafeQueueSPSC(Qargs &&...qargs) + : m_queue{Container{std::forward(qargs)...}} {} + + ThreadSafeQueueSPSC(const ThreadSafeQueueSPSC&) = default; + ThreadSafeQueueSPSC(ThreadSafeQueueSPSC&&) = default; + ThreadSafeQueueSPSC& operator=(const ThreadSafeQueueSPSC&) = default; + ThreadSafeQueueSPSC& operator=(ThreadSafeQueueSPSC &&) = default; // Consume one element, block if the queue is empty. - template bool consume_one(const BlockingWait &blkw, Fn &&fn) + template + bool consume_one(const BlockingWait &blkw, Fn &&fn) { static_assert(!std::is_reference_v, ""); static_assert(std::is_default_constructible_v, ""); @@ -63,13 +68,10 @@ public: el = m_queue.front(); m_queue.pop(); - - if (blkw.pop_flag) - // The optional flag is set before the lock us unlocked. - blkw.pop_flag->store(true); } fn(el); + return true; } @@ -96,7 +98,8 @@ public: } // Push element into the queue. - template void push(TArgs&&...el) + template + void push(TArgs&&...el) { std::lock_guard lk{m_mutex}; m_queue.emplace(std::forward(el)...);