Skip to content

Conversation

YvesDup
Copy link
Contributor

@YvesDup YvesDup commented Sep 12, 2025

Add multiprocessing queue shutdown

This feature is already implemented in asyncio(#104228) and threading (#104750) modules.

Reminders of new behaviors

When queue shuts down:

  • put method raises an exception. All the pending put calls are unblocked and raise an exception.
  • get method returns available items until queue is empty, then raises Shutdown exception. All the pending get calls are unblocked and raise an exception.
  • when immediate flag is set, all items into the queue are removed.

Updates to the main code.

There are 2 constraints here:

  • We can't use a multiprocessing.Value object to store the 3 shutdown states because _ctypes module is not available on all platforms.
  • It's always impossible to use the get_value Semaphore method because of not implemented on MacOSX, but we can just check if the semaphore is locked or not (internal counter is zero or not)

The additional private variables are as follows:

  • Addition of a Lock to prevent concurrent updates when shutting down,
  • Addition of 2 Semaphore to store the 3 shutdown states,
  • Addition of 2 Semaphore used to count pending getter and putter processes.

It would also have been possible to use an array of integers from mulitprocessing.heap.BufferWrapper but I find simpler and more explicit to use Semaphore.

The main additional codes are as follows:

  • Wrap in a CM the get/put methods behaviors (checks full/empty queue, controls timeout and how to add/remove item) in order to count the number of pending processes.
  • Using a new specific sentinel to release all blocked reads from the pipe (blocked getter processes).
  • In the feeder thread, do not insert data into the pipe from the buffer when queue shuts down immediatly

Docs is not updated, so ref to `shutdown`method is wrong.
@gvanrossum
Copy link
Member

Thank you! Unfortunately I don't know the multiprocessing code at all and I can't offer to review this. Maybe you can ask for help in the core development category on discuss.python.org.

# Use of Semaphores instead of an array from `heap.BufferWrapper'
# is here more efficient and explicit.
self._sem_flag_shutdown = ctx.Semaphore(0)
self._sem_flag_shutdown_immediate = ctx.Semaphore(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We found in the threading and async queues that it was simpler to implement and reason about when we only have a single boolean flag for queue-shutdown state: all immediate did was consume all of the items in the queue (and subtracted the count from unfinished task count). Is that possible here?

Comment on lines -99 to +181
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):

if (empty := self.empty()) and self._is_shutdown():
raise ShutDown
try:
with self._handle_pending_processes(self._sem_pending_getters):
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
raise Empty
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
raise Empty
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've found it easier to review and better to keep blame history by introducing a new function keeping the same indentation level. For example, instead of:

 def foo():
-    with ctx:
-        res = bar()
-        baz(res, ctx)
+    try:
+        with ctx:
+            res = bar()
+            baz(res, ctx)
+    finally:
+        close()

do:

 def foo():
+    try:
+        _foo_inner()
+    finally:
+        close()
+
+
+def _foo_inner():
     with ctx:
         res = bar()
         baz(res, ctx)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants