diff options
Diffstat (limited to 'syz-agent/agent.go')
| -rw-r--r-- | syz-agent/agent.go | 59 |
1 files changed, 49 insertions, 10 deletions
diff --git a/syz-agent/agent.go b/syz-agent/agent.go index d070144db..bfd3a7b7b 100644 --- a/syz-agent/agent.go +++ b/syz-agent/agent.go @@ -112,10 +112,11 @@ func run(configFile string, exitOnUpgrade, autoUpdate bool) error { } s := &Server{ - cfg: cfg, - dash: dash, - cache: cache, - workdir: workdir, + cfg: cfg, + dash: dash, + cache: cache, + workdir: workdir, + overQuotaModels: make(map[string]time.Time), } ctx, stop := context.WithCancel(context.Background()) @@ -159,18 +160,22 @@ func run(configFile string, exitOnUpgrade, autoUpdate bool) error { } type Server struct { - cfg *Config - dash *dashapi.Dashboard - cache *aflow.Cache - workdir string + cfg *Config + dash *dashapi.Dashboard + cache *aflow.Cache + workdir string + overQuotaModels map[string]time.Time } -func (s *Server) poll(ctx context.Context) ( - bool, error) { +func (s *Server) poll(ctx context.Context) (bool, error) { + s.resetModelQuota() req := &dashapi.AIJobPollReq{ CodeRevision: prog.GitRevision, } for _, flow := range aflow.Flows { + if s.modelOverQuota(flow) { + continue + } req.Workflows = append(req.Workflows, dashapi.AIWorkflow{ Type: flow.Type, Name: flow.Name, @@ -190,6 +195,20 @@ func (s *Server) poll(ctx context.Context) ( doneReq.Results = results if jobErr != nil { doneReq.Error = jobErr.Error() + if model := aflow.IsModelQuotaError(jobErr); model != "" { + // If a model is over quota, we will avoid requesting more jobs + // for workflows that use the model. + // There is no easy way to handle the current job, though. + // We would either need to tell dashboard to have a special status, + // and retry the job the next day. Or, somehow stack the job locally, + // and resume it the next day. This looks rather complex, so instead + // we just drop it on the floor and don't report anything to + // the dashboard at all. For the dashboard it will look like + // the server has crashed while executing the job, and it should + // eventually retry it on common grounds. + s.overQuotaModels[model] = time.Now() + return true, nil + } } if err := s.dash.AIJobDone(doneReq); err != nil { return false, err @@ -223,3 +242,23 @@ func (s *Server) executeJob(ctx context.Context, req *dashapi.AIJobPollResp) (ma } return flow.Execute(ctx, s.cfg.Model, s.workdir, inputs, s.cache, onEvent) } + +func (s *Server) modelOverQuota(flow *aflow.Flow) bool { + if s.cfg.Model != "" { + return !s.overQuotaModels[s.cfg.Model].IsZero() + } + for _, model := range flow.Models { + if !s.overQuotaModels[model].IsZero() { + return true + } + } + return false +} + +func (s *Server) resetModelQuota() { + for model, when := range s.overQuotaModels { + if aflow.QuotaResetTime(when).After(time.Now()) { + delete(s.overQuotaModels, model) + } + } +} |
