diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2025-02-13 13:59:19 +0100 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2025-02-14 13:40:12 +0000 |
| commit | eaf86f3f4dc8a7190abf09fe840e20bcf83709d8 (patch) | |
| tree | c28d030a8923833ffc39005b1a57946cd48fea62 /syz-cluster/controller | |
| parent | 59c86b9e1c7a0f91fbb1b680676f33b4cc7bf137 (diff) | |
syz-cluster/controller: move the API server to pkg/controller
This will facilitate its reuse in tests.
Diffstat (limited to 'syz-cluster/controller')
| -rw-r--r-- | syz-cluster/controller/api.go | 198 | ||||
| -rw-r--r-- | syz-cluster/controller/api_test.go | 141 | ||||
| -rw-r--r-- | syz-cluster/controller/main.go | 3 | ||||
| -rw-r--r-- | syz-cluster/controller/processor_test.go | 12 |
4 files changed, 7 insertions, 347 deletions
diff --git a/syz-cluster/controller/api.go b/syz-cluster/controller/api.go deleted file mode 100644 index cddaaf2e2..000000000 --- a/syz-cluster/controller/api.go +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright 2024 syzkaller project authors. All rights reserved. -// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. - -// nolint: dupl // The methods look similar, but extracting the common parts will only make the code worse. -package main - -import ( - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - - "github.com/google/syzkaller/syz-cluster/pkg/api" - "github.com/google/syzkaller/syz-cluster/pkg/app" - "github.com/google/syzkaller/syz-cluster/pkg/service" -) - -type ControllerAPI struct { - seriesService *service.SeriesService - sessionService *service.SessionService - buildService *service.BuildService - testService *service.SessionTestService - findingService *service.FindingService -} - -func NewControllerAPI(env *app.AppEnvironment) *ControllerAPI { - return &ControllerAPI{ - seriesService: service.NewSeriesService(env), - sessionService: service.NewSessionService(env), - buildService: service.NewBuildService(env), - testService: service.NewSessionTestService(env), - findingService: service.NewFindingService(env), - } -} - -func (c ControllerAPI) Mux() *http.ServeMux { - mux := http.NewServeMux() - mux.HandleFunc("/sessions/{session_id}/series", c.getSessionSeries) - mux.HandleFunc("/sessions/{session_id}/skip", c.skipSession) - mux.HandleFunc("/sessions/upload", c.uploadSession) - mux.HandleFunc("/series/{series_id}", c.getSeries) - mux.HandleFunc("/builds/last", c.getLastBuild) - mux.HandleFunc("/builds/upload", c.uploadBuild) - mux.HandleFunc("/tests/upload", c.uploadTest) - mux.HandleFunc("/findings/upload", c.uploadFinding) - mux.HandleFunc("/series/upload", c.uploadSeries) - return mux -} - -func (c ControllerAPI) getSessionSeries(w http.ResponseWriter, r *http.Request) { - resp, err := c.seriesService.GetSessionSeries(r.Context(), r.PathValue("session_id")) - if err == service.ErrSeriesNotFound || err == service.ErrSessionNotFound { - http.Error(w, fmt.Sprint(err), http.StatusNotFound) - return - } else if err != nil { - http.Error(w, fmt.Sprint(err), http.StatusInternalServerError) - return - } - reply(w, resp) -} - -func (c ControllerAPI) skipSession(w http.ResponseWriter, r *http.Request) { - req := parseBody[api.SkipRequest](w, r) - if req == nil { - return - } - err := c.sessionService.SkipSession(r.Context(), r.PathValue("session_id"), req) - if errors.Is(err, service.ErrSessionNotFound) { - http.Error(w, fmt.Sprint(err), http.StatusNotFound) - return - } else if err != nil { - http.Error(w, fmt.Sprint(err), http.StatusInternalServerError) - return - } - reply[interface{}](w, nil) -} - -func (c ControllerAPI) getSeries(w http.ResponseWriter, r *http.Request) { - resp, err := c.seriesService.GetSeries(r.Context(), r.PathValue("series_id")) - if errors.Is(err, service.ErrSeriesNotFound) { - http.Error(w, fmt.Sprint(err), http.StatusNotFound) - return - } else if err != nil { - http.Error(w, fmt.Sprint(err), http.StatusInternalServerError) - return - } - reply(w, resp) -} - -func (c ControllerAPI) uploadBuild(w http.ResponseWriter, r *http.Request) { - req := parseBody[api.UploadBuildReq](w, r) - if req == nil { - return - } - resp, err := c.buildService.Upload(r.Context(), req) - if err != nil { - // TODO: sometimes it's not StatusInternalServerError. - http.Error(w, fmt.Sprint(err), http.StatusInternalServerError) - return - } - reply(w, resp) -} - -func (c ControllerAPI) uploadTest(w http.ResponseWriter, r *http.Request) { - req := parseBody[api.TestResult](w, r) - if req == nil { - return - } - // TODO: add parameters validation (and also of the Log size). - err := c.testService.Save(r.Context(), req) - if err != nil { - http.Error(w, fmt.Sprint(err), http.StatusInternalServerError) - return - } - reply[interface{}](w, nil) -} - -func (c ControllerAPI) uploadFinding(w http.ResponseWriter, r *http.Request) { - req := parseBody[api.Finding](w, r) - if req == nil { - return - } - // TODO: add parameters validation. - err := c.findingService.Save(r.Context(), req) - if err != nil { - http.Error(w, fmt.Sprint(err), http.StatusInternalServerError) - return - } - reply[interface{}](w, nil) -} - -func (c ControllerAPI) getLastBuild(w http.ResponseWriter, r *http.Request) { - req := parseBody[api.LastBuildReq](w, r) - if req == nil { - return - } - resp, err := c.buildService.LastBuild(r.Context(), req) - if err != nil { - http.Error(w, fmt.Sprint(err), http.StatusInternalServerError) - return - } - reply[*api.Build](w, resp) -} - -func (c ControllerAPI) uploadSeries(w http.ResponseWriter, r *http.Request) { - req := parseBody[api.Series](w, r) - if req == nil { - return - } - resp, err := c.seriesService.UploadSeries(r.Context(), req) - if err != nil { - http.Error(w, fmt.Sprint(err), http.StatusInternalServerError) - return - } - reply[*api.UploadSeriesResp](w, resp) -} - -func (c ControllerAPI) uploadSession(w http.ResponseWriter, r *http.Request) { - req := parseBody[api.NewSession](w, r) - if req == nil { - return - } - resp, err := c.sessionService.UploadSession(r.Context(), req) - if err != nil { - http.Error(w, fmt.Sprint(err), http.StatusInternalServerError) - return - } - reply[*api.UploadSessionResp](w, resp) -} - -func reply[T any](w http.ResponseWriter, resp T) { - w.Header().Set("Content-Type", "application/json") - err := json.NewEncoder(w).Encode(resp) - if err != nil { - http.Error(w, "failed to serialize the response", http.StatusInternalServerError) - return - } -} - -func parseBody[T any](w http.ResponseWriter, r *http.Request) *T { - if r.Method != http.MethodPost { - http.Error(w, "must be called via POST", http.StatusMethodNotAllowed) - return nil - } - body, err := io.ReadAll(r.Body) - if err != nil { - http.Error(w, "failed to read body", http.StatusBadRequest) - return nil - } - var data T - err = json.Unmarshal(body, &data) - if err != nil { - http.Error(w, "invalid body", http.StatusBadRequest) - return nil - } - return &data -} diff --git a/syz-cluster/controller/api_test.go b/syz-cluster/controller/api_test.go deleted file mode 100644 index 24c113b12..000000000 --- a/syz-cluster/controller/api_test.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright 2024 syzkaller project authors. All rights reserved. -// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. - -package main - -import ( - "context" - "net/http/httptest" - "testing" - "time" - - "github.com/google/syzkaller/syz-cluster/pkg/api" - "github.com/google/syzkaller/syz-cluster/pkg/app" - "github.com/stretchr/testify/assert" -) - -func TestAPIGetSeries(t *testing.T) { - env, ctx := app.TestEnvironment(t) - apiServer := NewControllerAPI(env) - server := httptest.NewServer(apiServer.Mux()) - defer server.Close() - - client := api.NewClient(server.URL) - seriesID, sessionID := uploadSeries(t, ctx, client, testSeries) - - ret, err := client.GetSessionSeries(ctx, sessionID) - assert.NoError(t, err) - ret.ID = "" - assert.Equal(t, testSeries, ret) - - ret, err = client.GetSeries(ctx, seriesID) - assert.NoError(t, err) - ret.ID = "" - assert.Equal(t, testSeries, ret) -} - -func TestAPISuccessfulBuild(t *testing.T) { - env, ctx := app.TestEnvironment(t) - apiServer := NewControllerAPI(env) - server := httptest.NewServer(apiServer.Mux()) - defer server.Close() - - client := api.NewClient(server.URL) - buildInfo, _ := uploadTestBuild(t, client) - info, err := client.LastSuccessfulBuild(ctx, &api.LastBuildReq{ - Arch: buildInfo.Arch, - TreeName: buildInfo.TreeName, - ConfigName: buildInfo.ConfigName, - }) - assert.NoError(t, err) - assert.Equal(t, buildInfo, info) -} - -func TestAPISaveFinding(t *testing.T) { - env, ctx := app.TestEnvironment(t) - apiServer := NewControllerAPI(env) - server := httptest.NewServer(apiServer.Mux()) - defer server.Close() - - client := api.NewClient(server.URL) - - _, sessionID := uploadSeries(t, ctx, client, testSeries) - _, buildResp := uploadTestBuild(t, client) - err := client.UploadTestResult(ctx, &api.TestResult{ - SessionID: sessionID, - BaseBuildID: buildResp.ID, - TestName: "test", - Result: api.TestRunning, - Log: []byte("some log"), - }) - assert.NoError(t, err) - - t.Run("not existing test", func(t *testing.T) { - err = client.UploadFinding(ctx, &api.Finding{ - SessionID: sessionID, - TestName: "unknown test", - }) - assert.Error(t, err) - }) - - t.Run("must succeed", func(t *testing.T) { - finding := &api.Finding{ - SessionID: sessionID, - TestName: "test", - Report: []byte("report"), - Log: []byte("log"), - } - err = client.UploadFinding(ctx, finding) - assert.NoError(t, err) - // Even if the finding is reported the second time, it must still not fail. - err = client.UploadFinding(ctx, finding) - assert.NoError(t, err) - }) -} - -func uploadTestBuild(t *testing.T, client *api.Client) (*api.Build, *api.UploadBuildResp) { - buildInfo := &api.Build{ - Arch: "amd64", - TreeName: "mainline", - ConfigName: "config", - CommitHash: "abcd", - CommitDate: time.Date(2020, time.January, 1, 3, 0, 0, 0, time.UTC), - BuildSuccess: true, - } - ret, err := client.UploadBuild(context.Background(), &api.UploadBuildReq{ - Build: *buildInfo, - }) - assert.NoError(t, err) - assert.NotEmpty(t, ret.ID) - return buildInfo, ret -} - -// Returns a (session ID, series ID) tuple. -func uploadSeries(t *testing.T, ctx context.Context, client *api.Client, series *api.Series) (string, string) { - retSeries, err := client.UploadSeries(ctx, series) - assert.NoError(t, err) - retSession, err := client.UploadSession(ctx, &api.NewSession{ - ExtID: series.ExtID, - }) - assert.NoError(t, err) - return retSeries.ID, retSession.ID -} - -var testSeries = &api.Series{ - ExtID: "ext-id", - AuthorEmail: "some@email.com", - Title: "test series name", - Version: 2, - PublishedAt: time.Date(2020, time.January, 1, 3, 0, 0, 0, time.UTC), - Cc: []string{"email"}, - Patches: []api.SeriesPatch{ - { - Seq: 1, - Body: []byte("first content"), - }, - { - Seq: 2, - Body: []byte("second content"), - }, - }, -} diff --git a/syz-cluster/controller/main.go b/syz-cluster/controller/main.go index 9667c2239..525c02da0 100644 --- a/syz-cluster/controller/main.go +++ b/syz-cluster/controller/main.go @@ -11,6 +11,7 @@ import ( "net/http" "github.com/google/syzkaller/syz-cluster/pkg/app" + "github.com/google/syzkaller/syz-cluster/pkg/controller" ) func main() { @@ -24,7 +25,7 @@ func main() { err := sp.Loop(ctx) app.Fatalf("processor loop failed: %v", err) }() - api := NewControllerAPI(env) + api := controller.NewAPIServer(env) log.Printf("listening on port 8080") app.Fatalf("listen failed: %v", http.ListenAndServe(":8080", api.Mux())) } diff --git a/syz-cluster/controller/processor_test.go b/syz-cluster/controller/processor_test.go index f56108d3f..ff85ebec9 100644 --- a/syz-cluster/controller/processor_test.go +++ b/syz-cluster/controller/processor_test.go @@ -6,13 +6,13 @@ package main import ( "context" "fmt" - "net/http/httptest" "sync" "testing" "time" "github.com/google/syzkaller/syz-cluster/pkg/api" "github.com/google/syzkaller/syz-cluster/pkg/app" + "github.com/google/syzkaller/syz-cluster/pkg/controller" "github.com/google/syzkaller/syz-cluster/pkg/db" "github.com/google/syzkaller/syz-cluster/pkg/workflow" "github.com/stretchr/testify/assert" @@ -42,7 +42,7 @@ func TestProcessor(t *testing.T) { }) } for _, series := range allSeries[0:5] { - uploadSeries(t, ctx, client, series) + controller.UploadTestSeries(t, ctx, client, series) } // Let some workflows finish. @@ -66,7 +66,7 @@ func TestProcessor(t *testing.T) { // Add some more series. for _, series := range allSeries[5:10] { - uploadSeries(t, ctx, client, series) + controller.UploadTestSeries(t, ctx, client, series) } // Finish all of them. @@ -143,14 +143,12 @@ func newMockedWorkflows() *mockedWorkflows { func prepareProcessorTest(t *testing.T, workflows workflow.Service) (*SeriesProcessor, *api.Client, context.Context) { env, ctx := app.TestEnvironment(t) - apiServer := NewControllerAPI(env) - server := httptest.NewServer(apiServer.Mux()) - t.Cleanup(server.Close) + client := controller.TestServer(t, env) return &SeriesProcessor{ seriesRepo: db.NewSeriesRepository(env.Spanner), sessionRepo: db.NewSessionRepository(env.Spanner), workflows: workflows, dbPollInterval: time.Second / 10, parallelWorkers: 2, - }, api.NewClient(server.URL), ctx + }, client, ctx } |
