ReUseX  0.0.1
3D Point Cloud Processing for Building Reuse
Loading...
Searching...
No Matches
cpm.hpp
Go to the documentation of this file.
1#pragma once
2#include <algorithm>
3#include <condition_variable>
4#include <future>
5#include <memory>
6#include <queue>
7#include <thread>
8#include <atomic>
9
10// Consumer Producer Model
11
13
/// Single-worker producer/consumer wrapper around a batched model.
///
/// Producers hand inputs to commit()/commits() and get one future per input.
/// A background thread created by start() pops up to `max_items_processed_`
/// queued inputs at a time, calls `model->forwards(inputs, stream_)` and
/// fulfils the matching promises.
///
/// @tparam Result  Value delivered through each future. Must be
///                 default-constructible: `Result()` is delivered when no
///                 result exists for an input.
/// @tparam Input   Value queued by producers. Must be default-constructible
///                 and copy-assignable.
/// @tparam Model   Type callable as `model->forwards(inputs, stream)` with a
///                 `std::vector<Input>` and a `void *`; the returned object
///                 must provide `size()` and `operator[]`.
template <typename Result, typename Input, typename Model> class Instance {
protected:
  /// One queued request: the input plus the promise that will carry its result.
  struct Item {
    Input input;
    std::shared_ptr<std::promise<Result>> pro;
  };

  std::condition_variable cond_;  ///< Signals "queue non-empty" or "stopping".
  std::queue<Item> input_queue_;  ///< Pending requests; guarded by queue_lock_.
  std::mutex queue_lock_;         ///< Guards input_queue_ and run_ transitions.
  std::shared_ptr<std::thread> worker_;    ///< Consumer thread, null when idle.
  std::atomic_bool run_{false};            ///< True while the worker may consume.
  std::atomic_int max_items_processed_{0}; ///< Upper bound on one batch.
  /// Opaque handle passed verbatim as the second argument of Model::forwards.
  /// NOTE(review): presumably a CUDA stream given the tensor_rt namespace —
  /// confirm against the Model implementations.
  void *stream_ = nullptr;

public:
  /// Stops the worker (if any) and resolves every still-queued request.
  virtual ~Instance() { stop(); }

  /// Stops the worker thread and resolves all queued requests with `Result()`.
  ///
  /// Safe to call repeatedly and when no worker was ever started.
  void stop() {
    {
      std::unique_lock<std::mutex> lock(queue_lock_);
      // The flag must change while the mutex is held: the worker evaluates its
      // wait predicate under the same mutex, so it either sees run_ == false
      // or is already blocked in wait() when the notification below arrives.
      // Flipping the flag without the lock allows a lost wakeup and a
      // join() that never returns.
      run_ = false;
      while (!input_queue_.empty()) {
        Item &pending = input_queue_.front();
        if (pending.pro)
          pending.pro->set_value(Result());
        input_queue_.pop();
      }
    }
    cond_.notify_all();

    if (worker_) {
      worker_->join();
      worker_.reset();
    }
  }

  /// Queues a single input.
  ///
  /// @param input  Value to process; copied into the queue.
  /// @return Future resolved with the model's result for `input`, or with
  ///         `Result()` if the request is dropped by stop() or the model
  ///         returns fewer results than inputs.
  virtual std::shared_future<Result> commit(const Input &input) {
    Item item;
    item.input = input;
    item.pro = std::make_shared<std::promise<Result>>();
    // Take the future before the item becomes visible to the worker, so the
    // item can be moved into the queue instead of copied.
    std::shared_future<Result> future = item.pro->get_future();
    {
      std::unique_lock<std::mutex> lock(queue_lock_);
      input_queue_.push(std::move(item));
    }
    cond_.notify_one();
    return future;
  }

  /// Queues several inputs under a single lock acquisition.
  ///
  /// @param inputs  Values to process, queued in order.
  /// @return One future per input, in the same order as `inputs`.
  virtual std::vector<std::shared_future<Result>>
  commits(const std::vector<Input> &inputs) {
    std::vector<std::shared_future<Result>> output;
    output.reserve(inputs.size());
    {
      std::unique_lock<std::mutex> lock(queue_lock_);
      for (const Input &input : inputs) {
        Item item;
        item.input = input;
        item.pro = std::make_shared<std::promise<Result>>();
        output.emplace_back(item.pro->get_future());
        input_queue_.push(std::move(item));
      }
    }
    cond_.notify_one();
    return output;
  }

  /// (Re)starts the worker thread.
  ///
  /// Any previous worker is stopped first (see stop()). The call blocks until
  /// the worker has run `loadmethod` and reported whether a model was obtained.
  ///
  /// @param loadmethod           Callable returning something convertible to
  ///                             `std::shared_ptr<Model>`; invoked on the
  ///                             worker thread. Only referenced until this
  ///                             function returns.
  /// @param max_items_processed  Maximum batch size per `forwards` call.
  ///                             Values below 1 are treated as 1, otherwise
  ///                             the worker would spin on empty batches and
  ///                             never resolve any future.
  /// @param stream               Opaque handle forwarded to `Model::forwards`.
  /// @return `true` if `loadmethod` produced a non-null model, else `false`.
  template <typename LoadMethod>
  bool start(const LoadMethod &loadmethod, int max_items_processed = 1,
             void *stream = nullptr) {
    stop();

    this->stream_ = stream;
    this->max_items_processed_ = std::max(1, max_items_processed);

    // The promise is shared with the worker, so it stays alive until the
    // worker has completely finished with it, even if this function returns
    // the instant the value becomes ready.
    auto status = std::make_shared<std::promise<bool>>();
    std::future<bool> ready = status->get_future();
    worker_ = std::make_shared<std::thread>(
        [this, &loadmethod, status]() { this->worker(loadmethod, status); });
    return ready.get();
  }

private:
  /// Worker thread body: loads the model, reports the outcome through
  /// `status`, then serves batches until get_items_and_wait() returns false.
  ///
  /// `loadmethod` is not touched after `status` has been fulfilled, because
  /// start() may return (and the referenced object may die) from then on.
  ///
  /// NOTE(review): an exception escaping `loadmethod` or `forwards` leaves
  /// the thread function and therefore terminates the process.
  template <typename LoadMethod>
  void worker(const LoadMethod &loadmethod,
              std::shared_ptr<std::promise<bool>> status) {
    std::shared_ptr<Model> model = loadmethod();
    if (model == nullptr) {
      status->set_value(false);
      return;
    }

    run_ = true;
    status->set_value(true);

    std::vector<Item> fetch_items;
    std::vector<Input> inputs;
    while (get_items_and_wait(fetch_items, max_items_processed_)) {
      inputs.resize(fetch_items.size());
      std::transform(fetch_items.begin(), fetch_items.end(), inputs.begin(),
                     [](const Item &item) { return item.input; });

      auto ret = model->forwards(inputs, stream_);
      const std::size_t produced = static_cast<std::size_t>(ret.size());
      for (std::size_t i = 0; i < fetch_items.size(); ++i) {
        // A model that returns fewer results than inputs still resolves every
        // future; the missing ones get a default-constructed Result.
        if (i < produced) {
          fetch_items[i].pro->set_value(ret[i]);
        } else {
          fetch_items[i].pro->set_value(Result());
        }
      }
      inputs.clear();
      fetch_items.clear();
    }
    model.reset();
    run_ = false;
  }

  /// Blocks until work is queued or the instance is stopping, then moves up
  /// to `max_size` queued items into `fetch_items`.
  ///
  /// @param fetch_items  Cleared and refilled with the popped items.
  /// @param max_size     Maximum number of items to pop.
  /// @return `false` if the instance is stopping (nothing popped), else `true`.
  virtual bool get_items_and_wait(std::vector<Item> &fetch_items,
                                  int max_size) {
    std::unique_lock<std::mutex> lock(queue_lock_);
    cond_.wait(lock, [this]() { return !run_ || !input_queue_.empty(); });

    if (!run_)
      return false;

    fetch_items.clear();
    for (int taken = 0; taken < max_size && !input_queue_.empty(); ++taken) {
      fetch_items.emplace_back(std::move(input_queue_.front()));
      input_queue_.pop();
    }
    return true;
  }

  /// Single-item variant of get_items_and_wait().
  ///
  /// @param fetch_item  Receives the item at the front of the queue.
  /// @return `false` if the instance is stopping (nothing popped), else `true`.
  virtual bool get_item_and_wait(Item &fetch_item) {
    std::unique_lock<std::mutex> lock(queue_lock_);
    cond_.wait(lock, [this]() { return !run_ || !input_queue_.empty(); });

    if (!run_)
      return false;

    fetch_item = std::move(input_queue_.front());
    input_queue_.pop();
    return true;
  }
};
156}; // namespace ReUseX::vision::tensor_rt::cpm
std::shared_ptr< std::thread > worker_
Definition cpm.hpp:24
bool start(const LoadMethod &loadmethod, int max_items_processed=1, void *stream=nullptr)
Definition cpm.hpp:81
virtual std::shared_future< Result > commit(const Input &input)
Definition cpm.hpp:51
std::condition_variable cond_
Definition cpm.hpp:21
virtual std::vector< std::shared_future< Result > > commits(const std::vector< Input > &inputs)
Definition cpm.hpp:64
std::shared_ptr< std::promise< Result > > pro
Definition cpm.hpp:18