aboutsummaryrefslogtreecommitdiffstats
path: root/syz-cluster/controller
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-02-13 13:59:19 +0100
committerAleksandr Nogikh <nogikh@google.com>2025-02-14 13:40:12 +0000
commiteaf86f3f4dc8a7190abf09fe840e20bcf83709d8 (patch)
treec28d030a8923833ffc39005b1a57946cd48fea62 /syz-cluster/controller
parent59c86b9e1c7a0f91fbb1b680676f33b4cc7bf137 (diff)
syz-cluster/controller: move the API server to pkg/controller
This will facilitate its reuse in tests.
Diffstat (limited to 'syz-cluster/controller')
-rw-r--r--syz-cluster/controller/api.go198
-rw-r--r--syz-cluster/controller/api_test.go141
-rw-r--r--syz-cluster/controller/main.go3
-rw-r--r--syz-cluster/controller/processor_test.go12
4 files changed, 7 insertions, 347 deletions
diff --git a/syz-cluster/controller/api.go b/syz-cluster/controller/api.go
deleted file mode 100644
index cddaaf2e2..000000000
--- a/syz-cluster/controller/api.go
+++ /dev/null
@@ -1,198 +0,0 @@
-// Copyright 2024 syzkaller project authors. All rights reserved.
-// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
-
-// nolint: dupl // The methods look similar, but extracting the common parts will only make the code worse.
-package main
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net/http"
-
- "github.com/google/syzkaller/syz-cluster/pkg/api"
- "github.com/google/syzkaller/syz-cluster/pkg/app"
- "github.com/google/syzkaller/syz-cluster/pkg/service"
-)
-
-type ControllerAPI struct {
- seriesService *service.SeriesService
- sessionService *service.SessionService
- buildService *service.BuildService
- testService *service.SessionTestService
- findingService *service.FindingService
-}
-
-func NewControllerAPI(env *app.AppEnvironment) *ControllerAPI {
- return &ControllerAPI{
- seriesService: service.NewSeriesService(env),
- sessionService: service.NewSessionService(env),
- buildService: service.NewBuildService(env),
- testService: service.NewSessionTestService(env),
- findingService: service.NewFindingService(env),
- }
-}
-
-func (c ControllerAPI) Mux() *http.ServeMux {
- mux := http.NewServeMux()
- mux.HandleFunc("/sessions/{session_id}/series", c.getSessionSeries)
- mux.HandleFunc("/sessions/{session_id}/skip", c.skipSession)
- mux.HandleFunc("/sessions/upload", c.uploadSession)
- mux.HandleFunc("/series/{series_id}", c.getSeries)
- mux.HandleFunc("/builds/last", c.getLastBuild)
- mux.HandleFunc("/builds/upload", c.uploadBuild)
- mux.HandleFunc("/tests/upload", c.uploadTest)
- mux.HandleFunc("/findings/upload", c.uploadFinding)
- mux.HandleFunc("/series/upload", c.uploadSeries)
- return mux
-}
-
-func (c ControllerAPI) getSessionSeries(w http.ResponseWriter, r *http.Request) {
- resp, err := c.seriesService.GetSessionSeries(r.Context(), r.PathValue("session_id"))
- if err == service.ErrSeriesNotFound || err == service.ErrSessionNotFound {
- http.Error(w, fmt.Sprint(err), http.StatusNotFound)
- return
- } else if err != nil {
- http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
- return
- }
- reply(w, resp)
-}
-
-func (c ControllerAPI) skipSession(w http.ResponseWriter, r *http.Request) {
- req := parseBody[api.SkipRequest](w, r)
- if req == nil {
- return
- }
- err := c.sessionService.SkipSession(r.Context(), r.PathValue("session_id"), req)
- if errors.Is(err, service.ErrSessionNotFound) {
- http.Error(w, fmt.Sprint(err), http.StatusNotFound)
- return
- } else if err != nil {
- http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
- return
- }
- reply[interface{}](w, nil)
-}
-
-func (c ControllerAPI) getSeries(w http.ResponseWriter, r *http.Request) {
- resp, err := c.seriesService.GetSeries(r.Context(), r.PathValue("series_id"))
- if errors.Is(err, service.ErrSeriesNotFound) {
- http.Error(w, fmt.Sprint(err), http.StatusNotFound)
- return
- } else if err != nil {
- http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
- return
- }
- reply(w, resp)
-}
-
-func (c ControllerAPI) uploadBuild(w http.ResponseWriter, r *http.Request) {
- req := parseBody[api.UploadBuildReq](w, r)
- if req == nil {
- return
- }
- resp, err := c.buildService.Upload(r.Context(), req)
- if err != nil {
- // TODO: sometimes it's not StatusInternalServerError.
- http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
- return
- }
- reply(w, resp)
-}
-
-func (c ControllerAPI) uploadTest(w http.ResponseWriter, r *http.Request) {
- req := parseBody[api.TestResult](w, r)
- if req == nil {
- return
- }
- // TODO: add parameters validation (and also of the Log size).
- err := c.testService.Save(r.Context(), req)
- if err != nil {
- http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
- return
- }
- reply[interface{}](w, nil)
-}
-
-func (c ControllerAPI) uploadFinding(w http.ResponseWriter, r *http.Request) {
- req := parseBody[api.Finding](w, r)
- if req == nil {
- return
- }
- // TODO: add parameters validation.
- err := c.findingService.Save(r.Context(), req)
- if err != nil {
- http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
- return
- }
- reply[interface{}](w, nil)
-}
-
-func (c ControllerAPI) getLastBuild(w http.ResponseWriter, r *http.Request) {
- req := parseBody[api.LastBuildReq](w, r)
- if req == nil {
- return
- }
- resp, err := c.buildService.LastBuild(r.Context(), req)
- if err != nil {
- http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
- return
- }
- reply[*api.Build](w, resp)
-}
-
-func (c ControllerAPI) uploadSeries(w http.ResponseWriter, r *http.Request) {
- req := parseBody[api.Series](w, r)
- if req == nil {
- return
- }
- resp, err := c.seriesService.UploadSeries(r.Context(), req)
- if err != nil {
- http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
- return
- }
- reply[*api.UploadSeriesResp](w, resp)
-}
-
-func (c ControllerAPI) uploadSession(w http.ResponseWriter, r *http.Request) {
- req := parseBody[api.NewSession](w, r)
- if req == nil {
- return
- }
- resp, err := c.sessionService.UploadSession(r.Context(), req)
- if err != nil {
- http.Error(w, fmt.Sprint(err), http.StatusInternalServerError)
- return
- }
- reply[*api.UploadSessionResp](w, resp)
-}
-
-func reply[T any](w http.ResponseWriter, resp T) {
- w.Header().Set("Content-Type", "application/json")
- err := json.NewEncoder(w).Encode(resp)
- if err != nil {
- http.Error(w, "failed to serialize the response", http.StatusInternalServerError)
- return
- }
-}
-
-func parseBody[T any](w http.ResponseWriter, r *http.Request) *T {
- if r.Method != http.MethodPost {
- http.Error(w, "must be called via POST", http.StatusMethodNotAllowed)
- return nil
- }
- body, err := io.ReadAll(r.Body)
- if err != nil {
- http.Error(w, "failed to read body", http.StatusBadRequest)
- return nil
- }
- var data T
- err = json.Unmarshal(body, &data)
- if err != nil {
- http.Error(w, "invalid body", http.StatusBadRequest)
- return nil
- }
- return &data
-}
diff --git a/syz-cluster/controller/api_test.go b/syz-cluster/controller/api_test.go
deleted file mode 100644
index 24c113b12..000000000
--- a/syz-cluster/controller/api_test.go
+++ /dev/null
@@ -1,141 +0,0 @@
-// Copyright 2024 syzkaller project authors. All rights reserved.
-// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
-
-package main
-
-import (
- "context"
- "net/http/httptest"
- "testing"
- "time"
-
- "github.com/google/syzkaller/syz-cluster/pkg/api"
- "github.com/google/syzkaller/syz-cluster/pkg/app"
- "github.com/stretchr/testify/assert"
-)
-
-func TestAPIGetSeries(t *testing.T) {
- env, ctx := app.TestEnvironment(t)
- apiServer := NewControllerAPI(env)
- server := httptest.NewServer(apiServer.Mux())
- defer server.Close()
-
- client := api.NewClient(server.URL)
- seriesID, sessionID := uploadSeries(t, ctx, client, testSeries)
-
- ret, err := client.GetSessionSeries(ctx, sessionID)
- assert.NoError(t, err)
- ret.ID = ""
- assert.Equal(t, testSeries, ret)
-
- ret, err = client.GetSeries(ctx, seriesID)
- assert.NoError(t, err)
- ret.ID = ""
- assert.Equal(t, testSeries, ret)
-}
-
-func TestAPISuccessfulBuild(t *testing.T) {
- env, ctx := app.TestEnvironment(t)
- apiServer := NewControllerAPI(env)
- server := httptest.NewServer(apiServer.Mux())
- defer server.Close()
-
- client := api.NewClient(server.URL)
- buildInfo, _ := uploadTestBuild(t, client)
- info, err := client.LastSuccessfulBuild(ctx, &api.LastBuildReq{
- Arch: buildInfo.Arch,
- TreeName: buildInfo.TreeName,
- ConfigName: buildInfo.ConfigName,
- })
- assert.NoError(t, err)
- assert.Equal(t, buildInfo, info)
-}
-
-func TestAPISaveFinding(t *testing.T) {
- env, ctx := app.TestEnvironment(t)
- apiServer := NewControllerAPI(env)
- server := httptest.NewServer(apiServer.Mux())
- defer server.Close()
-
- client := api.NewClient(server.URL)
-
- _, sessionID := uploadSeries(t, ctx, client, testSeries)
- _, buildResp := uploadTestBuild(t, client)
- err := client.UploadTestResult(ctx, &api.TestResult{
- SessionID: sessionID,
- BaseBuildID: buildResp.ID,
- TestName: "test",
- Result: api.TestRunning,
- Log: []byte("some log"),
- })
- assert.NoError(t, err)
-
- t.Run("not existing test", func(t *testing.T) {
- err = client.UploadFinding(ctx, &api.Finding{
- SessionID: sessionID,
- TestName: "unknown test",
- })
- assert.Error(t, err)
- })
-
- t.Run("must succeed", func(t *testing.T) {
- finding := &api.Finding{
- SessionID: sessionID,
- TestName: "test",
- Report: []byte("report"),
- Log: []byte("log"),
- }
- err = client.UploadFinding(ctx, finding)
- assert.NoError(t, err)
- // Even if the finding is reported the second time, it must still not fail.
- err = client.UploadFinding(ctx, finding)
- assert.NoError(t, err)
- })
-}
-
-func uploadTestBuild(t *testing.T, client *api.Client) (*api.Build, *api.UploadBuildResp) {
- buildInfo := &api.Build{
- Arch: "amd64",
- TreeName: "mainline",
- ConfigName: "config",
- CommitHash: "abcd",
- CommitDate: time.Date(2020, time.January, 1, 3, 0, 0, 0, time.UTC),
- BuildSuccess: true,
- }
- ret, err := client.UploadBuild(context.Background(), &api.UploadBuildReq{
- Build: *buildInfo,
- })
- assert.NoError(t, err)
- assert.NotEmpty(t, ret.ID)
- return buildInfo, ret
-}
-
-// Returns a (session ID, series ID) tuple.
-func uploadSeries(t *testing.T, ctx context.Context, client *api.Client, series *api.Series) (string, string) {
- retSeries, err := client.UploadSeries(ctx, series)
- assert.NoError(t, err)
- retSession, err := client.UploadSession(ctx, &api.NewSession{
- ExtID: series.ExtID,
- })
- assert.NoError(t, err)
- return retSeries.ID, retSession.ID
-}
-
-var testSeries = &api.Series{
- ExtID: "ext-id",
- AuthorEmail: "some@email.com",
- Title: "test series name",
- Version: 2,
- PublishedAt: time.Date(2020, time.January, 1, 3, 0, 0, 0, time.UTC),
- Cc: []string{"email"},
- Patches: []api.SeriesPatch{
- {
- Seq: 1,
- Body: []byte("first content"),
- },
- {
- Seq: 2,
- Body: []byte("second content"),
- },
- },
-}
diff --git a/syz-cluster/controller/main.go b/syz-cluster/controller/main.go
index 9667c2239..525c02da0 100644
--- a/syz-cluster/controller/main.go
+++ b/syz-cluster/controller/main.go
@@ -11,6 +11,7 @@ import (
"net/http"
"github.com/google/syzkaller/syz-cluster/pkg/app"
+ "github.com/google/syzkaller/syz-cluster/pkg/controller"
)
func main() {
@@ -24,7 +25,7 @@ func main() {
err := sp.Loop(ctx)
app.Fatalf("processor loop failed: %v", err)
}()
- api := NewControllerAPI(env)
+ api := controller.NewAPIServer(env)
log.Printf("listening on port 8080")
app.Fatalf("listen failed: %v", http.ListenAndServe(":8080", api.Mux()))
}
diff --git a/syz-cluster/controller/processor_test.go b/syz-cluster/controller/processor_test.go
index f56108d3f..ff85ebec9 100644
--- a/syz-cluster/controller/processor_test.go
+++ b/syz-cluster/controller/processor_test.go
@@ -6,13 +6,13 @@ package main
import (
"context"
"fmt"
- "net/http/httptest"
"sync"
"testing"
"time"
"github.com/google/syzkaller/syz-cluster/pkg/api"
"github.com/google/syzkaller/syz-cluster/pkg/app"
+ "github.com/google/syzkaller/syz-cluster/pkg/controller"
"github.com/google/syzkaller/syz-cluster/pkg/db"
"github.com/google/syzkaller/syz-cluster/pkg/workflow"
"github.com/stretchr/testify/assert"
@@ -42,7 +42,7 @@ func TestProcessor(t *testing.T) {
})
}
for _, series := range allSeries[0:5] {
- uploadSeries(t, ctx, client, series)
+ controller.UploadTestSeries(t, ctx, client, series)
}
// Let some workflows finish.
@@ -66,7 +66,7 @@ func TestProcessor(t *testing.T) {
// Add some more series.
for _, series := range allSeries[5:10] {
- uploadSeries(t, ctx, client, series)
+ controller.UploadTestSeries(t, ctx, client, series)
}
// Finish all of them.
@@ -143,14 +143,12 @@ func newMockedWorkflows() *mockedWorkflows {
func prepareProcessorTest(t *testing.T, workflows workflow.Service) (*SeriesProcessor,
*api.Client, context.Context) {
env, ctx := app.TestEnvironment(t)
- apiServer := NewControllerAPI(env)
- server := httptest.NewServer(apiServer.Mux())
- t.Cleanup(server.Close)
+ client := controller.TestServer(t, env)
return &SeriesProcessor{
seriesRepo: db.NewSeriesRepository(env.Spanner),
sessionRepo: db.NewSessionRepository(env.Spanner),
workflows: workflows,
dbPollInterval: time.Second / 10,
parallelWorkers: 2,
- }, api.NewClient(server.URL), ctx
+ }, client, ctx
}