Skip to content

Commit 9d6562a

Browse files
committed
changefeedccl: remove shared PTS record for non-lagging tables
Previously, changefeeds with per-table protected timestamps (PTS) enabled used a single shared PTS record for all non-lagging tables, and assigned individual records only to lagging tables. This commit removes that shared record; instead, every table now receives its own per-table PTS record at startup, and each record is updated using that table's highwater.

Epic: CRDB-1421

Release note: None
1 parent 0163245 commit 9d6562a

File tree

5 files changed

+219
-182
lines changed

5 files changed

+219
-182
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 15 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1944,15 +1944,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19441944
}()
19451945

19461946
if cf.spec.ProgressConfig.PerTableProtectedTimestamps {
1947-
newPTS, updatedPerTablePTS, err := cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
1948-
if err != nil {
1949-
return false, err
1950-
}
1951-
updatedMainPTS, err := cf.advanceProtectedTimestamp(ctx, progress, pts, newPTS)
1952-
if err != nil {
1953-
return false, err
1954-
}
1955-
return updatedMainPTS || updatedPerTablePTS, nil
1947+
return cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
19561948
}
19571949

19581950
return cf.advanceProtectedTimestamp(ctx, progress, pts, highwater)
@@ -1963,28 +1955,8 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
19631955
txn isql.Txn,
19641956
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
19651957
highwater hlc.Timestamp,
1966-
) (newPTS hlc.Timestamp, updatedPerTablePTS bool, err error) {
1967-
var leastLaggingTimestamp hlc.Timestamp
1968-
for _, frontier := range cf.frontier.Frontiers() {
1969-
if frontier.Frontier().After(leastLaggingTimestamp) {
1970-
leastLaggingTimestamp = frontier.Frontier()
1971-
}
1972-
}
1973-
1974-
newPTS = func() hlc.Timestamp {
1975-
lagDuration := changefeedbase.ProtectTimestampBucketingInterval.Get(&cf.FlowCtx.Cfg.Settings.SV)
1976-
ptsLagCutoff := leastLaggingTimestamp.AddDuration(-lagDuration)
1977-
// If we are within the bucketing interval of having started the changefeed,
1978-
// we use the highwater as the PTS timestamp so as not to try to protect
1979-
// tables before the changefeed started.
1980-
if ptsLagCutoff.Less(highwater) {
1981-
return highwater
1982-
}
1983-
return ptsLagCutoff
1984-
}()
1985-
1958+
) (updatedPerTablePTS bool, err error) {
19861959
pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)
1987-
tableIDsToRelease := make([]descpb.ID, 0)
19881960
tableIDsToCreate := make(map[descpb.ID]hlc.Timestamp)
19891961
for tableID, frontier := range cf.frontier.Frontiers() {
19901962
tableHighWater := func() hlc.Timestamp {
@@ -1996,22 +1968,9 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
19961968
return frontier.Frontier()
19971969
}()
19981970

1999-
isLagging := tableHighWater.Less(newPTS)
2000-
2001-
if cf.knobs.IsTableLagging != nil && cf.knobs.IsTableLagging(tableID) {
2002-
isLagging = true
2003-
}
2004-
2005-
if !isLagging {
2006-
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
2007-
tableIDsToRelease = append(tableIDsToRelease, tableID)
2008-
}
2009-
continue
2010-
}
2011-
20121971
if ptsEntries.ProtectedTimestampRecords[tableID] != nil {
20131972
if updated, err := cf.advancePerTableProtectedTimestampRecord(ctx, ptsEntries, tableID, tableHighWater, pts); err != nil {
2014-
return hlc.Timestamp{}, false, err
1973+
return false, err
20151974
} else if updated {
20161975
updatedPerTablePTS = true
20171976
}
@@ -2021,41 +1980,25 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20211980
}
20221981
}
20231982

2024-
if len(tableIDsToRelease) > 0 {
2025-
if err := cf.releasePerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToRelease, pts); err != nil {
2026-
return hlc.Timestamp{}, false, err
2027-
}
2028-
}
2029-
20301983
if len(tableIDsToCreate) > 0 {
2031-
if err := cf.createPerTableProtectedTimestampRecords(ctx, ptsEntries, tableIDsToCreate, pts); err != nil {
2032-
return hlc.Timestamp{}, false, err
1984+
if err := cf.createPerTableProtectedTimestampRecords(
1985+
ctx, ptsEntries, tableIDsToCreate, pts,
1986+
); err != nil {
1987+
return false, err
20331988
}
2034-
}
2035-
2036-
if len(tableIDsToRelease) > 0 || len(tableIDsToCreate) > 0 {
2037-
if err := writeChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID); err != nil {
2038-
return hlc.Timestamp{}, false, err
1989+
if err := writeChangefeedJobInfo(
1990+
ctx, perTableProtectedTimestampsFilename, ptsEntries, txn, cf.spec.JobID,
1991+
); err != nil {
1992+
return false, err
20391993
}
20401994
updatedPerTablePTS = true
20411995
}
20421996

2043-
return newPTS, updatedPerTablePTS, nil
2044-
}
2045-
2046-
func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
2047-
ctx context.Context,
2048-
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
2049-
tableIDs []descpb.ID,
2050-
pts protectedts.Storage,
2051-
) error {
2052-
for _, tableID := range tableIDs {
2053-
if err := pts.Release(ctx, *ptsEntries.ProtectedTimestampRecords[tableID]); err != nil {
2054-
return err
2055-
}
2056-
delete(ptsEntries.ProtectedTimestampRecords, tableID)
1997+
if cf.knobs.ManagePTSError != nil {
1998+
return false, cf.knobs.ManagePTSError()
20571999
}
2058-
return nil
2000+
2001+
return updatedPerTablePTS, nil
20592002
}
20602003

20612004
func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
1616
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
17+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
1718
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1819
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
1920
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
@@ -320,15 +321,46 @@ func changefeedPlanHook(
320321
recordPTSMetricsTime := sliMetrics.Timers.PTSCreate.Start()
321322

322323
var ptr *ptpb.Record
324+
var ptrs []*ptpb.Record // *cdcprogresspb.ProtectedTimestampRecords
325+
var jobInfoPTRs *cdcprogresspb.ProtectedTimestampRecords
323326
codec := p.ExecCfg().Codec
324-
ptr = createProtectedTimestampRecord(
325-
ctx,
326-
codec,
327-
jobID,
328-
targets,
329-
details.StatementTime,
330-
)
331-
progress.GetChangefeed().ProtectedTimestampRecord = ptr.ID.GetUUID()
327+
328+
// We do not yet have the progress config here, so we need to check the settings directly.
329+
perTableTrackingEnabled := changefeedbase.TrackPerTableProgress.Get(&p.ExecCfg().Settings.SV)
330+
perTableProtectedTimestampsEnabled := changefeedbase.PerTableProtectedTimestamps.Get(&p.ExecCfg().Settings.SV)
331+
writingMultiplePTS := perTableTrackingEnabled && perTableProtectedTimestampsEnabled
332+
if writingMultiplePTS {
333+
protectedTimestampRecords := make(map[descpb.ID]*uuid.UUID)
334+
if err := targets.EachTarget(func(target changefeedbase.Target) error {
335+
ptsTargets := changefeedbase.Targets{}
336+
ptsTargets.Add(target)
337+
ptsRecord := createProtectedTimestampRecord(
338+
ctx,
339+
codec,
340+
jobID,
341+
ptsTargets,
342+
details.StatementTime,
343+
)
344+
ptrs = append(ptrs, ptsRecord)
345+
uuid := ptsRecord.ID.GetUUID()
346+
protectedTimestampRecords[target.DescID] = &uuid
347+
return nil
348+
}); err != nil {
349+
return err
350+
}
351+
jobInfoPTRs = &cdcprogresspb.ProtectedTimestampRecords{
352+
ProtectedTimestampRecords: protectedTimestampRecords,
353+
}
354+
} else {
355+
ptr = createProtectedTimestampRecord(
356+
ctx,
357+
codec,
358+
jobID,
359+
targets,
360+
details.StatementTime,
361+
)
362+
progress.GetChangefeed().ProtectedTimestampRecord = ptr.ID.GetUUID()
363+
}
332364

333365
jr.Progress = *progress.GetChangefeed()
334366

@@ -348,6 +380,19 @@ func changefeedPlanHook(
348380
return err
349381
}
350382
}
383+
if len(ptrs) > 0 {
384+
for _, ptr := range ptrs {
385+
pts := p.ExecCfg().ProtectedTimestampProvider.WithTxn(p.InternalSQLTxn())
386+
if err := pts.Protect(ctx, ptr); err != nil {
387+
return err
388+
}
389+
}
390+
if err := writeChangefeedJobInfo(
391+
ctx, perTableProtectedTimestampsFilename, jobInfoPTRs, p.InternalSQLTxn(), jobID,
392+
); err != nil {
393+
return err
394+
}
395+
}
351396

352397
select {
353398
case <-ctx.Done():
@@ -364,7 +409,23 @@ func changefeedPlanHook(
364409
return err
365410
}
366411
if ptr != nil {
367-
return p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn).Protect(ctx, ptr)
412+
err := p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn).Protect(ctx, ptr)
413+
if err != nil {
414+
return err
415+
}
416+
}
417+
if len(ptrs) > 0 {
418+
pts := p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn)
419+
for _, ptr := range ptrs {
420+
if err := pts.Protect(ctx, ptr); err != nil {
421+
return err
422+
}
423+
}
424+
if err := writeChangefeedJobInfo(
425+
ctx, perTableProtectedTimestampsFilename, jobInfoPTRs, txn, jobID,
426+
); err != nil {
427+
return err
428+
}
368429
}
369430
return nil
370431
}); err != nil {

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/cockroachdb/cockroach/pkg/build"
3737
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
3838
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
39+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
3940
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
4041
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
4142
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
@@ -65,6 +66,7 @@ import (
6566
"github.com/cockroachdb/cockroach/pkg/sql"
6667
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
6768
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
69+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
6870
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
6971
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
7072
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -12084,15 +12086,47 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1208412086
return errors.New("waiting for high watermark to advance")
1208512087
}
1208612088
testutils.SucceedsSoon(t, checkHWM)
12089+
var tableID int
12090+
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables WHERE name = 'foo' AND database_name = 'd'`).Scan(&tableID)
12091+
execCfg := s.Server.ExecutorConfig().(sql.ExecutorConfig)
12092+
12093+
getTimestampFromPTSRecord := func(ptsRecordID uuid.UUID) hlc.Timestamp {
12094+
ptsQry := fmt.Sprintf(`SELECT ts FROM system.protected_ts_records WHERE id = '%s'`, ptsRecordID)
12095+
var tsStr string
12096+
sqlDB.QueryRow(t, ptsQry).Scan(&tsStr)
12097+
ts, err := hlc.ParseHLC(tsStr)
12098+
require.NoError(t, err)
12099+
return ts
12100+
}
12101+
getPerTablePTS := func() hlc.Timestamp {
12102+
var ts hlc.Timestamp
12103+
err := execCfg.InternalDB.Txn(context.Background(), func(ctx context.Context, txn isql.Txn) error {
12104+
var ptsEntries cdcprogresspb.ProtectedTimestampRecords
12105+
if err := readChangefeedJobInfo(
12106+
ctx, perTableProtectedTimestampsFilename, &ptsEntries, txn, eFeed.JobID(),
12107+
); err != nil {
12108+
return err
12109+
}
12110+
ptsRecordID := ptsEntries.ProtectedTimestampRecords[descpb.ID(tableID)]
12111+
ts = getTimestampFromPTSRecord(*ptsRecordID)
12112+
return nil
12113+
})
12114+
require.NoError(t, err)
12115+
return ts
12116+
}
12117+
perTablePTSEnabled := changefeedbase.PerTableProtectedTimestamps.Get(&s.Server.ClusterSettings().SV)
12118+
perTableProgressEnabled := changefeedbase.TrackPerTableProgress.Get(&s.Server.ClusterSettings().SV)
12119+
getPTS := func() hlc.Timestamp {
12120+
if perTablePTSEnabled && perTableProgressEnabled {
12121+
return getPerTablePTS()
12122+
}
1208712123

12088-
// Get the PTS of this feed.
12089-
p, err := eFeed.Progress()
12090-
require.NoError(t, err)
12124+
p, err := eFeed.Progress()
12125+
require.NoError(t, err)
12126+
return getTimestampFromPTSRecord(p.ProtectedTimestampRecord)
12127+
}
1209112128

12092-
ptsQry := fmt.Sprintf(`SELECT ts FROM system.protected_ts_records WHERE id = '%s'`, p.ProtectedTimestampRecord)
12093-
var ts, ts2 string
12094-
sqlDB.QueryRow(t, ptsQry).Scan(&ts)
12095-
require.NoError(t, err)
12129+
ts := getPTS()
1209612130

1209712131
// Force the changefeed to restart.
1209812132
require.NoError(t, eFeed.Pause())
@@ -12102,8 +12136,7 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1210212136
testutils.SucceedsSoon(t, checkHWM)
1210312137

1210412138
// Check that the PTS was not updated after the resume.
12105-
sqlDB.QueryRow(t, ptsQry).Scan(&ts2)
12106-
require.NoError(t, err)
12139+
ts2 := getPTS()
1210712140
require.Equal(t, ts, ts2)
1210812141

1210912142
// Lower the PTS lag and check that it has been updated.
@@ -12115,9 +12148,8 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1211512148
testutils.SucceedsSoon(t, checkHWM)
1211612149
testutils.SucceedsSoon(t, checkHWM)
1211712150

12118-
sqlDB.QueryRow(t, ptsQry).Scan(&ts2)
12119-
require.NoError(t, err)
12120-
require.Less(t, ts, ts2)
12151+
ts2 = getPTS()
12152+
require.True(t, ts.Less(ts2))
1212112153

1212212154
managePTSCount, _ = metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
1212312155
managePTSErrorCount, _ = metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()

0 commit comments

Comments
 (0)