diff --git a/.harness/ffgolangserversdk.yaml b/.harness/ffgolangserversdk.yaml
index 96ee333..4861a05 100644
--- a/.harness/ffgolangserversdk.yaml
+++ b/.harness/ffgolangserversdk.yaml
@@ -176,7 +176,7 @@ pipeline:
                     dockerfile: ff-sdk-testgrid/go/Dockerfile
                     context: ff-sdk-testgrid/go
                     buildArgs:
-                      SDK_VERSION: v0.1.24
+                      SDK_VERSION: v0.1.25
                       BUILD_MODE: local
                     resources:
                       limits:
diff --git a/analyticsservice/analytics.go b/analyticsservice/analytics.go
index 9caf6cc..eb6ff04 100644
--- a/analyticsservice/analytics.go
+++ b/analyticsservice/analytics.go
@@ -26,7 +26,7 @@ const (
 	variationValueAttribute string = "featureValue"
 	targetAttribute         string = "target"
 	sdkVersionAttribute     string = "SDK_VERSION"
-	SdkVersion              string = "0.1.24"
+	SdkVersion              string = "0.1.25"
 	sdkTypeAttribute        string = "SDK_TYPE"
 	sdkType                 string = "server"
 	sdkLanguageAttribute    string = "SDK_LANGUAGE"
@@ -46,6 +46,12 @@ type SafeAnalyticsCache[K comparable, V any] interface {
 	iterate(func(K, V))
 }
 
+// SafeSeenTargetsCache extends SafeAnalyticsCache and adds behavior specific to seen targets
+type SafeSeenTargetsCache[K comparable, V any] interface {
+	SafeAnalyticsCache[K, V]
+	isLimitExceeded() bool
+}
+
 type analyticsEvent struct {
 	target        *evaluation.Target
 	featureConfig *rest.FeatureConfig
@@ -55,20 +61,21 @@
 
 // AnalyticsService provides a way to cache and send analytics to the server
 type AnalyticsService struct {
-	analyticsChan             chan analyticsEvent
-	evaluationAnalytics       SafeAnalyticsCache[string, analyticsEvent]
-	targetAnalytics           SafeAnalyticsCache[string, evaluation.Target]
-	seenTargets               SafeAnalyticsCache[string, bool]
-	logEvaluationLimitReached atomic.Bool
-	logTargetLimitReached     atomic.Bool
-	timeout                   time.Duration
-	logger                    logger.Logger
-	metricsClient             metricsclient.ClientWithResponsesInterface
-	environmentID             string
+	analyticsChan               chan analyticsEvent
+	evaluationAnalytics         SafeAnalyticsCache[string, analyticsEvent]
+	targetAnalytics             SafeAnalyticsCache[string, evaluation.Target]
+	seenTargets                 SafeSeenTargetsCache[string, bool]
+	logEvaluationLimitReached   atomic.Bool
+	logTargetLimitReached       atomic.Bool
+	timeout                     time.Duration
+	logger                      logger.Logger
+	metricsClient               metricsclient.ClientWithResponsesInterface
+	environmentID               string
+	seenTargetsClearingInterval time.Duration
 }
 
 // NewAnalyticsService creates and starts a analytics service to send data to the client
-func NewAnalyticsService(timeout time.Duration, logger logger.Logger) *AnalyticsService {
+func NewAnalyticsService(timeout time.Duration, logger logger.Logger, seenTargetsMaxSize int, seenTargetsClearingSchedule time.Duration) *AnalyticsService {
 	serviceTimeout := timeout
 	if timeout < 60*time.Second {
 		serviceTimeout = 60 * time.Second
@@ -76,12 +83,13 @@ func NewAnalyticsService(timeout time.Duration, logger logger.Logger) *Analytics
 		serviceTimeout = 1 * time.Hour
 	}
 	as := AnalyticsService{
-		analyticsChan:       make(chan analyticsEvent),
-		evaluationAnalytics: newSafeEvaluationAnalytics(),
-		targetAnalytics:     newSafeTargetAnalytics(),
-		seenTargets:         newSafeSeenTargets(),
-		timeout:             serviceTimeout,
-		logger:              logger,
+		analyticsChan:               make(chan analyticsEvent),
+		evaluationAnalytics:         newSafeEvaluationAnalytics(),
+		targetAnalytics:             newSafeTargetAnalytics(),
+		seenTargets:                 newSafeSeenTargets(seenTargetsMaxSize),
+		timeout:                     serviceTimeout,
+		logger:                      logger,
+		seenTargetsClearingInterval: seenTargetsClearingSchedule,
 	}
 
 	go as.listener()
@@ -94,6 +102,7 @@ func (as *AnalyticsService) Start(ctx context.Context, client metricsclient.Clie
 	as.metricsClient = client
 	as.environmentID = environmentID
 	go as.startTimer(ctx)
+	go as.startSeenTargetsClearingSchedule(ctx, as.seenTargetsClearingInterval)
 }
 
 func (as *AnalyticsService) startTimer(ctx context.Context) {
@@ -103,6 +112,7 @@ func (as *AnalyticsService) startTimer(ctx context.Context) {
 			timeStamp := time.Now().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
 			as.sendDataAndResetCache(ctx, timeStamp)
 		case <-ctx.Done():
+			close(as.analyticsChan)
 			as.logger.Infof("%s Metrics stopped", sdk_codes.MetricsStopped)
 			return
 		}
@@ -149,9 +159,12 @@ func (as *AnalyticsService) listener() {
 			}
 
 			// Check if target has been seen
-			_, seen := as.seenTargets.get(ad.target.Identifier)
+			if _, seen := as.seenTargets.get(ad.target.Identifier); seen {
+				continue
+			}
 
-			if seen {
+			// Check if seen targets limit has been hit
+			if as.seenTargets.isLimitExceeded() {
 				continue
 			}
@@ -314,6 +327,22 @@ func (as *AnalyticsService) processTargetMetrics(targetAnalytics SafeAnalyticsCa
 	return targetData
 }
 
+func (as *AnalyticsService) startSeenTargetsClearingSchedule(ctx context.Context, clearingInterval time.Duration) {
+	ticker := time.NewTicker(clearingInterval)
+
+	for {
+		select {
+		case <-ticker.C:
+			as.logger.Debugf("Clearing seen targets")
+			as.seenTargets.clear()
+
+		case <-ctx.Done():
+			ticker.Stop()
+			return
+		}
+	}
+}
+
 func getEvaluationAnalyticKey(event analyticsEvent) string {
 	return fmt.Sprintf("%s-%s-%s-%s", event.featureConfig.Feature, event.variation.Identifier, event.variation.Value, globalTarget)
 }
diff --git a/analyticsservice/analytics_test.go b/analyticsservice/analytics_test.go
index 07dd7e2..bccda2e 100644
--- a/analyticsservice/analytics_test.go
+++ b/analyticsservice/analytics_test.go
@@ -120,7 +120,7 @@ func TestListenerHandlesEventsCorrectly(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			service := NewAnalyticsService(1*time.Minute, noOpLogger)
+			service := NewAnalyticsService(1*time.Minute, noOpLogger, 10, time.Hour)
 			defer close(service.analyticsChan)
 
 			// Start the listener in a goroutine
diff --git a/analyticsservice/safe_maps_test.go b/analyticsservice/safe_maps_test.go
index fb25e1d..6c7213c 100644
--- a/analyticsservice/safe_maps_test.go
+++ b/analyticsservice/safe_maps_test.go
@@ -1,6 +1,7 @@
 package analyticsservice
 
 import (
+	"fmt"
 	"reflect"
 	"sync"
 	"testing"
@@ -81,13 +82,95 @@ func TestSafeTargetAnalytics(t *testing.T) {
 }
 
 func TestSafeSeenTargets(t *testing.T) {
-	s := newSafeSeenTargets()
+	// Initialize with a small maxSize for testing
+	maxSize := 3
+	s := newSafeSeenTargets(maxSize).(SafeSeenTargetsCache[string, bool])
+
 	testData := map[string]bool{
 		"target1":  true,
 		"target21": true,
 		"target3":  true,
-		"target4":  true,
 	}
 
-	testMapOperations[string, bool](t, s, testData)
+	// Insert items and ensure the limit is not exceeded
+	for key, value := range testData {
+		s.set(key, value)
+	}
+
+	if s.isLimitExceeded() {
+		t.Errorf("Limit should not have been exceeded yet")
+	}
+
+	// Add one more item to exceed the limit
+	s.set("target4", true)
+
+	// Ensure limitExceeded is true after exceeding the limit
+	if !s.isLimitExceeded() {
+		t.Errorf("Limit should be exceeded after adding target4")
+	}
+
+	// Ensure that new items are not added once the limit is exceeded
+	s.set("target5", true)
+	if _, exists := s.get("target5"); exists {
+		t.Errorf("target5 should not have been added as the limit was exceeded")
+	}
+
+	// Clear the map and ensure the limit is reset
+	s.clear()
+
+	if s.isLimitExceeded() {
+		t.Errorf("Limit should have been reset after clearing the map")
+	}
+
+	// Add items again after clearing
+	s.set("target6", true)
+	if _, exists := s.get("target6"); !exists {
+		t.Errorf("target6 should have been added after clearing the map")
+	}
+
+	// Concurrency test
+	t.Run("ConcurrencyTest", func(t *testing.T) {
+		var wg sync.WaitGroup
+		concurrencyLevel := 100
+
+		// Re-initialize the map for concurrency testing
+		s = newSafeSeenTargets(100).(SafeSeenTargetsCache[string, bool])
+
+		// Concurrently set keys
+		for i := 0; i < concurrencyLevel; i++ {
+			wg.Add(1)
+			go func(i int) {
+				defer wg.Done()
+				key := "target" + fmt.Sprint(i)
+				s.set(key, true)
+			}(i)
+		}
+
+		// Concurrently get keys
+		for i := 0; i < concurrencyLevel; i++ {
+			wg.Add(1)
+			go func(i int) {
+				defer wg.Done()
+				key := "target" + fmt.Sprint(i)
+				s.get(key)
+			}(i)
+		}
+
+		// Concurrently clear the map
+		for i := 0; i < concurrencyLevel/2; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				s.clear()
+			}()
+		}
+
+		wg.Wait()
+
+		// Ensure the map is cleared after the concurrency operations
+		if s.size() > 0 {
+			t.Errorf("Map size should be 0 after clearing, got %d", s.size())
+		}
+	})
+
 }
diff --git a/analyticsservice/safe_seen_targets_map.go b/analyticsservice/safe_seen_targets_map.go
index 94e4a8d..6b580f8 100644
--- a/analyticsservice/safe_seen_targets_map.go
+++ b/analyticsservice/safe_seen_targets_map.go
@@ -2,22 +2,32 @@ package analyticsservice
 
 import (
 	"sync"
+	"sync/atomic"
 )
 
 type safeSeenTargets struct {
 	sync.RWMutex
-	data map[string]bool
+	data          map[string]bool
+	maxSize       int
+	limitExceeded atomic.Bool
 }
 
-func newSafeSeenTargets() SafeAnalyticsCache[string, bool] {
+func newSafeSeenTargets(maxSize int) SafeSeenTargetsCache[string, bool] {
 	return &safeSeenTargets{
-		data: make(map[string]bool),
+		data:    make(map[string]bool),
+		maxSize: maxSize,
 	}
 }
 
 func (s *safeSeenTargets) set(key string, seen bool) {
 	s.Lock()
 	defer s.Unlock()
+
+	if len(s.data) >= s.maxSize {
+		s.limitExceeded.Store(true)
+		return
+	}
+
 	s.data[key] = seen
 }
@@ -44,6 +54,7 @@ func (s *safeSeenTargets) clear() {
 	s.Lock()
 	defer s.Unlock()
 	s.data = make(map[string]bool)
+	s.limitExceeded.Store(false)
 }
@@ -53,3 +64,7 @@ func (s *safeSeenTargets) iterate(f func(string, bool)) {
 		f(key, value)
 	}
 }
+
+func (s *safeSeenTargets) isLimitExceeded() bool {
+	return s.limitExceeded.Load()
+}
diff --git a/client/client.go b/client/client.go
index 60679e0..efc7f9d 100644
--- a/client/client.go
+++ b/client/client.go
@@ -79,7 +79,7 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {
 		opt(config)
 	}
 
-	analyticsService := analyticsservice.NewAnalyticsService(time.Minute, config.Logger)
+	analyticsService := analyticsservice.NewAnalyticsService(time.Minute, config.Logger, config.seenTargetsMaxSize, config.seenTargetsClearInterval)
 
 	client := &CfClient{
 		sdkKey: sdkKey,
diff --git a/client/config.go b/client/config.go
index c3fcb27..dc22189 100644
--- a/client/config.go
+++ b/client/config.go
@@ -18,26 +18,28 @@ import (
 )
 
 type config struct {
-	url                    string
-	eventsURL              string
-	pullInterval           uint // in seconds
-	Cache                  cache.Cache
-	Store                  storage.Storage
-	Logger                 logger.Logger
-	httpClient             *http.Client
-	authHttpClient         *http.Client
-	enableStream           bool
-	enableStore            bool
-	target                 evaluation.Target
-	eventStreamListener    stream.EventStreamListener
-	enableAnalytics        bool
-	proxyMode              bool
-	waitForInitialized     bool
-	maxAuthRetries         int
-	authRetryStrategy      *backoff.ExponentialBackOff
-	streamingRetryStrategy *backoff.ExponentialBackOff
-	sleeper                types.Sleeper
-	apiConfig              *apiConfiguration
+	url                      string
+	eventsURL                string
+	pullInterval             uint // in seconds
+	Cache                    cache.Cache
+	Store                    storage.Storage
+	Logger                   logger.Logger
+	httpClient               *http.Client
+	authHttpClient           *http.Client
+	enableStream             bool
+	enableStore              bool
+	target                   evaluation.Target
+	eventStreamListener      stream.EventStreamListener
+	enableAnalytics          bool
+	proxyMode                bool
+	waitForInitialized       bool
+	maxAuthRetries           int
+	authRetryStrategy        *backoff.ExponentialBackOff
+	streamingRetryStrategy   *backoff.ExponentialBackOff
+	sleeper                  types.Sleeper
+	apiConfig                *apiConfiguration
+	seenTargetsMaxSize       int
+	seenTargetsClearInterval time.Duration
 }
 
 type apiConfiguration struct {
@@ -87,24 +89,25 @@ func newDefaultConfig(log logger.Logger) *config {
 	}
 
 	return &config{
-		url:             "https://config.ff.harness.io/api/1.0",
-		eventsURL:       "https://events.ff.harness.io/api/1.0",
-		pullInterval:    60,
-		Cache:           defaultCache,
-		Store:           defaultStore,
-		Logger:          log,
-		authHttpClient:  authHttpClient,
-		httpClient:      requestHttpClient.StandardClient(),
-		enableStream:    true,
-		enableStore:     true,
-		enableAnalytics: true,
-		proxyMode:       false,
-		// Indicate that we should retry forever by default
-		maxAuthRetries:         -1,
-		authRetryStrategy:      getDefaultExpBackoff(),
-		streamingRetryStrategy: getDefaultExpBackoff(),
-		sleeper:                &types.RealClock{},
-		apiConfig:              apiConfig,
+		url:                      "https://config.ff.harness.io/api/1.0",
+		eventsURL:                "https://events.ff.harness.io/api/1.0",
+		pullInterval:             60,
+		Cache:                    defaultCache,
+		Store:                    defaultStore,
+		Logger:                   log,
+		authHttpClient:           authHttpClient,
+		httpClient:               requestHttpClient.StandardClient(),
+		enableStream:             true,
+		enableStore:              true,
+		enableAnalytics:          true,
+		proxyMode:                false,
+		maxAuthRetries:           -1, // Indicate that we should retry forever by default
+		authRetryStrategy:        getDefaultExpBackoff(),
+		streamingRetryStrategy:   getDefaultExpBackoff(),
+		sleeper:                  &types.RealClock{},
+		apiConfig:                apiConfig,
+		seenTargetsMaxSize:       500000,
+		seenTargetsClearInterval: 24 * time.Hour,
 	}
 }
diff --git a/client/options.go b/client/options.go
index dc73b23..1a5c080 100644
--- a/client/options.go
+++ b/client/options.go
@@ -1,6 +1,9 @@
 package client
 
 import (
+	"net/http"
+	"time"
+
 	"github.com/cenkalti/backoff/v4"
 	"github.com/harness/ff-golang-server-sdk/cache"
 	"github.com/harness/ff-golang-server-sdk/evaluation"
@@ -8,7 +11,6 @@ import (
 	"github.com/harness/ff-golang-server-sdk/logger"
 	"github.com/harness/ff-golang-server-sdk/storage"
 	"github.com/harness/ff-golang-server-sdk/stream"
 	"github.com/harness/ff-golang-server-sdk/types"
-	"net/http"
 )
 
 // ConfigOption is used as return value for advanced client configuration
@@ -142,3 +144,22 @@ func WithSleeper(sleeper types.Sleeper) ConfigOption {
 		config.sleeper = sleeper
 	}
 }
+
+// WithSeenTargetsMaxSize sets the maximum size of the seen targets cache.
+// The seen targets cache reduces the size of the analytics payload that the SDK sends to the Feature Flags Service.
+// This option sets the maximum number of unique targets stored in the cache. By default, the limit is 500,000
+// unique targets. You can increase it if you need to handle more than 500,000 targets, which keeps the payload
+// small at the cost of increased memory usage.
+func WithSeenTargetsMaxSize(maxSize int) ConfigOption {
+	return func(config *config) {
+		config.seenTargetsMaxSize = maxSize
+	}
+}
+
+// WithSeenTargetsClearInterval sets the interval at which the seen targets cache is cleared. By default, the
+// interval is 24 hours.
+func WithSeenTargetsClearInterval(interval time.Duration) ConfigOption {
+	return func(config *config) {
+		config.seenTargetsClearInterval = interval
+	}
+}
diff --git a/evaluation/util.go b/evaluation/util.go
index dddb17b..2550139 100644
--- a/evaluation/util.go
+++ b/evaluation/util.go
@@ -80,7 +80,7 @@ func isEnabled(target *Target, bucketBy string, percentage int) bool {
 		if value == "" {
 			return false
 		}
-		log.Warnf("%s BucketBy attribute not found in target attributes, falling back to 'identifier': missing=%s, using value=%s", sdk_codes.MissingBucketBy, bucketBy, value)
+		log.Debugf("%s BucketBy attribute not found in target attributes, falling back to 'identifier': missing=%s, using value=%s", sdk_codes.MissingBucketBy, bucketBy, value)
 		bucketBy = "identifier"
 	}
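
For reference, a minimal usage sketch (not part of the diff) of the two options added in client/options.go. NewCfClient, WithSeenTargetsMaxSize, and WithSeenTargetsClearInterval come from the changes above; the SDK key and the values passed to the options are placeholders, and Close is assumed from the SDK's existing public API.

package main

import (
	"log"
	"time"

	harness "github.com/harness/ff-golang-server-sdk/client"
)

func main() {
	// Placeholder: use a real server-side SDK key.
	sdkKey := "your-server-sdk-key"

	// Raise the seen-targets cap above the 500,000 default and clear the
	// cache every 12 hours instead of the default 24.
	client, err := harness.NewCfClient(sdkKey,
		harness.WithSeenTargetsMaxSize(1_000_000),
		harness.WithSeenTargetsClearInterval(12*time.Hour),
	)
	if err != nil {
		log.Fatalf("could not initialise the SDK client: %v", err)
	}
	defer client.Close()
}

If neither option is set, newDefaultConfig applies the defaults introduced in this change: a cap of 500,000 seen targets, cleared every 24 hours.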