diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2024-07-01 18:54:32 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2024-07-02 16:22:10 +0000 |
| commit | 1ecfa2d8506efdae0483eedc0b425db8537b6e80 (patch) | |
| tree | 27b05fdd728cacbd56eab0eea5c5eca4fa65fda5 /vm | |
| parent | 07dedd50ee8834dbca4da7667e69e72b7d0565b9 (diff) | |
vm/gce: use vmimpl.Multiplex()
It will let us reduce code duplication and use the more appropriate
approach to the graceful Run() shutdown - by enforcing a delay between
stopping the command and stopping the collection of the console output.
Diffstat (limited to 'vm')
| -rw-r--r-- | vm/gce/gce.go | 56 | ||||
| -rw-r--r-- | vm/vmimpl/vmimpl.go | 22 |
2 files changed, 34 insertions, 44 deletions
diff --git a/vm/gce/gce.go b/vm/gce/gce.go index 16f3f996f..12f49e3d7 100644 --- a/vm/gce/gce.go +++ b/vm/gce/gce.go @@ -77,6 +77,7 @@ type instance struct { closed chan bool consolew io.WriteCloser consoleReadCmd string // optional: command to read non-standard kernel console + timeouts targets.Timeouts } func ctor(env *vmimpl.Env) (vmimpl.Pool, error) { @@ -226,6 +227,7 @@ func (pool *Pool) Create(workdir string, index int) (vmimpl.Instance, error) { sshUser: sshUser, closed: make(chan bool), consoleReadCmd: pool.consoleReadCmd, + timeouts: pool.env.Timeouts, } if err := vmimpl.WaitForSSH(pool.env.Debug, 5*time.Minute, ip, sshKey, sshUser, pool.env.OS, 22, nil, false); err != nil { @@ -329,55 +331,31 @@ func (inst *instance) Run(timeout time.Duration, stop <-chan bool, command strin sshWpipe.Close() merger.Add("ssh", sshRpipe) - errc := make(chan error, 1) - signal := func(err error) { - select { - case errc <- err: - default: - } - } - - go func() { - select { - case <-time.After(timeout): - signal(vmimpl.ErrTimeout) - case <-stop: - signal(vmimpl.ErrTimeout) - case <-inst.closed: - signal(fmt.Errorf("instance closed")) - case err := <-merger.Err: - con.Process.Kill() - ssh.Process.Kill() - merger.Wait() - con.Wait() + return vmimpl.Multiplex(ssh, merger, timeout, vmimpl.MultiplexConfig{ + Console: vmimpl.CmdCloser{Cmd: con}, + Stop: stop, + Close: inst.closed, + Debug: inst.debug, + Scale: inst.timeouts.Scale, + IgnoreError: func(err error) bool { var mergeError *vmimpl.MergerError - if cmdErr := ssh.Wait(); cmdErr == nil { - // If the command exited successfully, we got EOF error from merger. - // But in this case no error has happened and the EOF is expected. - err = nil - } else if errors.As(err, &mergeError) && mergeError.R == conRpipe { + if errors.As(err, &mergeError) && mergeError.R == conRpipe { // Console connection must never fail. If it does, it's either // instance preemption or a GCE bug. In either case, not a kernel bug. log.Logf(0, "%v: gce console connection failed with %v", inst.name, mergeError.Err) - err = vmimpl.ErrTimeout + return true } else { // Check if the instance was terminated due to preemption or host maintenance. - time.Sleep(5 * time.Second) // just to avoid any GCE races + // vmimpl.Multiplex() already adds a delay, so we've already waited enough + // to let GCE VM status updates propagate. if !inst.GCE.IsInstanceRunning(inst.name) { log.Logf(0, "%v: ssh exited but instance is not running", inst.name) - err = vmimpl.ErrTimeout + return true } } - signal(err) - return - } - con.Process.Kill() - ssh.Process.Kill() - merger.Wait() - con.Wait() - ssh.Wait() - }() - return merger.Output, errc, nil + return false + }, + }) } func waitForConsoleConnect(merger *vmimpl.OutputMerger) error { diff --git a/vm/vmimpl/vmimpl.go b/vm/vmimpl/vmimpl.go index a38ca0d8b..e29412d75 100644 --- a/vm/vmimpl/vmimpl.go +++ b/vm/vmimpl/vmimpl.go @@ -146,14 +146,24 @@ var ( Types = make(map[string]Type) ) +type CmdCloser struct { + *exec.Cmd +} + +func (cc CmdCloser) Close() error { + cc.Process.Kill() + return cc.Wait() +} + var WaitForOutputTimeout = 10 * time.Second type MultiplexConfig struct { - Console io.Closer - Stop <-chan bool - Close <-chan bool - Debug bool - Scale time.Duration + Console io.Closer + Stop <-chan bool + Close <-chan bool + Debug bool + Scale time.Duration + IgnoreError func(err error) bool } func Multiplex(cmd *exec.Cmd, merger *OutputMerger, timeout time.Duration, config MultiplexConfig) ( @@ -193,6 +203,8 @@ func Multiplex(cmd *exec.Cmd, merger *OutputMerger, timeout time.Duration, confi // If the command exited successfully, we got EOF error from merger. // But in this case no error has happened and the EOF is expected. err = nil + } else if config.IgnoreError != nil && config.IgnoreError(err) { + err = ErrTimeout } signal(err) return |
