diff options
| author | Dmitry Vyukov <dvyukov@google.com> | 2026-01-02 17:03:40 +0100 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2026-01-09 12:51:45 +0000 |
| commit | 45d8f079628d0d9c0214c07e1abe9e8cb26057d6 (patch) | |
| tree | c7b6e95f040cbbf1322de719360bfe573740272c /pkg/aflow/cache.go | |
| parent | ce25ef79a77633ecbd0042eb35c9432dd582d448 (diff) | |
pkg/aflow: add package for agentic workflows
Diffstat (limited to 'pkg/aflow/cache.go')
| -rw-r--r-- | pkg/aflow/cache.go | 201 |
1 files changed, 201 insertions, 0 deletions
diff --git a/pkg/aflow/cache.go b/pkg/aflow/cache.go new file mode 100644 index 000000000..fe60e5358 --- /dev/null +++ b/pkg/aflow/cache.go @@ -0,0 +1,201 @@ +// Copyright 2025 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package aflow + +import ( + "fmt" + "maps" + "os" + "path/filepath" + "slices" + "sync" + "testing" + "time" + + "github.com/google/syzkaller/pkg/hash" + "github.com/google/syzkaller/pkg/osutil" +) + +// Cache maintains on-disk cache with directories with arbitrary contents (kernel checkouts, builds, etc). +// Create method is used to either create a new directory, if it's not cached yet, or returns a previously +// cached directory. Old unused directories are incrementally removed if the total disk usage grows +// over the specified limit. +type Cache struct { + dir string + maxSize uint64 + timeNow func() time.Time + t *testing.T + mu sync.Mutex + currentSize uint64 + entries map[string]*cacheEntry +} + +type cacheEntry struct { + dir string + size uint64 + usageCount int + lastUsed time.Time +} + +func NewCache(dir string, maxSize uint64) (*Cache, error) { + return newTestCache(nil, dir, maxSize, time.Now) +} + +func newTestCache(t *testing.T, dir string, maxSize uint64, timeNow func() time.Time) (*Cache, error) { + if dir == "" { + return nil, fmt.Errorf("cache workdir is empty") + } + c := &Cache{ + dir: osutil.Abs(dir), + maxSize: maxSize, + timeNow: timeNow, + t: t, + entries: make(map[string]*cacheEntry), + } + if err := c.init(); err != nil { + return nil, err + } + return c, nil +} + +// Create creates/returns a cached directory with contents created by the populate callback. +// The populate callback receives a dir it needs to populate with cached files. +// The typ must be a short descriptive name of the contents (e.g. "build", "source", etc). +// The desc is used to identify cached entries and must fully describe the cached contents +// (the second invocation with the same typ+desc will return dir created by the first +// invocation with the same typ+desc). +func (c *Cache) Create(typ, desc string, populate func(string) error) (string, error) { + c.mu.Lock() + defer c.mu.Unlock() + // Note: we don't populate a temp dir and then atomically rename it to the final destination, + // because at least kernel builds encode the current path in debug info/compile commands, + // so moving the dir later would break all that. Instead we rely on the presence of the meta file + // to denote valid cache entries. Modification time of the file says when it was last used. + id := hash.String(desc) + dir := filepath.Join(c.dir, typ, id) + metaFile := filepath.Join(dir, cacheMetaFile) + if c.entries[dir] == nil { + os.RemoveAll(dir) + if err := osutil.MkdirAll(dir); err != nil { + return "", err + } + if err := populate(dir); err != nil { + os.RemoveAll(dir) + return "", err + } + size, err := osutil.DiskUsage(dir) + if err != nil { + return "", err + } + if err := osutil.WriteFile(metaFile, []byte(desc)); err != nil { + os.RemoveAll(dir) + return "", err + } + c.entries[dir] = &cacheEntry{ + dir: dir, + size: size, + } + c.currentSize += size + c.logf("created entry %v, size %v, current size %v", dir, size, c.currentSize) + } + // Note the entry was used now. + now := c.timeNow() + if err := os.Chtimes(metaFile, now, now); err != nil { + return "", err + } + entry := c.entries[dir] + entry.usageCount++ + entry.lastUsed = now + c.logf("using entry %v, usage count %v", dir, entry.usageCount) + if err := c.purge(); err != nil { + entry.usageCount-- + return "", err + } + return dir, nil +} + +// Release must be called for every directory returned by Create method when the directory is not used anymore. +func (c *Cache) Release(dir string) { + c.mu.Lock() + defer c.mu.Unlock() + entry := c.entries[dir] + entry.usageCount-- + c.logf("release entry %v, usage count %v", dir, entry.usageCount) + if entry.usageCount < 0 { + panic("negative usageCount") + } +} + +// init reads the cached dirs (disk usage, last use time) from disk when the cache is created. +func (c *Cache) init() error { + dirs, err := filepath.Glob(filepath.Join(c.dir, "*", "*")) + if err != nil { + return err + } + for _, dir := range dirs { + metaFile := filepath.Join(dir, cacheMetaFile) + if !osutil.IsExist(metaFile) { + if err := osutil.RemoveAll(dir); err != nil { + return err + } + continue + } + stat, err := os.Stat(metaFile) + if err != nil { + return err + } + size, err := osutil.DiskUsage(dir) + if err != nil { + return err + } + c.entries[dir] = &cacheEntry{ + dir: dir, + size: size, + lastUsed: stat.ModTime(), + } + c.currentSize += size + } + c.mu.Lock() + defer c.mu.Unlock() + return c.purge() +} + +// purge removes oldest unused directories if the cache is over maxSize. +func (c *Cache) purge() error { + if c.mu.TryLock() { + panic("c.mu is not locked") + } + if c.currentSize < c.maxSize { + return nil + } + list := slices.Collect(maps.Values(c.entries)) + slices.SortFunc(list, func(a, b *cacheEntry) int { + if a.usageCount != b.usageCount { + return a.usageCount - b.usageCount + } + return a.lastUsed.Compare(b.lastUsed) + }) + for _, entry := range list { + if entry.usageCount != 0 || c.currentSize < c.maxSize { + break + } + if err := os.RemoveAll(entry.dir); err != nil { + return err + } + delete(c.entries, entry.dir) + if c.currentSize < entry.size { + panic(fmt.Sprintf("negative currentSize: %v %v", c.currentSize, entry.size)) + } + c.currentSize -= entry.size + } + return nil +} + +func (c *Cache) logf(msg string, args ...any) { + if c.t != nil { + c.t.Logf("cache: "+msg, args...) + } +} + +const cacheMetaFile = "aflow-meta" |
