From 66fcb0a84fcd55ad8e1444cdd0bc0ad6592f7329 Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Mon, 1 Jul 2024 14:26:05 +0200 Subject: pkg/fuzzer: try to triage on different VMs Distribute triage requests to different VMs. --- pkg/flatrpc/flatrpc.fbs | 3 + pkg/flatrpc/flatrpc.go | 86 ++++++++++++++++------- pkg/flatrpc/flatrpc.h | 50 +++++++++++-- pkg/flatrpc/helpers.go | 9 +++ pkg/fuzzer/fuzzer.go | 9 +-- pkg/fuzzer/job.go | 12 ++-- pkg/fuzzer/queue/distributor.go | 132 +++++++++++++++++++++++++++++++++++ pkg/fuzzer/queue/distributor_test.go | 48 +++++++++++++ pkg/fuzzer/queue/queue.go | 21 ++++-- pkg/rpcserver/rpcserver.go | 19 +++-- pkg/rpcserver/runner.go | 19 ++++- 11 files changed, 357 insertions(+), 51 deletions(-) create mode 100644 pkg/fuzzer/queue/distributor.go create mode 100644 pkg/fuzzer/queue/distributor_test.go (limited to 'pkg') diff --git a/pkg/flatrpc/flatrpc.fbs b/pkg/flatrpc/flatrpc.fbs index bd8b023d9..c7516e624 100644 --- a/pkg/flatrpc/flatrpc.fbs +++ b/pkg/flatrpc/flatrpc.fbs @@ -163,6 +163,8 @@ struct ExecOptsRaw { // Request to execute a test program. table ExecRequestRaw { id :int64; + // Bitmask of procs to avoid when executing this request, if possible. + avoid :uint64; prog_data :[uint8]; exec_opts :ExecOptsRaw; flags :RequestFlag; @@ -239,6 +241,7 @@ table ProgInfoRaw { // Result of executing a test program. table ExecResultRaw { id :int64; + proc :int32; output :[uint8]; error :string; info :ProgInfoRaw; diff --git a/pkg/flatrpc/flatrpc.go b/pkg/flatrpc/flatrpc.go index a53d0075d..aa0aa0257 100644 --- a/pkg/flatrpc/flatrpc.go +++ b/pkg/flatrpc/flatrpc.go @@ -1923,6 +1923,7 @@ func CreateExecOptsRaw(builder *flatbuffers.Builder, envFlags ExecEnv, execFlags type ExecRequestRawT struct { Id int64 `json:"id"` + Avoid uint64 `json:"avoid"` ProgData []byte `json:"prog_data"` ExecOpts *ExecOptsRawT `json:"exec_opts"` Flags RequestFlag `json:"flags"` @@ -1948,6 +1949,7 @@ func (t *ExecRequestRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOffset } ExecRequestRawStart(builder) ExecRequestRawAddId(builder, t.Id) + ExecRequestRawAddAvoid(builder, t.Avoid) ExecRequestRawAddProgData(builder, progDataOffset) execOptsOffset := t.ExecOpts.Pack(builder) ExecRequestRawAddExecOpts(builder, execOptsOffset) @@ -1958,6 +1960,7 @@ func (t *ExecRequestRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOffset func (rcv *ExecRequestRaw) UnPackTo(t *ExecRequestRawT) { t.Id = rcv.Id() + t.Avoid = rcv.Avoid() t.ProgData = rcv.ProgDataBytes() t.ExecOpts = rcv.ExecOpts(nil).UnPack() t.Flags = rcv.Flags() @@ -2016,8 +2019,20 @@ func (rcv *ExecRequestRaw) MutateId(n int64) bool { return rcv._tab.MutateInt64Slot(4, n) } -func (rcv *ExecRequestRaw) ProgData(j int) byte { +func (rcv *ExecRequestRaw) Avoid() uint64 { o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.GetUint64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *ExecRequestRaw) MutateAvoid(n uint64) bool { + return rcv._tab.MutateUint64Slot(6, n) +} + +func (rcv *ExecRequestRaw) ProgData(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { a := rcv._tab.Vector(o) return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) @@ -2026,7 +2041,7 @@ func (rcv *ExecRequestRaw) ProgData(j int) byte { } func (rcv *ExecRequestRaw) ProgDataLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { return rcv._tab.VectorLen(o) } @@ -2034,7 +2049,7 @@ func (rcv *ExecRequestRaw) ProgDataLength() int { } func (rcv *ExecRequestRaw) ProgDataBytes() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) } @@ -2042,7 +2057,7 @@ func (rcv *ExecRequestRaw) ProgDataBytes() []byte { } func (rcv *ExecRequestRaw) MutateProgData(j int, n byte) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { a := rcv._tab.Vector(o) return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) @@ -2051,7 +2066,7 @@ func (rcv *ExecRequestRaw) MutateProgData(j int, n byte) bool { } func (rcv *ExecRequestRaw) ExecOpts(obj *ExecOptsRaw) *ExecOptsRaw { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) if o != 0 { x := o + rcv._tab.Pos if obj == nil { @@ -2064,7 +2079,7 @@ func (rcv *ExecRequestRaw) ExecOpts(obj *ExecOptsRaw) *ExecOptsRaw { } func (rcv *ExecRequestRaw) Flags() RequestFlag { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) if o != 0 { return RequestFlag(rcv._tab.GetUint64(o + rcv._tab.Pos)) } @@ -2072,11 +2087,11 @@ func (rcv *ExecRequestRaw) Flags() RequestFlag { } func (rcv *ExecRequestRaw) MutateFlags(n RequestFlag) bool { - return rcv._tab.MutateUint64Slot(10, uint64(n)) + return rcv._tab.MutateUint64Slot(12, uint64(n)) } func (rcv *ExecRequestRaw) AllSignal(j int) int32 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) if o != 0 { a := rcv._tab.Vector(o) return rcv._tab.GetInt32(a + flatbuffers.UOffsetT(j*4)) @@ -2085,7 +2100,7 @@ func (rcv *ExecRequestRaw) AllSignal(j int) int32 { } func (rcv *ExecRequestRaw) AllSignalLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) if o != 0 { return rcv._tab.VectorLen(o) } @@ -2093,7 +2108,7 @@ func (rcv *ExecRequestRaw) AllSignalLength() int { } func (rcv *ExecRequestRaw) MutateAllSignal(j int, n int32) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) if o != 0 { a := rcv._tab.Vector(o) return rcv._tab.MutateInt32(a+flatbuffers.UOffsetT(j*4), n) @@ -2102,25 +2117,28 @@ func (rcv *ExecRequestRaw) MutateAllSignal(j int, n int32) bool { } func ExecRequestRawStart(builder *flatbuffers.Builder) { - builder.StartObject(5) + builder.StartObject(6) } func ExecRequestRawAddId(builder *flatbuffers.Builder, id int64) { builder.PrependInt64Slot(0, id, 0) } +func ExecRequestRawAddAvoid(builder *flatbuffers.Builder, avoid uint64) { + builder.PrependUint64Slot(1, avoid, 0) +} func ExecRequestRawAddProgData(builder *flatbuffers.Builder, progData flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(progData), 0) + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(progData), 0) } func ExecRequestRawStartProgDataVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(1, numElems, 1) } func ExecRequestRawAddExecOpts(builder *flatbuffers.Builder, execOpts flatbuffers.UOffsetT) { - builder.PrependStructSlot(2, flatbuffers.UOffsetT(execOpts), 0) + builder.PrependStructSlot(3, flatbuffers.UOffsetT(execOpts), 0) } func ExecRequestRawAddFlags(builder *flatbuffers.Builder, flags RequestFlag) { - builder.PrependUint64Slot(3, uint64(flags), 0) + builder.PrependUint64Slot(4, uint64(flags), 0) } func ExecRequestRawAddAllSignal(builder *flatbuffers.Builder, allSignal flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(allSignal), 0) + builder.PrependUOffsetTSlot(5, flatbuffers.UOffsetT(allSignal), 0) } func ExecRequestRawStartAllSignalVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(4, numElems, 4) @@ -3000,6 +3018,7 @@ func ProgInfoRawEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { type ExecResultRawT struct { Id int64 `json:"id"` + Proc int32 `json:"proc"` Output []byte `json:"output"` Error string `json:"error"` Info *ProgInfoRawT `json:"info"` @@ -3017,6 +3036,7 @@ func (t *ExecResultRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOffsetT infoOffset := t.Info.Pack(builder) ExecResultRawStart(builder) ExecResultRawAddId(builder, t.Id) + ExecResultRawAddProc(builder, t.Proc) ExecResultRawAddOutput(builder, outputOffset) ExecResultRawAddError(builder, errorOffset) ExecResultRawAddInfo(builder, infoOffset) @@ -3025,6 +3045,7 @@ func (t *ExecResultRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOffsetT func (rcv *ExecResultRaw) UnPackTo(t *ExecResultRawT) { t.Id = rcv.Id() + t.Proc = rcv.Proc() t.Output = rcv.OutputBytes() t.Error = string(rcv.Error()) t.Info = rcv.Info(nil).UnPack() @@ -3078,8 +3099,20 @@ func (rcv *ExecResultRaw) MutateId(n int64) bool { return rcv._tab.MutateInt64Slot(4, n) } -func (rcv *ExecResultRaw) Output(j int) byte { +func (rcv *ExecResultRaw) Proc() int32 { o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.GetInt32(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *ExecResultRaw) MutateProc(n int32) bool { + return rcv._tab.MutateInt32Slot(6, n) +} + +func (rcv *ExecResultRaw) Output(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { a := rcv._tab.Vector(o) return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) @@ -3088,7 +3121,7 @@ func (rcv *ExecResultRaw) Output(j int) byte { } func (rcv *ExecResultRaw) OutputLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { return rcv._tab.VectorLen(o) } @@ -3096,7 +3129,7 @@ func (rcv *ExecResultRaw) OutputLength() int { } func (rcv *ExecResultRaw) OutputBytes() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) } @@ -3104,7 +3137,7 @@ func (rcv *ExecResultRaw) OutputBytes() []byte { } func (rcv *ExecResultRaw) MutateOutput(j int, n byte) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) if o != 0 { a := rcv._tab.Vector(o) return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) @@ -3113,7 +3146,7 @@ func (rcv *ExecResultRaw) MutateOutput(j int, n byte) bool { } func (rcv *ExecResultRaw) Error() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) } @@ -3121,7 +3154,7 @@ func (rcv *ExecResultRaw) Error() []byte { } func (rcv *ExecResultRaw) Info(obj *ProgInfoRaw) *ProgInfoRaw { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) if o != 0 { x := rcv._tab.Indirect(o + rcv._tab.Pos) if obj == nil { @@ -3134,22 +3167,25 @@ func (rcv *ExecResultRaw) Info(obj *ProgInfoRaw) *ProgInfoRaw { } func ExecResultRawStart(builder *flatbuffers.Builder) { - builder.StartObject(4) + builder.StartObject(5) } func ExecResultRawAddId(builder *flatbuffers.Builder, id int64) { builder.PrependInt64Slot(0, id, 0) } +func ExecResultRawAddProc(builder *flatbuffers.Builder, proc int32) { + builder.PrependInt32Slot(1, proc, 0) +} func ExecResultRawAddOutput(builder *flatbuffers.Builder, output flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(output), 0) + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(output), 0) } func ExecResultRawStartOutputVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(1, numElems, 1) } func ExecResultRawAddError(builder *flatbuffers.Builder, error flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(error), 0) + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(error), 0) } func ExecResultRawAddInfo(builder *flatbuffers.Builder, info flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(info), 0) + builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(info), 0) } func ExecResultRawEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() diff --git a/pkg/flatrpc/flatrpc.h b/pkg/flatrpc/flatrpc.h index 88defed83..bc22ee63b 100644 --- a/pkg/flatrpc/flatrpc.h +++ b/pkg/flatrpc/flatrpc.h @@ -1769,6 +1769,7 @@ flatbuffers::Offset CreateExecutorMessageRaw(flatbuffers::Fl struct ExecRequestRawT : public flatbuffers::NativeTable { typedef ExecRequestRaw TableType; int64_t id = 0; + uint64_t avoid = 0; std::vector prog_data{}; std::unique_ptr exec_opts{}; rpc::RequestFlag flags = static_cast(0); @@ -1784,14 +1785,18 @@ struct ExecRequestRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef ExecRequestRawBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_ID = 4, - VT_PROG_DATA = 6, - VT_EXEC_OPTS = 8, - VT_FLAGS = 10, - VT_ALL_SIGNAL = 12 + VT_AVOID = 6, + VT_PROG_DATA = 8, + VT_EXEC_OPTS = 10, + VT_FLAGS = 12, + VT_ALL_SIGNAL = 14 }; int64_t id() const { return GetField(VT_ID, 0); } + uint64_t avoid() const { + return GetField(VT_AVOID, 0); + } const flatbuffers::Vector *prog_data() const { return GetPointer *>(VT_PROG_DATA); } @@ -1807,6 +1812,7 @@ struct ExecRequestRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && VerifyField(verifier, VT_ID, 8) && + VerifyField(verifier, VT_AVOID, 8) && VerifyOffset(verifier, VT_PROG_DATA) && verifier.VerifyVector(prog_data()) && VerifyField(verifier, VT_EXEC_OPTS, 8) && @@ -1827,6 +1833,9 @@ struct ExecRequestRawBuilder { void add_id(int64_t id) { fbb_.AddElement(ExecRequestRaw::VT_ID, id, 0); } + void add_avoid(uint64_t avoid) { + fbb_.AddElement(ExecRequestRaw::VT_AVOID, avoid, 0); + } void add_prog_data(flatbuffers::Offset> prog_data) { fbb_.AddOffset(ExecRequestRaw::VT_PROG_DATA, prog_data); } @@ -1853,12 +1862,14 @@ struct ExecRequestRawBuilder { inline flatbuffers::Offset CreateExecRequestRaw( flatbuffers::FlatBufferBuilder &_fbb, int64_t id = 0, + uint64_t avoid = 0, flatbuffers::Offset> prog_data = 0, const rpc::ExecOptsRaw *exec_opts = nullptr, rpc::RequestFlag flags = static_cast(0), flatbuffers::Offset> all_signal = 0) { ExecRequestRawBuilder builder_(_fbb); builder_.add_flags(flags); + builder_.add_avoid(avoid); builder_.add_id(id); builder_.add_all_signal(all_signal); builder_.add_exec_opts(exec_opts); @@ -1869,6 +1880,7 @@ inline flatbuffers::Offset CreateExecRequestRaw( inline flatbuffers::Offset CreateExecRequestRawDirect( flatbuffers::FlatBufferBuilder &_fbb, int64_t id = 0, + uint64_t avoid = 0, const std::vector *prog_data = nullptr, const rpc::ExecOptsRaw *exec_opts = nullptr, rpc::RequestFlag flags = static_cast(0), @@ -1878,6 +1890,7 @@ inline flatbuffers::Offset CreateExecRequestRawDirect( return rpc::CreateExecRequestRaw( _fbb, id, + avoid, prog_data__, exec_opts, flags, @@ -2355,6 +2368,7 @@ flatbuffers::Offset CreateProgInfoRaw(flatbuffers::FlatBufferBuilde struct ExecResultRawT : public flatbuffers::NativeTable { typedef ExecResultRaw TableType; int64_t id = 0; + int32_t proc = 0; std::vector output{}; std::string error{}; std::unique_ptr info{}; @@ -2369,13 +2383,17 @@ struct ExecResultRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef ExecResultRawBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_ID = 4, - VT_OUTPUT = 6, - VT_ERROR = 8, - VT_INFO = 10 + VT_PROC = 6, + VT_OUTPUT = 8, + VT_ERROR = 10, + VT_INFO = 12 }; int64_t id() const { return GetField(VT_ID, 0); } + int32_t proc() const { + return GetField(VT_PROC, 0); + } const flatbuffers::Vector *output() const { return GetPointer *>(VT_OUTPUT); } @@ -2388,6 +2406,7 @@ struct ExecResultRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && VerifyField(verifier, VT_ID, 8) && + VerifyField(verifier, VT_PROC, 4) && VerifyOffset(verifier, VT_OUTPUT) && verifier.VerifyVector(output()) && VerifyOffset(verifier, VT_ERROR) && @@ -2408,6 +2427,9 @@ struct ExecResultRawBuilder { void add_id(int64_t id) { fbb_.AddElement(ExecResultRaw::VT_ID, id, 0); } + void add_proc(int32_t proc) { + fbb_.AddElement(ExecResultRaw::VT_PROC, proc, 0); + } void add_output(flatbuffers::Offset> output) { fbb_.AddOffset(ExecResultRaw::VT_OUTPUT, output); } @@ -2431,6 +2453,7 @@ struct ExecResultRawBuilder { inline flatbuffers::Offset CreateExecResultRaw( flatbuffers::FlatBufferBuilder &_fbb, int64_t id = 0, + int32_t proc = 0, flatbuffers::Offset> output = 0, flatbuffers::Offset error = 0, flatbuffers::Offset info = 0) { @@ -2439,12 +2462,14 @@ inline flatbuffers::Offset CreateExecResultRaw( builder_.add_info(info); builder_.add_error(error); builder_.add_output(output); + builder_.add_proc(proc); return builder_.Finish(); } inline flatbuffers::Offset CreateExecResultRawDirect( flatbuffers::FlatBufferBuilder &_fbb, int64_t id = 0, + int32_t proc = 0, const std::vector *output = nullptr, const char *error = nullptr, flatbuffers::Offset info = 0) { @@ -2453,6 +2478,7 @@ inline flatbuffers::Offset CreateExecResultRawDirect( return rpc::CreateExecResultRaw( _fbb, id, + proc, output__, error__, info); @@ -3171,6 +3197,7 @@ inline flatbuffers::Offset CreateExecutorMessageRaw(flatbuff inline ExecRequestRawT::ExecRequestRawT(const ExecRequestRawT &o) : id(o.id), + avoid(o.avoid), prog_data(o.prog_data), exec_opts((o.exec_opts) ? new rpc::ExecOptsRaw(*o.exec_opts) : nullptr), flags(o.flags), @@ -3179,6 +3206,7 @@ inline ExecRequestRawT::ExecRequestRawT(const ExecRequestRawT &o) inline ExecRequestRawT &ExecRequestRawT::operator=(ExecRequestRawT o) FLATBUFFERS_NOEXCEPT { std::swap(id, o.id); + std::swap(avoid, o.avoid); std::swap(prog_data, o.prog_data); std::swap(exec_opts, o.exec_opts); std::swap(flags, o.flags); @@ -3196,6 +3224,7 @@ inline void ExecRequestRaw::UnPackTo(ExecRequestRawT *_o, const flatbuffers::res (void)_o; (void)_resolver; { auto _e = id(); _o->id = _e; } + { auto _e = avoid(); _o->avoid = _e; } { auto _e = prog_data(); if (_e) { _o->prog_data.resize(_e->size()); std::copy(_e->begin(), _e->end(), _o->prog_data.begin()); } } { auto _e = exec_opts(); if (_e) _o->exec_opts = std::unique_ptr(new rpc::ExecOptsRaw(*_e)); } { auto _e = flags(); _o->flags = _e; } @@ -3211,6 +3240,7 @@ inline flatbuffers::Offset CreateExecRequestRaw(flatbuffers::Fla (void)_o; struct _VectorArgs { flatbuffers::FlatBufferBuilder *__fbb; const ExecRequestRawT* __o; const flatbuffers::rehasher_function_t *__rehasher; } _va = { &_fbb, _o, _rehasher}; (void)_va; auto _id = _o->id; + auto _avoid = _o->avoid; auto _prog_data = _o->prog_data.size() ? _fbb.CreateVector(_o->prog_data) : 0; auto _exec_opts = _o->exec_opts ? _o->exec_opts.get() : nullptr; auto _flags = _o->flags; @@ -3218,6 +3248,7 @@ inline flatbuffers::Offset CreateExecRequestRaw(flatbuffers::Fla return rpc::CreateExecRequestRaw( _fbb, _id, + _avoid, _prog_data, _exec_opts, _flags, @@ -3428,6 +3459,7 @@ inline flatbuffers::Offset CreateProgInfoRaw(flatbuffers::FlatBuffe inline ExecResultRawT::ExecResultRawT(const ExecResultRawT &o) : id(o.id), + proc(o.proc), output(o.output), error(o.error), info((o.info) ? new rpc::ProgInfoRawT(*o.info) : nullptr) { @@ -3435,6 +3467,7 @@ inline ExecResultRawT::ExecResultRawT(const ExecResultRawT &o) inline ExecResultRawT &ExecResultRawT::operator=(ExecResultRawT o) FLATBUFFERS_NOEXCEPT { std::swap(id, o.id); + std::swap(proc, o.proc); std::swap(output, o.output); std::swap(error, o.error); std::swap(info, o.info); @@ -3451,6 +3484,7 @@ inline void ExecResultRaw::UnPackTo(ExecResultRawT *_o, const flatbuffers::resol (void)_o; (void)_resolver; { auto _e = id(); _o->id = _e; } + { auto _e = proc(); _o->proc = _e; } { auto _e = output(); if (_e) { _o->output.resize(_e->size()); std::copy(_e->begin(), _e->end(), _o->output.begin()); } } { auto _e = error(); if (_e) _o->error = _e->str(); } { auto _e = info(); if (_e) _o->info = std::unique_ptr(_e->UnPack(_resolver)); } @@ -3465,12 +3499,14 @@ inline flatbuffers::Offset CreateExecResultRaw(flatbuffers::FlatB (void)_o; struct _VectorArgs { flatbuffers::FlatBufferBuilder *__fbb; const ExecResultRawT* __o; const flatbuffers::rehasher_function_t *__rehasher; } _va = { &_fbb, _o, _rehasher}; (void)_va; auto _id = _o->id; + auto _proc = _o->proc; auto _output = _o->output.size() ? _fbb.CreateVector(_o->output) : 0; auto _error = _o->error.empty() ? 0 : _fbb.CreateString(_o->error); auto _info = _o->info ? CreateProgInfoRaw(_fbb, _o->info.get(), _rehasher) : 0; return rpc::CreateExecResultRaw( _fbb, _id, + _proc, _output, _error, _info); diff --git a/pkg/flatrpc/helpers.go b/pkg/flatrpc/helpers.go index 697f9eefa..5fb86e44a 100644 --- a/pkg/flatrpc/helpers.go +++ b/pkg/flatrpc/helpers.go @@ -9,6 +9,8 @@ import ( "sync/atomic" "syscall" "unsafe" + + "github.com/google/syzkaller/prog" ) const AllFeatures = ^Feature(0) @@ -39,6 +41,13 @@ type ProgInfo = ProgInfoRawT type ExecResult = ExecResultRawT type StateResult = StateResultRawT +func init() { + var req ExecRequest + if prog.MaxPids > unsafe.Sizeof(req.Avoid)*8 { + panic("all procs won't fit ito ExecRequest.Avoid") + } +} + func (pi *ProgInfo) Clone() *ProgInfo { if pi == nil { return nil diff --git a/pkg/fuzzer/fuzzer.go b/pkg/fuzzer/fuzzer.go index 88e4a5965..17b3d3391 100644 --- a/pkg/fuzzer/fuzzer.go +++ b/pkg/fuzzer/fuzzer.go @@ -137,10 +137,11 @@ func (fuzzer *Fuzzer) processResult(req *queue.Request, res *queue.Result, flags queue, stat = fuzzer.triageCandidateQueue, fuzzer.statJobsTriageCandidate } fuzzer.startJob(stat, &triageJob{ - p: req.Prog.Clone(), - flags: flags, - queue: queue.Append(), - calls: triage, + p: req.Prog.Clone(), + executor: res.Executor, + flags: flags, + queue: queue.Append(), + calls: triage, }) } } diff --git a/pkg/fuzzer/job.go b/pkg/fuzzer/job.go index 8586c6861..85d2dcfe6 100644 --- a/pkg/fuzzer/job.go +++ b/pkg/fuzzer/job.go @@ -54,10 +54,11 @@ func mutateProgRequest(fuzzer *Fuzzer, rnd *rand.Rand) *queue.Request { // During triage we understand if these programs in fact give new coverage, // and if yes, minimize them and add to corpus. type triageJob struct { - p *prog.Prog - flags ProgFlags - fuzzer *Fuzzer - queue queue.Executor + p *prog.Prog + executor queue.ExecutorID + flags ProgFlags + fuzzer *Fuzzer + queue queue.Executor // Set of calls that gave potential new coverage. calls map[int]*triageCall } @@ -179,6 +180,7 @@ func (job *triageJob) handleCall(call int, info *triageCall) { } func (job *triageJob) deflake(exec func(*queue.Request, ProgFlags) *queue.Result) (stop bool) { + avoid := []queue.ExecutorID{job.executor} needRuns := deflakeNeedCorpusRuns if job.fuzzer.Config.Snapshot { needRuns = deflakeNeedSnapshotRuns @@ -201,11 +203,13 @@ func (job *triageJob) deflake(exec func(*queue.Request, ProgFlags) *queue.Result Prog: job.p, ExecOpts: setFlags(flatrpc.ExecFlagCollectCover | flatrpc.ExecFlagCollectSignal), ReturnAllSignal: indices, + Avoid: avoid, Stat: job.fuzzer.statExecTriage, }, progInTriage) if result.Stop() { return true } + avoid = append(avoid, result.Executor) if result.Info == nil { continue // the program has failed } diff --git a/pkg/fuzzer/queue/distributor.go b/pkg/fuzzer/queue/distributor.go new file mode 100644 index 000000000..e6c22df79 --- /dev/null +++ b/pkg/fuzzer/queue/distributor.go @@ -0,0 +1,132 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package queue + +import ( + "sync" + "sync/atomic" + + "github.com/google/syzkaller/pkg/stat" +) + +// Distributor distributes requests to different VMs during input triage +// (allows to avoid already used VMs). +type Distributor struct { + source Source + seq atomic.Uint64 + empty atomic.Bool + active atomic.Pointer[[]atomic.Uint64] + mu sync.Mutex + queue []*Request + statDelayed *stat.Val + statUndelayed *stat.Val + statViolated *stat.Val +} + +func Distribute(source Source) *Distributor { + return &Distributor{ + source: source, + statDelayed: stat.New("distributor delayed", "Number of test programs delayed due to VM avoidance", + stat.Graph("distributor")), + statUndelayed: stat.New("distributor undelayed", "Number of test programs undelayed for VM avoidance", + stat.Graph("distributor")), + statViolated: stat.New("distributor violated", "Number of test programs violated VM avoidance", + stat.Graph("distributor")), + } +} + +// Next returns the next request to execute on the given vm. +func (dist *Distributor) Next(vm int) *Request { + dist.noteActive(vm) + if req := dist.delayed(vm); req != nil { + return req + } + for { + req := dist.source.Next() + if req == nil || !contains(req.Avoid, vm) || !dist.hasOtherActive(req.Avoid) { + return req + } + dist.delay(req) + } +} + +func (dist *Distributor) delay(req *Request) { + dist.mu.Lock() + defer dist.mu.Unlock() + req.delayedSince = dist.seq.Load() + dist.queue = append(dist.queue, req) + dist.statDelayed.Add(1) + dist.empty.Store(false) +} + +func (dist *Distributor) delayed(vm int) *Request { + if dist.empty.Load() { + return nil + } + dist.mu.Lock() + defer dist.mu.Unlock() + seq := dist.seq.Load() + for i, req := range dist.queue { + violation := contains(req.Avoid, vm) + // The delayedSince check protects from a situation when we had another VM available, + // and delayed a request, but then the VM was taken for reproduction and does not + // serve requests any more. If we could not dispatch a request in 1000 attempts, + // we gave up and give it to any VM. + if violation && req.delayedSince+1000 > seq { + continue + } + dist.statUndelayed.Add(1) + if violation { + dist.statViolated.Add(1) + } + last := len(dist.queue) - 1 + dist.queue[i] = dist.queue[last] + dist.queue[last] = nil + dist.queue = dist.queue[:last] + dist.empty.Store(len(dist.queue) == 0) + return req + } + return nil +} + +func (dist *Distributor) noteActive(vm int) { + active := dist.active.Load() + if active == nil || len(*active) <= vm { + dist.mu.Lock() + active = dist.active.Load() + if active == nil || len(*active) <= vm { + tmp := make([]atomic.Uint64, vm+10) + active = &tmp + dist.active.Store(active) + } + dist.mu.Unlock() + } + (*active)[vm].Store(dist.seq.Add(1)) +} + +// hasOtherActive says if we recently seen activity from VMs not in the set. +func (dist *Distributor) hasOtherActive(set []ExecutorID) bool { + seq := dist.seq.Load() + active := *dist.active.Load() + for vm := range active { + if contains(set, vm) { + continue + } + // 1000 is semi-random notion of recency. + if active[vm].Load()+1000 < seq { + continue + } + return true + } + return false +} + +func contains(set []ExecutorID, vm int) bool { + for _, id := range set { + if id.VM == vm { + return true + } + } + return false +} diff --git a/pkg/fuzzer/queue/distributor_test.go b/pkg/fuzzer/queue/distributor_test.go new file mode 100644 index 000000000..7bbf9c2e7 --- /dev/null +++ b/pkg/fuzzer/queue/distributor_test.go @@ -0,0 +1,48 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package queue + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDistributor(t *testing.T) { + q := Plain() + dist := Distribute(q) + + req := &Request{} + q.Submit(req) + assert.Equal(t, req, dist.Next(0)) + + q.Submit(req) + assert.Equal(t, req, dist.Next(1)) + + // Avoid VM 0. + req.Avoid = []ExecutorID{{VM: 0}} + q.Submit(req) + var noReq *Request + assert.Equal(t, noReq, dist.Next(0)) + assert.Equal(t, noReq, dist.Next(0)) + assert.Equal(t, req, dist.Next(1)) + + // If only VM 0 queries requests, it should eventually got it. + q.Submit(req) + assert.Equal(t, noReq, dist.Next(0)) + for { + got := dist.Next(0) + if got == req { + break + } + assert.Equal(t, noReq, got) + } + + // If all active VMs are in the avoid set, then they should get + // the request immidiatly. + assert.Equal(t, noReq, dist.Next(1)) + req.Avoid = []ExecutorID{{VM: 0}, {VM: 1}} + q.Submit(req) + assert.Equal(t, req, dist.Next(1)) +} diff --git a/pkg/fuzzer/queue/queue.go b/pkg/fuzzer/queue/queue.go index aadbaade8..cbdb2ba19 100644 --- a/pkg/fuzzer/queue/queue.go +++ b/pkg/fuzzer/queue/queue.go @@ -41,18 +41,28 @@ type Request struct { // Important requests will be retried even from crashed VMs. Important bool + // Avoid specifies set of executors that are preferable to avoid when executing this request. + // The restriction is soft since there can be only one executor at all or available right now. + Avoid []ExecutorID + // The callback will be called on request completion in the LIFO order. // If it returns false, all further processing will be stopped. // It allows wrappers to intercept Done() requests. callback DoneCallback - onceCrashed bool + onceCrashed bool + delayedSince uint64 mu sync.Mutex result *Result done chan struct{} } +type ExecutorID struct { + VM int + Proc int +} + type DoneCallback func(*Request, *Result) bool func (r *Request) OnDone(cb DoneCallback) { @@ -137,10 +147,11 @@ func (r *Request) initChannel() { } type Result struct { - Info *flatrpc.ProgInfo - Output []byte - Status Status - Err error // More details in case of ExecFailure. + Info *flatrpc.ProgInfo + Executor ExecutorID + Output []byte + Status Status + Err error // More details in case of ExecFailure. } func (r *Result) clone() *Result { diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go index 37e74c91b..da3a0f076 100644 --- a/pkg/rpcserver/rpcserver.go +++ b/pkg/rpcserver/rpcserver.go @@ -76,9 +76,12 @@ type Server struct { canonicalModules *cover.Canonicalizer coverFilter []uint64 - mu sync.Mutex - runners map[string]*Runner - execSource queue.Source + mu sync.Mutex + runners map[string]*Runner + execSource *queue.Distributor + // TODO: replace names with indices. + ids map[string]int + idSeq int triagedCorpus atomic.Bool statVMRestarts *stat.Val *runnerStats @@ -140,9 +143,10 @@ func newImpl(ctx context.Context, cfg *Config, mgr Manager) (*Server, error) { sysTarget: sysTarget, timeouts: sysTarget.Timeouts(cfg.Slowdown), runners: make(map[string]*Runner), + ids: make(map[string]int), checker: checker, baseSource: baseSource, - execSource: queue.Retry(baseSource), + execSource: queue.Distribute(queue.Retry(baseSource)), statVMRestarts: stat.New("vm restarts", "Total number of VM starts", stat.Rate{}, stat.NoGraph), @@ -412,6 +416,13 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo panic(fmt.Sprintf("duplicate instance %s", name)) } serv.runners[name] = runner + id, ok := serv.ids[name] + if !ok { + id = serv.idSeq + serv.idSeq++ + serv.ids[name] = id + } + runner.id = id return runner.resultCh } diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go index a0b519d20..f14a2ec29 100644 --- a/pkg/rpcserver/runner.go +++ b/pkg/rpcserver/runner.go @@ -25,7 +25,8 @@ import ( ) type Runner struct { - source queue.Source + id int + source *queue.Distributor procs int cover bool coverEdges bool @@ -174,7 +175,7 @@ func (runner *Runner) ConnectionLoop() error { } } for len(runner.requests)-len(runner.executing) < 2*runner.procs { - req := runner.source.Next() + req := runner.source.Next(runner.id) if req == nil { break } @@ -320,11 +321,21 @@ func (runner *Runner) sendRequest(req *queue.Request) error { } data = fileData } + var avoid uint64 + for _, id := range req.Avoid { + if id.VM == runner.id { + avoid |= uint64(1 << id.Proc) + } + } + if avoid == (uint64(1)<