aboutsummaryrefslogtreecommitdiffstats
path: root/vm
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2024-07-01 18:54:32 +0200
committerAleksandr Nogikh <nogikh@google.com>2024-07-02 16:22:10 +0000
commit1ecfa2d8506efdae0483eedc0b425db8537b6e80 (patch)
tree27b05fdd728cacbd56eab0eea5c5eca4fa65fda5 /vm
parent07dedd50ee8834dbca4da7667e69e72b7d0565b9 (diff)
vm/gce: use vmimpl.Multiplex()
It will let us reduce code duplication and use the more appropriate approach to the graceful Run() shutdown - by enforcing a delay between stopping the command and stopping the collection of the console output.
Diffstat (limited to 'vm')
-rw-r--r--vm/gce/gce.go56
-rw-r--r--vm/vmimpl/vmimpl.go22
2 files changed, 34 insertions, 44 deletions
diff --git a/vm/gce/gce.go b/vm/gce/gce.go
index 16f3f996f..12f49e3d7 100644
--- a/vm/gce/gce.go
+++ b/vm/gce/gce.go
@@ -77,6 +77,7 @@ type instance struct {
closed chan bool
consolew io.WriteCloser
consoleReadCmd string // optional: command to read non-standard kernel console
+ timeouts targets.Timeouts
}
func ctor(env *vmimpl.Env) (vmimpl.Pool, error) {
@@ -226,6 +227,7 @@ func (pool *Pool) Create(workdir string, index int) (vmimpl.Instance, error) {
sshUser: sshUser,
closed: make(chan bool),
consoleReadCmd: pool.consoleReadCmd,
+ timeouts: pool.env.Timeouts,
}
if err := vmimpl.WaitForSSH(pool.env.Debug, 5*time.Minute, ip,
sshKey, sshUser, pool.env.OS, 22, nil, false); err != nil {
@@ -329,55 +331,31 @@ func (inst *instance) Run(timeout time.Duration, stop <-chan bool, command strin
sshWpipe.Close()
merger.Add("ssh", sshRpipe)
- errc := make(chan error, 1)
- signal := func(err error) {
- select {
- case errc <- err:
- default:
- }
- }
-
- go func() {
- select {
- case <-time.After(timeout):
- signal(vmimpl.ErrTimeout)
- case <-stop:
- signal(vmimpl.ErrTimeout)
- case <-inst.closed:
- signal(fmt.Errorf("instance closed"))
- case err := <-merger.Err:
- con.Process.Kill()
- ssh.Process.Kill()
- merger.Wait()
- con.Wait()
+ return vmimpl.Multiplex(ssh, merger, timeout, vmimpl.MultiplexConfig{
+ Console: vmimpl.CmdCloser{Cmd: con},
+ Stop: stop,
+ Close: inst.closed,
+ Debug: inst.debug,
+ Scale: inst.timeouts.Scale,
+ IgnoreError: func(err error) bool {
var mergeError *vmimpl.MergerError
- if cmdErr := ssh.Wait(); cmdErr == nil {
- // If the command exited successfully, we got EOF error from merger.
- // But in this case no error has happened and the EOF is expected.
- err = nil
- } else if errors.As(err, &mergeError) && mergeError.R == conRpipe {
+ if errors.As(err, &mergeError) && mergeError.R == conRpipe {
// Console connection must never fail. If it does, it's either
// instance preemption or a GCE bug. In either case, not a kernel bug.
log.Logf(0, "%v: gce console connection failed with %v", inst.name, mergeError.Err)
- err = vmimpl.ErrTimeout
+ return true
} else {
// Check if the instance was terminated due to preemption or host maintenance.
- time.Sleep(5 * time.Second) // just to avoid any GCE races
+ // vmimpl.Multiplex() already adds a delay, so we've already waited enough
+ // to let GCE VM status updates propagate.
if !inst.GCE.IsInstanceRunning(inst.name) {
log.Logf(0, "%v: ssh exited but instance is not running", inst.name)
- err = vmimpl.ErrTimeout
+ return true
}
}
- signal(err)
- return
- }
- con.Process.Kill()
- ssh.Process.Kill()
- merger.Wait()
- con.Wait()
- ssh.Wait()
- }()
- return merger.Output, errc, nil
+ return false
+ },
+ })
}
func waitForConsoleConnect(merger *vmimpl.OutputMerger) error {
diff --git a/vm/vmimpl/vmimpl.go b/vm/vmimpl/vmimpl.go
index a38ca0d8b..e29412d75 100644
--- a/vm/vmimpl/vmimpl.go
+++ b/vm/vmimpl/vmimpl.go
@@ -146,14 +146,24 @@ var (
Types = make(map[string]Type)
)
+type CmdCloser struct {
+ *exec.Cmd
+}
+
+func (cc CmdCloser) Close() error {
+ cc.Process.Kill()
+ return cc.Wait()
+}
+
var WaitForOutputTimeout = 10 * time.Second
type MultiplexConfig struct {
- Console io.Closer
- Stop <-chan bool
- Close <-chan bool
- Debug bool
- Scale time.Duration
+ Console io.Closer
+ Stop <-chan bool
+ Close <-chan bool
+ Debug bool
+ Scale time.Duration
+ IgnoreError func(err error) bool
}
func Multiplex(cmd *exec.Cmd, merger *OutputMerger, timeout time.Duration, config MultiplexConfig) (
@@ -193,6 +203,8 @@ func Multiplex(cmd *exec.Cmd, merger *OutputMerger, timeout time.Duration, confi
// If the command exited successfully, we got EOF error from merger.
// But in this case no error has happened and the EOF is expected.
err = nil
+ } else if config.IgnoreError != nil && config.IgnoreError(err) {
+ err = ErrTimeout
}
signal(err)
return