Skip to content

Commit 5d1c3b7

Browse files
authored
FFM-11212 Metrics enhancements (#150)
1 parent dffa98c commit 5d1c3b7

File tree

9 files changed

+544
-175
lines changed

9 files changed

+544
-175
lines changed

analyticsservice/analytics.go

Lines changed: 107 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"fmt"
77
"strconv"
8-
"sync"
98
"time"
109

1110
"github.com/harness/ff-golang-server-sdk/sdk_codes"
@@ -32,8 +31,20 @@ const (
3231
sdkLanguageAttribute string = "SDK_LANGUAGE"
3332
sdkLanguage string = "go"
3433
globalTarget string = "global"
34+
maxAnalyticsEntries int = 10000
35+
maxTargetEntries int = 100000
3536
)
3637

38+
// SafeAnalyticsCache is a type that provides thread safe access to maps used by analytics
39+
type SafeAnalyticsCache[K comparable, V any] interface {
40+
set(key K, value V)
41+
get(key K) (V, bool)
42+
delete(key K)
43+
size() int
44+
clear()
45+
iterate(func(K, V))
46+
}
47+
3748
type analyticsEvent struct {
3849
target *evaluation.Target
3950
featureConfig *rest.FeatureConfig
@@ -43,13 +54,14 @@ type analyticsEvent struct {
4354

4455
// AnalyticsService provides a way to cache and send analytics to the server
4556
type AnalyticsService struct {
46-
mx *sync.Mutex
47-
analyticsChan chan analyticsEvent
48-
analyticsData map[string]analyticsEvent
49-
timeout time.Duration
50-
logger logger.Logger
51-
metricsClient *metricsclient.ClientWithResponsesInterface
52-
environmentID string
57+
analyticsChan chan analyticsEvent
58+
evaluationAnalytics SafeAnalyticsCache[string, analyticsEvent]
59+
targetAnalytics SafeAnalyticsCache[string, evaluation.Target]
60+
seenTargets SafeAnalyticsCache[string, bool]
61+
timeout time.Duration
62+
logger logger.Logger
63+
metricsClient metricsclient.ClientWithResponsesInterface
64+
environmentID string
5365
}
5466

5567
// NewAnalyticsService creates and starts a analytics service to send data to the client
@@ -61,19 +73,20 @@ func NewAnalyticsService(timeout time.Duration, logger logger.Logger) *Analytics
6173
serviceTimeout = 1 * time.Hour
6274
}
6375
as := AnalyticsService{
64-
mx: &sync.Mutex{},
65-
analyticsChan: make(chan analyticsEvent),
66-
analyticsData: map[string]analyticsEvent{},
67-
timeout: serviceTimeout,
68-
logger: logger,
76+
analyticsChan: make(chan analyticsEvent),
77+
evaluationAnalytics: newSafeEvaluationAnalytics(),
78+
targetAnalytics: newSafeTargetAnalytics(),
79+
seenTargets: newSafeSeenTargets(),
80+
timeout: serviceTimeout,
81+
logger: logger,
6982
}
7083
go as.listener()
7184

7285
return &as
7386
}
7487

7588
// Start starts the client and timer to send analytics
76-
func (as *AnalyticsService) Start(ctx context.Context, client *metricsclient.ClientWithResponsesInterface, environmentID string) {
89+
func (as *AnalyticsService) Start(ctx context.Context, client metricsclient.ClientWithResponsesInterface, environmentID string) {
7790
as.logger.Infof("%s Metrics started", sdk_codes.MetricsStarted)
7891
as.metricsClient = client
7992
as.environmentID = environmentID
@@ -106,18 +119,44 @@ func (as *AnalyticsService) PushToQueue(featureConfig *rest.FeatureConfig, targe
106119
func (as *AnalyticsService) listener() {
107120
as.logger.Info("Analytics cache successfully initialized")
108121
for ad := range as.analyticsChan {
109-
key := getEventSummaryKey(ad)
122+
analyticsKey := getEvaluationAnalyticKey(ad)
123+
124+
// Check if we've hit capacity for evaluations
125+
if as.evaluationAnalytics.size() < maxAnalyticsEntries {
126+
// Update evaluation metrics
127+
analytic, ok := as.evaluationAnalytics.get(analyticsKey)
128+
if !ok {
129+
ad.count = 1
130+
as.evaluationAnalytics.set(analyticsKey, ad)
131+
} else {
132+
ad.count = analytic.count + 1
133+
as.evaluationAnalytics.set(analyticsKey, ad)
134+
}
135+
} else {
136+
as.logger.Warnf("%s Evaluation analytic cache reached max size, remaining evaluation metrics for this analytics interval will not be sent", sdk_codes.EvaluationMetricsMaxSizeReached)
137+
}
138+
139+
// Check if target is nil or anonymous
140+
if ad.target == nil || (ad.target.Anonymous != nil && *ad.target.Anonymous) {
141+
continue
142+
}
110143

111-
as.mx.Lock()
112-
analytic, ok := as.analyticsData[key]
113-
if !ok {
114-
ad.count = 1
115-
as.analyticsData[key] = ad
144+
// Check if target has been seen
145+
_, seen := as.seenTargets.get(ad.target.Identifier)
146+
147+
if seen {
148+
continue
149+
}
150+
151+
// Update seen targets
152+
as.seenTargets.set(ad.target.Identifier, true)
153+
154+
// Update target metrics
155+
if as.targetAnalytics.size() < maxTargetEntries {
156+
as.targetAnalytics.set(ad.target.Identifier, *ad.target)
116157
} else {
117-
ad.count = (analytic.count + 1)
118-
as.analyticsData[key] = ad
158+
as.logger.Warnf("%s Target analytics cache reached max size, remaining target metrics for this analytics interval will not be sent", sdk_codes.TargetMetricsMaxSizeReached)
119159
}
120-
as.mx.Unlock()
121160
}
122161
}
123162

@@ -150,116 +189,75 @@ func convertInterfaceToString(i interface{}) string {
150189
}
151190

152191
func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) {
153-
as.mx.Lock()
154-
// copy cache to send to server
155-
analyticsData := as.analyticsData
156-
// clear cache. As metrics is secondary to the flags, we do it this way
157-
// so it doesn't effect the performance of our users code. Even if it means
158-
// we lose metrics the odd time.
159-
as.analyticsData = map[string]analyticsEvent{}
160-
as.mx.Unlock()
161-
162-
metricData := make([]metricsclient.MetricsData, 0, len(as.analyticsData))
163-
targetData := map[string]metricsclient.TargetData{}
164-
165-
for _, analytic := range analyticsData {
166-
if analytic.target != nil {
167-
if analytic.target.Anonymous == nil || !*analytic.target.Anonymous {
168-
targetAttributes := make([]metricsclient.KeyValue, 0)
169-
if analytic.target.Attributes != nil {
170-
targetAttributes = make([]metricsclient.KeyValue, 0, len(*analytic.target.Attributes))
171-
for key, value := range *analytic.target.Attributes {
172-
v := convertInterfaceToString(value)
173-
kv := metricsclient.KeyValue{
174-
Key: key,
175-
Value: v,
176-
}
177-
targetAttributes = append(targetAttributes, kv)
178-
}
179-
180-
}
181-
182-
targetName := analytic.target.Identifier
183-
if analytic.target.Name != "" {
184-
targetName = analytic.target.Name
185-
}
186-
187-
td := metricsclient.TargetData{
188-
Name: targetName,
189-
Identifier: analytic.target.Identifier,
190-
Attributes: targetAttributes,
191-
}
192-
targetData[analytic.target.Identifier] = td
193-
}
194-
}
195192

193+
// Clone and reset the evaluation analytics cache to minimise the duration
194+
// for which locks are held, so that metrics processing does not affect flag evaluations performance.
195+
// Although this might occasionally result in the loss of some metrics during periods of high load,
196+
// it is an acceptable tradeoff to prevent extended lock periods that could degrade user code.
197+
evaluationAnalyticsClone := as.evaluationAnalytics
198+
199+
as.evaluationAnalytics = newSafeEvaluationAnalytics()
200+
201+
// Clone and reset target analytics cache for same reason.
202+
targetAnalyticsClone := as.targetAnalytics
203+
as.targetAnalytics = newSafeTargetAnalytics()
204+
205+
metricData := make([]metricsclient.MetricsData, 0, evaluationAnalyticsClone.size())
206+
targetData := make([]metricsclient.TargetData, 0, targetAnalyticsClone.size())
207+
208+
// Process evaluation metrics
209+
evaluationAnalyticsClone.iterate(func(key string, analytic analyticsEvent) {
196210
metricAttributes := []metricsclient.KeyValue{
197-
{
198-
Key: featureIdentifierAttribute,
199-
Value: analytic.featureConfig.Feature,
200-
},
201-
{
202-
Key: featureNameAttribute,
203-
Value: analytic.featureConfig.Feature,
204-
},
205-
{
206-
Key: variationIdentifierAttribute,
207-
Value: analytic.variation.Identifier,
208-
},
209-
{
210-
Key: variationValueAttribute,
211-
Value: analytic.variation.Value,
212-
},
213-
{
214-
Key: sdkTypeAttribute,
215-
Value: sdkType,
216-
},
217-
{
218-
Key: sdkLanguageAttribute,
219-
Value: sdkLanguage,
220-
},
221-
{
222-
Key: sdkVersionAttribute,
223-
Value: SdkVersion,
224-
},
211+
{Key: featureIdentifierAttribute, Value: analytic.featureConfig.Feature},
212+
{Key: featureNameAttribute, Value: analytic.featureConfig.Feature},
213+
{Key: variationIdentifierAttribute, Value: analytic.variation.Identifier},
214+
{Key: variationValueAttribute, Value: analytic.variation.Value},
215+
{Key: sdkTypeAttribute, Value: sdkType},
216+
{Key: sdkLanguageAttribute, Value: sdkLanguage},
217+
{Key: sdkVersionAttribute, Value: SdkVersion},
218+
{Key: targetAttribute, Value: globalTarget},
225219
}
226220

227-
metricAttributes = append(metricAttributes, metricsclient.KeyValue{
228-
Key: targetAttribute,
229-
Value: globalTarget,
230-
})
231-
232221
md := metricsclient.MetricsData{
233222
Timestamp: time.Now().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)),
234223
Count: analytic.count,
235224
MetricsType: metricsclient.MetricsDataMetricsType(ffMetricType),
236225
Attributes: metricAttributes,
237226
}
238227
metricData = append(metricData, md)
239-
}
228+
})
240229

241-
// if targets data is empty we just send nil
242-
var targetDataPayload *[]metricsclient.TargetData = nil
243-
if len(targetData) > 0 {
244-
targetDataPayload = targetDataMapToArray(targetData)
245-
}
230+
// Process target metrics
231+
targetAnalyticsClone.iterate(func(key string, target evaluation.Target) {
232+
targetAttributes := make([]metricsclient.KeyValue, 0)
233+
for key, value := range *target.Attributes {
234+
targetAttributes = append(targetAttributes, metricsclient.KeyValue{Key: key, Value: convertInterfaceToString(value)})
235+
}
236+
237+
td := metricsclient.TargetData{
238+
Identifier: target.Identifier,
239+
Name: target.Name,
240+
Attributes: targetAttributes,
241+
}
242+
targetData = append(targetData, td)
243+
})
246244

247245
analyticsPayload := metricsclient.PostMetricsJSONRequestBody{
248246
MetricsData: &metricData,
249-
TargetData: targetDataPayload,
247+
TargetData: &targetData,
250248
}
251249

252250
if as.metricsClient != nil {
253-
emptyMetricsData := analyticsPayload.MetricsData == nil || len(*analyticsPayload.MetricsData) == 0
254-
emptyTargetData := analyticsPayload.TargetData == nil || len(*analyticsPayload.TargetData) == 0
251+
emptyMetricsData := len(metricData) == 0
252+
emptyTargetData := len(targetData) == 0
255253

256254
// if we have no metrics to send skip the post request
257255
if emptyMetricsData && emptyTargetData {
258256
as.logger.Debug("No metrics or target data to send")
259257
return
260258
}
261259

262-
mClient := *as.metricsClient
260+
mClient := as.metricsClient
263261

264262
jsonData, err := json.Marshal(analyticsPayload)
265263
if err != nil {
@@ -287,22 +285,6 @@ func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) {
287285
}
288286
}
289287

290-
//func getEventKey(event analyticsEvent) string {
291-
// targetIdentifier := ""
292-
// if event.target != nil {
293-
// targetIdentifier = event.target.Identifier
294-
// }
295-
// return fmt.Sprintf("%s-%s-%s-%s", event.featureConfig.Feature, event.variation.Identifier, event.variation.Value, targetIdentifier)
296-
//}
297-
298-
func getEventSummaryKey(event analyticsEvent) string {
288+
func getEvaluationAnalyticKey(event analyticsEvent) string {
299289
return fmt.Sprintf("%s-%s-%s-%s", event.featureConfig.Feature, event.variation.Identifier, event.variation.Value, globalTarget)
300290
}
301-
302-
func targetDataMapToArray(targetMap map[string]metricsclient.TargetData) *[]metricsclient.TargetData {
303-
targetDataArray := make([]metricsclient.TargetData, 0, len(targetMap))
304-
for _, targetData := range targetMap {
305-
targetDataArray = append(targetDataArray, targetData)
306-
}
307-
return &targetDataArray
308-
}

0 commit comments

Comments
 (0)