aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2024-07-03 17:18:38 +0200
committerTaras Madan <tarasmadan@google.com>2024-07-05 07:51:41 +0000
commit2a40360c27f1cd827c9fa0183aa402ef505d07db (patch)
tree71469c5ec49e1b11ab9717464dfd37e578712414
parent40bd04b893f19dbb0252d44e4935a1beabe972d7 (diff)
dashboard/app: scale batching
-rw-r--r--dashboard/app/coverage_batch.go140
-rw-r--r--go.mod2
-rwxr-xr-xtools/syz-bq.sh43
-rw-r--r--tools/syz-covermerger/db.go25
-rw-r--r--tools/syz-covermerger/syz_covermerger.go2
5 files changed, 180 insertions, 32 deletions
diff --git a/dashboard/app/coverage_batch.go b/dashboard/app/coverage_batch.go
index 731bea604..d60f10eb4 100644
--- a/dashboard/app/coverage_batch.go
+++ b/dashboard/app/coverage_batch.go
@@ -8,11 +8,10 @@ import (
"fmt"
"net/http"
"strconv"
- "time"
- batch "cloud.google.com/go/batch/apiv1"
+ "cloud.google.com/go/batch/apiv1"
"cloud.google.com/go/batch/apiv1/batchpb"
- _ "cloud.google.com/go/bigquery"
+ "cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
"github.com/google/uuid"
"google.golang.org/api/iterator"
@@ -23,19 +22,16 @@ import (
func initCoverageBatches() {
http.HandleFunc("/cron/batch_coverage", handleBatchCoverage)
- // TODO: delme
- http.HandleFunc("/test_batch_coverage", handleBatchCoverage)
}
const (
daysPerBatch = 5
daysToMerge = 7
- batchTimeoutSeconds = 60*60*6 - 5 // 6 hours minus 5 minutes to not enter hour 7
+ batchTimeoutSeconds = 60 * 60 * 6
)
func handleBatchCoverage(w http.ResponseWriter, r *http.Request) {
ctx := appengine.NewContext(r)
- beforeYesterday := civil.DateOf(time.Now().Add(-1 * 48 * time.Hour))
for ns, nsConfig := range getConfig(ctx).Namespaces {
if nsConfig.Coverage == nil {
continue
@@ -43,29 +39,44 @@ func handleBatchCoverage(w http.ResponseWriter, r *http.Request) {
repo, branch := nsConfig.mainRepoBranch()
if repo == "" || branch == "" {
log.Errorf(ctx, "can't find default repo or branch for ns %s", ns)
- } else {
- if err := createScriptJob(
- ctx,
- nsConfig.Coverage.BatchProject,
- nsConfig.Coverage.BatchServiceAccount,
- mergerScript(ns, repo, branch, 7, beforeYesterday)); err != nil {
- log.Errorf(ctx, "failed to batchScript(): %s", err.Error())
- }
+ continue
+ }
+ dates, err := nsDatesToMerge(ctx, ns, daysToMerge, daysPerBatch)
+ if err != nil {
+ log.Errorf(ctx, "failed nsDatesToMerge(): %s", err)
+ continue
+ }
+ if len(dates) == 0 {
+ log.Infof(ctx, "there is no new coverage for merging available in %s", ns)
+ continue
+ }
+ if err := createScriptJob(
+ ctx,
+ nsConfig.Coverage.BatchProject,
+ nsConfig.Coverage.BatchServiceAccount,
+ batchScript(ns, repo, branch, 7, dates)); err != nil {
+ log.Errorf(ctx, "failed to batchScript(): %s", err.Error())
}
}
}
-func mergerScript(ns, repo, branch string, days int, dateFor civil.Date) string {
- return "git clone https://github.com/google/syzkaller\n" +
+func batchScript(ns, repo, branch string, days int, datesTo []civil.Date) string {
+ script := "git clone --depth 1 --branch master --single-branch https://github.com/google/syzkaller\n" +
"cd syzkaller\n" +
"export CI=1\n" +
- "./tools/syz-env ./tools/syz-bq.sh -w ../workdir-cover-aggregation/" +
- " -n " + ns +
- " -r " + repo +
- " -b " + branch +
- " -d " + strconv.Itoa(days) +
- " -t " + dateFor.String() +
- " 2>&1" // we don't want stderr output to be logged as errors
+ "./tools/syz-env \""
+ for _, dateTo := range datesTo {
+ script += "./tools/syz-bq.sh" +
+ " -w ../workdir-cover-aggregation/" +
+ " -n " + ns +
+ " -r " + repo +
+ " -b " + branch +
+ " -d " + strconv.Itoa(days) +
+ " -t " + dateTo.String() +
+ " 2>&1; " // we don't want stderr output to be logged as errors
+ }
+ script += "\""
+ return script
}
// from https://cloud.google.com/batch/docs/samples/batch-create-script-job
@@ -95,8 +106,9 @@ func createScriptJob(ctx context.Context, projectID, serviceAccount, script stri
MemoryMib: 12 * 1024,
},
MaxRunDuration: &durationpb.Duration{
- Seconds: 60*60*6 - 5, // 6 hours minus 5 minutes to not enter hour 7
+ Seconds: batchTimeoutSeconds,
},
+ MaxRetryCount: 2,
},
},
}
@@ -108,7 +120,8 @@ func createScriptJob(ctx context.Context, projectID, serviceAccount, script stri
Instances: []*batchpb.AllocationPolicy_InstancePolicyOrTemplate{{
PolicyTemplate: &batchpb.AllocationPolicy_InstancePolicyOrTemplate_Policy{
Policy: &batchpb.AllocationPolicy_InstancePolicy{
- MachineType: "c3-standard-4",
+ ProvisioningModel: batchpb.AllocationPolicy_SPOT,
+ MachineType: "c3-standard-4",
},
},
}},
@@ -145,3 +158,78 @@ func createScriptJob(ctx context.Context, projectID, serviceAccount, script stri
return nil
}
+
+func nsDatesToMerge(ctx context.Context, ns string, days, maxRecords int64) ([]civil.Date, error) {
+ client, err := bigquery.NewClient(ctx, "syzkaller")
+ client.EnableStorageReadClient(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to initialize bigquery client: %w", err)
+ }
+ q := client.Query(fmt.Sprintf(`
+ WITH data AS (
+ SELECT
+ table_name as namespace,
+ PARSE_DATE('%%Y%%m%%d', partition_id) as partition_date,
+ total_rows
+ FROM
+ syzkaller.syzbot_coverage.INFORMATION_SCHEMA.PARTITIONS
+ WHERE table_name LIKE '%s'
+ )
+
+ SELECT * from (
+ SELECT
+ mainquery.namespace as namespace,
+ partition_date as dateto,
+ sp.total_rows_dest,
+ (
+ select
+ sum(total_rows)
+ FROM
+ data as subquery
+ WHERE
+ subquery.partition_date
+ BETWEEN
+ DATE_SUB(mainquery.partition_date, INTERVAL %d DAY) AND
+ mainquery.partition_date AND
+ subquery.namespace = mainquery.namespace
+ ) as total_rows_src
+ FROM data as mainquery
+ LEFT JOIN
+ EXTERNAL_QUERY("syzkaller.us-central1.spanner-coverage", '''
+ SELECT
+ distinct namespace, duration, dateto, totalrows as total_rows_dest
+ FROM
+ merge_history
+ WHERE
+ duration = %d;''') AS sp
+ ON
+ mainquery.partition_date = sp.dateto AND
+ mainquery.namespace = sp.namespace
+ ORDER BY dateto DESC
+ )
+ WHERE
+ total_rows_dest IS NULL OR total_rows_dest != total_rows_src
+ LIMIT %d
+ `, ns, days, days, maxRecords))
+ it, err := q.Read(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to Read() from bigquery: %w", err)
+ }
+
+ var dates []civil.Date
+ for {
+ var values struct {
+ Dateto civil.Date
+ }
+ err = it.Next(&values)
+ if err == iterator.Done {
+ break
+ }
+ if err != nil {
+ return nil, fmt.Errorf("failed to it.Next() bigquery records: %w", err)
+ }
+ dates = append(dates, values.Dateto)
+ }
+
+ return dates, nil
+}
diff --git a/go.mod b/go.mod
index afcbd45fe..e657a77de 100644
--- a/go.mod
+++ b/go.mod
@@ -34,6 +34,7 @@ require (
google.golang.org/api v0.182.0
google.golang.org/appengine/v2 v2.0.5
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda
+ google.golang.org/protobuf v1.34.1
gopkg.in/yaml.v3 v3.0.1
)
@@ -247,7 +248,6 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect
google.golang.org/grpc v1.64.0 // indirect
- google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
honnef.co/go/tools v0.4.7 // indirect
diff --git a/tools/syz-bq.sh b/tools/syz-bq.sh
index 3319f4064..7bf33bbca 100755
--- a/tools/syz-bq.sh
+++ b/tools/syz-bq.sh
@@ -67,6 +67,21 @@ CREATE TABLE IF NOT EXISTS
gcloud spanner databases ddl update coverage --instance=syzbot --project=syzkaller \
--ddl="$create_table"
+echo "making sure spanner table 'merge_history' exists"
+create_table=$( echo -n '
+CREATE TABLE IF NOT EXISTS
+ merge_history (
+ "namespace" text,
+ "repo" text,
+ "commit" text,
+ "duration" bigint,
+ "dateto" date,
+ "totalrows" bigint,
+ PRIMARY KEY
+ (duration, dateto, commit) );')
+gcloud spanner databases ddl update coverage --instance=syzbot --project=syzkaller \
+ --ddl="$create_table"
+
echo "Workdir: $workdir"
base_dir="${workdir}repos/linux_kernels"
if [ ! -d $base_dir ]; then
@@ -89,10 +104,29 @@ then
fi
echo The latest commit as of $to_date is $base_commit.
-# rm -rf $base_dir
-# echo Temp dir $base_dir deleted.
-
from_date=$(date -d "$to_date - $duration days" +%Y-%m-%d)
+# every partition covers 1 day
+query=$(cat <<-END
+SELECT
+ sum(total_rows) as total_rows,
+FROM
+ syzkaller.syzbot_coverage.INFORMATION_SCHEMA.PARTITIONS
+WHERE
+ table_name = '${namespace}' AND
+ PARSE_DATE('%Y%m%d', partition_id) >= '${from_date}' AND
+ PARSE_DATE('%Y%m%d', partition_id) <= '${to_date}';
+END
+)
+
+total_rows=$(bq query --format=csv --use_legacy_sql=false "$query" | tail -n +2)
+if (( total_rows <= 0 ))
+then
+ echo error: no source rows in bigquery available
+ exit
+else
+ echo $total_rows rows are available for processing
+fi
+
sessionID=$(cat /proc/sys/kernel/random/uuid)
gsURI=$(echo gs://syzbot-temp/bq-exports/${sessionID}/*.csv.gz)
echo fetching data from bigquery
@@ -129,7 +163,8 @@ go run ./tools/syz-covermerger/ -workdir $workdir \
-save-to-spanner true \
-namespace $namespace \
-duration $duration \
- -date-to $to_date
+ -date-to $to_date \
+ -total-rows $total_rows
echo Cleanup
rm -rf $sessionDir
diff --git a/tools/syz-covermerger/db.go b/tools/syz-covermerger/db.go
index 996e7a6e0..7ba72702d 100644
--- a/tools/syz-covermerger/db.go
+++ b/tools/syz-covermerger/db.go
@@ -24,8 +24,17 @@ type DBRecord struct {
Covered int64
}
+type DBHistoryRecord struct {
+ Namespace string
+ Repo string
+ Commit string
+ Duration int64
+ DateTo civil.Date
+ TotalRows int64
+}
+
func saveToSpanner(ctx context.Context, projectID string, coverage map[string]*Coverage,
- template *DBRecord) {
+ template *DBRecord, totalRows int64) {
client, err := spanner.NewClient(ctx, "projects/"+projectID+"/instances/syzbot/databases/coverage")
if err != nil {
panic(fmt.Sprintf("spanner.NewClient() failed: %s", err.Error()))
@@ -58,6 +67,20 @@ func saveToSpanner(ctx context.Context, projectID string, coverage map[string]*C
mutations = nil
}
}
+
+ var historyInsert *spanner.Mutation
+ if historyInsert, err = spanner.InsertOrUpdateStruct("merge_history", &DBHistoryRecord{
+ Namespace: template.Namespace,
+ Repo: template.Repo,
+ Commit: template.Commit,
+ Duration: template.Duration,
+ DateTo: template.DateTo,
+ TotalRows: totalRows,
+ }); err != nil {
+ panic(fmt.Sprintf("failed to spanner.InsertStruct(): %s", err.Error()))
+ }
+ mutations = append(mutations, historyInsert)
+
if _, err = client.Apply(ctx, mutations); err != nil {
panic(fmt.Sprintf("failed to spanner.Apply(inserts): %s", err.Error()))
}
diff --git a/tools/syz-covermerger/syz_covermerger.go b/tools/syz-covermerger/syz_covermerger.go
index 063e8c85b..15aab4abb 100644
--- a/tools/syz-covermerger/syz_covermerger.go
+++ b/tools/syz-covermerger/syz_covermerger.go
@@ -28,6 +28,7 @@ var (
flagDateTo = flag.String("date-to", "", "[optional] used to mark DB records")
flagSaveToSpanner = flag.String("save-to-spanner", "", "[optional] save aggregation to spanner")
flagProjectID = flag.String("project-id", "syzkaller", "[optional] target spanner db project")
+ flagTotalRows = flag.Int64("total-rows", 0, "[optional] source size, is used for version contol")
)
func main() {
@@ -65,6 +66,7 @@ func main() {
Duration: *flagDuration,
DateTo: dateTo,
},
+ *flagTotalRows,
)
}
}