Skip to content

Commit 8331a93

Browse files
fix(NODE-4763): cache resumeToken in ChangeStream.tryNext() (#4636)
Co-authored-by: Bailey Pearson <[email protected]>
1 parent 82d6ce6 commit 8331a93

File tree

2 files changed

+208
-21
lines changed

2 files changed

+208
-21
lines changed

src/change_stream.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,11 @@ export class ChangeStream<
809809
while (true) {
810810
try {
811811
const change = await this.cursor.tryNext();
812-
return change ?? null;
812+
if (!change) {
813+
return null;
814+
}
815+
const processedChange = this._processChange(change);
816+
return processedChange;
813817
} catch (error) {
814818
try {
815819
await this._processErrorIteratorMode(error, this.cursor.id != null);

test/integration/change-streams/change_stream.test.ts

Lines changed: 203 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -370,31 +370,76 @@ describe('Change Streams', function () {
370370
}
371371
);
372372

373-
it('should cache the change stream resume token using iterator form', {
374-
metadata: { requires: { topology: 'replicaset' } },
373+
describe('cache the change stream resume token', () => {
374+
describe('using iterator form', () => {
375+
context('#next', () => {
376+
it('caches the resume token on change', {
377+
metadata: { requires: { topology: 'replicaset' } },
375378

376-
async test() {
377-
await initIteratorMode(changeStream);
378-
collection.insertOne({ a: 1 });
379+
async test() {
380+
await initIteratorMode(changeStream);
381+
await collection.insertOne({ a: 1 });
379382

380-
const hasNext = await changeStream.hasNext();
381-
expect(hasNext).to.be.true;
383+
const change = await changeStream.next();
384+
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
385+
}
386+
});
382387

383-
const change = await changeStream.next();
384-
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
385-
}
386-
});
388+
it('caches the resume token correctly when preceded by #hasNext', {
389+
metadata: { requires: { topology: 'replicaset' } },
390+
async test() {
391+
await initIteratorMode(changeStream);
392+
await collection.insertOne({ a: 1 });
387393

388-
it('should cache the change stream resume token using event listener form', {
389-
metadata: { requires: { topology: 'replicaset' } },
390-
async test() {
391-
const willBeChange = once(changeStream, 'change');
392-
await once(changeStream.cursor, 'init');
393-
collection.insertOne({ a: 1 });
394+
await changeStream.hasNext();
394395

395-
const [change] = await willBeChange;
396-
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
397-
}
396+
const change = await changeStream.next();
397+
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
398+
}
399+
});
400+
});
401+
402+
it('#tryNext', {
403+
metadata: { requires: { topology: 'replicaset' } },
404+
405+
async test() {
406+
await initIteratorMode(changeStream);
407+
await collection.insertOne({ a: 1 });
408+
409+
const change = await changeStream.tryNext();
410+
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
411+
}
412+
});
413+
414+
context('#hasNext', () => {
415+
it('does not cache the resume token', {
416+
metadata: { requires: { topology: 'replicaset' } },
417+
async test() {
418+
await initIteratorMode(changeStream);
419+
const resumeToken = changeStream.resumeToken;
420+
421+
await collection.insertOne({ a: 1 });
422+
423+
const hasNext = await changeStream.hasNext();
424+
expect(hasNext).to.be.true;
425+
426+
expect(changeStream.resumeToken).to.equal(resumeToken);
427+
}
428+
});
429+
});
430+
});
431+
432+
it('should cache using event listener form', {
433+
metadata: { requires: { topology: 'replicaset' } },
434+
async test() {
435+
const willBeChange = once(changeStream, 'change');
436+
await once(changeStream.cursor, 'init');
437+
await collection.insertOne({ a: 1 });
438+
439+
const [change] = await willBeChange;
440+
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
441+
}
442+
});
398443
});
399444

400445
it('should error if resume token projected out of change stream document using iterator', {
@@ -1816,6 +1861,144 @@ describe('Change Streams', function () {
18161861
});
18171862
});
18181863
});
1864+
1865+
describe("NODE-4763 - doesn't produce duplicates after resume", function () {
1866+
let client: MongoClient;
1867+
let collection: Collection;
1868+
let changeStream: ChangeStream;
1869+
let aggregateEvents: CommandStartedEvent[] = [];
1870+
const resumableError = { code: 6, message: 'host unreachable' };
1871+
1872+
beforeEach(async function () {
1873+
const dbName = 'node-4763';
1874+
const collectionName = 'test-collection';
1875+
1876+
client = this.configuration.newClient({ monitorCommands: true });
1877+
client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents));
1878+
collection = client.db(dbName).collection(collectionName);
1879+
1880+
changeStream = collection.watch([]);
1881+
});
1882+
1883+
afterEach(async function () {
1884+
await client.db('admin').command({
1885+
configureFailPoint: is4_2Server(this.configuration.version)
1886+
? 'failCommand'
1887+
: 'failGetMoreAfterCursorCheckout',
1888+
mode: 'off'
1889+
} as FailCommandFailPoint);
1890+
1891+
await changeStream.close();
1892+
await client.close();
1893+
aggregateEvents = [];
1894+
});
1895+
1896+
describe('when using iterator form', function () {
1897+
it('#next', { requires: { topology: 'replicaset' } }, async function test() {
1898+
await initIteratorMode(changeStream);
1899+
1900+
await collection.insertOne({ a: 1 });
1901+
const change = await changeStream.next();
1902+
expect(change).to.containSubset({
1903+
operationType: 'insert',
1904+
fullDocument: { a: 1 }
1905+
});
1906+
1907+
await client.db('admin').command({
1908+
configureFailPoint: is4_2Server(this.configuration.version)
1909+
? 'failCommand'
1910+
: 'failGetMoreAfterCursorCheckout',
1911+
mode: { times: 1 },
1912+
data: {
1913+
failCommands: ['getMore'],
1914+
errorCode: resumableError.code,
1915+
errmsg: resumableError.message
1916+
}
1917+
} as FailCommandFailPoint);
1918+
1919+
await collection.insertOne({ a: 2 });
1920+
const change2 = await changeStream.next();
1921+
expect(change2).to.containSubset({
1922+
operationType: 'insert',
1923+
fullDocument: { a: 2 }
1924+
});
1925+
1926+
expect(aggregateEvents.length).to.equal(2);
1927+
});
1928+
1929+
it('#tryNext', { requires: { topology: 'replicaset' } }, async function test() {
1930+
await initIteratorMode(changeStream);
1931+
1932+
await collection.insertOne({ a: 1 });
1933+
const change = await changeStream.tryNext();
1934+
expect(change).to.containSubset({
1935+
operationType: 'insert',
1936+
fullDocument: { a: 1 }
1937+
});
1938+
1939+
await client.db('admin').command({
1940+
configureFailPoint: is4_2Server(this.configuration.version)
1941+
? 'failCommand'
1942+
: 'failGetMoreAfterCursorCheckout',
1943+
mode: { times: 1 },
1944+
data: {
1945+
failCommands: ['getMore'],
1946+
errorCode: resumableError.code,
1947+
errmsg: resumableError.message
1948+
}
1949+
} as FailCommandFailPoint);
1950+
1951+
await collection.insertOne({ a: 2 });
1952+
const change2 = await changeStream.tryNext();
1953+
expect(change2).to.containSubset({
1954+
operationType: 'insert',
1955+
fullDocument: { a: 2 }
1956+
});
1957+
1958+
expect(aggregateEvents.length).to.equal(2);
1959+
});
1960+
});
1961+
1962+
it('in an event listener form', { requires: { topology: 'replicaset' } }, async function () {
1963+
const willBeChange = on(changeStream, 'change');
1964+
await once(changeStream.cursor, 'init');
1965+
1966+
await collection.insertOne({ a: 1 });
1967+
const change = await willBeChange.next();
1968+
expect(change.value[0]).to.containSubset({
1969+
operationType: 'insert',
1970+
fullDocument: { a: 1 }
1971+
});
1972+
1973+
await client.db('admin').command({
1974+
configureFailPoint: is4_2Server(this.configuration.version)
1975+
? 'failCommand'
1976+
: 'failGetMoreAfterCursorCheckout',
1977+
mode: { times: 1 },
1978+
data: {
1979+
failCommands: ['getMore'],
1980+
errorCode: resumableError.code,
1981+
errmsg: resumableError.message
1982+
}
1983+
} as FailCommandFailPoint);
1984+
1985+
// There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
1986+
// resuming a change stream don't return the change event.
1987+
// So we defer the insert until a period of time after the change stream has received the first change.
1988+
// 2000ms is long enough for the change stream to attempt to resume and fail once before exhausting the failpoint
1989+
// and succeeding.
1990+
await sleep(2000);
1991+
await collection.insertOne({ a: 2 });
1992+
1993+
const change2 = await willBeChange.next();
1994+
expect(change2.value[0]).to.containSubset({
1995+
operationType: 'insert',
1996+
fullDocument: { a: 2 }
1997+
});
1998+
1999+
expect(aggregateEvents.length).to.equal(2);
2000+
});
2001+
});
18192002
});
18202003

18212004
describe('ChangeStream resumability', function () {

0 commit comments

Comments
 (0)