diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2025-04-24 17:04:38 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2025-04-30 12:32:14 +0000 |
| commit | 937aafd76e698830e784cf8af849eaae479a6ace (patch) | |
| tree | 1cf7050e4136b688fbac482cef6b415d181605bf /syz-cluster/controller/processor.go | |
| parent | 7c6d304203f2c91fd2927dc473e0b56379eb0dd5 (diff) | |
syz-cluster: separate global env from global config
Environment variables are convenient for storing values like DB or GCS
bucket names, but structured formats are more convenient for the actual
service configuration.
Separate global-config from global-config-env and add the functionality
that queries and parses the config options.
Diffstat (limited to 'syz-cluster/controller/processor.go')
| -rw-r--r-- | syz-cluster/controller/processor.go | 42 |
1 files changed, 16 insertions, 26 deletions
diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go index aa286a149..07851a9fa 100644 --- a/syz-cluster/controller/processor.go +++ b/syz-cluster/controller/processor.go @@ -8,8 +8,6 @@ import ( "context" "fmt" "log" - "os" - "strconv" "sync" "time" @@ -22,36 +20,28 @@ import ( ) type SeriesProcessor struct { - blobStorage blob.Storage - seriesRepo *db.SeriesRepository - sessionRepo *db.SessionRepository - sessionTestRepo *db.SessionTestRepository - workflows workflow.Service - dbPollInterval time.Duration - parallelWorkers int + blobStorage blob.Storage + seriesRepo *db.SeriesRepository + sessionRepo *db.SessionRepository + sessionTestRepo *db.SessionTestRepository + workflows workflow.Service + dbPollInterval time.Duration + parallelWorkflows int } -func NewSeriesProcessor(env *app.AppEnvironment) *SeriesProcessor { +func NewSeriesProcessor(env *app.AppEnvironment, cfg *app.AppConfig) *SeriesProcessor { workflows, err := workflow.NewArgoService() if err != nil { app.Fatalf("failed to initialize workflows: %v", err) } - parallelWorkers := 1 - if val := os.Getenv("PARALLEL_WORKERS"); val != "" { - var err error - parallelWorkers, err = strconv.Atoi(val) - if err != nil || parallelWorkers < 1 { - app.Fatalf("invalid PARALLEL_WORKERS value") - } - } return &SeriesProcessor{ - blobStorage: env.BlobStorage, - seriesRepo: db.NewSeriesRepository(env.Spanner), - sessionRepo: db.NewSessionRepository(env.Spanner), - sessionTestRepo: db.NewSessionTestRepository(env.Spanner), - dbPollInterval: time.Minute, - workflows: workflows, - parallelWorkers: parallelWorkers, + blobStorage: env.BlobStorage, + seriesRepo: db.NewSeriesRepository(env.Spanner), + sessionRepo: db.NewSessionRepository(env.Spanner), + sessionTestRepo: db.NewSessionTestRepository(env.Spanner), + dbPollInterval: time.Minute, + workflows: workflows, + parallelWorkflows: cfg.ParallelWorkflows, } } @@ -113,7 +103,7 @@ func (sp *SeriesProcessor) seriesRunner(ctx context.Context, ch <-chan *db.Sessi var eg errgroup.Group defer eg.Wait() - eg.SetLimit(sp.parallelWorkers) + eg.SetLimit(sp.parallelWorkflows) for { var session *db.Session select { |
