Skip to content

Commit cf711ac

Browse files
authored
Reduce device lists replication traffic. (#17333)
Reduce the replication traffic of device lists, by not sending every destination that needs to be sent the device list update over replication. Instead a "hosts to send to have been calculated" notification over replication, and then federation senders read the destinations from the DB. For non federation senders this should heavily reduce the impact of a user in many large rooms changing a device.
1 parent 700d2cc commit cf711ac

File tree

6 files changed

+89
-48
lines changed

6 files changed

+89
-48
lines changed

changelog.d/17333.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Handle device lists notifications for large accounts more efficiently in worker mode.

synapse/replication/tcp/client.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,19 @@ async def on_rdata(
114114
"""
115115
all_room_ids: Set[str] = set()
116116
if stream_name == DeviceListsStream.NAME:
117-
if any(row.entity.startswith("@") and not row.is_signature for row in rows):
117+
if any(not row.is_signature and not row.hosts_calculated for row in rows):
118118
prev_token = self.store.get_device_stream_token()
119119
all_room_ids = await self.store.get_all_device_list_changes(
120120
prev_token, token
121121
)
122122
self.store.device_lists_in_rooms_have_changed(all_room_ids, token)
123123

124+
# If we're sending federation we need to update the device lists
125+
# outbound pokes stream change cache with updated hosts.
126+
if self.send_handler and any(row.hosts_calculated for row in rows):
127+
hosts = await self.store.get_destinations_for_device(token)
128+
self.store.device_lists_outbound_pokes_have_changed(hosts, token)
129+
124130
self.store.process_replication_rows(stream_name, instance_name, token, rows)
125131
# NOTE: this must be called after process_replication_rows to ensure any
126132
# cache invalidations are first handled before any stream ID advances.
@@ -433,12 +439,11 @@ async def process_replication_rows(
433439
# The entities are either user IDs (starting with '@') whose devices
434440
# have changed, or remote servers that we need to tell about
435441
# changes.
436-
hosts = {
437-
row.entity
438-
for row in rows
439-
if not row.entity.startswith("@") and not row.is_signature
440-
}
441-
await self.federation_sender.send_device_messages(hosts, immediate=False)
442+
if any(row.hosts_calculated for row in rows):
443+
hosts = await self.store.get_destinations_for_device(token)
444+
await self.federation_sender.send_device_messages(
445+
hosts, immediate=False
446+
)
442447

443448
elif stream_name == ToDeviceStream.NAME:
444449
# The to_device stream includes stuff to be pushed to both local

synapse/replication/tcp/streams/_base.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -549,10 +549,14 @@ class DeviceListsStream(_StreamFromIdGen):
549549

550550
@attr.s(slots=True, frozen=True, auto_attribs=True)
551551
class DeviceListsStreamRow:
552-
entity: str
552+
user_id: str
553553
# Indicates that a user has signed their own device with their user-signing key
554554
is_signature: bool
555555

556+
# Indicates if this is a notification that we've calculated the hosts we
557+
# need to send the update to.
558+
hosts_calculated: bool
559+
556560
NAME = "device_lists"
557561
ROW_TYPE = DeviceListsStreamRow
558562

@@ -594,13 +598,13 @@ async def _update_function(
594598
upper_limit_token = min(upper_limit_token, signatures_to_token)
595599

596600
device_updates = [
597-
(stream_id, (entity, False))
598-
for stream_id, (entity,) in device_updates
601+
(stream_id, (entity, False, hosts))
602+
for stream_id, (entity, hosts) in device_updates
599603
if stream_id <= upper_limit_token
600604
]
601605

602606
signatures_updates = [
603-
(stream_id, (entity, True))
607+
(stream_id, (entity, True, False))
604608
for stream_id, (entity,) in signatures_updates
605609
if stream_id <= upper_limit_token
606610
]

synapse/storage/databases/main/devices.py

Lines changed: 58 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -164,22 +164,24 @@ def __init__(
164164
prefilled_cache=user_signature_stream_prefill,
165165
)
166166

167-
(
168-
device_list_federation_prefill,
169-
device_list_federation_list_id,
170-
) = self.db_pool.get_cache_dict(
171-
db_conn,
172-
"device_lists_outbound_pokes",
173-
entity_column="destination",
174-
stream_column="stream_id",
175-
max_value=device_list_max,
176-
limit=10000,
177-
)
178-
self._device_list_federation_stream_cache = StreamChangeCache(
179-
"DeviceListFederationStreamChangeCache",
180-
device_list_federation_list_id,
181-
prefilled_cache=device_list_federation_prefill,
182-
)
167+
self._device_list_federation_stream_cache = None
168+
if hs.should_send_federation():
169+
(
170+
device_list_federation_prefill,
171+
device_list_federation_list_id,
172+
) = self.db_pool.get_cache_dict(
173+
db_conn,
174+
"device_lists_outbound_pokes",
175+
entity_column="destination",
176+
stream_column="stream_id",
177+
max_value=device_list_max,
178+
limit=10000,
179+
)
180+
self._device_list_federation_stream_cache = StreamChangeCache(
181+
"DeviceListFederationStreamChangeCache",
182+
device_list_federation_list_id,
183+
prefilled_cache=device_list_federation_prefill,
184+
)
183185

184186
if hs.config.worker.run_background_tasks:
185187
self._clock.looping_call(
@@ -207,23 +209,30 @@ def _invalidate_caches_for_devices(
207209
) -> None:
208210
for row in rows:
209211
if row.is_signature:
210-
self._user_signature_stream_cache.entity_has_changed(row.entity, token)
212+
self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
211213
continue
212214

213215
# The entities are either user IDs (starting with '@') whose devices
214216
# have changed, or remote servers that we need to tell about
215217
# changes.
216-
if row.entity.startswith("@"):
217-
self._device_list_stream_cache.entity_has_changed(row.entity, token)
218-
self.get_cached_devices_for_user.invalidate((row.entity,))
219-
self._get_cached_user_device.invalidate((row.entity,))
220-
self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
221-
222-
else:
223-
self._device_list_federation_stream_cache.entity_has_changed(
224-
row.entity, token
218+
if not row.hosts_calculated:
219+
self._device_list_stream_cache.entity_has_changed(row.user_id, token)
220+
self.get_cached_devices_for_user.invalidate((row.user_id,))
221+
self._get_cached_user_device.invalidate((row.user_id,))
222+
self.get_device_list_last_stream_id_for_remote.invalidate(
223+
(row.user_id,)
225224
)
226225

226+
def device_lists_outbound_pokes_have_changed(
227+
self, destinations: StrCollection, token: int
228+
) -> None:
229+
assert self._device_list_federation_stream_cache is not None
230+
231+
for destination in destinations:
232+
self._device_list_federation_stream_cache.entity_has_changed(
233+
destination, token
234+
)
235+
227236
def device_lists_in_rooms_have_changed(
228237
self, room_ids: StrCollection, token: int
229238
) -> None:
@@ -363,6 +372,11 @@ async def get_device_updates_by_remote(
363372
EDU contents.
364373
"""
365374
now_stream_id = self.get_device_stream_token()
375+
if from_stream_id == now_stream_id:
376+
return now_stream_id, []
377+
378+
if self._device_list_federation_stream_cache is None:
379+
raise Exception("Func can only be used on federation senders")
366380

367381
has_changed = self._device_list_federation_stream_cache.has_entity_changed(
368382
destination, int(from_stream_id)
@@ -1018,10 +1032,10 @@ def _get_all_device_list_changes_for_remotes(
10181032
# This query Does The Right Thing where it'll correctly apply the
10191033
# bounds to the inner queries.
10201034
sql = """
1021-
SELECT stream_id, entity FROM (
1022-
SELECT stream_id, user_id AS entity FROM device_lists_stream
1035+
SELECT stream_id, user_id, hosts FROM (
1036+
SELECT stream_id, user_id, false AS hosts FROM device_lists_stream
10231037
UNION ALL
1024-
SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
1038+
SELECT DISTINCT stream_id, user_id, true AS hosts FROM device_lists_outbound_pokes
10251039
) AS e
10261040
WHERE ? < stream_id AND stream_id <= ?
10271041
ORDER BY stream_id ASC
@@ -1577,6 +1591,14 @@ def get_device_list_changes_in_room_txn(
15771591
get_device_list_changes_in_room_txn,
15781592
)
15791593

1594+
async def get_destinations_for_device(self, stream_id: int) -> StrCollection:
1595+
return await self.db_pool.simple_select_onecol(
1596+
table="device_lists_outbound_pokes",
1597+
keyvalues={"stream_id": stream_id},
1598+
retcol="destination",
1599+
desc="get_destinations_for_device",
1600+
)
1601+
15801602

15811603
class DeviceBackgroundUpdateStore(SQLBaseStore):
15821604
def __init__(
@@ -2112,12 +2134,13 @@ def _add_device_outbound_poke_to_stream_txn(
21122134
stream_ids: List[int],
21132135
context: Optional[Dict[str, str]],
21142136
) -> None:
2115-
for host in hosts:
2116-
txn.call_after(
2117-
self._device_list_federation_stream_cache.entity_has_changed,
2118-
host,
2119-
stream_ids[-1],
2120-
)
2137+
if self._device_list_federation_stream_cache:
2138+
for host in hosts:
2139+
txn.call_after(
2140+
self._device_list_federation_stream_cache.entity_has_changed,
2141+
host,
2142+
stream_ids[-1],
2143+
)
21212144

21222145
now = self._clock.time_msec()
21232146
stream_id_iterator = iter(stream_ids)

synapse/storage/databases/main/end_to_end_keys.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,9 @@ def process_replication_rows(
123123
if stream_name == DeviceListsStream.NAME:
124124
for row in rows:
125125
assert isinstance(row, DeviceListsStream.DeviceListsStreamRow)
126-
if row.entity.startswith("@"):
126+
if not row.hosts_calculated:
127127
self._get_e2e_device_keys_for_federation_query_inner.invalidate(
128-
(row.entity,)
128+
(row.user_id,)
129129
)
130130

131131
super().process_replication_rows(stream_name, instance_name, token, rows)

tests/storage/test_devices.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ class DeviceStoreTestCase(HomeserverTestCase):
3636
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
3737
self.store = hs.get_datastores().main
3838

39+
def default_config(self) -> JsonDict:
40+
config = super().default_config()
41+
42+
# We 'enable' federation otherwise `get_device_updates_by_remote` will
43+
# throw an exception.
44+
config["federation_sender_instances"] = ["master"]
45+
return config
46+
3947
def add_device_change(self, user_id: str, device_ids: List[str], host: str) -> None:
4048
"""Add a device list change for the given device to
4149
`device_lists_outbound_pokes` table.

0 commit comments

Comments
 (0)