diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2024-12-17 16:10:02 +0100 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2025-01-22 13:17:53 +0000 |
| commit | 44f2ad31190603135f4ac758273f26111ca6003c (patch) | |
| tree | 4f6190f27654e45bfb3bcd71d4c53adc533909a1 /syz-cluster/pkg/workflow | |
| parent | da72ac06e38cf1dd2ecbddd5502225ff7589542d (diff) | |
syz-cluster: initial code
The basic code of a K8S-based cluster that:
* Aggregates new LKML patch series.
* Determines the kernel trees to apply them to.
* Builds the basic and the patched kernel.
* Displays the results on a web dashboard.
This is a very rudimentary version with a lot of TODOs that
provides a skeleton for further work.
The project makes use of Argo workflows and Spanner DB.
Bootstrap is used for the web interface.
Overall structure:
* syz-cluster/dashboard: a web dashboard listing patch series
and their test results.
* syz-cluster/series-tracker: polls Lore archives and submits
the new patch series to the DB.
* syz-cluster/controller: schedules workflows and provides API for them.
* syz-cluster/kernel-disk: a cron job that keeps a kernel checkout up to date.
* syz-cluster/workflow/*: workflow steps.
For the DB structure see syz-cluster/pkg/db/migrations/*.
Diffstat (limited to 'syz-cluster/pkg/workflow')
| -rw-r--r-- | syz-cluster/pkg/workflow/argo.go | 115 | ||||
| -rw-r--r-- | syz-cluster/pkg/workflow/service.go | 59 | ||||
| -rw-r--r-- | syz-cluster/pkg/workflow/template.yaml | 146 |
3 files changed, 320 insertions, 0 deletions
diff --git a/syz-cluster/pkg/workflow/argo.go b/syz-cluster/pkg/workflow/argo.go new file mode 100644 index 000000000..502b0cf7f --- /dev/null +++ b/syz-cluster/pkg/workflow/argo.go @@ -0,0 +1,115 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package workflow + +import ( + "bytes" + "context" + "embed" + "fmt" + "sort" + "time" + + "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" + wftypes "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + restclient "k8s.io/client-go/rest" + "sigs.k8s.io/yaml" +) + +//go:embed *.yaml +var workflowsFS embed.FS + +type ArgoService struct { + wfClient wftypes.WorkflowInterface + template *v1alpha1.Workflow +} + +func NewArgoService() (*ArgoService, error) { + kubeConfig, err := restclient.InClusterConfig() + if err != nil { + return nil, err + } + namespace := "default" + wfClient := wfclientset.NewForConfigOrDie(kubeConfig).ArgoprojV1alpha1().Workflows(namespace) + templateData, err := workflowsFS.ReadFile("template.yaml") + if err != nil { + return nil, err + } + var wf v1alpha1.Workflow + err = yaml.Unmarshal(templateData, &wf) + if err != nil { + return nil, err + } + return &ArgoService{ + wfClient: wfClient, + template: &wf, + }, nil +} + +// TODO: substitute the proper (non-dev) Docker image names. +func (w *ArgoService) Start(sessionID string) error { + workflow := w.template.DeepCopy() + workflow.ObjectMeta.Labels = map[string]string{ + "workflow-id": sessionID, + } + for i, param := range workflow.Spec.Arguments.Parameters { + if param.Name == "session-id" { + workflow.Spec.Arguments.Parameters[i].Value = v1alpha1.AnyStringPtr(sessionID) + } + } + _, err := w.wfClient.Create(context.Background(), workflow, metav1.CreateOptions{}) + return err +} + +func (w *ArgoService) Status(sessionID string) (Status, []byte, error) { + listOptions := metav1.ListOptions{ + LabelSelector: fmt.Sprintf("workflow-id=%s", sessionID), + } + workflows, err := w.wfClient.List(context.Background(), listOptions) + if err != nil || len(workflows.Items) == 0 { + return StatusNotFound, nil, err + } + wf := workflows.Items[0] + log := w.generateLog(wf.Status.Nodes) + switch wf.Status.Phase { + case v1alpha1.WorkflowRunning, v1alpha1.WorkflowPending: + return StatusRunning, log, nil + case v1alpha1.WorkflowSucceeded: + return StatusFinished, log, nil + } + return StatusFailed, log, nil +} + +func (w *ArgoService) generateLog(nodes v1alpha1.Nodes) []byte { + var list []v1alpha1.NodeStatus + for _, node := range nodes { + list = append(list, node) + } + sort.Slice(list, func(i, j int) bool { + a, b := list[i], list[j] + if !a.StartedAt.Equal(&b.StartedAt) { + return a.StartedAt.Before(&b.StartedAt) + } + return a.Name < b.Name + }) + var buf bytes.Buffer + for i, val := range list { + if i > 0 { + buf.WriteString("---------\n") + } + fmt.Fprintf(&buf, "Name: %s\n", val.Name) + fmt.Fprintf(&buf, "Phase: %s\n", val.Phase) + fmt.Fprintf(&buf, "StartedAt: %s\n", val.StartedAt) + fmt.Fprintf(&buf, "FinishedAt: %s\n", val.FinishedAt) + fmt.Fprintf(&buf, "Input: %s\n", val.Inputs) + fmt.Fprintf(&buf, "Output: %s\n", val.Outputs) + } + return buf.Bytes() +} + +func (w *ArgoService) PollPeriod() time.Duration { + return time.Minute +} diff --git a/syz-cluster/pkg/workflow/service.go b/syz-cluster/pkg/workflow/service.go new file mode 100644 index 000000000..4ec7eb5cf --- /dev/null +++ b/syz-cluster/pkg/workflow/service.go @@ -0,0 +1,59 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package workflow + +import ( + "sync" + "time" +) + +// Service is the interface for starting and managing the workflows that process individual patch series. +// The workflow includes steps like building base/patched kernel, dong boot tests, running fuzzing, etc. +// It's assumed that the workflow will query the necessary data and report its detailed progress itself, +// so we only need to be able to start it and to check its current overall state. +type Service interface { + Start(sessionID string) error + Status(id string) (Status, []byte, error) + // The recommended value. May depend on the implementation (test/prod). + PollPeriod() time.Duration +} + +type Status string + +const ( + StatusNotFound Status = "not_found" + StatusRunning Status = "running" + StatusFinished Status = "finished" + StatusFailed Status = "failed" +) + +// MockService serializes callback invocations to simplify test implementations. +type MockService struct { + mu sync.Mutex + PollDelayValue time.Duration + OnStart func(string) error + OnStatus func(string) (Status, []byte, error) +} + +func (ms *MockService) Start(id string) error { + ms.mu.Lock() + defer ms.mu.Unlock() + if ms.OnStart != nil { + return ms.OnStart(id) + } + return nil +} + +func (ms *MockService) Status(id string) (Status, []byte, error) { + ms.mu.Lock() + defer ms.mu.Unlock() + if ms.OnStatus != nil { + return ms.OnStatus(id) + } + return StatusNotFound, nil, nil +} + +func (ms *MockService) PollPeriod() time.Duration { + return ms.PollDelayValue +} diff --git a/syz-cluster/pkg/workflow/template.yaml b/syz-cluster/pkg/workflow/template.yaml new file mode 100644 index 000000000..1a51ee646 --- /dev/null +++ b/syz-cluster/pkg/workflow/template.yaml @@ -0,0 +1,146 @@ +# Copyright 2025 syzkaller project authors. All rights reserved. +# Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: series-workflow +spec: + entrypoint: main + arguments: + parameters: + - name: session-id + value: "some-session-id" + templates: + - name: main + # Don't schedule new steps if any of the previous steps failed. + failFast: true + # Note that failFast and parallelism only affect this template's steps. + parallelism: 1 + steps: + - - name: run-triage + templateRef: + name: triage-step-template + template: triage-step + - - name: abort-on-skip + template: exit-workflow + when: "{{=jsonpath(steps['run-triage'].outputs.parameters.result, '$.skip') == true}}" + - - name: iterate-fuzz + template: process-fuzz-wrapper + arguments: + parameters: + - name: element + value: "{{item}}" + # Because of parallelism=1, the steps will be scheduled one after another. + # Because of failFast=True, the Failed status acts as the "break" operation. + withParam: "{{=jsonpath(steps['run-triage'].outputs.parameters.result, '$.fuzz')}}" + # Ignoge the (fake) Failed status to still keep the overall workflow status a Success. + continueOn: + failed: true + # The wrapper inverts the process-fuzz status in order to let iterate-fuzz iterate until a success. + - name: process-fuzz-wrapper + inputs: + parameters: + - name: element + steps: + - - name: run-process-fuzz + template: process-fuzz + arguments: + parameters: + - name: element + value: "{{inputs.parameters.element}}" + continueOn: + failed: true + - - name: break-if-succeeded + template: exit-workflow + when: "{{steps['run-process-fuzz'].status}} == Succeeded" + - name: process-fuzz + inputs: + parameters: + - name: element + steps: + - - name: save-base-req + template: extract-request + arguments: + parameters: + - name: data + value: "{{= jsonpath(inputs.parameters.element, '$.base')}}" + - - name: base-build + templateRef: + name: build-step-template + template: build-step + arguments: + artifacts: + - name: request + from: "{{steps.save-base-req.outputs.artifacts.request}}" + - - name: continue-if-base-build-failed + template: exit-workflow + when: "{{=jsonpath(steps['base-build'].outputs.parameters.result, '$.success') == false}}" + - - name: save-patched-req + template: extract-request + arguments: + parameters: + - name: data + value: "{{=jsonpath(inputs.parameters.element, '$.patched')}}" + - - name: patched-build + templateRef: + name: build-step-template + template: build-step + arguments: + artifacts: + - name: request + from: "{{steps.save-patched-req.outputs.artifacts.request}}" + - - name: do-submit-test + template: submit-test + arguments: + parameters: + - name: test-name + value: "Build test" + - name: base-build-id + value: "{{=jsonpath(steps['base-build'].outputs.parameters.result, '$.build_id')}}" + - name: patched-build-id + value: "{{=jsonpath(steps['patched-build'].outputs.parameters.result, '$.build_id')}}" + - name: result + value: "{{=jsonpath(steps['patched-build'].outputs.parameters.result, '$.success') == true ? 'passed' : 'failed'}}" + - - name: continue-if-patched-build-failed + template: exit-workflow + when: "{{=jsonpath(steps['patched-build'].outputs.parameters.result, '$.success')}} == false" + - name: extract-request + inputs: + parameters: + - name: data + outputs: + artifacts: + - name: request + path: /tmp/request.json + container: + image: alpine:latest + command: [sh, -c] + args: ["echo '{{inputs.parameters.data}}' > /tmp/request.json"] + - name: submit-test + inputs: + parameters: + - name: test-name + - name: base-build-id + - name: patched-build-id + - name: result + container: + image: submit-test-local:latest + imagePullPolicy: IfNotPresent + command: ["/bin/submit-test"] + args: [ + "--session", "{{workflow.parameters.session-id}}", + "--test", "{{inputs.parameters.test-name}}", + "--base-build", "{{inputs.parameters.base-build-id}}", + "--patched-build", "{{inputs.parameters.patched-build-id}}", + "--result", "{{inputs.parameters.result}}" + ] + - name: exit-workflow + inputs: + parameters: + - name: code + value: 1 + container: + image: alpine:latest + command: ['/bin/sh', '-c'] + args: ["exit {{inputs.parameters.code}}"] |
