Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions connector/elasticapmconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ in `elasticapm::aggregation::metadata_keys`.

By default, cardinality for aggregated metrics will be limited.
Each limit defines a `max_cardinality`. There are four limits that can be configured:
- `elasticapm::aggregation::limit::resource`: configures the max cardinality of resources
- `elasticapm::aggregation::limit::scope`: configures the max cardinality of scopes within a resource
- `elasticapm::aggregation::limit::metric`: configures the max cardinality of metrics within a scope
- `elasticapm::aggregation::limit::datapoint`: configures the max cardinality of datapoints within a metric
- `elasticapm::aggregation::limits::resource`: configures the max cardinality of resources
- `elasticapm::aggregation::limits::scope`: configures the max cardinality of scopes within a resource
- `elasticapm::aggregation::limits::metric`: configures the max cardinality of metrics within a scope
- `elasticapm::aggregation::limits::datapoint`: configures the max cardinality of datapoints within a metric

```yaml
elasticapm:
aggregation:
directory: /path/to/aggregation/directory
metadata_keys: [list, of, metadata, keys]
limit:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@isaacaflores2 While resolving conflicts, I renamed the limit to limits.

limits:
resource:
max_cardinality: 8000
scope:
Expand Down
177 changes: 113 additions & 64 deletions connector/elasticapmconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package elasticapmconnector // import "github.com/elastic/opentelemetry-collecto

import (
"fmt"
"slices"
"time"

signaltometricsconfig "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/config"
Expand All @@ -39,6 +40,25 @@ type Config struct {
// Aggregation holds configuration related to aggregation of Elastic APM
// metrics from other signals.
Aggregation *AggregationConfig `mapstructure:"aggregation"`

// CustomResourceAttributes define a list of resource attributes that will
// be added to all the aggregated metrics as optional attributes i.e. the
// attribute will be added to the aggregated metrics if they are present in
// the incoming signal, otherwise, the attribute will be ignored.
//
// NOTE: any custom attributes should have a bounded and preferably low
// cardinality to be performant.
CustomResourceAttributes []string `mapstructure:"custom_resource_attributes"`

// CustomSpanAttributes define a list of span attributes that will be added
// to the aggregated `service_transaction`, `transaction`, and `span_destination`
// metrics as optional attributes i.e. the attribute will be added to the
// aggregated metrics if they are present in the incoming signal, otherwise,
// the attribute will be ignored.
//
// NOTE: any custom attributes should have a bounded and preferably low
// cardinality to be performant.
CustomSpanAttributes []string `mapstructure:"custom_span_attributes"`
}

type AggregationConfig struct {
Expand Down Expand Up @@ -66,8 +86,8 @@ type AggregationConfig struct {
// and will not be supported.
Intervals []time.Duration `mapstructure:"intervals"`

// Limit holds optional cardinality limits for aggregated metrics
Limit AggregationLimitConfig `mapstructure:"limit"`
// Limits holds optional cardinality limits for aggregated metrics
Limits AggregationLimitConfig `mapstructure:"limits"`
}

type AggregationLimitConfig struct {
Expand Down Expand Up @@ -119,7 +139,7 @@ func (cfg Config) lsmConfig() *lsmconfig.Config {
lsmConfig.Directory = cfg.Aggregation.Directory
lsmConfig.MetadataKeys = cfg.Aggregation.MetadataKeys
lsmConfig.ResourceLimit = lsmconfig.LimitConfig{
MaxCardinality: cfg.Aggregation.Limit.ResourceLimit.MaxCardinality,
MaxCardinality: cfg.Aggregation.Limits.ResourceLimit.MaxCardinality,
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{
{Key: "service.name", Value: "_other"}, // Specific attribute required for APU UI compatibility
Expand All @@ -128,23 +148,23 @@ func (cfg Config) lsmConfig() *lsmconfig.Config {
},
}
lsmConfig.ScopeLimit = lsmconfig.LimitConfig{
MaxCardinality: cfg.Aggregation.Limit.ScopeLimit.MaxCardinality,
MaxCardinality: cfg.Aggregation.Limits.ScopeLimit.MaxCardinality,
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{
{Key: "overflow", Value: "scope"},
},
},
}
lsmConfig.MetricLimit = lsmconfig.LimitConfig{
MaxCardinality: cfg.Aggregation.Limit.MetricLimit.MaxCardinality,
MaxCardinality: cfg.Aggregation.Limits.MetricLimit.MaxCardinality,
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{
{Key: "overflow", Value: "metric"},
},
},
}
lsmConfig.DatapointLimit = lsmconfig.LimitConfig{
MaxCardinality: cfg.Aggregation.Limit.DatapointLimit.MaxCardinality,
MaxCardinality: cfg.Aggregation.Limits.DatapointLimit.MaxCardinality,
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{
{Key: "overflow", Value: "datapoint"},
Expand All @@ -156,72 +176,88 @@ func (cfg Config) lsmConfig() *lsmconfig.Config {
}

func (cfg Config) signaltometricsConfig() *signaltometricsconfig.Config {
// serviceResourceAttributes is the resource attributes included in
// service-level aggregated metrics.
serviceResourceAttributes := []signaltometricsconfig.Attribute{
{Key: "service.name"},
{Key: "deployment.environment"}, // service.environment
{Key: "telemetry.sdk.language"}, // service.language.name

// agent.name is set via elastictraceprocessor for traces,
// but not for other signals. Default to "unknown" for the
// others.
{
Key: "agent.name",
DefaultValue: "unknown",
},
}
// commonResourceAttributes are resource attributes included in
// all aggregated metrics.
commonResourceAttributes := append(
[]signaltometricsconfig.Attribute{
{Key: "service.name"},
{Key: "deployment.environment"}, // service.environment
{Key: "telemetry.sdk.language"}, // service.language.name

// agent.name is set via elastictraceprocessor for traces,
// but not for other signals. Default to "unknown" for the
// others.
{
Key: "agent.name",
DefaultValue: "unknown",
},
}, toSignalToMetricsAttributes(cfg.CustomResourceAttributes)...,
)

// serviceSummaryResourceAttributes are resource attributes for service
// summary metrics.
serviceSummaryResourceAttributes := slices.Clone(commonResourceAttributes)

// serviceTransactionResourceAttributes are resource attributes for service
// transaction metrics
serviceTransactionResourceAttributes := slices.Clone(commonResourceAttributes)

// transactionResourceAttributes is the resource attributes included
// transactionResourceAttributes are resource attributes included
// in transaction group-level aggregated metrics.
transactionResourceAttributes := append([]signaltometricsconfig.Attribute{
{Key: "container.id"},
{Key: "k8s.pod.name"},
{Key: "service.version"},
{Key: "service.instance.id"}, // service.node.name
{Key: "process.runtime.name"}, // service.runtime.name
{Key: "process.runtime.version"}, // service.runtime.version
{Key: "telemetry.sdk.version"}, // service.language.version??
{Key: "host.name"},
{Key: "os.type"}, // host.os.platform
{Key: "faas.instance"},
{Key: "faas.name"},
{Key: "faas.version"},
{Key: "cloud.provider"},
{Key: "cloud.region"},
{Key: "cloud.availability_zone"},
{Key: "cloud.platform"}, // cloud.service.name
{Key: "cloud.account.id"},
}, serviceResourceAttributes...)
transactionResourceAttributes := append(
[]signaltometricsconfig.Attribute{
{Key: "container.id"},
{Key: "k8s.pod.name"},
{Key: "service.version"},
{Key: "service.instance.id"}, // service.node.name
{Key: "process.runtime.name"}, // service.runtime.name
{Key: "process.runtime.version"}, // service.runtime.version
{Key: "telemetry.sdk.version"}, // service.language.version??
{Key: "host.name"},
{Key: "os.type"}, // host.os.platform
{Key: "faas.instance"},
{Key: "faas.name"},
{Key: "faas.version"},
{Key: "cloud.provider"},
{Key: "cloud.region"},
{Key: "cloud.availability_zone"},
{Key: "cloud.platform"}, // cloud.service.name
{Key: "cloud.account.id"},
}, commonResourceAttributes...,
)

// spanDestinationResourceAttributes are resource attributes included
// in service destination aggregations
spanDestinationResourceAttributes := slices.Clone(commonResourceAttributes)

serviceSummaryAttributes := []signaltometricsconfig.Attribute{{
Key: "metricset.name",
DefaultValue: "service_summary",
}}

serviceTransactionAttributes := []signaltometricsconfig.Attribute{
serviceTransactionAttributes := append([]signaltometricsconfig.Attribute{
{Key: "transaction.root"},
{Key: "transaction.type"},
{Key: "metricset.name", DefaultValue: "service_transaction"},
}
}, toSignalToMetricsAttributes(cfg.CustomSpanAttributes)...)

transactionAttributes := []signaltometricsconfig.Attribute{
transactionAttributes := append([]signaltometricsconfig.Attribute{
{Key: "transaction.root"},
{Key: "transaction.name"},
{Key: "transaction.type"},
{Key: "transaction.result"},
{Key: "event.outcome"},
{Key: "metricset.name", DefaultValue: "transaction"},
}
}, toSignalToMetricsAttributes(cfg.CustomSpanAttributes)...)

serviceDestinationAttributes := []signaltometricsconfig.Attribute{
spanDestinationAttributes := append([]signaltometricsconfig.Attribute{
{Key: "span.name"},
{Key: "event.outcome"},
{Key: "service.target.type"},
{Key: "service.target.name"},
{Key: "span.destination.service.resource"},
{Key: "metricset.name", DefaultValue: "service_destination"},
}
}, toSignalToMetricsAttributes(cfg.CustomSpanAttributes)...)

transactionDurationHistogram := &signaltometricsconfig.ExponentialHistogram{
Count: "Int(AdjustedCount())",
Expand All @@ -237,30 +273,30 @@ func (cfg Config) signaltometricsConfig() *signaltometricsconfig.Config {
return &signaltometricsconfig.Config{
Logs: []signaltometricsconfig.MetricInfo{{
Name: "service_summary",
IncludeResourceAttributes: serviceResourceAttributes,
IncludeResourceAttributes: serviceSummaryResourceAttributes,
Attributes: serviceSummaryAttributes,
Sum: &signaltometricsconfig.Sum{Value: "1"},
}},

Datapoints: []signaltometricsconfig.MetricInfo{{
Name: "service_summary",
IncludeResourceAttributes: serviceResourceAttributes,
IncludeResourceAttributes: serviceSummaryResourceAttributes,
Attributes: serviceSummaryAttributes,
Sum: &signaltometricsconfig.Sum{Value: "1"},
}},

Spans: []signaltometricsconfig.MetricInfo{{
Name: "service_summary",
IncludeResourceAttributes: serviceResourceAttributes,
IncludeResourceAttributes: serviceSummaryResourceAttributes,
Attributes: serviceSummaryAttributes,
Sum: &signaltometricsconfig.Sum{
Value: "Int(AdjustedCount())",
},
}, {
Name: "transaction.duration.histogram",
Description: "APM service transaction aggregated metrics as histogram",
IncludeResourceAttributes: serviceResourceAttributes,
Attributes: append(serviceTransactionAttributes[:], signaltometricsconfig.Attribute{
IncludeResourceAttributes: serviceTransactionResourceAttributes,
Attributes: append(slices.Clone(serviceTransactionAttributes), signaltometricsconfig.Attribute{
Key: "elasticsearch.mapping.hints",
DefaultValue: []any{"_doc_count"},
}),
Expand All @@ -269,8 +305,8 @@ func (cfg Config) signaltometricsConfig() *signaltometricsconfig.Config {
}, {
Name: "transaction.duration.summary",
Description: "APM service transaction aggregated metrics as summary",
IncludeResourceAttributes: serviceResourceAttributes,
Attributes: append(serviceTransactionAttributes[:], signaltometricsconfig.Attribute{
IncludeResourceAttributes: serviceTransactionResourceAttributes,
Attributes: append(slices.Clone(serviceTransactionAttributes), signaltometricsconfig.Attribute{
Key: "elasticsearch.mapping.hints",
DefaultValue: []any{"aggregate_metric_double"},
}),
Expand All @@ -280,7 +316,7 @@ func (cfg Config) signaltometricsConfig() *signaltometricsconfig.Config {
Name: "transaction.duration.histogram",
Description: "APM transaction aggregated metrics as histogram",
IncludeResourceAttributes: transactionResourceAttributes,
Attributes: append(transactionAttributes[:], signaltometricsconfig.Attribute{
Attributes: append(slices.Clone(transactionAttributes), signaltometricsconfig.Attribute{
Key: "elasticsearch.mapping.hints",
DefaultValue: []any{"_doc_count"},
}),
Expand All @@ -290,7 +326,7 @@ func (cfg Config) signaltometricsConfig() *signaltometricsconfig.Config {
Name: "transaction.duration.summary",
Description: "APM transaction aggregated metrics as summary",
IncludeResourceAttributes: transactionResourceAttributes,
Attributes: append(transactionAttributes[:], signaltometricsconfig.Attribute{
Attributes: append(slices.Clone(transactionAttributes), signaltometricsconfig.Attribute{
Key: "elasticsearch.mapping.hints",
DefaultValue: []any{"aggregate_metric_double"},
}),
Expand All @@ -299,17 +335,17 @@ func (cfg Config) signaltometricsConfig() *signaltometricsconfig.Config {
}, {
Name: "span.destination.service.response_time.sum.us",
Description: "APM span destination metrics",
IncludeResourceAttributes: serviceResourceAttributes,
Attributes: serviceDestinationAttributes,
IncludeResourceAttributes: spanDestinationResourceAttributes,
Attributes: spanDestinationAttributes,
Unit: "us",
Sum: &signaltometricsconfig.Sum{
Value: "Double(Microseconds(end_time - start_time))",
},
}, {
Name: "span.destination.service.response_time.count",
Description: "APM span destination metrics",
IncludeResourceAttributes: serviceResourceAttributes,
Attributes: serviceDestinationAttributes,
IncludeResourceAttributes: spanDestinationResourceAttributes,
Attributes: spanDestinationAttributes,
Sum: &signaltometricsconfig.Sum{
Value: "Int(AdjustedCount())",
},
Expand All @@ -321,8 +357,8 @@ func (cfg Config) signaltometricsConfig() *signaltometricsconfig.Config {
// values are required and the actual histogram bucket is ignored.
Name: "event.success_count",
Description: "Success count as a metric for service transaction",
IncludeResourceAttributes: serviceResourceAttributes,
Attributes: append(serviceTransactionAttributes[:], signaltometricsconfig.Attribute{
IncludeResourceAttributes: serviceTransactionResourceAttributes,
Attributes: append(slices.Clone(serviceTransactionAttributes), signaltometricsconfig.Attribute{
Key: "elasticsearch.mapping.hints",
DefaultValue: []any{"aggregate_metric_double"},
}),
Expand All @@ -338,8 +374,8 @@ func (cfg Config) signaltometricsConfig() *signaltometricsconfig.Config {
}, {
Name: "event.success_count",
Description: "Success count as a metric for service transaction",
IncludeResourceAttributes: serviceResourceAttributes,
Attributes: append(serviceTransactionAttributes[:], signaltometricsconfig.Attribute{
IncludeResourceAttributes: serviceTransactionResourceAttributes,
Attributes: append(slices.Clone(serviceTransactionAttributes), signaltometricsconfig.Attribute{
Key: "elasticsearch.mapping.hints",
DefaultValue: []any{"aggregate_metric_double"},
}),
Expand All @@ -355,3 +391,16 @@ func (cfg Config) signaltometricsConfig() *signaltometricsconfig.Config {
}},
}
}

// toSignalToMetricsAttributes converts slice to string to signal to metricsa attributes
// assuming `optional: true` for each attribute.
func toSignalToMetricsAttributes(in []string) []signaltometricsconfig.Attribute {
attrs := make([]signaltometricsconfig.Attribute, 0, len(in))
for _, k := range in {
attrs = append(attrs, signaltometricsconfig.Attribute{
Key: k,
Optional: true,
})
}
return attrs
}
Loading