diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 925f043900004e..f59d328bd5f83e 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -17,8 +17,9 @@ import types import weakref import errno +from contextlib import contextmanager -from queue import Empty, Full +from queue import Empty, Full, ShutDown from . import connection from . import context @@ -45,21 +46,37 @@ def __init__(self, maxsize=0, *, ctx): else: self._wlock = ctx.Lock() self._sem = ctx.BoundedSemaphore(maxsize) + + self._lock_shutdown = ctx.Lock() + # Cannot use a ctx.Value because 'ctypes' library is + # not always available on all Linux platforms. + # Use of Semaphores instead of an array from `heap.BufferWrapper' + # is here more efficient and explicit. + self._sem_flag_shutdown = ctx.Semaphore(0) + self._sem_flag_shutdown_immediate = ctx.Semaphore(0) + self._sem_pending_getters = ctx.Semaphore(0) + self._sem_pending_putters = ctx.Semaphore(0) + # For use by concurrent.futures self._ignore_epipe = False self._reset() - if sys.platform != 'win32': register_after_fork(self, Queue._after_fork) def __getstate__(self): context.assert_spawning(self) return (self._ignore_epipe, self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid) + self._rlock, self._wlock, self._sem, self._opid, + self._lock_shutdown, + self._sem_flag_shutdown, self._sem_flag_shutdown_immediate, + self._sem_pending_getters, self._sem_pending_putters) def __setstate__(self, state): (self._ignore_epipe, self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid) = state + self._rlock, self._wlock, self._sem, self._opid, + self._lock_shutdown, + self._sem_flag_shutdown, self._sem_flag_shutdown_immediate, + self._sem_pending_getters, self._sem_pending_putters) = state self._reset() def _after_fork(self): @@ -81,11 +98,47 @@ def _reset(self, after_fork=False): self._recv_bytes = self._reader.recv_bytes 
self._poll = self._reader.poll + def _is_shutdown(self): + return not self._sem_flag_shutdown.locked() + + def _set_shutdown(self, immediate=False): + self._sem_flag_shutdown.release() + if immediate: + self._sem_flag_shutdown_immediate.release() + + @contextmanager + def _handle_pending_processes(self, sem): + # Count pending getter or putter processes in a dedicated + # semaphore. These 2 semaphores are only used when queue + # shuts down to release one by one all pending processes. + sem.release() + try: + # Wraps potentialy blocking calls: + # sem._sem.acquire() in put method, + # _recv_bytes()/_poll(*args) in get method. + yield + finally: + sem.acquire() + + def _release_pending_putters(self): + with self._lock_shutdown: + if not self._sem_pending_putters.locked(): + self._sem.release() + def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") - if not self._sem.acquire(block, timeout): - raise Full + + if self._is_shutdown(): + raise ShutDown + try: + with self._handle_pending_processes(self._sem_pending_putters): + if not self._sem.acquire(block, timeout): + raise Full + finally: + if self._is_shutdown(): + self._release_pending_putters() + raise ShutDown with self._notempty: if self._thread is None: @@ -93,31 +146,53 @@ def put(self, obj, block=True, timeout=None): self._buffer.append(obj) self._notempty.notify() + def _release_pending_getters(self): + with self._lock_shutdown: + if not self._sem_pending_getters.locked(): + self._put_sentinel() + def get(self, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") - if block and timeout is None: - with self._rlock: - res = self._recv_bytes() - self._sem.release() - else: - if block: - deadline = time.monotonic() + timeout - if not self._rlock.acquire(block, timeout): - raise Empty - try: - if block: - timeout = deadline - time.monotonic() - if not self._poll(timeout): + + if (empty := self.empty()) and 
self._is_shutdown(): + raise ShutDown + try: + with self._handle_pending_processes(self._sem_pending_getters): + if block and timeout is None: + with self._rlock: + res = self._recv_bytes() + self._sem.release() + else: + if block: + deadline = time.monotonic() + timeout + if not self._rlock.acquire(block, timeout): raise Empty - elif not self._poll(): - raise Empty - res = self._recv_bytes() - self._sem.release() - finally: - self._rlock.release() - # unserialize the data after having released the lock - return _ForkingPickler.loads(res) + try: + if block: + timeout = deadline - time.monotonic() + if not self._poll(timeout): + raise Empty + elif not self._poll(): + raise Empty + res = self._recv_bytes() + self._sem.release() + finally: + self._rlock.release() + finally: + if self._is_shutdown() and empty: + self._release_pending_getters() + raise ShutDown + + item = _ForkingPickler.loads(res) + if self._is_shutdown() \ + and isinstance(item, _ShutdownSentinel): + # A pending getter process is just unblocked, + # Unblock a next one if exists. + self._release_pending_getters() + raise ShutDown + + return item def qsize(self): # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() @@ -135,6 +210,57 @@ def get_nowait(self): def put_nowait(self, obj): return self.put(obj, False) + def _clear(self): + with self._rlock: + while self._poll(): + self._recv_bytes() + + def _put_sentinel(self): + # When put a sentinel into an empty queue, + # dont forget to call to _sem.acquire in order to + # maintain a correct count of acquire/release + # calls for BoudedSempaphore. 
+ self._sem.acquire() + + with self._notempty: + if self._thread is None: + self._start_thread() + self._buffer.append(_sentinel_shutdown) + self._notempty.notify() + + def shutdown(self, immediate=False): + if self._closed: + raise ValueError(f"Queue {self!r} is closed") + + with self._lock_shutdown: + if self._is_shutdown(): + raise RuntimeError(f"Queue {self!r} already shut down") + + is_pending_getters = not self._sem_pending_getters.locked() + is_pending_putters = not self._sem_pending_putters.locked() + str_shutdown = f"shutdown -> immediate:{immediate}" + str_shutdown += f"/PGetters:{is_pending_getters}" \ + f"/PPutters:{is_pending_putters}" \ + f"/Empty:{self.empty()}" \ + f"/Full:{self.full()}" + debug(str_shutdown) + self._set_shutdown(immediate) + + # When shutdown is immediate and there is no pending getter, + # we purge the queue (pipe). If there is data in the buffer, + # the 'feeder' thread will discard all of it. + if immediate and not is_pending_getters: + self._clear() + + # Start releasing one pending getter process. + # Put a first shutdown sentinel item into the pipe. + if self.empty() and is_pending_getters: + self._put_sentinel() + + # Start releasing one pending putter process. 
+ if is_pending_putters: + self._sem.release() + def close(self): self._closed = True close = self._close @@ -180,7 +306,7 @@ def _start_thread(self): args=(self._buffer, self._notempty, self._send_bytes, self._wlock, self._reader.close, self._writer.close, self._ignore_epipe, self._on_queue_feeder_error, - self._sem), + self._sem, self._sem_flag_shutdown_immediate), name='QueueFeederThread', daemon=True, ) @@ -228,7 +354,8 @@ def _finalize_close(buffer, notempty): @staticmethod def _feed(buffer, notempty, send_bytes, writelock, reader_close, - writer_close, ignore_epipe, onerror, queue_sem): + writer_close, ignore_epipe, onerror, queue_sem, + flag_shutdown_immediate): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release @@ -240,7 +367,7 @@ def _feed(buffer, notempty, send_bytes, writelock, reader_close, wrelease = writelock.release else: wacquire = None - + is_shutdown_immediate = lambda: not flag_shutdown_immediate.locked() while 1: try: nacquire() @@ -258,6 +385,14 @@ def _feed(buffer, notempty, send_bytes, writelock, reader_close, writer_close() return + # When queue shuts down immediatly, do not insert + # regular data in pipe, only shutdown sentinel. + if is_shutdown_immediate() \ + and not isinstance(obj, _ShutdownSentinel): + debug("Queue shuts down immediatly, " \ + "don't feed regular data to pipe") + continue + # serialize the data before acquiring the lock obj = _ForkingPickler.dumps(obj) if wacquire is None: @@ -301,6 +436,12 @@ def _on_queue_feeder_error(e, obj): __class_getitem__ = classmethod(types.GenericAlias) +# Sentinel item used to release pending getter processes +# when queue shuts down. 
+class _ShutdownSentinel: pass +_sentinel_shutdown = _ShutdownSentinel() + + + _sentinel = object() # @@ -328,8 +469,16 @@ def __setstate__(self, state): def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") - if not self._sem.acquire(block, timeout): - raise Full + if self._is_shutdown(): + raise ShutDown + try: + with self._handle_pending_processes(self._sem_pending_putters): + if not self._sem.acquire(block, timeout): + raise Full + finally: + if self._is_shutdown(): + self._release_pending_putters() + raise ShutDown with self._notempty, self._cond: if self._thread is None: @@ -350,6 +499,17 @@ def join(self): if not self._unfinished_tasks._semlock._is_zero(): self._cond.wait() + def _clear(self): + super()._clear() + + # Data could be in the buffer, not in the pipe. + # Call acquire method of '_unfinished_tasks' Semaphore + # until counter is zero. + with self._cond: + while not self._unfinished_tasks.locked(): + self._unfinished_tasks.acquire(block=False) + self._cond.notify_all() + # # Simplified Queue type -- really just a locked pipe # diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 850744e47d0e0b..69dfd480fd61fc 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -453,7 +453,7 @@ def _test_report_parent_status(cls, wconn): @warnings_helper.ignore_fork_in_thread_deprecation_warnings() def test_process(self): q = self.Queue(1) - e = self.Event() + e = self.Event() args = (q, 1, 2) kwargs = {'hello':23, 'bye':2.54} name = 'SomeProcess' @@ -1474,6 +1474,453 @@ def test_closed_queue_put_get_exceptions(self): q.put('foo') with self.assertRaisesRegex(ValueError, 'is closed'): q.get() + + # + # Issue gh-96471: multiprocessing queue shutdown. + # + + @classmethod + def _wait(cls): + # See _wait function. 
+ _wait() + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_twice(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + q = self.Queue() + q.shutdown(immediate=False) + self.assertTrue(q._is_shutdown()) + + with self.assertRaises(RuntimeError): + q.shutdown(immediate=False) + + close_queue(q) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_empty(self): + if self.TYPE != 'processes': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + q = self.Queue() + self.assertTrue(q.empty()) + + q.shutdown(immediate=False) + self.assertTrue(q._is_shutdown()) + + with self.assertRaises(pyqueue.ShutDown): + q.put("data") + with self.assertRaises(pyqueue.ShutDown): + q.get() + + close_queue(q) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_notempty(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + q = self.Queue() + q.put("Y") + q.put("D") + self._wait() + + self.assertFalse(q.empty()) + q.shutdown(immediate=False) + self.assertTrue(q._is_shutdown()) + + self.assertEqual(q.get(), "Y") + self.assertEqual(q.get(), "D") + with self.assertRaises(pyqueue.ShutDown): + q.get() + + close_queue(q) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_full(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + q = self.Queue(maxsize=2) + q.put("YY") + q.put("DD") + self._wait() + + self.assertTrue(q.full()) + q.shutdown(immediate=False) + self.assertTrue(q._is_shutdown()) + + self.assertEqual(q.get(), "YY") + self.assertEqual(q.get(), "DD") + with self.assertRaises(pyqueue.ShutDown): + q.put("data") + with self.assertRaises(pyqueue.ShutDown): + q.get() + self.assertFalse(q.full()) + + close_queue(q) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def 
test_queue_shutdown_immediate_notempty(self): + # when queue shuts down immediatly, all datas inserted + # into the pipe or into the buffer are erased. + # The queue must be empty. + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + q = self.Queue(maxsize=3) + q.put("data") + self._wait() + + self.assertFalse(q.full()) + q.shutdown(immediate=True) + self.assertTrue(q.empty()) + self.assertTrue(q._is_shutdown()) + + with self.assertRaises(pyqueue.ShutDown): + q.get() + + close_queue(q) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_immediate_full(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + q = self.Queue(maxsize=2) + q.put("YD") + q.put("LO") + self._wait() + + self.assertTrue(q.full()) + q.shutdown(immediate=True) + self.assertTrue(q.empty()) + self.assertTrue(q._is_shutdown()) + + with self.assertRaises(pyqueue.ShutDown): + q.get() + + close_queue(q) + + def _joinablequeue_shutdown_all_methods(self, immediate): + size = 2 + q = self.JoinableQueue(maxsize=size) + q.put("L") + q.put_nowait("O") + self._wait() + + q.shutdown(immediate) + if immediate: + self.assertTrue(q.empty()) + self.assertTrue(q._is_shutdown()) + + with self.assertRaises(pyqueue.ShutDown): + q.put("E") + with self.assertRaises(pyqueue.ShutDown): + q.put_nowait("W") + if immediate: + # All items into the queue are removed. 
+ with self.assertRaises(pyqueue.ShutDown): + q.get() + with self.assertRaises(pyqueue.ShutDown): + q.get_nowait() + with self.assertRaises(pyqueue.ShutDown): + q.get(True, 0.125) + else: + self.assertEqual(q.get(), "L") + q.task_done() + self.assertEqual(q.get_nowait(), "O") + q.task_done() + + # when `shutdown` queue is empty, + # should raise ShutDown Exception + with self.assertRaises(pyqueue.ShutDown): + q.get() # p.get(True) + with self.assertRaises(pyqueue.ShutDown): + q.get_nowait() # p.get(False) + with self.assertRaises(pyqueue.ShutDown): + q.get(True, 0.125) + q.join() + + close_queue(q) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_all_methods(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + self._joinablequeue_shutdown_all_methods(immediate=False) + self._joinablequeue_shutdown_all_methods(immediate=True) + + def _start_processes_pending(self, n, target, args): + self.ps = [] + for i in range(n): + self.ps.append(self.Process(target=target, + args=args)) + for p in self.ps: + p.start() + + @classmethod + def _pending_put(cls, q, barrier, results): + val = os.getpid() + try: + barrier.wait() + q.put(val) + results.append(val) + except pyqueue.ShutDown: + results.append(pyqueue.ShutDown) + + @unittest.skipIf(sys.platform == 'darwin', + "'get_value' is not implemented on MacOSX") + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_count_pending_put(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + # Queue must have a size + size = 2 + q = self.Queue(maxsize=size) + results = multiprocessing.Manager().list() + n = 30 + b = self.Barrier(n+1) + + self._start_processes_pending(n, target=self._pending_put, + args=(q, b, results)) + # Wait for all procesess start. + b.wait() + + # to be sure that queue is full, and all 'n-size' others processes + # are pending. 
+ self._wait() + + self.assertEqual(q._sem_pending_putters.get_value(), n-size) + self.assertEqual(q._sem_pending_getters.get_value(), 0) + q.shutdown(immediate=False) + self.assertTrue(q._is_shutdown()) + for p in self.ps: + p.join() + + self.assertEqual(q._sem_pending_putters.get_value(), 0) + self.assertEqual(q._sem_pending_getters.get_value(), 0) + self.assertEqual(len(results), n) + + close_queue(q) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_immediate_pending_put(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + # Regardless of the value of the immediate variable, + # the tests are identical. + size = 5 + q = self.Queue(maxsize=size) + results = multiprocessing.Manager().list() + n = 30 + b = self.Barrier(n+1) + self._start_processes_pending(n, target=self._pending_put, + args=(q, b, results)) + # Wait for all procesess start. + b.wait() + + # We need to call _wait to be sure that queue is full, + # and all others processes are pending. 
+ self._wait() + + q.shutdown(immediate=True) + self.assertTrue(q.empty()) + self.assertTrue(q._is_shutdown()) + for p in self.ps: + p.join() + + self.assertEqual(len(results), n) + self.assertEqual(results.count(pyqueue.ShutDown), n-size) + + self.assertTrue(q.empty()) + close_queue(q) + + @classmethod + def _pending_get(cls, q, barrier, results): + try: + barrier.wait() + results.append(q.get()) + except pyqueue.ShutDown: + results.append(pyqueue.ShutDown) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + @unittest.skipIf(sys.platform == 'darwin', + "'get_value' is not implemented on MacOSX") + def test_queue_shutdown_count_pending_get(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + q = self.Queue() + results = multiprocessing.Manager().list() + n = 25 + b = self.Barrier(n+1) + self._start_processes_pending(n, target=self._pending_get, + args=(q, b, results)) + # Wait for all processes to start. + b.wait() + + # wait for all pending get processes to be blocked. + self._wait() + + self.assertTrue(q.empty()) + self.assertEqual(q._sem_pending_getters.get_value(), n) + self.assertEqual(q._sem_pending_putters.get_value(), 0) + q.shutdown(immediate=False) + self.assertTrue(q._is_shutdown()) + for p in self.ps: + p.join() + + self.assertEqual(q._sem_pending_putters.get_value(), 0) + self.assertEqual(q._sem_pending_getters.get_value(), 0) + self.assertEqual(len(results), n) + + close_queue(q) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_immediate_pending_get(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + q = self.Queue() + results = multiprocessing.Manager().list() + n = 25 + b = self.Barrier(n+1) + self._start_processes_pending(n, target=self._pending_get, + args=(q, b, results)) + # Wait for all processes to start. + b.wait() + + # wait for all pending get processes to be blocked. 
+ self._wait() + + q.shutdown(immediate=True) + self.assertTrue(q.empty()) + self._wait() # adding data as 'sentinel shutdown'. + self.assertTrue(q._is_shutdown()) + for p in self.ps: + p.join() + + self.assertEqual(len(results), n) + self.assertEqual(results.count(pyqueue.ShutDown), n) + + self.assertTrue(q.empty()) + close_queue(q) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_pending_get(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + q = self.Queue() + q.put("AZ67") + self._wait() + + results = multiprocessing.Manager().list() + n = 25 + b = self.Barrier(n+1) + self._start_processes_pending(n, target=self._pending_get, + args=(q, b, results)) + # Wait for all processes to start. + b.wait() + + # wait for all pending get processes to be blocked. + self._wait() + + q.shutdown(immediate=False) + self.assertTrue(q._is_shutdown()) + for p in self.ps: + p.join() + + self.assertEqual(len(results), n) + self.assertEqual(results.count("AZ67"), 1) + + close_queue(q) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_immediate_check_purge_buffer(self): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + logger = multiprocessing.get_logger() + original_level = logger.level + logger.setLevel(logging.DEBUG) + stream = io.StringIO() + handler = logging.StreamHandler(stream) + logging_format = '[%(levelname)s] [%(filename)s] %(message)s' + handler.setFormatter(logging.Formatter(logging_format)) + logger.addHandler(handler) + + try: + q = self.JoinableQueue() + for i in range(1500): + q.put("tictoc") + q.shutdown(immediate=True) + self.assertTrue(q.empty()) + q.join() + + log_record = stream.getvalue() + self.assertIn("don't feed regular data to pipe", log_record) + + close_queue(q) + finally: + logger.setLevel(original_level) + logger.removeHandler(handler) + handler.close() + + @classmethod + def 
_shutdown(cls, q, immediate, results, retcode): + q.shutdown(immediate) + cls._wait() + if not immediate: + while not q.empty(): + results.append(q.get()) + q.task_done() + results.insert(0, retcode) + + def _join_joinablequeue(self, immediate): + if self.TYPE != 'processes': + self.skipTest(f'test not appropriate for {self.TYPE}') + + results = multiprocessing.Manager().list() + n = 10 + q = self.JoinableQueue() + for i in range(n): + q.put(i) + self._wait() + + return_process = 1000 + p = self.Process(target=self._shutdown, + args=(q, immediate, results, + return_process+immediate + ) + ) + p.start() + q.join() + p.join() + + if immediate: + self.assertEqual(len(results), 1) + self.assertTrue(q.empty()) + else: + self.assertEqual(len(results), n+1) + self.assertListEqual(results[1:], list(range(n))) + self.assertEqual(results[0], return_process+immediate) + + close_queue(q) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_immediate_join_joinablequeue(self): + return self._join_joinablequeue(True) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_queue_shutdown_join_joinablequeue(self): + return self._join_joinablequeue(False) + # # # diff --git a/Misc/NEWS.d/next/Library/2025-09-12-12-02-02.gh-issue-96471.R6sqGE.rst b/Misc/NEWS.d/next/Library/2025-09-12-12-02-02.gh-issue-96471.R6sqGE.rst new file mode 100644 index 00000000000000..a0ae0c823241a2 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-09-12-12-02-02.gh-issue-96471.R6sqGE.rst @@ -0,0 +1 @@ +Add :py:class:`multiprocessing.Queue` termination with ``multiprocessing.Queue.shutdown`` method.