// Copyright 2020 syzkaller project authors. All rights reserved. // Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. package main import ( "context" "encoding/json" "errors" "fmt" "net/http" "sort" "time" "github.com/google/syzkaller/pkg/hash" "github.com/google/syzkaller/pkg/image" "google.golang.org/appengine/v2" "google.golang.org/appengine/v2/log" "google.golang.org/appengine/v2/memcache" ) type Cached struct { MissingBackports int Total CachedBugStats Subsystems map[string]CachedBugStats NoSubsystem CachedBugStats } type CachedBugStats struct { Open int Fixed int Invalid int } func CacheGet(c context.Context, r *http.Request, ns string) (*Cached, error) { accessLevel := accessLevel(c, r) v := new(Cached) _, err := memcache.Gob.Get(c, cacheKey(ns, accessLevel), v) if err != nil && err != memcache.ErrCacheMiss { return nil, err } if err == nil { return v, nil } bugs, _, err := loadNamespaceBugs(c, ns) if err != nil { return nil, err } backports, err := loadAllBackports(c, false) if err != nil { return nil, err } return buildAndStoreCached(c, bugs, backports, ns, accessLevel) } var cacheAccessLevels = []AccessLevel{AccessPublic, AccessUser, AccessAdmin} // cacheUpdate updates memcache every hour (called by cron.yaml). // Cache update is slow and we don't want to slow down user requests. func cacheUpdate(w http.ResponseWriter, r *http.Request) { c := appengine.NewContext(r) backports, err := loadAllBackports(c, false) if err != nil { log.Errorf(c, "failed load backports: %v", err) return } for ns := range getConfig(c).Namespaces { bugs, _, err := loadNamespaceBugs(c, ns) if err != nil { log.Errorf(c, "failed load ns=%v bugs: %v", ns, err) continue } for _, accessLevel := range cacheAccessLevels { _, err := buildAndStoreCached(c, bugs, backports, ns, accessLevel) if err != nil { log.Errorf(c, "failed to build cached for ns=%v access=%v: %v", ns, accessLevel, err) continue } } } } func buildAndStoreCached(c context.Context, bugs []*Bug, backports []*rawBackport, ns string, accessLevel AccessLevel) (*Cached, error) { v := &Cached{ Subsystems: make(map[string]CachedBugStats), } for _, bug := range bugs { if bug.Status == BugStatusOpen && accessLevel < bug.sanitizeAccess(c, accessLevel) { continue } v.Total.Record(bug) subsystems := bug.LabelValues(SubsystemLabel) for _, label := range subsystems { stats := v.Subsystems[label.Value] stats.Record(bug) v.Subsystems[label.Value] = stats } if len(subsystems) == 0 { v.NoSubsystem.Record(bug) } } for _, backport := range backports { outgoing := stringInList(backport.FromNs, ns) for _, info := range backport.Bugs { if accessLevel < info.bug.sanitizeAccess(c, accessLevel) { continue } if info.bug.Namespace == ns || outgoing { v.MissingBackports++ } } } item := &memcache.Item{ Key: cacheKey(ns, accessLevel), Object: v, Expiration: 4 * time.Hour, // supposed to be updated by cron every hour } if err := memcache.Gob.Set(c, item); err != nil { return nil, err } return v, nil } func (c *CachedBugStats) Record(bug *Bug) { switch bug.Status { case BugStatusOpen: if len(bug.Commits) == 0 { c.Open++ } else { c.Fixed++ } case BugStatusFixed: c.Fixed++ case BugStatusInvalid: c.Invalid++ } } func cacheKey(ns string, accessLevel AccessLevel) string { return fmt.Sprintf("%v-%v", ns, accessLevel) } func CachedBugGroups(c context.Context, ns string, accessLevel AccessLevel) ([]*uiBugGroup, error) { item, err := memcache.Get(c, cachedBugGroupsKey(ns, accessLevel)) if err == memcache.ErrCacheMiss { return nil, nil } if err != nil { return nil, err } jsonData, destructor := image.MustDecompress(item.Value) defer destructor() var ret []*uiBugGroup err = json.Unmarshal(jsonData, &ret) return ret, err } func cachedBugGroupsKey(ns string, accessLevel AccessLevel) string { return fmt.Sprintf("%v-%v-bug-groups", ns, accessLevel) } // minuteCacheUpdate updates memcache every minute (called by cron.yaml). func handleMinuteCacheUpdate(w http.ResponseWriter, r *http.Request) { c := appengine.NewContext(r) for ns, nsConfig := range getConfig(c).Namespaces { if !nsConfig.CacheUIPages { continue } err := minuteCacheNsUpdate(c, ns) if err != nil { http.Error(w, fmt.Sprintf("bug groups cache update for %s failed: %v", ns, err), http.StatusInternalServerError) return } } } func minuteCacheNsUpdate(c context.Context, ns string) error { bugs, err := loadVisibleBugs(c, ns, nil) if err != nil { return err } managers, err := managerList(c, ns) if err != nil { return err } for _, accessLevel := range cacheAccessLevels { groups, err := prepareBugGroups(c, bugs, managers, accessLevel, ns) if err != nil { return fmt.Errorf("failed to fetch groups: %w", err) } encoded, err := json.Marshal(groups) if err != nil { return fmt.Errorf("failed to marshal: %w", err) } item := &memcache.Item{ Key: cachedBugGroupsKey(ns, accessLevel), // The resulting blob can be quite big, so let's compress. Value: image.Compress(encoded), Expiration: 2 * time.Minute, // supposed to be updated by cron every minute } if err := memcache.Set(c, item); err != nil { return err } } return nil } func CachedManagerList(c context.Context, ns string) ([]string, error) { return cachedObjectList(c, fmt.Sprintf("%s-managers-list", ns), time.Minute, func(c context.Context) ([]string, error) { return managerList(c, ns) }, ) } func CachedUIManagers(c context.Context, accessLevel AccessLevel, ns string, filter *userBugFilter) ([]*uiManager, error) { return cachedObjectList(c, fmt.Sprintf("%s-%v-%v-ui-managers", ns, accessLevel, filter.Hash()), 5*time.Minute, func(c context.Context) ([]*uiManager, error) { return loadManagers(c, accessLevel, ns, filter) }, ) } func cachedObjectList[T any](c context.Context, key string, period time.Duration, load func(context.Context) ([]T, error)) ([]T, error) { // Check if the object is in cache. var obj []T _, err := memcache.Gob.Get(c, key, &obj) if err == nil { return obj, nil } else if err != memcache.ErrCacheMiss { return nil, err } // Load the object. obj, err = load(c) if err != nil { return nil, err } item := &memcache.Item{ Key: key, Object: obj, Expiration: period, } if err := memcache.Gob.Set(c, item); err != nil { return nil, err } return obj, nil } type RequesterInfo struct { Requests []time.Time } func (ri *RequesterInfo) Record(now time.Time, cfg ThrottleConfig) bool { var newRequests []time.Time for _, req := range ri.Requests { if now.Sub(req) >= cfg.Window { continue } newRequests = append(newRequests, req) } newRequests = append(newRequests, now) sort.Slice(ri.Requests, func(i, j int) bool { return ri.Requests[i].Before(ri.Requests[j]) }) // Don't store more than needed. if len(newRequests) > cfg.Limit+1 { newRequests = newRequests[len(newRequests)-(cfg.Limit+1):] } ri.Requests = newRequests // Check that we satisfy the conditions. return len(newRequests) <= cfg.Limit } var ErrThrottleTooManyRetries = errors.New("all attempts to record request failed") func ThrottleRequest(c context.Context, requesterID string) (bool, error) { cfg := getConfig(c).Throttle if cfg.Empty() || requesterID == "" { // No sense to query memcached. return true, nil } key := fmt.Sprintf("requester-%s", hash.String([]byte(requesterID))) const attempts = 5 for i := 0; i < attempts; i++ { var obj RequesterInfo item, err := memcache.Gob.Get(c, key, &obj) if err == memcache.ErrCacheMiss { ok := obj.Record(timeNow(c), cfg) err = memcache.Gob.Add(c, &memcache.Item{ Key: key, Object: obj, Expiration: cfg.Window, }) if err == memcache.ErrNotStored { // Conflict with another instance. Retry. continue } return ok, err } else if err != nil { return false, err } // Update the existing object. ok := obj.Record(timeNow(c), cfg) item.Expiration = cfg.Window item.Object = obj err = memcache.Gob.CompareAndSwap(c, item) if err == memcache.ErrCASConflict { if ok { // Only retry if we approved the query. // If we denied and there was a concurrent write // to the same object, it could have only denied // the query as well. // Our save won't change anything. continue } } else if err != nil { return false, err } return ok, nil } return false, ErrThrottleTooManyRetries }