aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2024-07-01 14:26:05 +0200
committerDmitry Vyukov <dvyukov@google.com>2024-08-02 13:16:51 +0000
commit66fcb0a84fcd55ad8e1444cdd0bc0ad6592f7329 (patch)
tree998e52d5569938e0251da1eb7c54c3746186b488 /pkg
parent1e9c4cf3ae82ef82220af312606fffe65e124563 (diff)
pkg/fuzzer: try to triage on different VMs
Distribute triage requests to different VMs.
Diffstat (limited to 'pkg')
-rw-r--r--pkg/flatrpc/flatrpc.fbs3
-rw-r--r--pkg/flatrpc/flatrpc.go86
-rw-r--r--pkg/flatrpc/flatrpc.h50
-rw-r--r--pkg/flatrpc/helpers.go9
-rw-r--r--pkg/fuzzer/fuzzer.go9
-rw-r--r--pkg/fuzzer/job.go12
-rw-r--r--pkg/fuzzer/queue/distributor.go132
-rw-r--r--pkg/fuzzer/queue/distributor_test.go48
-rw-r--r--pkg/fuzzer/queue/queue.go21
-rw-r--r--pkg/rpcserver/rpcserver.go19
-rw-r--r--pkg/rpcserver/runner.go19
11 files changed, 357 insertions, 51 deletions
diff --git a/pkg/flatrpc/flatrpc.fbs b/pkg/flatrpc/flatrpc.fbs
index bd8b023d9..c7516e624 100644
--- a/pkg/flatrpc/flatrpc.fbs
+++ b/pkg/flatrpc/flatrpc.fbs
@@ -163,6 +163,8 @@ struct ExecOptsRaw {
// Request to execute a test program.
table ExecRequestRaw {
id :int64;
+ // Bitmask of procs to avoid when executing this request, if possible.
+ avoid :uint64;
prog_data :[uint8];
exec_opts :ExecOptsRaw;
flags :RequestFlag;
@@ -239,6 +241,7 @@ table ProgInfoRaw {
// Result of executing a test program.
table ExecResultRaw {
id :int64;
+ proc :int32;
output :[uint8];
error :string;
info :ProgInfoRaw;
diff --git a/pkg/flatrpc/flatrpc.go b/pkg/flatrpc/flatrpc.go
index a53d0075d..aa0aa0257 100644
--- a/pkg/flatrpc/flatrpc.go
+++ b/pkg/flatrpc/flatrpc.go
@@ -1923,6 +1923,7 @@ func CreateExecOptsRaw(builder *flatbuffers.Builder, envFlags ExecEnv, execFlags
type ExecRequestRawT struct {
Id int64 `json:"id"`
+ Avoid uint64 `json:"avoid"`
ProgData []byte `json:"prog_data"`
ExecOpts *ExecOptsRawT `json:"exec_opts"`
Flags RequestFlag `json:"flags"`
@@ -1948,6 +1949,7 @@ func (t *ExecRequestRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOffset
}
ExecRequestRawStart(builder)
ExecRequestRawAddId(builder, t.Id)
+ ExecRequestRawAddAvoid(builder, t.Avoid)
ExecRequestRawAddProgData(builder, progDataOffset)
execOptsOffset := t.ExecOpts.Pack(builder)
ExecRequestRawAddExecOpts(builder, execOptsOffset)
@@ -1958,6 +1960,7 @@ func (t *ExecRequestRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOffset
func (rcv *ExecRequestRaw) UnPackTo(t *ExecRequestRawT) {
t.Id = rcv.Id()
+ t.Avoid = rcv.Avoid()
t.ProgData = rcv.ProgDataBytes()
t.ExecOpts = rcv.ExecOpts(nil).UnPack()
t.Flags = rcv.Flags()
@@ -2016,9 +2019,21 @@ func (rcv *ExecRequestRaw) MutateId(n int64) bool {
return rcv._tab.MutateInt64Slot(4, n)
}
-func (rcv *ExecRequestRaw) ProgData(j int) byte {
+func (rcv *ExecRequestRaw) Avoid() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
+ return rcv._tab.GetUint64(o + rcv._tab.Pos)
+ }
+ return 0
+}
+
+func (rcv *ExecRequestRaw) MutateAvoid(n uint64) bool {
+ return rcv._tab.MutateUint64Slot(6, n)
+}
+
+func (rcv *ExecRequestRaw) ProgData(j int) byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
@@ -2026,7 +2041,7 @@ func (rcv *ExecRequestRaw) ProgData(j int) byte {
}
func (rcv *ExecRequestRaw) ProgDataLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.VectorLen(o)
}
@@ -2034,7 +2049,7 @@ func (rcv *ExecRequestRaw) ProgDataLength() int {
}
func (rcv *ExecRequestRaw) ProgDataBytes() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
@@ -2042,7 +2057,7 @@ func (rcv *ExecRequestRaw) ProgDataBytes() []byte {
}
func (rcv *ExecRequestRaw) MutateProgData(j int, n byte) bool {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
@@ -2051,7 +2066,7 @@ func (rcv *ExecRequestRaw) MutateProgData(j int, n byte) bool {
}
func (rcv *ExecRequestRaw) ExecOpts(obj *ExecOptsRaw) *ExecOptsRaw {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
x := o + rcv._tab.Pos
if obj == nil {
@@ -2064,7 +2079,7 @@ func (rcv *ExecRequestRaw) ExecOpts(obj *ExecOptsRaw) *ExecOptsRaw {
}
func (rcv *ExecRequestRaw) Flags() RequestFlag {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
return RequestFlag(rcv._tab.GetUint64(o + rcv._tab.Pos))
}
@@ -2072,11 +2087,11 @@ func (rcv *ExecRequestRaw) Flags() RequestFlag {
}
func (rcv *ExecRequestRaw) MutateFlags(n RequestFlag) bool {
- return rcv._tab.MutateUint64Slot(10, uint64(n))
+ return rcv._tab.MutateUint64Slot(12, uint64(n))
}
func (rcv *ExecRequestRaw) AllSignal(j int) int32 {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetInt32(a + flatbuffers.UOffsetT(j*4))
@@ -2085,7 +2100,7 @@ func (rcv *ExecRequestRaw) AllSignal(j int) int32 {
}
func (rcv *ExecRequestRaw) AllSignalLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
if o != 0 {
return rcv._tab.VectorLen(o)
}
@@ -2093,7 +2108,7 @@ func (rcv *ExecRequestRaw) AllSignalLength() int {
}
func (rcv *ExecRequestRaw) MutateAllSignal(j int, n int32) bool {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateInt32(a+flatbuffers.UOffsetT(j*4), n)
@@ -2102,25 +2117,28 @@ func (rcv *ExecRequestRaw) MutateAllSignal(j int, n int32) bool {
}
func ExecRequestRawStart(builder *flatbuffers.Builder) {
- builder.StartObject(5)
+ builder.StartObject(6)
}
func ExecRequestRawAddId(builder *flatbuffers.Builder, id int64) {
builder.PrependInt64Slot(0, id, 0)
}
+func ExecRequestRawAddAvoid(builder *flatbuffers.Builder, avoid uint64) {
+ builder.PrependUint64Slot(1, avoid, 0)
+}
func ExecRequestRawAddProgData(builder *flatbuffers.Builder, progData flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(progData), 0)
+ builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(progData), 0)
}
func ExecRequestRawStartProgDataVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func ExecRequestRawAddExecOpts(builder *flatbuffers.Builder, execOpts flatbuffers.UOffsetT) {
- builder.PrependStructSlot(2, flatbuffers.UOffsetT(execOpts), 0)
+ builder.PrependStructSlot(3, flatbuffers.UOffsetT(execOpts), 0)
}
func ExecRequestRawAddFlags(builder *flatbuffers.Builder, flags RequestFlag) {
- builder.PrependUint64Slot(3, uint64(flags), 0)
+ builder.PrependUint64Slot(4, uint64(flags), 0)
}
func ExecRequestRawAddAllSignal(builder *flatbuffers.Builder, allSignal flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(allSignal), 0)
+ builder.PrependUOffsetTSlot(5, flatbuffers.UOffsetT(allSignal), 0)
}
func ExecRequestRawStartAllSignalVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(4, numElems, 4)
@@ -3000,6 +3018,7 @@ func ProgInfoRawEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
type ExecResultRawT struct {
Id int64 `json:"id"`
+ Proc int32 `json:"proc"`
Output []byte `json:"output"`
Error string `json:"error"`
Info *ProgInfoRawT `json:"info"`
@@ -3017,6 +3036,7 @@ func (t *ExecResultRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOffsetT
infoOffset := t.Info.Pack(builder)
ExecResultRawStart(builder)
ExecResultRawAddId(builder, t.Id)
+ ExecResultRawAddProc(builder, t.Proc)
ExecResultRawAddOutput(builder, outputOffset)
ExecResultRawAddError(builder, errorOffset)
ExecResultRawAddInfo(builder, infoOffset)
@@ -3025,6 +3045,7 @@ func (t *ExecResultRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOffsetT
func (rcv *ExecResultRaw) UnPackTo(t *ExecResultRawT) {
t.Id = rcv.Id()
+ t.Proc = rcv.Proc()
t.Output = rcv.OutputBytes()
t.Error = string(rcv.Error())
t.Info = rcv.Info(nil).UnPack()
@@ -3078,9 +3099,21 @@ func (rcv *ExecResultRaw) MutateId(n int64) bool {
return rcv._tab.MutateInt64Slot(4, n)
}
-func (rcv *ExecResultRaw) Output(j int) byte {
+func (rcv *ExecResultRaw) Proc() int32 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
+ return rcv._tab.GetInt32(o + rcv._tab.Pos)
+ }
+ return 0
+}
+
+func (rcv *ExecResultRaw) MutateProc(n int32) bool {
+ return rcv._tab.MutateInt32Slot(6, n)
+}
+
+func (rcv *ExecResultRaw) Output(j int) byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
@@ -3088,7 +3121,7 @@ func (rcv *ExecResultRaw) Output(j int) byte {
}
func (rcv *ExecResultRaw) OutputLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.VectorLen(o)
}
@@ -3096,7 +3129,7 @@ func (rcv *ExecResultRaw) OutputLength() int {
}
func (rcv *ExecResultRaw) OutputBytes() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
@@ -3104,7 +3137,7 @@ func (rcv *ExecResultRaw) OutputBytes() []byte {
}
func (rcv *ExecResultRaw) MutateOutput(j int, n byte) bool {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
@@ -3113,7 +3146,7 @@ func (rcv *ExecResultRaw) MutateOutput(j int, n byte) bool {
}
func (rcv *ExecResultRaw) Error() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
@@ -3121,7 +3154,7 @@ func (rcv *ExecResultRaw) Error() []byte {
}
func (rcv *ExecResultRaw) Info(obj *ProgInfoRaw) *ProgInfoRaw {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
x := rcv._tab.Indirect(o + rcv._tab.Pos)
if obj == nil {
@@ -3134,22 +3167,25 @@ func (rcv *ExecResultRaw) Info(obj *ProgInfoRaw) *ProgInfoRaw {
}
func ExecResultRawStart(builder *flatbuffers.Builder) {
- builder.StartObject(4)
+ builder.StartObject(5)
}
func ExecResultRawAddId(builder *flatbuffers.Builder, id int64) {
builder.PrependInt64Slot(0, id, 0)
}
+func ExecResultRawAddProc(builder *flatbuffers.Builder, proc int32) {
+ builder.PrependInt32Slot(1, proc, 0)
+}
func ExecResultRawAddOutput(builder *flatbuffers.Builder, output flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(output), 0)
+ builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(output), 0)
}
func ExecResultRawStartOutputVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func ExecResultRawAddError(builder *flatbuffers.Builder, error flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(error), 0)
+ builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(error), 0)
}
func ExecResultRawAddInfo(builder *flatbuffers.Builder, info flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(info), 0)
+ builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(info), 0)
}
func ExecResultRawEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
diff --git a/pkg/flatrpc/flatrpc.h b/pkg/flatrpc/flatrpc.h
index 88defed83..bc22ee63b 100644
--- a/pkg/flatrpc/flatrpc.h
+++ b/pkg/flatrpc/flatrpc.h
@@ -1769,6 +1769,7 @@ flatbuffers::Offset<ExecutorMessageRaw> CreateExecutorMessageRaw(flatbuffers::Fl
struct ExecRequestRawT : public flatbuffers::NativeTable {
typedef ExecRequestRaw TableType;
int64_t id = 0;
+ uint64_t avoid = 0;
std::vector<uint8_t> prog_data{};
std::unique_ptr<rpc::ExecOptsRaw> exec_opts{};
rpc::RequestFlag flags = static_cast<rpc::RequestFlag>(0);
@@ -1784,14 +1785,18 @@ struct ExecRequestRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef ExecRequestRawBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_ID = 4,
- VT_PROG_DATA = 6,
- VT_EXEC_OPTS = 8,
- VT_FLAGS = 10,
- VT_ALL_SIGNAL = 12
+ VT_AVOID = 6,
+ VT_PROG_DATA = 8,
+ VT_EXEC_OPTS = 10,
+ VT_FLAGS = 12,
+ VT_ALL_SIGNAL = 14
};
int64_t id() const {
return GetField<int64_t>(VT_ID, 0);
}
+ uint64_t avoid() const {
+ return GetField<uint64_t>(VT_AVOID, 0);
+ }
const flatbuffers::Vector<uint8_t> *prog_data() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_PROG_DATA);
}
@@ -1807,6 +1812,7 @@ struct ExecRequestRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<int64_t>(verifier, VT_ID, 8) &&
+ VerifyField<uint64_t>(verifier, VT_AVOID, 8) &&
VerifyOffset(verifier, VT_PROG_DATA) &&
verifier.VerifyVector(prog_data()) &&
VerifyField<rpc::ExecOptsRaw>(verifier, VT_EXEC_OPTS, 8) &&
@@ -1827,6 +1833,9 @@ struct ExecRequestRawBuilder {
void add_id(int64_t id) {
fbb_.AddElement<int64_t>(ExecRequestRaw::VT_ID, id, 0);
}
+ void add_avoid(uint64_t avoid) {
+ fbb_.AddElement<uint64_t>(ExecRequestRaw::VT_AVOID, avoid, 0);
+ }
void add_prog_data(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> prog_data) {
fbb_.AddOffset(ExecRequestRaw::VT_PROG_DATA, prog_data);
}
@@ -1853,12 +1862,14 @@ struct ExecRequestRawBuilder {
inline flatbuffers::Offset<ExecRequestRaw> CreateExecRequestRaw(
flatbuffers::FlatBufferBuilder &_fbb,
int64_t id = 0,
+ uint64_t avoid = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> prog_data = 0,
const rpc::ExecOptsRaw *exec_opts = nullptr,
rpc::RequestFlag flags = static_cast<rpc::RequestFlag>(0),
flatbuffers::Offset<flatbuffers::Vector<int32_t>> all_signal = 0) {
ExecRequestRawBuilder builder_(_fbb);
builder_.add_flags(flags);
+ builder_.add_avoid(avoid);
builder_.add_id(id);
builder_.add_all_signal(all_signal);
builder_.add_exec_opts(exec_opts);
@@ -1869,6 +1880,7 @@ inline flatbuffers::Offset<ExecRequestRaw> CreateExecRequestRaw(
inline flatbuffers::Offset<ExecRequestRaw> CreateExecRequestRawDirect(
flatbuffers::FlatBufferBuilder &_fbb,
int64_t id = 0,
+ uint64_t avoid = 0,
const std::vector<uint8_t> *prog_data = nullptr,
const rpc::ExecOptsRaw *exec_opts = nullptr,
rpc::RequestFlag flags = static_cast<rpc::RequestFlag>(0),
@@ -1878,6 +1890,7 @@ inline flatbuffers::Offset<ExecRequestRaw> CreateExecRequestRawDirect(
return rpc::CreateExecRequestRaw(
_fbb,
id,
+ avoid,
prog_data__,
exec_opts,
flags,
@@ -2355,6 +2368,7 @@ flatbuffers::Offset<ProgInfoRaw> CreateProgInfoRaw(flatbuffers::FlatBufferBuilde
struct ExecResultRawT : public flatbuffers::NativeTable {
typedef ExecResultRaw TableType;
int64_t id = 0;
+ int32_t proc = 0;
std::vector<uint8_t> output{};
std::string error{};
std::unique_ptr<rpc::ProgInfoRawT> info{};
@@ -2369,13 +2383,17 @@ struct ExecResultRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef ExecResultRawBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_ID = 4,
- VT_OUTPUT = 6,
- VT_ERROR = 8,
- VT_INFO = 10
+ VT_PROC = 6,
+ VT_OUTPUT = 8,
+ VT_ERROR = 10,
+ VT_INFO = 12
};
int64_t id() const {
return GetField<int64_t>(VT_ID, 0);
}
+ int32_t proc() const {
+ return GetField<int32_t>(VT_PROC, 0);
+ }
const flatbuffers::Vector<uint8_t> *output() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_OUTPUT);
}
@@ -2388,6 +2406,7 @@ struct ExecResultRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<int64_t>(verifier, VT_ID, 8) &&
+ VerifyField<int32_t>(verifier, VT_PROC, 4) &&
VerifyOffset(verifier, VT_OUTPUT) &&
verifier.VerifyVector(output()) &&
VerifyOffset(verifier, VT_ERROR) &&
@@ -2408,6 +2427,9 @@ struct ExecResultRawBuilder {
void add_id(int64_t id) {
fbb_.AddElement<int64_t>(ExecResultRaw::VT_ID, id, 0);
}
+ void add_proc(int32_t proc) {
+ fbb_.AddElement<int32_t>(ExecResultRaw::VT_PROC, proc, 0);
+ }
void add_output(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> output) {
fbb_.AddOffset(ExecResultRaw::VT_OUTPUT, output);
}
@@ -2431,6 +2453,7 @@ struct ExecResultRawBuilder {
inline flatbuffers::Offset<ExecResultRaw> CreateExecResultRaw(
flatbuffers::FlatBufferBuilder &_fbb,
int64_t id = 0,
+ int32_t proc = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> output = 0,
flatbuffers::Offset<flatbuffers::String> error = 0,
flatbuffers::Offset<rpc::ProgInfoRaw> info = 0) {
@@ -2439,12 +2462,14 @@ inline flatbuffers::Offset<ExecResultRaw> CreateExecResultRaw(
builder_.add_info(info);
builder_.add_error(error);
builder_.add_output(output);
+ builder_.add_proc(proc);
return builder_.Finish();
}
inline flatbuffers::Offset<ExecResultRaw> CreateExecResultRawDirect(
flatbuffers::FlatBufferBuilder &_fbb,
int64_t id = 0,
+ int32_t proc = 0,
const std::vector<uint8_t> *output = nullptr,
const char *error = nullptr,
flatbuffers::Offset<rpc::ProgInfoRaw> info = 0) {
@@ -2453,6 +2478,7 @@ inline flatbuffers::Offset<ExecResultRaw> CreateExecResultRawDirect(
return rpc::CreateExecResultRaw(
_fbb,
id,
+ proc,
output__,
error__,
info);
@@ -3171,6 +3197,7 @@ inline flatbuffers::Offset<ExecutorMessageRaw> CreateExecutorMessageRaw(flatbuff
inline ExecRequestRawT::ExecRequestRawT(const ExecRequestRawT &o)
: id(o.id),
+ avoid(o.avoid),
prog_data(o.prog_data),
exec_opts((o.exec_opts) ? new rpc::ExecOptsRaw(*o.exec_opts) : nullptr),
flags(o.flags),
@@ -3179,6 +3206,7 @@ inline ExecRequestRawT::ExecRequestRawT(const ExecRequestRawT &o)
inline ExecRequestRawT &ExecRequestRawT::operator=(ExecRequestRawT o) FLATBUFFERS_NOEXCEPT {
std::swap(id, o.id);
+ std::swap(avoid, o.avoid);
std::swap(prog_data, o.prog_data);
std::swap(exec_opts, o.exec_opts);
std::swap(flags, o.flags);
@@ -3196,6 +3224,7 @@ inline void ExecRequestRaw::UnPackTo(ExecRequestRawT *_o, const flatbuffers::res
(void)_o;
(void)_resolver;
{ auto _e = id(); _o->id = _e; }
+ { auto _e = avoid(); _o->avoid = _e; }
{ auto _e = prog_data(); if (_e) { _o->prog_data.resize(_e->size()); std::copy(_e->begin(), _e->end(), _o->prog_data.begin()); } }
{ auto _e = exec_opts(); if (_e) _o->exec_opts = std::unique_ptr<rpc::ExecOptsRaw>(new rpc::ExecOptsRaw(*_e)); }
{ auto _e = flags(); _o->flags = _e; }
@@ -3211,6 +3240,7 @@ inline flatbuffers::Offset<ExecRequestRaw> CreateExecRequestRaw(flatbuffers::Fla
(void)_o;
struct _VectorArgs { flatbuffers::FlatBufferBuilder *__fbb; const ExecRequestRawT* __o; const flatbuffers::rehasher_function_t *__rehasher; } _va = { &_fbb, _o, _rehasher}; (void)_va;
auto _id = _o->id;
+ auto _avoid = _o->avoid;
auto _prog_data = _o->prog_data.size() ? _fbb.CreateVector(_o->prog_data) : 0;
auto _exec_opts = _o->exec_opts ? _o->exec_opts.get() : nullptr;
auto _flags = _o->flags;
@@ -3218,6 +3248,7 @@ inline flatbuffers::Offset<ExecRequestRaw> CreateExecRequestRaw(flatbuffers::Fla
return rpc::CreateExecRequestRaw(
_fbb,
_id,
+ _avoid,
_prog_data,
_exec_opts,
_flags,
@@ -3428,6 +3459,7 @@ inline flatbuffers::Offset<ProgInfoRaw> CreateProgInfoRaw(flatbuffers::FlatBuffe
inline ExecResultRawT::ExecResultRawT(const ExecResultRawT &o)
: id(o.id),
+ proc(o.proc),
output(o.output),
error(o.error),
info((o.info) ? new rpc::ProgInfoRawT(*o.info) : nullptr) {
@@ -3435,6 +3467,7 @@ inline ExecResultRawT::ExecResultRawT(const ExecResultRawT &o)
inline ExecResultRawT &ExecResultRawT::operator=(ExecResultRawT o) FLATBUFFERS_NOEXCEPT {
std::swap(id, o.id);
+ std::swap(proc, o.proc);
std::swap(output, o.output);
std::swap(error, o.error);
std::swap(info, o.info);
@@ -3451,6 +3484,7 @@ inline void ExecResultRaw::UnPackTo(ExecResultRawT *_o, const flatbuffers::resol
(void)_o;
(void)_resolver;
{ auto _e = id(); _o->id = _e; }
+ { auto _e = proc(); _o->proc = _e; }
{ auto _e = output(); if (_e) { _o->output.resize(_e->size()); std::copy(_e->begin(), _e->end(), _o->output.begin()); } }
{ auto _e = error(); if (_e) _o->error = _e->str(); }
{ auto _e = info(); if (_e) _o->info = std::unique_ptr<rpc::ProgInfoRawT>(_e->UnPack(_resolver)); }
@@ -3465,12 +3499,14 @@ inline flatbuffers::Offset<ExecResultRaw> CreateExecResultRaw(flatbuffers::FlatB
(void)_o;
struct _VectorArgs { flatbuffers::FlatBufferBuilder *__fbb; const ExecResultRawT* __o; const flatbuffers::rehasher_function_t *__rehasher; } _va = { &_fbb, _o, _rehasher}; (void)_va;
auto _id = _o->id;
+ auto _proc = _o->proc;
auto _output = _o->output.size() ? _fbb.CreateVector(_o->output) : 0;
auto _error = _o->error.empty() ? 0 : _fbb.CreateString(_o->error);
auto _info = _o->info ? CreateProgInfoRaw(_fbb, _o->info.get(), _rehasher) : 0;
return rpc::CreateExecResultRaw(
_fbb,
_id,
+ _proc,
_output,
_error,
_info);
diff --git a/pkg/flatrpc/helpers.go b/pkg/flatrpc/helpers.go
index 697f9eefa..5fb86e44a 100644
--- a/pkg/flatrpc/helpers.go
+++ b/pkg/flatrpc/helpers.go
@@ -9,6 +9,8 @@ import (
"sync/atomic"
"syscall"
"unsafe"
+
+ "github.com/google/syzkaller/prog"
)
const AllFeatures = ^Feature(0)
@@ -39,6 +41,13 @@ type ProgInfo = ProgInfoRawT
type ExecResult = ExecResultRawT
type StateResult = StateResultRawT
+func init() {
+ var req ExecRequest
+ if prog.MaxPids > unsafe.Sizeof(req.Avoid)*8 {
+ panic("all procs won't fit into ExecRequest.Avoid")
+ }
+}
+
func (pi *ProgInfo) Clone() *ProgInfo {
if pi == nil {
return nil
diff --git a/pkg/fuzzer/fuzzer.go b/pkg/fuzzer/fuzzer.go
index 88e4a5965..17b3d3391 100644
--- a/pkg/fuzzer/fuzzer.go
+++ b/pkg/fuzzer/fuzzer.go
@@ -137,10 +137,11 @@ func (fuzzer *Fuzzer) processResult(req *queue.Request, res *queue.Result, flags
queue, stat = fuzzer.triageCandidateQueue, fuzzer.statJobsTriageCandidate
}
fuzzer.startJob(stat, &triageJob{
- p: req.Prog.Clone(),
- flags: flags,
- queue: queue.Append(),
- calls: triage,
+ p: req.Prog.Clone(),
+ executor: res.Executor,
+ flags: flags,
+ queue: queue.Append(),
+ calls: triage,
})
}
}
diff --git a/pkg/fuzzer/job.go b/pkg/fuzzer/job.go
index 8586c6861..85d2dcfe6 100644
--- a/pkg/fuzzer/job.go
+++ b/pkg/fuzzer/job.go
@@ -54,10 +54,11 @@ func mutateProgRequest(fuzzer *Fuzzer, rnd *rand.Rand) *queue.Request {
// During triage we understand if these programs in fact give new coverage,
// and if yes, minimize them and add to corpus.
type triageJob struct {
- p *prog.Prog
- flags ProgFlags
- fuzzer *Fuzzer
- queue queue.Executor
+ p *prog.Prog
+ executor queue.ExecutorID
+ flags ProgFlags
+ fuzzer *Fuzzer
+ queue queue.Executor
// Set of calls that gave potential new coverage.
calls map[int]*triageCall
}
@@ -179,6 +180,7 @@ func (job *triageJob) handleCall(call int, info *triageCall) {
}
func (job *triageJob) deflake(exec func(*queue.Request, ProgFlags) *queue.Result) (stop bool) {
+ avoid := []queue.ExecutorID{job.executor}
needRuns := deflakeNeedCorpusRuns
if job.fuzzer.Config.Snapshot {
needRuns = deflakeNeedSnapshotRuns
@@ -201,11 +203,13 @@ func (job *triageJob) deflake(exec func(*queue.Request, ProgFlags) *queue.Result
Prog: job.p,
ExecOpts: setFlags(flatrpc.ExecFlagCollectCover | flatrpc.ExecFlagCollectSignal),
ReturnAllSignal: indices,
+ Avoid: avoid,
Stat: job.fuzzer.statExecTriage,
}, progInTriage)
if result.Stop() {
return true
}
+ avoid = append(avoid, result.Executor)
if result.Info == nil {
continue // the program has failed
}
diff --git a/pkg/fuzzer/queue/distributor.go b/pkg/fuzzer/queue/distributor.go
new file mode 100644
index 000000000..e6c22df79
--- /dev/null
+++ b/pkg/fuzzer/queue/distributor.go
@@ -0,0 +1,132 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "sync"
+ "sync/atomic"
+
+ "github.com/google/syzkaller/pkg/stat"
+)
+
+// Distributor distributes requests to different VMs during input triage
+// (allows to avoid already used VMs).
+type Distributor struct {
+ source Source
+ seq atomic.Uint64
+ empty atomic.Bool
+ active atomic.Pointer[[]atomic.Uint64]
+ mu sync.Mutex
+ queue []*Request
+ statDelayed *stat.Val
+ statUndelayed *stat.Val
+ statViolated *stat.Val
+}
+
+func Distribute(source Source) *Distributor {
+ return &Distributor{
+ source: source,
+ statDelayed: stat.New("distributor delayed", "Number of test programs delayed due to VM avoidance",
+ stat.Graph("distributor")),
+ statUndelayed: stat.New("distributor undelayed", "Number of test programs undelayed for VM avoidance",
+ stat.Graph("distributor")),
+ statViolated: stat.New("distributor violated", "Number of test programs violated VM avoidance",
+ stat.Graph("distributor")),
+ }
+}
+
+// Next returns the next request to execute on the given vm.
+func (dist *Distributor) Next(vm int) *Request {
+ dist.noteActive(vm)
+ if req := dist.delayed(vm); req != nil {
+ return req
+ }
+ for {
+ req := dist.source.Next()
+ if req == nil || !contains(req.Avoid, vm) || !dist.hasOtherActive(req.Avoid) {
+ return req
+ }
+ dist.delay(req)
+ }
+}
+
+func (dist *Distributor) delay(req *Request) {
+ dist.mu.Lock()
+ defer dist.mu.Unlock()
+ req.delayedSince = dist.seq.Load()
+ dist.queue = append(dist.queue, req)
+ dist.statDelayed.Add(1)
+ dist.empty.Store(false)
+}
+
+func (dist *Distributor) delayed(vm int) *Request {
+ if dist.empty.Load() {
+ return nil
+ }
+ dist.mu.Lock()
+ defer dist.mu.Unlock()
+ seq := dist.seq.Load()
+ for i, req := range dist.queue {
+ violation := contains(req.Avoid, vm)
+ // The delayedSince check protects from a situation when we had another VM available,
+ // and delayed a request, but then the VM was taken for reproduction and does not
+ // serve requests any more. If we cannot dispatch a request in 1000 attempts,
+ // we give up and give it to any VM.
+ if violation && req.delayedSince+1000 > seq {
+ continue
+ }
+ dist.statUndelayed.Add(1)
+ if violation {
+ dist.statViolated.Add(1)
+ }
+ last := len(dist.queue) - 1
+ dist.queue[i] = dist.queue[last]
+ dist.queue[last] = nil
+ dist.queue = dist.queue[:last]
+ dist.empty.Store(len(dist.queue) == 0)
+ return req
+ }
+ return nil
+}
+
+func (dist *Distributor) noteActive(vm int) {
+ active := dist.active.Load()
+ if active == nil || len(*active) <= vm {
+ dist.mu.Lock()
+ active = dist.active.Load()
+ if active == nil || len(*active) <= vm {
+ tmp := make([]atomic.Uint64, vm+10)
+ active = &tmp
+ dist.active.Store(active)
+ }
+ dist.mu.Unlock()
+ }
+ (*active)[vm].Store(dist.seq.Add(1))
+}
+
+// hasOtherActive says if we have recently seen activity from VMs not in the set.
+func (dist *Distributor) hasOtherActive(set []ExecutorID) bool {
+ seq := dist.seq.Load()
+ active := *dist.active.Load()
+ for vm := range active {
+ if contains(set, vm) {
+ continue
+ }
+ // 1000 is a semi-random notion of recency.
+ if active[vm].Load()+1000 < seq {
+ continue
+ }
+ return true
+ }
+ return false
+}
+
+func contains(set []ExecutorID, vm int) bool {
+ for _, id := range set {
+ if id.VM == vm {
+ return true
+ }
+ }
+ return false
+}
diff --git a/pkg/fuzzer/queue/distributor_test.go b/pkg/fuzzer/queue/distributor_test.go
new file mode 100644
index 000000000..7bbf9c2e7
--- /dev/null
+++ b/pkg/fuzzer/queue/distributor_test.go
@@ -0,0 +1,48 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestDistributor(t *testing.T) {
+ q := Plain()
+ dist := Distribute(q)
+
+ req := &Request{}
+ q.Submit(req)
+ assert.Equal(t, req, dist.Next(0))
+
+ q.Submit(req)
+ assert.Equal(t, req, dist.Next(1))
+
+ // Avoid VM 0.
+ req.Avoid = []ExecutorID{{VM: 0}}
+ q.Submit(req)
+ var noReq *Request
+ assert.Equal(t, noReq, dist.Next(0))
+ assert.Equal(t, noReq, dist.Next(0))
+ assert.Equal(t, req, dist.Next(1))
+
+ // If only VM 0 queries requests, it should eventually get it.
+ q.Submit(req)
+ assert.Equal(t, noReq, dist.Next(0))
+ for {
+ got := dist.Next(0)
+ if got == req {
+ break
+ }
+ assert.Equal(t, noReq, got)
+ }
+
+ // If all active VMs are in the avoid set, then they should get
+ // the request immediately.
+ assert.Equal(t, noReq, dist.Next(1))
+ req.Avoid = []ExecutorID{{VM: 0}, {VM: 1}}
+ q.Submit(req)
+ assert.Equal(t, req, dist.Next(1))
+}
diff --git a/pkg/fuzzer/queue/queue.go b/pkg/fuzzer/queue/queue.go
index aadbaade8..cbdb2ba19 100644
--- a/pkg/fuzzer/queue/queue.go
+++ b/pkg/fuzzer/queue/queue.go
@@ -41,18 +41,28 @@ type Request struct {
// Important requests will be retried even from crashed VMs.
Important bool
+ // Avoid specifies the set of executors that are preferable to avoid when executing this request.
+ // The restriction is soft since there can be only one executor at all or available right now.
+ Avoid []ExecutorID
+
// The callback will be called on request completion in the LIFO order.
// If it returns false, all further processing will be stopped.
// It allows wrappers to intercept Done() requests.
callback DoneCallback
- onceCrashed bool
+ onceCrashed bool
+ delayedSince uint64
mu sync.Mutex
result *Result
done chan struct{}
}
+type ExecutorID struct {
+ VM int
+ Proc int
+}
+
type DoneCallback func(*Request, *Result) bool
func (r *Request) OnDone(cb DoneCallback) {
@@ -137,10 +147,11 @@ func (r *Request) initChannel() {
}
type Result struct {
- Info *flatrpc.ProgInfo
- Output []byte
- Status Status
- Err error // More details in case of ExecFailure.
+ Info *flatrpc.ProgInfo
+ Executor ExecutorID
+ Output []byte
+ Status Status
+ Err error // More details in case of ExecFailure.
}
func (r *Result) clone() *Result {
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go
index 37e74c91b..da3a0f076 100644
--- a/pkg/rpcserver/rpcserver.go
+++ b/pkg/rpcserver/rpcserver.go
@@ -76,9 +76,12 @@ type Server struct {
canonicalModules *cover.Canonicalizer
coverFilter []uint64
- mu sync.Mutex
- runners map[string]*Runner
- execSource queue.Source
+ mu sync.Mutex
+ runners map[string]*Runner
+ execSource *queue.Distributor
+ // TODO: replace names with indices.
+ ids map[string]int
+ idSeq int
triagedCorpus atomic.Bool
statVMRestarts *stat.Val
*runnerStats
@@ -140,9 +143,10 @@ func newImpl(ctx context.Context, cfg *Config, mgr Manager) (*Server, error) {
sysTarget: sysTarget,
timeouts: sysTarget.Timeouts(cfg.Slowdown),
runners: make(map[string]*Runner),
+ ids: make(map[string]int),
checker: checker,
baseSource: baseSource,
- execSource: queue.Retry(baseSource),
+ execSource: queue.Distribute(queue.Retry(baseSource)),
statVMRestarts: stat.New("vm restarts", "Total number of VM starts",
stat.Rate{}, stat.NoGraph),
@@ -412,6 +416,13 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo
panic(fmt.Sprintf("duplicate instance %s", name))
}
serv.runners[name] = runner
+ id, ok := serv.ids[name]
+ if !ok {
+ id = serv.idSeq
+ serv.idSeq++
+ serv.ids[name] = id
+ }
+ runner.id = id
return runner.resultCh
}
diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go
index a0b519d20..f14a2ec29 100644
--- a/pkg/rpcserver/runner.go
+++ b/pkg/rpcserver/runner.go
@@ -25,7 +25,8 @@ import (
)
type Runner struct {
- source queue.Source
+ id int
+ source *queue.Distributor
procs int
cover bool
coverEdges bool
@@ -174,7 +175,7 @@ func (runner *Runner) ConnectionLoop() error {
}
}
for len(runner.requests)-len(runner.executing) < 2*runner.procs {
- req := runner.source.Next()
+ req := runner.source.Next(runner.id)
if req == nil {
break
}
@@ -320,11 +321,21 @@ func (runner *Runner) sendRequest(req *queue.Request) error {
}
data = fileData
}
+ var avoid uint64
+ for _, id := range req.Avoid {
+ if id.VM == runner.id {
+ avoid |= uint64(1 << id.Proc)
+ }
+ }
+ if avoid == (uint64(1)<<runner.procs)-1 {
+ avoid = 0
+ }
msg := &flatrpc.HostMessage{
Msg: &flatrpc.HostMessages{
Type: flatrpc.HostMessagesRawExecRequest,
Value: &flatrpc.ExecRequest{
Id: id,
+ Avoid: avoid,
ProgData: data,
Flags: flags,
ExecOpts: &opts,
@@ -408,6 +419,10 @@ func (runner *Runner) handleExecResult(msg *flatrpc.ExecResult) error {
resErr = errors.New(msg.Error)
}
req.Done(&queue.Result{
+ Executor: queue.ExecutorID{
+ VM: runner.id,
+ Proc: int(msg.Proc),
+ },
Status: status,
Info: msg.Info,
Output: slices.Clone(msg.Output),