From 00dc68fb94791fb479cdd1932b9dd6793f14fefd Mon Sep 17 00:00:00 2001 From: Taras Madan Date: Thu, 31 Jul 2025 16:05:38 +0200 Subject: pkg/coveragedb: update file to subsystem info periodically #6070 explains the problem of data propagation. 1. Add weekly /cron/update_coverdb_subsystems. 2. Stop updating subsystems from coverage receiver API. --- dashboard/app/api.go | 7 +---- dashboard/app/coverage.go | 21 +++++++++++++ dashboard/app/cron.yaml | 3 ++ dashboard/app/main.go | 1 + pkg/coveragedb/coveragedb.go | 57 ++++++++++++++++++++++++++++++---- pkg/coveragedb/coveragedb_mock_test.go | 23 +++++--------- pkg/covermerger/covermerger_test.go | 6 ++-- 7 files changed, 88 insertions(+), 30 deletions(-) diff --git a/dashboard/app/api.go b/dashboard/app/api.go index c1d77881c..49c524c14 100644 --- a/dashboard/app/api.go +++ b/dashboard/app/api.go @@ -1963,12 +1963,7 @@ func apiSaveCoverage(c context.Context, payload io.Reader) (interface{}, error) if err := jsonDec.Decode(descr); err != nil { return 0, fmt.Errorf("json.NewDecoder(coveragedb.HistoryRecord).Decode: %w", err) } - var sss []*subsystem.Subsystem - if service := getNsConfig(c, descr.Namespace).Subsystems.Service; service != nil { - sss = service.List() - log.Infof(c, "found %d subsystems for %s namespace", len(sss), descr.Namespace) - } - rowsCreated, err := coveragedb.SaveMergeResult(c, getCoverageDBClient(c), descr, jsonDec, sss) + rowsCreated, err := coveragedb.SaveMergeResult(c, getCoverageDBClient(c), descr, jsonDec) if err != nil { log.Errorf(c, "error storing coverage for ns %s, date %s: %v", descr.Namespace, descr.DateTo.String(), err) diff --git a/dashboard/app/coverage.go b/dashboard/app/coverage.go index 4611a7977..67a4494e0 100644 --- a/dashboard/app/coverage.go +++ b/dashboard/app/coverage.go @@ -21,6 +21,7 @@ import ( "github.com/google/syzkaller/pkg/html/urlutil" "github.com/google/syzkaller/pkg/validator" "google.golang.org/appengine/v2" + "google.golang.org/appengine/v2/log" ) var coverageDBClient spannerclient.SpannerClient @@ -393,3 +394,23 @@ func handleCoverageGraph(c context.Context, w http.ResponseWriter, r *http.Reque } return serveTemplate(w, "graph_histogram.html", data) } + +func handleUpdateCoverDBSubsystems(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + for ns, nsConfig := range getConfig(ctx).Namespaces { + service := nsConfig.Subsystems.Service + if service == nil { + continue + } + sss := service.List() + updatedRecords, err := coveragedb.RegenerateSubsystems(ctx, ns, sss, coverageDBClient) + if err != nil { + httpErr := fmt.Errorf("ns %s: %w", ns, err) + log.Errorf(ctx, "%s", httpErr.Error()) + http.Error(w, httpErr.Error(), http.StatusInternalServerError) + return + } + log.Infof(ctx, "%s: %v records updated\n", ns, updatedRecords) + fmt.Fprintf(w, "%s: %v records updated\n", ns, updatedRecords) + } +} diff --git a/dashboard/app/cron.yaml b/dashboard/app/cron.yaml index 2e293797a..1a9400e65 100644 --- a/dashboard/app/cron.yaml +++ b/dashboard/app/cron.yaml @@ -36,3 +36,6 @@ cron: # We use 15 for convenience here. - url: /cron/email_coverage_reports schedule: 15 of month 00:00 +# Weekly update the kernel-file -> subsystems relationship +- url: /cron/update_coverdb_subsystems + schedule: every monday diff --git a/dashboard/app/main.go b/dashboard/app/main.go index eeba85c9e..0d2e588b1 100644 --- a/dashboard/app/main.go +++ b/dashboard/app/main.go @@ -88,6 +88,7 @@ func initHTTPHandlers() { http.HandleFunc("/cron/deprecate_assets", handleDeprecateAssets) http.HandleFunc("/cron/refresh_subsystems", handleRefreshSubsystems) http.HandleFunc("/cron/subsystem_reports", handleSubsystemReports) + http.HandleFunc("/cron/update_coverdb_subsystems", handleUpdateCoverDBSubsystems) } func handleMovedPermanently(dest string) http.HandlerFunc { diff --git a/pkg/coveragedb/coveragedb.go b/pkg/coveragedb/coveragedb.go index 6741e72d1..8272ce6ff 100644 --- a/pkg/coveragedb/coveragedb.go +++ b/pkg/coveragedb/coveragedb.go @@ -85,14 +85,11 @@ type fileSubsystems struct { } func SaveMergeResult(ctx context.Context, client spannerclient.SpannerClient, descr *HistoryRecord, dec *json.Decoder, - sss []*subsystem.Subsystem) (int, error) { +) (int, error) { if client == nil { return 0, fmt.Errorf("nil spannerclient") } var rowsCreated int - ssMatcher := subsystem.MakePathMatcher(sss) - ssCache := make(map[string][]string) - session := uuid.New().String() var mutations []*spanner.Mutation @@ -107,8 +104,6 @@ func SaveMergeResult(ctx context.Context, client spannerclient.SpannerClient, de } if mcr := wr.MCR; mcr != nil { mutations = append(mutations, fileRecordMutation(session, mcr)) - subsystems := getFileSubsystems(mcr.FilePath, ssMatcher, ssCache) - mutations = append(mutations, fileSubsystemsMutation(descr.Namespace, mcr.FilePath, subsystems)) } else if fl := wr.FL; fl != nil { mutations = append(mutations, fileFunctionsMutation(session, fl)) } else { @@ -627,3 +622,53 @@ func UniqCoverage(fullCov, partCov map[int]int64) map[int]int64 { } return res } + +func RegenerateSubsystems(ctx context.Context, ns string, sss []*subsystem.Subsystem, + client spannerclient.SpannerClient) (int, error) { + ssMatcher := subsystem.MakePathMatcher(sss) + ssCache := make(map[string][]string) + filePaths, err := getFilePaths(ctx, ns, client) + if err != nil { + return 0, err + } + var mutations []*spanner.Mutation + for _, filePath := range filePaths { + subsystems := getFileSubsystems(filePath, ssMatcher, ssCache) + mutations = append(mutations, fileSubsystemsMutation(ns, filePath, subsystems)) + } + // There is a limit on the number of mutations per transaction (80k) imposed by the DB. + // Expected mutations count is < 20k and looks safe to do w/o batching. + if _, err = client.Apply(ctx, mutations); err != nil { + return 0, err + } + return len(mutations), nil +} + +func getFilePaths(ctx context.Context, ns string, client spannerclient.SpannerClient) ([]string, error) { + iter := client.Single().Query(ctx, spanner.Statement{ + SQL: `select filepath from file_subsystems where namespace=$1`, + Params: map[string]interface{}{ + "p1": ns, + }, + }) + defer iter.Stop() + + var res []string + for { + row, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return nil, fmt.Errorf("iter.Next: %w", err) + } + var r struct { + Filepath string + } + if err = row.ToStruct(&r); err != nil { + return nil, fmt.Errorf("row.ToStruct: %w", err) + } + res = append(res, r.Filepath) + } + return res, nil +} diff --git a/pkg/coveragedb/coveragedb_mock_test.go b/pkg/coveragedb/coveragedb_mock_test.go index 23676d31f..f7efd479a 100644 --- a/pkg/coveragedb/coveragedb_mock_test.go +++ b/pkg/coveragedb/coveragedb_mock_test.go @@ -4,7 +4,6 @@ package coveragedb import ( - "context" "encoding/json" "io" "strings" @@ -13,7 +12,6 @@ import ( "cloud.google.com/go/spanner" "github.com/google/syzkaller/pkg/coveragedb/mocks" - "github.com/google/syzkaller/pkg/subsystem" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -23,7 +21,6 @@ type spannerMockTune func(*testing.T, *mocks.SpannerClient) func TestSaveMergeResult(t *testing.T) { tests := []struct { name string - sss []*subsystem.Subsystem jsonl io.Reader descr *HistoryRecord mockTune spannerMockTune @@ -45,7 +42,7 @@ func TestSaveMergeResult(t *testing.T) { name: "1 MCR record, Ok", jsonl: strings.NewReader(`{"MCR":{"FileData":{}}}`), descr: &HistoryRecord{}, - wantRows: 3, // 1 in files, 1 in file_subsystems and 1 in merge_history + wantRows: 2, // 1 in files and 1 in merge_history mockTune: func(t *testing.T, m *mocks.SpannerClient) { m. On("Apply", mock.Anything, mock.Anything). @@ -71,14 +68,14 @@ func TestSaveMergeResult(t *testing.T) { jsonl: strings.NewReader(` {"MCR":{"FileData":{}}} {"MCR":{"FileData":{}}}`), descr: &HistoryRecord{}, - wantRows: 5, + wantRows: 3, mockTune: func(t *testing.T, m *mocks.SpannerClient) { m. On("Apply", mock.Anything, mock.MatchedBy(func(ms []*spanner.Mutation) bool { - // 2 in files, 2 in file_subsystems and 1 in merge_history - return len(ms) == 5 + // 2 in files and 1 in merge_history + return len(ms) == 3 })). Return(time.Now(), nil). Once() @@ -88,17 +85,17 @@ func TestSaveMergeResult(t *testing.T) { name: "2k records, Ok", jsonl: strings.NewReader(strings.Repeat("{\"MCR\":{\"FileData\":{}}}\n", 2000)), descr: &HistoryRecord{}, - wantRows: 4001, + wantRows: 2001, mockTune: func(t *testing.T, m *mocks.SpannerClient) { m. On("Apply", mock.Anything, mock.MatchedBy(func(ms []*spanner.Mutation) bool { - // 2k in files, 2k in file_subsystems + // 2k in files return len(ms) == 1000 })). Return(time.Now(), nil). - Times(4). + Times(2). On("Apply", mock.Anything, mock.MatchedBy(func(ms []*spanner.Mutation) bool { @@ -117,11 +114,7 @@ func TestSaveMergeResult(t *testing.T) { if test.mockTune != nil { test.mockTune(t, spannerMock) } - gotRows, err := SaveMergeResult( - context.Background(), - spannerMock, - test.descr, - json.NewDecoder(test.jsonl), test.sss) + gotRows, err := SaveMergeResult(t.Context(), spannerMock, test.descr, json.NewDecoder(test.jsonl)) if test.wantErr { assert.Error(t, err) } else { diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go index c45c6f8ff..a6b7afd52 100644 --- a/pkg/covermerger/covermerger_test.go +++ b/pkg/covermerger/covermerger_test.go @@ -55,8 +55,8 @@ func TestMergeCSVWriteJSONL_and_coveragedb_SaveMergeResult(t *testing.T) { spannerMock := mocks.NewSpannerClient(t) spannerMock. On("Apply", mock.Anything, mock.MatchedBy(func(ms []*spanner.Mutation) bool { - // 1 file * (5 managers + 1 manager total) x 2 (to update files and subsystems) + 1 merge_history + 18 functions - return len(ms) == 13+18 + // 1 file * (5 managers + 1 manager total) x 1 (to update files) + 1 merge_history + 18 functions + return len(ms) == 1*(5+1)*1+1+18 })). Return(time.Now(), nil). Once() @@ -67,7 +67,7 @@ func TestMergeCSVWriteJSONL_and_coveragedb_SaveMergeResult(t *testing.T) { descr := new(coveragedb.HistoryRecord) assert.NoError(t, decoder.Decode(descr)) - _, err = coveragedb.SaveMergeResult(context.Background(), spannerMock, descr, decoder, nil) + _, err = coveragedb.SaveMergeResult(context.Background(), spannerMock, descr, decoder) return err }) assert.NoError(t, eg.Wait()) -- cgit mrf-deployment