Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions pkg/jobs/jobfrontier/frontier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -464,3 +465,108 @@ func TestGetAllResolvedSpans(t *testing.T) {
})
}
}

func TestRandomizedFrontier(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
defer srv.Stopper().Stop(ctx)

s := srv.ApplicationLayer()

// Seed for reproducible randomness
rng, _ := randutil.NewTestRand()

// Create 50 adjacent key spans
const numSpans = 50
spans := make([]roachpb.Span, numSpans)
for i := 0; i < numSpans; i++ {
startKey := []byte{byte(i)}
endKey := []byte{byte(i + 1)}
spans[i] = roachpb.Span{Key: startKey, EndKey: endKey}
}

frontier, err := span.MakeFrontier(spans...)
require.NoError(t, err)

// Initialize spans to random timestamps 0-3
for _, sp := range spans {
ts := hlc.Timestamp{WallTime: int64(rng.Intn(4))}
_, err := frontier.Forward(sp, ts)
require.NoError(t, err)
}

jobID := jobspb.JobID(3000)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if you want to keep the trend of each test in this file generating different job IDs, consider making this 4000

Copy link
Collaborator Author

@msbutler msbutler Sep 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that really adds any test coverage. each test uses its own test server.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep mostly an aesthetic concern ;)

spanSize := spans[0].Size()
smallChunkSize := spanSize * rng.Intn(10)
largeChunkSize := spanSize * numSpans * 2

persist := func() {
require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
// Store sharded version (small chunk size forces sharding)
err := storeChunked(ctx, txn, jobID, "sharded", frontier, smallChunkSize)
require.NoError(t, err)

// Store unsharded version (large chunk size prevents sharding)
err = storeChunked(ctx, txn, jobID, "unsharded", frontier, largeChunkSize)
require.NoError(t, err)

return nil
}))
}

getAndVerify := func() {
require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {

shardedFrontier, found, err := Get(ctx, txn, jobID, "sharded")
require.NoError(t, err)
require.True(t, found, "updated sharded frontier should be found")

checkSpanFrontierEquality(t, frontier, shardedFrontier)

unshardedFrontier, found, err := Get(ctx, txn, jobID, "unsharded")
require.NoError(t, err)
require.True(t, found, "updated unsharded frontier should be found")

checkSpanFrontierEquality(t, frontier, unshardedFrontier)

return nil
}))
}

persist()
getAndVerify()

// Randomly update some spans in the in-memory frontier
numUpdates := rng.Intn(20) + 10

for i := 0; i < numUpdates; i++ {
spanIdx := rng.Intn(numSpans)
newTs := hlc.Timestamp{WallTime: int64(rng.Intn(4))}

// If the new ts is less than the current time in the frontier, this will be
// a noop.
_, err := frontier.Forward(spans[spanIdx], newTs)
require.NoError(t, err)
}

persist()
getAndVerify()
}

func checkSpanFrontierEquality(t *testing.T, expected span.Frontier, actual span.Frontier) {
require.Equal(t, expected.Len(), actual.Len(), "frontiers should have same number of spans")
for eSp, eTs := range expected.Entries() {
found := false
for aSp, aTs := range actual.Entries() {
if eSp.Equal(aSp) {
require.Equal(t, eTs, aTs, "span with expected Ts %s and actual Ts %s", eTs, aTs)
found = true
break
}
}
require.True(t, found, "span %d should be found in loaded frontier", eSp)
}
}