shipinsky 2019-11-05
信号灯(semaphore),也叫信号量,它是不同进程间或一个给定进程内部不同线程间同步的机制。
信号灯种类:
1. posix 有名信号灯 (见 posix信号量的应用--生产者-消费者)
2. posix 基于内存的信号灯(无名信号灯 见 posix信号量的应用--生产者-消费者)
3. System V 信号灯 (IPC对象,以下主要说明);
信号灯:
1. 二值信号灯:值为 0 或 1。与互斥锁类似,资源可用时值为1,不可用时值为 0;
2. 计数信号灯:值在 0 到 n 之间。用来统计资源,其值代表可用资源数;
等待操作时等待信号灯的值变为大于0,然后将其减一;而释放操作则相反,用来唤醒等待资源的进程或者线程;
详情见《unix环境高级编程》15.8小节
在Linux 系统中,使用信号量通常分为以下几个步骤:
如果不需要信号量,则从系统中删除它,此时使用semctl() 函数的IPC_RMID 操作。此时需要注意,在程序中不应该出现对已经被删除的信号量的操作;
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> /*创建或打开信号量集 key :和信号灯集关联的key 值 nsems:信号灯集中包含的信号灯数目 semflg:信号灯集的访问权限,通常为IPC_CREAT|0666 返回值:成功,信号灯集ID 出错,-1 */ int semget(key_t key, int nsems, int semflg ); /*信号量集的控制 semid:信号灯集ID semnum:要修改的信号灯编号 union semun { int val; // Value for SETVAL struct semid_ds *buf; // Buffer for IPC_STAT, IPC_SET unsigned short *array; // Array for GETALL, SETALL struct seminfo *__buf; // Buffer for IPC_INFO(Linux-specific) }; cmd :GETVAL:获取信号灯的值 SETVAL:设置信号灯的值 IPC_RMID:从系统中删除信号灯集合 返回值:成功,0 出错,-1 */ int semctl(int semid, int semnum, int cmd, union semun arg); /*对信号量集的操作 semid:信号灯集ID struct sembuf 结构体每一个元素表示一个操作; struct sembuf { unsigned short sem_num; //要操作的信号灯的编号 short sem_op; // 0: 等待,知道信号灯的值变为0 // 1: 释放资源,V操作 // -1: 分配资源,P操作 short sem_flg; //0,IPC_NOWAIT,SEM_UNDO } nops:要操作的信号灯的个数 返回值:成功,0 出错,-1 */ int semop(int semid,struct sembuf *opsptr,size_t nops);
还是之前POSIX信号量的生产者和消费者,使用的是system V的信号量实现
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include <errno.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <sys/ipc.h> #include <sys/sem.h> #include <signal.h> #define MYFIFO "myfifo" #define BUFFER_SIZE 3 #define UNIT_SIZE 5 #define RUN_TIME 30 #define DELAY_TIME_LEVELS 5 #define MUTEX 0 #define FULL 1 #define AVAIL 2 union semun { int val; // value for SETVAL struct semid_ds *buf; // buffer for IPC_STAT, IPC_SET unsigned short *array; // array for GETALL, SETALL struct seminfo *__buf; // buffer for IPC_INFO }; void *producer(void *arg);//生产者 void *customer(void *arg);//消费者 int fd, semid; time_t end_time; int p(int semid, int semnum) { struct sembuf sops = {semnum, -1, SEM_UNDO}; return semop(semid, &sops, 1); } int v(int semid, int semnum) { struct sembuf sops = {semnum, +1, SEM_UNDO}; return semop(semid, &sops, 1); } void * signal_set(int signo, void (*func)(int)) { int ret; struct sigaction sig; struct sigaction osig; sig.sa_handler = func; sigemptyset(&sig.sa_mask); sig.sa_flags = 0; #ifdef SA_RESTART sig.sa_flags |= SA_RESTART; #endif ret = sigaction(signo, &sig, &osig); if (ret < 0) return SIG_ERR; else return osig.sa_handler; } void sigint(int sig) { unlink(MYFIFO); semctl(semid, 0, IPC_RMID, 0); semctl(semid, 1, IPC_RMID, 0); semctl(semid, 2, IPC_RMID, 0); exit(sig); } int main(int argc, const char *argv[]) { int ret, key; union semun arg; pthread_t thrd_prd_id,thrd_cst_id; signal_set(SIGINT, sigint);//使用信号处理异常,防止程序异常退出,某些资源没有回收 signal_set(SIGTERM, sigint); signal_set(SIGQUIT, sigint); signal_set(SIGKILL, sigint); srand(time(NULL)); end_time = time(NULL) + RUN_TIME; //创建管道 if ((mkfifo(MYFIFO, 0644) < 0) && (errno != EEXIST)) { fprintf(stderr, "mkfifo error!"); exit(-1); } //打开管道 fd = open(MYFIFO, O_RDWR); if (fd == -1) { fprintf(stderr, "open fifo error"); goto err1; } //系统创建IPC通讯(消息队列、信号量、共享内存)时必须指定一个ID值。通常情况下,该id值通过ftok函数得到。 key = ftok("/tmp", 0x66); if (key < 0) { fprintf(stderr, "ftok key error"); goto err1; } //创建了三个信号量 semid = semget(key, 3, IPC_CREAT | 0600); if (semid == -1) { fprintf(stderr, "create semget error"); goto err1; } //对信号量设置初始值 arg.val = 1; ret = semctl(semid, MUTEX, SETVAL, arg); if (ret < 0) { fprintf(stderr, "ctl sem error"); semctl(semid, MUTEX, IPC_RMID, arg); goto err2; } arg.val = BUFFER_SIZE; ret = semctl(semid, AVAIL, SETVAL, arg); if (ret < 0) { fprintf(stderr, "ctl sem error"); semctl(semid, AVAIL, IPC_RMID, arg); goto err2; } arg.val = 0; ret = semctl(semid, FULL, SETVAL, arg); if (ret < 0) { fprintf(stderr, "ctl sem error"); semctl(semid, FULL, IPC_RMID, arg); goto err2; } //取信号量的值 ret = semctl(semid, 0, GETVAL, arg); printf("after semctl setval sem[0].val = [%d]\n", ret); ret = semctl(semid, 1, GETVAL, arg); printf("after semctl setval sem[1].val = [%d]\n", ret); ret = semctl(semid, 2, GETVAL, arg); printf("after semctl setval sem[2].val = [%d]\n", ret); //创建两个线程 ret = pthread_create(&thrd_prd_id, NULL, producer, NULL); if (ret != 0) { fprintf(stderr, "producer pthread_create error"); goto err2; } ret = pthread_create(&thrd_cst_id, NULL, customer, NULL); if (ret != 0) { fprintf(stderr, "customer pthread_create error"); goto err2; } pthread_join(thrd_prd_id, NULL); pthread_join(thrd_cst_id, NULL); close(fd); err2: semctl(semid, 0, IPC_RMID, 0); semctl(semid, 1, IPC_RMID, 0); semctl(semid, 2, IPC_RMID, 0); err1: unlink(MYFIFO); return 0; } void *producer(void *arg) { int real_write; int delay_time; while (time(NULL) < end_time) { delay_time = rand()%DELAY_TIME_LEVELS; sleep(delay_time); //p操作 if (p(semid, AVAIL) < 0) { fprintf(stderr, "p operate error"); kill(0, SIGQUIT); } if (p(semid, MUTEX) < 0) { fprintf(stderr, "p operate error"); kill(0, SIGQUIT); } printf("\nproducer have delayed %d seconds\n", delay_time); //生产者写入数据 执行的操作 if (-1 == (real_write = write(fd, "hello", UNIT_SIZE))) { if (errno == EAGAIN) { printf("The buffer is full, please wait for reading!\n"); } } else { printf("producer writes %d bytes to the FIFO\n", real_write); } //v操作 if (v(semid, FULL) < 0) { fprintf(stderr, "v operate error"); kill(0, SIGQUIT); } if (v(semid, MUTEX) < 0) { fprintf(stderr, "v operate error"); kill(0, SIGQUIT); } } pthread_exit(NULL); } void *customer(void *arg) { unsigned char read_buffer[UNIT_SIZE]; int real_read; int delay_time; while (time(NULL) < end_time) { delay_time = rand()%DELAY_TIME_LEVELS; sleep(delay_time); if (p(semid, FULL) < 0) { fprintf(stderr, "p operate error"); kill(0, SIGQUIT); } if (p(semid, MUTEX) < 0) { fprintf(stderr, "p operate error"); kill(0, SIGQUIT); } memset(read_buffer, 0, UNIT_SIZE); printf("\nCustomer have delayed %d seconds\n", delay_time); //消费 操作 if (-1 == (real_read = read(fd, read_buffer, UNIT_SIZE))) { if (errno == EAGAIN) { printf("The buffer is empty, please wait for writing!\n"); } } else { printf("customer reads %d bytes from the FIFO\n", real_read); } if (v(semid, AVAIL) < 0) { fprintf(stderr, "v operate error"); kill(0, SIGQUIT); } if (v(semid, MUTEX) < 0) { fprintf(stderr, "v operate error"); kill(0, SIGQUIT); } } pthread_exit(NULL); }