From 500bfdc41735bc8d617cbfd4f1ab6b5980c8f1e5 Mon Sep 17 00:00:00 2001 From: Aleksandr Nogikh Date: Thu, 2 Nov 2023 19:06:05 +0100 Subject: dashboard: throttle incoming requests To ensure service stability, let's rate limit incoming requests to our web endpoints. --- dashboard/app/cache.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) (limited to 'dashboard/app/cache.go') diff --git a/dashboard/app/cache.go b/dashboard/app/cache.go index f54f95c61..14c8169c8 100644 --- a/dashboard/app/cache.go +++ b/dashboard/app/cache.go @@ -7,8 +7,10 @@ import ( "encoding/json" "fmt" "net/http" + "sort" "time" + "github.com/google/syzkaller/pkg/hash" "github.com/google/syzkaller/pkg/image" "golang.org/x/net/context" "google.golang.org/appengine/v2" @@ -254,3 +256,68 @@ func cachedObjectList[T any](c context.Context, key string, period time.Duration } return obj, nil } + +type RequesterInfo struct { + Requests []time.Time +} + +func (ri *RequesterInfo) Record(now time.Time, cfg ThrottleConfig) bool { + var newRequests []time.Time + for _, req := range ri.Requests { + if now.Sub(req) >= cfg.Window { + continue + } + newRequests = append(newRequests, req) + } + newRequests = append(newRequests, now) + sort.Slice(ri.Requests, func(i, j int) bool { return ri.Requests[i].Before(ri.Requests[j]) }) + // Don't store more than needed. + if len(newRequests) > cfg.Limit+1 { + newRequests = newRequests[len(newRequests)-(cfg.Limit+1):] + } + ri.Requests = newRequests + // Check that we satisfy the conditions. + return len(newRequests) <= cfg.Limit +} + +func ThrottleRequest(c context.Context, requesterID string) (bool, error) { + cfg := getConfig(c).Throttle + if cfg.Empty() || requesterID == "" { + // No sense to query memcached. + return true, nil + } + key := fmt.Sprintf("requester-%s", hash.String([]byte(requesterID))) + const attempts = 5 + for i := 0; i < attempts; i++ { + var obj RequesterInfo + item, err := memcache.Gob.Get(c, key, &obj) + if err == memcache.ErrCacheMiss { + ok := obj.Record(timeNow(c), cfg) + err = memcache.Gob.Add(c, &memcache.Item{ + Key: key, + Object: obj, + Expiration: cfg.Window, + }) + if err == memcache.ErrNotStored { + // Conflict with another instance. Retry. + continue + } + return ok, err + } else if err != nil { + return false, err + } + // Update the existing object. + ok := obj.Record(timeNow(c), cfg) + item.Expiration = cfg.Window + item.Object = obj + err = memcache.Gob.CompareAndSwap(c, item) + if err == memcache.ErrCASConflict { + // Update conflict. Retry. + continue + } else if err != nil { + return false, err + } + return ok, nil + } + return false, fmt.Errorf("all attempts to record request failed") +} -- cgit mrf-deployment