Skip to content

Commit 9be6443

Browse files
committed
replicators: Move version query before ddl setup
If we are connecting to postgres 13, the ddl setup commands we run cause our connection to be considered a "replication connection", which if the upstream is configured in a certain way (in a way that Aurora Postgres is configured), would cause the query for the server version to fail. Moving the query to before the ddl setup allows the query to succeed. Release-Note-Core: Fixed a bug that would cause replication to fail on Aurora Postgres version 13 databases. Refs: ENG-2208 Change-Id: Ie3f506e593c9bbd11802852472b0d29cbc5b0671 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/4015 Tested-by: Buildkite CI Reviewed-by: Sif Hall <[email protected]>
1 parent 3107da9 commit 9be6443

File tree

2 files changed

+23
-10
lines changed

2 files changed

+23
-10
lines changed

replicators/src/noria_adapter.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,26 @@ impl NoriaAdapter {
411411
None,
412412
)?;
413413

414+
// For Postgres 13, once we setup ddl replication, the following query can be rejected, so
415+
// run it ahead of time.
416+
// TODO: (luke): We can probably consolidate this query with the db_version string query
417+
// below
418+
let version_num: u32 = {
419+
let (client, connection) = pgsql_opts.connect(tls_connector.clone()).await?;
420+
let _connection_handle = tokio::spawn(connection);
421+
client
422+
.query_one("SHOW server_version_num", &[])
423+
.await
424+
.and_then(|row| row.try_get::<_, String>(0))
425+
.map_err(|e| {
426+
ReadySetError::Internal(format!("Unable to determine postgres version: {}", e))
427+
})?
428+
.parse()
429+
.map_err(|e| {
430+
ReadySetError::Internal(format!("Unable to parse postgres version: {}", e))
431+
})?
432+
};
433+
414434
let mut connector = Box::new(
415435
PostgresWalConnector::connect(
416436
pgsql_opts.clone(),
@@ -451,7 +471,7 @@ impl NoriaAdapter {
451471
let snapshot_start = Instant::now();
452472
// If snapshot name exists, it means we need to make a snapshot to noria
453473

454-
let (mut client, connection) = pgsql_opts.connect(tls_connector).await?;
474+
let (mut client, connection) = pgsql_opts.connect(tls_connector.clone()).await?;
455475

456476
let connection_handle = tokio::spawn(connection);
457477
let db_version = client
@@ -504,7 +524,7 @@ impl NoriaAdapter {
504524
}
505525

506526
connector
507-
.start_replication(REPLICATION_SLOT, PUBLICATION_NAME)
527+
.start_replication(REPLICATION_SLOT, PUBLICATION_NAME, version_num)
508528
.await?;
509529

510530
let replication_offsets = noria.replication_offsets().await?;

replicators/src/postgres_connector/connector.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -260,15 +260,8 @@ impl PostgresWalConnector {
260260
&mut self,
261261
slot: &str,
262262
publication: &str,
263+
version: u32,
263264
) -> ReadySetResult<()> {
264-
let version: u32 = self
265-
.one_row_query("SHOW server_version_num", 1)
266-
.await?
267-
.get(0)
268-
.unwrap()
269-
.parse()
270-
.unwrap_or(0);
271-
272265
let inner_client = self.client.inner();
273266
let wal_position = self.next_position.unwrap_or_default();
274267
let messages_support = if version >= 140000 {

0 commit comments

Comments
 (0)