diff options
| author | Taras Madan <tarasmadan@google.com> | 2024-12-05 10:30:01 +0100 |
|---|---|---|
| committer | Taras Madan <tarasmadan@google.com> | 2024-12-05 12:03:11 +0000 |
| commit | 6352fa41a008e032412d094aeda8602d406ae464 (patch) | |
| tree | 1ee78bd26117946ea94144670b2478147a4f3853 /pkg/coveragedb | |
| parent | f614e9a5c2a80d331de29cdb8c2ed98c07cc171c (diff) | |
dashboard/app: periodically remove coverage garbage from spanner
Diffstat (limited to 'pkg/coveragedb')
| -rw-r--r-- | pkg/coveragedb/spanner.go | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/spanner.go index 54fa2e192..b0c974fb1 100644 --- a/pkg/coveragedb/spanner.go +++ b/pkg/coveragedb/spanner.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "os" + "sync/atomic" "time" "cloud.google.com/go/civil" @@ -14,6 +15,7 @@ import ( "github.com/google/syzkaller/pkg/subsystem" _ "github.com/google/syzkaller/pkg/subsystem/lists" "github.com/google/uuid" + "golang.org/x/sync/errgroup" "google.golang.org/api/iterator" ) @@ -248,3 +250,75 @@ func NsDataMerged(ctx context.Context, projectID, ns string) ([]TimePeriod, []in } return periods, totalRows, nil } + +// DeleteGarbage removes orphaned file entries from the database. +// +// It identifies files in the "files" table that are not referenced by any entries in the "merge_history" table, +// indicating they are no longer associated with an active merge session. +// +// To avoid exceeding Spanner transaction limits, orphaned files are deleted in batches of 10,000. +// Note that in case of an error during batch deletion, some files may be deleted but not counted in the total. +// +// Returns the number of orphaned file entries successfully deleted. +func DeleteGarbage(ctx context.Context) (int64, error) { + batchSize := 10_000 + client, err := NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT")) + if err != nil { + return 0, fmt.Errorf("coveragedb.NewClient: %w", err) + } + defer client.Close() + + iter := client.Single().Query(ctx, spanner.Statement{ + SQL: `SELECT session, filepath + FROM files + WHERE NOT EXISTS ( + SELECT 1 + FROM merge_history + WHERE merge_history.session = files.session + )`}) + defer iter.Stop() + + var totalDeleted atomic.Int64 + eg, _ := errgroup.WithContext(ctx) + var batch []spanner.Key + for { + row, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return 0, fmt.Errorf("iter.Next: %w", err) + } + var r struct { + Session string + Filepath string + } + if err = row.ToStruct(&r); err != nil { + return 0, fmt.Errorf("row.ToStruct: %w", err) + } + batch = append(batch, spanner.Key{r.Session, r.Filepath}) + if len(batch) > batchSize { + goSpannerDelete(ctx, batch, eg, client, &totalDeleted) + batch = nil + } + } + goSpannerDelete(ctx, batch, eg, client, &totalDeleted) + if err = eg.Wait(); err != nil { + return 0, fmt.Errorf("spanner.Delete: %w", err) + } + return totalDeleted.Load(), nil +} + +func goSpannerDelete(ctx context.Context, batch []spanner.Key, eg *errgroup.Group, client *spanner.Client, + totalDeleted *atomic.Int64) { + ks := spanner.KeySetFromKeys(batch...) + ksSize := len(batch) + eg.Go(func() error { + mutation := spanner.Delete("files", ks) + _, err := client.Apply(ctx, []*spanner.Mutation{mutation}) + if err == nil { + totalDeleted.Add(int64(ksSize)) + } + return err + }) +} |
