aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/aflow/flow.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/aflow/flow.go')
-rw-r--r--pkg/aflow/flow.go99
1 files changed, 99 insertions, 0 deletions
diff --git a/pkg/aflow/flow.go b/pkg/aflow/flow.go
new file mode 100644
index 000000000..6325b2fd2
--- /dev/null
+++ b/pkg/aflow/flow.go
@@ -0,0 +1,99 @@
+// Copyright 2025 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package aflow
+
+import (
+ "fmt"
+
+ "github.com/google/syzkaller/pkg/aflow/ai"
+)
+
+// Flow describes a single agentic workflow.
+// A workflow takes some inputs, and produces some outputs in the end
+// (specified as fields of the Inputs/Outputs struct types, correspondingly).
+// A workflow consists of one or more actions that do the actual computation
+// and produce the outputs. Actions can be based on an arbitrary Go function
+// (FuncAction), or an LLM agent invocation (LLMAgent). Actions can produce
+// final output fields, and/or intermediate inputs for subsequent actions.
+// LLMAgent can also use tools that can accept workflow inputs, or outputs
+// or preceding actions.
+// A workflow is executed sequentially, but it can be thought of as a dataflow graph.
+// Actions are nodes of the graph, and they consume/produce some named values
+// (input/output fields, and intermediate values consumed by other actions).
+type Flow struct {
+ Name string // Empty for the main workflow for the workflow type.
+ Root Action
+
+ *FlowType
+}
+
+type FlowType struct {
+ Type ai.WorkflowType
+ Description string
+ checkInputs func(map[string]any) error
+ extractOutputs func(map[string]any) map[string]any
+}
+
+var Flows = make(map[string]*Flow)
+
+// Register a workflow type (characterized by Inputs and Outputs),
+// and one or more implementations of the workflow type (actual workflows).
+// All workflows for the same type consume the same inputs and produce the same outputs.
+// There should be the "main" implementation for the workflow type with an empty name,
+// and zero or more secondary implementations with non-empty names.
+func Register[Inputs, Outputs any](typ ai.WorkflowType, description string, flows ...*Flow) {
+ if err := register[Inputs, Outputs](typ, description, Flows, flows); err != nil {
+ panic(err)
+ }
+}
+
+func register[Inputs, Outputs any](typ ai.WorkflowType, description string,
+ all map[string]*Flow, flows []*Flow) error {
+ t := &FlowType{
+ Type: typ,
+ Description: description,
+ checkInputs: func(inputs map[string]any) error {
+ _, err := convertFromMap[Inputs](inputs, false)
+ return err
+ },
+ extractOutputs: func(state map[string]any) map[string]any {
+ // Ensure that we actually have all outputs.
+ tmp, err := convertFromMap[Outputs](state, false)
+ if err != nil {
+ panic(err)
+ }
+ return convertToMap(tmp)
+ },
+ }
+ for _, flow := range flows {
+ if flow.Name == "" {
+ flow.Name = string(typ)
+ } else {
+ flow.Name = string(typ) + "-" + flow.Name
+ }
+ flow.FlowType = t
+ if err := registerOne[Inputs, Outputs](all, flow); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func registerOne[Inputs, Outputs any](all map[string]*Flow, flow *Flow) error {
+ if all[flow.Name] != nil {
+ return fmt.Errorf("flow %v is already registered", flow.Name)
+ }
+ ctx := &verifyContext{
+ actions: make(map[string]bool),
+ state: make(map[string]*varState),
+ }
+ provideOutputs[Inputs](ctx, "flow inputs")
+ flow.Root.verify(ctx)
+ requireInputs[Outputs](ctx, "flow outputs")
+ if err := ctx.finalize(); err != nil {
+ return fmt.Errorf("flow %v: %w", flow.Name, err)
+ }
+ all[flow.Name] = flow
+ return nil
+}