8#include <condition_variable>
17 template <
typename T,
typename Alloc>
18 void regulate(std::deque<T, Alloc>& queue)
const {
19 if (queue.size() < max_size) {
23 const size_t num_erase = queue.size() - max_size;
25 queue.erase(queue.begin(), queue.begin() + num_erase);
27 queue.erase(queue.end() - num_erase, queue.end());
35 const size_t max_size = std::numeric_limits<size_t>::max();
36 const bool pop_front =
true;
47template <
typename T,
typename Alloc = std::allocator<T>>
52 void submit_end_of_data() {
58 std::lock_guard<std::mutex> lock(mutex);
59 return values.empty();
63 std::lock_guard<std::mutex> lock(mutex);
68 std::lock_guard<std::mutex> lock(mutex);
72 void push_back(
const T& value) {
73 std::lock_guard<std::mutex> lock(mutex);
74 values.push_back(value);
75 policy.regulate(values);
80 std::lock_guard<std::mutex> lock(mutex);
85 std::lock_guard<std::mutex> lock(mutex);
86 return values.front();
90 std::lock_guard<std::mutex> lock(mutex);
98 template <
typename Container>
99 void insert(
const Container& new_values) {
100 if (new_values.empty()) {
104 std::lock_guard<std::mutex> lock(mutex);
105 values.insert(values.end(), new_values.begin(), new_values.end());
106 policy.regulate(values);
115 std::lock_guard<std::mutex> lock(mutex);
116 if (values.empty()) {
120 const T data = values.front();
131 std::unique_lock<std::mutex> lock(mutex);
133 std::optional<T> data;
134 cond.wait(lock, [
this, &data] {
135 if (values.empty()) {
136 return static_cast<bool>(end_of_data);
139 data = values.front();
153 std::unique_lock<std::mutex> lock(mutex);
155 std::vector<T, Alloc> buffer;
156 cond.wait(lock, [
this, &buffer] {
157 if (values.empty()) {
158 return static_cast<bool>(end_of_data);
161 buffer.assign(values.begin(), values.end());
174 std::vector<T, Alloc> buffer;
175 std::lock_guard<std::mutex> lock(mutex);
176 buffer.assign(values.begin(), values.end());
188 std::vector<T, Alloc> buffer;
189 std::lock_guard<std::mutex> lock(mutex);
190 if (values.size() <= num_max) {
191 buffer.assign(values.begin(), values.end());
194 buffer.assign(values.begin(), values.begin() + num_max);
195 values.erase(values.begin(), values.begin() + num_max);
204 std::atomic_bool end_of_data;
205 std::condition_variable cond;
207 mutable std::mutex mutex;
208 std::deque<T, Alloc> values;
Simple thread-safe vector with mutex-lock.
Definition concurrent_vector.hpp:48
std::optional< T > pop()
Get the first element in the queue.
Definition concurrent_vector.hpp:114
std::vector< T, Alloc > get_all_and_clear_wait()
Get all the data and clear the container. If the queue is empty, this method waits until new data arrives or the end of data is signaled.
Definition concurrent_vector.hpp:152
std::vector< T, Alloc > get_and_clear(int num_max)
Get up to num_max elements and erase them from the container.
Definition concurrent_vector.hpp:187
std::optional< T > pop_wait()
Get the first element in the queue. If the queue is empty, this method waits until new data arrives or the end of data is signaled.
Definition concurrent_vector.hpp:130
std::vector< T, Alloc > get_all_and_clear()
Get all the data and clear the container.
Definition concurrent_vector.hpp:173
void insert(const Container &new_values)
Insert new_values at the end of the container.
Definition concurrent_vector.hpp:99
Queue data policy.
Definition concurrent_vector.hpp:15