10 changes: 8 additions & 2 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -575,7 +575,7 @@ func (rd *replicationDriver) setupC2C(
destSQL := sqlutils.MakeSQLRunner(destDB)

srcClusterSettings(t, srcSQL)
destClusterSettings(t, destSQL, rd.rs.additionalDuration)
destClusterSettings(t, destSQL, rd.rng, rd.rs.additionalDuration)

overrideSrcAndDestTenantTTL(t, srcSQL, destSQL, rd.rs.overrideTenantTTL)

@@ -1869,7 +1869,9 @@ func srcClusterSettings(t test.Test, db *sqlutils.SQLRunner) {
)
}

func destClusterSettings(t test.Test, db *sqlutils.SQLRunner, additionalDuration time.Duration) {
func destClusterSettings(
t test.Test, db *sqlutils.SQLRunner, rng *rand.Rand, additionalDuration time.Duration,
) {
db.ExecMultiple(t,
`SET CLUSTER SETTING kv.rangefeed.enabled = true;`,
`SET CLUSTER SETTING kv.lease.reject_on_leader_unknown.enabled = true;`,
@@ -1881,6 +1883,10 @@ func destClusterSettings(t test.Test, db *sqlutils.SQLRunner, additionalDuration
db.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING stream_replication.replan_flow_frequency = '%s'`,
replanFrequency))
}

if rng.Intn(2) == 0 {
db.Exec(t, `SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = 100000`)
}
}

func overrideSrcAndDestTenantTTL(
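Note on the hunk above: the randomized branch means roughly half of the roachtest runs now exercise reader-tenant creation with offset system table IDs. For reference, a minimal sketch of applying the same setting by hand, assuming a SQL runner connected to the destination cluster (the helper name is hypothetical; the setting name and the 100000 value are taken from the diff):

// enableReaderSystemTableIDOffset mirrors the randomized branch above: it
// offsets dynamically allocated system table IDs in any reader tenant the
// destination subsequently creates.
func enableReaderSystemTableIDOffset(t test.Test, destSQL *sqlutils.SQLRunner, offset int) {
	destSQL.Exec(t, fmt.Sprintf(
		`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d`,
		offset))
}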
10 changes: 5 additions & 5 deletions pkg/config/system_test.go
@@ -203,7 +203,7 @@ func TestGetLargestID(t *testing.T) {

// Real SQL layout.
func() testCase {
ms := bootstrap.MakeMetadataSchema(keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef())
ms := bootstrap.MakeMetadataSchema(keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), bootstrap.NoOffset)
descIDs := ms.DescriptorIDs()
maxDescID := config.ObjectID(descIDs[len(descIDs)-1])
kvs, _ /* splits */ := ms.GetInitialValues()
@@ -321,7 +321,7 @@ func TestComputeSplitKeySystemRanges(t *testing.T) {

cfg := config.NewSystemConfig(zonepb.DefaultZoneConfigRef())
kvs, _ /* splits */ := bootstrap.MakeMetadataSchema(
keys.SystemSQLCodec, cfg.DefaultZoneConfig, zonepb.DefaultSystemZoneConfigRef(),
keys.SystemSQLCodec, cfg.DefaultZoneConfig, zonepb.DefaultSystemZoneConfigRef(), bootstrap.NoOffset,
).GetInitialValues()
cfg.SystemConfigEntries = config.SystemConfigEntries{
Values: kvs,
@@ -353,7 +353,7 @@ func TestComputeSplitKeyTableIDs(t *testing.T) {
minKey := roachpb.RKey(keys.TimeseriesPrefix.PrefixEnd())

schema := bootstrap.MakeMetadataSchema(
keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), bootstrap.NoOffset,
)
// Real system tables only.
baseSql, _ /* splits */ := schema.GetInitialValues()
@@ -460,7 +460,7 @@ func TestComputeSplitKeyTenantBoundaries(t *testing.T) {
minTenID, maxTenID := roachpb.MinTenantID.ToUint64(), roachpb.MaxTenantID.ToUint64()

schema := bootstrap.MakeMetadataSchema(
keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), bootstrap.NoOffset,
)
minKey := tkey(bootstrap.TestingUserDescID(0))

@@ -599,7 +599,7 @@ func TestGetZoneConfigForKey(t *testing.T) {
cfg := config.NewSystemConfig(zonepb.DefaultZoneConfigRef())

kvs, _ /* splits */ := bootstrap.MakeMetadataSchema(
keys.SystemSQLCodec, cfg.DefaultZoneConfig, zonepb.DefaultSystemZoneConfigRef(),
keys.SystemSQLCodec, cfg.DefaultZoneConfig, zonepb.DefaultSystemZoneConfigRef(), bootstrap.NoOffset,
).GetInitialValues()
cfg.SystemConfigEntries = config.SystemConfigEntries{
Values: kvs,
121 changes: 119 additions & 2 deletions pkg/crosscluster/physical/standby_read_ts_poller_job_test.go
@@ -37,6 +37,11 @@ func TestStandbyReadTSPollerJob(t *testing.T) {
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
defer cleanup()

c.SrcTenantSQL.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
c.SrcTenantSQL.Exec(t, `CREATE TABLE bar (i INT PRIMARY KEY)`)

offset, offsetChecksInReaderTenant := maybeOffsetReaderTenantSystemTables(t, c)

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)

jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
@@ -68,6 +73,11 @@ INSERT INTO a VALUES (1);
waitForPollerJobToStartDest(t, c, ingestionJobID)
observeValueInReaderTenant(t, c.ReaderTenantSQL)

var idWithOffsetCount int
c.ReaderTenantSQL.QueryRow(t, fmt.Sprintf("SELECT count(*) FROM system.namespace where id = %d", 50+offset)).Scan(&idWithOffsetCount)
require.Equal(t, 1, idWithOffsetCount, "expected to find namespace entry for table a with offset applied")
offsetChecksInReaderTenant(c.ReaderTenantSQL)

// Fail back and set up a standby reader tenant on the original source.
{
c.Cutover(ctx, producerJobID, ingestionJobID, srcTime.GoTime(), false)
@@ -102,7 +112,111 @@ INSERT INTO a VALUES (1);
var numTables int
srcReaderSQL.QueryRow(t, `SELECT count(*) FROM [SHOW TABLES]`).Scan(&numTables)
observeValueInReaderTenant(t, srcReaderSQL)
offsetChecksInReaderTenant(srcReaderSQL)
}
}

func maybeOffsetReaderTenantSystemTables(
t *testing.T, c *replicationtestutils.TenantStreamingClusters,
) (int, func(sql *sqlutils.SQLRunner)) {
if c.Rng.Intn(2) == 0 {
return 0, func(sql *sqlutils.SQLRunner) {}
}
offset := 100000
c.DestSysSQL.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d`, offset))
// Set on source to ensure failback works well too.
c.SrcSysSQL.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset = %d`, offset))

// Swap a system table ID and a user table ID to simulate a cluster that has interleaved user and system table IDs.
scaryTableIDRemapFunc := `
CREATE OR REPLACE FUNCTION renumber_desc(oldID INT, newID INT) RETURNS BOOL AS
$$
BEGIN
-- Rewrite the ID within the descriptor
SELECT crdb_internal.unsafe_upsert_descriptor(
newid,
crdb_internal.json_to_pb(
'cockroach.sql.sqlbase.Descriptor',
d
),
true
)
FROM (
SELECT id,
json_set(
json_set(
crdb_internal.pb_to_json(
'cockroach.sql.sqlbase.Descriptor',
descriptor,
false
),
ARRAY['table', 'id'],
newid::STRING::JSONB
),
ARRAY['table', 'modificationTime'],
json_build_object(
'wallTime',
(
(
extract('epoch', now())
* 1000000
)::INT8
* 1000
)::STRING
)
) AS d
FROM system.descriptor
WHERE id IN (oldid)
);
-- Update the namespace entry and delete the old descriptor.
SELECT crdb_internal.unsafe_upsert_namespace_entry("parentID", "parentSchemaID", name, newID, true) FROM (SELECT "parentID", "parentSchemaID", name, id FROM system.namespace WHERE id = oldID) UNION ALL
SELECT crdb_internal.unsafe_delete_descriptor(oldID, true);

RETURN true;

END
$$ LANGUAGE PLpgSQL;`

c.SrcTenantSQL.Exec(t, scaryTableIDRemapFunc)
var txnInsightsID, privilegesID int
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'`).Scan(&txnInsightsID)
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'privileges'`).Scan(&privilegesID)
require.NotEqual(t, 0, txnInsightsID)
require.NotEqual(t, 0, privilegesID)

// Renumber these two system tables so their original IDs are free.
txnInsightIDRemapedID := txnInsightsID + 1000
privilegesIDRemapedID := privilegesID + 1000
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, txnInsightsID, txnInsightIDRemapedID)
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, privilegesID, privilegesIDRemapedID)

// Create two user tables on the source and interleave their IDs with system table IDs.
var fooID, barID int
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'foo'`).Scan(&fooID)
c.SrcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'bar'`).Scan(&barID)
require.NotEqual(t, 0, fooID)
require.NotEqual(t, 0, barID)

c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, fooID, txnInsightsID)
c.SrcTenantSQL.Exec(t, `SELECT renumber_desc($1, $2)`, barID, privilegesID)

// Drop the function to avoid hitting 152978.
c.SrcTenantSQL.Exec(t, `DROP FUNCTION renumber_desc`)

offsetChecksInReaderTenant := func(sql *sqlutils.SQLRunner) {
// Check that the txn execution insights table is not at the same ID as on the source, since it is not replicated.
sql.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'transaction_execution_insights'`).Scan(&txnInsightsID)
require.NotEqual(t, txnInsightIDRemapedID, txnInsightsID)

// On 25.3, the privileges table is not replicated, so the IDs should differ.
sql.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'privileges'`).Scan(&privilegesID)
require.NotEqual(t, privilegesIDRemapedID, privilegesID)
var count int
sql.QueryRow(t, `SELECT count(*) FROM system.namespace WHERE name = 'privileges'`).Scan(&count)
require.Equal(t, 1, count)
}

return offset, offsetChecksInReaderTenant
}

func observeValueInReaderTenant(t *testing.T, readerSQL *sqlutils.SQLRunner) {
@@ -113,8 +227,8 @@ func observeValueInReaderTenant(t *testing.T, readerSQL *sqlutils.SQLRunner) {
var numTables int
readerSQL.QueryRow(t, `SELECT count(*) FROM [SHOW TABLES]`).Scan(&numTables)

if numTables != 1 {
return errors.Errorf("expected 1 table to be present in reader tenant, but got %d instead", numTables)
if numTables != 3 {
return errors.Errorf("expected 3 tables to be present in reader tenant, but got %d instead", numTables)
}

var actualQueryResult int
@@ -175,6 +289,9 @@ func TestReaderTenantCutover(t *testing.T) {
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
defer cleanup()

c.SrcTenantSQL.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
c.SrcTenantSQL.Exec(t, `CREATE TABLE bar (i INT PRIMARY KEY)`)

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)

jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
15 changes: 14 additions & 1 deletion pkg/crosscluster/physical/stream_ingestion_planning.go
@@ -7,6 +7,7 @@ package physical

import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
@@ -17,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
@@ -34,6 +36,16 @@
// replicated data will be retained.
const defaultRetentionTTLSeconds = int32(4 * 60 * 60)

var readerTenantSystemTableIDOffset = settings.RegisterIntSetting(
settings.ApplicationLevel,
"physical_cluster_replication.reader_system_table_id_offset",
"the offset added to dynamically allocated system table IDs in the reader tenant",
0,
// The max offset is 1000 less than MaxUint32 to leave room for 1000
// dynamically allocated system table IDs. Hopefully that never happens.
settings.NonNegativeIntWithMaximum(math.MaxUint32-1000),
)

func streamIngestionJobDescription(
p sql.PlanHookState,
source streamclient.ConfigUri,
@@ -331,7 +343,8 @@ func createReaderTenant(
}

readerInfo.ID = readerID.ToUint64()
_, err = sql.BootstrapTenant(ctx, p.ExecCfg(), p.Txn(), readerInfo, readerZcfg)
systemTableIDOffset := readerTenantSystemTableIDOffset.Get(&p.ExecCfg().Settings.SV)
_, err = sql.BootstrapTenant(ctx, p.ExecCfg(), p.Txn(), readerInfo, readerZcfg, uint32(systemTableIDOffset))
if err != nil {
return readerID, err
}
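A brief note on the wiring above: createReaderTenant reads the new cluster setting on the destination and hands it to sql.BootstrapTenant. A minimal sketch of that read, assuming the surrounding package context (the helper is hypothetical and not part of the PR):

// readerOffsetFromSettings shows how the setting value flows into
// BootstrapTenant. The uint32 conversion cannot overflow because the setting
// is registered with a maximum of math.MaxUint32 - 1000.
func readerOffsetFromSettings(sv *settings.Values) uint32 {
	return uint32(readerTenantSystemTableIDOffset.Get(sv))
}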
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_test.go
@@ -125,7 +125,7 @@ type testStoreOpts struct {

func (opts *testStoreOpts) splits() (_kvs []roachpb.KeyValue, _splits []roachpb.RKey) {
kvs, splits := bootstrap.MakeMetadataSchema(
keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), bootstrap.NoOffset,
).GetInitialValues()
if !opts.createSystemRanges {
return kvs, nil
4 changes: 2 additions & 2 deletions pkg/server/node.go
@@ -452,10 +452,10 @@ func allocateStoreIDs(

// GetBootstrapSchema returns the schema which will be used to bootstrap a new
// server.
func GetBootstrapSchema(
func GetBootstrapSchemaForTest(
defaultZoneConfig *zonepb.ZoneConfig, defaultSystemZoneConfig *zonepb.ZoneConfig,
) bootstrap.MetadataSchema {
return bootstrap.MakeMetadataSchema(keys.SystemSQLCodec, defaultZoneConfig, defaultSystemZoneConfig)
return bootstrap.MakeMetadataSchema(keys.SystemSQLCodec, defaultZoneConfig, defaultSystemZoneConfig, bootstrap.NoOffset)
}

// bootstrapCluster initializes the passed-in engines for a new cluster.
2 changes: 1 addition & 1 deletion pkg/server/node_test.go
@@ -105,7 +105,7 @@ func TestBootstrapCluster(t *testing.T) {
}

// Add the initial keys for sql.
kvs, tableSplits := GetBootstrapSchema(
kvs, tableSplits := GetBootstrapSchemaForTest(
zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
).GetInitialValues()
for _, kv := range kvs {
2 changes: 1 addition & 1 deletion pkg/server/testserver.go
@@ -1886,7 +1886,7 @@ func ExpectedInitialRangeCount(
defaultZoneConfig *zonepb.ZoneConfig,
defaultSystemZoneConfig *zonepb.ZoneConfig,
) (int, error) {
_, splits := bootstrap.MakeMetadataSchema(codec, defaultZoneConfig, defaultSystemZoneConfig).GetInitialValues()
_, splits := bootstrap.MakeMetadataSchema(codec, defaultZoneConfig, defaultSystemZoneConfig, bootstrap.NoOffset).GetInitialValues()
// N splits means N+1 ranges.
return len(config.StaticSplits()) + len(splits) + 1, nil
}
1 change: 1 addition & 0 deletions pkg/sql/catalog/bootstrap/BUILD.bazel
@@ -58,6 +58,7 @@ go_test(
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/catalog/descpb",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/serverutils",
25 changes: 24 additions & 1 deletion pkg/sql/catalog/bootstrap/bootstrap_test.go
@@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -132,5 +133,27 @@ func makeMetadataSchema(tenantID uint64) MetadataSchema {
if tenantID > 0 {
codec = keys.MakeSQLCodec(roachpb.MustMakeTenantID(tenantID))
}
return MakeMetadataSchema(codec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef())
return MakeMetadataSchema(codec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), NoOffset)
}

func TestDynamicSystemTableIDOffset(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

offset := uint32(1000)

defaultMetadata := MakeMetadataSchema(keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), NoOffset)
offsetMetadata := MakeMetadataSchema(keys.SystemSQLCodec, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), offset)

require.Len(t, defaultMetadata.descs, len(offsetMetadata.descs))

for i := range defaultMetadata.descs {
defaultID := defaultMetadata.descs[i].GetID()
if defaultID <= keys.MaxReservedDescID {
// Reserved IDs are not offset.
require.Equal(t, defaultID, offsetMetadata.descs[i].GetID())
} else {
require.Equal(t, defaultMetadata.descs[i].GetID()+descpb.ID(offset), offsetMetadata.descs[i].GetID())
}
}
}
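The assertions above imply the rule the bootstrap change follows: statically reserved descriptor IDs keep their values, while dynamically allocated system table IDs are shifted by the offset. A hedged sketch of that rule using identifiers from the diff (the helper itself is hypothetical, not the PR's actual implementation):

// offsetDynamicSystemTableID illustrates what TestDynamicSystemTableIDOffset
// asserts: IDs at or below keys.MaxReservedDescID are left alone, and every
// other bootstrap descriptor ID is shifted up by the configured offset.
func offsetDynamicSystemTableID(id descpb.ID, offset uint32) descpb.ID {
	if id <= keys.MaxReservedDescID {
		return id
	}
	return id + descpb.ID(offset)
}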
11 changes: 6 additions & 5 deletions pkg/sql/catalog/bootstrap/initial_values.go
@@ -23,10 +23,11 @@ import (
// InitialValuesOpts is used to get initial values for system/secondary tenants
// and allows overriding initial values with ones from previous releases.
type InitialValuesOpts struct {
DefaultZoneConfig *zonepb.ZoneConfig
DefaultSystemZoneConfig *zonepb.ZoneConfig
OverrideKey clusterversion.Key
Codec keys.SQLCodec
DefaultZoneConfig *zonepb.ZoneConfig
DefaultSystemZoneConfig *zonepb.ZoneConfig
OverrideKey clusterversion.Key
Codec keys.SQLCodec
DynamicSystemTableIDOffset uint32
}

// GenerateInitialValues generates the initial values with which to bootstrap a
@@ -77,7 +78,7 @@ var initialValuesFactoryByKey = map[clusterversion.Key]initialValuesFactoryFn{
func buildLatestInitialValues(
opts InitialValuesOpts,
) (kvs []roachpb.KeyValue, splits []roachpb.RKey, _ error) {
schema := MakeMetadataSchema(opts.Codec, opts.DefaultZoneConfig, opts.DefaultSystemZoneConfig)
schema := MakeMetadataSchema(opts.Codec, opts.DefaultZoneConfig, opts.DefaultSystemZoneConfig, opts.DynamicSystemTableIDOffset)
kvs, splits = schema.GetInitialValues()
return kvs, splits, nil
}
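To show how the new InitialValuesOpts field is meant to be used, here is an illustrative construction; the field names come from the diff, while the offset value and the caller context are assumptions:

// buildOpts is an illustrative helper (not part of the PR) showing how a
// caller populates the new field. Leaving DynamicSystemTableIDOffset at zero
// (the equivalent of bootstrap.NoOffset) preserves the existing behavior.
func buildOpts() bootstrap.InitialValuesOpts {
	return bootstrap.InitialValuesOpts{
		Codec:                      keys.SystemSQLCodec,
		DefaultZoneConfig:          zonepb.DefaultZoneConfigRef(),
		DefaultSystemZoneConfig:    zonepb.DefaultSystemZoneConfigRef(),
		DynamicSystemTableIDOffset: 100000, // shift dynamically allocated system table IDs
	}
}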