diff options
| author | Dmitry Vyukov <dvyukov@google.com> | 2024-07-11 17:46:10 +0200 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2024-07-11 15:56:14 +0000 |
| commit | ea61de3f3cc7e9b244a0b591c076e385cacef326 (patch) | |
| tree | aeea968e6f4f29ac514bc7402000868dd159cc0c /executor | |
| parent | c1609952357e2db80f13337af1bf5ce88fb8f936 (diff) | |
executor: factor output finishing into separate function
This will allow to reuse finish_output function for snapshot mode as well.
NFC
Diffstat (limited to 'executor')
| -rw-r--r-- | executor/executor.cc | 43 | ||||
| -rw-r--r-- | executor/executor_runner.h | 40 |
2 files changed, 46 insertions, 37 deletions
diff --git a/executor/executor.cc b/executor/executor.cc index ec3bdcc74..bfc27a620 100644 --- a/executor/executor.cc +++ b/executor/executor.cc @@ -463,6 +463,8 @@ static bool copyout(char* addr, uint64 size, uint64* res); static void setup_control_pipes(); static bool coverage_filter(uint64 pc); static std::tuple<rpc::ComparisonRaw, bool, bool> convert(const kcov_comparison_t& cmp); +static flatbuffers::span<uint8_t> finish_output(OutputData* output, int proc_id, uint64 req_id, + uint64 elapsed, uint64 freshness, uint32 status, const std::vector<uint8_t>* process_output); #include "syscalls.h" @@ -1269,6 +1271,47 @@ void write_extra_output() write_output(-1, &extra_cov, rpc::CallFlag::NONE, 997, all_extra_signal); } +flatbuffers::span<uint8_t> finish_output(OutputData* output, int proc_id, uint64 req_id, uint64 elapsed, + uint64 freshness, uint32 status, const std::vector<uint8_t>* process_output) +{ + uint8* prog_data = input_data; + uint32 num_calls = read_input(&prog_data); + int output_size = output->size.load(std::memory_order_relaxed) ?: kMaxOutput; + uint32 completed = output->completed.load(std::memory_order_relaxed); + completed = std::min(completed, kMaxCalls); + debug("handle completion: completed=%u output_size=%u\n", completed, output_size); + ShmemBuilder fbb(output, output_size); + auto empty_call = rpc::CreateCallInfoRawDirect(fbb, rpc::CallFlag::NONE, 998); + std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> calls(num_calls, empty_call); + std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> extra; + for (uint32_t i = 0; i < completed; i++) { + const auto& call = output->calls[i]; + if (call.index == -1) { + extra.push_back(call.offset); + continue; + } + if (call.index < 0 || call.index >= static_cast<int>(num_calls) || call.offset.o > kMaxOutput) { + debug("bad call index/offset: proc=%d req=%llu call=%d/%d completed=%d offset=%u", + proc_id, req_id, call.index, num_calls, + completed, call.offset.o); + continue; + } + calls[call.index] = call.offset; + } + auto prog_info_off = rpc::CreateProgInfoRawDirect(fbb, &calls, &extra, 0, elapsed, freshness); + flatbuffers::Offset<flatbuffers::String> error_off = 0; + if (status == kFailStatus) + error_off = fbb.CreateString("process failed"); + flatbuffers::Offset<flatbuffers::Vector<uint8_t>> output_off = 0; + if (process_output) + output_off = fbb.CreateVector(*process_output); + auto exec_off = rpc::CreateExecResultRaw(fbb, req_id, output_off, error_off, prog_info_off); + auto msg_off = rpc::CreateExecutorMessageRaw(fbb, rpc::ExecutorMessagesRaw::ExecResult, + flatbuffers::Offset<void>(exec_off.o)); + fbb.FinishSizePrefixed(msg_off); + return fbb.GetBufferSpan(); +} + void thread_create(thread_t* th, int id, bool need_coverage) { th->created = true; diff --git a/executor/executor_runner.h b/executor/executor_runner.h index 2ace905a7..3481c052f 100644 --- a/executor/executor_runner.h +++ b/executor/executor_runner.h @@ -343,50 +343,16 @@ private: uint64 elapsed = (current_time_ms() - exec_start_) * 1000 * 1000; uint8* prog_data = msg_->prog_data.data(); input_data = prog_data; - uint32 num_calls = read_input(&prog_data); - - int output_size = resp_mem_->size.load(std::memory_order_relaxed) ?: kMaxOutput; - uint32 completed = resp_mem_->completed.load(std::memory_order_relaxed); - completed = std::min(completed, kMaxCalls); - debug("handle completion: completed=%u output_size=%u\n", completed, output_size); - ShmemBuilder fbb(resp_mem_, output_size); - auto empty_call = rpc::CreateCallInfoRawDirect(fbb, rpc::CallFlag::NONE, 998); - std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> calls(num_calls, empty_call); - std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> extra; - for (uint32_t i = 0; i < completed; i++) { - const auto& call = resp_mem_->calls[i]; - if (call.index == -1) { - extra.push_back(call.offset); - continue; - } - if (call.index < 0 || call.index >= static_cast<int>(num_calls) || call.offset.o > kMaxOutput) { - debug("bad call index/offset: proc=%d req=%llu call=%d/%d completed=%d offset=%u", - id_, static_cast<uint64>(msg_->id), call.index, num_calls, - completed, call.offset.o); - continue; - } - calls[call.index] = call.offset; - } - - auto prog_info_off = rpc::CreateProgInfoRawDirect(fbb, &calls, &extra, 0, elapsed, freshness_++); - - flatbuffers::Offset<flatbuffers::String> error_off = 0; - if (status == kFailStatus) - error_off = fbb.CreateString("process failed"); - flatbuffers::Offset<flatbuffers::Vector<uint8_t>> output_off = 0; + std::vector<uint8_t>* output = nullptr; if (IsSet(msg_->flags, rpc::RequestFlag::ReturnOutput)) { + output = &output_; if (status) { char tmp[128]; snprintf(tmp, sizeof(tmp), "\nprocess exited with status %d\n", status); output_.insert(output_.end(), tmp, tmp + strlen(tmp)); } - output_off = fbb.CreateVector(output_); } - auto exec_off = rpc::CreateExecResultRaw(fbb, msg_->id, output_off, error_off, prog_info_off); - auto msg_off = rpc::CreateExecutorMessageRaw(fbb, rpc::ExecutorMessagesRaw::ExecResult, - flatbuffers::Offset<void>(exec_off.o)); - fbb.FinishSizePrefixed(msg_off); - auto data = fbb.GetBufferSpan(); + auto data = finish_output(resp_mem_, id_, msg_->id, elapsed, freshness_++, status, output); conn_.Send(data.data(), data.size()); resp_mem_->Reset(); |
