-
-
Notifications
You must be signed in to change notification settings - Fork 32.9k
gh-96471: Add multiprocessing queue shutdown #138828
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Thank you! Unfortunately I don't know the multiprocessing code at all and I can't offer to review this. Maybe you can ask for help in the core development category on discuss.python.org. |
# Use of Semaphores instead of an array from `heap.BufferWrapper' | ||
# is here more efficient and explicit. | ||
self._sem_flag_shutdown = ctx.Semaphore(0) | ||
self._sem_flag_shutdown_immediate = ctx.Semaphore(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We found in the threading and async queues that it was simpler to implement and reason about when we only have a single boolean flag for queue-shutdown state: all immediate
did was consume all of the items in the queue (and subtracted the count from unfinished task count). Is that possible here?
if block and timeout is None: | ||
with self._rlock: | ||
res = self._recv_bytes() | ||
self._sem.release() | ||
else: | ||
if block: | ||
deadline = time.monotonic() + timeout | ||
if not self._rlock.acquire(block, timeout): | ||
raise Empty | ||
try: | ||
if block: | ||
timeout = deadline - time.monotonic() | ||
if not self._poll(timeout): | ||
|
||
if (empty := self.empty()) and self._is_shutdown(): | ||
raise ShutDown | ||
try: | ||
with self._handle_pending_processes(self._sem_pending_getters): | ||
if block and timeout is None: | ||
with self._rlock: | ||
res = self._recv_bytes() | ||
self._sem.release() | ||
else: | ||
if block: | ||
deadline = time.monotonic() + timeout | ||
if not self._rlock.acquire(block, timeout): | ||
raise Empty | ||
elif not self._poll(): | ||
raise Empty | ||
res = self._recv_bytes() | ||
self._sem.release() | ||
finally: | ||
self._rlock.release() | ||
# unserialize the data after having released the lock | ||
return _ForkingPickler.loads(res) | ||
try: | ||
if block: | ||
timeout = deadline - time.monotonic() | ||
if not self._poll(timeout): | ||
raise Empty | ||
elif not self._poll(): | ||
raise Empty | ||
res = self._recv_bytes() | ||
self._sem.release() | ||
finally: | ||
self._rlock.release() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've found it easier to review and better to keep blame history by introducing a new function keeping the same indentation level. For example, instead of:
def foo():
- with ctx:
- res = bar()
- baz(res, ctx)
+ try:
+ with ctx:
+ res = bar()
+ baz(res, ctx)
+ finally:
+ close()
do:
def foo():
+ try:
+ _foo_inner()
+ finally:
+ close()
+
+
+def _foo_inner():
with ctx:
res = bar()
baz(res, ctx)
Add multiprocessing queue shutdown
This feature is already implemented in
asyncio
(#104228) andthreading
(#104750) modules.Reminders of new behaviors
When queue shuts down:
put
method raises an exception. All the pending put calls are unblocked and raise an exception.get
method returns available items until queue is empty, then raises Shutdown exception. All the pending get calls are unblocked and raise an exception.Updates to the main code.
There are 2 constraints here:
multiprocessing.Value
object to store the 3 shutdown states because_ctypes
module is not available on all platforms.get_value
Semaphore method because of not implemented on MacOSX, but we can just check if the semaphore is locked or not (internal counter is zero or not)The additional private variables are as follows:
Lock
to prevent concurrent updates when shutting down,Semaphore
to store the 3 shutdown states,Semaphore
used to count pending getter and putter processes.It would also have been possible to use an array of integers from
mulitprocessing.heap.BufferWrapper
but I find simpler and more explicit to useSemaphore
.The main additional codes are as follows: