Skip to content

Commit 7c56ef2

Browse files
authored
Merge pull request #19233 from AwesomePatrol/add-compaction-to-kubernetess-robustness-tests
Add compaction to kubernetes robustness tests
2 parents 0dcd015 + 845a330 commit 7c56ef2

File tree

3 files changed

+95
-35
lines changed

3 files changed

+95
-35
lines changed

tests/robustness/traffic/etcd.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"fmt"
2020
"math/rand"
21+
"time"
2122

2223
"golang.org/x/time/rate"
2324

@@ -108,7 +109,7 @@ func (t etcdTraffic) Name() string {
108109
return "Etcd"
109110
}
110111

111-
func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
112+
func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
112113
lastOperationSucceeded := true
113114
var lastRev int64
114115
var requestType etcdRequestType
@@ -155,6 +156,35 @@ func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter
155156
}
156157
}
157158

159+
func (t etcdTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) {
160+
var lastRev int64 = 2
161+
ticker := time.NewTicker(period)
162+
defer ticker.Stop()
163+
for {
164+
select {
165+
case <-ctx.Done():
166+
return
167+
case <-finish:
168+
return
169+
case <-ticker.C:
170+
}
171+
statusCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
172+
resp, err := c.Status(statusCtx, c.Endpoints()[0])
173+
cancel()
174+
if err != nil {
175+
continue
176+
}
177+
178+
// Range allows for both revision has been compacted and future revision errors
179+
compactRev := random.RandRange(lastRev, resp.Header.Revision+5)
180+
_, err = c.Compact(ctx, compactRev)
181+
if err != nil {
182+
continue
183+
}
184+
lastRev = compactRev
185+
}
186+
}
187+
158188
func filterOutNonUniqueEtcdWrites(choices []random.ChoiceWeight[etcdRequestType]) (resp []random.ChoiceWeight[etcdRequestType]) {
159189
for _, choice := range choices {
160190
if choice.Choice != Delete && choice.Choice != LeaseRevoke {

tests/robustness/traffic/kubernetes.go

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ import (
1919
"errors"
2020
"fmt"
2121
"math/rand"
22+
"strconv"
2223
"sync"
24+
"time"
2325

2426
"golang.org/x/sync/errgroup"
2527
"golang.org/x/time/rate"
@@ -56,7 +58,7 @@ func (t kubernetesTraffic) ExpectUniqueRevision() bool {
5658
return true
5759
}
5860

59-
func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
61+
func (t kubernetesTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
6062
kc := kubernetes.Client{Client: &clientv3.Client{KV: c}}
6163
s := newStorage()
6264
keyPrefix := "/registry/" + t.resource + "/"
@@ -205,6 +207,62 @@ func (t kubernetesTraffic) generateKey() string {
205207
return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5))
206208
}
207209

210+
func (t kubernetesTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, interval time.Duration, finish <-chan struct{}) {
211+
// Based on https://github.com/kubernetes/apiserver/blob/7dd4904f1896e11244ba3c5a59797697709de6b6/pkg/storage/etcd3/compact.go#L112-L127
212+
var compactTime int64
213+
var rev int64
214+
var err error
215+
for {
216+
select {
217+
case <-time.After(interval):
218+
case <-ctx.Done():
219+
return
220+
case <-finish:
221+
return
222+
}
223+
224+
compactTime, rev, err = compact(ctx, c, compactTime, rev)
225+
if err != nil {
226+
continue
227+
}
228+
}
229+
}
230+
231+
// Based on https://github.com/kubernetes/apiserver/blob/7dd4904f1896e11244ba3c5a59797697709de6b6/pkg/storage/etcd3/compact.go#L30
232+
const (
233+
compactRevKey = "compact_rev_key"
234+
)
235+
236+
func compact(ctx context.Context, client *client.RecordingClient, t, rev int64) (int64, int64, error) {
237+
// Based on https://github.com/kubernetes/apiserver/blob/7dd4904f1896e11244ba3c5a59797697709de6b6/pkg/storage/etcd3/compact.go#L133-L162
238+
// TODO: Use Version and not ModRevision when model supports key versioning.
239+
resp, err := client.Txn(ctx).
240+
If(clientv3.Compare(clientv3.ModRevision(compactRevKey), "=", t)).
241+
Then(clientv3.OpPut(compactRevKey, strconv.FormatInt(rev, 10))).
242+
Else(clientv3.OpGet(compactRevKey)).
243+
Commit()
244+
if err != nil {
245+
return t, rev, err
246+
}
247+
248+
curRev := resp.Header.Revision
249+
250+
if !resp.Succeeded {
251+
// TODO: Use Version and not ModRevision when model supports key versioning.
252+
curTime := resp.Responses[0].GetResponseRange().Kvs[0].ModRevision
253+
return curTime, curRev, nil
254+
}
255+
curTime := t + 1
256+
257+
if rev == 0 {
258+
return curTime, curRev, nil
259+
}
260+
if _, err = client.Compact(ctx, rev); err != nil {
261+
return curTime, curRev, err
262+
}
263+
return curTime, curRev, nil
264+
}
265+
208266
type KubernetesRequestType string
209267

210268
const (

tests/robustness/traffic/traffic.go

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"go.etcd.io/etcd/tests/v3/robustness/client"
2929
"go.etcd.io/etcd/tests/v3/robustness/identity"
3030
"go.etcd.io/etcd/tests/v3/robustness/model"
31-
"go.etcd.io/etcd/tests/v3/robustness/random"
3231
"go.etcd.io/etcd/tests/v3/robustness/report"
3332
)
3433

@@ -81,7 +80,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
8180
defer wg.Done()
8281
defer c.Close()
8382

84-
traffic.Run(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish)
83+
traffic.RunTrafficLoop(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish)
8584
mux.Lock()
8685
reports = append(reports, c.Report())
8786
mux.Unlock()
@@ -101,7 +100,8 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
101100
if profile.CompactPeriod != time.Duration(0) {
102101
compactionPeriod = profile.CompactPeriod
103102
}
104-
RunCompactLoop(ctx, c, compactionPeriod, finish)
103+
104+
traffic.RunCompactLoop(ctx, c, compactionPeriod, finish)
105105
mux.Lock()
106106
reports = append(reports, c.Report())
107107
mux.Unlock()
@@ -195,35 +195,7 @@ func (p Profile) WithCompactionPeriod(cp time.Duration) Profile {
195195
}
196196

197197
type Traffic interface {
198-
Run(ctx context.Context, c *client.RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{})
198+
RunTrafficLoop(ctx context.Context, c *client.RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{})
199+
RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{})
199200
ExpectUniqueRevision() bool
200201
}
201-
202-
func RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) {
203-
var lastRev int64 = 2
204-
timer := time.NewTimer(period)
205-
for {
206-
timer.Reset(period)
207-
select {
208-
case <-ctx.Done():
209-
return
210-
case <-finish:
211-
return
212-
case <-timer.C:
213-
}
214-
statusCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
215-
resp, err := c.Status(statusCtx, c.Endpoints()[0])
216-
cancel()
217-
if err != nil {
218-
continue
219-
}
220-
221-
// Range allows for both revision has been compacted and future revision errors
222-
compactRev := random.RandRange(lastRev, resp.Header.Revision+5)
223-
_, err = c.Compact(ctx, compactRev)
224-
if err != nil {
225-
continue
226-
}
227-
lastRev = compactRev
228-
}
229-
}

0 commit comments

Comments
 (0)