The problem is that you have a race condition, defined as:
A race condition arises in software when a computer program, to operate properly, depends on the sequence or timing of the program's processes or threads.
In method WorkerProcess._work, your main loop begins:
while not self._stop_event.is_set():
    try:
        job = self._in_queue.get(timeout=.01)
    except Empty:
        continue
    if not job:
        self._in_queue.task_done()
        break
self._stop_event is being set by the _collect thread. Depending on where WorkerProcess._work is in its loop when this occurs, it can exit the loop leaving behind the None that has been placed on the _in_queue to signify no more jobs. In your case this clearly occurred for two of the processes, but it could have happened for 0, 1, 2 or 3 of them.
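To make the window concrete, here is a minimal, self-contained sketch (the names are illustrative, not your actual classes) in which the event gets set while the sentinel is still sitting on the queue:

from multiprocessing import Process, Queue, Event
from queue import Empty

def worker(in_queue, stop_event):
    # Same shape as your loop: the event is checked before the queue is drained.
    while not stop_event.is_set():
        try:
            job = in_queue.get(timeout=.01)
        except Empty:
            continue
        if job is None:
            break

if __name__ == '__main__':
    in_queue, stop_event = Queue(), Event()
    in_queue.put(None)   # the sentinel is on the queue ...
    stop_event.set()     # ... but the event has already been set
    p = Process(target=worker, args=(in_queue, stop_event))
    p.start()
    p.join()
    # The worker saw the event first and exited without consuming the sentinel
    # (empty() is only approximate in general, but nothing ever read this queue):
    print('sentinel left behind:', not in_queue.empty())  # prints: True

With a plain Queue this merely strands the sentinel; with your JoinableQueue, the sentinel that never gets its task_done call is what makes a later join on the queue hang.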
The fix is to replace while not self._stop_event.is_set(): with while True: and to just rely on finding None on the _in_queue to signify termination. This enables you to remove those extra calls to task_done for the processes that have completed normally (you actually only needed one extra call per successfully completed process instead of the two you have).
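For instance, if you did want to keep the JoinableQueue, the corrected loop would look roughly like this (a sketch of the method body, with exactly one task_done call per message taken off the queue, sentinel included):

while True:
    job = self._in_queue.get()        # blocking get; no Event, no timeout
    if not job:
        self._in_queue.task_done()    # the one extra call: for the sentinel
        break
    try:
        r = self._run_job(job)
        self._out_queue.put(r)
    finally:
        self._in_queue.task_done()    # exactly once for this job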
But that is half of the problem. The other half is you have in your code:
def _join_workers(self):
    for p in self._spawned_procs:
        p.join(TIMEOUT)
        ...
        p.terminate()
Therefore, you are not allowing your workers enough time to deplete the _in_queue, and thus there is the possibility of an arbitrary number of messages being left on it (in the example you have, of course, there would be just the current "job" being processed and the None sentinel, for a total of 2).
But this is the problem with the code in general: it has been over-engineered. As an example, referring back to the first code snippet above, it can be further simplified to:
while True:
    job = self._in_queue.get()  # blocking get
    if not job:
        break
Moreover, there is no reason to even be using a JoinableQueue or Event instance, since the use of a None sentinel placed on the _in_queue is sufficient to signify that the worker processes should terminate, especially if you are going to be prematurely terminating the workers. The simplified, working code is:
import time
import threading
from multiprocessing import Process, Queue, cpu_count, current_process

TIMEOUT = 3


class WorkersManager(object):
    def __init__(self, jobs, processes_num):
        self._processes_num = processes_num if processes_num else cpu_count()
        self._workers_num = self._processes_num
        self._in_queue, self._run_queue, self._out_queue = Queue(), Queue(), Queue()
        self._spawned_procs = []
        self._total = 0
        self._jobs_on_procs = {}
        self._wk_kwargs = dict(
            in_queue=self._in_queue, run_queue=self._run_queue, out_queue=self._out_queue
        )
        self._in_stream = [j for j in jobs]
        self._out_stream = []
        self._total = len(self._in_stream)

    def run(self):
        # Spawn the workers:
        worker_processes = [
            WorkerProcess(i, **self._wk_kwargs) for i in range(self._processes_num)
        ]
        self._spawned_procs = [
            Process(target=process.run, args=tuple())
            for process in worker_processes
        ]
        for p in self._spawned_procs:
            p.start()

        self._serve()

        monitor = threading.Thread(target=self._monitor, args=tuple())
        monitor.start()

        collector = threading.Thread(target=self._collect, args=tuple())
        collector.start()

        self._join_workers()
        # TODO: Terminate threads
        monitor.join()
        collector.join()

        return self._out_stream

    def _join_workers(self):
        for p in self._spawned_procs:
            p.join(TIMEOUT)
            if p.is_alive():
                p.terminate()
                job = self._jobs_on_procs.get(p.name)
                print('Process TIMEOUT: {0} {1}'.format(p.name, job))
                result = {
                    "status": "failed"
                }
                self._out_queue.put(result)
            else:
                if p.exitcode == 0:
                    print("{} exit with code:{}".format(p, p.exitcode))
                else:
                    job = self._jobs_on_procs.get(p.name)
                    if p.exitcode > 0:
                        print("{} with code:{} {}".format(p, p.exitcode, job))
                    else:
                        print("{} been killed with code:{} {}".format(p, p.exitcode, job))
                    result = {
                        "status": "failed"
                    }
                    self._out_queue.put(result)

    def _collect(self):
        # TODO: Spawn a collector proc
        while True:
            r = self._out_queue.get()
            self._out_stream.append(r)
            if len(self._out_stream) >= self._total:
                print("Total {} jobs done.".format(len(self._out_stream)))
                break

    def _serve(self):
        for job in self._in_stream:
            self._in_queue.put(job)
        # One None sentinel per worker signifies "no more jobs":
        for _ in range(self._workers_num):
            self._in_queue.put(None)

    def _monitor(self):
        running = 0
        while True:
            proc_name, job = self._run_queue.get()
            running += 1
            self._jobs_on_procs.update({proc_name: job})
            if running == self._total:
                break


class WorkerProcess(object):
    def __init__(self, worker_id, in_queue, run_queue, out_queue):
        self._worker_id = worker_id
        self._in_queue = in_queue
        self._run_queue = run_queue
        self._out_queue = out_queue

    def run(self):
        self._work()
        print('worker - {} quit'.format(self._worker_id))

    def _work(self):
        print("worker - {0} start to work".format(self._worker_id))
        job = {}
        while True:
            job = self._in_queue.get()
            if not job:
                # The None sentinel was seen: no more jobs
                break
            try:
                proc = current_process()
                self._run_queue.put((proc.name, job))
                r = self._run_job(job)
                self._out_queue.put(r)
            except Exception as err:
                print('Unhandled exception: {0}'.format(err))
                result = {"status": 'failed'}
                self._out_queue.put(result)

    def _run_job(self, job):
        time.sleep(job)
        return {
            'status': 'succeed'
        }


def main():
    jobs = [3, 4, 5, 6, 7]
    procs_num = 3
    m = WorkersManager(jobs, procs_num)
    m.run()

if __name__ == "__main__":
    main()
Prints:
worker - 0 start to work
worker - 1 start to work
worker - 2 start to work
Process TIMEOUT: Process-1 3
Process TIMEOUT: Process-2 6
Process TIMEOUT: Process-3 7
Total 5 jobs done.
You are probably aware of this, but due diligence requires that I mention that there are two excellent classes, multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor, for doing what you want to accomplish. See this for some comparisons.
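For example, a rough equivalent of your workload using concurrent.futures.ProcessPoolExecutor might look like the sketch below. One caveat: a future's result(timeout=...) raises TimeoutError when the time is up, but unlike terminate() it does not kill the worker, so timed-out jobs still run to completion before the pool shuts down:

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError

TIMEOUT = 3

def run_job(job):
    time.sleep(job)
    return {'status': 'succeed'}

def main():
    jobs = [3, 4, 5, 6, 7]
    out_stream = []
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(run_job, job) for job in jobs]
        for future in futures:
            try:
                # Wait up to TIMEOUT seconds for this job's result; raises
                # TimeoutError if it is not done yet (the worker keeps running).
                out_stream.append(future.result(timeout=TIMEOUT))
            except TimeoutError:
                out_stream.append({'status': 'failed'})
    # Leaving the "with" block waits for all submitted jobs to finish.
    print('Total {} jobs done.'.format(len(out_stream)))

if __name__ == '__main__':
    main()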
Further Explanation
What is the point of using a JoinableQueue, which supports calls to task_done? Usually, it is so that you can be sure that all of the messages that you have placed on the queue have been taken off the queue and processed, and that the main process will not be terminating prematurely before that has occurred. But this could not work correctly in the code as you had it, because you were giving your processes only TIMEOUT seconds to process their messages and then terminating each process if it was still alive, with the possibility that messages were still left on its queue. This is what forced you to artificially issue extra calls to task_done just so your calls to join on the queues in the main process would not hang, and why you had to post this question to begin with.
So there are two ways you could have proceeded differently. One way would have allowed you to continue using JoinableQueue instances and calling join on these instances to know when to terminate. But (1) you would not then be able to prematurely terminate your message processes, and (2) your message processes must handle exceptions correctly so that they do not prematurely terminate without emptying their queues.
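A sketch of that first way: join on the JoinableQueue, rather than a timeout, tells the main process that everything has been processed, and the try/finally guarantees task_done is called exactly once per message even when a job raises:

import time
from multiprocessing import JoinableQueue, Process

def worker(in_queue):
    while True:
        job = in_queue.get()
        if job is None:
            in_queue.task_done()
            break
        try:
            time.sleep(job)   # stand-in for the real work
        except Exception:
            pass              # handle errors; never die without task_done
        finally:
            in_queue.task_done()

if __name__ == '__main__':
    in_queue = JoinableQueue()
    procs = [Process(target=worker, args=(in_queue,)) for _ in range(3)]
    for p in procs:
        p.start()
    for job in [1, 2, 3]:
        in_queue.put(job)
    for _ in procs:
        in_queue.put(None)   # one sentinel per worker
    in_queue.join()          # returns only when every message got task_done()
    for p in procs:
        p.join()             # no TIMEOUT, no terminate(): the queue is empty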
The other way is what I proposed, which is much simpler. The main process simply places on the input queue a special sentinel message, in this case None. This is just a message that cannot be mistaken for an actual message to be processed; instead it signifies end of file or, in other words, a signal to the worker process that no more messages will be placed on the queue and it may now terminate. Thus, in addition to the "real" messages to be processed, the main process just has to place the sentinel message on the queues, and then instead of doing a join call on the message queues (which are now only regular, non-joinable queues), it does join(TIMEOUT) on each process instance. Either you will find the process to be no longer alive, because it has seen the sentinel, and therefore you know that it has processed all of its messages, or you can call terminate on the process if you are willing to leave messages on its input queue.
Of course, to be really sure that the processes that terminated on their own actually emptied their queues, you might have to check those queues to see that they are indeed empty. But I assume that you should be able to code your processes to handle exceptions correctly, at least those that can be handled, so that they do not terminate prematurely and do something "reasonable" with every message.
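That check can be a simple non-blocking drain, something along these lines:

from queue import Empty

def drain(q):
    # Pull whatever is left on a (multiprocessing) queue without blocking.
    # Safe to trust here because the workers have already exited.
    leftovers = []
    while True:
        try:
            leftovers.append(q.get_nowait())
        except Empty:
            return leftovers

After a clean shutdown you would expect drain(self._in_queue) to return an empty list, since each worker consumes its own sentinel; anything left over means some worker quit early.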