From 407dc917f170cc9d08f3f1f9bdeb9e44ddebc653 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 14:24:18 -0400 Subject: bounded_threadsafe_queue: Deduplicate and add PushModes Adds the PushModes Try and Wait to allow producers to specify how they want to push their data to the queue if the queue is full. If the queue is full: - Try will fail to push to the queue, returning false. Try only returns true if it successfully pushes to the queue. This may result in items not being pushed into the queue. - Wait will wait until a slot is available to push to the queue, resulting in potential for deadlock if a consumer is not running. --- src/common/bounded_threadsafe_queue.h | 170 +++++++++++++++++----------------- src/common/logging/backend.cpp | 2 +- 2 files changed, 85 insertions(+), 87 deletions(-) (limited to 'src/common') diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index eb88cc1d1..975215863 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -23,60 +23,76 @@ class SPSCQueue { public: bool TryPush(T&& t) { - const size_t write_index = m_write_index.load(); - - // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { - return false; - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Push into the queue. - m_data[pos] = std::move(t); - - // Increment the write index. - ++m_write_index; + return Push(std::move(t)); + } - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + template + bool TryEmplace(Args&&... args) { + return Emplace(std::forward(args)...); + } - return true; + void PushWait(T&& t) { + Push(std::move(t)); } template - bool TryPush(Args&&... args) { - const size_t write_index = m_write_index.load(); + void EmplaceWait(Args&&... args) { + Emplace(std::forward(args)...); + } - // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { - return false; - } + bool TryPop(T& t) { + return Pop(t); + } - // Determine the position to write to. - const size_t pos = write_index % Capacity; + void PopWait(T& t, std::stop_token stop_token) { + Wait(stop_token); + Pop(t); + } - // Emplace into the queue. - std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); + T PopWait(std::stop_token stop_token) { + Wait(stop_token); + T t; + Pop(t); + return t; + } - // Increment the write index. - ++m_write_index; + void Clear() { + while (!Empty()) { + Pop(); + } + } - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + bool Empty() const { + return m_read_index.load() == m_write_index.load(); + } - return true; + size_t Size() const { + return m_write_index.load() - m_read_index.load(); } - void Push(T&& t) { +private: + enum class PushMode { + Try, + Wait, + Count, + }; + + template + bool Push(T&& t) { const size_t write_index = m_write_index.load(); - // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); + if constexpr (Mode == PushMode::Try) { + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + } else if constexpr (Mode == PushMode::Wait) { + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); + } + } else { + static_assert(Mode < PushMode::Count, "Invalid PushMode."); } // Determine the position to write to. @@ -91,15 +107,26 @@ public: // Notify the consumer that we have pushed into the queue. std::scoped_lock lock{cv_mutex}; cv.notify_one(); + + return true; } - template - void Push(Args&&... args) { + template + bool Emplace(Args&&... args) { const size_t write_index = m_write_index.load(); - // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); + if constexpr (Mode == PushMode::Try) { + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + } else if constexpr (Mode == PushMode::Wait) { + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); + } + } else { + static_assert(Mode < PushMode::Count, "Invalid PushMode."); } // Determine the position to write to. @@ -114,39 +141,10 @@ public: // Notify the consumer that we have pushed into the queue. std::scoped_lock lock{cv_mutex}; cv.notify_one(); - } - - bool TryPop(T& t) { - return Pop(t); - } - - void PopWait(T& t, std::stop_token stop_token) { - Wait(stop_token); - Pop(t); - } - - T PopWait(std::stop_token stop_token) { - Wait(stop_token); - T t; - Pop(t); - return t; - } - void Clear() { - while (!Empty()) { - Pop(); - } - } - - bool Empty() const { - return m_read_index.load() == m_write_index.load(); - } - - size_t Size() const { - return m_write_index.load() - m_read_index.load(); + return true; } -private: void Pop() { const size_t read_index = m_read_index.load(); @@ -208,20 +206,20 @@ public: } template - bool TryPush(Args&&... args) { + bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::forward(args)...); + return spsc_queue.TryEmplace(std::forward(args)...); } - void Push(T&& t) { + void PushWait(T&& t) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::move(t)); + spsc_queue.PushWait(std::move(t)); } template - void Push(Args&&... args) { + void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::forward(args)...); + spsc_queue.EmplaceWait(std::forward(args)...); } bool TryPop(T& t) { @@ -262,20 +260,20 @@ public: } template - bool TryPush(Args&&... args) { + bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::forward(args)...); + return spsc_queue.TryEmplace(std::forward(args)...); } - void Push(T&& t) { + void PushWait(T&& t) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::move(t)); + spsc_queue.PushWait(std::move(t)); } template - void Push(Args&&... args) { + void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::forward(args)...); + spsc_queue.EmplaceWait(std::forward(args)...); } bool TryPop(T& t) { diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index e1ce9db99..f96c7c222 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -207,7 +207,7 @@ public: if (!filter.CheckMessage(log_class, log_level)) { return; } - message_queue.Push( + message_queue.EmplaceWait( CreateEntry(log_class, log_level, filename, line_num, function, std::move(message))); } -- cgit v1.2.3