Python线程池实现

chouliqingke 2016-11-06

Python 的线程池主要有threadpool,不过它并不是内置的库,每次使用都需要安装,而且使用起来也不是那么好用,所以自己写了一个线程池实现,每次需要使用直接import即可。其中还可以根据传入的特征量handlerkey来获取每个任务的结果。

#!/bin/env python
# -*- coding:utf-8 -*-

"""
@lx
created on 2016-04-14
"""

import Queue
import sys
import threading
import time
import StringIO
import traceback

reload(sys)
sys.setdefaultencoding("utf8")


class MyThread(threading.Thread):
    """Background thread connected to the requests/results queues."""
    def __init__(self, workQueue, resultQueue, timeout=0.1, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(True)
        self._workQueue = workQueue
        self._resultQueue = resultQueue
        self._timeout = timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.isSet():
                break

            handlerKey = None  # unique key
            code = 0  # callback return code
            handlerRet = None
            errMsg = ""

            try:
                callable, args, kwds = self._workQueue.get(True, self._timeout)
            except Queue.Empty:
                continue
            except:
                exceptMsg = StringIO.StringIO()
                traceback.print_exc(file=exceptMsg)
                errMsg = exceptMsg.getvalue()
                code = 3301  # system error
                self._resultQueue.put(
                        (handlerKey, code, (callable, args, kwds), errMsg))
                break

            if self._dismissed.isSet():
                self._workQueue.put((callable, args, kwds))
                break

            try:
                if "handlerKey" in kwds:
                    handlerKey = kwds["handlerKey"]
                handlerRet = callable(*args, **kwds)  # block
                self._resultQueue.put((handlerKey, code, handlerRet, errMsg))
            except:
                exceptMsg = StringIO.StringIO()
                traceback.print_exc(file=exceptMsg)
                errMsg = exceptMsg.getvalue()
                code = 3303
                self._resultQueue.put((handlerKey, code, handlerRet, errMsg))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job."""
        self._dismissed.set()


class ThreadPool(object):
    def __init__(self, workerNums=3, timeout=0.1):
        self._workerNums = workerNums
        self._timeout = timeout
        self._workQueue = Queue.Queue()  # no maximum
        self._resultQueue = Queue.Queue()
        self.workers = []
        self.dismissedWorkers = []
        self._createWorkers(self._workerNums)

    def _createWorkers(self, workerNums):
        """Add num_workers worker threads to the pool."""
        for i in range(workerNums):
            worker = MyThread(self._workQueue, self._resultQueue,
                              timeout=self._timeout)
            self.workers.append(worker)

    def _dismissWorkers(self, workerNums, _join=False):
        """Tell num_workers worker threads to quit after their current task."""
        dismissList = []
        for i in range(min(workerNums, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismissList.append(worker)

        if _join:
            for worker in dismissList:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismissList)

    def _joinAllDissmissedWorkers(self):
        """
        Perform Thread.join() on all
        worker threads that have been dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def addJob(self, callable, *args, **kwds):
        self._workQueue.put((callable, args, kwds))

    def getResult(self, block=False, timeout=0.1):
        try:
            item = self._resultQueue.get(block, timeout)
            return item
        except Queue.Empty, e:
            return None
        except:
            raise

    def waitForComplete(self, timeout=0.1):
        """
        Last function. To dismiss all worker threads. Delete ThreadPool.
        :param timeout
        """
        while True:
            workerNums = self._workQueue.qsize()  # 释放掉所有线程
            runWorkers = len(self.workers)

            if 0 == workerNums:
                time.sleep(timeout)  # waiting for thread to do job
                self._dismissWorkers(runWorkers)
                break
            # if workerNums < runWorkers:  # 不能这样子乱取消
            #    self._dismissWorkers(runWorkers - workerNums)
            time.sleep(timeout)
        self._joinAllDissmissedWorkers()


if "__main__" == __name__:
    test1 = """
    def doSomething(*args, **kwds):
        if "sleep" in kwds:
            sleep = kwds["sleep"]
        msgTxt = "sleep %fs.." % sleep
        time.sleep(sleep)
        return msgTxt

    for i in range(10):
        print doSomething(sleep=0.1, handlerKey="key-%d"%i)

    wm = ThreadPool(10)
    for i in range(10):
        wm.addJob(doSomething, sleep=1, handlerKey="key-%d"%i)
    wm.waitForComplete()
    for i in range(10):
        print wm.getResult()
    del wm
    """
    # test2 = """

    def doSomething_(*args, **kwds):
        sleep = int(args[0])
        msgTxt = "sleep %ds.." % sleep
        time.sleep(sleep)
        return msgTxt


    wm = ThreadPool(10)
    result = []
    for i in range(10):
        data = 5
        wm.addJob(doSomething_, data)

    while 1:
        res = wm.getResult()
        if res:
            result.append(res)
        if 10 == len(result):
            break
        print "sleep 0.1"
        time.sleep(0.1)
    print time.time()
    wm.waitForComplete()
    print time.time()
# """

相关推荐