|
35 | 35 | from synapse.util import Clock
|
36 | 36 |
|
37 | 37 | from tests import unittest
|
| 38 | +from tests.media.test_media_storage import small_png |
38 | 39 | from tests.test_utils import SMALL_PNG
|
39 | 40 |
|
40 | 41 |
|
@@ -146,3 +147,112 @@ def test_file_download(self) -> None:
|
146 | 147 | # check that the png file exists and matches what was uploaded
|
147 | 148 | found_file = any(SMALL_PNG in field for field in stripped_bytes)
|
148 | 149 | self.assertTrue(found_file)
|
| 150 | + |
| 151 | + |
| 152 | +class FederationThumbnailTest(unittest.FederatingHomeserverTestCase): |
| 153 | + |
| 154 | + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: |
| 155 | + super().prepare(reactor, clock, hs) |
| 156 | + self.test_dir = tempfile.mkdtemp(prefix="synapse-tests-") |
| 157 | + self.addCleanup(shutil.rmtree, self.test_dir) |
| 158 | + self.primary_base_path = os.path.join(self.test_dir, "primary") |
| 159 | + self.secondary_base_path = os.path.join(self.test_dir, "secondary") |
| 160 | + |
| 161 | + hs.config.media.media_store_path = self.primary_base_path |
| 162 | + |
| 163 | + storage_providers = [ |
| 164 | + StorageProviderWrapper( |
| 165 | + FileStorageProviderBackend(hs, self.secondary_base_path), |
| 166 | + store_local=True, |
| 167 | + store_remote=False, |
| 168 | + store_synchronous=True, |
| 169 | + ) |
| 170 | + ] |
| 171 | + |
| 172 | + self.filepaths = MediaFilePaths(self.primary_base_path) |
| 173 | + self.media_storage = MediaStorage( |
| 174 | + hs, self.primary_base_path, self.filepaths, storage_providers |
| 175 | + ) |
| 176 | + self.media_repo = hs.get_media_repository() |
| 177 | + |
| 178 | + def test_thumbnail_download_scaled(self) -> None: |
| 179 | + content = io.BytesIO(small_png.data) |
| 180 | + content_uri = self.get_success( |
| 181 | + self.media_repo.create_content( |
| 182 | + "image/png", |
| 183 | + "test_png_thumbnail", |
| 184 | + content, |
| 185 | + 67, |
| 186 | + UserID.from_string("@user_id:whatever.org"), |
| 187 | + ) |
| 188 | + ) |
| 189 | + # test with an image file |
| 190 | + channel = self.make_signed_federation_request( |
| 191 | + "GET", |
| 192 | + f"/_matrix/federation/v1/media/thumbnail/{content_uri.media_id}?width=32&height=32&method=scale", |
| 193 | + ) |
| 194 | + self.pump() |
| 195 | + self.assertEqual(200, channel.code) |
| 196 | + |
| 197 | + content_type = channel.headers.getRawHeaders("content-type") |
| 198 | + assert content_type is not None |
| 199 | + assert "multipart/mixed" in content_type[0] |
| 200 | + assert "boundary" in content_type[0] |
| 201 | + |
| 202 | + # extract boundary |
| 203 | + boundary = content_type[0].split("boundary=")[1] |
| 204 | + # split on boundary and check that json field and expected value exist |
| 205 | + body = channel.result.get("body") |
| 206 | + assert body is not None |
| 207 | + stripped_bytes = body.split(b"\r\n" + b"--" + boundary.encode("utf-8")) |
| 208 | + found_json = any( |
| 209 | + b"\r\nContent-Type: application/json\r\n\r\n{}" in field |
| 210 | + for field in stripped_bytes |
| 211 | + ) |
| 212 | + self.assertTrue(found_json) |
| 213 | + |
| 214 | + # check that the png file exists and matches the expected scaled bytes |
| 215 | + found_file = any(small_png.expected_scaled in field for field in stripped_bytes) |
| 216 | + self.assertTrue(found_file) |
| 217 | + |
| 218 | + def test_thumbnail_download_cropped(self) -> None: |
| 219 | + content = io.BytesIO(small_png.data) |
| 220 | + content_uri = self.get_success( |
| 221 | + self.media_repo.create_content( |
| 222 | + "image/png", |
| 223 | + "test_png_thumbnail", |
| 224 | + content, |
| 225 | + 67, |
| 226 | + UserID.from_string("@user_id:whatever.org"), |
| 227 | + ) |
| 228 | + ) |
| 229 | + # test with an image file |
| 230 | + channel = self.make_signed_federation_request( |
| 231 | + "GET", |
| 232 | + f"/_matrix/federation/v1/media/thumbnail/{content_uri.media_id}?width=32&height=32&method=crop", |
| 233 | + ) |
| 234 | + self.pump() |
| 235 | + self.assertEqual(200, channel.code) |
| 236 | + |
| 237 | + content_type = channel.headers.getRawHeaders("content-type") |
| 238 | + assert content_type is not None |
| 239 | + assert "multipart/mixed" in content_type[0] |
| 240 | + assert "boundary" in content_type[0] |
| 241 | + |
| 242 | + # extract boundary |
| 243 | + boundary = content_type[0].split("boundary=")[1] |
| 244 | + # split on boundary and check that json field and expected value exist |
| 245 | + body = channel.result.get("body") |
| 246 | + assert body is not None |
| 247 | + stripped_bytes = body.split(b"\r\n" + b"--" + boundary.encode("utf-8")) |
| 248 | + found_json = any( |
| 249 | + b"\r\nContent-Type: application/json\r\n\r\n{}" in field |
| 250 | + for field in stripped_bytes |
| 251 | + ) |
| 252 | + self.assertTrue(found_json) |
| 253 | + |
| 254 | + # check that the png file exists and matches the expected cropped bytes |
| 255 | + found_file = any( |
| 256 | + small_png.expected_cropped in field for field in stripped_bytes |
| 257 | + ) |
| 258 | + self.assertTrue(found_file) |
0 commit comments