// Instance<Result, Input, Model>: class template whose visible members accept
// Input values (commit / commits), run them through Model::forwards on a
// std::thread (start / worker), and deliver each Result through a
// std::promise whose future is returned to the submitter.
// NOTE(review): this excerpt omits lines of the original file, so members not
// shown here (queue, locks, nested item type) are not described.
14template <
typename Result,
typename Input,
typename Model>
class Instance {
// Shared promise through which the Result of one submitted item is delivered;
// other members of this class fulfil it with set_value().
18 std::shared_ptr<std::promise<Result>>
pro;
// Atomic boolean flag, initialised to false.
// NOTE(review): presumably tracks whether the worker is running — confirm
// against the start/stop code that is not visible here.
25 std::atomic_bool
run_{
false};
40 item.pro->set_value(Result());
// Submits a single input and returns a future for its Result.
// The visible lines allocate a fresh std::promise<Result> for `item` and
// return the future obtained from it; the std::future from get_future() is
// implicitly converted to the declared std::shared_future<Result>.
// NOTE(review): the lines that build `item` and hand it to the worker are not
// part of this excerpt — presumably it is pushed onto the input queue; confirm.
51 virtual std::shared_future<Result>
commit(
const Input &input) {
// Each submission gets its own promise.
54 item.
pro.reset(
new std::promise<Result>());
60 return item.
pro->get_future();
// Batch form of commit: for every element of `inputs`, copies the input into
// an item, gives the item a fresh std::promise<Result>, and appends the
// matching future to `output`, in the same order as `inputs`.
// NOTE(review): the declaration of `item`, any queue insertion and the return
// statement are not part of this excerpt — confirm against the full file.
63 virtual std::vector<std::shared_future<Result>>
64 commits(
const std::vector<Input> &inputs) {
65 std::vector<std::shared_future<Result>> output;
68 for (
int i = 0; i < (int)inputs.size(); ++i) {
// Copy the i-th input into the item.
70 item.
input = inputs[i];
// One promise per input.
71 item.
pro.reset(
new std::promise<Result>());
// get_future() yields a std::future that converts to the stored shared_future.
72 output.emplace_back(item.
pro->get_future());
// Starts the worker thread and reports whether it initialised successfully.
//   loadmethod          - callable passed by reference to worker<LoadMethod>.
//   max_items_processed - stored in max_items_processed_ (default 1).
//   stream              - opaque pointer stored in stream_ (default nullptr).
// Returns the bool that the worker publishes through `status`; the call
// blocks in get() until the worker has set that value.
80 template <
typename LoadMethod>
81 bool start(
const LoadMethod &loadmethod,
int max_items_processed = 1,
82 void *stream =
nullptr) {
85 this->stream_ = stream;
86 this->max_items_processed_ = max_items_processed;
// Promise the worker uses to signal the outcome of its start-up.
87 std::promise<bool> status;
// Thread running Instance::worker<LoadMethod> on this object; loadmethod and
// status are passed by reference via std::ref.
// NOTE(review): the left-hand side that receives this shared_ptr is on a line
// not present in this excerpt.
89 std::make_shared<std::thread>(&Instance::worker<LoadMethod>,
this,
90 std::ref(loadmethod), std::ref(status));
// Wait for the worker's start-up result.
91 return status.get_future().get();
// Thread entry point: obtains the model from loadmethod(), reports the
// outcome through `status`, then runs fetched items through model->forwards
// and fulfils each item's promise.
// NOTE(review): the loop, early-return and else structure are not part of
// this excerpt; the comments below cover only the visible statements.
95 template <
typename LoadMethod>
96 void worker(
const LoadMethod &loadmethod, std::promise<bool> &status) {
97 std::shared_ptr<Model> model = loadmethod();
// A null model is reported to the caller of start() as false.
98 if (model ==
nullptr) {
99 status.set_value(
false);
// Model obtained: report true.
104 status.set_value(
true);
106 std::vector<Item> fetch_items;
107 std::vector<Input> inputs;
// Build the batch input vector, one entry per fetched item, in the same order.
109 inputs.resize(fetch_items.size());
110 std::transform(fetch_items.begin(), fetch_items.end(), inputs.begin(),
111 [](Item &item) { return item.input; });
// Run the whole batch, passing along the stored stream_ pointer.
113 auto ret = model->forwards(inputs,
stream_);
114 for (
int i = 0; i < (int)fetch_items.size(); ++i) {
// Items that have a corresponding entry in ret receive that entry.
115 if (i < (
int)ret.size()) {
116 fetch_items[i].pro->set_value(ret[i]);
// Visible alternative: fulfil the promise with a default-constructed Result.
// NOTE(review): presumably the else branch for items beyond ret.size() — confirm.
118 fetch_items[i].pro->set_value(Result());
// Fills `fetch_items` from input_queue_.
// The visible loop moves the front element of input_queue_ into fetch_items,
// for at most max_size iterations and only while the queue is non-empty.
// NOTE(review): the rest of the signature, any waiting or locking, the removal
// of the moved-from front element, and the return value are not part of this
// excerpt — confirm against the full file.
128 virtual bool get_items_and_wait(std::vector<Item> &fetch_items,
137 for (
int i = 0; i < max_size && !
input_queue_.empty(); ++i) {
138 fetch_items.emplace_back(std::move(
input_queue_.front()));
144 virtual bool get_item_and_wait(
Item &fetch_item) {