Add comments, remove some redundant template fiddling

Also make ThreadSafeQueueSPSC not copyable and non movable
This commit is contained in:
tamasmeszaros 2023-12-07 15:42:17 +01:00
parent 155b152637
commit 7975dfab26
2 changed files with 14 additions and 10 deletions

View File

@ -67,6 +67,11 @@ class BoostThreadWorker : public Worker, private Job::Ctl
void deliver(BoostThreadWorker &runner);
};
// The m_running state flag needs special attention. Previously, it was set simply in the run()
// method whenever a new job was taken from the input queue and unset after the finalize message
// was pushed into the output queue. This was not correct. It must not be possible to consume
// the finalize message before the flag gets unset, these two operations must be done atomically
// So the underlying queues are here extended to support handling of this m_running flag.
template<class El>
class RawQueue: public std::deque<El> {
std::atomic<bool> *m_running_ptr;
@ -79,6 +84,7 @@ class BoostThreadWorker : public Worker, private Job::Ctl
void set_stopped() { m_running_ptr->store(false); }
};
// The running flag is set if a job is popped from the queue
template<class El>
class RawJobQueue: public RawQueue<El> {
public:
@ -90,6 +96,7 @@ class BoostThreadWorker : public Worker, private Job::Ctl
}
};
// The running flag is unset when the finalize message is pushed into the queue
template<class El>
class RawMsgQueue: public RawQueue<El> {
public:

View File

@ -21,10 +21,6 @@ struct BlockingWait
unsigned timeout_ms = 0;
};
template<class T, class... Args>
using NonSpecialMembersOnly = std::enable_if_t<
(sizeof...(Args) >= 1) && !(... || std::is_convertible_v<Args, T>)>;
// A thread safe queue for one producer and one consumer.
template<class T,
template<class, class...> class Container = std::deque,
@ -36,15 +32,16 @@ class ThreadSafeQueueSPSC
std::condition_variable m_cond_var;
public:
template<class...Qargs, class = NonSpecialMembersOnly<ThreadSafeQueueSPSC, Qargs...>>
// Forward arguments to the underlying queue
template<class...Qargs>
ThreadSafeQueueSPSC(Qargs &&...qargs)
: m_queue{Container<T, ContainerArgs...>{std::forward<Qargs>(qargs)...}} {}
ThreadSafeQueueSPSC() = default;
ThreadSafeQueueSPSC(const ThreadSafeQueueSPSC&) = default;
ThreadSafeQueueSPSC(ThreadSafeQueueSPSC&&) = default;
ThreadSafeQueueSPSC& operator=(const ThreadSafeQueueSPSC&) = default;
ThreadSafeQueueSPSC& operator=(ThreadSafeQueueSPSC &&) = default;
ThreadSafeQueueSPSC(const ThreadSafeQueueSPSC&) = delete;
ThreadSafeQueueSPSC(ThreadSafeQueueSPSC&&) = delete;
ThreadSafeQueueSPSC& operator=(const ThreadSafeQueueSPSC&) = delete;
ThreadSafeQueueSPSC& operator=(ThreadSafeQueueSPSC &&) = delete;
// Consume one element, block if the queue is empty.
template<class Fn>