@@ -20,14 +20,16 @@ import (
20
20
"strings"
21
21
22
22
"github.com/gorilla/mux"
23
- "github.com/matrix-org/dendrite/setup/base"
24
- userapi "github.com/matrix-org/dendrite/userapi/api"
25
23
"github.com/matrix-org/gomatrixserverlib/fclient"
26
24
"github.com/matrix-org/gomatrixserverlib/spec"
27
25
"github.com/matrix-org/util"
28
26
"github.com/nats-io/nats.go"
29
27
"github.com/prometheus/client_golang/prometheus"
30
28
"github.com/sirupsen/logrus"
29
+ "golang.org/x/sync/singleflight"
30
+
31
+ "github.com/matrix-org/dendrite/setup/base"
32
+ userapi "github.com/matrix-org/dendrite/userapi/api"
31
33
32
34
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
33
35
"github.com/matrix-org/dendrite/clientapi/api"
@@ -84,6 +86,14 @@ func Setup(
84
86
unstableFeatures ["org.matrix." + msc ] = true
85
87
}
86
88
89
+ // singleflight protects /join endpoints from being invoked
90
+ // multiple times from the same user and room, otherwise
91
+ // a state reset can occur. This also avoids unneeded
92
+ // state calculations.
93
+ // TODO: actually fix this in the roomserver, as there are
94
+ // possibly other ways that can result in a stat reset.
95
+ sf := singleflight.Group {}
96
+
87
97
if cfg .Matrix .WellKnownClientName != "" {
88
98
logrus .Infof ("Setting m.homeserver base_url as %s at /.well-known/matrix/client" , cfg .Matrix .WellKnownClientName )
89
99
wkMux .Handle ("/client" , httputil .MakeExternalAPI ("wellknown" , func (r * http.Request ) util.JSONResponse {
@@ -264,9 +274,17 @@ func Setup(
264
274
if err != nil {
265
275
return util .ErrorResponse (err )
266
276
}
267
- return JoinRoomByIDOrAlias (
268
- req , device , rsAPI , userAPI , vars ["roomIDOrAlias" ],
269
- )
277
+ // Only execute a join for roomIDOrAlias and UserID once. If there is a join in progress
278
+ // it waits for it to complete and returns that result for subsequent requests.
279
+ resp , _ , _ := sf .Do (vars ["roomIDOrAlias" ]+ device .UserID , func () (any , error ) {
280
+ return JoinRoomByIDOrAlias (
281
+ req , device , rsAPI , userAPI , vars ["roomIDOrAlias" ],
282
+ ), nil
283
+ })
284
+ // once all joins are processed, drop them from the cache. Further requests
285
+ // will be processed as usual.
286
+ sf .Forget (vars ["roomIDOrAlias" ] + device .UserID )
287
+ return resp .(util.JSONResponse )
270
288
}, httputil .WithAllowGuests ()),
271
289
).Methods (http .MethodPost , http .MethodOptions )
272
290
@@ -300,9 +318,17 @@ func Setup(
300
318
if err != nil {
301
319
return util .ErrorResponse (err )
302
320
}
303
- return JoinRoomByIDOrAlias (
304
- req , device , rsAPI , userAPI , vars ["roomID" ],
305
- )
321
+ // Only execute a join for roomID and UserID once. If there is a join in progress
322
+ // it waits for it to complete and returns that result for subsequent requests.
323
+ resp , _ , _ := sf .Do (vars ["roomID" ]+ device .UserID , func () (any , error ) {
324
+ return JoinRoomByIDOrAlias (
325
+ req , device , rsAPI , userAPI , vars ["roomID" ],
326
+ ), nil
327
+ })
328
+ // once all joins are processed, drop them from the cache. Further requests
329
+ // will be processed as usual.
330
+ sf .Forget (vars ["roomID" ] + device .UserID )
331
+ return resp .(util.JSONResponse )
306
332
}, httputil .WithAllowGuests ()),
307
333
).Methods (http .MethodPost , http .MethodOptions )
308
334
v3mux .Handle ("/rooms/{roomID}/leave" ,
0 commit comments