aboutsummaryrefslogtreecommitdiffstats
path: root/executor
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2024-07-11 17:46:10 +0200
committerDmitry Vyukov <dvyukov@google.com>2024-07-11 15:56:14 +0000
commitea61de3f3cc7e9b244a0b591c076e385cacef326 (patch)
treeaeea968e6f4f29ac514bc7402000868dd159cc0c /executor
parentc1609952357e2db80f13337af1bf5ce88fb8f936 (diff)
executor: factor output finishing into separate function
This will allow to reuse finish_output function for snapshot mode as well. NFC
Diffstat (limited to 'executor')
-rw-r--r--executor/executor.cc43
-rw-r--r--executor/executor_runner.h40
2 files changed, 46 insertions, 37 deletions
diff --git a/executor/executor.cc b/executor/executor.cc
index ec3bdcc74..bfc27a620 100644
--- a/executor/executor.cc
+++ b/executor/executor.cc
@@ -463,6 +463,8 @@ static bool copyout(char* addr, uint64 size, uint64* res);
static void setup_control_pipes();
static bool coverage_filter(uint64 pc);
static std::tuple<rpc::ComparisonRaw, bool, bool> convert(const kcov_comparison_t& cmp);
+static flatbuffers::span<uint8_t> finish_output(OutputData* output, int proc_id, uint64 req_id,
+ uint64 elapsed, uint64 freshness, uint32 status, const std::vector<uint8_t>* process_output);
#include "syscalls.h"
@@ -1269,6 +1271,47 @@ void write_extra_output()
write_output(-1, &extra_cov, rpc::CallFlag::NONE, 997, all_extra_signal);
}
+flatbuffers::span<uint8_t> finish_output(OutputData* output, int proc_id, uint64 req_id, uint64 elapsed,
+ uint64 freshness, uint32 status, const std::vector<uint8_t>* process_output)
+{
+ uint8* prog_data = input_data;
+ uint32 num_calls = read_input(&prog_data);
+ int output_size = output->size.load(std::memory_order_relaxed) ?: kMaxOutput;
+ uint32 completed = output->completed.load(std::memory_order_relaxed);
+ completed = std::min(completed, kMaxCalls);
+ debug("handle completion: completed=%u output_size=%u\n", completed, output_size);
+ ShmemBuilder fbb(output, output_size);
+ auto empty_call = rpc::CreateCallInfoRawDirect(fbb, rpc::CallFlag::NONE, 998);
+ std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> calls(num_calls, empty_call);
+ std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> extra;
+ for (uint32_t i = 0; i < completed; i++) {
+ const auto& call = output->calls[i];
+ if (call.index == -1) {
+ extra.push_back(call.offset);
+ continue;
+ }
+ if (call.index < 0 || call.index >= static_cast<int>(num_calls) || call.offset.o > kMaxOutput) {
+ debug("bad call index/offset: proc=%d req=%llu call=%d/%d completed=%d offset=%u",
+ proc_id, req_id, call.index, num_calls,
+ completed, call.offset.o);
+ continue;
+ }
+ calls[call.index] = call.offset;
+ }
+ auto prog_info_off = rpc::CreateProgInfoRawDirect(fbb, &calls, &extra, 0, elapsed, freshness);
+ flatbuffers::Offset<flatbuffers::String> error_off = 0;
+ if (status == kFailStatus)
+ error_off = fbb.CreateString("process failed");
+ flatbuffers::Offset<flatbuffers::Vector<uint8_t>> output_off = 0;
+ if (process_output)
+ output_off = fbb.CreateVector(*process_output);
+ auto exec_off = rpc::CreateExecResultRaw(fbb, req_id, output_off, error_off, prog_info_off);
+ auto msg_off = rpc::CreateExecutorMessageRaw(fbb, rpc::ExecutorMessagesRaw::ExecResult,
+ flatbuffers::Offset<void>(exec_off.o));
+ fbb.FinishSizePrefixed(msg_off);
+ return fbb.GetBufferSpan();
+}
+
void thread_create(thread_t* th, int id, bool need_coverage)
{
th->created = true;
diff --git a/executor/executor_runner.h b/executor/executor_runner.h
index 2ace905a7..3481c052f 100644
--- a/executor/executor_runner.h
+++ b/executor/executor_runner.h
@@ -343,50 +343,16 @@ private:
uint64 elapsed = (current_time_ms() - exec_start_) * 1000 * 1000;
uint8* prog_data = msg_->prog_data.data();
input_data = prog_data;
- uint32 num_calls = read_input(&prog_data);
-
- int output_size = resp_mem_->size.load(std::memory_order_relaxed) ?: kMaxOutput;
- uint32 completed = resp_mem_->completed.load(std::memory_order_relaxed);
- completed = std::min(completed, kMaxCalls);
- debug("handle completion: completed=%u output_size=%u\n", completed, output_size);
- ShmemBuilder fbb(resp_mem_, output_size);
- auto empty_call = rpc::CreateCallInfoRawDirect(fbb, rpc::CallFlag::NONE, 998);
- std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> calls(num_calls, empty_call);
- std::vector<flatbuffers::Offset<rpc::CallInfoRaw>> extra;
- for (uint32_t i = 0; i < completed; i++) {
- const auto& call = resp_mem_->calls[i];
- if (call.index == -1) {
- extra.push_back(call.offset);
- continue;
- }
- if (call.index < 0 || call.index >= static_cast<int>(num_calls) || call.offset.o > kMaxOutput) {
- debug("bad call index/offset: proc=%d req=%llu call=%d/%d completed=%d offset=%u",
- id_, static_cast<uint64>(msg_->id), call.index, num_calls,
- completed, call.offset.o);
- continue;
- }
- calls[call.index] = call.offset;
- }
-
- auto prog_info_off = rpc::CreateProgInfoRawDirect(fbb, &calls, &extra, 0, elapsed, freshness_++);
-
- flatbuffers::Offset<flatbuffers::String> error_off = 0;
- if (status == kFailStatus)
- error_off = fbb.CreateString("process failed");
- flatbuffers::Offset<flatbuffers::Vector<uint8_t>> output_off = 0;
+ std::vector<uint8_t>* output = nullptr;
if (IsSet(msg_->flags, rpc::RequestFlag::ReturnOutput)) {
+ output = &output_;
if (status) {
char tmp[128];
snprintf(tmp, sizeof(tmp), "\nprocess exited with status %d\n", status);
output_.insert(output_.end(), tmp, tmp + strlen(tmp));
}
- output_off = fbb.CreateVector(output_);
}
- auto exec_off = rpc::CreateExecResultRaw(fbb, msg_->id, output_off, error_off, prog_info_off);
- auto msg_off = rpc::CreateExecutorMessageRaw(fbb, rpc::ExecutorMessagesRaw::ExecResult,
- flatbuffers::Offset<void>(exec_off.o));
- fbb.FinishSizePrefixed(msg_off);
- auto data = fbb.GetBufferSpan();
+ auto data = finish_output(resp_mem_, id_, msg_->id, elapsed, freshness_++, status, output);
conn_.Send(data.data(), data.size());
resp_mem_->Reset();