Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pypdf/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,20 @@ def logger_warning(msg: str, src: str) -> None:
logging.getLogger(src).warning(msg)


def logger_debug(msg: str, src: str) -> None:
    """
    Use this instead of logger.debug directly.

    That allows people to overwrite it more easily.

    ## Exception, warnings.warn, logger_warning, logger_debug
    - Exceptions: when the user has to handle the error (e.g. a broken PDF).
    - warnings.warn: when the user needs to fix their code (e.g. DeprecationWarnings).
    - logger_warning: when pypdf handles a problem but keeps working (strict=False mode).
    - logger_debug: for detailed debugging information, useful for developers.
    """
    logging.getLogger(src).debug(msg)

def rename_kwargs(
func_name: str, kwargs: dict[str, Any], aliases: dict[str, str], fail: bool = False
) -> None:
Expand Down
229 changes: 184 additions & 45 deletions pypdf/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,13 @@
from re import Pattern
from types import TracebackType
from typing import (
IO,
Any,
Callable,
cast,
IO,
Optional,
Union,
cast,
)

from ._cmap import _default_fonts_space_width, build_char_map_from_dict
from ._doc_common import DocumentInformation, PdfDocCommon
from ._encryption import EncryptAlgorithm, Encryption
Expand All @@ -59,6 +58,7 @@
StreamType,
_get_max_pdf_version_header,
deprecation_no_replacement,
logger_debug,
logger_warning,
)
from .constants import AnnotationDictionaryAttributes as AA
Expand All @@ -80,7 +80,6 @@
from .constants import TrailerKeys as TK
from .errors import PyPdfError
from .generic import (
PAGE_FIT,
ArrayObject,
BooleanObject,
ByteStringObject,
Expand All @@ -95,6 +94,7 @@
NameObject,
NullObject,
NumberObject,
PAGE_FIT,
PdfObject,
RectangleObject,
ReferenceLink,
Expand Down Expand Up @@ -1679,75 +1679,214 @@ def compress_identical_objects(
remove_orphans: bool = True,
) -> None:
"""
Parse the PDF file and merge objects that have the same hash.
This will make objects common to multiple pages.
Recommended to be used just before writing output.
Compress identical objects in the PDF file.

Args:
remove_identicals: Remove identical objects.
remove_orphans: Remove unreferenced objects.
"""
if not remove_identicals and not remove_orphans:
logger_warning(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need a warning?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually, both args being set to True is a mistake. It can be the case that this happens programmatically as a result of some logic I don't even want to think about. In this case, just a warning is fine.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you mean both arguments being set to False is a mistake. While this method does not have an effect in this case, I do not see why we should issue a warning. If we want to completely avoid this, we should issue a proper exception to point the user to a mis-use. Warnings might be read, but are not guaranteed to be always read.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would advise not to use exceptions in this case. A warning is meant for debugging. But it's not my call.

"Remove identical objects and remove unreferenced objects are both disabled. Nothing has been done.",
__name__,
)
return

orphans: Optional[list[bool]] = None

if remove_identicals:
# When remove_orphans=True, also mark reachability while rewriting refs
orphans = self._remove_identical_objects(mark_orphans=remove_orphans)

if remove_orphans:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that this closes #3306 as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yes. I didn't realize that issue was there

if orphans is None:
# recursive DFS (mark phase)
orphans = self._perform_orphan_recursive_mark_phase()
# sweep phase
assert orphans is not None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this assertion?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly for defensive programming and so that next developers see that orphans must be something at that point. If somebody changes the method _perform_orphan_recursive_mark_phase and the new logic doesn't necessarily return the orphans at that point, it will stop at the assertion. It's better to have it fail at that point than later when using orphans expecting something else than None.

self._perform_orphan_sweep_phase(orphans)


def _remove_identical_objects(self, mark_orphans: bool) -> Optional[list[bool]]:
"""
Detect identical objects by hash and collapse duplicates.
If mark_orphans is True, mark reachable objects during the rewrite so that the caller
can later sweep truly unreachable ones.

This function:
- groups objects by obj.hash_value(), --- WARNING: this can cause collisions !!!
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which real-world effect and probability does this warning have?

Copy link
Author

@luis-cd luis-cd Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe really large PDFs with repeating elements. I was just being extra. I don't really think it pays off to change this.

- keeps the first occurrence as the representative,
- for each duplicate sets self._objects[idx] = None (we do not insert an IndirectObject here),
- updates references inside *live* objects so that any IndirectObject that previously
pointed to a duplicate will point to the representative.

IMPORTANT: This function does NOT remove unreachable objects (orphans).
Removing unreachable objects is done by the orphan path (mark & sweep) outside.

Args:
mark_orphans: if True, mark reachability while rewriting references.

def replace_in_obj(
obj: PdfObject, crossref: dict[IndirectObject, IndirectObject]
Returns:
Optional[list[bool]]: a boolean list the size of self._objects where True means
"unvisited (potential orphan)" and False means "reachable". Returns None if
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this indentation correct?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep 👍 — that indentation is correct; it follows the Google-style docstring convention.

If you want to be super PEP conforming, we can add a blank line between the sections Args and Returns.

mark_orphans is False.
"""
# -----
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks superfluous.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can change it. But I would make it clear that True means unvisited but not necessarily orphan, until the end of the mark phase.

def replace_indirect_object(
obj: PdfObject,
crossref: dict[IndirectObject, IndirectObject],
mark_orphans: bool,
) -> None:
if isinstance(obj, DictionaryObject):
key_val = obj.items()
"""
Replace an indirect object with its representative.
If mark_orphans is True, mark reachable objects during the rewrite so that the caller
can later sweep truly unreachable ones.

--- SECURITY CONCERN !!!
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please propose a solution on how you recommend to mitigate this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're asking about the SECURITY CONCERN !!! thing

The dirtiest solution is to set a MAX_DEPTH int variable and, if it is exceeded, stop the mark phase and the compression and raise the necessary errors. Timeouts or iteration caps of some sort go in the same direction. Although I believe it's a good idea to implement this, even as optional arguments, I don't think it would be a complete solution.

A better, more complete solution, would be to use stacks instead of recursion.

This function is recursive. A maliciously crafted PDF file could cause unbounded recursion
"""
if isinstance(obj, (DictionaryObject, StreamObject)):
key_value_pairs = obj.items()
elif isinstance(obj, ArrayObject):
key_val = enumerate(obj) # type: ignore
else:
key_value_pairs = enumerate(obj)
else: # primitives
return
assert isinstance(obj, (DictionaryObject, ArrayObject))
for k, v in key_val:
if isinstance(v, IndirectObject):
orphans[v.idnum - 1] = False
if v in crossref:
obj[k] = crossref[v]

# Traverse nested containers
for key, value in key_value_pairs:
if isinstance(value, IndirectObject):
if mark_orphans and orphans is not None:
# idnum is 1-based; list is 0-based
orphans[value.idnum - 1] = False
if value in crossref:
obj[key] = crossref[value]
else:
"""the filtering on DictionaryObject and ArrayObject only
will be performed within replace_in_obj"""
replace_in_obj(v, crossref)

# _idnum_hash :dict[hash]=(1st_ind_obj,[other_indir_objs,...])
self._idnum_hash = {}
orphans = [True] * len(self._objects)
# look for similar objects
for idx, obj in enumerate(self._objects):
replace_indirect_object(value, crossref, mark_orphans)
# -----
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# -----

Copy link
Author

@luis-cd luis-cd Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# ----- marks the end of the inner function. I believe it adds readability. But we can remove it if you want.


# Map: hash_value -> (representative_indirect, [duplicate_indirects...])
self._idnum_hash: dict[int, tuple[IndirectObject, list[IndirectObject]]] = {}
objects = list(self._objects) # save copy
orphans: Optional[list[bool]] = [True] * len(objects) if mark_orphans else None

# PHASE 1: Group objects by hash and blank out duplicates
for idx, obj in enumerate(objects):
if is_null_or_none(obj):
continue
assert obj is not None, "mypy" # mypy: TypeGuard of `is_null_or_none` does not help here.
assert obj is not None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please elaborate on this change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably my mistake, having

, "mypy

in is a good idea.

assert isinstance(obj.indirect_reference, IndirectObject)
h = obj.hash_value()
if remove_identicals and h in self._idnum_hash:
self._idnum_hash[h][1].append(obj.indirect_reference)
hsh = obj.hash_value()
if hsh in self._idnum_hash:
# duplicate: record its indirect and clear the slot
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"its indirect" what?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — that comment is a bit shorthand-y.

The “its indirect” means its IndirectObject reference (the pointer that other PDF objects use to refer to it).

self._idnum_hash[hsh][1].append(obj.indirect_reference)
self._objects[idx] = None
"""
Here we are mutating self._objects in-place, while iterating at the
same time. If multi-threading is added in the future, this could
cause a race condition.
"""
else:
self._idnum_hash[h] = (obj.indirect_reference, [])
self._idnum_hash[hsh] = (obj.indirect_reference, [])

# generate the dict converting others to 1st
cnv = {v[0]: v[1] for v in self._idnum_hash.values() if len(v[1]) > 0}
# PHASE 2: Build reverse conversion map {dup_indirect -> rep_indirect}
cnv_rev: dict[IndirectObject, IndirectObject] = {}
for k, v in cnv.items():
cnv_rev.update(zip(v, (k,) * len(v)))
for rep, dups in self._idnum_hash.values():
for dup in dups:
cnv_rev[dup] = rep

# replace reference to merged objects
# PHASE 3: Rewrite references inside remaining live container objects
for obj in self._objects:
if isinstance(obj, (DictionaryObject, ArrayObject)):
replace_in_obj(obj, cnv_rev)
if isinstance(obj, (DictionaryObject, StreamObject, ArrayObject)):
replace_indirect_object(obj, cnv_rev, mark_orphans)

# remove orphans (if applicable)
orphans[self.root_object.indirect_reference.idnum - 1] = False # type: ignore
if mark_orphans:
orphans[self.root_object.indirect_reference.idnum - 1] = False
orphans[self._info.indirect_reference.idnum - 1] = False

try:
orphans[self._ID.indirect_reference.idnum - 1] = False
except AttributeError:
logger_debug(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Instead of catching exception, please prefer checking the availability beforehand.
  2. I do not see any benefit of the debug logging in this case.

f"AttributeError: {__name__} has no attribute '_ID'.",
__name__
)
return orphans

orphans[self._info.indirect_reference.idnum - 1] = False # type: ignore
def _perform_orphan_recursive_mark_phase(
self
) -> list[bool]:
"""
Mark phase: mark reachable objects during the rewrite so that the caller
can later sweep truly unreachable ones.

Returns:
list[bool]: a boolean list the size of self._objects where True means
"unvisited (potential orphan)" and False means "reachable". Returns None if
mark_orphans is False.
"""
# -----
def mark_orphans(
obj: PdfObject
)-> list[bool]:
"""
Mark reachable objects during the rewrite so that the caller
can later sweep truly unreachable ones.

--- SECURITY CONCERN !!!
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please propose a solution for this concern.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dirtiest solution is to set a MAX_DEPTH int variable and, if it is exceeded, stop the mark phase and the compression and raise the necessary errors. Timeouts or iteration caps of some sort go in the same direction. Although I believe it's a good idea to implement this, even as optional arguments, I don't think it would be a complete solution.

A better, more complete solution, would be to use stacks instead of recursion.

This function is recursive. A maliciously crafted PDF file could cause unbounded recursion
"""
if isinstance(obj, (DictionaryObject, StreamObject)):
key_value_pairs = obj.items()
elif isinstance(obj, ArrayObject):
key_value_pairs = enumerate(obj)
else: # ignore primitives
return
assert isinstance(obj, (DictionaryObject, StreamObject, ArrayObject))

for key, value in key_value_pairs:
if isinstance(value, IndirectObject):
orphans[value.idnum - 1] = False
else:
mark_orphans(value)
# -----
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is superfluous.

objects = list(self._objects)
orphans = [True] * len(objects)

for obj in self._objects:
if isinstance(obj, (DictionaryObject, StreamObject, ArrayObject)):
mark_orphans(obj)

orphans[self.root_object.indirect_reference.idnum - 1] = False
orphans[self._info.indirect_reference.idnum - 1] = False

try:
orphans[self._ID.indirect_reference.idnum - 1] = False # type: ignore
orphans[self._ID.indirect_reference.idnum - 1] = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.

except AttributeError:
pass
logger_debug(
f"AttributeError: {__name__} has no attribute '_ID'.",
__name__
)
return orphans

def _perform_orphan_sweep_phase(
    self,
    orphans: list[bool],
) -> None:
    """
    Sweep phase of the mark-and-sweep cleanup: remove the unreachable objects.

    Args:
        orphans (list[bool]): a boolean list the size of self._objects where True means
            "unvisited (potential orphan)" and False means "reachable".
    """
    # Walk the object slots in lockstep with the orphan flags; zip stops at
    # the shorter sequence, mirroring itertools.compress over the index range.
    for index, is_orphan in zip(range(len(self._objects)), orphans):
        if is_orphan:
            self._objects[index] = None


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These empty lines look wrong.





def get_reference(self, obj: PdfObject) -> IndirectObject:
idnum = self._objects.index(obj) + 1
ref = IndirectObject(idnum, 0, self)
Expand Down
68 changes: 68 additions & 0 deletions tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2485,6 +2485,74 @@ def test_compress_identical_objects():
assert len(out2.getvalue()) > len(out3.getvalue())


@pytest.mark.enable_socket
def test_compress_identical_objects_add_and_remove_page():
"""
This test is a regression test for issue #3418.

compress_identical_objects restores the PDF size to approximately the single-page
size after removing a duplicated page.
"""
SAMPLE_URLS = [
Copy link
Collaborator

@stefan6419846 stefan6419846 Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The preferred way to handle multiple cases is to use parameterization as this produces nicer error messages.

Additionally: No need to download the sample files - they can be used directly from the source checkout and pytest.mark.enable_socket can be replaced by pytest.mark.samples.

"https://raw.githubusercontent.com/py-pdf/sample-files/main/001-trivial/minimal-document.pdf",
"https://raw.githubusercontent.com/py-pdf/sample-files/main/011-google-doc-document/google-doc-document.pdf",
]

for url in SAMPLE_URLS:
name = url.split("/")[-1]
in_bytes = BytesIO(get_data_from_url(url, name=name))

reader = PdfReader(in_bytes)

# baseline writer with 1 copy (single-page PDF)
writer_single = PdfWriter()
writer_single.append(reader)
out_single = BytesIO()
writer_single.write(out_single)
single_len = len(out_single.getvalue())

# writer with 2 copies (duplicated pages)
writer_double = PdfWriter()
writer_double.append(reader)
writer_double.append(reader)
out_double = BytesIO()
writer_double.write(out_double)
double_len = len(out_double.getvalue())

# remove duplicated page and compress
try:
writer_double.remove_page(len(writer_double.pages) - 1)
except TypeError:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not need any exception handling in tests. Could you please elaborate why this is required here?

writer_double.remove_page(len(writer_double.pages) - 1, True)

writer_double.compress_identical_objects(remove_orphans=True)

out_after = BytesIO()
writer_double.write(out_after)
after_len = len(out_after.getvalue())

# tolerance: 3% of single_len or 2000 bytes, whichever is greater
tol = max(int(0.03 * single_len), 2000)

assert abs(after_len - single_len) <= tol, (
f"[{url}] expected size ≈ {single_len} ± {tol} bytes, got {after_len} "
f"(1p={single_len}, 2p={double_len})"
)

def test_compress_identical_objects_noop(caplog):
writer = PdfWriter()
writer.add_blank_page(width=72, height=72)

# Run with both flags set to False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use English (instead of Spanish?)

with caplog.at_level("WARNING"):
writer.compress_identical_objects(remove_identicals=False, remove_orphans=False)

# Verify that the warning was emitted
assert "Nothing has been done" in caplog.text

# Verify the document is still intact (still has 1 page)
assert len(writer.pages) == 1

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

def test_set_need_appearances_writer():
"""Minimal test for coverage"""
writer = PdfWriter()
Expand Down
Loading