gh-96471: Add multiprocessing queue shutdown #138828
Open
+641
−33
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Add multiprocessing queue shutdown
This feature is already implemented in
asyncio
(#104228) andthreading
(#104750) modules.Reminders of new behaviors
When queue shuts down:
put
method raises an exception. All the pending put calls are unblocked and raise an exception.get
method returns available items until queue is empty, then raises Shutdown exception. All the pending get calls are unblocked and raise an exception.Updates to the main code.
There are 2 constraints here:
multiprocessing.Value
object to store the 3 shutdown states because_ctypes
module is not available on all platforms.get_value
Semaphore method because of not implemented on MacOSX, but we can just check if the semaphore is locked or not (internal counter is zero or not)The additional private variables are as follows:
Lock
to prevent concurrent updates when shutting down,Semaphore
to store the 3 shutdown states,Semaphore
used to count pending getter and putter processes.It would also have been possible to use an array of integers from
mulitprocessing.heap.BufferWrapper
but I find simpler and more explicit to useSemaphore
.The main additional codes are as follows: