aboutsummaryrefslogtreecommitdiffstats
path: root/syz-cluster/controller
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-04-24 17:04:38 +0200
committerAleksandr Nogikh <nogikh@google.com>2025-04-30 12:32:14 +0000
commit937aafd76e698830e784cf8af849eaae479a6ace (patch)
tree1cf7050e4136b688fbac482cef6b415d181605bf /syz-cluster/controller
parent7c6d304203f2c91fd2927dc473e0b56379eb0dd5 (diff)
syz-cluster: separate global env from global config
Environment variables are convenient for storing values like DB or GCS bucket names, but structured formats are more convenient for the actual service configuration. Separate global-config from global-config-env and add the functionality that queries and parses the config options.
Diffstat (limited to 'syz-cluster/controller')
-rw-r--r--syz-cluster/controller/deployment.yaml9
-rw-r--r--syz-cluster/controller/main.go6
-rw-r--r--syz-cluster/controller/processor.go42
-rw-r--r--syz-cluster/controller/processor_test.go12
4 files changed, 35 insertions, 34 deletions
diff --git a/syz-cluster/controller/deployment.yaml b/syz-cluster/controller/deployment.yaml
index 5f0df411e..5272725a0 100644
--- a/syz-cluster/controller/deployment.yaml
+++ b/syz-cluster/controller/deployment.yaml
@@ -21,7 +21,10 @@ spec:
image: ${IMAGE_PREFIX}controller:${IMAGE_TAG}
envFrom:
- configMapRef:
- name: global-config
+ name: global-config-env
+ volumeMounts:
+ - name: config-volume
+ mountPath: /config
ports:
- containerPort: 8080
resources:
@@ -31,3 +34,7 @@ spec:
limits:
cpu: 4
memory: 16G
+ volumes:
+ - name: config-volume
+ configMap:
+ name: global-config
diff --git a/syz-cluster/controller/main.go b/syz-cluster/controller/main.go
index 525c02da0..dc6404b38 100644
--- a/syz-cluster/controller/main.go
+++ b/syz-cluster/controller/main.go
@@ -20,7 +20,11 @@ func main() {
if err != nil {
app.Fatalf("failed to set up environment: %v", err)
}
- sp := NewSeriesProcessor(env)
+ cfg, err := app.Config()
+ if err != nil {
+ app.Fatalf("failed to fetch the config: %v", err)
+ }
+ sp := NewSeriesProcessor(env, cfg)
go func() {
err := sp.Loop(ctx)
app.Fatalf("processor loop failed: %v", err)
diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go
index aa286a149..07851a9fa 100644
--- a/syz-cluster/controller/processor.go
+++ b/syz-cluster/controller/processor.go
@@ -8,8 +8,6 @@ import (
"context"
"fmt"
"log"
- "os"
- "strconv"
"sync"
"time"
@@ -22,36 +20,28 @@ import (
)
type SeriesProcessor struct {
- blobStorage blob.Storage
- seriesRepo *db.SeriesRepository
- sessionRepo *db.SessionRepository
- sessionTestRepo *db.SessionTestRepository
- workflows workflow.Service
- dbPollInterval time.Duration
- parallelWorkers int
+ blobStorage blob.Storage
+ seriesRepo *db.SeriesRepository
+ sessionRepo *db.SessionRepository
+ sessionTestRepo *db.SessionTestRepository
+ workflows workflow.Service
+ dbPollInterval time.Duration
+ parallelWorkflows int
}
-func NewSeriesProcessor(env *app.AppEnvironment) *SeriesProcessor {
+func NewSeriesProcessor(env *app.AppEnvironment, cfg *app.AppConfig) *SeriesProcessor {
workflows, err := workflow.NewArgoService()
if err != nil {
app.Fatalf("failed to initialize workflows: %v", err)
}
- parallelWorkers := 1
- if val := os.Getenv("PARALLEL_WORKERS"); val != "" {
- var err error
- parallelWorkers, err = strconv.Atoi(val)
- if err != nil || parallelWorkers < 1 {
- app.Fatalf("invalid PARALLEL_WORKERS value")
- }
- }
return &SeriesProcessor{
- blobStorage: env.BlobStorage,
- seriesRepo: db.NewSeriesRepository(env.Spanner),
- sessionRepo: db.NewSessionRepository(env.Spanner),
- sessionTestRepo: db.NewSessionTestRepository(env.Spanner),
- dbPollInterval: time.Minute,
- workflows: workflows,
- parallelWorkers: parallelWorkers,
+ blobStorage: env.BlobStorage,
+ seriesRepo: db.NewSeriesRepository(env.Spanner),
+ sessionRepo: db.NewSessionRepository(env.Spanner),
+ sessionTestRepo: db.NewSessionTestRepository(env.Spanner),
+ dbPollInterval: time.Minute,
+ workflows: workflows,
+ parallelWorkflows: cfg.ParallelWorkflows,
}
}
@@ -113,7 +103,7 @@ func (sp *SeriesProcessor) seriesRunner(ctx context.Context, ch <-chan *db.Sessi
var eg errgroup.Group
defer eg.Wait()
- eg.SetLimit(sp.parallelWorkers)
+ eg.SetLimit(sp.parallelWorkflows)
for {
var session *db.Session
select {
diff --git a/syz-cluster/controller/processor_test.go b/syz-cluster/controller/processor_test.go
index 9f5e04d9b..68c62ba48 100644
--- a/syz-cluster/controller/processor_test.go
+++ b/syz-cluster/controller/processor_test.go
@@ -191,11 +191,11 @@ func prepareProcessorTest(t *testing.T, workflows workflow.Service) (*SeriesProc
env, ctx := app.TestEnvironment(t)
client := controller.TestServer(t, env)
return &SeriesProcessor{
- seriesRepo: db.NewSeriesRepository(env.Spanner),
- sessionRepo: db.NewSessionRepository(env.Spanner),
- sessionTestRepo: db.NewSessionTestRepository(env.Spanner),
- workflows: workflows,
- dbPollInterval: time.Second / 10,
- parallelWorkers: 2,
+ seriesRepo: db.NewSeriesRepository(env.Spanner),
+ sessionRepo: db.NewSessionRepository(env.Spanner),
+ sessionTestRepo: db.NewSessionTestRepository(env.Spanner),
+ workflows: workflows,
+ dbPollInterval: time.Second / 10,
+ parallelWorkflows: 2,
}, client, ctx
}