From 3dff78d88c83ee408b05d3ed53ab7d7a03b5437a Mon Sep 17 00:00:00 2001 From: lanjelot Date: Mon, 20 Apr 2015 01:40:46 +1000 Subject: [PATCH] fixed bug where producer would exit prematurely --- patator.py | 55 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/patator.py b/patator.py index cf5b086..7dc9802 100755 --- a/patator.py +++ b/patator.py @@ -1364,37 +1364,45 @@ Please read the README inside for more examples and usage information. self.start_threads() self.monitor_progress() except KeyboardInterrupt: - ns.quit_now = True + pass except: - ns.quit_now = True logger.exception(exc_info()[1]) + finally: + ns.quit_now = True try: - while len(active_children()) > 1: - logger.debug('active: %s' % active_children()) + # waiting for reports enqueued by consumers to be flushed + while True: + active = active_children() + self.report_progress() + if not len(active)> 1: + break + logger.debug('active: %s' % active) sleep(.1) - self.report_progress() except KeyboardInterrupt: pass + if ns.total_size >= maxint: + total_size = -1 + else: + total_size = ns.total_size + + total_time = default_timer() - ns.start_time + hits_count = sum(p.hits_count for p in self.thread_progress) done_count = sum(p.done_count for p in self.thread_progress) skip_count = sum(p.skip_count for p in self.thread_progress) fail_count = sum(p.fail_count for p in self.thread_progress) - total_time = default_timer() - ns.start_time speed_avg = done_count / total_time - if ns.total_size >= maxint: - ns.total_size = -1 - self.show_final() logger.info('Hits/Done/Skip/Fail/Size: %d/%d/%d/%d/%d, Avg: %d r/s, Time: %s' % ( - hits_count, done_count, skip_count, fail_count, ns.total_size, speed_avg, + hits_count, done_count, skip_count, fail_count, total_size, speed_avg, pprint_seconds(total_time, '%dh %dm %ds'))) - if ns.quit_now: + if done_count < total_size: resume = [] for i, p in enumerate(self.thread_progress): c = p.done_count + p.skip_count @@ -1419,11 +1427,11 @@ Please read the README inside for more examples and usage information. self.fail_count = 0 self.seconds = [1]*25 # avoid division by zero early bug condition - task_queues = [Queue(maxsize=10000) for _ in range(self.num_threads)] + task_queues = [Queue() for _ in range(self.num_threads)] # consumers for num in range(self.num_threads): - report_queue = Queue(maxsize=1000) + report_queue = Queue() t = Process(name='Consumer-%d' % num, target=self.consume, args=(task_queues[num], report_queue)) t.daemon = True t.start() @@ -1544,12 +1552,19 @@ Please read the README inside for more examples and usage information. count += 1 - for q in task_queues: - q.cancel_join_thread() - - if not ns.quit_now: + if not ns.quit_now: + for q in task_queues: q.put(None) + logger.debug('producer done') + + while True: + if ns.quit_now: + for q in task_queues: + q.cancel_join_thread() + break + sleep(.5) + logger.debug('producer exits') def consume(self, task_queue, report_queue): @@ -1560,10 +1575,9 @@ Please read the README inside for more examples and usage information. signal.signal(signal.SIGINT, signal.SIG_IGN) def shutdown(): - report_queue.cancel_join_thread() if hasattr(module, '__del__'): module.__del__() - logger.debug('consumer exits') + logger.debug('consumer done') while True: if ns.quit_now: @@ -1676,7 +1690,8 @@ Please read the README inside for more examples and usage information. break def monitor_progress(self): - while len(active_children()) > 1 and not ns.quit_now: + # loop until SyncManager and Producer are the only children left alive + while len(active_children()) > 2 and not ns.quit_now: self.report_progress() self.monitor_interaction()