liuxiaohua 2019-06-25
对Python线程池的研究是之前对Apshceduler分析的附加工作。
在之前对Apshceduler源码分析的文章中,写到调度器将任务放入线程池的函数
def _do_submit_job(self, job, run_times): def callback(f): exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else (f.exception(), getattr(f.exception(), '__traceback__', None))) if exc: self._run_job_error(job.id, exc, tb) else: self._run_job_success(job.id, f.result()) f = self._pool.submit(_run_job, job, job._jobstore_alias, run_times, self._logger.name) f.add_done_callback(callback)
这里分析的线程池类是concurrent.futures.ThreadPoolExecutor,也就是上述代码中self._pool所使用的类。先上self._pool.submit函数的代码,再做详细分析
def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f
f和w是两个非常重要的变量,f作为submit返回的对象,submit函数的调用者可以对其添加回调,待fn执行完成后,会在当前线程执行,具体是如何实现的,这里先不说,下面再详细分析;w则是封装了线程需要执行的方法和参数,通过self._work_queue.put(w)方法放入一个队列当中。
self._adjust_thread_count()方法则是检查当前线程池的线程数量,如果小于设定的最大值,就开辟一个线程,代码就不上了,直接看这些个线程都是干嘛的
def _worker(executor_reference, work_queue): try: while True: work_item = work_queue.get(block=True) if work_item is not None: work_item.run() # Delete references to object. See issue16284 del work_item continue executor = executor_reference() # Exit if: # - The interpreter is shutting down OR # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: # Notice other workers work_queue.put(None) return del executor except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True)
这些线程就是一个死循环,不断的从任务队列中获取到_WorkItem,然后通过其封装方法,执行我们需要的任务。如果取到的任务为None,就往队列中再放入一个None,以通知其它线程结束,然后结束当前循环。
def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(*self.args, **self.kwargs) except BaseException as e: self.future.set_exception(e) else: self.future.set_result(result)
如果没有异常,执行结束后,会执行之前我们说的回调。在self.future.set_result(result)方法中会执行任务回调,当然了,是在当前线程中。如果需要写入数据库之类的操作,不建议在回调中直接写入。