Commit e114ea5

fix: ensure EDOT subprocess shuts down gracefully on agent termination
1 parent: 69618d6

File tree

6 files changed: +212, -66 lines changed (4 of the 6 files appear below)

internal/pkg/otel/manager/execution.go

Lines changed: 2 additions & 1 deletion
@@ -6,6 +6,7 @@ package manager
 
 import (
     "context"
+    "time"
 
     "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
     "go.opentelemetry.io/collector/confmap"
@@ -18,5 +19,5 @@ type collectorExecution interface {
 }
 
 type collectorHandle interface {
-    Stop(ctx context.Context)
+    Stop(waitTime time.Duration)
 }
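
The signature change above is the heart of the commit: a handle's Stop now receives a fixed grace period instead of the caller's context. The sketch below is not part of the commit and uses hypothetical names (waitWithCtx, waitWithGracePeriod); it only illustrates why the distinction matters. As the commit message and the Run loop change suggest, during agent termination the manager's own context is already cancelled, so a context-based wait collapses to zero, while a duration-based wait still grants a bounded shutdown window.

// Illustrative sketch, not part of the commit: context-based vs. duration-based stop waits.
package main

import (
    "context"
    "fmt"
    "time"
)

// waitWithCtx returns immediately when the caller's ctx is already cancelled.
func waitWithCtx(ctx context.Context, done <-chan struct{}) bool {
    select {
    case <-ctx.Done():
        return false
    case <-done:
        return true
    }
}

// waitWithGracePeriod always grants up to waitTime, independent of any caller context.
func waitWithGracePeriod(waitTime time.Duration, done <-chan struct{}) bool {
    select {
    case <-time.After(waitTime):
        return false
    case <-done:
        return true
    }
}

func main() {
    done := make(chan struct{})
    go func() {
        time.Sleep(200 * time.Millisecond) // simulated collector shutdown work
        close(done)
    }()

    ctx, cancel := context.WithCancel(context.Background())
    cancel() // the situation during agent termination

    fmt.Println("ctx-based wait saw a clean stop:", waitWithCtx(ctx, done))                    // false
    fmt.Println("grace-period wait saw a clean stop:", waitWithGracePeriod(time.Second, done)) // true
}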

internal/pkg/otel/manager/execution_embedded.go

Lines changed: 4 additions & 2 deletions
@@ -6,6 +6,7 @@ package manager
 
 import (
     "context"
+    "time"
 
     "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
     "go.opentelemetry.io/collector/confmap"
@@ -66,15 +67,16 @@ type ctxHandle struct {
 }
 
 // Stop stops the collector
-func (s *ctxHandle) Stop(ctx context.Context) {
+func (s *ctxHandle) Stop(waitTime time.Duration) {
     if s.cancel == nil {
         return
     }
 
     s.cancel()
 
     select {
-    case <-ctx.Done():
+    case <-time.After(waitTime):
+        return
     case <-s.collectorDoneCh:
     }
 }
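
For the embedded collector the change keeps the same shape: cancel the collector's context, then wait a bounded time for its goroutine to confirm it has exited, returning early if the grace period elapses. Below is a minimal sketch of that cancel-then-bounded-wait pattern; embeddedHandle and doneCh are hypothetical names, not taken from the agent's code.

// Illustrative sketch only: cancel, then wait a bounded time for the worker to drain.
package main

import (
    "context"
    "fmt"
    "time"
)

type embeddedHandle struct {
    cancel context.CancelFunc
    doneCh chan struct{}
}

func (h *embeddedHandle) Stop(waitTime time.Duration) {
    if h.cancel == nil {
        return
    }
    h.cancel() // ask the embedded worker to shut down
    select {
    case <-time.After(waitTime):
        // give up waiting; the worker keeps winding down in the background
    case <-h.doneCh:
        // worker confirmed it has exited
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    h := &embeddedHandle{cancel: cancel, doneCh: make(chan struct{})}
    go func() {
        <-ctx.Done()                      // simulated collector loop
        time.Sleep(50 * time.Millisecond) // simulated cleanup
        close(h.doneCh)
    }()
    h.Stop(time.Second)
    fmt.Println("embedded collector stopped")
}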

internal/pkg/otel/manager/execution_subprocess.go

Lines changed: 11 additions & 11 deletions
@@ -30,8 +30,6 @@ import (
 )
 
 const (
-    processKillAfter = 5 * time.Second
-
     OtelSetSupervisedFlagName          = "supervised"
     OtelSupervisedLoggingLevelFlagName = "supervised.logging.level"
 )
@@ -56,6 +54,7 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string) (*subproc
         },
         logLevel:               logLevel,
         healthCheckExtensionID: healthCheckExtensionID,
+        reportErrFn:            reportErr,
     }, nil
 }
 
@@ -64,6 +63,7 @@ type subprocessExecution struct {
     collectorArgs          []string
     logLevel               logp.Level
     healthCheckExtensionID string
+    reportErrFn            func(ctx context.Context, errCh chan error, err error) // required for testing
 }
 
 // startCollector starts a supervised collector and monitors its health. Process exit errors are sent to the
@@ -106,7 +106,6 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
     procCtx, procCtxCancel := context.WithCancel(ctx)
     processInfo, err := process.Start(r.collectorPath,
         process.WithArgs(r.collectorArgs),
-        process.WithContext(procCtx),
         process.WithEnv(os.Environ()),
         process.WithCmdOptions(func(c *exec.Cmd) error {
             c.Stdin = bytes.NewReader(confBytes)
@@ -130,6 +129,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
     ctl := &procHandle{
         processDoneCh: make(chan struct{}),
         processInfo:   processInfo,
+        log:           logger,
     }
 
     healthCheckDone := make(chan struct{})
@@ -196,14 +196,14 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
         if procErr == nil {
             if procState.Success() {
                 // report nil error so that the caller can be notified that the process has exited without error
-                reportErr(ctx, processErrCh, nil)
+                r.reportErrFn(ctx, processErrCh, nil)
             } else {
-                reportErr(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String()))
+                r.reportErrFn(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String()))
             }
             return
         }
 
-        reportErr(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr))
+        r.reportErrFn(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr))
     }()
 
     return ctl, nil
@@ -212,11 +212,12 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
 type procHandle struct {
     processDoneCh chan struct{}
     processInfo   *process.Info
+    log           *logger.Logger
 }
 
 // Stop stops the process. If the process is already stopped, it does nothing. If the process does not stop within
 // processKillAfter or due to an error, it will be killed.
-func (s *procHandle) Stop(ctx context.Context) {
+func (s *procHandle) Stop(waitTime time.Duration) {
     select {
     case <-s.processDoneCh:
         // process has already exited
@@ -225,19 +226,18 @@ func (s *procHandle) Stop(ctx context.Context) {
     }
 
     if err := s.processInfo.Stop(); err != nil {
+        s.log.Warnf("failed to send stop signal to the supervised collector: %v", err)
         // we failed to stop the process just kill it and return
         _ = s.processInfo.Kill()
         return
     }
 
     select {
-    case <-ctx.Done():
+    case <-time.After(waitTime):
+        s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
         // our caller ctx is Done; kill the process just in case
         _ = s.processInfo.Kill()
     case <-s.processDoneCh:
         // process has already exited
-    case <-time.After(processKillAfter):
-        // process is still running kill it
-        _ = s.processInfo.Kill()
     }
 }
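
For the subprocess case the escalation is now: ask the child to stop, wait up to the caller-supplied grace period, and only then kill it; the fixed processKillAfter constant and the context-based branch are gone, and failures are logged through the handle's new logger. The commit also injects reportErrFn so tests can observe exit reporting. The sketch below is not the agent's process package; stopProcess is a hypothetical helper built on os/exec, and it assumes a Unix-like system with a sleep binary on PATH.

// Illustrative sketch only: stop signal, bounded wait, then kill on timeout.
package main

import (
    "fmt"
    "os/exec"
    "syscall"
    "time"
)

// stopProcess mirrors the stop-then-kill escalation: send SIGTERM, wait up to
// waitTime for the child to exit, and kill it if the grace period elapses.
func stopProcess(cmd *exec.Cmd, waitTime time.Duration) {
    done := make(chan error, 1)
    go func() { done <- cmd.Wait() }() // reap the child and capture its exit status

    if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
        fmt.Printf("failed to send stop signal: %v; killing\n", err)
        _ = cmd.Process.Kill()
        <-done
        return
    }

    select {
    case <-time.After(waitTime):
        fmt.Printf("timeout waiting (%s) for the child to stop, killing it\n", waitTime)
        _ = cmd.Process.Kill()
        <-done
    case err := <-done:
        fmt.Printf("child exited within the grace period: %v\n", err)
    }
}

func main() {
    cmd := exec.Command("sleep", "60")
    if err := cmd.Start(); err != nil {
        panic(err)
    }
    // sleep exits promptly on SIGTERM, so this returns well before the 2s grace period.
    stopProcess(cmd, 2*time.Second)
}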

internal/pkg/otel/manager/manager.go

Lines changed: 28 additions & 16 deletions
@@ -34,6 +34,8 @@ type ExecutionMode string
 const (
     SubprocessExecutionMode ExecutionMode = "subprocess"
     EmbeddedExecutionMode   ExecutionMode = "embedded"
+    // waitTimeForStop is the time to wait for the collector to stop before killing it.
+    waitTimeForStop = 30 * time.Second
 )
 
 type collectorRecoveryTimer interface {
@@ -101,6 +103,9 @@ type OTelManager struct {
     execution collectorExecution
 
     proc collectorHandle
+
+    // collectorRunErr is used to signal that the collector has exited.
+    collectorRunErr chan error
 }
 
 // NewOTelManager returns a OTelManager.
@@ -131,7 +136,7 @@ func NewOTelManager(
         recoveryTimer = newRestarterNoop()
         exec = newExecutionEmbedded()
     default:
-        return nil, errors.New("unknown otel collector exec")
+        return nil, errors.New("unknown otel collector execModeFn")
     }
 
     logger.Debugf("Using collector execution mode: %s", mode)
@@ -144,10 +149,11 @@ func NewOTelManager(
         errCh:             make(chan error, 1), // holds at most one error
         collectorStatusCh: make(chan *status.AggregateStatus, 1),
         componentStateCh:  make(chan []runtime.ComponentComponentState, 1),
-        updateCh:          make(chan configUpdate),
+        updateCh:          make(chan configUpdate, 1),
         doneChan:          make(chan struct{}),
         execution:         exec,
         recoveryTimer:     recoveryTimer,
+        collectorRunErr:   make(chan error),
     }, nil
 }
 
@@ -156,24 +162,21 @@ func (m *OTelManager) Run(ctx context.Context) error {
     var err error
     m.proc = nil
 
-    // signal that the run loop is ended to unblock any incoming update calls
-    defer close(m.doneChan)
-
-    // collectorRunErr is used to signal that the collector has exited.
-    collectorRunErr := make(chan error)
-
     // collectorStatusCh is used internally by the otel collector to send status updates to the manager
     // this channel is buffered because it's possible for the collector to send a status update while the manager is
     // waiting for the collector to exit
     collectorStatusCh := make(chan *status.AggregateStatus, 1)
     for {
         select {
         case <-ctx.Done():
+            // signal that the run loop is ended to unblock any incoming update calls
+            close(m.doneChan)
+
             m.recoveryTimer.Stop()
             // our caller context is cancelled so stop the collector and return
             // has exited.
             if m.proc != nil {
-                m.proc.Stop(ctx)
+                m.proc.Stop(waitTimeForStop)
             }
             return ctx.Err()
         case <-m.recoveryTimer.C():
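
Two details in the hunk above are worth noting. First, the stop wait is now the fixed waitTimeForStop (30s) rather than the already-cancelled run context. Second, close(m.doneChan) moved from a defer into the ctx.Done() branch, ahead of the collector shutdown; per the comment in the diff, this unblocks pending Update calls right away instead of making them wait out the shutdown. Below is a small, self-contained sketch of that ordering effect, using hypothetical channels rather than the manager's actual fields.

// Illustrative sketch only: closing a done channel before a slow shutdown unblocks waiting callers.
package main

import (
    "fmt"
    "time"
)

func main() {
    doneCh := make(chan struct{})
    updateCh := make(chan string) // unbuffered: a sender needs a receiver or a closed doneCh

    go func() { // a caller stuck in an Update-style send while the manager shuts down
        start := time.Now()
        select {
        case updateCh <- "new config":
        case <-doneCh:
            fmt.Println("update unblocked after", time.Since(start).Round(time.Millisecond))
        }
    }()

    time.Sleep(10 * time.Millisecond)
    close(doneCh)               // signal shutdown first ...
    time.Sleep(2 * time.Second) // ... then do the slow collector stop
    fmt.Println("manager finished shutting down")
}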
@@ -187,7 +190,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
 
             newRetries := m.recoveryRetries.Add(1)
             m.logger.Infof("collector recovery restarting, total retries: %d", newRetries)
-            m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh)
+            m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh)
             if err != nil {
                 reportErr(ctx, m.errCh, err)
                 // reset the restart timer to the next backoff
@@ -197,12 +200,12 @@ func (m *OTelManager) Run(ctx context.Context) error {
                 reportErr(ctx, m.errCh, nil)
             }
 
-        case err = <-collectorRunErr:
+        case err = <-m.collectorRunErr:
             m.recoveryTimer.Stop()
             if err == nil {
                 // err is nil means that the collector has exited cleanly without an error
                 if m.proc != nil {
-                    m.proc.Stop(ctx)
+                    m.proc.Stop(waitTimeForStop)
                     m.proc = nil
                     updateErr := m.reportOtelStatusUpdate(ctx, nil)
                     if updateErr != nil {
@@ -223,7 +226,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
 
                 // in this rare case the collector stopped running but a configuration was
                 // provided and the collector stopped with a clean exit
-                m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh)
+                m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh)
                 if err != nil {
                     // failed to create the collector (this is different then
                     // it's failing to run). we do not retry creation on failure
@@ -245,7 +248,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
             // in the case that the configuration is invalid there is no reason to
             // try again as it will keep failing so we do not trigger a restart
             if m.proc != nil {
-                m.proc.Stop(ctx)
+                m.proc.Stop(waitTimeForStop)
                 m.proc = nil
                 // don't wait here for <-collectorRunErr, already occurred
                 // clear status, no longer running
@@ -281,7 +284,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
             m.components = cfgUpdate.components
             m.mx.Unlock()
 
-            err = m.applyMergedConfig(ctx, collectorStatusCh, collectorRunErr)
+            err = m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr)
             // report the error unconditionally to indicate that the config was applied
             reportErr(ctx, m.errCh, err)
 
@@ -340,7 +343,7 @@ func buildMergedConfig(cfgUpdate configUpdate, agentInfo info.Agent, monitoringC
 
 func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error) error {
     if m.proc != nil {
-        m.proc.Stop(ctx)
+        m.proc.Stop(waitTimeForStop)
         m.proc = nil
         select {
         case <-collectorRunErr:
@@ -402,6 +405,15 @@ func (m *OTelManager) Update(cfg *confmap.Conf, components []component.Component
         collectorCfg: cfg,
         components:   components,
     }
+
+    // we care only about the latest config update
+    select {
+    case <-m.updateCh:
+    case <-m.doneChan:
+        return
+    default:
+    }
+
     select {
     case m.updateCh <- cfgUpdate:
     case <-m.doneChan:
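
The Update change pairs a now-buffered updateCh (capacity 1) with a non-blocking drain, so if the run loop is busy, for example waiting out a collector shutdown, a newer configuration simply replaces the queued one. Below is a minimal sketch of this latest-update-wins pattern, using hypothetical manager and configUpdate types rather than the real ones.

// Illustrative sketch only: a buffered channel of size 1 plus a drain, so the
// consumer only ever sees the newest queued update.
package main

import "fmt"

type configUpdate struct{ revision int }

type manager struct {
    updateCh chan configUpdate
    doneCh   chan struct{}
}

func (m *manager) Update(u configUpdate) {
    // we care only about the latest config update: drop any queued one first
    select {
    case <-m.updateCh:
    case <-m.doneCh:
        return
    default:
    }
    select {
    case m.updateCh <- u:
    case <-m.doneCh:
    }
}

func main() {
    m := &manager{updateCh: make(chan configUpdate, 1), doneCh: make(chan struct{})}
    for i := 1; i <= 3; i++ {
        m.Update(configUpdate{revision: i}) // no consumer running yet
    }
    fmt.Println("consumer sees revision:", (<-m.updateCh).revision) // 3, the latest
}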
