Lighttpd 1.5的多线程实现

82463862 2011-09-13

lighttpd 1.4.x是一个典型的多进程linux程序,在单个进程内部没有使用多线程,同一时刻只有一个线程在运行。

而到了lighttpd 1.5版本,也使用了多线程来完成某些工作。lighttpd通过GAsyncQueue(GLIB异步队列)的方式实现了一个线程池,从而完成了多线程的运作(需要安装glibc-dev库)。

GAsyncQueue类型的异步消息队列的定义在base.h中

  1. GAsyncQueue *stat_queue; /* send a stat_job into this queue and joblist_queue will get a wakeup when the stat is finished */ 
  2. GAsyncQueue *joblist_queue; 
  3. GAsyncQueue *aio_write_queue; 

在sever.c文件中,对其进行初始化:

  1. srv->stat_queue = g_async_queue_new(); 
  2. srv->joblist_queue = g_async_queue_new(); 
  3. srv->aio_write_queue = g_async_queue_new(); 

同时,在server.c中,也定义了线程:

  1. #ifdef USE_GTHREAD  
  2.     GThread **stat_cache_threads;   //定义了一个指针的指针,也就是一个数组。GThread由g_thread-2.0库提供,需要安装这个库  
  3.     GThread **aio_write_threads = NULL; 
  4. #ifdef USE_LINUX_AIO_SENDFILE  
  5.     GThread *linux_aio_read_thread_id = NULL; 
  6. #endif  
  7.     GError *gerr = NULL; 
  8. #endif 

下面以stat_cache_threads为例,说明GAsyncQueue实现线程池的方式,这个线程的功能是处理页面的stat_cache信息:

 

首先在lighttpd的main函数中,为线程申请空间、创建线程,其中srv->srvconf.max_stat_threads即stat_cache线程数量,是从配置文件中读取到的:

  1. stat_cache_threads = calloc(srv->srvconf.max_stat_threads, sizeof(*stat_cache_threads)); 
  2.  
  3. for (i = 0; i < srv->srvconf.max_stat_threads; i++) { 
  4.         stat_cache_threads[i] = g_thread_create(stat_cache_thread, srv, 1, &gerr); 
  5.         if (gerr) { 
  6.             ERROR("g_thread_create failed: %s", gerr->message); 
  7.             return -1; 
  8.         } 

于是stat_cache_thread函数(注意,结尾没有“s”)就被作为线程,执行起来了,其句柄保存在stat_cache_threads数组中。

stat_cache_thread函数在stat_cache.c文件中,为:

  1. gpointer stat_cache_thread(gpointer _srv) { 
  2.     server *srv = (server *)_srv; 
  3.     stat_job *sj = NULL;     
  4.     /* take the stat-job-queue */ 
  5.     GAsyncQueue * inq; 
  6.     g_async_queue_ref(srv->stat_queue); 
  7.     inq = srv->stat_queue; 
  8.     /* */ 
  9.     while (!srv->is_shutdown) { 
  10.         /* let's see what we have to stat */ 
  11.         struct stat st; 
  12.         if ((sj = g_async_queue_pop(inq))) { 
  13.             if(sj == (stat_job *) 1) 
  14.                 continue/* just notifying us that srv->is_shutdown changed */ 
  15.             /* don't care about the return code for now */ 
  16.             stat(sj->name->ptr, &st); 
  17.             joblist_async_append(srv, sj->con); 
  18.             stat_job_free(sj); 
  19.         } 
  20.     } 
  21.     g_async_queue_unref(srv->stat_queue); 
  22.     return NULL; 

在这里,又看到了g_async_queue的身影,g_async_queue_ref函数的作用是在不加锁的情况下,获得对stat_gueue异步队列的引用。在个在线程启动的时候调用,之后,就会进入while循环,这个while循环的条件是!srv->is_shutdown,也就是程序只要不退出,循环一直进行。在循环内部,可以看到一条if判断语句:

  1. if ((sj = g_async_queue_pop(inq))) 

g_async_queue_pop也是线程池的关键,glibc中对这个函数的注释为:

 

Pops data from the queue. This function blocks until data become available

当异步队列中有可用数据时,将其弹出,否则将一直阻塞在这里。

 

接下来还要判断从异步队列中取得的数据是否为1,这是因为lighttpd将1设为线程退出的指令,当异步队列中pop出1时,线程就要退出。如果不为1说明是正常的数据(任务),需要处理。处理完成后,继续循环等待新的任务到来。

那又是在哪把任务、消息加入到stat_cache队列中的呢?主要有两个地方,一个是在stat_cache.c中的stat_cache_get_entry_internal()函数内部,调用了g_async_queue_push(srv->stat_queue, sj);将sj推入了stat_queue队列;另一处在server.c的main函数的最后几行,

  1. for (i = 0; i < srv->srvconf.max_stat_threads; i++) { 
  2.             g_async_queue_push(srv->stat_queue, (void *) 1); 

注意,这里向stat_cache队列中加入了多次1,目的通知每个线程,程序即将结束,需要线程退出。

 

整个脉络现在基本清晰了,也就是lighttpd程序维护了异步消息队列stat_queue,然后启动了几个线程stat_cache_thread,每个线程都在循环等待stat_queue中是否有新加入的数据(任务)。当新的数据(任务)来临时,某一个线程的g_async_queue_pop函数就会返回这个数据的指针,然后线程就将处理这次的任务。处理完成后,stat_cache_thread线程就会运行到g_async_queue_pop函数处,并阻塞在这里,等待下次任务的到来。

Lighttpd 的详细介绍:请点这里
Lighttpd 的下载地址:请点这里

相关阅读

相关推荐