Skip to content

Commit 3107da9

Browse files
controller: Don't commit failed migrations
Prior to this commit, we were ignoring errors in the `migrate` function, which meant that we would still call `Migration::commit` even if `f` returned an Err. We would still see warnings and such later in the logs because the error would get returned from `migrate` and then caught elsewhere in the code, but calling `commit` would still cause some side effects that shouldn't happen. This could cause strange effects, such as nodes being created in domains but never registered in the controller as part of the graph.

I'm not sure whether this caused any problems in the past (other than leaking resources) but it started causing snapshotting failures for certain invalid queries after the merging of...

    72f191c17 (dataflow: Fix replication offsets in memory state, 2022-12-01)

...because domains would attempt to return replication offsets for these orphaned nodes, which would confuse the controller since it would have no record of these nodes in its state.

To fix this, we immediately return an error if `f` fails in `migrate`, thus skipping the call to `commit`.

This commit also contains a replicators test that reproduces the problem prior to this fix, but which now passes with the fix in place.

Release-Note-Core: Fixed a bug that could cause snapshotting and replication to stall forever after encountering certain types of errors while loading a view from the upstream database.
Co-authored-by: Griffin Smith <[email protected]>
Fixes: ENG-2190
Change-Id: I3ac529de0d0be702172913225440d5f58f23bacf
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/4013
Reviewed-by: Griffin Smith <[email protected]>
Tested-by: Buildkite CI
1 parent e5edae8 commit 3107da9

File tree

3 files changed

+53
-12
lines changed

3 files changed

+53
-12
lines changed

readyset-server/src/controller/mod.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -395,12 +395,15 @@ impl Controller {
395395
if let Some(ref mut inner) = *guard {
396396
let mut writer = inner.dataflow_state_handle.write().await;
397397
let ds = writer.as_mut();
398-
let ret = ds.migrate(false, dialect, move |m| func(m)).await?;
399-
inner
400-
.dataflow_state_handle
401-
.commit(writer, &self.authority)
402-
.await?;
403-
if done_tx.send(ret).is_err() {
398+
let res = ds.migrate(false, dialect, move |m| func(m)).await;
399+
if res.is_ok() {
400+
inner
401+
.dataflow_state_handle
402+
.commit(writer, &self.authority)
403+
.await?;
404+
}
405+
406+
if done_tx.send(res).is_err() {
404407
warn!("handle-based migration sender hung up!");
405408
}
406409
} else {

readyset-server/src/controller/state.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -839,14 +839,14 @@ impl DfState {
839839
dry_run: bool,
840840
dialect: Dialect,
841841
f: F,
842-
) -> Result<T, ReadySetError>
842+
) -> ReadySetResult<T>
843843
where
844-
F: FnOnce(&mut Migration<'_>) -> T,
844+
F: FnOnce(&mut Migration<'_>) -> ReadySetResult<T>,
845845
{
846846
debug!("starting migration");
847847
gauge!(recorded::CONTROLLER_MIGRATION_IN_PROGRESS, 1.0);
848848
let mut m = Migration::new(self, dialect);
849-
let r = f(&mut m);
849+
let r = f(&mut m)?;
850850
m.commit(dry_run).await?;
851851
debug!("finished migration");
852852
gauge!(recorded::CONTROLLER_MIGRATION_IN_PROGRESS, 0.0);
@@ -1149,11 +1149,11 @@ impl DfState {
11491149
.migrate(dry_run, changelist.dialect, |mig| {
11501150
new.activate(mig, changelist)
11511151
})
1152-
.await?;
1152+
.await;
11531153

1154-
match r {
1154+
match &r {
11551155
Ok(_) => self.recipe = new,
1156-
Err(ref e) => {
1156+
Err(e) => {
11571157
tracing::
11581158
warn!(error = %e, "failed to apply recipe. Will retry periodically up to max_processing_minutes.");
11591159
}

replicators/tests/tests.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1634,6 +1634,44 @@ async fn postgresql_non_base_offsets() {
16341634
.unwrap();
16351635
}
16361636

1637+
#[tokio::test(flavor = "multi_thread")]
1638+
#[serial_test::serial]
1639+
async fn postgresql_orphaned_nodes() {
1640+
// This test checks for the error described in ENG-2190.
1641+
// The bug would be triggered when a view would fail to be added due to certain types of
1642+
// errors, because we would still call `commit` on the migration after the error occurred,
1643+
// resulting in nodes being created in the domain without being added to the controller.
1644+
readyset_tracing::init_test_logging();
1645+
let url = pgsql_url();
1646+
let mut client = DbConnection::connect(&url).await.unwrap();
1647+
1648+
// The important thing about creating RS_FAKE_TEST_FUN is that it's not a builtin function, so
1649+
// we will get a ReadySetError::NoSuchFunction during lowering and view creation will fail.
1650+
client
1651+
.query(
1652+
"CREATE OR REPLACE FUNCTION RS_FAKE_TEST_FUN(bigint) RETURNS bigint
1653+
AS 'select $1' LANGUAGE SQL;
1654+
DROP TABLE IF EXISTS t1 CASCADE;
1655+
CREATE TABLE t1(i INTEGER);
1656+
CREATE OR REPLACE VIEW v1 AS
1657+
SELECT RS_FAKE_TEST_FUN(subq.my_count) FROM
1658+
(SELECT count(*) AS my_count FROM t1 GROUP BY i) subq;
1659+
CREATE OR REPLACE VIEW check_t1 AS SELECT * FROM t1;
1660+
INSERT INTO t1 VALUES(99);",
1661+
)
1662+
.await
1663+
.unwrap();
1664+
1665+
let mut ctx = TestHandle::start_noria(url.to_string(), None)
1666+
.await
1667+
.unwrap();
1668+
ctx.ready_notify.as_ref().unwrap().notified().await;
1669+
1670+
ctx.check_results("check_t1", "Snapshot", &[&[99.into()]])
1671+
.await
1672+
.unwrap();
1673+
}
1674+
16371675
#[tokio::test(flavor = "multi_thread")]
16381676
#[serial_test::serial]
16391677
async fn postgresql_replicate_citext() {

0 commit comments

Comments
 (0)