Skip to content

Commit 7a5c9f8

Browse files
committed
crosscluster/physical: add reader tenant system table id offset setting
This patch adds the private physical_cluster_replication.reader_system_table_id_offset setting, which a pcr customer can set on the destination system tenant to some very large number, like 1,000,000, which will bootstrap the reader tenant with dynamically allocated system table ids to be offset+i. This setting can be set when the reader tenant fails to start up because a source table id collides with a system table id. Informs #152909 Release note: none
1 parent 3f7e0b1 commit 7a5c9f8

File tree

2 files changed

+135
-3
lines changed

2 files changed

+135
-3
lines changed

pkg/crosscluster/physical/standby_read_ts_poller_job_test.go

Lines changed: 121 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ func TestStandbyReadTSPollerJob(t *testing.T) {
3737
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
3838
defer cleanup()
3939

40+
c.SrcTenantSQL.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
41+
c.SrcTenantSQL.Exec(t, `CREATE TABLE bar (i INT PRIMARY KEY)`)
42+
43+
offset, offsetChecksInReaderTenant := maybeOffsetReaderTenantSystemTables(t, c)
44+
4045
producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
4146

4247
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
@@ -68,6 +73,11 @@ INSERT INTO a VALUES (1);
6873
waitForPollerJobToStartDest(t, c, ingestionJobID)
6974
observeValueInReaderTenant(t, c.ReaderTenantSQL)
7075

76+
var idWithOffsetCount int
77+
c.ReaderTenantSQL.QueryRow(t, fmt.Sprintf("SELECT count(*) FROM system.namespace where id = %d", 50+offset)).Scan(&idWithOffsetCount)
78+
require.Equal(t, 1, idWithOffsetCount, "expected to find namespace entry for table a with offset applied")
79+
offsetChecksInReaderTenant(c.ReaderTenantSQL)
80+
7181
// Failback and setup stanby reader tenant on the og source.
7282
{
7383
c.Cutover(ctx, producerJobID, ingestionJobID, srcTime.GoTime(), false)
@@ -102,7 +112,113 @@ INSERT INTO a VALUES (1);
102112
var numTables int
103113
srcReaderSQL.QueryRow(t, `SELECT count(*) FROM [SHOW TABLES]`).Scan(&numTables)
104114
observeValueInReaderTenant(t, srcReaderSQL)
115+
offsetChecksInReaderTenant(srcReaderSQL)
116+
}
117+
}
118+
119+
func maybeOffsetReaderTenantSystemTables(
120+
t *testing.T, c *replicationtestutils.TenantStreamingClusters,
121+
) (int, func(sql *sqlutils.SQLRunner)) {
122+
if c.Rng.Intn(2) == 0 {
123+
return 0, func(sql *sqlutils.SQLRunner) {}
124+
}
125+
offset := 100000
126+
c.DestSysSQL.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d`, offset))
127+
// Set on source to ensure failback works well too.
128+
c.SrcSysSQL.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d`, offset))
129+
130+
// swap a system table ID and a user table ID to simulate a cluster that has interleaving user and system table ids.
131+
scaryTableIDRemapFunc := `
132+
CREATE OR REPLACE FUNCTION renumber_desc(oldID INT, newID INT) RETURNS BOOL AS
133+
$$
134+
BEGIN
135+
-- Rewrite the ID within the descriptor
136+
SELECT crdb_internal.unsafe_upsert_descriptor(
137+
newid,
138+
crdb_internal.json_to_pb(
139+
'cockroach.sql.sqlbase.Descriptor',
140+
d
141+
),
142+
true
143+
)
144+
FROM (
145+
SELECT id,
146+
json_set(
147+
json_set(
148+
crdb_internal.pb_to_json(
149+
'cockroach.sql.sqlbase.Descriptor',
150+
descriptor,
151+
false
152+
),
153+
ARRAY['table', 'id'],
154+
newid::STRING::JSONB
155+
),
156+
ARRAY['table', 'modificationTime'],
157+
json_build_object(
158+
'wallTime',
159+
(
160+
(
161+
extract('epoch', now())
162+
* 1000000
163+
)::INT8
164+
* 1000
165+
)::STRING
166+
)
167+
) AS d
168+
FROM system.descriptor
169+
WHERE id IN (oldid,)
170+
);
171+
-- Update the namespace entry and delete the old descriptor.
172+
SELECT crdb_internal.unsafe_upsert_namespace_entry("parentID", "parentSchemaID", name, newID, true) FROM (SELECT "parentID", "parentSchemaID", name, id FROM system.namespace where id =oldID) UNION ALL
173+
SELECT crdb_internal.unsafe_delete_descriptor(oldID, true);
174+
175+
RETURN true;
176+
177+
END
178+
$$ LANGUAGE PLpgSQL;`
179+
180+
c.SrcTenantSQL.Exec(t, scaryTableIDRemapFunc)
181+
var txnInsightsID, privilegesID int
182+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'`).Scan(&txnInsightsID)
183+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'privileges'`).Scan(&privilegesID)
184+
require.NotEqual(t, 0, txnInsightsID)
185+
require.NotEqual(t, 0, privilegesID)
186+
187+
// renumber these two priv tables to be out of the way
188+
txnInsightIDRemapedID := txnInsightsID + 1000
189+
privilegesIDRemapedID := privilegesID + 1000
190+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, txnInsightsID, txnInsightIDRemapedID)
191+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, privilegesID, privilegesIDRemapedID)
192+
193+
// create two user tables on the source and interleave them with system table ids
194+
var fooID, barID int
195+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'foo'`).Scan(&fooID)
196+
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'bar'`).Scan(&barID)
197+
require.NotEqual(t, 0, fooID)
198+
require.NotEqual(t, 0, barID)
199+
200+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, fooID, txnInsightsID)
201+
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, barID, privilegesID)
202+
203+
// Drop the function, to avoid hitting 152978
204+
c.SrcTenantSQL.Exec(t, `DROP FUNCTION renumber_desc`)
205+
206+
offsetChecksInReaderTenant := func(sql *sqlutils.SQLRunner) {
207+
// Check that txn execution insights table is not at the same id as source as it's not replicated.
208+
sql.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'`).Scan(&txnInsightsID)
209+
require.NotEqual(t, txnInsightIDRemapedID, txnInsightsID)
210+
211+
// Check that the privildges table is at the same id as source, since it is
212+
// replicated. This also implies that the og priviliges table created during
213+
// reader tenant bootstrapping at id+offset was removed.
214+
sql.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'privileges'`).Scan(&privilegesID)
215+
require.Equal(t, privilegesIDRemapedID, privilegesID)
216+
var count int
217+
sql.QueryRow(t, `SELECT count(*) FROM system.namespace WHERE name = 'privileges'`).Scan(&count)
218+
require.Equal(t, 1, count)
105219
}
220+
221+
return offset, offsetChecksInReaderTenant
106222
}
107223

108224
func observeValueInReaderTenant(t *testing.T, readerSQL *sqlutils.SQLRunner) {
@@ -113,8 +229,8 @@ func observeValueInReaderTenant(t *testing.T, readerSQL *sqlutils.SQLRunner) {
113229
var numTables int
114230
readerSQL.QueryRow(t, `SELECT count(*) FROM [SHOW TABLES]`).Scan(&numTables)
115231

116-
if numTables != 1 {
117-
return errors.Errorf("expected 1 table to be present in reader tenant, but got %d instead", numTables)
232+
if numTables != 3 {
233+
return errors.Errorf("expected 3 tables to be present in reader tenant, but got %d instead", numTables)
118234
}
119235

120236
var actualQueryResult int
@@ -175,6 +291,9 @@ func TestReaderTenantCutover(t *testing.T) {
175291
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
176292
defer cleanup()
177293

294+
c.SrcTenantSQL.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
295+
c.SrcTenantSQL.Exec(t, `CREATE TABLE bar (i INT PRIMARY KEY)`)
296+
178297
producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
179298

180299
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))

pkg/crosscluster/physical/stream_ingestion_planning.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package physical
77

88
import (
99
"context"
10+
"math"
1011

1112
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
1213
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
@@ -17,6 +18,7 @@ import (
1718
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
1819
"github.com/cockroachdb/cockroach/pkg/roachpb"
1920
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
21+
"github.com/cockroachdb/cockroach/pkg/settings"
2022
"github.com/cockroachdb/cockroach/pkg/sql"
2123
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
2224
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
@@ -34,6 +36,16 @@ import (
3436
// replicated data will be retained.
3537
const defaultRetentionTTLSeconds = int32(4 * 60 * 60)
3638

39+
var readerTenantSystemTableIDOffset = settings.RegisterIntSetting(
40+
settings.ApplicationLevel,
41+
"physical_cluster_replication.reader_system_table_id_offset",
42+
"the offset added to dynamically allocated system table IDs in the reader tenant",
43+
0,
44+
// Max offset is 1000 less than MaxUint32 to leave room 1000 dynamically
45+
// allocated system table ids. Hope that never happens.
46+
settings.NonNegativeIntWithMaximum(math.MaxUint32-1000),
47+
)
48+
3749
func streamIngestionJobDescription(
3850
p sql.PlanHookState,
3951
source streamclient.ConfigUri,
@@ -331,7 +343,8 @@ func createReaderTenant(
331343
}
332344

333345
readerInfo.ID = readerID.ToUint64()
334-
_, err = sql.BootstrapTenant(ctx, p.ExecCfg(), p.Txn(), readerInfo, readerZcfg, 0)
346+
systemTableIDOffset := readerTenantSystemTableIDOffset.Get(&p.ExecCfg().Settings.SV)
347+
_, err = sql.BootstrapTenant(ctx, p.ExecCfg(), p.Txn(), readerInfo, readerZcfg, uint32(systemTableIDOffset))
335348
if err != nil {
336349
return readerID, err
337350
}

0 commit comments

Comments
 (0)