diff options
Diffstat (limited to 'executor/executor_runner.h')
| -rw-r--r-- | executor/executor_runner.h | 40 |
1 files changed, 3 insertions, 37 deletions
diff --git a/executor/executor_runner.h b/executor/executor_runner.h index 2ace905a7..3481c052f 100644 --- a/executor/executor_runner.h +++ b/executor/executor_runner.h @@ -343,50 +343,16 @@ private: uint64 elapsed = (current_time_ms() - exec_start_) * 1000 * 1000; uint8* prog_data = msg_->prog_data.data(); input_data = prog_data; - uint32 num_calls = read_input(&prog_data); - - int output_size = resp_mem_->size.load(std::memory_order_relaxed) ?: kMaxOutput; - uint32 completed = resp_mem_->completed.load(std::memory_order_relaxed); - completed = std::min(completed, kMaxCalls); - debug("handle completion: completed=%u output_size=%u\n", completed, output_size); - ShmemBuilder fbb(resp_mem_, output_size); - auto empty_call = rpc::CreateCallInfoRawDirect(fbb, rpc::CallFlag::NONE, 998); - std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> calls(num_calls, empty_call); - std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> extra; - for (uint32_t i = 0; i < completed; i++) { - const auto& call = resp_mem_->calls[i]; - if (call.index == -1) { - extra.push_back(call.offset); - continue; - } - if (call.index < 0 || call.index >= static_cast<int>(num_calls) || call.offset.o > kMaxOutput) { - debug("bad call index/offset: proc=%d req=%llu call=%d/%d completed=%d offset=%u", - id_, static_cast<uint64>(msg_->id), call.index, num_calls, - completed, call.offset.o); - continue; - } - calls[call.index] = call.offset; - } - - auto prog_info_off = rpc::CreateProgInfoRawDirect(fbb, &calls, &extra, 0, elapsed, freshness_++); - - flatbuffers::Offset<flatbuffers::String> error_off = 0; - if (status == kFailStatus) - error_off = fbb.CreateString("process failed"); - flatbuffers::Offset<flatbuffers::Vector<uint8_t>> output_off = 0; + std::vector<uint8_t>* output = nullptr; if (IsSet(msg_->flags, rpc::RequestFlag::ReturnOutput)) { + output = &output_; if (status) { char tmp[128]; snprintf(tmp, sizeof(tmp), "\nprocess exited with status %d\n", status); output_.insert(output_.end(), tmp, tmp + strlen(tmp)); } - output_off = fbb.CreateVector(output_); } - auto exec_off = rpc::CreateExecResultRaw(fbb, msg_->id, output_off, error_off, prog_info_off); - auto msg_off = rpc::CreateExecutorMessageRaw(fbb, rpc::ExecutorMessagesRaw::ExecResult, - flatbuffers::Offset<void>(exec_off.o)); - fbb.FinishSizePrefixed(msg_off); - auto data = fbb.GetBufferSpan(); + auto data = finish_output(resp_mem_, id_, msg_->id, elapsed, freshness_++, status, output); conn_.Send(data.data(), data.size()); resp_mem_->Reset(); |
