aboutsummaryrefslogtreecommitdiffstats
path: root/syz-agent/agent.go
diff options
context:
space:
mode:
Diffstat (limited to 'syz-agent/agent.go')
-rw-r--r--syz-agent/agent.go59
1 files changed, 49 insertions, 10 deletions
diff --git a/syz-agent/agent.go b/syz-agent/agent.go
index d070144db..bfd3a7b7b 100644
--- a/syz-agent/agent.go
+++ b/syz-agent/agent.go
@@ -112,10 +112,11 @@ func run(configFile string, exitOnUpgrade, autoUpdate bool) error {
}
s := &Server{
- cfg: cfg,
- dash: dash,
- cache: cache,
- workdir: workdir,
+ cfg: cfg,
+ dash: dash,
+ cache: cache,
+ workdir: workdir,
+ overQuotaModels: make(map[string]time.Time),
}
ctx, stop := context.WithCancel(context.Background())
@@ -159,18 +160,22 @@ func run(configFile string, exitOnUpgrade, autoUpdate bool) error {
}
+// Server holds the agent state used to poll the dashboard for AI jobs
+// and to execute the corresponding workflows.
type Server struct {
- cfg *Config
- dash *dashapi.Dashboard
- cache *aflow.Cache
- workdir string
+ cfg *Config
+ dash *dashapi.Dashboard
+ cache *aflow.Cache
+ workdir string
+ // overQuotaModels maps a model name to the time a job last failed with
+ // a quota error for that model; poll skips workflows using such models.
+ overQuotaModels map[string]time.Time
}
-func (s *Server) poll(ctx context.Context) (
- bool, error) {
+func (s *Server) poll(ctx context.Context) (bool, error) {
+ s.resetModelQuota()
req := &dashapi.AIJobPollReq{
CodeRevision: prog.GitRevision,
}
for _, flow := range aflow.Flows {
+ if s.modelOverQuota(flow) {
+ continue
+ }
req.Workflows = append(req.Workflows, dashapi.AIWorkflow{
Type: flow.Type,
Name: flow.Name,
@@ -190,6 +195,20 @@ func (s *Server) poll(ctx context.Context) (
doneReq.Results = results
if jobErr != nil {
doneReq.Error = jobErr.Error()
+ if model := aflow.IsModelQuotaError(jobErr); model != "" {
+ // If a model is over quota, we will avoid requesting more jobs
+ // for workflows that use the model.
+ // There is no easy way to handle the current job, though.
+ // We would either need to have the dashboard record a special status
+ // and retry the job the next day, or somehow keep the job locally
+ // and resume it the next day. This looks rather complex, so instead
+ // we just drop it on the floor and don't report anything to
+ // the dashboard at all. For the dashboard it will look like
+ // the server has crashed while executing the job, and it should
+ // eventually retry it on common grounds.
+ s.overQuotaModels[model] = time.Now()
+ return true, nil
+ }
}
if err := s.dash.AIJobDone(doneReq); err != nil {
return false, err
@@ -223,3 +242,23 @@ func (s *Server) executeJob(ctx context.Context, req *dashapi.AIJobPollResp) (ma
}
return flow.Execute(ctx, s.cfg.Model, s.workdir, inputs, s.cache, onEvent)
}
+
+// modelOverQuota reports whether the given flow depends on a model that is
+// currently recorded as over quota.
+//
+// When the config pins a single model (cfg.Model is non-empty), only that
+// model is consulted and flow.Models is ignored. Otherwise the flow is
+// considered over quota if any of its models is.
+func (s *Server) modelOverQuota(flow *aflow.Flow) bool {
+	// A model is marked iff the map holds a non-zero timestamp for it;
+	// a missing key yields the zero time.Time and thus counts as unmarked.
+	marked := func(name string) bool {
+		return !s.overQuotaModels[name].IsZero()
+	}
+	if pinned := s.cfg.Model; pinned != "" {
+		return marked(pinned)
+	}
+	for _, name := range flow.Models {
+		if marked(name) {
+			return true
+		}
+	}
+	return false
+}
+
+// resetModelQuota forgets over-quota marks whose quota reset time has passed,
+// so that workflows using those models are requested from the dashboard again.
+//
+// NOTE(review): this assumes aflow.QuotaResetTime(when) returns the moment at
+// which the quota exhausted at 'when' becomes available again; confirm against
+// the aflow package.
+func (s *Server) resetModelQuota() {
+	now := time.Now()
+	for model, when := range s.overQuotaModels {
+		// Keep the mark while the reset time is still in the future.
+		// Dropping it earlier would let the very next poll request jobs
+		// for a model that is still over quota.
+		if aflow.QuotaResetTime(when).After(now) {
+			continue
+		}
+		// Deleting entries while ranging over a map is safe in Go.
+		delete(s.overQuotaModels, model)
+	}
+}