123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-"
- # vim: set expandtab tabstop=4 shiftwidth=4:
- """
- This file is part of the XSSer project, https://xsser.03c8.net
- Copyright (c) 2010/2020 | psy <epsylon@riseup.net>
- xsser is free software; you can redistribute it and/or modify it under
- the terms of the GNU General Public License as published by the Free
- Software Foundation version 3 of the License.
- xsser is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
- details.
- You should have received a copy of the GNU General Public License along
- with xsser; if not, write to the Free Software Foundation, Inc., 51
- Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- """
- """Easy to use object-oriented thread pool framework.
- A thread pool is an object that maintains a pool of worker threads to perform
- time consuming operations in parallel. It assigns jobs to the threads
- by putting them in a work request queue, where they are picked up by the
- next available thread. This then performs the requested operation in the
- background and puts the results in another queue.
- The thread pool object can then collect the results from all threads from
- this queue as soon as they become available or after all threads have
- finished their work. It's also possible, to define callbacks to handle
- each result as it comes in.
- The basic concept and some code was taken from the book "Python in a Nutshell,
- 2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section
- 14.5 "Threaded Program Architecture". I wrapped the main program logic in the
- ThreadPool class, added the WorkRequest class and the callback system and
- tweaked the code here and there. Kudos also to Florent Aide for the exception
- handling mechanism.
- Basic usage::
- >>> pool = ThreadPool(poolsize)
- >>> requests = makeRequests(some_callable, list_of_args, callback)
- >>> [pool.putRequest(req) for req in requests]
- >>> pool.wait()
- See the end of the module code for a brief, annotated usage example.
- Website : http://chrisarndt.de/projects/threadpool/
- """
- __docformat__ = "restructuredtext en"
- __all__ = [
- 'makeRequests',
- 'NoResultsPending',
- 'NoWorkersAvailable',
- 'ThreadPool',
- 'WorkRequest',
- 'WorkerThread'
- ]
- __author__ = "Christopher Arndt"
- __version__ = '1.2.7'
- __revision__ = "$Revision: 416 $"
- __date__ = "$Date: 2009-10-07 05:41:27 +0200 (Wed, 07 Oct 2009) $"
- __license__ = "MIT license"
- # standard library modules
- import sys
- import threading
- try:
- import queue
- except ImportError:
- import queue as Queue
- from queue import Empty
- import traceback
- # exceptions
- class NoResultsPending(Exception):
- """All work requests have been processed."""
- pass
- class NoWorkersAvailable(Exception):
- """No worker threads available to process remaining requests."""
- pass
- # internal module helper functions
- def _handle_thread_exception(request, exc_info):
- """Default exception handler callback function.
- This just prints the exception info via ``traceback.print_exception``.
- """
- traceback.print_exception(*exc_info)
- # utility functions
- def makeRequests(callable_, args_list, callback=None,
- exc_callback=_handle_thread_exception):
- """Create several work requests for same callable with different arguments.
- Convenience function for creating several work requests for the same
- callable where each invocation of the callable receives different values
- for its arguments.
- ``args_list`` contains the parameters for each invocation of callable.
- Each item in ``args_list`` should be either a 2-item tuple of the list of
- positional arguments and a dictionary of keyword arguments or a single,
- non-tuple argument.
- See docstring for ``WorkRequest`` for info on ``callback`` and
- ``exc_callback``.
- """
- requests = []
- for item in args_list:
- is_crawling = False
- try:
- psy = item[3] # black magic!
- is_crawling = True
- except:
- is_crawling = False
- if isinstance(item, tuple):
- requests.append(
- WorkRequest(callable_, item[0], item[1], callback=callback,
- exc_callback=exc_callback)
- )
- else:
- if is_crawling == True:
- requests.append(
- WorkRequest(callable_, [item], None, callback=callback,
- exc_callback=exc_callback)
- )
- else:
- requests.append(
- WorkRequest(callable_, item, None, callback=callback,
- exc_callback=exc_callback)
- )
- return requests
- # classes
- class WorkerThread(threading.Thread):
- """Background thread connected to the requests/results queues.
- A worker thread sits in the background and picks up work requests from
- one queue and puts the results in another until it is dismissed.
- """
- def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
- """Set up thread in daemonic mode and start it immediatedly.
- ``requests_queue`` and ``results_queue`` are instances of
- ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
- worker thread.
- """
- threading.Thread.__init__(self, **kwds)
- self.setDaemon(1)
- self._requests_queue = requests_queue
- self._results_queue = results_queue
- self._poll_timeout = poll_timeout
- self._dismissed = threading.Event()
- self.start()
- def run(self):
- """Repeatedly process the job queue until told to exit."""
- while True:
- if self._dismissed.isSet():
- # we are dismissed, break out of loop
- break
- # get next work request. If we don't get a new request from the
- # queue after self._poll_timout seconds, we jump to the start of
- # the while loop again, to give the thread a chance to exit.
- try:
- request = self._requests_queue.get(True, self._poll_timeout)
- except Empty:
- continue
- else:
- if self._dismissed.isSet():
- # we are dismissed, put back request in queue and exit loop
- self._requests_queue.put(request)
- break
- try:
- result = request.callable(*request.args, **request.kwds)
- self._results_queue.put((request, result))
- except:
- request.exception = True
- self._results_queue.put((request, sys.exc_info()))
- def dismiss(self):
- """Sets a flag to tell the thread to exit when done with current job."""
- self._dismissed.set()
- class WorkRequest:
- """A request to execute a callable for putting in the request queue later.
- See the module function ``makeRequests`` for the common case
- where you want to build several ``WorkRequest`` objects for the same
- callable but with different arguments for each call.
- """
- def __init__(self, callable_, args=None, kwds=None, requestID=None,
- callback=None, exc_callback=_handle_thread_exception):
- """Create a work request for a callable and attach callbacks.
- A work request consists of the a callable to be executed by a
- worker thread, a list of positional arguments, a dictionary
- of keyword arguments.
- A ``callback`` function can be specified, that is called when the
- results of the request are picked up from the result queue. It must
- accept two anonymous arguments, the ``WorkRequest`` object and the
- results of the callable, in that order. If you want to pass additional
- information to the callback, just stick it on the request object.
- You can also give custom callback for when an exception occurs with
- the ``exc_callback`` keyword parameter. It should also accept two
- anonymous arguments, the ``WorkRequest`` and a tuple with the exception
- details as returned by ``sys.exc_info()``. The default implementation
- of this callback just prints the exception info via
- ``traceback.print_exception``. If you want no exception handler
- callback, just pass in ``None``.
- ``requestID``, if given, must be hashable since it is used by
- ``ThreadPool`` object to store the results of that work request in a
- dictionary. It defaults to the return value of ``id(self)``.
- """
- if requestID is None:
- self.requestID = id(self)
- else:
- try:
- self.requestID = hash(requestID)
- except TypeError:
- raise TypeError("requestID must be hashable.")
- self.exception = False
- self.callback = callback
- self.exc_callback = exc_callback
- self.callable = callable_
- self.args = args or []
- self.kwds = kwds or {}
- def __str__(self):
- return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
- (self.requestID, self.args, self.kwds, self.exception)
- class ThreadPool:
- """A thread pool, distributing work requests and collecting results.
- See the module docstring for more information.
- """
- def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
- """Set up the thread pool and start num_workers worker threads.
- ``num_workers`` is the number of worker threads to start initially.
- If ``q_size > 0`` the size of the work *request queue* is limited and
- the thread pool blocks when the queue is full and it tries to put
- more work requests in it (see ``putRequest`` method), unless you also
- use a positive ``timeout`` value for ``putRequest``.
- If ``resq_size > 0`` the size of the *results queue* is limited and the
- worker threads will block when the queue is full and they try to put
- new results in it.
- .. warning:
- If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
- the possibilty of a deadlock, when the results queue is not pulled
- regularly and too many jobs are put in the work requests queue.
- To prevent this, always set ``timeout > 0`` when calling
- ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
- """
- self._requests_queue = queue.Queue(q_size)
- self._results_queue = queue.Queue(resq_size)
- self.workers = []
- self.dismissedWorkers = []
- self.workRequests = {}
- self.createWorkers(num_workers, poll_timeout)
- def createWorkers(self, num_workers, poll_timeout=5):
- """Add num_workers worker threads to the pool.
- ``poll_timout`` sets the interval in seconds (int or float) for how
- ofte threads should check whether they are dismissed, while waiting for
- requests.
- """
- for i in range(num_workers):
- self.workers.append(WorkerThread(self._requests_queue,
- self._results_queue, poll_timeout=poll_timeout))
- def dismissWorkers(self, num_workers, do_join=False):
- """Tell num_workers worker threads to quit after their current task."""
- dismiss_list = []
- for i in range(min(num_workers, len(self.workers))):
- worker = self.workers.pop()
- worker.dismiss()
- dismiss_list.append(worker)
- if do_join:
- for worker in dismiss_list:
- worker.join()
- else:
- self.dismissedWorkers.extend(dismiss_list)
- def joinAllDismissedWorkers(self):
- """Perform Thread.join() on all worker threads that have been dismissed.
- """
- for worker in self.dismissedWorkers:
- worker.join()
- self.dismissedWorkers = []
- def putRequest(self, request, block=True, timeout=None):
- """Put work request into work queue and save its id for later."""
- assert isinstance(request, WorkRequest)
- # don't reuse old work requests
- assert not getattr(request, 'exception', None)
- self._requests_queue.put(request, block, timeout)
- self.workRequests[request.requestID] = request
- def addRequest(self, do_cb, data, print_cb, exception_cb, block=True,
- timeout=None):
- """Put work request into work queue and save its id for later."""
- requests = makeRequests(do_cb, data, print_cb, exception_cb)
- for req in requests:
- self.putRequest(req, block, timeout)
- def poll(self, block=False):
- """Process any new results in the queue."""
- while True:
- # still results pending?
- if not self.workRequests:
- raise NoResultsPending
- # are there still workers to process remaining requests?
- elif block and not self.workers:
- raise NoWorkersAvailable
- try:
- # get back next results
- request, result = self._results_queue.get(block=block)
- # has an exception occured?
- if request.exception and request.exc_callback:
- request.exc_callback(request, result)
- # hand results to callback, if any
- if request.callback and not \
- (request.exception and request.exc_callback):
- request.callback(request, result)
- del self.workRequests[request.requestID]
- except Empty:
- break
- def wait(self):
- """Wait for results, blocking until all have arrived."""
- while 1:
- try:
- self.poll(True)
- except NoResultsPending:
- break
- ################
- # USAGE EXAMPLE
- ################
- if __name__ == '__main__':
- import random
- import time
- # the work the threads will have to do (rather trivial in our example)
- def do_something(data):
- time.sleep(random.randint(1,5))
- result = round(random.random() * data, 5)
- # just to show off, we throw an exception once in a while
- if result > 5:
- raise RuntimeError("Something extraordinary happened!")
- return result
- # this will be called each time a result is available
- def print_result(request, result):
- print(("**** Result from request #%s: %r" % (request.requestID, result)))
- # this will be called when an exception occurs within a thread
- # this example exception handler does little more than the default handler
- def handle_exception(request, exc_info):
- if not isinstance(exc_info, tuple):
- # Something is seriously wrong...
- print(request)
- print(exc_info)
- raise SystemExit
- print(("**** Exception occured in request #%s: %s" % \
- (request.requestID, exc_info)))
- # assemble the arguments for each job to a list...
- data = [random.randint(1,10) for i in range(20)]
- # ... and build a WorkRequest object for each item in data
- requests = makeRequests(do_something, data, print_result, handle_exception)
- # to use the default exception handler, uncomment next line and comment out
- # the preceding one.
- #requests = makeRequests(do_something, data, print_result)
- # or the other form of args_lists accepted by makeRequests: ((,), {})
- data = [((random.randint(1,10),), {}) for i in range(20)]
- requests.extend(
- makeRequests(do_something, data, print_result, handle_exception)
- #makeRequests(do_something, data, print_result)
- # to use the default exception handler, uncomment next line and comment
- # out the preceding one.
- )
- # we create a pool of 3 worker threads
- print("Creating thread pool with 3 worker threads.")
- main = ThreadPool(3)
- # then we put the work requests in the queue...
- for req in requests:
- main.putRequest(req)
- print(("Work request #%s added." % req.requestID))
- # or shorter:
- # [main.putRequest(req) for req in requests]
- # ...and wait for the results to arrive in the result queue
- # by using ThreadPool.wait(). This would block until results for
- # all work requests have arrived:
- # main.wait()
- # instead we can poll for results while doing something else:
- i = 0
- while True:
- try:
- time.sleep(0.5)
- main.poll()
- print(("Main thread working...",))
- print(("(active worker threads: %i)" % (threading.activeCount()-1, )))
- if i == 10:
- print("**** Adding 3 more worker threads...")
- main.createWorkers(3)
- if i == 20:
- print("**** Dismissing 2 worker threads...")
- main.dismissWorkers(2)
- i += 1
- except KeyboardInterrupt:
- print("**** Interrupted!")
- break
- except NoResultsPending:
- print("**** No pending results.")
- break
- if main.dismissedWorkers:
- print("Joining all dismissed worker threads...")
- main.joinAllDismissedWorkers()
|