aboutsummaryrefslogtreecommitdiffstats
path: root/syz-cluster/pkg/workflow
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2024-12-17 16:10:02 +0100
committerAleksandr Nogikh <nogikh@google.com>2025-01-22 13:17:53 +0000
commit44f2ad31190603135f4ac758273f26111ca6003c (patch)
tree4f6190f27654e45bfb3bcd71d4c53adc533909a1 /syz-cluster/pkg/workflow
parentda72ac06e38cf1dd2ecbddd5502225ff7589542d (diff)
syz-cluster: initial code
The basic code of a K8S-based cluster that: * Aggregates new LKML patch series. * Determines the kernel trees to apply them to. * Builds the basic and the patched kernel. * Displays the results on a web dashboard. This is a very rudimentary version with a lot of TODOs that provides a skeleton for further work. The project makes use of Argo workflows and Spanner DB. Bootstrap is used for the web interface. Overall structure: * syz-cluster/dashboard: a web dashboard listing patch series and their test results. * syz-cluster/series-tracker: polls Lore archives and submits the new patch series to the DB. * syz-cluster/controller: schedules workflows and provides API for them. * syz-cluster/kernel-disk: a cron job that keeps a kernel checkout up to date. * syz-cluster/workflow/*: workflow steps. For the DB structure see syz-cluster/pkg/db/migrations/*.
Diffstat (limited to 'syz-cluster/pkg/workflow')
-rw-r--r--syz-cluster/pkg/workflow/argo.go115
-rw-r--r--syz-cluster/pkg/workflow/service.go59
-rw-r--r--syz-cluster/pkg/workflow/template.yaml146
3 files changed, 320 insertions, 0 deletions
diff --git a/syz-cluster/pkg/workflow/argo.go b/syz-cluster/pkg/workflow/argo.go
new file mode 100644
index 000000000..502b0cf7f
--- /dev/null
+++ b/syz-cluster/pkg/workflow/argo.go
@@ -0,0 +1,115 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package workflow
+
+import (
+ "bytes"
+ "context"
+ "embed"
+ "fmt"
+ "sort"
+ "time"
+
+ "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
+ wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
+ wftypes "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ restclient "k8s.io/client-go/rest"
+ "sigs.k8s.io/yaml"
+)
+
+//go:embed *.yaml
+var workflowsFS embed.FS
+
+type ArgoService struct {
+ wfClient wftypes.WorkflowInterface
+ template *v1alpha1.Workflow
+}
+
+func NewArgoService() (*ArgoService, error) {
+ kubeConfig, err := restclient.InClusterConfig()
+ if err != nil {
+ return nil, err
+ }
+ namespace := "default"
+ wfClient := wfclientset.NewForConfigOrDie(kubeConfig).ArgoprojV1alpha1().Workflows(namespace)
+ templateData, err := workflowsFS.ReadFile("template.yaml")
+ if err != nil {
+ return nil, err
+ }
+ var wf v1alpha1.Workflow
+ err = yaml.Unmarshal(templateData, &wf)
+ if err != nil {
+ return nil, err
+ }
+ return &ArgoService{
+ wfClient: wfClient,
+ template: &wf,
+ }, nil
+}
+
+// TODO: substitute the proper (non-dev) Docker image names.
+func (w *ArgoService) Start(sessionID string) error {
+ workflow := w.template.DeepCopy()
+ workflow.ObjectMeta.Labels = map[string]string{
+ "workflow-id": sessionID,
+ }
+ for i, param := range workflow.Spec.Arguments.Parameters {
+ if param.Name == "session-id" {
+ workflow.Spec.Arguments.Parameters[i].Value = v1alpha1.AnyStringPtr(sessionID)
+ }
+ }
+ _, err := w.wfClient.Create(context.Background(), workflow, metav1.CreateOptions{})
+ return err
+}
+
+func (w *ArgoService) Status(sessionID string) (Status, []byte, error) {
+ listOptions := metav1.ListOptions{
+ LabelSelector: fmt.Sprintf("workflow-id=%s", sessionID),
+ }
+ workflows, err := w.wfClient.List(context.Background(), listOptions)
+ if err != nil || len(workflows.Items) == 0 {
+ return StatusNotFound, nil, err
+ }
+ wf := workflows.Items[0]
+ log := w.generateLog(wf.Status.Nodes)
+ switch wf.Status.Phase {
+ case v1alpha1.WorkflowRunning, v1alpha1.WorkflowPending:
+ return StatusRunning, log, nil
+ case v1alpha1.WorkflowSucceeded:
+ return StatusFinished, log, nil
+ }
+ return StatusFailed, log, nil
+}
+
+func (w *ArgoService) generateLog(nodes v1alpha1.Nodes) []byte {
+ var list []v1alpha1.NodeStatus
+ for _, node := range nodes {
+ list = append(list, node)
+ }
+ sort.Slice(list, func(i, j int) bool {
+ a, b := list[i], list[j]
+ if !a.StartedAt.Equal(&b.StartedAt) {
+ return a.StartedAt.Before(&b.StartedAt)
+ }
+ return a.Name < b.Name
+ })
+ var buf bytes.Buffer
+ for i, val := range list {
+ if i > 0 {
+ buf.WriteString("---------\n")
+ }
+ fmt.Fprintf(&buf, "Name: %s\n", val.Name)
+ fmt.Fprintf(&buf, "Phase: %s\n", val.Phase)
+ fmt.Fprintf(&buf, "StartedAt: %s\n", val.StartedAt)
+ fmt.Fprintf(&buf, "FinishedAt: %s\n", val.FinishedAt)
+ fmt.Fprintf(&buf, "Input: %s\n", val.Inputs)
+ fmt.Fprintf(&buf, "Output: %s\n", val.Outputs)
+ }
+ return buf.Bytes()
+}
+
+func (w *ArgoService) PollPeriod() time.Duration {
+ return time.Minute
+}
diff --git a/syz-cluster/pkg/workflow/service.go b/syz-cluster/pkg/workflow/service.go
new file mode 100644
index 000000000..4ec7eb5cf
--- /dev/null
+++ b/syz-cluster/pkg/workflow/service.go
@@ -0,0 +1,59 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package workflow
+
+import (
+ "sync"
+ "time"
+)
+
+// Service is the interface for starting and managing the workflows that process individual patch series.
+// The workflow includes steps like building base/patched kernel, dong boot tests, running fuzzing, etc.
+// It's assumed that the workflow will query the necessary data and report its detailed progress itself,
+// so we only need to be able to start it and to check its current overall state.
+type Service interface {
+ Start(sessionID string) error
+ Status(id string) (Status, []byte, error)
+ // The recommended value. May depend on the implementation (test/prod).
+ PollPeriod() time.Duration
+}
+
+type Status string
+
+const (
+ StatusNotFound Status = "not_found"
+ StatusRunning Status = "running"
+ StatusFinished Status = "finished"
+ StatusFailed Status = "failed"
+)
+
+// MockService serializes callback invocations to simplify test implementations.
+type MockService struct {
+ mu sync.Mutex
+ PollDelayValue time.Duration
+ OnStart func(string) error
+ OnStatus func(string) (Status, []byte, error)
+}
+
+func (ms *MockService) Start(id string) error {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
+ if ms.OnStart != nil {
+ return ms.OnStart(id)
+ }
+ return nil
+}
+
+func (ms *MockService) Status(id string) (Status, []byte, error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
+ if ms.OnStatus != nil {
+ return ms.OnStatus(id)
+ }
+ return StatusNotFound, nil, nil
+}
+
+func (ms *MockService) PollPeriod() time.Duration {
+ return ms.PollDelayValue
+}
diff --git a/syz-cluster/pkg/workflow/template.yaml b/syz-cluster/pkg/workflow/template.yaml
new file mode 100644
index 000000000..1a51ee646
--- /dev/null
+++ b/syz-cluster/pkg/workflow/template.yaml
@@ -0,0 +1,146 @@
+# Copyright 2025 syzkaller project authors. All rights reserved.
+# Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+ generateName: series-workflow
+spec:
+ entrypoint: main
+ arguments:
+ parameters:
+ - name: session-id
+ value: "some-session-id"
+ templates:
+ - name: main
+ # Don't schedule new steps if any of the previous steps failed.
+ failFast: true
+ # Note that failFast and parallelism only affect this template's steps.
+ parallelism: 1
+ steps:
+ - - name: run-triage
+ templateRef:
+ name: triage-step-template
+ template: triage-step
+ - - name: abort-on-skip
+ template: exit-workflow
+ when: "{{=jsonpath(steps['run-triage'].outputs.parameters.result, '$.skip') == true}}"
+ - - name: iterate-fuzz
+ template: process-fuzz-wrapper
+ arguments:
+ parameters:
+ - name: element
+ value: "{{item}}"
+ # Because of parallelism=1, the steps will be scheduled one after another.
+ # Because of failFast=True, the Failed status acts as the "break" operation.
+ withParam: "{{=jsonpath(steps['run-triage'].outputs.parameters.result, '$.fuzz')}}"
+ # Ignoge the (fake) Failed status to still keep the overall workflow status a Success.
+ continueOn:
+ failed: true
+ # The wrapper inverts the process-fuzz status in order to let iterate-fuzz iterate until a success.
+ - name: process-fuzz-wrapper
+ inputs:
+ parameters:
+ - name: element
+ steps:
+ - - name: run-process-fuzz
+ template: process-fuzz
+ arguments:
+ parameters:
+ - name: element
+ value: "{{inputs.parameters.element}}"
+ continueOn:
+ failed: true
+ - - name: break-if-succeeded
+ template: exit-workflow
+ when: "{{steps['run-process-fuzz'].status}} == Succeeded"
+ - name: process-fuzz
+ inputs:
+ parameters:
+ - name: element
+ steps:
+ - - name: save-base-req
+ template: extract-request
+ arguments:
+ parameters:
+ - name: data
+ value: "{{= jsonpath(inputs.parameters.element, '$.base')}}"
+ - - name: base-build
+ templateRef:
+ name: build-step-template
+ template: build-step
+ arguments:
+ artifacts:
+ - name: request
+ from: "{{steps.save-base-req.outputs.artifacts.request}}"
+ - - name: continue-if-base-build-failed
+ template: exit-workflow
+ when: "{{=jsonpath(steps['base-build'].outputs.parameters.result, '$.success') == false}}"
+ - - name: save-patched-req
+ template: extract-request
+ arguments:
+ parameters:
+ - name: data
+ value: "{{=jsonpath(inputs.parameters.element, '$.patched')}}"
+ - - name: patched-build
+ templateRef:
+ name: build-step-template
+ template: build-step
+ arguments:
+ artifacts:
+ - name: request
+ from: "{{steps.save-patched-req.outputs.artifacts.request}}"
+ - - name: do-submit-test
+ template: submit-test
+ arguments:
+ parameters:
+ - name: test-name
+ value: "Build test"
+ - name: base-build-id
+ value: "{{=jsonpath(steps['base-build'].outputs.parameters.result, '$.build_id')}}"
+ - name: patched-build-id
+ value: "{{=jsonpath(steps['patched-build'].outputs.parameters.result, '$.build_id')}}"
+ - name: result
+ value: "{{=jsonpath(steps['patched-build'].outputs.parameters.result, '$.success') == true ? 'passed' : 'failed'}}"
+ - - name: continue-if-patched-build-failed
+ template: exit-workflow
+ when: "{{=jsonpath(steps['patched-build'].outputs.parameters.result, '$.success')}} == false"
+ - name: extract-request
+ inputs:
+ parameters:
+ - name: data
+ outputs:
+ artifacts:
+ - name: request
+ path: /tmp/request.json
+ container:
+ image: alpine:latest
+ command: [sh, -c]
+ args: ["echo '{{inputs.parameters.data}}' > /tmp/request.json"]
+ - name: submit-test
+ inputs:
+ parameters:
+ - name: test-name
+ - name: base-build-id
+ - name: patched-build-id
+ - name: result
+ container:
+ image: submit-test-local:latest
+ imagePullPolicy: IfNotPresent
+ command: ["/bin/submit-test"]
+ args: [
+ "--session", "{{workflow.parameters.session-id}}",
+ "--test", "{{inputs.parameters.test-name}}",
+ "--base-build", "{{inputs.parameters.base-build-id}}",
+ "--patched-build", "{{inputs.parameters.patched-build-id}}",
+ "--result", "{{inputs.parameters.result}}"
+ ]
+ - name: exit-workflow
+ inputs:
+ parameters:
+ - name: code
+ value: 1
+ container:
+ image: alpine:latest
+ command: ['/bin/sh', '-c']
+ args: ["exit {{inputs.parameters.code}}"]