aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2026-01-16 09:33:14 +0100
committerDmitry Vyukov <dvyukov@google.com>2026-01-20 21:12:57 +0000
commit91e26ec437abcd42a8255aa88e31b45da059529e (patch)
treec9e6e26949375e3ae053f1fc84fe749e69d5cabc
parent7f5908e77ae0e7fef4b7901341b8c2c4bbb74b28 (diff)
pkg/aflow: handle model quota errors
Detect model quota violations (assumed to be RPD). Make syz-agent not request jobs that use the model until the next quota reset time. Fixes #6573
-rw-r--r--pkg/aflow/execute.go46
-rw-r--r--pkg/aflow/flow.go6
-rw-r--r--pkg/aflow/flow_test.go22
-rw-r--r--pkg/aflow/llm_agent.go9
-rw-r--r--pkg/aflow/verify.go1
-rw-r--r--syz-agent/agent.go59
6 files changed, 132 insertions, 11 deletions
diff --git a/pkg/aflow/execute.go b/pkg/aflow/execute.go
index 96f15c13b..f15b19eb7 100644
--- a/pkg/aflow/execute.go
+++ b/pkg/aflow/execute.go
@@ -88,6 +88,52 @@ type flowError struct {
error
}
+func IsModelQuotaError(err error) string {
+ var quotaErr *modelQuotaError
+ if errors.As(err, &quotaErr) {
+ return quotaErr.model
+ }
+ return ""
+}
+
// modelQuotaError is the error reported when a request to an LLM model
// is rejected because the model is over its daily quota.
type modelQuotaError struct {
	// model is the name of the model that is over quota.
	model string
}

// Error implements the error interface; the model name is quoted in the message.
func (e *modelQuotaError) Error() string {
	name := e.model
	return fmt.Sprintf("model %q is over daily quota", name)
}
+
// QuotaResetTime returns the time (in UTC) when the requests-per-day (RPD) quota
// will be reset for a quota overflow that happened at time t.
//
// t may be in any location; it is converted to Pacific time first. If t is at or
// before 00:05 Pacific on its own calendar day, that 00:05 instant is returned;
// otherwise 00:05 Pacific of the following calendar day is returned.
func QuotaResetTime(t time.Time) time.Time {
	// Requests per day (RPD) quotas reset at midnight Pacific time:
	// https://ai.google.dev/gemini-api/docs/rate-limits
	// To account for potential delays in the reset logic, we add a small delta
	// (5 mins) to avoid the situation where we reset the quota locally at exactly
	// midnight, but it has not been reset on the server yet.
	// The assumption is also that any rate limiting errors at the very beginning
	// of the day (within the first seconds/minutes) actually belong to the
	// previous day (we couldn't overflow the quota within that period).
	t = t.In(pacificLoc)
	year, month, day := t.Date()
	resetTime := time.Date(year, month, day, 0, 5, 0, 0, pacificLoc)
	if t.After(resetTime) {
		// Build the next reset from calendar fields instead of adding 24 hours:
		// Pacific days are 23 or 25 hours long on DST transitions, so a fixed
		// 24h offset would land at 01:05 or at 23:05 of the same day (the latter
		// could even be before t). time.Date normalizes day+1 across month and
		// year boundaries.
		resetTime = time.Date(year, month, day+1, 0, 5, 0, 0, pacificLoc)
	}
	return resetTime.UTC()
}

// pacificLoc is the time zone in which RPD quotas are reset.
// It is loaded once at package initialization; a missing time zone database
// is treated as a fatal setup error.
var pacificLoc = func() *time.Location {
	loc, err := time.LoadLocation("US/Pacific")
	if err != nil {
		panic(err)
	}
	return loc
}()
+
type (
onEvent func(*trajectory.Span) error
contextKeyType int
diff --git a/pkg/aflow/flow.go b/pkg/aflow/flow.go
index 6325b2fd2..a391f5a01 100644
--- a/pkg/aflow/flow.go
+++ b/pkg/aflow/flow.go
@@ -5,6 +5,8 @@ package aflow
import (
"fmt"
+ "maps"
+ "slices"
"github.com/google/syzkaller/pkg/aflow/ai"
)
@@ -25,6 +27,7 @@ type Flow struct {
Name string // Empty for the main workflow for the workflow type.
Root Action
+ Models []string // LLM models used in this workflow.
*FlowType
}
@@ -87,6 +90,7 @@ func registerOne[Inputs, Outputs any](all map[string]*Flow, flow *Flow) error {
ctx := &verifyContext{
actions: make(map[string]bool),
state: make(map[string]*varState),
+ models: make(map[string]bool),
}
provideOutputs[Inputs](ctx, "flow inputs")
flow.Root.verify(ctx)
@@ -94,6 +98,8 @@ func registerOne[Inputs, Outputs any](all map[string]*Flow, flow *Flow) error {
if err := ctx.finalize(); err != nil {
return fmt.Errorf("flow %v: %w", flow.Name, err)
}
+ flow.Models = slices.Collect(maps.Keys(ctx.models))
+ slices.Sort(flow.Models)
all[flow.Name] = flow
return nil
}
diff --git a/pkg/aflow/flow_test.go b/pkg/aflow/flow_test.go
index 97e2dd93a..3ee3f545b 100644
--- a/pkg/aflow/flow_test.go
+++ b/pkg/aflow/flow_test.go
@@ -918,3 +918,25 @@ func TestNoInputs(t *testing.T) {
require.Equal(t, err.Error(), "flow inputs are missing:"+
" field InBar is not present when converting map to aflow.flowInputs")
}
+
+func TestQuotaResetTime(t *testing.T) {
+ type Test struct {
+ when time.Time
+ reset time.Time
+ }
+ testLoc := time.FixedZone("+4h", 4*60*60) // seconds east of UTC
+ tests := []Test{
+ {time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2000, 1, 1, 8, 5, 0, 0, time.UTC)},
+ {time.Date(2000, 1, 1, 0, 0, 0, 0, testLoc), time.Date(2000, 1, 1, 8, 5, 0, 0, time.UTC)},
+ {time.Date(2000, 1, 1, 8, 0, 0, 0, time.UTC), time.Date(2000, 1, 1, 8, 5, 0, 0, time.UTC)},
+ {time.Date(2000, 1, 1, 8, 0, 0, 0, testLoc), time.Date(2000, 1, 1, 8, 5, 0, 0, time.UTC)},
+ {time.Date(2000, 1, 1, 9, 0, 0, 0, time.UTC), time.Date(2000, 1, 2, 8, 5, 0, 0, time.UTC)},
+ {time.Date(2000, 1, 1, 9, 0, 0, 0, testLoc), time.Date(2000, 1, 1, 8, 5, 0, 0, time.UTC)},
+ {time.Date(2000, 1, 1, 13, 0, 0, 0, time.UTC), time.Date(2000, 1, 2, 8, 5, 0, 0, time.UTC)},
+ {time.Date(2000, 1, 1, 13, 0, 0, 0, testLoc), time.Date(2000, 1, 2, 8, 5, 0, 0, time.UTC)},
+ }
+ for _, test := range tests {
+ got := QuotaResetTime(test.when)
+ assert.Equal(t, test.reset, got, "when: %v", test.when)
+ }
+}
diff --git a/pkg/aflow/llm_agent.go b/pkg/aflow/llm_agent.go
index 3c416b37c..d5e4d6d4d 100644
--- a/pkg/aflow/llm_agent.go
+++ b/pkg/aflow/llm_agent.go
@@ -9,6 +9,7 @@ import (
"maps"
"net/http"
"reflect"
+ "strings"
"time"
"github.com/google/syzkaller/pkg/aflow/trajectory"
@@ -289,8 +290,9 @@ func (a *LLMAgent) parseResponse(resp *genai.GenerateContentResponse) (
func (a *LLMAgent) generateContent(ctx *Context, cfg *genai.GenerateContentConfig,
req []*genai.Content) (*genai.GenerateContentResponse, error) {
backoff := time.Second
+ model := ctx.modelName(a.Model)
for try := 0; ; try++ {
- resp, err := ctx.generateContent(ctx.modelName(a.Model), cfg, req)
+ resp, err := ctx.generateContent(model, cfg, req)
var apiErr genai.APIError
if err != nil && try < 100 && errors.As(err, &apiErr) &&
apiErr.Code == http.StatusServiceUnavailable {
@@ -298,6 +300,11 @@ func (a *LLMAgent) generateContent(ctx *Context, cfg *genai.GenerateContentConfi
backoff = min(backoff+time.Second, 10*time.Second)
continue
}
+ if err != nil && errors.As(err, &apiErr) && apiErr.Code == http.StatusTooManyRequests &&
+ strings.Contains(apiErr.Message, "Quota exceeded for metric") &&
+ strings.Contains(apiErr.Message, "generate_requests_per_model_per_day") {
+ return resp, &modelQuotaError{model}
+ }
return resp, err
}
}
diff --git a/pkg/aflow/verify.go b/pkg/aflow/verify.go
index 5f7d16a09..8486560f8 100644
--- a/pkg/aflow/verify.go
+++ b/pkg/aflow/verify.go
@@ -13,6 +13,7 @@ import (
type verifyContext struct {
actions map[string]bool
state map[string]*varState
+ models map[string]bool
err error
}
diff --git a/syz-agent/agent.go b/syz-agent/agent.go
index d070144db..bfd3a7b7b 100644
--- a/syz-agent/agent.go
+++ b/syz-agent/agent.go
@@ -112,10 +112,11 @@ func run(configFile string, exitOnUpgrade, autoUpdate bool) error {
}
s := &Server{
- cfg: cfg,
- dash: dash,
- cache: cache,
- workdir: workdir,
+ cfg: cfg,
+ dash: dash,
+ cache: cache,
+ workdir: workdir,
+ overQuotaModels: make(map[string]time.Time),
}
ctx, stop := context.WithCancel(context.Background())
@@ -159,18 +160,22 @@ func run(configFile string, exitOnUpgrade, autoUpdate bool) error {
}
type Server struct {
- cfg *Config
- dash *dashapi.Dashboard
- cache *aflow.Cache
- workdir string
+ cfg *Config
+ dash *dashapi.Dashboard
+ cache *aflow.Cache
+ workdir string
+ overQuotaModels map[string]time.Time
}
-func (s *Server) poll(ctx context.Context) (
- bool, error) {
+func (s *Server) poll(ctx context.Context) (bool, error) {
+ s.resetModelQuota()
req := &dashapi.AIJobPollReq{
CodeRevision: prog.GitRevision,
}
for _, flow := range aflow.Flows {
+ if s.modelOverQuota(flow) {
+ continue
+ }
req.Workflows = append(req.Workflows, dashapi.AIWorkflow{
Type: flow.Type,
Name: flow.Name,
@@ -190,6 +195,20 @@ func (s *Server) poll(ctx context.Context) (
doneReq.Results = results
if jobErr != nil {
doneReq.Error = jobErr.Error()
+ if model := aflow.IsModelQuotaError(jobErr); model != "" {
+ // If a model is over quota, we will avoid requesting more jobs
+ // for workflows that use the model.
+ // There is no easy way to handle the current job, though.
+ // We would either need to tell dashboard to have a special status,
+ // and retry the job the next day. Or, somehow stack the job locally,
+ // and resume it the next day. This looks rather complex, so instead
+ // we just drop it on the floor and don't report anything to
+ // the dashboard at all. For the dashboard it will look like
+ // the server has crashed while executing the job, and it should
+ // eventually retry it on common grounds.
+ s.overQuotaModels[model] = time.Now()
+ return true, nil
+ }
}
if err := s.dash.AIJobDone(doneReq); err != nil {
return false, err
@@ -223,3 +242,23 @@ func (s *Server) executeJob(ctx context.Context, req *dashapi.AIJobPollResp) (ma
}
return flow.Execute(ctx, s.cfg.Model, s.workdir, inputs, s.cache, onEvent)
}
+
+func (s *Server) modelOverQuota(flow *aflow.Flow) bool {
+ if s.cfg.Model != "" {
+ return !s.overQuotaModels[s.cfg.Model].IsZero()
+ }
+ for _, model := range flow.Models {
+ if !s.overQuotaModels[model].IsZero() {
+ return true
+ }
+ }
+ return false
+}
+
+func (s *Server) resetModelQuota() {
+ for model, when := range s.overQuotaModels {
+ if aflow.QuotaResetTime(when).After(time.Now()) {
+ delete(s.overQuotaModels, model)
+ }
+ }
+}