aboutsummaryrefslogtreecommitdiffstats
path: root/executor/executor_runner.h
diff options
context:
space:
mode:
Diffstat (limited to 'executor/executor_runner.h')
-rw-r--r--executor/executor_runner.h40
1 files changed, 3 insertions, 37 deletions
diff --git a/executor/executor_runner.h b/executor/executor_runner.h
index 2ace905a7..3481c052f 100644
--- a/executor/executor_runner.h
+++ b/executor/executor_runner.h
@@ -343,50 +343,16 @@ private:
uint64 elapsed = (current_time_ms() - exec_start_) * 1000 * 1000;
uint8* prog_data = msg_->prog_data.data();
input_data = prog_data;
- uint32 num_calls = read_input(&prog_data);
-
- int output_size = resp_mem_->size.load(std::memory_order_relaxed) ?: kMaxOutput;
- uint32 completed = resp_mem_->completed.load(std::memory_order_relaxed);
- completed = std::min(completed, kMaxCalls);
- debug("handle completion: completed=%u output_size=%u\n", completed, output_size);
- ShmemBuilder fbb(resp_mem_, output_size);
- auto empty_call = rpc::CreateCallInfoRawDirect(fbb, rpc::CallFlag::NONE, 998);
- std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> calls(num_calls, empty_call);
- std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> extra;
- for (uint32_t i = 0; i < completed; i++) {
- const auto& call = resp_mem_->calls[i];
- if (call.index == -1) {
- extra.push_back(call.offset);
- continue;
- }
- if (call.index < 0 || call.index >= static_cast<int>(num_calls) || call.offset.o > kMaxOutput) {
- debug("bad call index/offset: proc=%d req=%llu call=%d/%d completed=%d offset=%u",
- id_, static_cast<uint64>(msg_->id), call.index, num_calls,
- completed, call.offset.o);
- continue;
- }
- calls[call.index] = call.offset;
- }
-
- auto prog_info_off = rpc::CreateProgInfoRawDirect(fbb, &calls, &extra, 0, elapsed, freshness_++);
-
- flatbuffers::Offset<flatbuffers::String> error_off = 0;
- if (status == kFailStatus)
- error_off = fbb.CreateString("process failed");
- flatbuffers::Offset<flatbuffers::Vector<uint8_t>> output_off = 0;
+ std::vector<uint8_t>* output = nullptr;
if (IsSet(msg_->flags, rpc::RequestFlag::ReturnOutput)) {
+ output = &output_;
if (status) {
char tmp[128];
snprintf(tmp, sizeof(tmp), "\nprocess exited with status %d\n", status);
output_.insert(output_.end(), tmp, tmp + strlen(tmp));
}
- output_off = fbb.CreateVector(output_);
}
- auto exec_off = rpc::CreateExecResultRaw(fbb, msg_->id, output_off, error_off, prog_info_off);
- auto msg_off = rpc::CreateExecutorMessageRaw(fbb, rpc::ExecutorMessagesRaw::ExecResult,
- flatbuffers::Offset<void>(exec_off.o));
- fbb.FinishSizePrefixed(msg_off);
- auto data = fbb.GetBufferSpan();
+ auto data = finish_output(resp_mem_, id_, msg_->id, elapsed, freshness_++, status, output);
conn_.Send(data.data(), data.size());
resp_mem_->Reset();