aboutsummaryrefslogtreecommitdiffstats
path: root/syz-cluster/pkg/service
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-02-13 11:57:14 +0100
committerAleksandr Nogikh <nogikh@google.com>2025-02-14 13:40:12 +0000
commitf20e88b2468bdcdb631b14e384f1f9a67e984013 (patch)
treed2758c08ab9deea9354a91237a3d9234de0efce3 /syz-cluster/pkg/service
parenteaf86f3f4dc8a7190abf09fe840e20bcf83709d8 (diff)
syz-cluster: report session results
Provide an API to set up the reporting of finished sessions for which syz-cluster collected reportable findings. The actual sending of the results is to be done in a separate component that would: 1) Call Next() to get the next report to send. 2) Call Confirm() to confirm that the report has been sent. 3) Call Upstream() if the report has been moderated and needs to be sent to e.g. public mailing lists.
Diffstat (limited to 'syz-cluster/pkg/service')
-rw-r--r--syz-cluster/pkg/service/finding.go22
-rw-r--r--syz-cluster/pkg/service/report.go116
-rw-r--r--syz-cluster/pkg/service/series.go26
3 files changed, 159 insertions, 5 deletions
diff --git a/syz-cluster/pkg/service/finding.go b/syz-cluster/pkg/service/finding.go
index 21eb74c66..b0530a42c 100644
--- a/syz-cluster/pkg/service/finding.go
+++ b/syz-cluster/pkg/service/finding.go
@@ -26,7 +26,7 @@ func NewFindingService(env *app.AppEnvironment) *FindingService {
}
}
-func (s *FindingService) Save(ctx context.Context, req *api.Finding) error {
+func (s *FindingService) Save(ctx context.Context, req *api.NewFinding) error {
var reportURI, logURI string
var err error
if len(req.Log) > 0 {
@@ -55,3 +55,23 @@ func (s *FindingService) Save(ctx context.Context, req *api.Finding) error {
}
return err
}
+
+func (s *FindingService) List(ctx context.Context, sessionID string) ([]*api.Finding, error) {
+ list, err := s.findingRepo.ListForSession(ctx, sessionID)
+ if err != nil {
+ return nil, fmt.Errorf("failed to query the list: %w", err)
+ }
+ var ret []*api.Finding
+ for _, item := range list {
+ finding := &api.Finding{
+ Title: item.Title,
+ LogURL: "TODO", // TODO: where to take it from?
+ }
+ finding.Report, err = blob.ReadAllBytes(s.blobStorage, item.ReportURI)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read the report: %w", err)
+ }
+ ret = append(ret, finding)
+ }
+ return ret, nil
+}
diff --git a/syz-cluster/pkg/service/report.go b/syz-cluster/pkg/service/report.go
new file mode 100644
index 000000000..4e8bc4974
--- /dev/null
+++ b/syz-cluster/pkg/service/report.go
@@ -0,0 +1,116 @@
+// Copyright 2025 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package service
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/google/syzkaller/syz-cluster/pkg/api"
+ "github.com/google/syzkaller/syz-cluster/pkg/app"
+ "github.com/google/syzkaller/syz-cluster/pkg/db"
+)
+
+type ReportService struct {
+ reportRepo *db.ReportRepository
+ seriesService *SeriesService
+ findingService *FindingService
+}
+
+func NewReportService(env *app.AppEnvironment) *ReportService {
+ return &ReportService{
+ reportRepo: db.NewReportRepository(env.Spanner),
+ seriesService: NewSeriesService(env),
+ findingService: NewFindingService(env),
+ }
+}
+
+var ErrReportNotFound = errors.New("report is not found")
+
+func (rs *ReportService) Update(ctx context.Context, id string, req *api.UpdateReportReq) error {
+ // TODO: validate the link?
+ err := rs.reportRepo.Update(ctx, id, func(rep *db.SessionReport) error {
+ if req.Link != "" {
+ rep.Link = req.Link
+ }
+ return nil
+ })
+ if errors.Is(err, db.ErrEntityNotFound) {
+ return ErrReportNotFound
+ }
+ return err
+}
+
+func (rs *ReportService) Confirm(ctx context.Context, id string) error {
+ err := rs.reportRepo.Update(ctx, id, func(rep *db.SessionReport) error {
+ if rep.ReportedAt.IsNull() {
+ rep.SetReportedAt(time.Now())
+ }
+ // TODO: fail if already confirmed?
+ return nil
+ })
+ if errors.Is(err, db.ErrEntityNotFound) {
+ return ErrReportNotFound
+ }
+ return err
+}
+
+var ErrNotOnModeration = errors.New("the report is not on moderation")
+
+func (rs *ReportService) Upstream(ctx context.Context, id string, req *api.UpstreamReportReq) error {
+ rep, err := rs.query(ctx, id)
+ if err != nil {
+ return nil
+ } else if !rep.Moderation {
+ return ErrNotOnModeration
+ }
+ // In case of a concurrent Upstream() call or an Upstream() invocation on
+ // an already upstreamed report, the "NoDupSessionReports" index should
+ // prevent duplications.
+ err = rs.reportRepo.Insert(ctx, &db.SessionReport{
+ SessionID: rep.SessionID,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to schedule a new report: %w", err)
+ }
+ return nil
+}
+
+func (rs *ReportService) Next(ctx context.Context) (*api.NextReportResp, error) {
+ list, err := rs.reportRepo.ListNotReported(ctx, 1)
+ if err != nil {
+ return nil, err
+ } else if len(list) != 1 {
+ return &api.NextReportResp{}, nil
+ }
+ report := list[0]
+ series, err := rs.seriesService.GetSessionSeriesShort(ctx, report.SessionID)
+ if err != nil {
+ return nil, fmt.Errorf("failed to query series: %w", err)
+ }
+ findings, err := rs.findingService.List(ctx, report.SessionID)
+ if err != nil {
+ return nil, fmt.Errorf("failed to query findings: %w", err)
+ }
+ return &api.NextReportResp{
+ Report: &api.SessionReport{
+ ID: report.ID,
+ Moderation: report.Moderation,
+ Series: series,
+ Findings: findings,
+ },
+ }, nil
+}
+
+func (rs *ReportService) query(ctx context.Context, id string) (*db.SessionReport, error) {
+ rep, err := rs.reportRepo.GetByID(ctx, id)
+ if err != nil {
+ return nil, fmt.Errorf("failed to query the report: %w", err)
+ } else if rep == nil {
+ return nil, ErrReportNotFound
+ }
+ return rep, err
+}
diff --git a/syz-cluster/pkg/service/series.go b/syz-cluster/pkg/service/series.go
index 13579cefe..8ab8b5cef 100644
--- a/syz-cluster/pkg/service/series.go
+++ b/syz-cluster/pkg/service/series.go
@@ -32,13 +32,23 @@ func NewSeriesService(env *app.AppEnvironment) *SeriesService {
}
func (s *SeriesService) GetSessionSeries(ctx context.Context, sessionID string) (*api.Series, error) {
+ return s.getSessionSeries(ctx, sessionID, true)
+}
+
+func (s *SeriesService) GetSessionSeriesShort(ctx context.Context,
+ sessionID string) (*api.Series, error) {
+ return s.getSessionSeries(ctx, sessionID, false)
+}
+
+func (s *SeriesService) getSessionSeries(ctx context.Context, sessionID string,
+ includePatches bool) (*api.Series, error) {
session, err := s.sessionRepo.GetByID(ctx, sessionID)
if err != nil {
return nil, fmt.Errorf("failed to fetch the session: %w", err)
} else if session == nil {
return nil, fmt.Errorf("%w: %q", ErrSessionNotFound, sessionID)
}
- return s.GetSeries(ctx, session.SeriesID)
+ return s.getSeries(ctx, session.SeriesID, includePatches)
}
func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*api.UploadSeriesResp, error) {
@@ -84,6 +94,11 @@ func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*
var ErrSeriesNotFound = errors.New("series not found")
func (s *SeriesService) GetSeries(ctx context.Context, seriesID string) (*api.Series, error) {
+ return s.getSeries(ctx, seriesID, true)
+}
+
+func (s *SeriesService) getSeries(ctx context.Context,
+ seriesID string, includeBody bool) (*api.Series, error) {
series, err := s.seriesRepo.GetByID(ctx, seriesID)
if err != nil {
return nil, fmt.Errorf("failed to fetch the series: %w", err)
@@ -104,9 +119,12 @@ func (s *SeriesService) GetSeries(ctx context.Context, seriesID string) (*api.Se
PublishedAt: series.PublishedAt,
}
for _, patch := range patches {
- body, err := blob.ReadAllBytes(s.blobStorage, patch.BodyURI)
- if err != nil {
- return nil, fmt.Errorf("failed to read patch %q: %w", patch.ID, err)
+ var body []byte
+ if includeBody {
+ body, err = blob.ReadAllBytes(s.blobStorage, patch.BodyURI)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read patch %q: %w", patch.ID, err)
+ }
}
ret.Patches = append(ret.Patches, api.SeriesPatch{
Seq: int(patch.Seq),