BUG: Fix compress_identical_objects handling of StreamObjects and rem… #3440
@@ -40,14 +40,13 @@
from re import Pattern
from types import TracebackType
from typing import (
IO,
Any,
Callable,
cast,
IO,
Optional,
Union,
cast,
)

from ._cmap import _default_fonts_space_width, build_char_map_from_dict
from ._doc_common import DocumentInformation, PdfDocCommon
from ._encryption import EncryptAlgorithm, Encryption

@@ -59,6 +58,7 @@
StreamType,
_get_max_pdf_version_header,
deprecation_no_replacement,
logger_debug,
logger_warning,
)
from .constants import AnnotationDictionaryAttributes as AA

@@ -80,7 +80,6 @@
from .constants import TrailerKeys as TK
from .errors import PyPdfError
from .generic import (
PAGE_FIT,
ArrayObject,
BooleanObject,
ByteStringObject,

@@ -95,6 +94,7 @@
NameObject,
NullObject,
NumberObject,
PAGE_FIT,
PdfObject,
RectangleObject,
ReferenceLink,

@@ -1679,75 +1679,214 @@ def compress_identical_objects(
remove_orphans: bool = True,
) -> None:
"""
Parse the PDF file and merge objects that have the same hash.
This will make objects common to multiple pages.
Recommended to be used just before writing output.
Compress identical objects in the PDF file.

Args:
remove_identicals: Remove identical objects.
remove_orphans: Remove unreferenced objects.
"""
if not remove_identicals and not remove_orphans:
logger_warning(
Review comment: Why does this need a warning?
Reply: Usually, both args being set to True is a mistake. It can be the case that this happens programmatically as a result of some logic I don't even want to think about. In this case, just a warning is fine.
Reply: I assume you mean both arguments being set to
Reply: I would advise not to use exceptions in this case. Warning is for debugging. But it's not my call.
"Remove identical objects and remove unreferenced objects are both disabled. Nothing has been done.", | ||||
__name__, | ||||
) | ||||
return | ||||
|
||||
orphans: Optional[list[bool]] = None | ||||
|
||||
if remove_identicals: | ||||
# When remove_orphans=True, also mark reachability while rewriting refs | ||||
orphans = self._remove_identical_objects(mark_orphans=remove_orphans) | ||||
|
||||
if remove_orphans: | ||||
Review comment: I assume that this closes #3306 as well?
Reply: Oh, yes. I didn't realize that issue was there.
if orphans is None:
# recursive DFS (mark phase)
orphans = self._perform_orphan_recursive_mark_phase()
# sweep phase
assert orphans is not None
Review comment: Why do we need this assertion?
Reply: Mostly for defensive programming, and so that the next developers see that orphans must be something at that point. If somebody changes the method _perform_orphan_recursive_mark_phase and the new logic doesn't necessarily return the orphans at that point, it will stop at the assertion. It's better to have it fail at that point than later, when orphans is used and something other than None is expected.
self._perform_orphan_sweep_phase(orphans)

def _remove_identical_objects(self, mark_orphans: bool) -> Optional[list[bool]]:
"""
Detect identical objects by hash and collapse duplicates.
If mark_orphans is True, mark reachable objects during the rewrite so that the caller
can later sweep truly unreachable ones.

This function:
- groups objects by obj.hash_value(), --- WARNING: this can cause collisions !!!
Review comment: Which real-world effect and probability does this warning have?
Reply: Maybe really large PDFs with repeating elements. I was just being extra. I don't really think it pays off to change this.
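As a follow-up to this thread: if collisions ever became a practical concern, one mitigation (not part of this PR) would be to confirm real equality inside each hash bucket before collapsing objects. A minimal sketch, assuming objects that expose hash_value() and a meaningful ==; the helper name group_identical is hypothetical:

```python
from collections import defaultdict


def group_identical(objects):
    """Group candidate duplicates by hash, then confirm equality inside each bucket."""
    buckets = defaultdict(list)
    for obj in objects:
        buckets[obj.hash_value()].append(obj)

    groups = []  # list of (representative, [confirmed duplicates])
    for candidates in buckets.values():
        while candidates:
            representative = candidates.pop(0)
            # Only objects that really compare equal are treated as duplicates,
            # so a rare hash collision cannot merge two different objects.
            duplicates = [c for c in candidates if c == representative]
            candidates = [c for c in candidates if c != representative]
            groups.append((representative, duplicates))
    return groups
```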
- keeps the first occurrence as the representative,
- for each duplicate sets self._objects[idx] = None (we do not insert an IndirectObject here),
- updates references inside *live* objects so that any IndirectObject that previously
pointed to a duplicate will point to the representative.

IMPORTANT: This function does NOT remove unreachable objects (orphans).
Removing unreachable objects is done by the orphan path (mark & sweep) outside.

Args:
mark_orphans: if True, mark reachability while rewriting references.

def replace_in_obj(
obj: PdfObject, crossref: dict[IndirectObject, IndirectObject]
Returns:
Optional[list[bool]]: a boolean list the size of self._objects where True means
"unvisited (potential orphan)" and False means "reachable". Returns None if
Review comment: Is this indentation correct?
Reply: Yep 👍 — that indentation is correct. If you want to be super PEP-conforming, we can add a blank line between the Args and Returns sections.
mark_orphans is False.
"""
# -----
Review comment: This looks superfluous.
Reply: We can change it. But I would make it clear that True means unvisited, not necessarily orphan, until the end of the mark phase.
def replace_indirect_object(
obj: PdfObject,
crossref: dict[IndirectObject, IndirectObject],
mark_orphans: bool,
) -> None:
if isinstance(obj, DictionaryObject):
key_val = obj.items()
"""
Replace an indirect object with its representative.
If mark_orphans is True, mark reachable objects during the rewrite so that the caller
can later sweep truly unreachable ones.

--- SECURITY CONCERN !!!
Review comment: Please propose a solution on how you recommend to mitigate this.
Reply: If you're asking about the SECURITY CONCERN !!! thing: the dirtiest solution is to set a MAX_DEPTH int variable and, if it is exceeded, stop the mark phase and the compression and raise the necessary errors. Timeouts or iteration caps of some sort go in the same direction. Although I believe it's a good idea to implement this, even if only as optional arguments, I don't think it would be a complete solution. A better, more complete solution would be to use stacks instead of recursion.
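Purely as an illustration of the MAX_DEPTH idea from this thread (not part of the PR), a depth-capped variant of the recursive rewrite could look roughly like the sketch below. The constant, the function name, and the free-function shape are assumptions; the pypdf types are only used the way the diff already uses them.

```python
from pypdf.generic import ArrayObject, DictionaryObject, IndirectObject, StreamObject

MAX_DEPTH = 1000  # assumed cap; not an existing pypdf constant


def replace_refs_with_depth_cap(obj, crossref, depth=0):
    """Depth-capped variant of the recursive reference rewrite (sketch only)."""
    if depth > MAX_DEPTH:
        raise RecursionError("Aborting: object tree is nested deeper than MAX_DEPTH")
    if isinstance(obj, (DictionaryObject, StreamObject)):
        key_value_pairs = obj.items()
    elif isinstance(obj, ArrayObject):
        key_value_pairs = enumerate(obj)
    else:  # primitives carry no references
        return
    for key, value in key_value_pairs:
        if isinstance(value, IndirectObject):
            if value in crossref:
                obj[key] = crossref[value]
        else:
            replace_refs_with_depth_cap(value, crossref, depth + 1)
```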
This function is recursive. A maliciously crafted PDF file could cause an infinite loop.
"""
if isinstance(obj, (DictionaryObject, StreamObject)):
key_value_pairs = obj.items()
elif isinstance(obj, ArrayObject):
key_val = enumerate(obj) # type: ignore
else:
key_value_pairs = enumerate(obj)
else: # primitives
return
assert isinstance(obj, (DictionaryObject, ArrayObject))
for k, v in key_val:
if isinstance(v, IndirectObject):
orphans[v.idnum - 1] = False
if v in crossref:
obj[k] = crossref[v]

# Traverse nested containers
for key, value in key_value_pairs:
if isinstance(value, IndirectObject):
if mark_orphans and orphans is not None:
# idnum is 1-based; list is 0-based
orphans[value.idnum - 1] = False
if value in crossref:
obj[key] = crossref[value]
else:
"""the filtering on DictionaryObject and ArrayObject only
will be performed within replace_in_obj"""
replace_in_obj(v, crossref)

# _idnum_hash: dict[hash] = (1st_ind_obj, [other_indir_objs, ...])
self._idnum_hash = {}
orphans = [True] * len(self._objects)
# look for similar objects
for idx, obj in enumerate(self._objects):
replace_indirect_object(value, crossref, mark_orphans)
# -----
Review comment: Suggested change.
Reply: # ----- marks the end of the inner function. I believe it adds readability. But we can remove it if you want.
# Map: hash_value -> (representative_indirect, [duplicate_indirects...])
self._idnum_hash: dict[int, tuple[IndirectObject, list[IndirectObject]]] = {}
objects = list(self._objects) # save copy
orphans: Optional[list[bool]] = [True] * len(objects) if mark_orphans else None

# PHASE 1: Group objects by hash and blank out duplicates
for idx, obj in enumerate(objects):
if is_null_or_none(obj):
continue
assert obj is not None, "mypy" # mypy: TypeGuard of `is_null_or_none` does not help here.
assert obj is not None
Review comment: Please elaborate on this change.
Reply: Probably my mistake; keeping the `, "mypy"` message in the assert is a good idea.
assert isinstance(obj.indirect_reference, IndirectObject)
h = obj.hash_value()
if remove_identicals and h in self._idnum_hash:
self._idnum_hash[h][1].append(obj.indirect_reference)
hsh = obj.hash_value()
if hsh in self._idnum_hash:
# duplicate: record its IndirectObject reference and clear the slot
Review comment: "its indirect" what?
Reply: Good catch — that comment is a bit shorthand-y. "Its indirect" means its IndirectObject reference (the pointer that other PDF objects use to refer to it).
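To make that bookkeeping concrete, here is a tiny stand-alone illustration of the map built in phase 1 and the reverse map derived in phase 2. The integer keys and string values are stand-ins for real hash_value() results and IndirectObject references:

```python
# Stand-in values: real keys come from obj.hash_value(), real values are
# IndirectObject references into the writer's object table.
idnum_hash = {
    0x1A2B: ("rep_a", ["dup_a1", "dup_a2"]),  # three identical objects; keep rep_a
    0x3C4D: ("rep_b", []),                    # unique object; nothing to rewrite
}

# Phase 2: duplicate -> representative, used when rewriting references.
cnv_rev = {dup: rep for rep, dups in idnum_hash.values() for dup in dups}
assert cnv_rev == {"dup_a1": "rep_a", "dup_a2": "rep_a"}
```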
self._idnum_hash[hsh][1].append(obj.indirect_reference)
self._objects[idx] = None
"""
Here we are mutating self._objects in-place while iterating at the
same time. If multi-threading is added in the future, this could
cause a race condition.
"""
else:
self._idnum_hash[h] = (obj.indirect_reference, [])
self._idnum_hash[hsh] = (obj.indirect_reference, [])

# generate the dict converting others to 1st
cnv = {v[0]: v[1] for v in self._idnum_hash.values() if len(v[1]) > 0}
# PHASE 2: Build reverse conversion map {dup_indirect -> rep_indirect}
cnv_rev: dict[IndirectObject, IndirectObject] = {}
for k, v in cnv.items():
cnv_rev.update(zip(v, (k,) * len(v)))
for rep, dups in self._idnum_hash.values():
for dup in dups:
cnv_rev[dup] = rep

# replace reference to merged objects
# PHASE 3: Rewrite references inside remaining live container objects
for obj in self._objects:
if isinstance(obj, (DictionaryObject, ArrayObject)):
replace_in_obj(obj, cnv_rev)
if isinstance(obj, (DictionaryObject, StreamObject, ArrayObject)):
replace_indirect_object(obj, cnv_rev, mark_orphans)

# remove orphans (if applicable)
orphans[self.root_object.indirect_reference.idnum - 1] = False # type: ignore
if mark_orphans:
orphans[self.root_object.indirect_reference.idnum - 1] = False
orphans[self._info.indirect_reference.idnum - 1] = False

try:
orphans[self._ID.indirect_reference.idnum - 1] = False
except AttributeError:
logger_debug(
f"AttributeError: {__name__} has no attribute '_ID'.", | ||||
__name__ | ||||
) | ||||
return orphans | ||||
|
||||
orphans[self._info.indirect_reference.idnum - 1] = False # type: ignore | ||||
def _perform_orphan_recursive_mark_phase( | ||||
self | ||||
) -> list[bool]: | ||||
""" | ||||
Mark phase: mark reachable objects during the rewrite so that the caller | ||||
can later sweep truly unreachable ones. | ||||
|
||||
Returns: | ||||
list[bool]: a boolean list the size of self._objects where True means | ||||
"unvisited (potential orphan)" and False means "reachable". Returns None if | ||||
mark_orphans is False. | ||||
""" | ||||
# ----- | ||||
def mark_orphans( | ||||
obj: PdfObject | ||||
)-> list[bool]: | ||||
""" | ||||
Mark reachable objects during the rewrite so that the caller | ||||
can later sweep truly unreachable ones. | ||||
|
||||
--- SECURITY CONCERN !!! | ||||
Review comment: Please propose a solution for this concern.
Reply: The dirtiest solution is to set a MAX_DEPTH int variable and, if it is exceeded, stop the mark phase and the compression and raise the necessary errors. Timeouts or iteration caps of some sort go in the same direction. Although I believe it's a good idea to implement this, even if only as optional arguments, I don't think it would be a complete solution. A better, more complete solution would be to use stacks instead of recursion.
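As an illustration of the stack-based alternative mentioned in this thread (not part of the PR), the mark phase could be written iteratively. The function shape and names below are assumptions; only the pypdf types are real:

```python
from pypdf.generic import ArrayObject, DictionaryObject, IndirectObject, StreamObject


def mark_reachable(objects, orphans):
    """Iterative mark phase: walk nested containers with an explicit stack,
    so deeply nested documents cannot exhaust Python's recursion limit."""
    stack = [o for o in objects if isinstance(o, (DictionaryObject, StreamObject, ArrayObject))]
    while stack:
        current = stack.pop()
        values = current.values() if isinstance(current, (DictionaryObject, StreamObject)) else current
        for value in values:
            if isinstance(value, IndirectObject):
                orphans[value.idnum - 1] = False  # idnum is 1-based, the list is 0-based
            elif isinstance(value, (DictionaryObject, StreamObject, ArrayObject)):
                stack.append(value)  # descend without recursion
```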
This function is recursive. A maliciously crafted PDF file could cause an infinite loop.
"""
if isinstance(obj, (DictionaryObject, StreamObject)):
key_value_pairs = obj.items()
elif isinstance(obj, ArrayObject):
key_value_pairs = enumerate(obj)
else: # ignore primitives
return
assert isinstance(obj, (DictionaryObject, StreamObject, ArrayObject))

for key, value in key_value_pairs:
if isinstance(value, IndirectObject):
orphans[value.idnum - 1] = False
else:
mark_orphans(value)
# -----
Review comment: This comment is superfluous.
objects = list(self._objects)
orphans = [True] * len(objects)

for obj in self._objects:
if isinstance(obj, (DictionaryObject, StreamObject, ArrayObject)):
mark_orphans(obj)

orphans[self.root_object.indirect_reference.idnum - 1] = False
orphans[self._info.indirect_reference.idnum - 1] = False

try:
orphans[self._ID.indirect_reference.idnum - 1] = False # type: ignore
orphans[self._ID.indirect_reference.idnum - 1] = False
Review comment: See above.
except AttributeError:
pass
logger_debug(
f"AttributeError: {__name__} has no attribute '_ID'.",
__name__
)
return orphans

def _perform_orphan_sweep_phase(
self,
orphans: list[bool],
) -> None:
"""
Perform the orphan sweep phase, removing the unreachable objects.

Args:
orphans (list[bool]): a boolean list the size of self._objects where True means
"unvisited (potential orphan)" and False means "reachable".
"""
for i in compress(range(len(self._objects)), orphans):
self._objects[i] = None

Review comment: These empty lines look wrong.




def get_reference(self, obj: PdfObject) -> IndirectObject:
idnum = self._objects.index(obj) + 1
ref = IndirectObject(idnum, 0, self)
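For context, a minimal usage sketch of the public API this PR touches; the input path is a placeholder and the example assumes a one-page document, so the actual size savings depend on the file:

```python
from io import BytesIO

from pypdf import PdfReader, PdfWriter

reader = PdfReader("example.pdf")  # placeholder: assume a one-page document

writer = PdfWriter()
writer.append(reader)
writer.append(reader)                      # second copy duplicates every object
writer.remove_page(len(writer.pages) - 1)  # drop the duplicated page again

# Collapse identical objects and drop anything that is no longer referenced.
writer.compress_identical_objects(remove_identicals=True, remove_orphans=True)

out = BytesIO()
writer.write(out)  # output ends up close to the single-copy size again
```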
@@ -2485,6 +2485,74 @@ def test_compress_identical_objects():
assert len(out2.getvalue()) > len(out3.getvalue())


@pytest.mark.enable_socket
def test_compress_identical_objects_add_and_remove_page():
"""
This test is a regression test for issue #3418.

compress_identical_objects restores the PDF size to approximately the single-page
size after removing a duplicated page.
"""
SAMPLE_URLS = [
Review comment: The preferred way to handle multiple cases is to use parametrization, as this produces nicer error messages. Additionally: there is no need to download the sample files - they can be used directly from the source checkout and
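A sketch of what the parametrized variant could look like, assuming the test suite's SAMPLE_ROOT points at the local py-pdf/sample-files checkout; the helper written_size is hypothetical:

```python
from io import BytesIO

import pytest

from pypdf import PdfReader, PdfWriter


def written_size(writer: PdfWriter) -> int:
    out = BytesIO()
    writer.write(out)
    return len(out.getvalue())


@pytest.mark.parametrize(
    "pdf_path",
    [
        "001-trivial/minimal-document.pdf",
        "011-google-doc-document/google-doc-document.pdf",
    ],
)
def test_compress_identical_objects_add_and_remove_page(pdf_path):
    reader = PdfReader(SAMPLE_ROOT / pdf_path)  # SAMPLE_ROOT: assumed local sample-files path

    writer_single = PdfWriter()
    writer_single.append(reader)
    single_len = written_size(writer_single)

    writer_double = PdfWriter()
    writer_double.append(reader)
    writer_double.append(reader)
    writer_double.remove_page(len(writer_double.pages) - 1)
    writer_double.compress_identical_objects(remove_orphans=True)

    assert abs(written_size(writer_double) - single_len) <= max(int(0.03 * single_len), 2000)
```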
"https://raw.githubusercontent.com/py-pdf/sample-files/main/001-trivial/minimal-document.pdf",
"https://raw.githubusercontent.com/py-pdf/sample-files/main/011-google-doc-document/google-doc-document.pdf",
]

for url in SAMPLE_URLS:
name = url.split("/")[-1]
in_bytes = BytesIO(get_data_from_url(url, name=name))

reader = PdfReader(in_bytes)

# baseline writer with 1 copy (single-page PDF)
writer_single = PdfWriter()
writer_single.append(reader)
out_single = BytesIO()
writer_single.write(out_single)
single_len = len(out_single.getvalue())

# writer with 2 copies (duplicated pages)
writer_double = PdfWriter()
writer_double.append(reader)
writer_double.append(reader)
out_double = BytesIO()
writer_double.write(out_double)
double_len = len(out_double.getvalue())

# remove duplicated page and compress
try:
writer_double.remove_page(len(writer_double.pages) - 1)
except TypeError:
Review comment: We should not need any exception handling in tests. Could you please elaborate on why this is required here?
writer_double.remove_page(len(writer_double.pages) - 1, True)

writer_double.compress_identical_objects(remove_orphans=True)

out_after = BytesIO()
writer_double.write(out_after)
after_len = len(out_after.getvalue())

# tolerance: 3% of single_len or 2000 bytes, whichever is greater
tol = max(int(0.03 * single_len), 2000)

assert abs(after_len - single_len) <= tol, (
f"[{url}] expected size ≈ {single_len} ± {tol} bytes, got {after_len} "
f"(1p={single_len}, 2p={double_len})"
)

def test_compress_identical_objects_noop(caplog):
writer = PdfWriter()
writer.add_blank_page(width=72, height=72)

# Run with both flags set to False
Review comment: Please use English (instead of Spanish?)
with caplog.at_level("WARNING"): | ||||||||
writer.compress_identical_objects(remove_identicals=False, remove_orphans=False) | ||||||||
|
||||||||
# Verificamos que el warning se haya emitido | ||||||||
assert "Nothing has been done" in caplog.text | ||||||||
|
||||||||
# Verificamos que el documento sigue intacto (sigue teniendo 1 página) | ||||||||
assert len(writer.pages) == 1 | ||||||||
|
||||||||
def test_set_need_appearances_writer():
"""Minimal test for coverage"""
writer = PdfWriter()