Skip to content

Commit d1ceccf

Browse files
committed
WIP: consume queue on immediate shutdown
1 parent ee8d4df commit d1ceccf

File tree

2 files changed

+197
-280
lines changed

2 files changed

+197
-280
lines changed

Lib/multiprocessing/queues.py

Lines changed: 30 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@
2626

2727
from .util import debug, info, Finalize, register_after_fork, is_exiting
2828

29-
_queue_alive = 0
30-
_queue_shutdown = 1
31-
_queue_shutdown_immediate = 2
32-
3329
#
3430
# Queue type using a pipe, buffer and thread
3531
#
@@ -52,7 +48,7 @@ def __init__(self, maxsize=0, *, ctx):
5248
# For use by concurrent.futures
5349
self._ignore_epipe = False
5450
self._reset()
55-
self._shutdown_state = ctx.Value('i', _queue_alive)
51+
self._is_shutdown = ctx.Value('B', False, lock=self._rlock)
5652

5753
if sys.platform != 'win32':
5854
register_after_fork(self, Queue._after_fork)
@@ -61,12 +57,12 @@ def __getstate__(self):
6157
context.assert_spawning(self)
6258
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
6359
self._rlock, self._wlock, self._sem, self._opid,
64-
self._shutdown_state)
60+
self._is_shutdown)
6561

6662
def __setstate__(self, state):
6763
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
6864
self._rlock, self._wlock, self._sem, self._opid,
69-
self._shutdown_state) = state
65+
self._is_shutdown) = state
7066
self._reset()
7167

7268
def _after_fork(self):
@@ -88,32 +84,19 @@ def _reset(self, after_fork=False):
8884
self._recv_bytes = self._reader.recv_bytes
8985
self._poll = self._reader.poll
9086

91-
def _is_alive(self):
92-
return self._shutdown_state.value == _queue_alive
93-
94-
def _is_shutdown(self):
95-
return self._shutdown_state.value == _queue_shutdown
96-
97-
def _is_shutdown_immediate(self):
98-
return self._shutdown_state.value == _queue_shutdown_immediate
99-
100-
def _set_shutdown(self):
101-
self._shutdown_state.value = _queue_shutdown
102-
103-
def _set_shutdown_immediate(self):
104-
self._shutdown_state.value = _queue_shutdown_immediate
105-
10687
def put(self, obj, block=True, timeout=None):
10788
if self._closed:
10889
raise ValueError(f"Queue {self!r} is closed")
109-
if not self._is_alive():
90+
if self._is_shutdown.value:
11091
raise ShutDown
11192
if not self._sem.acquire(block, timeout):
112-
if not self._is_alive():
93+
if self._is_shutdown.value:
11394
raise ShutDown
11495
raise Full
11596

11697
with self._notempty:
98+
if self._is_shutdown.value:
99+
raise ShutDown
117100
if self._thread is None:
118101
self._start_thread()
119102
self._buffer.append(obj)
@@ -124,36 +107,29 @@ def get(self, block=True, timeout=None):
124107
raise ValueError(f"Queue {self!r} is closed")
125108
if block and timeout is None:
126109
with self._rlock:
127-
# checks shutdown state
128-
if (self._is_shutdown_immediate()
129-
or (self._is_shutdown() and self.empty())):
110+
if self._is_shutdown.value and self.empty():
130111
raise ShutDown
131112
res = self._recv_bytes()
132113
self._sem.release()
133114
else:
134115
if block:
135116
deadline = time.monotonic() + timeout
136117
if not self._rlock.acquire(block, timeout):
137-
if (self._is_shutdown_immediate()
138-
or (self._is_shutdown() and self.empty())):
118+
if self._is_shutdown.value and self.empty():
139119
raise ShutDown
140120
raise Empty
141121
try:
142122
if block:
143123
timeout = deadline - time.monotonic()
144124
if not self._poll(timeout):
145-
if not self._is_alive():
125+
if self._is_shutdown.value:
146126
raise ShutDown
147127
raise Empty
148128
elif not self._poll():
149-
if not self._is_alive():
129+
if self._is_shutdown.value:
150130
raise ShutDown
151131
raise Empty
152132

153-
# here queue is not empty
154-
if self._is_shutdown_immediate():
155-
raise ShutDown
156-
# here shutdown state queue is alive or shutdown
157133
res = self._recv_bytes()
158134
self._sem.release()
159135
finally:
@@ -178,18 +154,21 @@ def get_nowait(self):
178154
def put_nowait(self, obj):
179155
return self.put(obj, False)
180156

157+
def _clear(self):
158+
with self._rlock:
159+
while self._poll():
160+
self._recv_bytes()
161+
181162
def shutdown(self, immediate=False):
182163
if self._closed:
183164
raise ValueError(f"Queue {self!r} is closed")
184-
with self._shutdown_state.get_lock():
185-
if self._is_shutdown_immediate():
186-
return
165+
with self._is_shutdown.get_lock():
166+
self._is_shutdown.value = True
187167
if immediate:
188-
self._set_shutdown_immediate()
168+
self._clear()
189169
with self._notempty:
190170
self._notempty.notify_all()
191-
else:
192-
self._set_shutdown()
171+
self._sem.release(self.qsize())
193172

194173
def close(self):
195174
self._closed = True
@@ -384,14 +363,16 @@ def __setstate__(self, state):
384363
def put(self, obj, block=True, timeout=None):
385364
if self._closed:
386365
raise ValueError(f"Queue {self!r} is closed")
387-
if not self._is_alive():
366+
if self._is_shutdown.value:
388367
raise ShutDown
389368
if not self._sem.acquire(block, timeout):
390-
if not self._is_alive():
369+
if self._is_shutdown.value:
391370
raise ShutDown
392371
raise Full
393372

394373
with self._notempty, self._cond:
374+
if self._is_shutdown.value:
375+
raise ShutDown
395376
if self._thread is None:
396377
self._start_thread()
397378
self._buffer.append(obj)
@@ -400,27 +381,22 @@ def put(self, obj, block=True, timeout=None):
400381

401382
def task_done(self):
402383
with self._cond:
403-
if self._is_shutdown_immediate():
404-
raise ShutDown
405384
if not self._unfinished_tasks.acquire(False):
406385
raise ValueError('task_done() called too many times')
407386
if self._unfinished_tasks._semlock._is_zero():
408387
self._cond.notify_all()
409388

410389
def join(self):
411390
with self._cond:
412-
if self._is_shutdown_immediate():
413-
raise ShutDown
414391
if not self._unfinished_tasks._semlock._is_zero():
415392
self._cond.wait()
416-
if self._is_shutdown_immediate():
417-
raise ShutDown
418393

419-
def shutdown(self, immediate=False):
420-
with self._cond:
421-
is_alive = self._is_alive()
422-
super().shutdown(immediate)
423-
if is_alive:
394+
def _clear(self):
395+
with self._rlock:
396+
while self._poll():
397+
self._recv_bytes()
398+
self._unfinished_tasks.acquire(block=False)
399+
with self._cond:
424400
self._cond.notify_all()
425401

426402
#

0 commit comments

Comments
 (0)