From 91e26ec437abcd42a8255aa88e31b45da059529e Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Fri, 16 Jan 2026 09:33:14 +0100 Subject: pkg/aflow: handle model quota errors Detect model quota violations (assumed to be RPD). Make syz-agent not request jobs that use the model until the next quota reset time. Fixes #6573 --- pkg/aflow/execute.go | 46 +++++++++++++++++++++++++++++++++++++++ pkg/aflow/flow.go | 6 +++++ pkg/aflow/flow_test.go | 22 +++++++++++++++++++ pkg/aflow/llm_agent.go | 9 +++++++- pkg/aflow/verify.go | 1 + syz-agent/agent.go | 59 +++++++++++++++++++++++++++++++++++++++++--------- 6 files changed, 132 insertions(+), 11 deletions(-) diff --git a/pkg/aflow/execute.go b/pkg/aflow/execute.go index 96f15c13b..f15b19eb7 100644 --- a/pkg/aflow/execute.go +++ b/pkg/aflow/execute.go @@ -88,6 +88,52 @@ type flowError struct { error } +func IsModelQuotaError(err error) string { + var quotaErr *modelQuotaError + if errors.As(err, &quotaErr) { + return quotaErr.model + } + return "" +} + +type modelQuotaError struct { + model string +} + +func (err *modelQuotaError) Error() string { + return fmt.Sprintf("model %q is over daily quota", err.model) +} + +// QuotaResetTime returns the time when RPD quota will be reset +// for a quota overflow happened at time t. +func QuotaResetTime(t time.Time) time.Time { + // Requests per day (RPD) quotas reset at midnight Pacific time: + // https://ai.google.dev/gemini-api/docs/rate-limits + // To account for potential delays in the reset logic, we add small delta (5 mins) + // to that to avoid situation when we reset it at exactly midnight locally, + // but it's not reset on the server yet. + // The assumption is also that any rate limiting errors in the very beginning + // of the day (within first seconds/minutes), actually belong to the previous day + // (we couldn't overflow the quota within that period). 
+ t = t.In(pacificLoc) + resetTime := time.Date(t.Year(), t.Month(), t.Day(), 0, 5, 0, 0, pacificLoc) + if t.After(resetTime) { + resetTime = resetTime.Add(24 * time.Hour) + if t.After(resetTime) { + panic(fmt.Sprintf("%v > %v", t, resetTime)) + } + } + return resetTime.UTC() +} + +var pacificLoc = func() *time.Location { + loc, err := time.LoadLocation("US/Pacific") + if err != nil { + panic(err) + } + return loc +}() + type ( onEvent func(*trajectory.Span) error contextKeyType int diff --git a/pkg/aflow/flow.go b/pkg/aflow/flow.go index 6325b2fd2..a391f5a01 100644 --- a/pkg/aflow/flow.go +++ b/pkg/aflow/flow.go @@ -5,6 +5,8 @@ package aflow import ( "fmt" + "maps" + "slices" "github.com/google/syzkaller/pkg/aflow/ai" ) @@ -25,6 +27,7 @@ type Flow struct { Name string // Empty for the main workflow for the workflow type. Root Action + Models []string // LLM models used in this workflow. *FlowType } @@ -87,6 +90,7 @@ func registerOne[Inputs, Outputs any](all map[string]*Flow, flow *Flow) error { ctx := &verifyContext{ actions: make(map[string]bool), state: make(map[string]*varState), + models: make(map[string]bool), } provideOutputs[Inputs](ctx, "flow inputs") flow.Root.verify(ctx) @@ -94,6 +98,8 @@ func registerOne[Inputs, Outputs any](all map[string]*Flow, flow *Flow) error { if err := ctx.finalize(); err != nil { return fmt.Errorf("flow %v: %w", flow.Name, err) } + flow.Models = slices.Collect(maps.Keys(ctx.models)) + slices.Sort(flow.Models) all[flow.Name] = flow return nil } diff --git a/pkg/aflow/flow_test.go b/pkg/aflow/flow_test.go index 97e2dd93a..3ee3f545b 100644 --- a/pkg/aflow/flow_test.go +++ b/pkg/aflow/flow_test.go @@ -918,3 +918,25 @@ func TestNoInputs(t *testing.T) { require.Equal(t, err.Error(), "flow inputs are missing:"+ " field InBar is not present when converting map to aflow.flowInputs") } + +func TestQuotaResetTime(t *testing.T) { + type Test struct { + when time.Time + reset time.Time + } + testLoc := time.FixedZone("+4h", 4*60*60) // 
seconds east of UTC + tests := []Test{ + {time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2000, 1, 1, 8, 5, 0, 0, time.UTC)}, + {time.Date(2000, 1, 1, 0, 0, 0, 0, testLoc), time.Date(2000, 1, 1, 8, 5, 0, 0, time.UTC)}, + {time.Date(2000, 1, 1, 8, 0, 0, 0, time.UTC), time.Date(2000, 1, 1, 8, 5, 0, 0, time.UTC)}, + {time.Date(2000, 1, 1, 8, 0, 0, 0, testLoc), time.Date(2000, 1, 1, 8, 5, 0, 0, time.UTC)}, + {time.Date(2000, 1, 1, 9, 0, 0, 0, time.UTC), time.Date(2000, 1, 2, 8, 5, 0, 0, time.UTC)}, + {time.Date(2000, 1, 1, 9, 0, 0, 0, testLoc), time.Date(2000, 1, 1, 8, 5, 0, 0, time.UTC)}, + {time.Date(2000, 1, 1, 13, 0, 0, 0, time.UTC), time.Date(2000, 1, 2, 8, 5, 0, 0, time.UTC)}, + {time.Date(2000, 1, 1, 13, 0, 0, 0, testLoc), time.Date(2000, 1, 2, 8, 5, 0, 0, time.UTC)}, + } + for _, test := range tests { + got := QuotaResetTime(test.when) + assert.Equal(t, test.reset, got, "when: %v", test.when) + } +} diff --git a/pkg/aflow/llm_agent.go b/pkg/aflow/llm_agent.go index 3c416b37c..d5e4d6d4d 100644 --- a/pkg/aflow/llm_agent.go +++ b/pkg/aflow/llm_agent.go @@ -9,6 +9,7 @@ import ( "maps" "net/http" "reflect" + "strings" "time" "github.com/google/syzkaller/pkg/aflow/trajectory" @@ -289,8 +290,9 @@ func (a *LLMAgent) parseResponse(resp *genai.GenerateContentResponse) ( func (a *LLMAgent) generateContent(ctx *Context, cfg *genai.GenerateContentConfig, req []*genai.Content) (*genai.GenerateContentResponse, error) { backoff := time.Second + model := ctx.modelName(a.Model) for try := 0; ; try++ { - resp, err := ctx.generateContent(ctx.modelName(a.Model), cfg, req) + resp, err := ctx.generateContent(model, cfg, req) var apiErr genai.APIError if err != nil && try < 100 && errors.As(err, &apiErr) && apiErr.Code == http.StatusServiceUnavailable { @@ -298,6 +300,11 @@ func (a *LLMAgent) generateContent(ctx *Context, cfg *genai.GenerateContentConfi backoff = min(backoff+time.Second, 10*time.Second) continue } + if err != nil && errors.As(err, &apiErr) && apiErr.Code == 
http.StatusTooManyRequests && + strings.Contains(apiErr.Message, "Quota exceeded for metric") && + strings.Contains(apiErr.Message, "generate_requests_per_model_per_day") { + return resp, &modelQuotaError{model} + } return resp, err } } diff --git a/pkg/aflow/verify.go b/pkg/aflow/verify.go index 5f7d16a09..8486560f8 100644 --- a/pkg/aflow/verify.go +++ b/pkg/aflow/verify.go @@ -13,6 +13,7 @@ import ( type verifyContext struct { actions map[string]bool state map[string]*varState + models map[string]bool err error } diff --git a/syz-agent/agent.go b/syz-agent/agent.go index d070144db..bfd3a7b7b 100644 --- a/syz-agent/agent.go +++ b/syz-agent/agent.go @@ -112,10 +112,11 @@ func run(configFile string, exitOnUpgrade, autoUpdate bool) error { } s := &Server{ - cfg: cfg, - dash: dash, - cache: cache, - workdir: workdir, + cfg: cfg, + dash: dash, + cache: cache, + workdir: workdir, + overQuotaModels: make(map[string]time.Time), } ctx, stop := context.WithCancel(context.Background()) @@ -159,18 +160,22 @@ func run(configFile string, exitOnUpgrade, autoUpdate bool) error { } type Server struct { - cfg *Config - dash *dashapi.Dashboard - cache *aflow.Cache - workdir string + cfg *Config + dash *dashapi.Dashboard + cache *aflow.Cache + workdir string + overQuotaModels map[string]time.Time } -func (s *Server) poll(ctx context.Context) ( - bool, error) { +func (s *Server) poll(ctx context.Context) (bool, error) { + s.resetModelQuota() req := &dashapi.AIJobPollReq{ CodeRevision: prog.GitRevision, } for _, flow := range aflow.Flows { + if s.modelOverQuota(flow) { + continue + } req.Workflows = append(req.Workflows, dashapi.AIWorkflow{ Type: flow.Type, Name: flow.Name, @@ -190,6 +195,20 @@ func (s *Server) poll(ctx context.Context) ( doneReq.Results = results if jobErr != nil { doneReq.Error = jobErr.Error() + if model := aflow.IsModelQuotaError(jobErr); model != "" { + // If a model is over quota, we will avoid requesting more jobs + // for workflows that use the model. 
+ // There is no easy way to handle the current job, though. + // We would either need to tell dashboard to have a special status, + // and retry the job the next day. Or, somehow stack the job locally, + // and resume it the next day. This looks rather complex, so instead + // we just drop it on the floor and don't report anything to + // the dashboard at all. For the dashboard it will look like + // the server has crashed while executing the job, and it should + // eventually retry it on common grounds. + s.overQuotaModels[model] = time.Now() + return true, nil + } } if err := s.dash.AIJobDone(doneReq); err != nil { return false, err @@ -223,3 +242,23 @@ func (s *Server) executeJob(ctx context.Context, req *dashapi.AIJobPollResp) (ma } return flow.Execute(ctx, s.cfg.Model, s.workdir, inputs, s.cache, onEvent) } + +func (s *Server) modelOverQuota(flow *aflow.Flow) bool { + if s.cfg.Model != "" { + return !s.overQuotaModels[s.cfg.Model].IsZero() + } + for _, model := range flow.Models { + if !s.overQuotaModels[model].IsZero() { + return true + } + } + return false +} + +func (s *Server) resetModelQuota() { + for model, when := range s.overQuotaModels { + if time.Now().After(aflow.QuotaResetTime(when)) { + delete(s.overQuotaModels, model) + } + } +} -- cgit mrf-deployment