fixed bug where producer would exit prematurely

pull/12/head
lanjelot 9 years ago
parent 641ea8f563
commit 3dff78d88c

@ -1364,37 +1364,45 @@ Please read the README inside for more examples and usage information.
self.start_threads()
self.monitor_progress()
except KeyboardInterrupt:
ns.quit_now = True
pass
except:
ns.quit_now = True
logger.exception(exc_info()[1])
finally:
ns.quit_now = True
try:
while len(active_children()) > 1:
logger.debug('active: %s' % active_children())
# waiting for reports enqueued by consumers to be flushed
while True:
active = active_children()
self.report_progress()
if not len(active)> 1:
break
logger.debug('active: %s' % active)
sleep(.1)
self.report_progress()
except KeyboardInterrupt:
pass
if ns.total_size >= maxint:
total_size = -1
else:
total_size = ns.total_size
total_time = default_timer() - ns.start_time
hits_count = sum(p.hits_count for p in self.thread_progress)
done_count = sum(p.done_count for p in self.thread_progress)
skip_count = sum(p.skip_count for p in self.thread_progress)
fail_count = sum(p.fail_count for p in self.thread_progress)
total_time = default_timer() - ns.start_time
speed_avg = done_count / total_time
if ns.total_size >= maxint:
ns.total_size = -1
self.show_final()
logger.info('Hits/Done/Skip/Fail/Size: %d/%d/%d/%d/%d, Avg: %d r/s, Time: %s' % (
hits_count, done_count, skip_count, fail_count, ns.total_size, speed_avg,
hits_count, done_count, skip_count, fail_count, total_size, speed_avg,
pprint_seconds(total_time, '%dh %dm %ds')))
if ns.quit_now:
if done_count < total_size:
resume = []
for i, p in enumerate(self.thread_progress):
c = p.done_count + p.skip_count
@ -1419,11 +1427,11 @@ Please read the README inside for more examples and usage information.
self.fail_count = 0
self.seconds = [1]*25 # avoid division by zero early bug condition
task_queues = [Queue(maxsize=10000) for _ in range(self.num_threads)]
task_queues = [Queue() for _ in range(self.num_threads)]
# consumers
for num in range(self.num_threads):
report_queue = Queue(maxsize=1000)
report_queue = Queue()
t = Process(name='Consumer-%d' % num, target=self.consume, args=(task_queues[num], report_queue))
t.daemon = True
t.start()
@ -1544,12 +1552,19 @@ Please read the README inside for more examples and usage information.
count += 1
for q in task_queues:
q.cancel_join_thread()
if not ns.quit_now:
if not ns.quit_now:
for q in task_queues:
q.put(None)
logger.debug('producer done')
while True:
if ns.quit_now:
for q in task_queues:
q.cancel_join_thread()
break
sleep(.5)
logger.debug('producer exits')
def consume(self, task_queue, report_queue):
@ -1560,10 +1575,9 @@ Please read the README inside for more examples and usage information.
signal.signal(signal.SIGINT, signal.SIG_IGN)
def shutdown():
report_queue.cancel_join_thread()
if hasattr(module, '__del__'):
module.__del__()
logger.debug('consumer exits')
logger.debug('consumer done')
while True:
if ns.quit_now:
@ -1676,7 +1690,8 @@ Please read the README inside for more examples and usage information.
break
def monitor_progress(self):
while len(active_children()) > 1 and not ns.quit_now:
# loop until SyncManager and Producer are the only children left alive
while len(active_children()) > 2 and not ns.quit_now:
self.report_progress()
self.monitor_interaction()

Loading…
Cancel
Save