224 changes: 192 additions & 32 deletions Lib/multiprocessing/queues.py
@@ -17,8 +17,9 @@
import types
import weakref
import errno
from contextlib import contextmanager

from queue import Empty, Full
from queue import Empty, Full, ShutDown

from . import connection
from . import context
@@ -45,21 +46,37 @@ def __init__(self, maxsize=0, *, ctx):
else:
self._wlock = ctx.Lock()
self._sem = ctx.BoundedSemaphore(maxsize)

self._lock_shutdown = ctx.Lock()
# Cannot use a ctx.Value because the 'ctypes' library is
# not always available on all Linux platforms.
# Using semaphores instead of an array from 'heap.BufferWrapper'
# is more efficient and explicit here.
self._sem_flag_shutdown = ctx.Semaphore(0)
self._sem_flag_shutdown_immediate = ctx.Semaphore(0)
self._sem_pending_getters = ctx.Semaphore(0)
self._sem_pending_putters = ctx.Semaphore(0)

# For use by concurrent.futures
self._ignore_epipe = False
self._reset()

if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)

def __getstate__(self):
context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
self._rlock, self._wlock, self._sem, self._opid,
self._lock_shutdown,
self._sem_flag_shutdown, self._sem_flag_shutdown_immediate,
self._sem_pending_getters, self._sem_pending_putters)

def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._rlock, self._wlock, self._sem, self._opid,
self._lock_shutdown,
self._sem_flag_shutdown, self._sem_flag_shutdown_immediate,
self._sem_pending_getters, self._sem_pending_putters) = state
self._reset()

def _after_fork(self):
@@ -81,43 +98,101 @@ def _reset(self, after_fork=False):
self._recv_bytes = self._reader.recv_bytes
self._poll = self._reader.poll

def _is_shutdown(self):
return not self._sem_flag_shutdown.locked()

def _set_shutdown(self, immediate=False):
self._sem_flag_shutdown.release()
if immediate:
self._sem_flag_shutdown_immediate.release()

@contextmanager
def _handle_pending_processes(self, sem):
# Count pending getter or putter processes in a dedicated
# semaphore. These two semaphores are only used when the queue
# shuts down, to release all pending processes one by one.
sem.release()
try:
# Wraps potentially blocking calls:
# sem._sem.acquire() in put method,
# _recv_bytes()/_poll(*args) in get method.
yield
finally:
sem.acquire()

def _release_pending_putters(self):
with self._lock_shutdown:
if not self._sem_pending_putters.locked():
self._sem.release()
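
The pending-getter/putter bookkeeping above boils down to using a semaphore as a shared counter: release() just before a potentially blocking call, acquire() once it returns, so a zero count means nobody is blocked. A minimal standalone sketch of that pattern, using only the documented multiprocessing.Semaphore acquire/release API (the worker, pending and gate names are illustrative, not part of the patch):

import multiprocessing as mp
import time

def worker(pending, gate):
    # Register as a pending waiter for the duration of the blocking call.
    pending.release()                # increment the pending counter
    try:
        gate.acquire()               # potentially blocking call
    finally:
        pending.acquire()            # decrement the pending counter

if __name__ == '__main__':
    pending = mp.Semaphore(0)        # count 0 means "no pending waiters"
    gate = mp.Semaphore(0)
    p = mp.Process(target=worker, args=(pending, gate))
    p.start()
    time.sleep(0.5)                  # crude: give the worker time to block
    # Non-destructive "is anyone pending?" check with acquire/release only.
    if pending.acquire(block=False):
        pending.release()            # restore the counter
        gate.release()               # wake one pending waiter
    else:
        gate.release()               # fallback so the sketch always exits
    p.join()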

def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if not self._sem.acquire(block, timeout):
raise Full

if self._is_shutdown():
raise ShutDown
try:
with self._handle_pending_processes(self._sem_pending_putters):
if not self._sem.acquire(block, timeout):
raise Full
finally:
if self._is_shutdown():
self._release_pending_putters()
raise ShutDown

with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

def _release_pending_getters(self):
with self._lock_shutdown:
if not self._sem_pending_getters.locked():
self._put_sentinel()

def get(self, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):

if (empty := self.empty()) and self._is_shutdown():
raise ShutDown
try:
with self._handle_pending_processes(self._sem_pending_getters):
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
raise Empty
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
raise Empty
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
finally:
if self._is_shutdown() and empty:
self._release_pending_getters()
raise ShutDown

item = _ForkingPickler.loads(res)
if self._is_shutdown() \
and isinstance(item, _ShutdownSentinel):
# A pending getter process has just been unblocked;
# unblock the next one if it exists.
self._release_pending_getters()
raise ShutDown

return item

def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -135,6 +210,57 @@ def get_nowait(self):
def put_nowait(self, obj):
return self.put(obj, False)

def _clear(self):
with self._rlock:
while self._poll():
self._recv_bytes()

def _put_sentinel(self):
# When putting a sentinel into an empty queue, don't forget
# to call _sem.acquire() in order to maintain a correct count
# of acquire/release calls for the BoundedSemaphore.
self._sem.acquire()

with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(_sentinel_shutdown)
self._notempty.notify()

def shutdown(self, immediate=False):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")

with self._lock_shutdown:
if self._is_shutdown():
raise RuntimeError(f"Queue {self!r} already shut down")

is_pending_getters = not self._sem_pending_getters.locked()
is_pending_putters = not self._sem_pending_putters.locked()
str_shutdown = f"shutdown -> immediate:{immediate}"
str_shutdown += f"/PGetters:{is_pending_getters}" \
f"/PPutters:{is_pending_putters}" \
f"/Empty:{self.empty()}" \
f"/Full:{self.full()}"
debug(str_shutdown)
self._set_shutdown(immediate)

# Shutdown is immediate and there is no pending getter,
# so we purge the queue (pipe). If there is data in the buffer,
# the 'feeder' thread will discard all of it.
if immediate and not is_pending_getters:
self._clear()

# Start releasing one pending getter process:
# put a first shutdown sentinel into the pipe.
if self.empty() and is_pending_getters:
self._put_sentinel()

# Start releasing one pending putter process.
if is_pending_putters:
self._sem.release()
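
A usage sketch of the proposed API, assuming this patch is applied (shutdown() on multiprocessing queues is exactly what the diff proposes) and borrowing the ShutDown exception that queue.Queue.shutdown introduced in Python 3.13: a consumer drains the remaining items after a non-immediate shutdown and stops once ShutDown is raised.

import multiprocessing as mp
import time
from queue import ShutDown

def consumer(q):
    while True:
        try:
            item = q.get()           # blocks until an item arrives or the queue shuts down
        except ShutDown:
            break                    # queue is shut down and drained: stop consuming
        print('got', item)

if __name__ == '__main__':
    ctx = mp.get_context()
    q = ctx.Queue()
    p = ctx.Process(target=consumer, args=(q,))
    p.start()
    for i in range(3):
        q.put(i)
    time.sleep(0.5)                  # crude: let the queued items reach the consumer
    q.shutdown()                     # immediate=True would also discard queued items
    p.join()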

def close(self):
self._closed = True
close = self._close
@@ -180,7 +306,7 @@ def _start_thread(self):
args=(self._buffer, self._notempty, self._send_bytes,
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
self._sem, self._sem_flag_shutdown_immediate),
name='QueueFeederThread',
daemon=True,
)
Expand Down Expand Up @@ -228,7 +354,8 @@ def _finalize_close(buffer, notempty):

@staticmethod
def _feed(buffer, notempty, send_bytes, writelock, reader_close,
writer_close, ignore_epipe, onerror, queue_sem):
writer_close, ignore_epipe, onerror, queue_sem,
flag_shutdown_immediate):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
Expand All @@ -240,7 +367,7 @@ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
wrelease = writelock.release
else:
wacquire = None

is_shutdown_immediate = lambda: not flag_shutdown_immediate.locked()
while 1:
try:
nacquire()
@@ -258,6 +385,14 @@ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
writer_close()
return

# When the queue shuts down immediately, do not insert
# regular data into the pipe, only the shutdown sentinel.
if is_shutdown_immediate() \
and not isinstance(obj, _ShutdownSentinel):
debug("Queue shuts down immediately, " \
"don't feed regular data to pipe")
continue

# serialize the data before acquiring the lock
obj = _ForkingPickler.dumps(obj)
if wacquire is None:
@@ -301,6 +436,12 @@ def _on_queue_feeder_error(e, obj):
__class_getitem__ = classmethod(types.GenericAlias)


# Sentinel item used to release pending getter processes
# when the queue shuts down.
class _ShutdownSentinel: pass
_sentinel_shutdown = _ShutdownSentinel()


_sentinel = object()

#
Expand Down Expand Up @@ -328,8 +469,16 @@ def __setstate__(self, state):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if not self._sem.acquire(block, timeout):
raise Full
if self._is_shutdown():
raise ShutDown
try:
with self._handle_pending_processes(self._sem_pending_putters):
if not self._sem.acquire(block, timeout):
raise Full
finally:
if self._is_shutdown():
self._release_pending_putters()
raise ShutDown

with self._notempty, self._cond:
if self._thread is None:
@@ -350,6 +499,17 @@ def join(self):
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()

def _clear(self):
super()._clear()

# Data could still be in the buffer rather than in the pipe.
# Call the acquire method of the '_unfinished_tasks' semaphore
# until its counter is zero.
with self._cond:
while not self._unfinished_tasks.locked():
self._unfinished_tasks.acquire(block=False)
self._cond.notify_all()
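
Draining '_unfinished_tasks' in _clear() is what lets a join() that is blocked on unfinished work return after an immediate shutdown. A hypothetical end-to-end sketch of that interaction, again assuming this patch is applied:

import multiprocessing as mp

def waiter(q):
    q.join()                      # blocks until every queued item is task_done()'d
    print('join returned')

if __name__ == '__main__':
    ctx = mp.get_context()
    q = ctx.JoinableQueue()
    q.put('task')                 # one unfinished task that is never task_done()'d
    p = ctx.Process(target=waiter, args=(q,))
    p.start()
    q.shutdown(immediate=True)    # discards the item and zeroes the unfinished-task count
    p.join()                      # the waiter unblocks without any task_done() call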

#
# Simplified Queue type -- really just a locked pipe
#