From cd092b66e539f0f0be5d743345cff797f5ac148b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paolo=20Chil=C3=A0?= Date: Thu, 11 Sep 2025 20:52:36 +0200 Subject: [PATCH] Disable flaky test TestBeatsReceiverLogs (#9891) (cherry picked from commit ba9c156513a432434d5153d40c61dae8da4b7f5f) # Conflicts: # testing/integration/ess/beat_receivers_test.go --- NOTE(review): the hunk below adds unresolved merge-conflict marker lines to beat_receivers_test.go as literal content (the HEAD side is empty); the patched file will not compile until the conflict is resolved for the target branch. .../integration/ess/beat_receivers_test.go | 400 ++++++++++++++++++ 1 file changed, 400 insertions(+) diff --git a/testing/integration/ess/beat_receivers_test.go b/testing/integration/ess/beat_receivers_test.go index 3e72259b0fb..6b6ba6114aa 100644 --- a/testing/integration/ess/beat_receivers_test.go +++ b/testing/integration/ess/beat_receivers_test.go @@ -376,6 +376,406 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { AssertMapsEqual(t, agent, otel, ignoredFields, fmt.Sprintf("expected document to be equal for dataset: %s", key)) } } +<<<<<<< HEAD +======= + + // 8. Compare statuses + zeroDifferingFields := func(status *atesting.AgentStatusOutput) { + status.Info.ID = "" + status.Info.PID = 0 + status.Collector = nil // we do get collector status with beats receivers, it's just empty + for i := range len(status.Components) { + status.Components[i].Message = "" + status.Components[i].VersionInfo = atesting.AgentStatusOutputVersionInfo{} + } + } + zeroDifferingFields(&agentStatus) + zeroDifferingFields(&otelStatus) + assert.Equal(t, agentStatus, otelStatus, "expected agent status to be equal to otel status") +} + +// TestAgentMetricsInput is a test that compares documents ingested by +// agent system/metrics input in process and otel modes and asserts that they are +// equivalent. 
+func TestAgentMetricsInput(t *testing.T) { + info := define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + Sudo: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + + metricsets := []string{"cpu", "memory", "network", "filesystem"} + + type configOptions struct { + HomeDir string + ESEndpoint string + BeatsESApiKey string + FBReceiverIndex string + Namespace string + RuntimeExperimental string + Metricsets []string + } + configTemplate := `agent.logging.level: info +agent.logging.to_stderr: true +inputs: + # Collecting system metrics + - type: system/metrics + id: unique-system-metrics-input + data_stream.namespace: {{.Namespace}} + use_output: default + {{if ne .RuntimeExperimental "" }} + _runtime_experimental: {{.RuntimeExperimental}} + {{end}} + streams: + {{range $mset := .Metricsets}} + - metricsets: + - {{$mset}} + data_stream.dataset: system.{{$mset}} + {{end}} +outputs: + default: + type: elasticsearch + hosts: [{{.ESEndpoint}}] + api_key: {{.BeatsESApiKey}} +` + + esEndpoint, err := integration.GetESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey, err := createESApiKey(info.ESClient) + require.NoError(t, err, "error creating API key") + require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + + beatsApiKey, err := base64.StdEncoding.DecodeString(esApiKey.Encoded) + require.NoError(t, err, "error decoding api key") + + tableTests := []struct { + name string + runtimeExperimental string + }{ + {name: "agent"}, + {name: "otel", runtimeExperimental: "otel"}, + } + + // map of testcase -> metricset -> documents + esDocs := make(map[string]map[string]estools.Documents) + + for _, tt := range tableTests { + t.Run(tt.name, func(t *testing.T) { + startedAt := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + tmpDir := t.TempDir() + + if _, ok := esDocs[tt.name]; !ok { + esDocs[tt.name] 
= make(map[string]estools.Documents) + } + + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, + configOptions{ + HomeDir: tmpDir, + ESEndpoint: esEndpoint, + BeatsESApiKey: string(beatsApiKey), + Namespace: info.Namespace, + RuntimeExperimental: tt.runtimeExperimental, + Metricsets: metricsets, + })) + configContents := configBuffer.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Log("Contents of agent config file:\n") + println(string(configContents)) + } + }) + + ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute)) + defer cancel() + + fixture, cmd, output := prepareAgentCmd(t, ctx, configContents) + + err = cmd.Start() + require.NoError(t, err) + + t.Cleanup(func() { + if t.Failed() { + t.Log("Elastic-Agent output:") + t.Log(output.String()) + } + }) + + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return true + }, 1*time.Minute, 1*time.Second) + + mustClauses := []map[string]any{ + {"range": map[string]any{ + "@timestamp": map[string]string{ + "gte": startedAt, + }, + }}, + } + + rawQuery := map[string]any{ + "query": map[string]any{ + "bool": map[string]any{ + "must": mustClauses, + }, + }, + } + + for _, mset := range metricsets { + index := fmt.Sprintf(".ds-metrics-system.%s-%s*", mset, info.Namespace) + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, index, info.ESClient) + require.NoError(ct, err) + + if docs.Hits.Total.Value != 0 { + esDocs[tt.name][mset] = docs + } + require.Greater(ct, docs.Hits.Total.Value, 0, "docs count") + }, + 30*time.Second, 1*time.Second, + "Expected to find at least one document for metricset %s in index %s and runtime %q, 
got 0", mset, index, tt.runtimeExperimental) + } + + cancel() + cmd.Wait() + }) + } + + t.Run("compare documents", func(t *testing.T) { + require.Greater(t, len(esDocs), 0, "expected to find documents ingested") + require.Greater(t, len(esDocs["agent"]), 0, "expected to find documents ingested by normal agent metrics input") + require.Greater(t, len(esDocs["otel"]), 0, "expected to find documents ingested by beat receivers") + + agentDocs := esDocs["agent"] + otelDocs := esDocs["otel"] + + // Fields that are present in both agent and otel documents, but are expected to change + ignoredFields := []string{ + "@timestamp", + "agent.id", + "agent.ephemeral_id", + "elastic_agent.id", + "data_stream.namespace", + "event.ingested", + "event.duration", + } + + stripNondeterminism := func(m mapstr.M, mset string) { + // These metrics will change from run to run + prefixes := []string{ + fmt.Sprintf("system.%s", mset), + fmt.Sprintf("host.%s", mset), + } + + for k := range m { + for _, prefix := range prefixes { + if strings.HasPrefix(k, prefix) { + m[k] = nil + } + } + } + } + + testCases := []struct { + metricset string + yieldDocsFunc func(agent []estools.ESDoc, otel []estools.ESDoc) (mapstr.M, mapstr.M) + }{ + { + metricset: "cpu", + yieldDocsFunc: func(agent []estools.ESDoc, otel []estools.ESDoc) (mapstr.M, mapstr.M) { + return agent[0].Source, otel[0].Source + }, + }, + { + metricset: "memory", + yieldDocsFunc: func(agent []estools.ESDoc, otel []estools.ESDoc) (mapstr.M, mapstr.M) { + return agent[0].Source, otel[0].Source + }, + }, + { + metricset: "network", + yieldDocsFunc: func(agent []estools.ESDoc, otel []estools.ESDoc) (mapstr.M, mapstr.M) { + // make sure we compare events from network interfaces and not host metrics + var agentDoc, otelDoc mapstr.M + for _, hit := range agent { + agentDoc = hit.Source + if ok, _ := agentDoc.Flatten().HasKey("system.network.name"); ok { + break + } + } + for _, hit := range otel { + otelDoc = hit.Source + if ok, _ := 
otelDoc.Flatten().HasKey("system.network.name"); ok { + break + } + } + return agentDoc, otelDoc + }, + }, + { + metricset: "filesystem", + yieldDocsFunc: func(agent []estools.ESDoc, otel []estools.ESDoc) (mapstr.M, mapstr.M) { + return agent[0].Source, otel[0].Source + }, + }, + } + + for _, tt := range testCases { + t.Run(tt.metricset, func(t *testing.T) { + msetAgentDocs := agentDocs[tt.metricset].Hits.Hits + msetOtelDocs := otelDocs[tt.metricset].Hits.Hits + require.Greater(t, len(msetAgentDocs), 0, "expected to find agent documents for metricset %s", tt.metricset) + require.Greater(t, len(msetOtelDocs), 0, "expected to find otel documents for metricset %s", tt.metricset) + + agentDoc, otelDoc := tt.yieldDocsFunc(msetAgentDocs, msetOtelDocs) + agentDoc = agentDoc.Flatten() + otelDoc = otelDoc.Flatten() + + t.Cleanup(func() { + if t.Failed() { + t.Logf("agent document for metricset %s:\n%s", tt.metricset, agentDoc.StringToPrint()) + t.Logf("otel document for metricset %s:\n%s", tt.metricset, otelDoc.StringToPrint()) + } + }) + + stripNondeterminism(agentDoc, tt.metricset) + stripNondeterminism(otelDoc, tt.metricset) + + AssertMapstrKeysEqual(t, agentDoc, otelDoc, nil, "expected documents keys to be equal for metricset "+tt.metricset) + AssertMapsEqual(t, agentDoc, otelDoc, ignoredFields, "expected documents to be equal for metricset "+tt.metricset) + }) + } + }) +} + +// TestBeatsReceiverLogs is a test that compares logs emitted by beats processes to those emitted by beats receivers. +func TestBeatsReceiverLogs(t *testing.T) { + _ = define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + Sudo: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: nil, + }) + + t.Skip("Skip this test as it's flaky. 
See https://github.com/elastic/elastic-agent/issues/9890") + + type configOptions struct { + RuntimeExperimental string + } + configTemplate := `agent.logging.level: info +agent.logging.to_stderr: true +agent.logging.to_files: false +inputs: + # Collecting system metrics + - type: system/metrics + id: unique-system-metrics-input + _runtime_experimental: {{.RuntimeExperimental}} + streams: + - metricsets: + - cpu +outputs: + default: + type: elasticsearch + hosts: [http://localhost:9200] + api_key: placeholder +agent.monitoring.enabled: false +` + + var configBuffer, receiverConfigBuffer bytes.Buffer /* one buffer per rendered config: Execute appends to its writer, so sharing a buffer would leave receiverConfig holding the process config followed by the otel config */ + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, + configOptions{ + RuntimeExperimental: "process", + })) + processConfig := configBuffer.Bytes() + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&receiverConfigBuffer, + configOptions{ + RuntimeExperimental: "otel", + })) + receiverConfig := receiverConfigBuffer.Bytes() + // this is the context for the whole test, with a global timeout defined + ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute)) + defer cancel() + + // use a subcontext for the agent + agentProcessCtx, agentProcessCancel := context.WithCancel(ctx) + fixture, cmd, output := prepareAgentCmd(t, agentProcessCtx, processConfig) + + require.NoError(t, cmd.Start()) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + var statusErr error + status, statusErr := fixture.ExecStatus(agentProcessCtx) + assert.NoError(collect, statusErr) + assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 1) + return + }, 1*time.Minute, 1*time.Second) + + agentProcessCancel() + require.Error(t, cmd.Wait()) + processLogsString := output.String() + 
output.Reset() + + // use a subcontext for the agent + agentReceiverCtx, agentReceiverCancel := context.WithCancel(ctx) + fixture, cmd, output = prepareAgentCmd(t, agentReceiverCtx, receiverConfig) + + 
require.NoError(t, cmd.Start()) + + t.Cleanup(func() { + if t.Failed() { + t.Log("Elastic-Agent output:") + t.Log(output.String()) + } + }) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + var statusErr error + status, statusErr := fixture.ExecStatus(agentReceiverCtx) + assert.NoError(collect, statusErr) + assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 1) + return + }, 1*time.Minute, 1*time.Second) + agentReceiverCancel() + require.Error(t, cmd.Wait()) + receiverLogsString := output.String() + + processLog := getBeatStartLogRecord(processLogsString) + assert.NotEmpty(t, processLog) + receiverLog := getBeatStartLogRecord(receiverLogsString) + assert.NotEmpty(t, receiverLog) + + // Check that the process log is a subset of the receiver log + for key, value := range processLog { + assert.Contains(t, receiverLog, key) + if key == "@timestamp" { // the timestamp value will be different + continue + } + assert.Equal(t, value, receiverLog[key]) + } +>>>>>>> ba9c15651 (Disable flaky test TestBeatsReceiverLogs (#9891)) } func assertCollectorComponentsHealthy(t *assert.CollectT, status *atesting.AgentStatusCollectorOutput) {