aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2017-01-16 20:42:52 +0100
committerDmitry Vyukov <dvyukov@google.com>2017-01-16 20:42:52 +0100
commitc91904333f8d65d2056af2d53a02fda58b23e5c2 (patch)
treeda3b24c31373544d71f60ee59dcddce0064994d3
parent6f52004fcffa39281bdd65806b2091e034690d04 (diff)
db: add package
Package db implements a simple key-value database. The database is cached in memory and mirrored on disk. It is used to store the corpus in syz-manager and syz-hub. The database strives to minimize the number of disk accesses, as they can be slow in virtualized environments (GCE). Use db in syz-manager instead of the old PersistentSet.
-rw-r--r--db/db.go248
-rw-r--r--db/db_test.go151
-rw-r--r--hash/hash.go5
-rw-r--r--syz-manager/manager.go67
-rw-r--r--syz-manager/persistent.go28
5 files changed, 474 insertions, 25 deletions
diff --git a/db/db.go b/db/db.go
new file mode 100644
index 000000000..1bd654782
--- /dev/null
+++ b/db/db.go
@@ -0,0 +1,248 @@
+// Copyright 2017 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+// Package db implements a simple key-value database.
+// The database is cached in memory and mirrored on disk.
+// It is used to store corpus in syz-manager and syz-hub.
+// The database strives to minimize the number of disk accesses,
+// as they can be slow in virtualized environments (GCE).
+package db
+
+import (
+ "bufio"
+ "bytes"
+ "compress/flate"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+
+ . "github.com/google/syzkaller/log"
+)
+
+// DB is a key-value database: Records holds the in-memory copy and
+// filename names the file it is mirrored to.
+type DB struct {
+ Records map[string]Record // in-memory cache, must not be modified directly
+
+ filename string // path of the on-disk mirror, as passed to Open
+ uncompacted int // number of records in the file; bumped by Save/Delete, reset by compact
+ pending *bytes.Buffer // pending writes to the file; nil when there is nothing to flush
+}
+
+// Record is a single database entry: the stored value together with the
+// sequence number the caller passed to Save.
+type Record struct {
+ Val []byte
+ Seq uint64
+}
+
+// Open loads the database stored in filename, creating the file if it does
+// not exist yet. Records that fail to deserialize are logged by deserializeDB
+// and loading stops there; everything read up to that point is kept.
+//
+// The file is rewritten immediately when no live records were loaded (which
+// also lays down the header of a fresh file) or when most of the on-disk
+// records are stale. A failure of that rewrite is returned to the caller.
+func Open(filename string) (*DB, error) {
+	db := &DB{
+		filename: filename,
+	}
+	f, err := os.OpenFile(db.filename, os.O_RDONLY|os.O_CREATE, 0640)
+	if err != nil {
+		return nil, err
+	}
+	db.Records, db.uncompacted = deserializeDB(bufio.NewReader(f))
+	f.Close()
+	if len(db.Records) == 0 || db.uncompacted/10*9 > len(db.Records) {
+		// Do not hide a failed rewrite: the caller would otherwise keep
+		// using a database whose file could not be written.
+		if err := db.compact(); err != nil {
+			return nil, err
+		}
+	}
+	return db, nil
+}
+
+// Save stores val under key with sequence number seq in the in-memory cache
+// and queues the record for the next Flush. Saving a record identical to the
+// cached one is a no-op. It panics if seq is the reserved seqDeleted value.
+func (db *DB) Save(key string, val []byte, seq uint64) {
+	if seq == seqDeleted {
+		panic("reserved seq")
+	}
+	old, exists := db.Records[key]
+	unchanged := exists && old.Seq == seq && bytes.Equal(val, old.Val)
+	if unchanged {
+		return
+	}
+	db.Records[key] = Record{Val: val, Seq: seq}
+	db.serialize(key, val, seq)
+	db.uncompacted++
+}
+
+// Delete removes key from the in-memory cache and queues a deletion record
+// for the next Flush. Deleting a key that is not present does nothing.
+func (db *DB) Delete(key string) {
+	if _, present := db.Records[key]; present {
+		delete(db.Records, key)
+		db.serialize(key, nil, seqDeleted)
+		db.uncompacted++
+	}
+}
+
+// Flush writes all pending changes to disk.
+//
+// If most of the on-disk records are stale, the whole file is rewritten via
+// compact; otherwise the pending records are appended to the existing file.
+// The pending buffer is dropped only after the data was written and the file
+// closed without error, so a failed Flush can be retried.
+func (db *DB) Flush() error {
+	if db.uncompacted/10*9 > len(db.Records) {
+		// Propagate compaction failures instead of reporting success.
+		return db.compact()
+	}
+	if db.pending == nil {
+		return nil
+	}
+	f, err := os.OpenFile(db.filename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
+	if err != nil {
+		return err
+	}
+	_, err = f.Write(db.pending.Bytes())
+	// A write error may only surface on Close, so check it as well.
+	if cerr := f.Close(); err == nil {
+		err = cerr
+	}
+	if err != nil {
+		return err
+	}
+	db.pending = nil
+	return nil
+}
+
+// compact rewrites the database file so that it contains only the header and
+// the records currently held in memory.
+//
+// The new contents are written to "<filename>.tmp", which is then renamed
+// over the real file. On any failure the temporary file is removed and the
+// in-memory state (uncompacted, pending) is left untouched.
+func (db *DB) compact() error {
+	buf := new(bytes.Buffer)
+	serializeHeader(buf)
+	for key, rec := range db.Records {
+		serializeRecord(buf, key, rec.Val, rec.Seq)
+	}
+	tmp := db.filename + ".tmp"
+	f, err := os.Create(tmp)
+	if err != nil {
+		return err
+	}
+	_, err = f.Write(buf.Bytes())
+	// Close exactly once, and before the rename; a write error may only
+	// be reported at this point.
+	if cerr := f.Close(); err == nil {
+		err = cerr
+	}
+	if err == nil {
+		err = os.Rename(tmp, db.filename)
+	}
+	if err != nil {
+		// Best effort: do not leave a stale temporary file behind.
+		os.Remove(tmp)
+		return err
+	}
+	db.uncompacted = len(db.Records)
+	db.pending = nil
+	return nil
+}
+
+// serialize appends the record to the pending buffer, allocating the buffer
+// on first use.
+func (db *DB) serialize(key string, val []byte, seq uint64) {
+	out := db.pending
+	if out == nil {
+		out = new(bytes.Buffer)
+		db.pending = out
+	}
+	serializeRecord(out, key, val, seq)
+}
+
+// On-disk format constants.
+const (
+ dbMagic = uint32(0xbaddb) // first word of the file header
+ recMagic = uint32(0xfee1bad) // first word of every serialized record
+ curVersion = uint32(1) // version written to the header; 0 and larger versions are rejected on load
+ seqDeleted = ^uint64(0) // reserved seq marking a deletion record; Save panics on it
+)
+
+// serializeHeader writes the file header to w: the database magic followed
+// by the current format version, both as little-endian uint32.
+func serializeHeader(w *bytes.Buffer) {
+	for _, word := range [...]uint32{dbMagic, curVersion} {
+		binary.Write(w, binary.LittleEndian, word)
+	}
+}
+
+// serializeRecord appends one record to w. All integers are little-endian:
+//
+//	recMagic uint32
+//	keyLen   uint32
+//	key      [keyLen]byte
+//	seq      uint64
+//	valLen   uint32          (omitted when seq == seqDeleted)
+//	val      [valLen]byte    (flate-compressed; omitted when valLen == 0)
+//
+// It panics if a deletion record (seq == seqDeleted) carries a value, or if
+// the compressor reports an error.
+func serializeRecord(w *bytes.Buffer, key string, val []byte, seq uint64) {
+	binary.Write(w, binary.LittleEndian, recMagic)
+	binary.Write(w, binary.LittleEndian, uint32(len(key)))
+	w.WriteString(key)
+	binary.Write(w, binary.LittleEndian, seq)
+	if seq == seqDeleted {
+		if len(val) != 0 {
+			panic("deleting record with value")
+		}
+		return
+	}
+	if len(val) == 0 {
+		binary.Write(w, binary.LittleEndian, uint32(0))
+		return
+	}
+	// The compressed size is not known up front: reserve the length field,
+	// write the compressed value, then patch the field in place.
+	lenPos := w.Len()
+	binary.Write(w, binary.LittleEndian, uint32(0))
+	startPos := w.Len()
+	fw, err := flate.NewWriter(w, flate.BestCompression)
+	if err != nil {
+		panic(err)
+	}
+	if _, err := fw.Write(val); err != nil {
+		panic(err)
+	}
+	// Flush before Close is kept so the emitted byte stream stays the same
+	// as before this change.
+	if err := fw.Flush(); err != nil {
+		panic(err)
+	}
+	if err := fw.Close(); err != nil {
+		panic(err)
+	}
+	// Nothing has been read from w, so Bytes() starts at offset 0 and
+	// aliases the buffer contents; this overwrites the placeholder.
+	binary.LittleEndian.PutUint32(w.Bytes()[lenPos:], uint32(w.Len()-startPos))
+}
+
+// deserializeDB reads the header and then records from r until EOF or the
+// first error. It returns the live records (later records override earlier
+// ones, deletion records remove the key) and the total number of records
+// read. Errors are logged, not returned; whatever was read before the error
+// is kept.
+func deserializeDB(r *bufio.Reader) (records map[string]Record, uncompacted int) {
+	records = make(map[string]Record)
+	if _, err := deserializeHeader(r); err != nil {
+		Logf(0, "failed to deserialize database header: %v", err)
+		return records, uncompacted
+	}
+	for {
+		key, val, seq, err := deserializeRecord(r)
+		switch {
+		case err == io.EOF:
+			return records, uncompacted
+		case err != nil:
+			Logf(0, "failed to deserialize database record: %v", err)
+			return records, uncompacted
+		}
+		uncompacted++
+		if seq != seqDeleted {
+			records[key] = Record{Val: val, Seq: seq}
+			continue
+		}
+		delete(records, key)
+	}
+}
+
+// deserializeHeader reads and validates the file header and returns the
+// format version. A completely empty input is not an error and is reported
+// as curVersion. A wrong magic, version 0 or a version newer than curVersion
+// yields an error.
+func deserializeHeader(r *bufio.Reader) (uint32, error) {
+	var magic uint32
+	switch err := binary.Read(r, binary.LittleEndian, &magic); {
+	case err == io.EOF:
+		return curVersion, nil
+	case err != nil:
+		return 0, err
+	case magic != dbMagic:
+		return 0, fmt.Errorf("bad db header: 0x%x", magic)
+	}
+	var version uint32
+	if err := binary.Read(r, binary.LittleEndian, &version); err != nil {
+		return 0, err
+	}
+	if version < 1 || version > curVersion {
+		return 0, fmt.Errorf("bad db version: %v", version)
+	}
+	return version, nil
+}
+
+// deserializeRecord reads one record from r in the layout written by
+// serializeRecord. For a deletion record (seq == seqDeleted) val is nil.
+// io.EOF is returned when the input ends where a field was expected to start;
+// input that ends in the middle of a field yields io.ErrUnexpectedEOF.
+func deserializeRecord(r *bufio.Reader) (key string, val []byte, seq uint64, err error) {
+	var magic uint32
+	if err = binary.Read(r, binary.LittleEndian, &magic); err != nil {
+		return
+	}
+	if magic != recMagic {
+		err = fmt.Errorf("bad record header: 0x%x", magic)
+		return
+	}
+	var keyLen uint32
+	if err = binary.Read(r, binary.LittleEndian, &keyLen); err != nil {
+		return
+	}
+	// keyLen comes from disk and may be garbage. Copy into a growing buffer
+	// instead of allocating keyLen bytes up front, so a corrupted length
+	// cannot force a multi-gigabyte allocation.
+	var keyBuf bytes.Buffer
+	var n int64
+	if n, err = io.CopyN(&keyBuf, r, int64(keyLen)); err != nil {
+		if err == io.EOF && n != 0 {
+			// Match io.ReadFull: a partially read key is not a clean EOF.
+			err = io.ErrUnexpectedEOF
+		}
+		return
+	}
+	key = keyBuf.String()
+	if err = binary.Read(r, binary.LittleEndian, &seq); err != nil {
+		return
+	}
+	if seq == seqDeleted {
+		return
+	}
+	var valLen uint32
+	if err = binary.Read(r, binary.LittleEndian, &valLen); err != nil {
+		return
+	}
+	if valLen != 0 {
+		// Restrict the decompressor to this record's bytes.
+		fr := flate.NewReader(&io.LimitedReader{R: r, N: int64(valLen)})
+		val, err = ioutil.ReadAll(fr)
+		// Close the reader on both the success and the error path.
+		fr.Close()
+	}
+	return
+}
diff --git a/db/db_test.go b/db/db_test.go
new file mode 100644
index 000000000..ccddb806a
--- /dev/null
+++ b/db/db_test.go
@@ -0,0 +1,151 @@
+// Copyright 2017 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package db
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "testing"
+)
+
+// TestBasic saves three records and checks that they are intact in memory,
+// after Flush, and after the database is reopened from disk.
+func TestBasic(t *testing.T) {
+	fn := tempFile(t)
+	defer os.Remove(fn)
+	db, err := Open(fn)
+	if err != nil {
+		t.Fatalf("failed to open db: %v", err)
+	}
+	if len(db.Records) != 0 {
+		t.Fatalf("empty db contains records")
+	}
+	db.Save("", nil, 0)
+	db.Save("1", []byte("ab"), 1)
+	db.Save("23", []byte("abcd"), 2)
+	// checkContents reads db through the closure, so it also sees the
+	// reopened database assigned below.
+	checkContents := func(where string) {
+		if len(db.Records) != 3 {
+			t.Fatalf("bad record count %v %v, want 3", where, len(db.Records))
+		}
+		for key, rec := range db.Records {
+			switch key {
+			case "":
+				if len(rec.Val) == 0 && rec.Seq == 0 {
+					// Move on to the next record; returning here
+					// would leave the remaining records unchecked.
+					continue
+				}
+			case "1":
+				if bytes.Equal(rec.Val, []byte("ab")) && rec.Seq == 1 {
+					continue
+				}
+			case "23":
+				if bytes.Equal(rec.Val, []byte("abcd")) && rec.Seq == 2 {
+					continue
+				}
+			default:
+				t.Fatalf("unknown key: %v", key)
+			}
+			t.Fatalf("bad record for key %v: %+v", key, rec)
+		}
+	}
+	checkContents("after save")
+	if err := db.Flush(); err != nil {
+		t.Fatalf("failed to flush db: %v", err)
+	}
+	checkContents("after flush")
+	db, err = Open(fn)
+	if err != nil {
+		t.Fatalf("failed to open db: %v", err)
+	}
+	checkContents("after reopen")
+}
+
+// TestModify overwrites, deletes and re-creates records and checks that only
+// the final state is visible in memory, after Flush, and after reopening.
+func TestModify(t *testing.T) {
+	fn := tempFile(t)
+	defer os.Remove(fn)
+	db, err := Open(fn)
+	if err != nil {
+		t.Fatalf("failed to open db: %v", err)
+	}
+	db.Save("1", []byte("ab"), 0)
+	db.Save("23", nil, 1)
+	db.Save("456", []byte("abcd"), 1)
+	db.Save("7890", []byte("a"), 0)
+	db.Delete("23")
+	db.Save("1", nil, 5)
+	db.Save("456", []byte("ef"), 6)
+	db.Delete("7890")
+	db.Save("456", []byte("efg"), 0)
+	db.Save("7890", []byte("bc"), 0)
+	// checkContents reads db through the closure, so it also sees the
+	// reopened database assigned below.
+	checkContents := func(where string) {
+		if len(db.Records) != 3 {
+			t.Fatalf("bad record count %v %v, want 3", where, len(db.Records))
+		}
+		for key, rec := range db.Records {
+			switch key {
+			case "1":
+				if len(rec.Val) == 0 && rec.Seq == 5 {
+					// Move on to the next record; returning here
+					// would leave the remaining records unchecked.
+					continue
+				}
+			case "456":
+				if bytes.Equal(rec.Val, []byte("efg")) && rec.Seq == 0 {
+					continue
+				}
+			case "7890":
+				if bytes.Equal(rec.Val, []byte("bc")) && rec.Seq == 0 {
+					continue
+				}
+			default:
+				t.Fatalf("unknown key: %v", key)
+			}
+			t.Fatalf("bad record for key %v: %+v", key, rec)
+		}
+	}
+	checkContents("after modification")
+	if err := db.Flush(); err != nil {
+		t.Fatalf("failed to flush db: %v", err)
+	}
+	checkContents("after flush")
+	db, err = Open(fn)
+	if err != nil {
+		t.Fatalf("failed to open db: %v", err)
+	}
+	checkContents("after reopen")
+}
+
+// TestLarge stores nrec records that share one random 1000-byte value and
+// checks that all of them are present after a flush and reopen.
+func TestLarge(t *testing.T) {
+	path := tempFile(t)
+	defer os.Remove(path)
+	db, err := Open(path)
+	if err != nil {
+		t.Fatalf("failed to open db: %v", err)
+	}
+	const nrec = 1000
+	payload := make([]byte, 1000)
+	for pos := 0; pos < len(payload); pos++ {
+		payload[pos] = byte(rand.Intn(256))
+	}
+	for id := 0; id < nrec; id++ {
+		key := fmt.Sprintf("%v", id)
+		db.Save(key, payload, 0)
+	}
+	if err := db.Flush(); err != nil {
+		t.Fatalf("failed to flush db: %v", err)
+	}
+	if db, err = Open(path); err != nil {
+		t.Fatalf("failed to open db: %v", err)
+	}
+	if got := len(db.Records); got != nrec {
+		t.Fatalf("wrong record count: %v, want %v", got, nrec)
+	}
+}
+
+// tempFile creates an empty temporary file and returns its name; the caller
+// is responsible for removing it.
+func tempFile(t *testing.T) string {
+	tmp, err := ioutil.TempFile("", "syzkaller.test.db")
+	if err != nil {
+		t.Fatalf("failed to create temp file: %v", err)
+	}
+	name := tmp.Name()
+	tmp.Close()
+	return name
+}
diff --git a/hash/hash.go b/hash/hash.go
index d640536fe..4c45ba6e6 100644
--- a/hash/hash.go
+++ b/hash/hash.go
@@ -15,6 +15,11 @@ func Hash(data []byte) Sig {
return Sig(sha1.Sum(data))
}
+// String returns the hex-encoded hash of data.
+func String(data []byte) string {
+	digest := Hash(data)
+	return (&digest).String()
+}
+
func (sig *Sig) String() string {
return hex.EncodeToString((*sig)[:])
}
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index 666821ddf..9a4d4af8d 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -21,6 +21,7 @@ import (
"github.com/google/syzkaller/config"
"github.com/google/syzkaller/cover"
"github.com/google/syzkaller/csource"
+ "github.com/google/syzkaller/db"
"github.com/google/syzkaller/hash"
. "github.com/google/syzkaller/log"
"github.com/google/syzkaller/prog"
@@ -42,16 +43,16 @@ var (
)
type Manager struct {
- cfg *config.Config
- crashdir string
- port int
- persistentCorpus *PersistentSet
- startTime time.Time
- firstConnect time.Time
- stats map[string]uint64
- vmStop chan bool
- vmChecked bool
- fresh bool
+ cfg *config.Config
+ crashdir string
+ port int
+ corpusDB *db.DB
+ startTime time.Time
+ firstConnect time.Time
+ stats map[string]uint64
+ vmStop chan bool
+ vmChecked bool
+ fresh bool
mu sync.Mutex
enabledSyscalls string
@@ -122,18 +123,23 @@ func RunManager(cfg *config.Config, syscalls map[int]bool) {
}
Logf(0, "loading corpus...")
- mgr.persistentCorpus = newPersistentSet(filepath.Join(cfg.Workdir, "corpus"), func(data []byte) bool {
- mgr.fresh = false
- if _, err := prog.Deserialize(data); err != nil {
- Logf(0, "deleting broken program: %v\n%s", err, data)
- return false
+ dbFilename := filepath.Join(cfg.Workdir, "corpus.db")
+ if _, err := os.Stat(dbFilename); err != nil {
+ if err := convertPersistentToDB(filepath.Join(cfg.Workdir, "corpus"), dbFilename); err != nil {
+ Fatalf("failed to convert old corpus: %v", err)
}
- return true
- })
- for _, data := range mgr.persistentCorpus.a {
- p, err := prog.Deserialize(data)
+ }
+ var err error
+ mgr.corpusDB, err = db.Open(dbFilename)
+ if err != nil {
+ Fatalf("failed to open corpus database: %v", err)
+ }
+ for key, rec := range mgr.corpusDB.Records {
+ p, err := prog.Deserialize(rec.Val)
if err != nil {
- Fatalf("failed to deserialize program: %v", err)
+ Logf(0, "deleting broken program: %v\n%s", err, rec.Val)
+ mgr.corpusDB.Delete(key)
+ continue
}
disabled := false
for _, c := range p.Calls {
@@ -148,13 +154,14 @@ func RunManager(cfg *config.Config, syscalls map[int]bool) {
// it is not deleted during minimization.
// TODO: use mgr.enabledCalls which accounts for missing devices, etc.
// But it is available only after vm check.
- sig := hash.Hash(data)
+ sig := hash.Hash(rec.Val)
mgr.disabledHashes = append(mgr.disabledHashes, sig.String())
continue
}
- mgr.candidates = append(mgr.candidates, data)
+ mgr.candidates = append(mgr.candidates, rec.Val)
}
- Logf(0, "loaded %v programs (%v total)", len(mgr.candidates), len(mgr.persistentCorpus.m))
+ mgr.fresh = len(mgr.corpusDB.Records) == 0
+ Logf(0, "loaded %v programs (%v total)", len(mgr.candidates), len(mgr.corpusDB.Records))
// Create HTTP server.
mgr.initHttp()
@@ -545,7 +552,12 @@ func (mgr *Manager) minimizeCorpus() {
for _, h := range mgr.disabledHashes {
hashes[h] = true
}
- mgr.persistentCorpus.minimize(hashes)
+ for key := range mgr.corpusDB.Records {
+ if !hashes[key] {
+ mgr.corpusDB.Delete(key)
+ }
+ }
+ mgr.corpusDB.Flush()
}
}
@@ -610,7 +622,11 @@ func (mgr *Manager) NewInput(a *NewInputArgs, r *int) error {
mgr.corpusCover[call] = cover.Union(mgr.corpusCover[call], a.Cover)
mgr.corpus = append(mgr.corpus, a.RpcInput)
mgr.stats["manager new inputs"]++
- mgr.persistentCorpus.add(a.RpcInput.Prog)
+ sig := hash.Hash(a.RpcInput.Prog)
+ mgr.corpusDB.Save(sig.String(), a.RpcInput.Prog, 0)
+ if err := mgr.corpusDB.Flush(); err != nil {
+ Logf(0, "failed to save corpus database: %v", err)
+ }
for _, f1 := range mgr.fuzzers {
if f1 == f {
continue
@@ -685,6 +701,7 @@ func (mgr *Manager) hubSync() {
Logf(0, "Hub.Connect rpc failed: %v", err)
mgr.hub.Close()
mgr.hub = nil
+ mgr.hubCorpus = nil
return
}
mgr.fresh = false
diff --git a/syz-manager/persistent.go b/syz-manager/persistent.go
index 1ef53ecff..c4071ff3d 100644
--- a/syz-manager/persistent.go
+++ b/syz-manager/persistent.go
@@ -4,14 +4,42 @@
package main
import (
+ "fmt"
"io/ioutil"
"os"
"path/filepath"
+ "github.com/google/syzkaller/db"
"github.com/google/syzkaller/hash"
. "github.com/google/syzkaller/log"
+ "github.com/google/syzkaller/prog"
)
+// PersistentSet is now superseded by the db package, which stores the corpus in a single file.
+// This code is left here to convert old corpuses to the new format.
+// It needs to be deleted in Mar 2017.
+func convertPersistentToDB(persistentDir, dbFilename string) error {
+	// Programs that no longer deserialize are rejected while loading the
+	// old corpus.
+	validate := func(data []byte) bool {
+		_, err := prog.Deserialize(data)
+		if err == nil {
+			return true
+		}
+		Logf(0, "deleting broken program: %v\n%s", err, data)
+		return false
+	}
+	oldCorpus := newPersistentSet(persistentDir, validate)
+	newDB, err := db.Open(dbFilename)
+	if err != nil {
+		return fmt.Errorf("failed to create corpus database: %v", err)
+	}
+	// Each program is stored under the string form of its key in the old set.
+	for sig, data := range oldCorpus.m {
+		newDB.Save(sig.String(), data, 0)
+	}
+	if err = newDB.Flush(); err != nil {
+		return fmt.Errorf("failed to save corpus database: %v", err)
+	}
+	Logf(0, "converted %v programs to new corpus database format", len(oldCorpus.m))
+	return nil
+}
+
// PersistentSet is a set of binary blobs with a persistent mirror on disk.
type PersistentSet struct {
dir string