diff options
| author | Dmitry Vyukov <dvyukov@google.com> | 2025-11-24 08:05:16 +0100 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2026-01-09 12:51:45 +0000 |
| commit | 533546563b43424f499d2593e18f52d248283b42 (patch) | |
| tree | e92a24f0af4ed789b0cf3b2cfcd007ecfc2de455 | |
| parent | 1e5e6467ab883505a42c23233dc27b2ea2d36da1 (diff) | |
syz-agent: add agentic server
Add server for running agentic workflows as part of syzbot.
The architecture and use are similar to that of syz-ci.
| -rw-r--r-- | Makefile | 5 | ||||
| -rw-r--r-- | pkg/updater/updater.go | 3 | ||||
| -rw-r--r-- | syz-agent/agent.go | 224 |
3 files changed, 230 insertions, 2 deletions
@@ -104,7 +104,7 @@ ifeq ("$(TARGETOS)", "trusty") endif .PHONY: all clean host target \ - manager executor kfuzztest ci hub \ + manager executor kfuzztest ci hub agent \ execprog mutate prog2c trace2syz repro upgrade db \ usbgen symbolize cover kconf syz-build crush \ bin/syz-extract bin/syz-fmt \ @@ -172,6 +172,9 @@ ci: descriptions hub: descriptions GOOS=$(HOSTOS) GOARCH=$(HOSTARCH) $(HOSTGO) build $(GOHOSTFLAGS) -o ./bin/syz-hub github.com/google/syzkaller/syz-hub +agent: descriptions + GOOS=$(HOSTOS) GOARCH=$(HOSTARCH) $(HOSTGO) build $(GOHOSTFLAGS) -o ./bin/syz-agent github.com/google/syzkaller/syz-agent + repro: descriptions GOOS=$(HOSTOS) GOARCH=$(HOSTARCH) $(HOSTGO) build $(GOHOSTFLAGS) -o ./bin/syz-repro github.com/google/syzkaller/tools/syz-repro diff --git a/pkg/updater/updater.go b/pkg/updater/updater.go index bf7b816ca..ed4938609 100644 --- a/pkg/updater/updater.go +++ b/pkg/updater/updater.go @@ -88,6 +88,7 @@ func New(cfg *Config) (*Updater, error) { "tag": true, // contains syzkaller repo git hash "bin/syz-ci": true, // these are just copied from syzkaller dir "bin/syz-manager": true, + "bin/syz-agent": true, "sys/*/test/*": true, } for target := range cfg.Targets { @@ -273,7 +274,7 @@ func (upd *Updater) build(commit *vcs.Commit) error { } } // This will also generate descriptions and should go before the 'go test' below. - cmd := osutil.Command(instance.MakeBin, "host", "ci") + cmd := osutil.Command(instance.MakeBin, "host", "ci", "agent") cmd.Dir = upd.syzkallerDir cmd.Env = append([]string{"GOPATH=" + upd.gopathDir}, os.Environ()...) if _, err := osutil.Run(time.Hour, cmd); err != nil { diff --git a/syz-agent/agent.go b/syz-agent/agent.go new file mode 100644 index 000000000..a1d11445c --- /dev/null +++ b/syz-agent/agent.go @@ -0,0 +1,224 @@ +// Copyright 2025 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "maps" + _ "net/http/pprof" + "path/filepath" + "sync" + "time" + + "github.com/google/syzkaller/dashboard/dashapi" + "github.com/google/syzkaller/pkg/aflow" + _ "github.com/google/syzkaller/pkg/aflow/flow" + "github.com/google/syzkaller/pkg/aflow/trajectory" + "github.com/google/syzkaller/pkg/config" + "github.com/google/syzkaller/pkg/log" + "github.com/google/syzkaller/pkg/mgrconfig" + "github.com/google/syzkaller/pkg/osutil" + "github.com/google/syzkaller/pkg/tool" + "github.com/google/syzkaller/pkg/updater" + "github.com/google/syzkaller/prog" +) + +type Config struct { + // Currently serves only net/http/pprof handlers. + HTTP string `json:"http"` + DashboardAddr string `json:"dashboard_addr"` + DashboardClient string `json:"dashboard_client"` // Global non-namespace client. + DashboardKey string `json:"dashboard_key"` + SyzkallerRepo string `json:"syzkaller_repo"` + SyzkallerBranch string `json:"syzkaller_branch"` + // Pre-built tools/clang/codesearch clang tool. + CodesearchToolBin string `json:"codesearch_tool_bin"` + KernelConfig string `json:"kernel_config"` + Target string `json:"target"` + Image string `json:"image"` + Type string `json:"type"` + VM json.RawMessage `json:"vm"` + // Use fixed base commit for patching jobs (for testing). + FixedBaseCommit string `json:"fixed_base_commit"` + // Use this LLM model (for testing, if empty use a default model). + Model string `json:"model"` +} + +func main() { + var ( + flagConfig = flag.String("config", "", "config file") + flagExitOnUpgrade = flag.Bool("exit-on-upgrade", false, + "exit after a syz-ci upgrade is applied; otherwise syz-ci restarts") + flagAutoUpdate = flag.Bool("autoupdate", true, "auto-update the binary (for testing)") + ) + defer tool.Init()() + log.SetName("syz-agent") + if err := run(*flagConfig, *flagExitOnUpgrade, *flagAutoUpdate); err != nil { + log.Fatal(err) + } +} + +func run(configFile string, exitOnUpgrade, autoUpdate bool) error { + cfg := &Config{ + SyzkallerRepo: "https://github.com/google/syzkaller.git", + SyzkallerBranch: "master", + Model: aflow.DefaultModel, + } + if err := config.LoadFile(configFile, cfg); err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + tool.ServeHTTP(cfg.HTTP) + os, vmarch, arch, _, _, err := mgrconfig.SplitTarget(cfg.Target) + if err != nil { + return err + } + dash, err := dashapi.New(cfg.DashboardClient, cfg.DashboardAddr, cfg.DashboardKey) + if err != nil { + return err + } + buildSem := osutil.NewSemaphore(1) + updater, err := updater.New(&updater.Config{ + ExitOnUpdate: exitOnUpgrade, + BuildSem: buildSem, + SyzkallerRepo: cfg.SyzkallerRepo, + SyzkallerBranch: cfg.SyzkallerBranch, + Targets: map[updater.Target]bool{ + { + OS: os, + VMArch: vmarch, + Arch: arch, + }: true, + }, + }) + if err != nil { + return err + } + updatePending := make(chan struct{}) + shutdownPending := make(chan struct{}) + osutil.HandleInterrupts(shutdownPending) + updater.UpdateOnStart(autoUpdate, updatePending, shutdownPending) + + const workdir = "workdir" + const cacheSize = 1 << 40 // 1TB should be enough for everyone! + cache, err := aflow.NewCache(filepath.Join(workdir, "cache"), cacheSize) + if err != nil { + return err + } + + s := &Server{ + cfg: cfg, + dash: dash, + cache: cache, + workdir: workdir, + } + + ctx, stop := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + ok, err := s.poll(ctx) + if err != nil { + log.Error(err) + dash.LogError("syz-agent", "%v", err) + } + var delay time.Duration + if !ok { + // Don't poll dashboard too often, if there are no jobs, + // or errors are happenning. + delay = 10 * time.Second + } + select { + case <-ctx.Done(): + return + case <-time.After(delay): + } + } + }() + + select { + case <-shutdownPending: + case <-updatePending: + } + stop() + wg.Wait() + + select { + case <-shutdownPending: + default: + updater.UpdateAndRestart() + } + return nil +} + +type Server struct { + cfg *Config + dash *dashapi.Dashboard + cache *aflow.Cache + workdir string +} + +func (s *Server) poll(ctx context.Context) ( + bool, error) { + req := &dashapi.AIJobPollReq{ + LLMModel: s.cfg.Model, + CodeRevision: prog.GitRevision, + } + for _, flow := range aflow.Flows { + req.Workflows = append(req.Workflows, dashapi.AIWorkflow{ + Type: flow.Type, + Name: flow.Name, + }) + } + resp, err := s.dash.AIJobPoll(req) + if err != nil { + return false, err + } + if resp.ID == "" { + return false, nil + } + doneReq := &dashapi.AIJobDoneReq{ + ID: resp.ID, + } + results, jobErr := s.executeJob(ctx, resp) + doneReq.Results = results + if jobErr != nil { + doneReq.Error = jobErr.Error() + } + if err := s.dash.AIJobDone(doneReq); err != nil { + return false, err + } + if jobErr != nil { + return false, jobErr + } + return true, nil +} + +func (s *Server) executeJob(ctx context.Context, req *dashapi.AIJobPollResp) (map[string]any, error) { + flow := aflow.Flows[req.Workflow] + if flow == nil { + return nil, fmt.Errorf("unsupported flow %q", req.Workflow) + } + inputs := map[string]any{ + "Syzkaller": osutil.Abs(filepath.FromSlash("syzkaller/current")), + "CodesearchToolBin": s.cfg.CodesearchToolBin, + "Image": s.cfg.Image, + "Type": s.cfg.Type, + "VM": s.cfg.VM, + "FixedBaseCommit": s.cfg.FixedBaseCommit, + } + maps.Insert(inputs, maps.All(req.Args)) + onEvent := func(span *trajectory.Span) error { + log.Logf(0, "%v", span) + return s.dash.AITrajectoryLog(&dashapi.AITrajectoryReq{ + JobID: req.ID, + Span: span, + }) + } + return flow.Execute(ctx, s.cfg.Model, s.workdir, inputs, s.cache, onEvent) +} |
