comeonxueRong 2010-05-07
线程1:
线程和进程类似,但是线程之间能够共享更多的信息。一个进程中的所有线程可以共享进程文件描述符和内存。
有了多线程控制,我们可以把我们的程序设计成为在一个进程同时做多个任务,每一个线程做一个独立的任务,这种
方式可以有以下好处:
1、通过把每一个事件分配给一个线程处理,可以简化异步事件处理的代码。每一个线程可以用同步编程模型,而同步
编程要比异步编程简单的多。
2、多个进程需要使用复杂的机制来共享内存和文件描述符。而线程可以自动共享同一内存地址空间和文件描述符。
3、有一些问题可以划分以便提高这个程序的吞吐量。一个进程如果有多个任务,需要进行隐式的序列化任务,因为
只有一个线程控制。使用多线程控制,独立的任务可以将每个任务分配一个线程。
4、交互式的进程可以改善响应时间,通过使用多线程将I/O和程序其他部分分开实现处理。
多线程不光可以在多核系统中得到并行的优势,而且在单核系统中,也可以提高系统的吞吐量和响应时间,因为当一个线程
阻塞的时候,另一线程可以占有cpu执行。
线程有一些描述线程和执行环境的信息来表示它,包括线程ID,寄存器值的集合,栈,调度优先级和策略,信号的掩码,errno
变量以及线程特有的一些结构。进程中各个线程共享进程的程序执行文本,程序的全局变量、堆内存、栈和文件描述符。
1、线程标志:
和进程一样,每一个线程都有一个ID。和进程ID是全系统唯一不同,线程ID是在进程内唯一。进程id用pid_t类型来表示,是一个
非负的整数。线程ID由pthread_t数据类型代表,和进程一样实现可能为一个结构,所以把pthread_t类型当做一个整数是不具有可
移植性,所以也没有可移植的打印线程id的方法。这样也需要一个函数来比较两个线程的ID是否相同。
#include <pthread.h> int pthread_equal(pthread_t tid1, pthread_t tid2);
一个线程可以获得使用pthread_self来获得自己的线程id:
#include <pthread.h> pthread_t pthread_self();
这个方法可以和pthread_equal配合使用,来识别被打上线程标记的数据结构。
2、进程创建:
通过调用pthread_create可以创建一个线程:
#include <pthread.h> int pthread_create(pthread_t *restrict tidp, const pthread_attr_t *restrict attr, void *(*start_rtn)(void *),void *restrict arg);
创建成功tidp返回线程的id,attr为线程的属性,新创建的进程会运行start_rtn函数,并传入arg作为参数。如果想传入多个参数到start_rtn
函数中,需要将它们存储在一个结构体中,并把地址传到arg中。失败返回errorcode,而它们不设定errno。每个线程一个errno只是为了兼容
以前的函数而被使用的。多线程中,返回errorcode要比依赖于全局变量的errno清晰一些。
例子:创建一个线程,并打印进程id、新创建的线程id和主线程id。
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <string.h> pthread_t ntid; void printids(const char *s){ pid_t pid; pthread_t tid; pid = getpid(); tid = pthread_self(); printf("%s pid %u tid (0x%x) \n",s,(unsigned int)pid,(unsigned int)tid); } void * thr_fn(void *arg){ printids("new thread: "); return ((void *) 0); } int main(void){ int err; err = pthread_create(&ntid, NULL, thr_fn, NULL); if(err != 0){ fprintf(stderr,"create pthread failed: %s",strerror(err)); exit(1); } printids("main thread: "); sleep(1); exit(0); }
这个例子有两个地方比较古怪,主要是为了处理主线程和新创建线程的竞争:1)主线程休眠,以防止主线程终止,导致真个进程的终止,新建的线程没有机会
运行,我们后面介绍pthread_join可以避免这个。2)新的线程获得它的线程id是通过调用pthread_self而不是读取一个共享内存的变量或者传递的参数。这是因为
主线程不能安全的使用ntid,新的线程可能在调用pthread_create返回之前开始运行。
3、进程终止
如果进程内的任何一个线程调用exit,_Exit,_exit,整个进程就会终止。类似的如果信号的默认action是终止进程,那么一个发送到线程的信号会终止整个进程。
只终止一个线程有三种方式:
1、线程简单的返回。返回值就是退出码。
2、线程可以被进程中的另一个线程取消。
3、线程调用pthread_exit。
#include <phtread.h> void pthread_exit(void *rval_ptr);
rval_ptr是一个无类型的指针,和传递到进程的单个参数类似。这个指针可以被调用pthread_join的其他的线程得到。
#include <pthread.h> int pthread_join(pthread_t thread,void **rval_ptr);
调用pthread_join的线程会阻塞,直到指定的线程调用了pthread_exit,从start_rtn中返回,或者被取消。
如果线程简单的返回,那么rval_ptr被设置成start_rtn的返回值,如果线程被取消,rval_ptr被设置成
PTHREAD_CANCELED。
通过调用pthread_join,我们自动将线程设置成为detached状态,所以资源会被清除。如果线程已经处于detached状态,
那么pthread_join就会失败,返回EINVAL.
如果我们不关心线程的返回值,那么我们可以把rval_ptr设置为NULL。
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <pthread.h> void * thr_fun1(void *arg){ printf("Thread 1 returning...\n"); return ((void *)1); } void * thr_fun2(void *arg){ printf("Thread 2 exiting...\n"); pthread_exit((void *) 2); } int main(void){ int err; pthread_t tid1,tid2; void *tret; err = pthread_create(&tid1,NULL,thr_fun1,NULL); if(err != 0){ fprintf(stderr,"create thread1 failed: %s",strerror(err)); exit(1); } err = pthread_create(&tid2,NULL,thr_fun2,NULL); if(err != 0){ fprintf(stderr,"create thread2 failed: %s",strerror(err)); exit(1); } err = pthread_join(tid1,&tret); if( err != 0 ){ fprintf(stderr,"join thread1 failed: %s",strerror(err)); exit(1); } printf("Thread 1 exit code %d\n",(int)tret); err = pthread_join(tid2,&tret); if( err != 0 ){ fprintf(stderr,"join thread2 failed: %s",strerror(err)); exit(1); } printf("Thread 2 exit code %d\n",(int)tret); exit(0); }
无类型的指针传递给pthread_create和pthread_exit,使用它可以传递多个值,这个指针可以指向包含复杂的结构。但是需要注意这个结构在调用返回时仍然合法。如果
这个结构是在调用者的栈中,内存的内容在使用的可能时候已经被改变。比如一个线程申请在栈中申请了一个结构,然后将结构的指针传递给pthread_exit,接着这个
栈在调用thread_join的时候可能已经被销毁。
例子:
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <string.h> struct foo{ int a,b,c,d; }; void printfoo(const char *s, const struct foo *fp){ printf("%s",s); printf(" structure at 0x%x\n",(unsigned) fp); printf(" foo.a = %d\n", fp->a); printf(" foo.b = %d\n", fp->b); printf(" foo.c = %d\n", fp->c); printf(" foo.d = %d\n", fp->d); } void *thr_fn1(void *arg){ struct foo foo = {1,2,3,4}; printfoo("thread 1:\n",&foo); pthread_exit((void *)&foo); } void *thr_fn2(void *arg){ printf("thread 2: ID is %u\n",(unsigned)pthread_self()); pthread_exit((void *)0); } int main(void){ int err; pthread_t tid1,tid2; struct foo *fp; err = pthread_create(&tid1,NULL,thr_fn1,NULL); if(err != 0){ fprintf(stderr,"create thread1 failed: %s",strerror(err)); exit(1); } err = pthread_join(tid1,(void *)&fp); if(err != 0){ fprintf(stderr,"thread_join failed: %s",strerror(err)); exit(1); } sleep(1); printf("Parent starting second thread\n"); err = pthread_create(&tid2,NULL,thr_fn2,NULL); if(err != 0){ fprintf(stderr,"create thread2 failed: %s",strerror(err)); exit(1); } sleep(1); printfoo("Parent:\n",fp); exit(0); }
一个线程可以可以使用pthread_cancel来取消同一进程中的其他线程。
#include <pthread.h> int pthread_cancel(pthread_t tid);
在默认的条件下,pthread_cancle将会使由tid指定的线程像调用了pthread_exit(PTHREAD_CANCELED)一样,但是一个线程可以选择忽略和如果控制被取消。pthread_cancel
并不等待线程的终止,而只是发送一个请求。
一个线程可以注册函数,当它终止的时候被调用,这个和进程使用atexit注册函数,当进程终止的时候调用类似。这个函数比较出名的就是线程清理函数。一个线程可以
加入多个线程清理函数,这个清理函数保存在栈中,所以执行的顺序和注册的顺序相反:
#include <pthread.h> void pthread_cleanup_push(void (*rtn)(void *), void arg); void pthread_cleanup_pop(int execute);
pthread_cleanup_push来注册清理函数rtn,这个函数有一个参数arg。但一下三种情形之一发生时,注册的清理函数被执行:
1)调用pthread_exit
2)作为对取消线程请求(pthread_cancel)的响应。
3)以非0参数调用pthread_cleanup_pop。
如果pthread_cleanup_pop被传递0参数,则清除函数不会被调用,但是仍然会清除处于栈顶的清理函数。
一个限制是这两个函数可能被实现为一个宏,所以在线程的同一作用域必须以匹配的成对出现。pthread_cleanup_push可能有{,而pthread_cleanup_pop可能有匹配这个
字符的}字符。
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <string.h> void cleanup(void *arg){ printf("cleanup: %s\n",(char *)arg); } void *thr_fn1(void *arg){ printf("thread 1 start\n"); pthread_cleanup_push(cleanup,"thread 1 first handler"); pthread_cleanup_push(cleanup,"thread 1 second handler"); printf("thread 1 push complete\n"); if(arg) return ((void *)1); pthread_cleanup_pop(1); pthread_cleanup_pop(1); return ((void *)1); } void *thr_fn2(void *arg){ printf("thread 2 start\n"); pthread_cleanup_push(cleanup,"thread 2 first handler"); pthread_cleanup_push(cleanup,"thread 2 second handler"); printf("thread 2 push complete\n"); if(arg){ pthread_exit((void *)2); } pthread_cleanup_pop(0); pthread_cleanup_pop(0); pthread_exit((void *) 2); } int main(void){ int err; pthread_t tid1,tid2; void *tret; err = pthread_create(&tid1,NULL,thr_fn1,(void *)1); if( err != 0){ fprintf(stderr,"create thread1 failed: %s",strerror(err)); exit(1); } err = pthread_create(&tid2,NULL,thr_fn2,(void *)2); if(err != 0){ fprintf(stderr,"create thread 2 failed: %s",strerror(err)); exit(1); } err = pthread_join(tid1,&tret); if(err != 0){ fprintf(stderr,"thread1 join failed: %s",strerror(err)); exit(1); } printf("thread 1 exit code %d\n",(int)tret); err = pthread_join(tid2,&tret); if(err != 0){ fprintf(stderr,"thread2 join failed: %s",strerror(err)); exit(1); } printf("thread 2 exit code %d\n",(int) tret); exit(0); }
如果线程从开始例程(startroutine)中返回(byreturnstatement),清理函数不会被调用。
线程的终止状态,直到pthread_join被调用的时候才能得到。如果一个线程已经被detached,这个线程的空间将会被回收。pthread_join不能等待detached的线程,获得其
终止状态。pthread_join一个detached线程将会失败,并返回EINVAL,我们可以通过pthread_detach来detach一个线程:
#include <pthread.h> int pthread_detach(pthread_t tid);
4、线程同步
当多个线程共享相同的内存时,我们需要保证每一个线程都看到一个一致的数据。如果一个线程的变量别的线程不能够读写,或者变量时只读的,那么不会有不一致的状态。
然而一个线程可以修改一个变量,而其他的进程同时可以读取或者修改它,我们需要同步线程来保证它们访问变量,使用的是一个合法的值。
1)互斥量(Mutexes):
我们可以通过pthread提供的互斥量接口来保护我们的数据,确保每次只有一个线程访问。一个mutex基本上是一个锁,我们在访问共享数据的时候设置(上锁),在访问完成
后释放(解锁)。当我们解锁的互斥量的时候,当有多余一个的线程被阻塞时,所有阻塞在这个锁的进程都被唤醒,变成可以运行的状态,只有一个线程开始运行并设置锁,
其他的看到互斥量仍然是被锁定,继续等待。
互斥量使用pthread_mutex_t数据类型,在我们使用一个互斥量变量时,我们必须先初始化它,可以初始化为PTHREAD_MUTEX_INITIALIZER(静态初始化)或者调用
pthread_mutext_init,如果我们动态申请了互斥量,我们需要调用pthread_mutext_destory来销毁它:
#include <pthread.h> int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t * restrict attr); int pthread_mutex_destory(pthread_mutex_t *mutex);
如果想使用默认的属性来初始化互斥量,我们把attr设置为NULL。
例子:
1)静态初始化
pthread_mutex_t mylock = PTHREAD_MUTEX_INITIALIZER;
2)动态初始化:
int error; pthread_mutex_t mylock; if (error = pthread_mutex_init(&mylock, NULL)) fprintf(stderr, "Failed to initialize mylock:%s\n", strerror(error));
想给一个互斥量上锁,我们调用pthread_mutex_lock.如果mutex已经上锁,调用的线程将会被阻塞,直至信号量解锁。要解锁一个信号量,我们调用phtread_mutex_unlock
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
一个线程如果lock一个已经上锁的互斥量,不想被阻塞,那么可以使用pthread_mutex_trylock,如果调用它的时候没有被上锁,就锁住这个互斥量,如果已经上锁,
就会失败,并返回EBUSY。
例子:
我们使用mutex来保护数据结构:当多个进程需要访问动态申请的结构,我们嵌入了引用计数,来保证知道所有线程都使用完它时,我们才释放它。
#include <pthread.h> #include <stdlib.h> struct foo{ int f_count; pthread_mutex_t f_lock; /* ...more stuff here... */ }; struct foo * foo_alloc(void){ struct foo *fp; if((fp = malloc(sizeof(struct foo))) != NULL){ fp->f_count = 1; if(pthread_mutex_init(&fp->f_lock,NULL) != 0){ free(fp); return NULL; } } return fp; } void foo_hold(struct foo *fp){ pthread_mutex_lock(&fp->f_lock); fp->f_count++; pthread_mutex_unlock(&fp->f_lock); } void foo_rele(struct foo *fp){ pthread_mutex_lock(&fp->f_lock); if(--fp->f_count == 0){ pthread_mutex_unlock(&fp->f_lock); pthread_mutex_destroy(&fp->f_lock); free(fp); }else{ pthread_mutex_unlock(&fp->f_lock); } }
2)读写锁:
读写锁也叫共享-排他锁,和互斥量类似,除了它可以提供更高的并行性。使用mutex,它的状态要么处于锁住和未锁状态,只有一个线程可以上锁。而读
写锁有更多的状态:在读状态锁住,在写状态锁住,未锁住。只有一个线程可以获得写锁,多个线程可以同时获得读锁。当读写锁处于写锁住状态,所有
试图上锁的进程都被阻塞,当读写锁处于读锁住状态时,所有试图上读状态的锁成功,但是试图获得写状态锁将会被阻塞,直到所有的读进程都释放读状
态锁,此后来到试图上读锁的线程也被阻塞。
读写锁适合读比写频繁情形。读写锁和互斥量一样也需要在使用前初始化,在释放他们内存的时候销毁。
#include <pthread.h> int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr); int pthread_rwlock_destroy(pthread_rwlock_t *restrict rwlock);
一个读写锁可以调用pthread_rwlock_init来初始化,我们可以传递NULL作为attr的参数,这样会使用读写锁的默认属性。
我们可以调用pthread_rwlock_destroy来清理,销毁它所占的内存空间。
上锁:
#include <pthread.h> int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
实现上可能会对读写锁中读模式的锁锁住次数有一定的限制,所以我们需要检查返回值,以确定是否成功。而其他的两个函数
会返回错误,但是只要我们的锁设计的恰当,我们可以不必做检查。
SingleUNIX规范另外两个读写锁原语:
#include <pthread.h> int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
当锁成功获取时,返回0,否则返回EBUSY。这两个函数使用在一个上锁的结构不能够保证产生死锁的时候,它可以避免死锁。
3)条件变量:
条件变量时另一中线程同步的机制,允许线程以无竞争的方式等待特定的条件发生。条件变量本身需要互斥量的保护,线程在改变条件前必须首先锁住互斥量,
且只有在锁住互斥量以后才能计算条件。条件变量使用之前必须首先进行初始化,pthread_cond_t数据类型代表的条件变量可以用两种方式初始化。
可以把常量PTHREAD_COND_INITIALIZER赋给静态分配的条件变量,但是如果条件变量是动态分配的,可以使用pthread_cond_init函数进行初始化。
在释放底层的内存空间前,可以使用pthread_mutex_destroy函数对条件变量进行销毁。除非需要创建一个非默认属性的条件变量,否则pthread_cond_init
函数的attr参数可以设置为NULL。
#include <pthread.h> int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr); int pthread_cond_destroy(pthread_cond_t *cond);
成功返回0,失败返回错误码。使用pthread_cond_wait等待条件变为真,如果在给定时间内条件不能满足,那么会生成一个代表出错码的返回值。
调用者需要把锁住的互斥量传给pthread_cond_wait对条件进行保护。函数把调用线程放到等待条件的线程列表上,然后对互斥量解锁,这两个操作
是原子操作。当pthread_cond_wait返回时,互斥量再次被锁住。
#include <pthread.h> int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); int pthread_cond_timewait(pthread_cond_t * restict cond, pthread_mutex_t *restrict mutex,const struct timespec * restrict timeout);
pthread_cond_timedwait函数的工作方式与pthread_cond_wait函数相似。timeout值指定了等待的时间,它通过timespec结构指定。时间值用秒数或者
分秒数表示,分秒数的单位是纳秒。时间值是一个绝对数而不是相对数。可以使用gettimeofday获取用timeval结构表示的当前时间,然后把这个时间加
上要等待的时间转换成timespec结构:
void maketimeout(struct timespec *tsp, long minutes){ struct timeval now; /* get the current time */ gettimeofday(&now); tsp->tv_sec = now.tv_sec; tsp->tv_nsec = now.tv_usec * 10000; /* usec to nsec */ tsp->tv_sec += minutes * 60; }
如果时间值到了但是条件还没有出现,pthread_cond_timedwait将重新获取互斥量,然后返回错误ETIMEDOUT。从pthread_cond_wait或者pthread_cond_timedwait
调用成功返回时,线程需要重新计算条件,因为其它线程可能已经在运行并改变了条件。pthread_cond_signal函数将唤醒等待该条件的某个线程,而pthread_cond_broadcast
函数将唤醒等待该条件的所有线程。必须注意一定要在改变条件状态以后再唤醒等待线程
#include <pthread.h> int pthread_cond_signal(pthread_cond_t *cond); int pthread_cond_broadcast(pthread_cond_t *cond);
例子:
#include <pthread.h> struct msg { struct msg *m_next; /* ... more stuff here ... */ }; struct msg *workq; pthread_cond_t qready = PTHREAD_COND_INITIALIZER; /*初始化条件变量*/ pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER; /*初始化互斥量*/ void process_msg(void) { struct msg *mp; for (;;) { pthread_mutex_lock(&qlock); /*条件本身由互斥量保护*/ while (workq == NULL) /*wait返回后要重新检查条件*/ pthread_cond_wait(&qready, &qlock); /*wait期间释放互斥量,返回时再次锁住*/ mp = workq; workq = mp->m_next; pthread_mutex_unlock(&qlock); /*真正释放互斥量*/ /* now process the message mp */ } } void enqueue_msg(struct msg *mp) { pthread_mutex_lock(&qlock); /*修改条件前锁住互斥量*/ mp->m_next = workq; workq = mp; pthread_mutex_unlock(&qlock); pthread_cond_signal(&qready); /*唤醒等待线程时不需要占有互斥量*/ /*如果希望在wait返回时不用再检查条件,就需要在唤醒时占有互斥量*/ }
参考:
《AdvancedprogramminginUnixEnvironment2ed》第11章