Trying to fix hanging job tests on windows

The issue might be that the worker's m_running flag is not set atomically. It is possible that the finalize message is pushed and is consumed before setting the running flag to false. See the loop in BoostThreadWorker::run
This commit is contained in:
tamasmeszaros 2023-12-06 17:01:40 +01:00
parent 4bbe6e21d0
commit ca824e095e
3 changed files with 62 additions and 16 deletions

View File

@ -43,9 +43,10 @@ void BoostThreadWorker::WorkerMessage::deliver(BoostThreadWorker &runner)
void BoostThreadWorker::run() void BoostThreadWorker::run()
{ {
bool stop = false; bool stop = false;
while (!stop) { while (!stop) {
m_input_queue m_input_queue
.consume_one(BlockingWait{0, &m_running}, [this, &stop](JobEntry &e) { .consume_one(BlockingWait{0}, [this, &stop](JobEntry &e) {
if (!e.job) if (!e.job)
stop = true; stop = true;
else { else {
@ -60,7 +61,6 @@ void BoostThreadWorker::run()
e.canceled = m_canceled.load(); e.canceled = m_canceled.load();
m_output_queue.push(std::move(e)); // finalization message m_output_queue.push(std::move(e)); // finalization message
} }
m_running.store(false);
}); });
}; };
} }
@ -83,7 +83,7 @@ std::future<void> BoostThreadWorker::call_on_main_thread(std::function<void ()>
BoostThreadWorker::BoostThreadWorker(std::shared_ptr<ProgressIndicator> pri, BoostThreadWorker::BoostThreadWorker(std::shared_ptr<ProgressIndicator> pri,
boost::thread::attributes &attribs, boost::thread::attributes &attribs,
const char * name) const char * name)
: m_progress(std::move(pri)), m_name{name} : m_progress(std::move(pri)), m_input_queue{m_running}, m_output_queue{m_running}, m_name{name}
{ {
if (m_progress) if (m_progress)
m_progress->set_cancel_callback([this](){ cancel(); }); m_progress->set_cancel_callback([this](){ cancel(); });

View File

@ -67,8 +67,51 @@ class BoostThreadWorker : public Worker, private Job::Ctl
void deliver(BoostThreadWorker &runner); void deliver(BoostThreadWorker &runner);
}; };
using JobQueue = ThreadSafeQueueSPSC<JobEntry>; template<class El>
using MessageQueue = ThreadSafeQueueSPSC<WorkerMessage>; class RawQueue: public std::deque<El> {
std::atomic<bool> *m_running_ptr = nullptr;
public:
using std::deque<El>::deque;
explicit RawQueue(std::atomic<bool> &rflag): m_running_ptr{&rflag} {}
void set_running() { m_running_ptr->store(true); }
void set_stopped() { m_running_ptr->store(false); }
};
template<class El>
class RawJobQueue: public RawQueue<El> {
public:
using RawQueue<El>::RawQueue;
void pop_front()
{
RawQueue<El>::pop_front();
this->set_running();
}
};
template<class El>
class RawMsgQueue: public RawQueue<El> {
public:
using RawQueue<El>::RawQueue;
void push_back(El &&entry)
{
this->emplace_back(std::move(entry));
}
template<class...EArgs>
auto & emplace_back(EArgs&&...args)
{
auto &el = RawQueue<El>::emplace_back(std::forward<EArgs>(args)...);
if (el.get_type() == WorkerMessage::Finalize)
this->set_stopped();
return el;
}
};
using JobQueue = ThreadSafeQueueSPSC<JobEntry, RawJobQueue>;
using MessageQueue = ThreadSafeQueueSPSC<WorkerMessage, RawMsgQueue>;
boost::thread m_thread; boost::thread m_thread;
std::atomic<bool> m_running{false}, m_canceled{false}; std::atomic<bool> m_running{false}, m_canceled{false};

View File

@ -19,11 +19,6 @@ struct BlockingWait
{ {
// Timeout to wait for the arrival of new element into the queue. // Timeout to wait for the arrival of new element into the queue.
unsigned timeout_ms = 0; unsigned timeout_ms = 0;
// An optional atomic flag to set true if an incoming element gets
// consumed. The flag will be atomically set to true when popping the
// front of the queue.
std::atomic<bool> *pop_flag = nullptr;
}; };
// A thread safe queue for one producer and one consumer. // A thread safe queue for one producer and one consumer.
@ -35,10 +30,20 @@ class ThreadSafeQueueSPSC
std::queue<T, Container<T, ContainerArgs...>> m_queue; std::queue<T, Container<T, ContainerArgs...>> m_queue;
mutable std::mutex m_mutex; mutable std::mutex m_mutex;
std::condition_variable m_cond_var; std::condition_variable m_cond_var;
public: public:
template<class...Qargs>
ThreadSafeQueueSPSC(Qargs &&...qargs)
: m_queue{Container<T, ContainerArgs...>{std::forward<Qargs>(qargs)...}} {}
ThreadSafeQueueSPSC(const ThreadSafeQueueSPSC&) = default;
ThreadSafeQueueSPSC(ThreadSafeQueueSPSC&&) = default;
ThreadSafeQueueSPSC& operator=(const ThreadSafeQueueSPSC&) = default;
ThreadSafeQueueSPSC& operator=(ThreadSafeQueueSPSC &&) = default;
// Consume one element, block if the queue is empty. // Consume one element, block if the queue is empty.
template<class Fn> bool consume_one(const BlockingWait &blkw, Fn &&fn) template<class Fn>
bool consume_one(const BlockingWait &blkw, Fn &&fn)
{ {
static_assert(!std::is_reference_v<T>, ""); static_assert(!std::is_reference_v<T>, "");
static_assert(std::is_default_constructible_v<T>, ""); static_assert(std::is_default_constructible_v<T>, "");
@ -63,13 +68,10 @@ public:
el = m_queue.front(); el = m_queue.front();
m_queue.pop(); m_queue.pop();
if (blkw.pop_flag)
// The optional flag is set before the lock us unlocked.
blkw.pop_flag->store(true);
} }
fn(el); fn(el);
return true; return true;
} }
@ -96,7 +98,8 @@ public:
} }
// Push element into the queue. // Push element into the queue.
template<class...TArgs> void push(TArgs&&...el) template<class...TArgs>
void push(TArgs&&...el)
{ {
std::lock_guard lk{m_mutex}; std::lock_guard lk{m_mutex};
m_queue.emplace(std::forward<TArgs>(el)...); m_queue.emplace(std::forward<TArgs>(el)...);