Skip to content

Commit 3174b19

Browse files
[connector/elasticapm] Add ability configure metric aggregation limits (#626)
1 parent 2081dab commit 3174b19

File tree

12 files changed

+764
-16
lines changed

12 files changed

+764
-16
lines changed

connector/elasticapmconnector/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,27 @@ By default, aggregated metrics will be exported without any client metadata. It
3939
propagate client metadata from input to exported metrics by specifying a list of metadata keys
4040
in `elasticapm::aggregation::metadata_keys`.
4141

42+
By default, cardinality for aggregated metrics will be limited.
43+
Each limit defines a `max_cardinality`. There are four limits that can be configured:
44+
- `elasticapm::aggregation::limit::resource`: configures the max cardinality of resources
45+
- `elasticapm::aggregation::limit::scope`: configures the max cardinality of scopes within a resource
46+
- `elasticapm::aggregation::limit::metric`: configures the max cardinality of metrics within a scope
47+
- `elasticapm::aggregation::limit::datapoint`: configures the max cardinality of datapoints within a metric
48+
4249
```yaml
4350
elasticapm:
4451
aggregation:
4552
directory: /path/to/aggregation/directory
4653
metadata_keys: [list, of, metadata, keys]
54+
limit:
55+
resource:
56+
max_cardinality: 8000
57+
scope:
58+
max_cardinality: 4000
59+
metric:
60+
max_cardinality: 4000
61+
datapoint:
62+
max_cardinality: 4000
4763
```
4864
4965
### Metrics produced by the connector

connector/elasticapmconnector/config.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ import (
2121
"fmt"
2222
"time"
2323

24-
lsmconfig "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
2524
signaltometricsconfig "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/config"
2625
"go.opentelemetry.io/collector/component"
26+
27+
lsmconfig "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
2728
)
2829

2930
var _ component.Config = (*Config)(nil)
@@ -64,6 +65,27 @@ type AggregationConfig struct {
6465
// in all other cases -- using this configuration may lead to invalid behavior,
6566
// and will not be supported.
6667
Intervals []time.Duration `mapstructure:"intervals"`
68+
69+
// Limit holds optional cardinality limits for aggregated metrics
70+
Limit AggregationLimitConfig `mapstructure:"limit"`
71+
}
72+
73+
type AggregationLimitConfig struct {
74+
// ResourceLimit defines the max cardinality of resources
75+
ResourceLimit LimitConfig `mapstructure:"resource"`
76+
77+
// ScopeLimit defines the max cardinality of scopes within a resource
78+
ScopeLimit LimitConfig `mapstructure:"scope"`
79+
80+
// MetricLimit defines the max cardinality of metrics within a scope
81+
MetricLimit LimitConfig `mapstructure:"metric"`
82+
83+
// DatapointLimit defines the max cardinality of datapoints within a metric
84+
DatapointLimit LimitConfig `mapstructure:"datapoint"`
85+
}
86+
87+
type LimitConfig struct {
88+
MaxCardinality int64 `mapstructure:"max_cardinality"`
6789
}
6890

6991
func (cfg Config) Validate() error {
@@ -87,13 +109,48 @@ func (cfg Config) lsmConfig() *lsmconfig.Config {
87109
},
88110
})
89111
}
112+
90113
lsmConfig := &lsmconfig.Config{
91114
Intervals: intervalsConfig,
92115
ExponentialHistogramMaxBuckets: 160,
93116
}
117+
94118
if cfg.Aggregation != nil {
95119
lsmConfig.Directory = cfg.Aggregation.Directory
96120
lsmConfig.MetadataKeys = cfg.Aggregation.MetadataKeys
121+
lsmConfig.ResourceLimit = lsmconfig.LimitConfig{
122+
MaxCardinality: cfg.Aggregation.Limit.ResourceLimit.MaxCardinality,
123+
Overflow: lsmconfig.OverflowConfig{
124+
Attributes: []lsmconfig.Attribute{
125+
{Key: "service.name", Value: "_other"}, // Specific attribute required for APU UI compatibility
126+
{Key: "overflow", Value: "resource"},
127+
},
128+
},
129+
}
130+
lsmConfig.ScopeLimit = lsmconfig.LimitConfig{
131+
MaxCardinality: cfg.Aggregation.Limit.ScopeLimit.MaxCardinality,
132+
Overflow: lsmconfig.OverflowConfig{
133+
Attributes: []lsmconfig.Attribute{
134+
{Key: "overflow", Value: "scope"},
135+
},
136+
},
137+
}
138+
lsmConfig.MetricLimit = lsmconfig.LimitConfig{
139+
MaxCardinality: cfg.Aggregation.Limit.MetricLimit.MaxCardinality,
140+
Overflow: lsmconfig.OverflowConfig{
141+
Attributes: []lsmconfig.Attribute{
142+
{Key: "overflow", Value: "metric"},
143+
},
144+
},
145+
}
146+
lsmConfig.DatapointLimit = lsmconfig.LimitConfig{
147+
MaxCardinality: cfg.Aggregation.Limit.DatapointLimit.MaxCardinality,
148+
Overflow: lsmconfig.OverflowConfig{
149+
Attributes: []lsmconfig.Attribute{
150+
{Key: "overflow", Value: "datapoint"},
151+
},
152+
},
153+
}
97154
}
98155
return lsmConfig
99156
}

connector/elasticapmconnector/config_test.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ import (
2222
"testing"
2323
"time"
2424

25-
"github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector/internal/metadata"
2625
"github.com/stretchr/testify/assert"
2726
"github.com/stretchr/testify/require"
2827
"go.opentelemetry.io/collector/component"
2928
"go.opentelemetry.io/collector/confmap/confmaptest"
3029
"go.opentelemetry.io/collector/confmap/xconfmap"
30+
31+
"github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector/internal/metadata"
3132
)
3233

3334
func TestConfig(t *testing.T) {
@@ -47,6 +48,20 @@ func TestConfig(t *testing.T) {
4748
Directory: "/path/to/aggregation/state",
4849
MetadataKeys: []string{"a", "B", "c"},
4950
Intervals: []time.Duration{time.Second, time.Minute},
51+
Limit: AggregationLimitConfig{
52+
ResourceLimit: LimitConfig{
53+
MaxCardinality: 1,
54+
},
55+
ScopeLimit: LimitConfig{
56+
MaxCardinality: 1,
57+
},
58+
MetricLimit: LimitConfig{
59+
MaxCardinality: 1,
60+
},
61+
DatapointLimit: LimitConfig{
62+
MaxCardinality: 1,
63+
},
64+
},
5065
},
5166
},
5267
},

connector/elasticapmconnector/connector_test.go

Lines changed: 86 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ import (
2424
"path/filepath"
2525
"testing"
2626

27-
"github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector/internal/metadata"
27+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
28+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
2829
"github.com/stretchr/testify/assert"
2930
"github.com/stretchr/testify/require"
3031
"go.opentelemetry.io/collector/client"
@@ -36,25 +37,43 @@ import (
3637
"go.opentelemetry.io/collector/pdata/plog"
3738
"go.opentelemetry.io/collector/pdata/pmetric"
3839

39-
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
40-
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
40+
"github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector/internal/metadata"
4141
)
4242

4343
var update = flag.Bool("update", false, "Update golden files")
4444

4545
func TestConnector_LogsToMetrics(t *testing.T) {
46+
oneCardinalityLimitConfig := LimitConfig{
47+
MaxCardinality: 1,
48+
}
49+
oneCardinalityAggregationConfig := Config{
50+
Aggregation: &AggregationConfig{
51+
Limit: AggregationLimitConfig{
52+
ResourceLimit: oneCardinalityLimitConfig,
53+
ScopeLimit: oneCardinalityLimitConfig,
54+
MetricLimit: oneCardinalityLimitConfig,
55+
DatapointLimit: oneCardinalityLimitConfig,
56+
},
57+
},
58+
}
59+
4660
testCases := []struct {
4761
name string
62+
cfg *Config
4863
}{
49-
{name: "logs/service_summary"},
64+
// output should remain the same for all provided configs
65+
{name: "logs/service_summary", cfg: &Config{}},
66+
{name: "logs/service_summary", cfg: &oneCardinalityAggregationConfig},
67+
68+
// output should show overflow behavior
69+
{name: "logs/service_summary_overflow", cfg: &oneCardinalityAggregationConfig},
5070
}
5171

5272
for _, tc := range testCases {
5373
t.Run(tc.name, func(t *testing.T) {
5474
nextMetrics := &consumertest.MetricsSink{}
5575

56-
cfg := &Config{}
57-
l2m := newLogsToMetrics(t, connectortest.NewNopSettings(metadata.Type), cfg, nextMetrics)
76+
l2m := newLogsToMetrics(t, connectortest.NewNopSettings(metadata.Type), tc.cfg, nextMetrics)
5877

5978
dir := filepath.Join("testdata", tc.name)
6079
input, err := golden.ReadLogs(filepath.Join(dir, "input.yaml"))
@@ -76,18 +95,37 @@ func TestConnector_LogsToMetrics(t *testing.T) {
7695
}
7796

7897
func TestConnector_MetricsToMetrics(t *testing.T) {
98+
oneCardinalityLimitConfig := LimitConfig{
99+
MaxCardinality: 1,
100+
}
101+
oneCardinalityAggregationConfig := Config{
102+
Aggregation: &AggregationConfig{
103+
Limit: AggregationLimitConfig{
104+
ResourceLimit: oneCardinalityLimitConfig,
105+
ScopeLimit: oneCardinalityLimitConfig,
106+
MetricLimit: oneCardinalityLimitConfig,
107+
DatapointLimit: oneCardinalityLimitConfig,
108+
},
109+
},
110+
}
111+
79112
testCases := []struct {
80113
name string
114+
cfg *Config
81115
}{
82-
{name: "metrics/service_summary"},
116+
// output should remain the same for all provided configs
117+
{name: "metrics/service_summary", cfg: &Config{}},
118+
{name: "metrics/service_summary", cfg: &oneCardinalityAggregationConfig},
119+
120+
// output should show overflow
121+
{name: "metrics/service_summary_overflow", cfg: &oneCardinalityAggregationConfig},
83122
}
84123

85124
for _, tc := range testCases {
86125
t.Run(tc.name, func(t *testing.T) {
87126
nextMetrics := &consumertest.MetricsSink{}
88127

89-
cfg := &Config{}
90-
m2m := newMetricsConnector(t, connectortest.NewNopSettings(metadata.Type), cfg, nextMetrics)
128+
m2m := newMetricsConnector(t, connectortest.NewNopSettings(metadata.Type), tc.cfg, nextMetrics)
91129

92130
dir := filepath.Join("testdata", tc.name)
93131
input, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml"))
@@ -109,19 +147,53 @@ func TestConnector_MetricsToMetrics(t *testing.T) {
109147
}
110148

111149
func TestConnector_TracesToMetrics(t *testing.T) {
150+
fourCardinalityLimitConfig := LimitConfig{
151+
MaxCardinality: 4, // min limit to prevent overflow behavior
152+
}
153+
fourCardinalityAggregationConfig := Config{
154+
Aggregation: &AggregationConfig{
155+
Limit: AggregationLimitConfig{
156+
ResourceLimit: fourCardinalityLimitConfig,
157+
ScopeLimit: fourCardinalityLimitConfig,
158+
MetricLimit: fourCardinalityLimitConfig,
159+
DatapointLimit: fourCardinalityLimitConfig,
160+
},
161+
},
162+
}
163+
164+
oneCardinalityLimitConfig := LimitConfig{
165+
MaxCardinality: 1,
166+
}
167+
oneCardinalityAggregationConfig := Config{
168+
Aggregation: &AggregationConfig{
169+
Limit: AggregationLimitConfig{
170+
ResourceLimit: oneCardinalityLimitConfig,
171+
ScopeLimit: oneCardinalityLimitConfig,
172+
MetricLimit: oneCardinalityLimitConfig,
173+
DatapointLimit: oneCardinalityLimitConfig,
174+
},
175+
},
176+
}
177+
112178
testCases := []struct {
113179
name string
180+
cfg *Config
114181
}{
115-
{name: "traces/transaction_metrics"},
116-
{name: "traces/span_metrics"},
182+
// output should remain the same for all provided configs
183+
{name: "traces/transaction_metrics", cfg: &Config{}},
184+
{name: "traces/transaction_metrics", cfg: &fourCardinalityAggregationConfig},
185+
{name: "traces/span_metrics", cfg: &Config{}},
186+
{name: "traces/span_metrics", cfg: &fourCardinalityAggregationConfig},
187+
188+
// output should show overflow
189+
{name: "traces/span_metrics_overflow", cfg: &oneCardinalityAggregationConfig},
117190
}
118191

119192
for _, tc := range testCases {
120193
t.Run(tc.name, func(t *testing.T) {
121194
nextMetrics := &consumertest.MetricsSink{}
122195

123-
cfg := &Config{}
124-
t2m := newTracesConnector(t, connectortest.NewNopSettings(metadata.Type), cfg, nextMetrics)
196+
t2m := newTracesConnector(t, connectortest.NewNopSettings(metadata.Type), tc.cfg, nextMetrics)
125197

126198
dir := filepath.Join("testdata", tc.name)
127199
input, err := golden.ReadTraces(filepath.Join(dir, "input.yaml"))
@@ -159,6 +231,7 @@ func TestConnector_AggregationDirectory(t *testing.T) {
159231
require.NoError(t, err)
160232
require.NotEmpty(t, entries)
161233
}
234+
162235
func TestConnector_AggregationMetadataKeys(t *testing.T) {
163236
cfg := &Config{Aggregation: &AggregationConfig{MetadataKeys: []string{"k"}}}
164237

connector/elasticapmconnector/factory.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,24 @@ func NewFactory() connector.Factory {
4848

4949
// createDefaultConfig creates the default configuration.
5050
func createDefaultConfig() component.Config {
51-
return &Config{}
51+
return &Config{
52+
Aggregation: &AggregationConfig{
53+
Limit: AggregationLimitConfig{
54+
ResourceLimit: LimitConfig{
55+
MaxCardinality: 8000,
56+
},
57+
ScopeLimit: LimitConfig{
58+
MaxCardinality: 4000,
59+
},
60+
MetricLimit: LimitConfig{
61+
MaxCardinality: 4000,
62+
},
63+
DatapointLimit: LimitConfig{
64+
MaxCardinality: 4000,
65+
},
66+
},
67+
},
68+
}
5269
}
5370

5471
func createLogsToMetrics(

connector/elasticapmconnector/testdata/config/full.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,12 @@ elasticapm:
33
directory: /path/to/aggregation/state
44
metadata_keys: [a, B, c]
55
intervals: [1s, 1m]
6+
limit:
7+
resource:
8+
max_cardinality: 1
9+
scope:
10+
max_cardinality: 1
11+
metric:
12+
max_cardinality: 1
13+
datapoint:
14+
max_cardinality: 1

0 commit comments

Comments
 (0)