aboutsummaryrefslogtreecommitdiffstats
path: root/syz-cluster/pkg/workflow/argo.go
blob: 763d5b5f1f953f6a9a60c069e97e584b6877f5e8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// Copyright 2024 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.

package workflow

import (
	"bytes"
	"context"
	"embed"
	"fmt"
	"sort"
	"time"

	"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
	wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
	wftypes "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	restclient "k8s.io/client-go/rest"
	"sigs.k8s.io/yaml"
)

//go:embed *.yaml
var workflowsFS embed.FS

type ArgoService struct {
	wfClient wftypes.WorkflowInterface
	template *v1alpha1.Workflow
}

func NewArgoService() (*ArgoService, error) {
	kubeConfig, err := restclient.InClusterConfig()
	if err != nil {
		return nil, err
	}
	namespace := "default"
	wfClient := wfclientset.NewForConfigOrDie(kubeConfig).ArgoprojV1alpha1().Workflows(namespace)
	templateData, err := workflowsFS.ReadFile("template.yaml")
	if err != nil {
		return nil, err
	}
	var wf v1alpha1.Workflow
	err = yaml.Unmarshal(templateData, &wf)
	if err != nil {
		return nil, err
	}
	return &ArgoService{
		wfClient: wfClient,
		template: &wf,
	}, nil
}

// TODO: substitute the proper (non-dev) Docker image names.
func (w *ArgoService) Start(sessionID string) error {
	workflow := w.template.DeepCopy()
	workflow.ObjectMeta.Labels = map[string]string{
		"workflow-id": sessionID,
	}
	for i, param := range workflow.Spec.Arguments.Parameters {
		if param.Name == "session-id" {
			workflow.Spec.Arguments.Parameters[i].Value = v1alpha1.AnyStringPtr(sessionID)
		}
	}
	_, err := w.wfClient.Create(context.Background(), workflow, metav1.CreateOptions{})
	return err
}

func (w *ArgoService) Status(sessionID string) (Status, []byte, error) {
	listOptions := metav1.ListOptions{
		LabelSelector: fmt.Sprintf("workflow-id=%s", sessionID),
	}
	workflows, err := w.wfClient.List(context.Background(), listOptions)
	if err != nil || len(workflows.Items) == 0 {
		return StatusNotFound, nil, err
	}
	wf := workflows.Items[0]
	log := w.generateLog(wf.Status.Nodes)
	switch wf.Status.Phase {
	case v1alpha1.WorkflowRunning, v1alpha1.WorkflowPending:
		return StatusRunning, log, nil
	case v1alpha1.WorkflowSucceeded:
		return StatusFinished, log, nil
	}
	return StatusFailed, log, nil
}

func (w *ArgoService) generateLog(nodes v1alpha1.Nodes) []byte {
	var list []v1alpha1.NodeStatus
	for _, node := range nodes {
		list = append(list, node)
	}
	sort.Slice(list, func(i, j int) bool {
		a, b := list[i], list[j]
		if !a.StartedAt.Equal(&b.StartedAt) {
			return a.StartedAt.Before(&b.StartedAt)
		}
		return a.Name < b.Name
	})
	var buf bytes.Buffer
	for i, val := range list {
		if i > 0 {
			buf.WriteString("---------\n")
		}
		fmt.Fprintf(&buf, "Name: %s\n", val.Name)
		fmt.Fprintf(&buf, "Phase: %s\n", val.Phase)
		fmt.Fprintf(&buf, "StartedAt: %s\n", val.StartedAt)
		fmt.Fprintf(&buf, "FinishedAt: %s\n", val.FinishedAt)
		fmt.Fprintf(&buf, "Input: %s\n", val.Inputs)
		fmt.Fprintf(&buf, "Output: %s\n", val.Outputs)
	}
	return buf.Bytes()
}

func (w *ArgoService) PollPeriod() time.Duration {
	return 30 * time.Second
}