Source code for taurus.core.util.threadpool

#!/usr/bin/env python

# ###########################################################################
#
# This file is part of Taurus
#
# http://taurus-scada.org
#
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
#
# Taurus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Taurus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
#
# ###########################################################################

"""adapted from http://code.activestate.com/recipes/576576/
"""

from threading import Thread, currentThread
from queue import Queue
from time import sleep, time
from traceback import extract_stack, format_list

from .prop import propertx
from .log import Logger

__all__ = ["ThreadPool", "Worker"]

__docformat__ = "restructuredtext"


[docs]class ThreadPool(Logger): """ """ NoJob = 6 * (None,) def __init__( self, name=None, parent=None, Psize=20, Qsize=20, daemons=True, worker_cls=None, ): Logger.__init__(self, name, parent) if worker_cls is None: self._worker_cls = Worker else: self._worker_cls = worker_cls self._daemons = daemons self.localThreadId = 0 self.workers = [] self.jobs = Queue(Qsize) self.size = Psize self.accept = True @propertx def size(): def set(self, newSize): """set method for the size property""" nb_workers = len(self.workers) if newSize == nb_workers: return for i in range(newSize - nb_workers): self.localThreadId += 1 name = "%s.W%03i" % (self.log_name, self.localThreadId) new = self._worker_cls(self, name, self._daemons) self.workers.append(new) self.debug("Starting %s" % name) new.start() # remove the old worker threads nb_workers = len(self.workers) for i in range(nb_workers - newSize): self.jobs.put(self.NoJob) def get(self): """get method for the size property""" return len(self.workers) return get, set, None, "number of threads"
[docs] def add(self, job, callback=None, *args, **kw): if self.accept: # first gather some information on the object which requested the # job in case the job throws an exception th_id, stack = currentThread().name, extract_stack()[:-1] self.jobs.put((job, args, kw, callback, th_id, stack))
[docs] def join(self): self.accept = False while True: for w in self.workers: if w.is_alive(): self.jobs.put(self.NoJob) break else: break
@property def qsize(self): return self.jobs.qsize()
[docs] def getNumOfBusyWorkers(self): """Get the number of workers that are in busy mode.""" n = 0 for w in self.workers: if w.isBusy(): n += 1 return n
[docs]class Worker(Thread, Logger): def __init__(self, pool, name=None, daemon=True): name = name or self.__class__.__name__ Thread.__init__(self, name=name) Logger.__init__(self, name, pool) self.daemon = daemon self.pool = pool self.cmd = "" self.busy = False
[docs] def run(self): get = self.pool.jobs.get while True: cmd, args, kw, callback, th_id, stack = get() if cmd: self.busy = True self.cmd = cmd.__name__ try: if callback: callback(cmd(*args, **kw)) else: cmd(*args, **kw) except Exception: orig_stack = "".join(format_list(stack)) self.error( "Uncaught exception running job '%s' called " "from thread %s:\n%s", self.cmd, th_id, orig_stack, exc_info=1, ) finally: self.busy = False self.cmd = "" else: self.pool.workers.remove(self) return
[docs] def isBusy(self): return self.busy
if __name__ == "__main__": def easyJob(*arg, **kw): n = arg[0] print("\tSleep\t\t", n) sleep(n) return "Slept\t%d" % n def longJob(*arg, **kw): print("\tStart\t\t\t", arg, kw) n = arg[0] * 3 sleep(n) return "Job done in %d" % n def badJob(*a, **k): print("\n !!! OOOPS !!!\n") _ = 1 / 0 def show(*arg, **kw): print("callback : %s" % arg[0]) def test_1(**kwargs): workers = kwargs.pop("workers", 5) jobqueue = kwargs.pop("jobqueue", 10) pool = ThreadPool(name="ThreadPool", Psize=workers, Qsize=jobqueue) print("\n\t\t... let's add some jobs ...\n") for j in range(5): if j == 1: pool.add(badJob) for i in range(5, 0, -1): pool.add(longJob, show, i) pool.add(easyJob, show, i) print( "\t\t... now waiting for the %i workers to get the %i jobs done..." % (pool.size, pool.qsize) ) sleep(15) print( "\n\t\t... ok, it may be long: let's get some reinforcement ...\n" ) sleep(5) pool.size = 50 print("\n\t\t... Joining ...\n") pool.join() print("\n\t\t... Ok ...\n") def test_2(**kwargs): workers = kwargs.pop("workers", 5) jobqueue = kwargs.pop("jobqueue", 10) numjobs = kwargs.pop("numjobs", 10) sleep_t = kwargs.pop("sleep_t", 1) # from taurus.core.util.threadpool import ThreadPool pool = ThreadPool(name="ThreadPool", Psize=workers, Qsize=jobqueue) print("\n\t\t... Check the number of busy workers ...\n") print("Num of busy workers = %s" % (pool.getNumOfBusyWorkers())) print("\n\t\t... let's add some jobs ...\n") for i in range(numjobs): pool.add(easyJob, None, sleep_t) print("\n\t\t... Monitoring the busy workers ...\n") t0 = time() while pool.getNumOfBusyWorkers() > 0: print("busy workers = %s" % (pool.getNumOfBusyWorkers())) sleep(0.5) t1 = time() print("Run %s jobs of 1 second took %.3f" % (numjobs, t1 - t0)) print("\n\t\t... Joining ...\n") pool.join() print("\n\t\t... Ok ...\n") def main(argv): kwargs = {} for arg in argv: k, v = arg.split("=") kwargs[k] = int(v) # Run test test_1(**kwargs) test_2(**kwargs) import sys main(sys.argv[1:])