From 2a40360c27f1cd827c9fa0183aa402ef505d07db Mon Sep 17 00:00:00 2001 From: Taras Madan Date: Wed, 3 Jul 2024 17:18:38 +0200 Subject: dashboard/app: scale batching --- dashboard/app/coverage_batch.go | 140 +++++++++++++++++++++++++------ go.mod | 2 +- tools/syz-bq.sh | 43 +++++++++- tools/syz-covermerger/db.go | 25 +++++- tools/syz-covermerger/syz_covermerger.go | 2 + 5 files changed, 180 insertions(+), 32 deletions(-) diff --git a/dashboard/app/coverage_batch.go b/dashboard/app/coverage_batch.go index 731bea604..d60f10eb4 100644 --- a/dashboard/app/coverage_batch.go +++ b/dashboard/app/coverage_batch.go @@ -8,11 +8,10 @@ import ( "fmt" "net/http" "strconv" - "time" - batch "cloud.google.com/go/batch/apiv1" + "cloud.google.com/go/batch/apiv1" "cloud.google.com/go/batch/apiv1/batchpb" - _ "cloud.google.com/go/bigquery" + "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" "github.com/google/uuid" "google.golang.org/api/iterator" @@ -23,19 +22,16 @@ import ( func initCoverageBatches() { http.HandleFunc("/cron/batch_coverage", handleBatchCoverage) - // TODO: delme - http.HandleFunc("/test_batch_coverage", handleBatchCoverage) } const ( daysPerBatch = 5 daysToMerge = 7 - batchTimeoutSeconds = 60*60*6 - 5 // 6 hours minus 5 minutes to not enter hour 7 + batchTimeoutSeconds = 60 * 60 * 6 ) func handleBatchCoverage(w http.ResponseWriter, r *http.Request) { ctx := appengine.NewContext(r) - beforeYesterday := civil.DateOf(time.Now().Add(-1 * 48 * time.Hour)) for ns, nsConfig := range getConfig(ctx).Namespaces { if nsConfig.Coverage == nil { continue @@ -43,29 +39,44 @@ func handleBatchCoverage(w http.ResponseWriter, r *http.Request) { repo, branch := nsConfig.mainRepoBranch() if repo == "" || branch == "" { log.Errorf(ctx, "can't find default repo or branch for ns %s", ns) - } else { - if err := createScriptJob( - ctx, - nsConfig.Coverage.BatchProject, - nsConfig.Coverage.BatchServiceAccount, - mergerScript(ns, repo, branch, 7, beforeYesterday)); err != nil { - log.Errorf(ctx, "failed to batchScript(): %s", err.Error()) - } + continue + } + dates, err := nsDatesToMerge(ctx, ns, daysToMerge, daysPerBatch) + if err != nil { + log.Errorf(ctx, "failed nsDatesToMerge(): %s", err) + continue + } + if len(dates) == 0 { + log.Infof(ctx, "there is no new coverage for merging available in %s", ns) + continue + } + if err := createScriptJob( + ctx, + nsConfig.Coverage.BatchProject, + nsConfig.Coverage.BatchServiceAccount, + batchScript(ns, repo, branch, 7, dates)); err != nil { + log.Errorf(ctx, "failed to batchScript(): %s", err.Error()) } } } -func mergerScript(ns, repo, branch string, days int, dateFor civil.Date) string { - return "git clone https://github.com/google/syzkaller\n" + +func batchScript(ns, repo, branch string, days int, datesTo []civil.Date) string { + script := "git clone --depth 1 --branch master --single-branch https://github.com/google/syzkaller\n" + "cd syzkaller\n" + "export CI=1\n" + - "./tools/syz-env ./tools/syz-bq.sh -w ../workdir-cover-aggregation/" + - " -n " + ns + - " -r " + repo + - " -b " + branch + - " -d " + strconv.Itoa(days) + - " -t " + dateFor.String() + - " 2>&1" // we don't want stderr output to be logged as errors + "./tools/syz-env \"" + for _, dateTo := range datesTo { + script += "./tools/syz-bq.sh" + + " -w ../workdir-cover-aggregation/" + + " -n " + ns + + " -r " + repo + + " -b " + branch + + " -d " + strconv.Itoa(days) + + " -t " + dateTo.String() + + " 2>&1; " // we don't want stderr output to be logged as errors + } + script += "\"" + return script } // from https://cloud.google.com/batch/docs/samples/batch-create-script-job @@ -95,8 +106,9 @@ func createScriptJob(ctx context.Context, projectID, serviceAccount, script stri MemoryMib: 12 * 1024, }, MaxRunDuration: &durationpb.Duration{ - Seconds: 60*60*6 - 5, // 6 hours minus 5 minutes to not enter hour 7 + Seconds: batchTimeoutSeconds, }, + MaxRetryCount: 2, }, }, } @@ -108,7 +120,8 @@ func createScriptJob(ctx context.Context, projectID, serviceAccount, script stri Instances: []*batchpb.AllocationPolicy_InstancePolicyOrTemplate{{ PolicyTemplate: &batchpb.AllocationPolicy_InstancePolicyOrTemplate_Policy{ Policy: &batchpb.AllocationPolicy_InstancePolicy{ - MachineType: "c3-standard-4", + ProvisioningModel: batchpb.AllocationPolicy_SPOT, + MachineType: "c3-standard-4", }, }, }}, @@ -145,3 +158,78 @@ func createScriptJob(ctx context.Context, projectID, serviceAccount, script stri return nil } + +func nsDatesToMerge(ctx context.Context, ns string, days, maxRecords int64) ([]civil.Date, error) { + client, err := bigquery.NewClient(ctx, "syzkaller") + client.EnableStorageReadClient(ctx) + if err != nil { + return nil, fmt.Errorf("failed to initialize bigquery client: %w", err) + } + q := client.Query(fmt.Sprintf(` + WITH data AS ( + SELECT + table_name as namespace, + PARSE_DATE('%%Y%%m%%d', partition_id) as partition_date, + total_rows + FROM + syzkaller.syzbot_coverage.INFORMATION_SCHEMA.PARTITIONS + WHERE table_name LIKE '%s' + ) + + SELECT * from ( + SELECT + mainquery.namespace as namespace, + partition_date as dateto, + sp.total_rows_dest, + ( + select + sum(total_rows) + FROM + data as subquery + WHERE + subquery.partition_date + BETWEEN + DATE_SUB(mainquery.partition_date, INTERVAL %d DAY) AND + mainquery.partition_date AND + subquery.namespace = mainquery.namespace + ) as total_rows_src + FROM data as mainquery + LEFT JOIN + EXTERNAL_QUERY("syzkaller.us-central1.spanner-coverage", ''' + SELECT + distinct namespace, duration, dateto, totalrows as total_rows_dest + FROM + merge_history + WHERE + duration = %d;''') AS sp + ON + mainquery.partition_date = sp.dateto AND + mainquery.namespace = sp.namespace + ORDER BY dateto DESC + ) + WHERE + total_rows_dest IS NULL OR total_rows_dest != total_rows_src + LIMIT %d + `, ns, days, days, maxRecords)) + it, err := q.Read(ctx) + if err != nil { + return nil, fmt.Errorf("failed to Read() from bigquery: %w", err) + } + + var dates []civil.Date + for { + var values struct { + Dateto civil.Date + } + err = it.Next(&values) + if err == iterator.Done { + break + } + if err != nil { + return nil, fmt.Errorf("failed to it.Next() bigquery records: %w", err) + } + dates = append(dates, values.Dateto) + } + + return dates, nil +} diff --git a/go.mod b/go.mod index afcbd45fe..e657a77de 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( google.golang.org/api v0.182.0 google.golang.org/appengine/v2 v2.0.5 google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda + google.golang.org/protobuf v1.34.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -247,7 +248,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect google.golang.org/grpc v1.64.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect honnef.co/go/tools v0.4.7 // indirect diff --git a/tools/syz-bq.sh b/tools/syz-bq.sh index 3319f4064..7bf33bbca 100755 --- a/tools/syz-bq.sh +++ b/tools/syz-bq.sh @@ -67,6 +67,21 @@ CREATE TABLE IF NOT EXISTS gcloud spanner databases ddl update coverage --instance=syzbot --project=syzkaller \ --ddl="$create_table" +echo "making sure spanner table 'merge_history' exists" +create_table=$( echo -n ' +CREATE TABLE IF NOT EXISTS + merge_history ( + "namespace" text, + "repo" text, + "commit" text, + "duration" bigint, + "dateto" date, + "totalrows" bigint, + PRIMARY KEY + (duration, dateto, commit) );') +gcloud spanner databases ddl update coverage --instance=syzbot --project=syzkaller \ + --ddl="$create_table" + echo "Workdir: $workdir" base_dir="${workdir}repos/linux_kernels" if [ ! -d $base_dir ]; then @@ -89,10 +104,29 @@ then fi echo The latest commit as of $to_date is $base_commit. -# rm -rf $base_dir -# echo Temp dir $base_dir deleted. - from_date=$(date -d "$to_date - $duration days" +%Y-%m-%d) +# every partition covers 1 day +query=$(cat <<-END +SELECT + sum(total_rows) as total_rows, +FROM + syzkaller.syzbot_coverage.INFORMATION_SCHEMA.PARTITIONS +WHERE + table_name = '${namespace}' AND + PARSE_DATE('%Y%m%d', partition_id) >= '${from_date}' AND + PARSE_DATE('%Y%m%d', partition_id) <= '${to_date}'; +END +) + +total_rows=$(bq query --format=csv --use_legacy_sql=false "$query" | tail -n +2) +if (( total_rows <= 0 )) +then + echo error: no source rows in bigquery available + exit +else + echo $total_rows rows are available for processing +fi + sessionID=$(cat /proc/sys/kernel/random/uuid) gsURI=$(echo gs://syzbot-temp/bq-exports/${sessionID}/*.csv.gz) echo fetching data from bigquery @@ -129,7 +163,8 @@ go run ./tools/syz-covermerger/ -workdir $workdir \ -save-to-spanner true \ -namespace $namespace \ -duration $duration \ - -date-to $to_date + -date-to $to_date \ + -total-rows $total_rows echo Cleanup rm -rf $sessionDir diff --git a/tools/syz-covermerger/db.go b/tools/syz-covermerger/db.go index 996e7a6e0..7ba72702d 100644 --- a/tools/syz-covermerger/db.go +++ b/tools/syz-covermerger/db.go @@ -24,8 +24,17 @@ type DBRecord struct { Covered int64 } +type DBHistoryRecord struct { + Namespace string + Repo string + Commit string + Duration int64 + DateTo civil.Date + TotalRows int64 +} + func saveToSpanner(ctx context.Context, projectID string, coverage map[string]*Coverage, - template *DBRecord) { + template *DBRecord, totalRows int64) { client, err := spanner.NewClient(ctx, "projects/"+projectID+"/instances/syzbot/databases/coverage") if err != nil { panic(fmt.Sprintf("spanner.NewClient() failed: %s", err.Error())) @@ -58,6 +67,20 @@ func saveToSpanner(ctx context.Context, projectID string, coverage map[string]*C mutations = nil } } + + var historyInsert *spanner.Mutation + if historyInsert, err = spanner.InsertOrUpdateStruct("merge_history", &DBHistoryRecord{ + Namespace: template.Namespace, + Repo: template.Repo, + Commit: template.Commit, + Duration: template.Duration, + DateTo: template.DateTo, + TotalRows: totalRows, + }); err != nil { + panic(fmt.Sprintf("failed to spanner.InsertStruct(): %s", err.Error())) + } + mutations = append(mutations, historyInsert) + if _, err = client.Apply(ctx, mutations); err != nil { panic(fmt.Sprintf("failed to spanner.Apply(inserts): %s", err.Error())) } diff --git a/tools/syz-covermerger/syz_covermerger.go b/tools/syz-covermerger/syz_covermerger.go index 063e8c85b..15aab4abb 100644 --- a/tools/syz-covermerger/syz_covermerger.go +++ b/tools/syz-covermerger/syz_covermerger.go @@ -28,6 +28,7 @@ var ( flagDateTo = flag.String("date-to", "", "[optional] used to mark DB records") flagSaveToSpanner = flag.String("save-to-spanner", "", "[optional] save aggregation to spanner") flagProjectID = flag.String("project-id", "syzkaller", "[optional] target spanner db project") + flagTotalRows = flag.Int64("total-rows", 0, "[optional] source size, is used for version contol") ) func main() { @@ -65,6 +66,7 @@ func main() { Duration: *flagDuration, DateTo: dateTo, }, + *flagTotalRows, ) } } -- cgit mrf-deployment