Skip to content

Commit 2f5a77e

Browse files
Limit size of presence EDUs (#17371)
Otherwise they are unbounded. --------- Co-authored-by: Andrew Morgan <[email protected]>
1 parent b11f5c9 commit 2f5a77e

File tree

3 files changed

+140
-11
lines changed

3 files changed

+140
-11
lines changed

changelog.d/17371.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Limit size of presence EDUs to 50 entries.

synapse/federation/sender/per_destination_queue.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#
2222
import datetime
2323
import logging
24+
from collections import OrderedDict
2425
from types import TracebackType
2526
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type
2627

@@ -68,6 +69,10 @@
6869
# If the retry interval is larger than this then we enter "catchup" mode
6970
CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
7071

72+
# Limit how many presence states we add to each presence EDU, to ensure that
73+
# they are bounded in size.
74+
MAX_PRESENCE_STATES_PER_EDU = 50
75+
7176

7277
class PerDestinationQueue:
7378
"""
@@ -144,7 +149,7 @@ def __init__(
144149

145150
# Map of user_id -> UserPresenceState of pending presence to be sent to this
146151
# destination
147-
self._pending_presence: Dict[str, UserPresenceState] = {}
152+
self._pending_presence: OrderedDict[str, UserPresenceState] = OrderedDict()
148153

149154
# List of room_id -> receipt_type -> user_id -> receipt_dict,
150155
#
@@ -399,7 +404,7 @@ async def _transaction_transmission_loop(self) -> None:
399404
# through another mechanism, because this is all volatile!
400405
self._pending_edus = []
401406
self._pending_edus_keyed = {}
402-
self._pending_presence = {}
407+
self._pending_presence.clear()
403408
self._pending_receipt_edus = []
404409

405410
self._start_catching_up()
@@ -721,22 +726,26 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
721726

722727
# Add presence EDU.
723728
if self.queue._pending_presence:
729+
# Only send max 50 presence entries in the EDU, to bound the amount
730+
# of data we're sending.
731+
presence_to_add: List[JsonDict] = []
732+
while (
733+
self.queue._pending_presence
734+
and len(presence_to_add) < MAX_PRESENCE_STATES_PER_EDU
735+
):
736+
_, presence = self.queue._pending_presence.popitem(last=False)
737+
presence_to_add.append(
738+
format_user_presence_state(presence, self.queue._clock.time_msec())
739+
)
740+
724741
pending_edus.append(
725742
Edu(
726743
origin=self.queue._server_name,
727744
destination=self.queue._destination,
728745
edu_type=EduTypes.PRESENCE,
729-
content={
730-
"push": [
731-
format_user_presence_state(
732-
presence, self.queue._clock.time_msec()
733-
)
734-
for presence in self.queue._pending_presence.values()
735-
]
736-
},
746+
content={"push": presence_to_add},
737747
)
738748
)
739-
self.queue._pending_presence = {}
740749

741750
# Add read receipt EDUs.
742751
pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))

tests/federation/test_federation_sender.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
from twisted.test.proto_helpers import MemoryReactor
2828

2929
from synapse.api.constants import EduTypes, RoomEncryptionAlgorithms
30+
from synapse.api.presence import UserPresenceState
31+
from synapse.federation.sender.per_destination_queue import MAX_PRESENCE_STATES_PER_EDU
3032
from synapse.federation.units import Transaction
3133
from synapse.handlers.device import DeviceHandler
3234
from synapse.rest import admin
@@ -266,6 +268,123 @@ def test_send_receipts_with_backoff(self) -> None:
266268
)
267269

268270

271+
class FederationSenderPresenceTestCases(HomeserverTestCase):
272+
"""
273+
Test federation sending for presence updates.
274+
"""
275+
276+
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
277+
self.federation_transport_client = Mock(spec=["send_transaction"])
278+
self.federation_transport_client.send_transaction = AsyncMock()
279+
hs = self.setup_test_homeserver(
280+
federation_transport_client=self.federation_transport_client,
281+
)
282+
283+
return hs
284+
285+
def default_config(self) -> JsonDict:
286+
config = super().default_config()
287+
config["federation_sender_instances"] = None
288+
return config
289+
290+
def test_presence_simple(self) -> None:
291+
"Test that sending a single presence update works"
292+
293+
mock_send_transaction: AsyncMock = (
294+
self.federation_transport_client.send_transaction
295+
)
296+
mock_send_transaction.return_value = {}
297+
298+
sender = self.hs.get_federation_sender()
299+
self.get_success(
300+
sender.send_presence_to_destinations(
301+
[UserPresenceState.default("@user:test")],
302+
["server"],
303+
)
304+
)
305+
306+
self.pump()
307+
308+
# expect a call to send_transaction
309+
mock_send_transaction.assert_awaited_once()
310+
311+
json_cb = mock_send_transaction.call_args[0][1]
312+
data = json_cb()
313+
self.assertEqual(
314+
data["edus"],
315+
[
316+
{
317+
"edu_type": EduTypes.PRESENCE,
318+
"content": {
319+
"push": [
320+
{
321+
"presence": "offline",
322+
"user_id": "@user:test",
323+
}
324+
]
325+
},
326+
}
327+
],
328+
)
329+
330+
def test_presence_batched(self) -> None:
331+
"""Test that sending lots of presence updates to a destination are
332+
batched, rather than having them all sent in one EDU."""
333+
334+
mock_send_transaction: AsyncMock = (
335+
self.federation_transport_client.send_transaction
336+
)
337+
mock_send_transaction.return_value = {}
338+
339+
sender = self.hs.get_federation_sender()
340+
341+
# We now send lots of presence updates to force the federation sender to
342+
# batch the mup.
343+
number_presence_updates_to_send = MAX_PRESENCE_STATES_PER_EDU * 2
344+
self.get_success(
345+
sender.send_presence_to_destinations(
346+
[
347+
UserPresenceState.default(f"@user{i}:test")
348+
for i in range(number_presence_updates_to_send)
349+
],
350+
["server"],
351+
)
352+
)
353+
354+
self.pump()
355+
356+
# We should have seen at least one transcation be sent by now.
357+
mock_send_transaction.assert_called()
358+
359+
# We don't want to specify exactly how the presence EDUs get sent out,
360+
# could be one per transaction or multiple per transaction. We just want
361+
# to assert that a) each presence EDU has bounded number of updates, and
362+
# b) that all updates get sent out.
363+
presence_edus = []
364+
for transaction_call in mock_send_transaction.call_args_list:
365+
json_cb = transaction_call[0][1]
366+
data = json_cb()
367+
368+
for edu in data["edus"]:
369+
self.assertEqual(edu.get("edu_type"), EduTypes.PRESENCE)
370+
presence_edus.append(edu)
371+
372+
# A set of all user presence we see, this should end up matching the
373+
# number we sent out above.
374+
seen_users: Set[str] = set()
375+
376+
for edu in presence_edus:
377+
presence_states = edu["content"]["push"]
378+
379+
# This is where we actually check that the number of presence
380+
# updates is bounded.
381+
self.assertLessEqual(len(presence_states), MAX_PRESENCE_STATES_PER_EDU)
382+
383+
seen_users.update(p["user_id"] for p in presence_states)
384+
385+
self.assertEqual(len(seen_users), number_presence_updates_to_send)
386+
387+
269388
class FederationSenderDevicesTestCases(HomeserverTestCase):
270389
"""
271390
Test federation sending to update devices.

0 commit comments

Comments
 (0)