Linux select/poll/epoll 原理(三)poll 实现

xiaobaichen 2019-11-05

针对 select 系统调用的三个不足,poll 解决的是第一个、最多 1024 个 FD 限制的问题。

其实现思路是:

1. 不再使用位图来传递事件和结果,而是使用 pollfd 。 结构体数组来传递。

2. 在内部实现时,以 poll_list 链表的形式来分批次保存 pollfd 。不像 select 那样一次申请完整的一大块内存。

3. 通过从进程的信号量里获取能打开的最大文件数量,解决 1024 个限制的问题。

0. 基本数据结构

// 源码位置:include/uapi/asm-generic/poll.h

struct pollfd {

int fd; // FD

short events; // 输入的敢兴趣事件

short revents; // 输出的结果

};

// 源码位置:fs/select.c

struct poll_list {

struct poll_list *next;

// entries 指向的数组里 pollfd 的数量

int len;

// 指向 pollfd 数组的指针

struct pollfd entries[0];

};

pollfd 结构体用来传递单个FD的输入事件、输出结果。

poll_list 是一个链表,其节点指向 pollfd 结构体的数组,这个数组要么是在栈上预分配、要么是按内存页分配(保持页对齐)。

1. poll 系统调用主逻辑

源码位置:fs/select.c

SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds,

int, timeout_msecs)

{

struct timespec64 end_time, *to = NULL;

int ret;

if (timeout_msecs >= 0) {

to = &end_time;

poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC,

NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));

}

ret = do_sys_poll(ufds, nfds, to);

if (ret == -EINTR) {

struct restart_block *restart_block;

restart_block = &current->restart_block;

restart_block->fn = do_restart_poll;

restart_block->poll.ufds = ufds;

restart_block->poll.nfds = nfds;

if (timeout_msecs >= 0) {

restart_block->poll.tv_sec = end_time.tv_sec;

restart_block->poll.tv_nsec = end_time.tv_nsec;

restart_block->poll.has_timeout = 1;

} else

restart_block->poll.has_timeout = 0;

ret = -ERESTART_RESTARTBLOCK;

}

return ret;

}

static int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,

struct timespec64 *end_time)

{

struct poll_wqueues table;

int err = -EFAULT, fdcount, len, size;

/* Allocate small arguments on the stack to save memory and be

faster - use long to make sure the buffer is aligned properly

on 64 bit archs to avoid unaligned access */

// 创建 256 个字节大小的数组

long stack_pps[POLL_STACK_ALLOC/sizeof(long)];

struct poll_list *const head = (struct poll_list *)stack_pps;

struct poll_list *walk = head;

unsigned long todo = nfds;

// 如果超过能打开的最大文件数则返回

// 这个 rlimit 通过判等进程信息里的信号量来实现的

// 因此修改文件能打开的最大文件数不需要重新编译,可以实时修改

if (nfds > rlimit(RLIMIT_NOFILE))

return -EINVAL;

// N_STACK_PPS 是用于计算 stack_pps 里能存放多少个 pollfd 结构体

len = min_t(unsigned int, nfds, N_STACK_PPS);

// 从用户空间拷贝 pollfd 数组到内核空间

// 分批拷贝,不同批次之间用 poll_list 链表维护起来

for (;;) {

walk->next = NULL;

walk->len = len;

if (!len)

break;

if (copy_from_user(walk->entries, ufds + nfds-todo,

sizeof(struct pollfd) * walk->len))

goto out_fds;

todo -= walk->len;

if (!todo)

break;

// POLLFD_PER_PAGE 是一个页面能存放多少个 pollfd 结构体

len = min(todo, POLLFD_PER_PAGE);

size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;

walk = walk->next = kmalloc(size, GFP_KERNEL);

if (!walk) {

err = -ENOMEM;

goto out_fds;

}

}

// 初始化 poll_wqueues,设置队列处理函数等

poll_initwait(&table);

// 主逻辑:调用目标文件的 poll 函数

fdcount = do_poll(head, &table, end_time);

// 删除主逻辑里添加到目标文件的等待节点

poll_freewait(&table);

// 把结果拷贝到用户空间

for (walk = head; walk; walk = walk->next) {

struct pollfd *fds = walk->entries;

int j;

for (j = 0; j < walk->len; j++, ufds++)

if (__put_user(fds[j].revents, &ufds->revents))

goto out_fds;

}

err = fdcount;

out_fds:

// 释放申请的内存

walk = head->next;

while (walk) {

struct poll_list *pos = walk;

walk = walk->next;

kfree(pos);

}

return err;

}

static int do_poll(struct poll_list *list, struct poll_wqueues *wait,

struct timespec64 *end_time)

{

poll_table* pt = &wait->pt;

ktime_t expire, *to = NULL;

int timed_out = 0, count = 0;

u64 slack = 0;

unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;

unsigned long busy_start = 0;

/* Optimise the no-wait case */

if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {

pt->_qproc = NULL;

timed_out = 1;

}

if (end_time && !timed_out)

slack = select_estimate_accuracy(end_time);

for (;;) {

struct poll_list *walk;

bool can_busy_loop = false;

// 遍历链表

for (walk = list; walk != NULL; walk = walk->next) {

struct pollfd * pfd, * pfd_end;

pfd = walk->entries;

pfd_end = pfd + walk->len;

// 遍历节点里的数组

for (; pfd != pfd_end; pfd++) {

/*

* Fish for events. If we found one, record it

* and kill poll_table->_qproc, so we don't

* needlessly register any other waiters after

* this. They'll get immediately deregistered

* when we break out and return.

*/

// 处理单个 pollfd

if (do_pollfd(pfd, pt, &can_busy_loop,

busy_flag)) {

// 有事件发生了

count++;

// 后续的文件即使没有事件发生也不需要等待了。

pt->_qproc = NULL;

/* found something, stop busy polling */

busy_flag = 0;

can_busy_loop = false;

}

}

}

// 注意:上面的遍历循环里,并没有像 select 那样,在小批次poll后进行睡眠。

/*

* All waiters have already been registered, so don't provide

* a poll_table->_qproc to them on the next loop iteration.

*/

pt->_qproc = NULL;

if (!count) {

count = wait->error;

if (signal_pending(current))

count = -EINTR;

}

if (count || timed_out)

break;

/* only if found POLL_BUSY_LOOP sockets && not out of time */

if (can_busy_loop && !need_resched()) {

if (!busy_start) {

busy_start = busy_loop_current_time();

continue;

}

if (!busy_loop_timeout(busy_start))

continue;

}

busy_flag = 0;

/*

* If this is the first loop and we have a timeout

* given, then we convert to ktime_t and set the to

* pointer to the expiry value.

*/

if (end_time && !to) {

expire = timespec64_to_ktime(*end_time);

to = &expire;

}

// 睡眠等待直至超时或被唤醒

if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))

timed_out = 1;

}

return count;

}

static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait,

bool *can_busy_poll,

unsigned int busy_flag)

{

unsigned int mask;

int fd;

mask = 0;

fd = pollfd->fd;

if (fd >= 0) {

struct fd f = fdget(fd);

mask = POLLNVAL;

if (f.file) {

mask = DEFAULT_POLLMASK;

if (f.file->f_op->poll) {

pwait->_key = pollfd->events|POLLERR|POLLHUP;

pwait->_key |= busy_flag;

mask = f.file->f_op->poll(f.file, pwait);

if (mask & busy_flag)

*can_busy_poll = true;

}

/* Mask out unneeded events. */

// 清除不需要的事件

mask &= pollfd->events | POLLERR | POLLHUP;

fdput(f);

}

}

pollfd->revents = mask;

return mask;

由于采用 pollfd 结构体来传递文件的事件,do_poll 遍历所有文件时的逻辑更清晰些、有层次,对某个 pollfd 的处理提取成 do_pollfd 函数。

2. 等待与唤醒

poll 系统调用的等待与唤醒逻辑与 select 系统调用是一样的,调用的代码是一样的。

3. 小结

  1. poll 系统调用解决了 select 系统调用最多 1024 个文件的限制。
  2. 每次调用仍然需要拷贝所有文件的事件。
  3. 其实现上,每次唤醒后仍然需要 poll 所有文件,效率依赖于 poll 的文件数量。在网络的服务端应用里,要监听的 socket 连接的事件是比较稳定的、其数量也是比较稳定的,不会每次 select/poll 都会发生变化,因此第2、3点的问题有很大的优化空间,这在 epoll 系统调用里解决。

相关推荐

LandryBean / 0评论 2011-02-06