aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/coveragedb
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2024-12-05 10:30:01 +0100
committerTaras Madan <tarasmadan@google.com>2024-12-05 12:03:11 +0000
commit6352fa41a008e032412d094aeda8602d406ae464 (patch)
tree1ee78bd26117946ea94144670b2478147a4f3853 /pkg/coveragedb
parentf614e9a5c2a80d331de29cdb8c2ed98c07cc171c (diff)
dashboard/app: periodically remove coverage garbage from spanner
Diffstat (limited to 'pkg/coveragedb')
-rw-r--r--pkg/coveragedb/spanner.go74
1 files changed, 74 insertions, 0 deletions
diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/spanner.go
index 54fa2e192..b0c974fb1 100644
--- a/pkg/coveragedb/spanner.go
+++ b/pkg/coveragedb/spanner.go
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"os"
+ "sync/atomic"
"time"
"cloud.google.com/go/civil"
@@ -14,6 +15,7 @@ import (
"github.com/google/syzkaller/pkg/subsystem"
_ "github.com/google/syzkaller/pkg/subsystem/lists"
"github.com/google/uuid"
+ "golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
)
@@ -248,3 +250,75 @@ func NsDataMerged(ctx context.Context, projectID, ns string) ([]TimePeriod, []in
}
return periods, totalRows, nil
}
+
+// DeleteGarbage removes orphaned file entries from the database.
+//
+// It identifies files in the "files" table that are not referenced by any entries in the "merge_history" table,
+// indicating they are no longer associated with an active merge session.
+//
+// To avoid exceeding Spanner transaction limits, orphaned files are deleted in batches of 10,000.
+// Note that in case of an error during batch deletion, some files may be deleted but not counted in the total.
+//
+// Returns the number of orphaned file entries successfully deleted.
+func DeleteGarbage(ctx context.Context) (int64, error) {
+ batchSize := 10_000
+ client, err := NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"))
+ if err != nil {
+ return 0, fmt.Errorf("coveragedb.NewClient: %w", err)
+ }
+ defer client.Close()
+
+ iter := client.Single().Query(ctx, spanner.Statement{
+ SQL: `SELECT session, filepath
+ FROM files
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM merge_history
+ WHERE merge_history.session = files.session
+ )`})
+ defer iter.Stop()
+
+ var totalDeleted atomic.Int64
+ eg, _ := errgroup.WithContext(ctx)
+ var batch []spanner.Key
+ for {
+ row, err := iter.Next()
+ if err == iterator.Done {
+ break
+ }
+ if err != nil {
+ return 0, fmt.Errorf("iter.Next: %w", err)
+ }
+ var r struct {
+ Session string
+ Filepath string
+ }
+ if err = row.ToStruct(&r); err != nil {
+ return 0, fmt.Errorf("row.ToStruct: %w", err)
+ }
+ batch = append(batch, spanner.Key{r.Session, r.Filepath})
+ if len(batch) > batchSize {
+ goSpannerDelete(ctx, batch, eg, client, &totalDeleted)
+ batch = nil
+ }
+ }
+ goSpannerDelete(ctx, batch, eg, client, &totalDeleted)
+ if err = eg.Wait(); err != nil {
+ return 0, fmt.Errorf("spanner.Delete: %w", err)
+ }
+ return totalDeleted.Load(), nil
+}
+
+func goSpannerDelete(ctx context.Context, batch []spanner.Key, eg *errgroup.Group, client *spanner.Client,
+ totalDeleted *atomic.Int64) {
+ ks := spanner.KeySetFromKeys(batch...)
+ ksSize := len(batch)
+ eg.Go(func() error {
+ mutation := spanner.Delete("files", ks)
+ _, err := client.Apply(ctx, []*spanner.Mutation{mutation})
+ if err == nil {
+ totalDeleted.Add(int64(ksSize))
+ }
+ return err
+ })
+}