GLIM
Loading...
Searching...
No Matches
concurrent_vector.hpp
1#pragma once
2
#include <atomic>
#include <condition_variable>
#include <deque>
#include <iterator>
#include <limits>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>
9
10namespace glim {
11
16public:
17 template <typename T, typename Alloc>
18 void regulate(std::deque<T, Alloc>& queue) const {
19 if (queue.size() < max_size) {
20 return;
21 }
22
23 const size_t num_erase = queue.size() - max_size;
24 if (pop_front) {
25 queue.erase(queue.begin(), queue.begin() + num_erase);
26 } else {
27 queue.erase(queue.end() - num_erase, queue.end());
28 }
29 }
30
31 static DataStorePolicy UNLIMITED() { return DataStorePolicy(); }
32 static DataStorePolicy UPTO(const size_t max_size, const bool pop_front = true) { return DataStorePolicy{max_size, pop_front}; }
33
34public:
35 const size_t max_size = std::numeric_limits<size_t>::max();
36 const bool pop_front = true;
37};
38
47template <typename T, typename Alloc = std::allocator<T>>
49public:
50 ConcurrentVector(const DataStorePolicy& policy = DataStorePolicy::UNLIMITED()) : policy(policy) { end_of_data = false; }
51
52 void submit_end_of_data() {
53 end_of_data = true;
54 cond.notify_all();
55 }
56
57 bool empty() const {
58 std::lock_guard<std::mutex> lock(mutex);
59 return values.empty();
60 }
61
62 int size() const {
63 std::lock_guard<std::mutex> lock(mutex);
64 return values.size();
65 }
66
/// @brief Kept for interface compatibility; intentionally does nothing.
/// @param n  Ignored.
/// @note  The elements are stored in a std::deque, which provides no reserve().
///        Forwarding the call to the deque made this method fail to compile
///        as soon as it was instantiated.
void reserve(int n) {
  static_cast<void>(n);
}
71
72 void push_back(const T& value) {
73 std::lock_guard<std::mutex> lock(mutex);
74 values.push_back(value);
75 policy.regulate(values);
76 cond.notify_one();
77 }
78
79 void clear() {
80 std::lock_guard<std::mutex> lock(mutex);
81 values.clear();
82 }
83
84 T front() const {
85 std::lock_guard<std::mutex> lock(mutex);
86 return values.front();
87 }
88
89 T back() const {
90 std::lock_guard<std::mutex> lock(mutex);
91 return values.back();
92 }
93
98 template <typename Container>
99 void insert(const Container& new_values) {
100 if (new_values.empty()) {
101 return;
102 }
103
104 std::lock_guard<std::mutex> lock(mutex);
105 values.insert(values.end(), new_values.begin(), new_values.end());
106 policy.regulate(values);
107 cond.notify_all();
108 }
109
114 std::optional<T> pop() {
115 std::lock_guard<std::mutex> lock(mutex);
116 if (values.empty()) {
117 return std::nullopt;
118 }
119
120 const T data = values.front();
121 values.pop_front();
122 return data;
123 }
124
130 std::optional<T> pop_wait() {
131 std::unique_lock<std::mutex> lock(mutex);
132
133 std::optional<T> data;
134 cond.wait(lock, [this, &data] {
135 if (values.empty()) {
136 return static_cast<bool>(end_of_data);
137 }
138
139 data = values.front();
140 values.pop_front();
141 return true;
142 });
143
144 return data;
145 }
146
152 std::vector<T, Alloc> get_all_and_clear_wait() {
153 std::unique_lock<std::mutex> lock(mutex);
154
155 std::vector<T, Alloc> buffer;
156 cond.wait(lock, [this, &buffer] {
157 if (values.empty()) {
158 return static_cast<bool>(end_of_data);
159 }
160
161 buffer.assign(values.begin(), values.end());
162 values.clear();
163 return true;
164 });
165
166 return buffer;
167 }
168
173 std::vector<T, Alloc> get_all_and_clear() {
174 std::vector<T, Alloc> buffer;
175 std::lock_guard<std::mutex> lock(mutex);
176 buffer.assign(values.begin(), values.end());
177 values.clear();
178
179 return buffer;
180 }
181
187 std::vector<T, Alloc> get_and_clear(int num_max) {
188 std::vector<T, Alloc> buffer;
189 std::lock_guard<std::mutex> lock(mutex);
190 if (values.size() <= num_max) {
191 buffer.assign(values.begin(), values.end());
192 values.clear();
193 } else {
194 buffer.assign(values.begin(), values.begin() + num_max);
195 values.erase(values.begin(), values.begin() + num_max);
196 }
197
198 return buffer;
199 }
200
201private:
202 const DataStorePolicy policy;
203
204 std::atomic_bool end_of_data;
205 std::condition_variable cond;
206
207 mutable std::mutex mutex;
208 std::deque<T, Alloc> values;
209};
210
211} // namespace glim
Simple thread-safe vector with mutex-lock.
Definition concurrent_vector.hpp:48
std::optional< T > pop()
Get the first element in the queue.
Definition concurrent_vector.hpp:114
std::vector< T, Alloc > get_all_and_clear_wait()
Get all the data and clear the container. If the queue is empty, this method waits until new data arrives or the end of data is submitted.
Definition concurrent_vector.hpp:152
std::vector< T, Alloc > get_and_clear(int num_max)
Get up to N data and erase them from the container.
Definition concurrent_vector.hpp:187
std::optional< T > pop_wait()
Get the first element in the queue. If the queue is empty, this method waits until new data arrives or the end of data is submitted.
Definition concurrent_vector.hpp:130
std::vector< T, Alloc > get_all_and_clear()
Get all the data and clear the container.
Definition concurrent_vector.hpp:173
void insert(const Container &new_values)
Insert new_values at the end of the container.
Definition concurrent_vector.hpp:99
Queue data policy.
Definition concurrent_vector.hpp:15