1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
// Copyright 2024 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package db
import (
"context"
"errors"
"time"
"cloud.google.com/go/spanner"
"github.com/google/uuid"
)
// SessionRepository provides database access to Session entities.
// It embeds the generic entity operations for Session objects keyed by a string
// and additionally keeps the Spanner client for the queries and transactions
// implemented directly on this type.
type SessionRepository struct {
// client is the Spanner client used for the repository's own transactions.
client *spanner.Client
// Generic operations shared with other repositories, instantiated for Session.
*genericEntityOps[Session, string]
}
// NewSessionRepository creates a SessionRepository backed by the given Spanner client.
// The embedded generic operations are configured for the "Sessions" table with "ID"
// as the key field and share the same client.
func NewSessionRepository(client *spanner.Client) *SessionRepository {
	ops := &genericEntityOps[Session, string]{
		client:   client,
		keyField: "ID",
		table:    "Sessions",
	}
	return &SessionRepository{client: client, genericEntityOps: ops}
}
// ErrSessionAlreadyStarted is returned by SessionRepository.Start when the session's
// StartedAt field is already set.
var ErrSessionAlreadyStarted = errors.New("the session already started")
// Start marks the session identified by sessionID as started.
//
// Everything happens inside a single read-write transaction:
//   - the session row is read and its StartedAt is set to the current time;
//   - the Series row referenced by the session's SeriesID is read and the session
//     is recorded on it via SetLatestSession;
//   - updates of both rows are buffered to be written by the transaction.
//
// If the session's StartedAt is already set, the transaction function returns
// ErrSessionAlreadyStarted and nothing is written. Other errors from reading the
// rows or building the mutations abort the transaction as well.
func (repo *SessionRepository) Start(ctx context.Context, sessionID string) error {
	startTxn := func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		sessionQuery := spanner.Statement{
			SQL:    "SELECT * from `Sessions` WHERE `ID`=@id",
			Params: map[string]any{"id": sessionID},
		}
		sess, err := readEntity[Session](ctx, txn, sessionQuery)
		if err != nil {
			return err
		}
		// NOTE(review): sess is dereferenced without a nil check; this assumes readEntity
		// never returns a nil entity together with a nil error (e.g. for a missing row) — confirm.
		if !sess.StartedAt.IsNull() {
			return ErrSessionAlreadyStarted
		}
		sess.SetStartedAt(time.Now())
		sessionMutation, err := spanner.UpdateStruct("Sessions", sess)
		if err != nil {
			return err
		}

		seriesQuery := spanner.Statement{
			SQL:    "SELECT * from `Series` WHERE `ID`=@id",
			Params: map[string]any{"id": sess.SeriesID},
		}
		parent, err := readEntity[Series](ctx, txn, seriesQuery)
		if err != nil {
			return err
		}
		// NOTE(review): same nil assumption as above applies to the series entity.
		parent.SetLatestSession(sess)
		seriesMutation, err := spanner.UpdateStruct("Series", parent)
		if err != nil {
			return err
		}
		// Both rows are updated by the same transaction.
		return txn.BufferWrite([]*spanner.Mutation{seriesMutation, sessionMutation})
	}
	// The commit timestamp is not needed, only the outcome.
	_, err := repo.client.ReadWriteTransaction(ctx, startTxn)
	return err
}
// Insert saves the session via the generic Insert operation.
// When session.ID is empty, a newly generated UUID string is assigned to it first,
// so the passed object is modified in that case.
func (repo *SessionRepository) Insert(ctx context.Context, session *Session) error {
	if len(session.ID) == 0 {
		session.ID = uuid.NewString()
	}
	return repo.genericEntityOps.Insert(ctx, session)
}
// ListRunning returns the sessions that have StartedAt set but no FinishedAt yet.
func (repo *SessionRepository) ListRunning(ctx context.Context) ([]*Session, error) {
	const query = "SELECT * FROM `Sessions` WHERE `StartedAt` IS NOT NULL AND `FinishedAt` IS NULL"
	return repo.readEntities(ctx, spanner.Statement{SQL: query})
}
// NextSession is the pagination cursor used by ListWaiting.
// It holds the ID and CreatedAt of the last session of a returned page, so that
// a subsequent ListWaiting call can continue with the sessions that follow it.
type NextSession struct {
// id is the ID of the last returned session; it breaks ties between equal createdAt values.
id string
// createdAt is the CreatedAt value of the last returned session.
createdAt time.Time
}
// ListWaiting returns a page of sessions that have not been started yet (StartedAt
// is NULL), ordered by CreatedAt and then by ID.
//
// If from is non-nil, only sessions that come strictly after the (createdAt, id) pair
// it holds are considered. The limit value is passed to the addLimit helper.
//
// Along with the sessions, it returns a cursor pointing at the last returned session
// that can be passed as from to fetch the following page. The cursor is nil when the
// query failed or produced no sessions.
func (repo *SessionRepository) ListWaiting(ctx context.Context, from *NextSession,
	limit int) ([]*Session, *NextSession, error) {
	query := "SELECT * FROM `Sessions` WHERE `StartedAt` IS NULL"
	params := map[string]any{}
	if from != nil {
		// Resume right after the cursor position in the (CreatedAt, ID) order.
		query += " AND ((`CreatedAt` > @from) OR (`CreatedAt` = @from AND `ID` > @id))"
		params["from"] = from.createdAt
		params["id"] = from.id
	}
	query += " ORDER BY `CreatedAt`, `ID`"

	stmt := spanner.Statement{SQL: query, Params: params}
	addLimit(&stmt, limit)

	sessions, err := repo.readEntities(ctx, stmt)
	if err != nil || len(sessions) == 0 {
		return sessions, nil, err
	}
	tail := sessions[len(sessions)-1]
	return sessions, &NextSession{id: tail.ID, createdAt: tail.CreatedAt}, nil
}
// ListForSeries returns the sessions whose SeriesID equals the ID of the given series,
// ordered by CreatedAt with the most recent first.
// The dupl linter considers this method too similar to SeriesRepository's ListPatches,
// although the two are not actual duplicates, so the check is silenced here.
// nolint:dupl
func (repo *SessionRepository) ListForSeries(ctx context.Context, series *Series) ([]*Session, error) {
	stmt := spanner.Statement{
		SQL:    "SELECT * FROM `Sessions` WHERE `SeriesID` = @series ORDER BY CreatedAt DESC",
		Params: map[string]any{"series": series.ID},
	}
	return repo.readEntities(ctx, stmt)
}
// MissingReportList lists the finished sessions that have at least one Finding,
// but no SessionReport object yet. The result is ordered by FinishedAt.
//
// If from is not the zero time, only sessions with FinishedAt strictly after it are
// returned. The limit value is passed to the addLimit helper.
//
// Once the conditions for creating a SessionReport object become more complex, this
// simple method will likely not be enough, but for now it should be fine.
func (repo *SessionRepository) MissingReportList(ctx context.Context, from time.Time, limit int) ([]*Session, error) {
	query := "SELECT * FROM Sessions WHERE FinishedAt IS NOT NULL " +
		" AND NOT EXISTS (" +
		"SELECT 1 FROM SessionReports WHERE SessionReports.SessionID = Sessions.ID" +
		") AND EXISTS (" +
		"SELECT 1 FROM Findings WHERE Findings.SessionID = Sessions.ID)"
	params := map[string]any{}
	if !from.IsZero() {
		query += " AND `FinishedAt` > @from"
		params["from"] = from
	}
	query += " ORDER BY `FinishedAt`"

	stmt := spanner.Statement{SQL: query, Params: params}
	addLimit(&stmt, limit)
	return repo.readEntities(ctx, stmt)
}
|