Skip to content

Commit 4f802e8

Browse files
author
Michael Hultman
committed
save every incremential snapshot to store
1 parent e64cb27 commit 4f802e8

File tree

2 files changed

+21
-12
lines changed

2 files changed

+21
-12
lines changed

eventstore/acceptance_testing.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,8 @@ func SnapshotAcceptanceTest(t *testing.T, store eh.EventStore, ctx context.Conte
269269
assert.Equal(t, snapshot.AggregateType, loaded.AggregateType)
270270
assert.Equal(t, snapshot.State, loaded.State)
271271

272+
snapshot.Version += 1
273+
snapshot.Timestamp = time.Now()
272274
snapshot.State = &TestData{
273275
Data: "this is new incredible data",
274276
}

eventstore/mongodb_v2/eventstore.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,18 @@ func newEventStoreWithClient(client *mongo.Client, clientOwnership clientOwnersh
120120
return nil, fmt.Errorf("could not ensure events index: %w", err)
121121
}
122122

123+
if _, err := s.snapshots.Indexes().CreateOne(ctx, mongo.IndexModel{
124+
Keys: bson.M{"aggregate_id": 1},
125+
}); err != nil {
126+
return nil, fmt.Errorf("could not ensure snapshot aggregate_id index: %w", err)
127+
}
128+
129+
if _, err := s.snapshots.Indexes().CreateOne(ctx, mongo.IndexModel{
130+
Keys: bson.M{"version": 1},
131+
}); err != nil {
132+
return nil, fmt.Errorf("could not ensure snapshot version index: %w", err)
133+
}
134+
123135
// Make sure the $all stream exists.
124136
if err := s.streams.FindOne(ctx, bson.M{
125137
"_id": "$all",
@@ -498,7 +510,7 @@ func (s *EventStore) loadFromCursor(ctx context.Context, id uuid.UUID, cursor *m
498510
}
499511

500512
func (s *EventStore) LoadSnapshot(ctx context.Context, id uuid.UUID) (*eh.Snapshot, error) {
501-
result := s.snapshots.FindOne(ctx, bson.M{"_id": id})
513+
result := s.snapshots.FindOne(ctx, bson.M{"aggregate_id": id}, options.FindOne().SetSort(bson.M{"version": -1}))
502514
if err := result.Err(); err != nil {
503515
if errors.Is(err, mongo.ErrNoDocuments) {
504516
return nil, nil
@@ -521,7 +533,7 @@ func (s *EventStore) LoadSnapshot(ctx context.Context, id uuid.UUID) (*eh.Snapsh
521533
}
522534
}
523535

524-
if snapshot.State, err = eh.CreateSnapshotData(record.Id, record.AggregateType); err != nil {
536+
if snapshot.State, err = eh.CreateSnapshotData(record.AggregateID, record.AggregateType); err != nil {
525537
return nil, &eh.EventStoreError{
526538
Err: fmt.Errorf("could not decode snapshot: %w", err),
527539
Op: eh.EventStoreOpLoadSnapshot,
@@ -568,7 +580,7 @@ func (s *EventStore) SaveSnapshot(ctx context.Context, id uuid.UUID, snapshot eh
568580
}
569581

570582
record := SnapshotRecord{
571-
Id: id,
583+
AggregateID: id,
572584
AggregateType: snapshot.AggregateType,
573585
Timestamp: time.Now(),
574586
Version: snapshot.Version,
@@ -586,14 +598,9 @@ func (s *EventStore) SaveSnapshot(ctx context.Context, id uuid.UUID, snapshot eh
586598
}
587599
}
588600

589-
if _, err := s.snapshots.UpdateOne(ctx,
590-
bson.M{
591-
"_id": id.String(),
592-
},
593-
bson.M{
594-
"$set": record,
595-
},
596-
options.Update().SetUpsert(true),
601+
if _, err := s.snapshots.InsertOne(ctx,
602+
record,
603+
options.InsertOne(),
597604
); err != nil {
598605
return &eh.EventStoreError{
599606
Err: fmt.Errorf("could not save snapshot: %w", err),
@@ -616,7 +623,7 @@ func (s *EventStore) Close() error {
616623
}
617624

618625
type SnapshotRecord struct {
619-
Id uuid.UUID `bson:"_id"`
626+
AggregateID uuid.UUID `bson:"aggregate_id"`
620627
RawData []byte `bson:"data"`
621628
Timestamp time.Time `bson:"timestamp"`
622629
Version int `bson:"version"`

0 commit comments

Comments
 (0)