From 2a40360c27f1cd827c9fa0183aa402ef505d07db Mon Sep 17 00:00:00 2001 From: Taras Madan Date: Wed, 3 Jul 2024 17:18:38 +0200 Subject: dashboard/app: scale batching --- dashboard/app/coverage_batch.go | 140 ++++++++++++++++++++++++++++++++-------- 1 file changed, 114 insertions(+), 26 deletions(-) (limited to 'dashboard') diff --git a/dashboard/app/coverage_batch.go b/dashboard/app/coverage_batch.go index 731bea604..d60f10eb4 100644 --- a/dashboard/app/coverage_batch.go +++ b/dashboard/app/coverage_batch.go @@ -8,11 +8,10 @@ import ( "fmt" "net/http" "strconv" - "time" - batch "cloud.google.com/go/batch/apiv1" + "cloud.google.com/go/batch/apiv1" "cloud.google.com/go/batch/apiv1/batchpb" - _ "cloud.google.com/go/bigquery" + "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" "github.com/google/uuid" "google.golang.org/api/iterator" @@ -23,19 +22,16 @@ import ( func initCoverageBatches() { http.HandleFunc("/cron/batch_coverage", handleBatchCoverage) - // TODO: delme - http.HandleFunc("/test_batch_coverage", handleBatchCoverage) } const ( daysPerBatch = 5 daysToMerge = 7 - batchTimeoutSeconds = 60*60*6 - 5 // 6 hours minus 5 minutes to not enter hour 7 + batchTimeoutSeconds = 60 * 60 * 6 ) func handleBatchCoverage(w http.ResponseWriter, r *http.Request) { ctx := appengine.NewContext(r) - beforeYesterday := civil.DateOf(time.Now().Add(-1 * 48 * time.Hour)) for ns, nsConfig := range getConfig(ctx).Namespaces { if nsConfig.Coverage == nil { continue @@ -43,29 +39,44 @@ func handleBatchCoverage(w http.ResponseWriter, r *http.Request) { repo, branch := nsConfig.mainRepoBranch() if repo == "" || branch == "" { log.Errorf(ctx, "can't find default repo or branch for ns %s", ns) - } else { - if err := createScriptJob( - ctx, - nsConfig.Coverage.BatchProject, - nsConfig.Coverage.BatchServiceAccount, - mergerScript(ns, repo, branch, 7, beforeYesterday)); err != nil { - log.Errorf(ctx, "failed to batchScript(): %s", err.Error()) - } + continue + } + dates, err := nsDatesToMerge(ctx, ns, daysToMerge, daysPerBatch) + if err != nil { + log.Errorf(ctx, "failed nsDatesToMerge(): %s", err) + continue + } + if len(dates) == 0 { + log.Infof(ctx, "there is no new coverage for merging available in %s", ns) + continue + } + if err := createScriptJob( + ctx, + nsConfig.Coverage.BatchProject, + nsConfig.Coverage.BatchServiceAccount, + batchScript(ns, repo, branch, 7, dates)); err != nil { + log.Errorf(ctx, "failed to batchScript(): %s", err.Error()) } } } -func mergerScript(ns, repo, branch string, days int, dateFor civil.Date) string { - return "git clone https://github.com/google/syzkaller\n" + +func batchScript(ns, repo, branch string, days int, datesTo []civil.Date) string { + script := "git clone --depth 1 --branch master --single-branch https://github.com/google/syzkaller\n" + "cd syzkaller\n" + "export CI=1\n" + - "./tools/syz-env ./tools/syz-bq.sh -w ../workdir-cover-aggregation/" + - " -n " + ns + - " -r " + repo + - " -b " + branch + - " -d " + strconv.Itoa(days) + - " -t " + dateFor.String() + - " 2>&1" // we don't want stderr output to be logged as errors + "./tools/syz-env \"" + for _, dateTo := range datesTo { + script += "./tools/syz-bq.sh" + + " -w ../workdir-cover-aggregation/" + + " -n " + ns + + " -r " + repo + + " -b " + branch + + " -d " + strconv.Itoa(days) + + " -t " + dateTo.String() + + " 2>&1; " // we don't want stderr output to be logged as errors + } + script += "\"" + return script } // from https://cloud.google.com/batch/docs/samples/batch-create-script-job @@ -95,8 +106,9 @@ func createScriptJob(ctx context.Context, projectID, serviceAccount, script stri MemoryMib: 12 * 1024, }, MaxRunDuration: &durationpb.Duration{ - Seconds: 60*60*6 - 5, // 6 hours minus 5 minutes to not enter hour 7 + Seconds: batchTimeoutSeconds, }, + MaxRetryCount: 2, }, }, } @@ -108,7 +120,8 @@ func createScriptJob(ctx context.Context, projectID, serviceAccount, script stri Instances: []*batchpb.AllocationPolicy_InstancePolicyOrTemplate{{ PolicyTemplate: &batchpb.AllocationPolicy_InstancePolicyOrTemplate_Policy{ Policy: &batchpb.AllocationPolicy_InstancePolicy{ - MachineType: "c3-standard-4", + ProvisioningModel: batchpb.AllocationPolicy_SPOT, + MachineType: "c3-standard-4", }, }, }}, @@ -145,3 +158,78 @@ func createScriptJob(ctx context.Context, projectID, serviceAccount, script stri return nil } + +func nsDatesToMerge(ctx context.Context, ns string, days, maxRecords int64) ([]civil.Date, error) { + client, err := bigquery.NewClient(ctx, "syzkaller") + client.EnableStorageReadClient(ctx) + if err != nil { + return nil, fmt.Errorf("failed to initialize bigquery client: %w", err) + } + q := client.Query(fmt.Sprintf(` + WITH data AS ( + SELECT + table_name as namespace, + PARSE_DATE('%%Y%%m%%d', partition_id) as partition_date, + total_rows + FROM + syzkaller.syzbot_coverage.INFORMATION_SCHEMA.PARTITIONS + WHERE table_name LIKE '%s' + ) + + SELECT * from ( + SELECT + mainquery.namespace as namespace, + partition_date as dateto, + sp.total_rows_dest, + ( + select + sum(total_rows) + FROM + data as subquery + WHERE + subquery.partition_date + BETWEEN + DATE_SUB(mainquery.partition_date, INTERVAL %d DAY) AND + mainquery.partition_date AND + subquery.namespace = mainquery.namespace + ) as total_rows_src + FROM data as mainquery + LEFT JOIN + EXTERNAL_QUERY("syzkaller.us-central1.spanner-coverage", ''' + SELECT + distinct namespace, duration, dateto, totalrows as total_rows_dest + FROM + merge_history + WHERE + duration = %d;''') AS sp + ON + mainquery.partition_date = sp.dateto AND + mainquery.namespace = sp.namespace + ORDER BY dateto DESC + ) + WHERE + total_rows_dest IS NULL OR total_rows_dest != total_rows_src + LIMIT %d + `, ns, days, days, maxRecords)) + it, err := q.Read(ctx) + if err != nil { + return nil, fmt.Errorf("failed to Read() from bigquery: %w", err) + } + + var dates []civil.Date + for { + var values struct { + Dateto civil.Date + } + err = it.Next(&values) + if err == iterator.Done { + break + } + if err != nil { + return nil, fmt.Errorf("failed to it.Next() bigquery records: %w", err) + } + dates = append(dates, values.Dateto) + } + + return dates, nil +} -- cgit mrf-deployment