1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
// Copyright 2024 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package workflow
import (
"bytes"
"context"
"embed"
"fmt"
"sort"
"time"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
wftypes "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
restclient "k8s.io/client-go/rest"
"sigs.k8s.io/yaml"
)
// workflowsFS holds the files matching *.yaml in this package's directory,
// embedded into the binary at build time. NewArgoService reads
// "template.yaml" from it.
//go:embed *.yaml
var workflowsFS embed.FS
// ArgoService starts Argo workflows from an embedded template and reports
// their status.
type ArgoService struct {
// wfClient is the Argo workflow client used to create and list workflows.
wfClient wftypes.WorkflowInterface
// template is the workflow parsed from the embedded template.yaml; Start
// deep-copies it for every workflow it submits.
template *v1alpha1.Workflow
}
// NewArgoService builds an ArgoService that talks to the Argo workflow API
// of the cluster the process is running in.
//
// It loads the in-cluster Kubernetes configuration, creates a workflow client
// bound to the "default" namespace and parses the embedded template.yaml into
// the workflow template that Start later copies.
//
// An error is returned (never a panic) if the in-cluster configuration is
// unavailable, the client cannot be constructed, or the template cannot be
// read or parsed. Returned errors wrap the underlying cause.
func NewArgoService() (*ArgoService, error) {
	kubeConfig, err := restclient.InClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("loading in-cluster kube config: %w", err)
	}
	// NewForConfig is used instead of NewForConfigOrDie so that a bad
	// configuration surfaces through the error return rather than a panic.
	clientset, err := wfclientset.NewForConfig(kubeConfig)
	if err != nil {
		return nil, fmt.Errorf("creating argo workflow client: %w", err)
	}
	const namespace = "default"
	wfClient := clientset.ArgoprojV1alpha1().Workflows(namespace)

	templateData, err := workflowsFS.ReadFile("template.yaml")
	if err != nil {
		return nil, fmt.Errorf("reading embedded template.yaml: %w", err)
	}
	var wf v1alpha1.Workflow
	if err := yaml.Unmarshal(templateData, &wf); err != nil {
		return nil, fmt.Errorf("parsing embedded template.yaml: %w", err)
	}
	return &ArgoService{
		wfClient: wfClient,
		template: &wf,
	}, nil
}
// Start submits a new workflow for the given session.
//
// The workflow is a deep copy of the service's template. Its labels are
// replaced with a single "workflow-id" label carrying sessionID (Status
// selects on that label), and every argument parameter named "session-id"
// gets sessionID as its value. The error from the create call is returned
// as is.
//
// TODO: substitute the proper (non-dev) Docker image names.
func (w *ArgoService) Start(sessionID string) error {
	wf := w.template.DeepCopy()
	wf.ObjectMeta.Labels = map[string]string{"workflow-id": sessionID}

	// params shares its backing array with the copied spec, so writes
	// through it update the workflow being submitted.
	params := wf.Spec.Arguments.Parameters
	for idx := range params {
		if params[idx].Name != "session-id" {
			continue
		}
		params[idx].Value = v1alpha1.AnyStringPtr(sessionID)
	}

	_, err := w.wfClient.Create(context.Background(), wf, metav1.CreateOptions{})
	return err
}
// Status looks up the workflow labelled with "workflow-id=<sessionID>" and
// reports its state together with a textual log of its nodes.
//
// Results:
//   - StatusNotFound with a nil log when the list call fails (the error is
//     returned) or when no workflow carries the label (nil error).
//   - StatusRunning while the workflow phase is pending, running, or still
//     unset. A workflow that was just created has an empty phase until the
//     controller first processes it, so that state must not be reported as
//     a failure.
//   - StatusFinished when the workflow succeeded.
//   - StatusFailed for every other phase.
//
// If several workflows match, only the first one returned by the API is
// considered.
func (w *ArgoService) Status(sessionID string) (Status, []byte, error) {
	listOptions := metav1.ListOptions{
		LabelSelector: fmt.Sprintf("workflow-id=%s", sessionID),
	}
	workflows, err := w.wfClient.List(context.Background(), listOptions)
	if err != nil {
		return StatusNotFound, nil, err
	}
	if len(workflows.Items) == 0 {
		return StatusNotFound, nil, nil
	}
	wf := &workflows.Items[0]
	log := w.generateLog(wf.Status.Nodes)
	switch wf.Status.Phase {
	case v1alpha1.WorkflowUnknown, v1alpha1.WorkflowPending, v1alpha1.WorkflowRunning:
		return StatusRunning, log, nil
	case v1alpha1.WorkflowSucceeded:
		return StatusFinished, log, nil
	default:
		return StatusFailed, log, nil
	}
}
// generateLog renders the workflow nodes as a plain-text report.
//
// Nodes are ordered by start time, with ties broken by name, so the output
// does not depend on map iteration order. Each node contributes its name,
// phase, start/finish times and inputs/outputs, one per line; consecutive
// nodes are separated by a "---------" line.
func (w *ArgoService) generateLog(nodes v1alpha1.Nodes) []byte {
	ordered := make([]v1alpha1.NodeStatus, 0, len(nodes))
	for _, status := range nodes {
		ordered = append(ordered, status)
	}
	sort.Slice(ordered, func(i, j int) bool {
		left, right := &ordered[i], &ordered[j]
		if left.StartedAt.Equal(&right.StartedAt) {
			return left.Name < right.Name
		}
		return left.StartedAt.Before(&right.StartedAt)
	})

	var out bytes.Buffer
	for idx := range ordered {
		node := &ordered[idx]
		if idx != 0 {
			out.WriteString("---------\n")
		}
		fmt.Fprintf(&out, "Name: %s\n", node.Name)
		fmt.Fprintf(&out, "Phase: %s\n", node.Phase)
		fmt.Fprintf(&out, "StartedAt: %s\n", node.StartedAt)
		fmt.Fprintf(&out, "FinishedAt: %s\n", node.FinishedAt)
		fmt.Fprintf(&out, "Input: %s\n", node.Inputs)
		fmt.Fprintf(&out, "Output: %s\n", node.Outputs)
	}
	return out.Bytes()
}
// PollPeriod returns a fixed duration of 30 seconds.
// NOTE(review): presumably the interval at which callers should re-query
// Status; confirm against the caller.
func (w *ArgoService) PollPeriod() time.Duration {
	const period = 30 * time.Second
	return period
}
|