Skip to content

Commit 1c8533c

Browse files
committed
WIP to remove bucketing, need to get test coverage back
1 parent 0163245 commit 1c8533c

File tree

5 files changed

+192
-181
lines changed

5 files changed

+192
-181
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 15 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1944,15 +1944,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19441944
}()
19451945

19461946
if cf.spec.ProgressConfig.PerTableProtectedTimestamps {
1947-
newPTS, updatedPerTablePTS, err := cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
1948-
if err != nil {
1949-
return false, err
1950-
}
1951-
updatedMainPTS, err := cf.advanceProtectedTimestamp(ctx, progress, pts, newPTS)
1952-
if err != nil {
1953-
return false, err
1954-
}
1955-
return updatedMainPTS || updatedPerTablePTS, nil
1947+
return cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
19561948
}
19571949

19581950
return cf.advanceProtectedTimestamp(ctx, progress, pts, highwater)
@@ -1963,28 +1955,8 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
19631955
txn isql.Txn,
19641956
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
19651957
highwater hlc.Timestamp,
1966-
) (newPTS hlc.Timestamp, updatedPerTablePTS bool, err error) {
1967-
var leastLaggingTimestamp hlc.Timestamp
1968-
for _, frontier := range cf.frontier.Frontiers() {
1969-
if frontier.Frontier().After(leastLaggingTimestamp) {
1970-
leastLaggingTimestamp = frontier.Frontier()
1971-
}
1972-
}
1973-
1974-
newPTS = func() hlc.Timestamp {
1975-
lagDuration := changefeedbase.ProtectTimestampBucketingInterval.Get(&cf.FlowCtx.Cfg.Settings.SV)
1976-
ptsLagCutoff := leastLaggingTimestamp.AddDuration(-lagDuration)
1977-
// If we are within the bucketing interval of having started the changefeed,
1978-
// we use the highwater as the PTS timestamp so as not to try to protect
1979-
// tables before the changefeed started.
1980-
if ptsLagCutoff.Less(highwater) {
1981-
return highwater
1982-
}
1983-
return ptsLagCutoff
1984-
}()
1985-
1958+
) (updatedPerTablePTS bool, err error) {
19861959
pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)
1987-
tableIDsToRelease := make([]descpb.ID, 0)
19881960
tableIDsToCreate := make(map[descpb.ID]hlc.Timestamp)
19891961
for tableID, frontier := range cf.frontier.Frontiers() {
19901962
tableHighWater := func() hlc.Timestamp {
@@ -1996,22 +1968,9 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
19961968
return frontier.Frontier()
19971969
}()
19981970

1999-
isLagging := tableHighWater.Less(newPTS)
2000-
2001-
if cf.knobs.IsTableLagging != nil && cf.knobs.IsTableLagging(tableID) {
2002-
isLagging = true
2003-
}
2004-
2005-
if !isLagging {
2006-
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
2007-
tableIDsToRelease = append(tableIDsToRelease, tableID)
2008-
}
2009-
continue
2010-
}
2011-
20121971
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
20131972
if updated, err := cf.advancePerTableProtectedTimestampRecord(ctx, ptsEntries, tableID, tableHighWater, pts); err != nil {
2014-
return hlc.Timestamp{}, false, err
1973+
return false, err
20151974
} else if updated {
20161975
updatedPerTablePTS = true
20171976
}
@@ -2021,41 +1980,25 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20211980
}
20221981
}
20231982

2024-
if len(tableIDsToRelease) > 0 {
2025-
if err := cf.releasePerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToRelease, pts); err != nil {
2026-
return hlc.Timestamp{}, false, err
2027-
}
2028-
}
2029-
20301983
if len(tableIDsToCreate) > 0 {
2031-
if err := cf.createPerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToCreate, pts); err != nil {
2032-
return hlc.Timestamp{}, false, err
1984+
if err := cf.createPerTableProtectedTimestampRecords(
1985+
ctx, ptsEntries, tableIDsToCreate, pts,
1986+
); err != nil {
1987+
return false, err
20331988
}
2034-
}
2035-
2036-
if len(tableIDsToRelease) > 0 || len(tableIDsToCreate) > 0 {
2037-
if err := writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID); err != nil {
2038-
return hlc.Timestamp{}, false, err
1989+
if err := writeChangefeedJobInfo(
1990+
ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID,
1991+
); err != nil {
1992+
return false, err
20391993
}
20401994
updatedPerTablePTS = true
20411995
}
20421996

2043-
return newPTS, updatedPerTablePTS, nil
2044-
}
2045-
2046-
func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
2047-
ctx context.Context,
2048-
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
2049-
tableIDs []descpb.ID,
2050-
pts protectedts.Storage,
2051-
) error {
2052-
for _, tableID := range tableIDs {
2053-
if err := pts.Release(ctx, *ptsEntries.ProtectedTimestampRecords[tableID]); err != nil {
2054-
return err
2055-
}
2056-
delete(ptsEntries.ProtectedTimestampRecords, tableID)
1997+
if cf.knobs.ManagePTSError != nil {
1998+
return false, cf.knobs.ManagePTSError()
20571999
}
2058-
return nil
2000+
2001+
return updatedPerTablePTS, nil
20592002
}
20602003

20612004
func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
1616
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
17+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
1718
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1819
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
1920
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
@@ -320,15 +321,44 @@ func changefeedPlanHook(
320321
recordPTSMetricsTime := sliMetrics.Timers.PTSCreate.Start()
321322

322323
var ptr *ptpb.Record
324+
var ptrs []*ptpb.Record // *cdcprogresspb.ProtectedTimestampRecords
325+
var jobInfoPTRs *cdcprogresspb.ProtectedTimestampRecords
323326
codec := p.ExecCfg().Codec
324-
ptr = createProtectedTimestampRecord(
325-
ctx,
326-
codec,
327-
jobID,
328-
targets,
329-
details.StatementTime,
330-
)
331-
progress.GetChangefeed().ProtectedTimestampRecord = ptr.ID.GetUUID()
327+
328+
// We do not yet have the progress config here, so we need to check the settings directly.
329+
perTableTrackingEnabled := changefeedbase.TrackPerTableProgress.Get(&p.ExecCfg().Settings.SV)
330+
perTableProtectedTimestampsEnabled := changefeedbase.PerTableProtectedTimestamps.Get(&p.ExecCfg().Settings.SV)
331+
writingMultiplePTS := perTableTrackingEnabled && perTableProtectedTimestampsEnabled
332+
if writingMultiplePTS {
333+
protectedTimestampRecords := make(map[descpb.ID]*uuid.UUID)
334+
targets.EachTarget(func(target changefeedbase.Target) error {
335+
ptsTargets := changefeedbase.Targets{}
336+
ptsTargets.Add(target)
337+
ptsRecord := createProtectedTimestampRecord(
338+
ctx,
339+
codec,
340+
jobID,
341+
ptsTargets,
342+
details.StatementTime,
343+
)
344+
ptrs = append(ptrs, ptsRecord)
345+
uuid := ptsRecord.ID.GetUUID()
346+
protectedTimestampRecords[target.DescID] = &uuid
347+
return nil
348+
})
349+
jobInfoPTRs = &cdcprogresspb.ProtectedTimestampRecords{
350+
ProtectedTimestampRecords: protectedTimestampRecords,
351+
}
352+
} else {
353+
ptr = createProtectedTimestampRecord(
354+
ctx,
355+
codec,
356+
jobID,
357+
targets,
358+
details.StatementTime,
359+
)
360+
progress.GetChangefeed().ProtectedTimestampRecord = ptr.ID.GetUUID()
361+
}
332362

333363
jr.Progress = *progress.GetChangefeed()
334364

@@ -348,6 +378,19 @@ func changefeedPlanHook(
348378
return err
349379
}
350380
}
381+
if len(ptrs) > 0 {
382+
for _, ptr := range ptrs {
383+
pts := p.ExecCfg().ProtectedTimestampProvider.WithTxn(p.InternalSQLTxn())
384+
if err := pts.Protect(ctx, ptr); err != nil {
385+
return err
386+
}
387+
}
388+
if err := writeChangefeedJobInfo(
389+
ctx, perTableProtectedTimestampsFilename, jobInfoPTRs, p.InternalSQLTxn(), jobID,
390+
); err != nil {
391+
return err
392+
}
393+
}
351394

352395
select {
353396
case <-ctx.Done():
@@ -364,7 +407,23 @@ func changefeedPlanHook(
364407
return err
365408
}
366409
if ptr != nil {
367-
return p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn).Protect(ctx, ptr)
410+
err := p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn).Protect(ctx, ptr)
411+
if err != nil {
412+
return err
413+
}
414+
}
415+
if len(ptrs) > 0 {
416+
pts := p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn)
417+
for _, ptr := range ptrs {
418+
if err := pts.Protect(ctx, ptr); err != nil {
419+
return err
420+
}
421+
}
422+
if err := writeChangefeedJobInfo(
423+
ctx, perTableProtectedTimestampsFilename, jobInfoPTRs, txn, jobID,
424+
); err != nil {
425+
return err
426+
}
368427
}
369428
return nil
370429
}); err != nil {

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/cockroachdb/cockroach/pkg/build"
3737
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
3838
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
39+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
3940
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
4041
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
4142
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
@@ -65,6 +66,7 @@ import (
6566
"github.com/cockroachdb/cockroach/pkg/sql"
6667
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
6768
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
69+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
6870
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
6971
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
7072
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -12084,15 +12086,47 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1208412086
return errors.New("waiting for high watermark to advance")
1208512087
}
1208612088
testutils.SucceedsSoon(t, checkHWM)
12089+
var tableID int
12090+
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables WHERE name = 'foo' AND database_name = 'd'`).Scan(&tableID)
12091+
execCfg := s.Server.ExecutorConfig().(sql.ExecutorConfig)
12092+
12093+
getTimestampFromPTSRecord := func(ptsRecordID uuid.UUID) hlc.Timestamp {
12094+
ptsQry := fmt.Sprintf(`SELECT ts FROM system.protected_ts_records WHERE id = '%s'`, ptsRecordID)
12095+
var tsStr string
12096+
sqlDB.QueryRow(t, ptsQry).Scan(&tsStr)
12097+
ts, err := hlc.ParseHLC(tsStr)
12098+
require.NoError(t, err)
12099+
return ts
12100+
}
12101+
getPerTablePTS := func() hlc.Timestamp {
12102+
var ts hlc.Timestamp
12103+
err := execCfg.InternalDB.Txn(context.Background(), func(ctx context.Context, txn isql.Txn) error {
12104+
var ptsEntries cdcprogresspb.ProtectedTimestampRecords
12105+
if err := readChangefeedJobInfo(
12106+
ctx, perTableProtectedTimestampsFilename, &ptsEntries, txn, eFeed.JobID(),
12107+
); err != nil {
12108+
return err
12109+
}
12110+
ptsRecordID := ptsEntries.ProtectedTimestampRecords[descpb.ID(tableID)]
12111+
ts = getTimestampFromPTSRecord(*ptsRecordID)
12112+
return nil
12113+
})
12114+
require.NoError(t, err)
12115+
return ts
12116+
}
12117+
perTablePTSEnabled := changefeedbase.PerTableProtectedTimestamps.Get(&s.Server.ClusterSettings().SV)
12118+
perTableProgressEnabled := changefeedbase.TrackPerTableProgress.Get(&s.Server.ClusterSettings().SV)
12119+
getPTS := func() hlc.Timestamp {
12120+
if perTablePTSEnabled && perTableProgressEnabled {
12121+
return getPerTablePTS()
12122+
}
1208712123

12088-
// Get the PTS of this feed.
12089-
p, err := eFeed.Progress()
12090-
require.NoError(t, err)
12124+
p, err := eFeed.Progress()
12125+
require.NoError(t, err)
12126+
return getTimestampFromPTSRecord(p.ProtectedTimestampRecord)
12127+
}
1209112128

12092-
ptsQry := fmt.Sprintf(`SELECT ts FROM system.protected_ts_records WHERE id = '%s'`, p.ProtectedTimestampRecord)
12093-
var ts, ts2 string
12094-
sqlDB.QueryRow(t, ptsQry).Scan(&ts)
12095-
require.NoError(t, err)
12129+
ts := getPTS()
1209612130

1209712131
// Force the changefeed to restart.
1209812132
require.NoError(t, eFeed.Pause())
@@ -12102,8 +12136,7 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1210212136
testutils.SucceedsSoon(t, checkHWM)
1210312137

1210412138
// Check that the PTS was not updated after the resume.
12105-
sqlDB.QueryRow(t, ptsQry).Scan(&ts2)
12106-
require.NoError(t, err)
12139+
ts2 := getPTS()
1210712140
require.Equal(t, ts, ts2)
1210812141

1210912142
// Lower the PTS lag and check that it has been updated.
@@ -12115,9 +12148,8 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1211512148
testutils.SucceedsSoon(t, checkHWM)
1211612149
testutils.SucceedsSoon(t, checkHWM)
1211712150

12118-
sqlDB.QueryRow(t, ptsQry).Scan(&ts2)
12119-
require.NoError(t, err)
12120-
require.Less(t, ts, ts2)
12151+
ts2 = getPTS()
12152+
require.True(t, ts.Less(ts2))
1212112153

1212212154
managePTSCount, _ = metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
1212312155
managePTSErrorCount, _ = metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()

0 commit comments

Comments
 (0)