From ea61de3f3cc7e9b244a0b591c076e385cacef326 Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Thu, 11 Jul 2024 17:46:10 +0200 Subject: executor: factor output finishing into separate function This will allow to reuse finish_output function for snapshot mode as well. NFC --- executor/executor.cc | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) (limited to 'executor/executor.cc') diff --git a/executor/executor.cc b/executor/executor.cc index ec3bdcc74..bfc27a620 100644 --- a/executor/executor.cc +++ b/executor/executor.cc @@ -463,6 +463,8 @@ static bool copyout(char* addr, uint64 size, uint64* res); static void setup_control_pipes(); static bool coverage_filter(uint64 pc); static std::tuple convert(const kcov_comparison_t& cmp); +static flatbuffers::span finish_output(OutputData* output, int proc_id, uint64 req_id, + uint64 elapsed, uint64 freshness, uint32 status, const std::vector* process_output); #include "syscalls.h" @@ -1269,6 +1271,47 @@ void write_extra_output() write_output(-1, &extra_cov, rpc::CallFlag::NONE, 997, all_extra_signal); } +flatbuffers::span finish_output(OutputData* output, int proc_id, uint64 req_id, uint64 elapsed, + uint64 freshness, uint32 status, const std::vector* process_output) +{ + uint8* prog_data = input_data; + uint32 num_calls = read_input(&prog_data); + int output_size = output->size.load(std::memory_order_relaxed) ?: kMaxOutput; + uint32 completed = output->completed.load(std::memory_order_relaxed); + completed = std::min(completed, kMaxCalls); + debug("handle completion: completed=%u output_size=%u\n", completed, output_size); + ShmemBuilder fbb(output, output_size); + auto empty_call = rpc::CreateCallInfoRawDirect(fbb, rpc::CallFlag::NONE, 998); + std::vector> calls(num_calls, empty_call); + std::vector> extra; + for (uint32_t i = 0; i < completed; i++) { + const auto& call = output->calls[i]; + if (call.index == -1) { + extra.push_back(call.offset); + continue; + } + if (call.index < 0 || call.index >= static_cast(num_calls) || call.offset.o > kMaxOutput) { + debug("bad call index/offset: proc=%d req=%llu call=%d/%d completed=%d offset=%u", + proc_id, req_id, call.index, num_calls, + completed, call.offset.o); + continue; + } + calls[call.index] = call.offset; + } + auto prog_info_off = rpc::CreateProgInfoRawDirect(fbb, &calls, &extra, 0, elapsed, freshness); + flatbuffers::Offset error_off = 0; + if (status == kFailStatus) + error_off = fbb.CreateString("process failed"); + flatbuffers::Offset> output_off = 0; + if (process_output) + output_off = fbb.CreateVector(*process_output); + auto exec_off = rpc::CreateExecResultRaw(fbb, req_id, output_off, error_off, prog_info_off); + auto msg_off = rpc::CreateExecutorMessageRaw(fbb, rpc::ExecutorMessagesRaw::ExecResult, + flatbuffers::Offset(exec_off.o)); + fbb.FinishSizePrefixed(msg_off); + return fbb.GetBufferSpan(); +} + void thread_create(thread_t* th, int id, bool need_coverage) { th->created = true; -- cgit mrf-deployment