@@ -3,10 +3,12 @@ package client
3
3
import (
4
4
"context"
5
5
"encoding/json"
6
+ "errors"
6
7
"fmt"
7
8
"net/http"
8
9
"strings"
9
10
"sync"
11
+ "sync/atomic"
10
12
"time"
11
13
12
14
"github.com/harness/ff-golang-server-sdk/evaluation"
@@ -46,7 +48,6 @@ type CfClient struct {
46
48
config * config
47
49
environmentID string
48
50
token string
49
- cancelFunc context.CancelFunc
50
51
streamConnected bool
51
52
streamConnectedLock sync.RWMutex
52
53
authenticated chan struct {}
@@ -55,17 +56,14 @@ type CfClient struct {
55
56
initializedLock sync.RWMutex
56
57
analyticsService * analyticsservice.AnalyticsService
57
58
clusterIdentifier string
59
+ stop chan struct {}
60
+ stopped * atomicBool
58
61
}
59
62
60
63
// NewCfClient creates a new client instance that connects to CF with the default configuration.
61
64
// For advanced configuration options use ConfigOptions functions
62
65
func NewCfClient (sdkKey string , options ... ConfigOption ) (* CfClient , error ) {
63
66
64
- var (
65
- ctx context.Context
66
- err error
67
- )
68
-
69
67
// functional options for config
70
68
config := newDefaultConfig ()
71
69
for _ , opt := range options {
@@ -81,8 +79,9 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {
81
79
analyticsService : analyticsService ,
82
80
clusterIdentifier : "1" ,
83
81
postEvalChan : make (chan evaluation.PostEvalData ),
82
+ stop : make (chan struct {}),
83
+ stopped : newAtomicBool (false ),
84
84
}
85
- ctx , client .cancelFunc = context .WithCancel (context .Background ())
86
85
87
86
if sdkKey == "" {
88
87
return client , types .ErrSdkCantBeEmpty
@@ -97,13 +96,20 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {
97
96
return nil , err
98
97
}
99
98
100
- go client .initAuthentication ( ctx )
101
-
102
- go client . setAnalyticsServiceClient ( ctx )
99
+ client .start ( )
100
+ return client , nil
101
+ }
103
102
104
- go client .pullCronJob (ctx )
103
+ func (c * CfClient ) start () {
104
+ ctx , cancel := context .WithCancel (context .Background ())
105
+ go func () {
106
+ <- c .stop
107
+ cancel ()
108
+ }()
105
109
106
- return client , nil
110
+ go c .initAuthentication (ctx )
111
+ go c .setAnalyticsServiceClient (ctx )
112
+ go c .pullCronJob (ctx )
107
113
}
108
114
109
115
// PostEvaluateProcessor push the data to the analytics service
@@ -432,7 +438,12 @@ func (c *CfClient) JSONVariation(key string, target *evaluation.Target, defaultV
432
438
// Close shuts down the Feature Flag client. After calling this, the client
433
439
// should no longer be used
434
440
func (c * CfClient ) Close () error {
435
- c .cancelFunc ()
441
+ if c .stopped .get () {
442
+ return errors .New ("client already closed" )
443
+ }
444
+ close (c .stop )
445
+
446
+ c .stopped .set (true )
436
447
return nil
437
448
}
438
449
@@ -448,3 +459,25 @@ func (c *CfClient) InterceptAddCluster(ctx context.Context, req *http.Request) e
448
459
req .URL .RawQuery = q .Encode ()
449
460
return nil
450
461
}
462
+
463
+ type atomicBool struct {
464
+ flag int32
465
+ }
466
+
467
+ func newAtomicBool (value bool ) * atomicBool {
468
+ b := new (atomicBool )
469
+ b .set (value )
470
+ return b
471
+ }
472
+
473
+ func (a * atomicBool ) set (value bool ) {
474
+ var i int32 = 0
475
+ if value {
476
+ i = 1
477
+ }
478
+ atomic .StoreInt32 (& (a .flag ), i )
479
+ }
480
+
481
+ func (a * atomicBool ) get () bool {
482
+ return atomic .LoadInt32 (& (a .flag )) != int32 (0 )
483
+ }
0 commit comments