diff options
Diffstat (limited to 'pkg/aflow/flow.go')
| -rw-r--r-- | pkg/aflow/flow.go | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/pkg/aflow/flow.go b/pkg/aflow/flow.go new file mode 100644 index 000000000..6325b2fd2 --- /dev/null +++ b/pkg/aflow/flow.go @@ -0,0 +1,99 @@ +// Copyright 2025 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package aflow + +import ( + "fmt" + + "github.com/google/syzkaller/pkg/aflow/ai" +) + +// Flow describes a single agentic workflow. +// A workflow takes some inputs, and produces some outputs in the end +// (specified as fields of the Inputs/Outputs struct types, correspondingly). +// A workflow consists of one or more actions that do the actual computation +// and produce the outputs. Actions can be based on an arbitrary Go function +// (FuncAction), or an LLM agent invocation (LLMAgent). Actions can produce +// final output fields, and/or intermediate inputs for subsequent actions. +// LLMAgent can also use tools that can accept workflow inputs, or outputs +// or preceding actions. +// A workflow is executed sequentially, but it can be thought of as a dataflow graph. +// Actions are nodes of the graph, and they consume/produce some named values +// (input/output fields, and intermediate values consumed by other actions). +type Flow struct { + Name string // Empty for the main workflow for the workflow type. + Root Action + + *FlowType +} + +type FlowType struct { + Type ai.WorkflowType + Description string + checkInputs func(map[string]any) error + extractOutputs func(map[string]any) map[string]any +} + +var Flows = make(map[string]*Flow) + +// Register a workflow type (characterized by Inputs and Outputs), +// and one or more implementations of the workflow type (actual workflows). +// All workflows for the same type consume the same inputs and produce the same outputs. +// There should be the "main" implementation for the workflow type with an empty name, +// and zero or more secondary implementations with non-empty names. +func Register[Inputs, Outputs any](typ ai.WorkflowType, description string, flows ...*Flow) { + if err := register[Inputs, Outputs](typ, description, Flows, flows); err != nil { + panic(err) + } +} + +func register[Inputs, Outputs any](typ ai.WorkflowType, description string, + all map[string]*Flow, flows []*Flow) error { + t := &FlowType{ + Type: typ, + Description: description, + checkInputs: func(inputs map[string]any) error { + _, err := convertFromMap[Inputs](inputs, false) + return err + }, + extractOutputs: func(state map[string]any) map[string]any { + // Ensure that we actually have all outputs. + tmp, err := convertFromMap[Outputs](state, false) + if err != nil { + panic(err) + } + return convertToMap(tmp) + }, + } + for _, flow := range flows { + if flow.Name == "" { + flow.Name = string(typ) + } else { + flow.Name = string(typ) + "-" + flow.Name + } + flow.FlowType = t + if err := registerOne[Inputs, Outputs](all, flow); err != nil { + return err + } + } + return nil +} + +func registerOne[Inputs, Outputs any](all map[string]*Flow, flow *Flow) error { + if all[flow.Name] != nil { + return fmt.Errorf("flow %v is already registered", flow.Name) + } + ctx := &verifyContext{ + actions: make(map[string]bool), + state: make(map[string]*varState), + } + provideOutputs[Inputs](ctx, "flow inputs") + flow.Root.verify(ctx) + requireInputs[Outputs](ctx, "flow outputs") + if err := ctx.finalize(); err != nil { + return fmt.Errorf("flow %v: %w", flow.Name, err) + } + all[flow.Name] = flow + return nil +} |
