Skip to content
51 changes: 38 additions & 13 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ export interface ChangeStreamDocumentCommon {
splitEvent?: ChangeStreamSplitEvent;
}

/** @public */
export interface ChangeStreamDocumentWallTime {
/**
* The server date and time of the database operation.
* wallTime differs from clusterTime in that clusterTime is a timestamp taken from the oplog entry associated with the database operation event.
* @sinceServerVersion 6.0.0
*/
wallTime?: Date;
}

/** @public */
export interface ChangeStreamDocumentCollectionUUID {
/**
Expand Down Expand Up @@ -239,7 +249,8 @@ export interface ChangeStreamDocumentOperationDescription {
export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'insert';
/** This key will contain the document being inserted */
Expand All @@ -255,7 +266,8 @@ export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'update';
/**
Expand Down Expand Up @@ -285,7 +297,8 @@ export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
*/
export interface ChangeStreamReplaceDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema> {
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'replace';
/** The fullDocument of a replace event represents the document after the insert of the replacement document */
Expand All @@ -309,7 +322,8 @@ export interface ChangeStreamReplaceDocument<TSchema extends Document = Document
export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'delete';
/** Namespace the delete event occurred on */
Expand All @@ -330,7 +344,8 @@ export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
*/
export interface ChangeStreamDropDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'drop';
/** Namespace the drop event occurred on */
Expand All @@ -343,7 +358,8 @@ export interface ChangeStreamDropDocument
*/
export interface ChangeStreamRenameDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'rename';
/** The new name for the `ns.coll` collection */
Expand All @@ -356,7 +372,9 @@ export interface ChangeStreamRenameDocument
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#dropdatabase-event
*/
export interface ChangeStreamDropDatabaseDocument extends ChangeStreamDocumentCommon {
export interface ChangeStreamDropDatabaseDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'dropDatabase';
/** The database dropped */
Expand All @@ -367,7 +385,9 @@ export interface ChangeStreamDropDatabaseDocument extends ChangeStreamDocumentCo
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#invalidate-event
*/
export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentCommon {
export interface ChangeStreamInvalidateDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'invalidate';
}
Expand All @@ -380,7 +400,8 @@ export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentComm
export interface ChangeStreamCreateIndexDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'createIndexes';
}
Expand All @@ -393,7 +414,8 @@ export interface ChangeStreamCreateIndexDocument
export interface ChangeStreamDropIndexDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'dropIndexes';
}
Expand All @@ -405,7 +427,8 @@ export interface ChangeStreamDropIndexDocument
*/
export interface ChangeStreamCollModDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'modify';
}
Expand All @@ -416,7 +439,8 @@ export interface ChangeStreamCollModDocument
*/
export interface ChangeStreamCreateDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'create';

Expand All @@ -435,7 +459,8 @@ export interface ChangeStreamCreateDocument
export interface ChangeStreamShardCollectionDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'shardCollection';
}
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ export type {
ChangeStreamDocumentCommon,
ChangeStreamDocumentKey,
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime,
ChangeStreamDropDatabaseDocument,
ChangeStreamDropDocument,
ChangeStreamDropIndexDocument,
Expand Down
20 changes: 20 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,26 @@ describe('Change Streams', function () {
}
});

it('contains a wallTime date property on the change', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=6.0.0' } },
async test() {
const collection = db.collection('wallTimeTest');
const changeStream = collection.watch(pipeline);

const willBeChanges = on(changeStream, 'change');
await once(changeStream.cursor, 'init');

await collection.insertOne({ d: 4 });

const change = (await willBeChanges.next()).value[0];

await changeStream.close();

expect(change).to.have.property('wallTime');
expect(change.wallTime).to.be.instanceOf(Date);
}
});

it('should create a ChangeStream on a collection and emit change events', {
metadata: { requires: { topology: 'replicaset' } },
async test() {
Expand Down
14 changes: 14 additions & 0 deletions test/types/change_stream.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ declare const crudChange: CrudChangeDoc;
expectType<CrudChangeDoc extends ChangeStreamDocumentKey<Schema> ? true : false>(true);
expectType<number>(crudChange.documentKey._id); // _id will get typed
expectType<any>(crudChange.documentKey.blah); // shard keys could be anything
expectType<Date | undefined>(crudChange.wallTime);

// ChangeStreamFullNameSpace
expectType<ChangeStreamNameSpace>(crudChange.ns);
Expand All @@ -87,12 +88,14 @@ switch (change.operationType) {
expectType<number>(change.documentKey._id);
expectType<any>(change.documentKey.blah);
expectType<Schema>(change.fullDocument);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'update': {
expectType<ChangeStreamUpdateDocument<Schema>>(change);
expectType<'update'>(change.operationType);
expectType<Schema | undefined>(change.fullDocument); // Update only attaches fullDocument if configured
expectType<Date | undefined>(change.wallTime);
expectType<UpdateDescription<Schema>>(change.updateDescription);
expectType<Partial<Schema> | undefined>(change.updateDescription.updatedFields);
expectType<string[] | undefined>(change.updateDescription.removedFields);
Expand All @@ -104,61 +107,72 @@ switch (change.operationType) {
case 'replace': {
expectType<ChangeStreamReplaceDocument<Schema>>(change);
expectType<'replace'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectType<Schema>(change.fullDocument);
break;
}
case 'delete': {
expectType<ChangeStreamDeleteDocument<Schema>>(change);
expectType<Date | undefined>(change.wallTime);
expectType<'delete'>(change.operationType);
break;
}
case 'drop': {
expectType<ChangeStreamDropDocument>(change);
expectType<'drop'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectType<{ db: string; coll: string }>(change.ns);
break;
}
case 'rename': {
expectType<ChangeStreamRenameDocument>(change);
expectType<'rename'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectType<{ db: string; coll: string }>(change.ns);
expectType<{ db: string; coll: string }>(change.to);
break;
}
case 'dropDatabase': {
expectType<ChangeStreamDropDatabaseDocument>(change);
expectType<'dropDatabase'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectError(change.ns.coll);
break;
}
case 'invalidate': {
expectType<ChangeStreamInvalidateDocument>(change);
expectType<'invalidate'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'create': {
expectType<ChangeStreamCreateDocument>(change);
expectType<'create'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'modify': {
expectType<ChangeStreamCollModDocument>(change);
expectType<'modify'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'createIndexes': {
expectType<ChangeStreamCreateIndexDocument>(change);
expectType<'createIndexes'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'dropIndexes': {
expectType<ChangeStreamDropIndexDocument>(change);
expectType<'dropIndexes'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'shardCollection': {
expectType<ChangeStreamShardCollectionDocument>(change);
expectType<'shardCollection'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'reshardCollection': {
Expand Down