|
1 | 1 | # SPDX-License-Identifier: Apache-2.0
|
2 | 2 |
|
3 | 3 | import base64
|
| 4 | +import builtins |
4 | 5 | import hashlib
|
5 | 6 | import io
|
6 | 7 | import json
|
@@ -2031,6 +2032,95 @@ def test_upload_succeeds_custom_project_size_limit(
|
2031 | 2032 | ("example", "1.0", "add source file example-1.0.tar.gz", user),
|
2032 | 2033 | ]
|
2033 | 2034 |
|
| 2035 | + def test_upload_fails_with_oserror_on_metadata_write( |
| 2036 | + self, tmpdir, monkeypatch, pyramid_config, db_request |
| 2037 | + ): |
| 2038 | + monkeypatch.setattr(tempfile, "tempdir", str(tmpdir)) |
| 2039 | + monkeypatch.setattr( |
| 2040 | + legacy, "_is_valid_dist_file", lambda *a, **kw: (True, None) |
| 2041 | + ) |
| 2042 | + |
| 2043 | + user = UserFactory.create() |
| 2044 | + EmailFactory.create(user=user) |
| 2045 | + project = ProjectFactory.create() |
| 2046 | + release = ReleaseFactory.create(project=project, version="1.0") |
| 2047 | + RoleFactory.create(user=user, project=project) |
| 2048 | + |
| 2049 | + filename = "{}-{}-py2.py3-none-any.whl".format( |
| 2050 | + project.normalized_name.replace("-", "_"), |
| 2051 | + release.version, |
| 2052 | + ) |
| 2053 | + |
| 2054 | + db_request.user = user |
| 2055 | + pyramid_config.testing_securitypolicy(identity=user) |
| 2056 | + |
| 2057 | + db_request.user_agent = "warehouse-tests/6.6.6" |
| 2058 | + |
| 2059 | + wheel_metadata = dedent( |
| 2060 | + """ |
| 2061 | + Metadata-Version: 2.1 |
| 2062 | + Name: {project.name} |
| 2063 | + Version: {release.version} |
| 2064 | + """ |
| 2065 | + ).encode("utf-8") |
| 2066 | + |
| 2067 | + wheel_testdata = _get_whl_testdata( |
| 2068 | + name=project.normalized_name.replace("-", "_"), version=release.version |
| 2069 | + ) |
| 2070 | + wheel_md5 = hashlib.md5(wheel_testdata).hexdigest() |
| 2071 | + |
| 2072 | + content = FieldStorage() |
| 2073 | + content.filename = filename |
| 2074 | + content.file = io.BytesIO(wheel_testdata) |
| 2075 | + content.type = "application/octet-stream" |
| 2076 | + |
| 2077 | + db_request.POST = MultiDict( |
| 2078 | + { |
| 2079 | + "metadata_version": "2.1", |
| 2080 | + "name": project.name, |
| 2081 | + "version": release.version, |
| 2082 | + "filetype": "bdist_wheel", |
| 2083 | + "pyversion": "py2.py3", |
| 2084 | + "content": content, |
| 2085 | + "md5_digest": wheel_md5, |
| 2086 | + "wheel_metadata_version": "1.0", |
| 2087 | + "wheel_metadata": base64.b64encode(wheel_metadata).decode("ascii"), |
| 2088 | + } |
| 2089 | + ) |
| 2090 | + |
| 2091 | + storage_service = pretend.stub(store=lambda path, file_path, *, meta: None) |
| 2092 | + db_request.find_service = pretend.call_recorder( |
| 2093 | + lambda svc, name=None, context=None: { |
| 2094 | + IFileStorage: storage_service, |
| 2095 | + }.get(svc) |
| 2096 | + ) |
| 2097 | + |
| 2098 | + # Patch open to raise OSError |
| 2099 | + original_open = builtins.open |
| 2100 | + |
| 2101 | + def mock_open(file, mode="r", *args, **kwargs): |
| 2102 | + if str(file).endswith(".metadata"): |
| 2103 | + raise OSError("Filename too long") |
| 2104 | + return original_open(file, mode, *args, **kwargs) |
| 2105 | + |
| 2106 | + monkeypatch.setattr("builtins.open", mock_open) |
| 2107 | + |
| 2108 | + with pytest.raises(HTTPBadRequest) as excinfo: |
| 2109 | + legacy.file_upload(db_request) |
| 2110 | + |
| 2111 | + resp = excinfo.value |
| 2112 | + |
| 2113 | + assert resp.status_code == 400 |
| 2114 | + assert resp.status == f"400 Filename is too long: '{filename}'" |
| 2115 | + |
| 2116 | + assert db_request.metrics.increment.calls == [ |
| 2117 | + pretend.call("warehouse.upload.attempt"), |
| 2118 | + pretend.call( |
| 2119 | + "warehouse.upload.failed", |
| 2120 | + tags=["reason:filename-too-long", "filetype:bdist_wheel"], |
| 2121 | + ), |
| 2122 | + ] |
| 2123 | + |
2034 | 2124 | def test_upload_fails_with_previously_used_filename(
|
2035 | 2125 | self, pyramid_config, db_request
|
2036 | 2126 | ):
|
|
0 commit comments