1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
// Copyright 2025 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package aflow
import (
"fmt"
"maps"
"slices"
"github.com/google/syzkaller/pkg/aflow/ai"
)
// Flow describes a single agentic workflow.
// A workflow takes some inputs, and produces some outputs in the end
// (specified as fields of the Inputs/Outputs struct types, correspondingly).
// A workflow consists of one or more actions that do the actual computation
// and produce the outputs. Actions can be based on an arbitrary Go function
// (FuncAction), or an LLM agent invocation (LLMAgent). Actions can produce
// final output fields, and/or intermediate inputs for subsequent actions.
// LLMAgent can also use tools that can accept workflow inputs, or outputs
// of preceding actions.
// A workflow is executed sequentially, but it can be thought of as a dataflow graph.
// Actions are nodes of the graph, and they consume/produce some named values
// (input/output fields, and intermediate values consumed by other actions).
type Flow struct {
// Name is specified as empty for the main workflow of the workflow type.
// Registration rewrites it: an empty name becomes the workflow type name,
// a non-empty name becomes "<type>-<name>".
Name string // Empty for the main workflow for the workflow type.
// Root is the root action of the workflow; it is verified when the flow is registered.
Root Action
// Models is filled in during registration with the sorted model names
// collected while verifying the flow.
Models []string // LLM models used in this workflow.
// FlowType is set during registration and is shared by all flows registered
// for the same workflow type in one Register call.
*FlowType
}
// FlowType describes a workflow type: the part that is common for all workflows
// (implementations) registered for the same type.
type FlowType struct {
// Type is the workflow type; names of registered flows are derived from it.
Type ai.WorkflowType
// Description is the description that was passed during registration.
Description string
// checkInputs returns an error if the given map can't be converted to the Inputs type
// the workflow type was registered with.
checkInputs func(map[string]any) error
// extractOutputs converts the given state map to the Outputs type the workflow type
// was registered with, and then back to a map. It panics if the conversion fails.
extractOutputs func(map[string]any) map[string]any
}
// Flows contains all workflows registered with Register, keyed by the flow name
// (the name a flow has after registration).
var Flows = make(map[string]*Flow)
// Register adds a workflow type (characterized by the Inputs and Outputs types)
// together with one or more implementations of that type (the actual workflows)
// to the global Flows map.
// All workflows of the same type consume the same inputs and produce the same outputs.
// There should be the "main" implementation of the workflow type with an empty name,
// and zero or more secondary implementations with non-empty names.
// Register panics if any of the flows fails to register.
func Register[Inputs, Outputs any](typ ai.WorkflowType, description string, flows ...*Flow) {
	err := register[Inputs, Outputs](typ, description, Flows, flows)
	if err != nil {
		panic(err)
	}
}
// register is the error-returning implementation behind Register.
// It creates a single FlowType for typ that is shared by all of the flows,
// and adds the flows to the all map one by one.
// A flow with an empty name is registered under the workflow type name,
// any other flow is registered under "<type>-<name>".
// The first registration error is returned as is; flows that were registered
// before the failing one stay in all.
func register[Inputs, Outputs any](typ ai.WorkflowType, description string,
	all map[string]*Flow, flows []*Flow) error {
	validate := func(inputs map[string]any) error {
		_, err := convertFromMap[Inputs](inputs, false, false)
		return err
	}
	extract := func(state map[string]any) map[string]any {
		// Going through the Outputs type ensures that we actually have all outputs.
		outputs, err := convertFromMap[Outputs](state, false, false)
		if err != nil {
			panic(err)
		}
		return convertToMap(outputs)
	}
	flowType := &FlowType{
		Type:           typ,
		Description:    description,
		checkInputs:    validate,
		extractOutputs: extract,
	}
	prefix := string(typ)
	for _, f := range flows {
		name := prefix
		if f.Name != "" {
			name = prefix + "-" + f.Name
		}
		f.Name = name
		f.FlowType = flowType
		err := registerOne[Inputs, Outputs](all, f)
		if err != nil {
			return err
		}
	}
	return nil
}
// registerOne verifies a single flow and, if the verification succeeds,
// adds it to the all map under flow.Name.
// Verification provides the flow inputs (the Inputs type) to a fresh verification
// context, verifies the root action against it, and then requires the flow outputs
// (the Outputs type) from the context.
// On success flow.Models is set to the sorted list of models collected in the context.
// It returns an error if a flow with the same name is already present in all,
// if the flow has no root action, or if the verification fails.
func registerOne[Inputs, Outputs any](all map[string]*Flow, flow *Flow) error {
	if all[flow.Name] != nil {
		return fmt.Errorf("flow %v is already registered", flow.Name)
	}
	// Calling verify on a nil Root would crash with a nil dereference that does not
	// name the offending flow, so report it as a regular registration error instead.
	if flow.Root == nil {
		return fmt.Errorf("flow %v: no root action", flow.Name)
	}
	ctx := &verifyContext{
		actions: make(map[string]bool),
		state:   make(map[string]*varState),
		models:  make(map[string]bool),
	}
	provideOutputs[Inputs](ctx, "flow inputs")
	flow.Root.verify(ctx)
	requireInputs[Outputs](ctx, "flow outputs")
	if err := ctx.finalize(); err != nil {
		return fmt.Errorf("flow %v: %w", flow.Name, err)
	}
	// Map iteration order is random, so sort the models to get a deterministic list.
	flow.Models = slices.Sorted(maps.Keys(ctx.models))
	all[flow.Name] = flow
	return nil
}
|