【linux源码分析】io复用之poll - 木东驿站 - Powered by MoodBlog

CONTENT

【linux源码分析】io复用之poll

在介绍poll系统调用之前,不得不提linux虚拟文件系统中poll机制的原理,在linux中万物皆是文件,比如一个驱动,一个网络套接字,一个磁盘文件。如果我们对文件的某种事件感兴趣,比如可读、可写事件,就可以通过poll机制异步的得到事件准备就绪的消息。具体方法是给文件建立一个等待队列,然后把对该文件感兴趣的进程挂到等待队列上,当文件状态发生改变(即事件发生)时,就会逐个通知挂在等待队列上的进程。怎样通知呢?一般情况下是唤醒该进程,因为此时这个进程极有可能在睡眠。然后该进程自己去判断发生了哪些事件。
如果进程没有在睡眠怎么办?(比如epoll,epoll在内核有自己的数据结构,可以在进程处于用户态时收到poll通知),这样进程如何得知此时有新事件发生?所以还可以在唤醒进程的同时保存新事件到某种结构上。(个人见解)


我们在用户空间调用poll函数的时候,实际上在内核里调用的是sys_poll。
可以看到sys_poll主要是进行了一些时间处理,转换时间单位,然后调用do_sys_poll。

 asmlinkage long sys_poll(struct pollfd __user *ufds, unsigned int nfds,
   long timeout_msecs)
{
 s64 timeout_jiffies;
 if (timeout_msecs > 0) {
#if HZ > 1000
  /* We can only overflow if HZ > 1000 */
  if (timeout_msecs / 1000 > (s64)0x7fffffffffffffffULL / (s64)HZ)
   timeout_jiffies = -1;
  else
#endif
   timeout_jiffies = msecs_to_jiffies(timeout_msecs);
 } else {
  /* Infinite (< 0) or no (0) timeout */
  timeout_jiffies = timeout_msecs;
 }
 return do_sys_poll(ufds, nfds, &timeout_jiffies);
}

 

do_sys_poll:

int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds, s64 *timeout)
{
    struct poll_wqueues table;
    int fdcount, err;
    unsigned int i;
    struct poll_list *head;
    struct poll_list *walk;
    /* Allocate small arguments on the stack to save memory and be
       faster - use long to make sure the buffer is aligned properly
       on 64 bit archs to avoid unaligned access */
    long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
    struct poll_list *stack_pp = NULL;

    /* Do a sanity check on nfds ... */
    if (nfds > current->signal->rlim[RLIMIT_NOFILE].rlim_cur)
        return -EINVAL;

    poll_initwait(&table);

    head = NULL;
    walk = NULL;
    i = nfds;
    err = -ENOMEM;
    while(i!=0) {
        struct poll_list *pp;
        int num, size;
        if (stack_pp == NULL)
            num = N_STACK_PPS;
        else
            num = POLLFD_PER_PAGE;
        if (num > i)
            num = i;
        size = sizeof(struct poll_list) + sizeof(struct pollfd)*num;
        if (!stack_pp)
            stack_pp = pp = (struct poll_list *)stack_pps;
        else {
            pp = kmalloc(size, GFP_KERNEL);
            if (!pp)
                goto out_fds;
        }
        pp->next=NULL;
        pp->len = num;
        if (head == NULL)
            head = pp;
        else
            walk->next = pp;

        walk = pp;
        if (copy_from_user(pp->entries, ufds + nfds-i, 
                sizeof(struct pollfd)*num)) {
            err = -EFAULT;
            goto out_fds;
        }
        i -= pp->len;
    }

    fdcount = do_poll(nfds, head, &table, timeout);

    /* OK, now copy the revents fields back to user space. */
    walk = head;
    err = -EFAULT;
    while(walk != NULL) {
        struct pollfd *fds = walk->entries;
        int j;

        for (j=0; j < walk->len; j++, ufds++) {
            if(__put_user(fds[j].revents, &ufds->revents))
                goto out_fds;
        }
        walk = walk->next;
    }
    err = fdcount;
    if (!fdcount && signal_pending(current))
        err = -EINTR;
out_fds:
    walk = head;
    while(walk!=NULL) {
        struct poll_list *pp = walk->next;
        if (walk != stack_pp)
            kfree(walk);
        walk = pp;
    }
    poll_freewait(&table);
    return err;
}

 

我们可以看到首先定义了一个poll_wqueues类型的对象,定义如下:

struct poll_wqueues {
 poll_table pt;
 struct poll_table_page * table;
 int error;
 int inline_index;
 struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};

先不管这个pt是做什么用的。这个结构存储了一个指向poll_table_page结构的指针,还有一个poll_table_entry的数组,所以我们得了解这个结构是做什么用的。(层层深挖)

struct poll_table_entry {
    struct file * filp;
    wait_queue_t wait;
    wait_queue_head_t * wait_address;
};

这里出现了传说中的wait_queue_t,这就是上面所说的等待队列成员了,而wait_queue_head_t是等待队列头部,filp其实指向这个等待队列所属的文件。wait_queue_t的结构如下:

struct __wait_queue {
    unsigned int flags;
    void *private;
    wait_queue_func_t func;
    struct list_head task_list;
};

list_head结构包含next、last指针,其实就是把等待队列串起来。func就是唤醒队列成员时所执行的函数了。private指向什么?在poll函数调用中,这个private其实是指向当前进程的task_struct结构的指针,之后会用来唤醒该进程。flags暂且不管。

所以可以推测,poll_table_entry保存等待队列成员的一些信息,那table指针有什么用?在poll_wqueues中,poll_table_entry数组的大小是固定的,每添加本进程到一个等待队列(其实有多少个传进poll函数的fd,就会添加到多少个等待队列),都会消耗一个entry,所以当entry数组不够用时,内核会申请新的内存来存储entry,这块内存由table指针指向的结构管理。

所以回到do_sys_poll函数,它所定义的table对象就是为了存储之后的等待队列节点信息。再往下看,poll_list存储着一组pollfd,pollfd是用户空间传过来的需要监听事件的fd信息,为什么poll_list也有多个?因为内核为了避免申请内存,开辟了一块栈空间存储pollfd,如果该空间足够用,那么直接让poll_list指向该空间就可以了。如果传进来的pollfd过多,在栈空间用完之后就使用kmalloc再申请一块内存使用。栈空间分配了POLL_STACK_ALLOC字节,这个常量在linux2.6中是256。一个pollfd需要8个字节,所以可以得知该空间可以存储32-1个pollfd(poll_list本身占用8字节)。每个poll_list存储的pollfd是有数量限制的,所以可能会存在多个poll_list,是用链表连接起来。

注意,这里使用了copy_from_user函数把数据从用户空间复制过来,因为内核无法直接访问用户空间。这个函数底层是用汇编实现的,这里不再进行分析。接下来重头戏,开始调用do_poll函数,这是poll函数的核心了,返回值就是发生事件的fd数目。先不管do_poll,继续往下走,把我们获得的每个fd的事件复制(覆盖)到用户空间,这样用户空间的代码就能知道哪些事件发生了。最后会释放为entry、poll_list申请的动态内存,返回fdcount计数。这就是整个poll调用的流程。

进入核心函数do_poll:

static int do_poll(unsigned int nfds,  struct poll_list *list,
     struct poll_wqueues *wait, s64 *timeout)
{
 int count = 0;
 poll_table* pt = &wait->pt;
 /* Optimise the no-wait case */
 if (!(*timeout))
  pt = NULL;
 
 for (;;) {
  struct poll_list *walk;
  long __timeout;
  set_current_state(TASK_INTERRUPTIBLE);
  for (walk = list; walk != NULL; walk = walk->next) {
   struct pollfd * pfd, * pfd_end;
   pfd = walk->entries;
   pfd_end = pfd + walk->len;
   for (; pfd != pfd_end; pfd++) {
    if (do_pollfd(pfd, pt)) {
     count++;
     pt = NULL;
    }
   }
  }
  pt = NULL;
  if (count || !*timeout || signal_pending(current))
   break;
  count = wait->error;
  if (count)
   break;
  if (*timeout < 0) {
   __timeout = MAX_SCHEDULE_TIMEOUT;
  } else if (unlikely(*timeout >= (s64)MAX_SCHEDULE_TIMEOUT-1)) {
   __timeout = MAX_SCHEDULE_TIMEOUT - 1;
   *timeout -= __timeout;
  } else {
   __timeout = *timeout;
   *timeout = 0;
  }
  __timeout = schedule_timeout(__timeout);
  if (*timeout >= 0)
   *timeout += __timeout;
 }
 __set_current_state(TASK_RUNNING);
 return count;
}

 首先进入一个死循环,把进程状态设置为TASK_INTERRUPTIBLE,可中断的休眠状态,所以进入休眠了?错!此时只是设置了状态,进程还没有休眠,休眠会发生在调度程序执行后。这里有一个问题,如果恰好此时触发时钟中断,进程时间片用完被抢占怎么办?如果发现进程状态不是TASK_RUNNING,schedule函数会把该进程从运行队列删除的,而且此时该进程没有挂到任何等待队列上,岂不是永远醒不过来了?这个问题我现在也没有答案,有木有内核高手给解答一下呀……

不管了继续分析,还记得poll_list吗?里面存储的是我们从用户空间传进来的pollfd,有多个poll_list,每个poll_list有多个pollfd,所以我们需要双层循环,为每一个pollfd执行do_pollfd。其实do_pollfd做的事就是检查一下该fd有没有事件发生,如果有的话就修改pollfd的revent,如果没有就把当前进程挂到fd对应文件的等待队列上。双层循环结束后,如果没有任何事件发生,也没有需要处理的信号,就开始休眠了。睡到自然醒或者被某个等待队列唤醒,注意醒来之后没有退出最外面的死循环,会再来一遍事件轮询,由此可见poll的性能不是很高。确定能退出死循环后,设置进程状态为TASK_RUNNING,返回事件数目。

再来看看,do_pollfd的具体代码:

static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait)
{
    unsigned int mask;
    int fd;

    mask = 0;
    fd = pollfd->fd;
    if (fd >= 0) {
        int fput_needed;
        struct file * file;

        file = fget_light(fd, &fput_needed);
        mask = POLLNVAL;
        if (file != NULL) {
            mask = DEFAULT_POLLMASK;
            if (file->f_op && file->f_op->poll)
                mask = file->f_op->poll(file, pwait);
            /* Mask out unneeded events. */
            mask &= pollfd->events | POLLERR | POLLHUP;
            fput_light(file, fput_needed);
        }
    }
    pollfd->revents = mask;

    return mask;
}

我们可以看到mask保存file->f_op->poll返回的事件,但是我们只关注pollfd传进来的事件,所以需要&=一下取交集。file->f_op->poll中一般会调用我们传进去的pwait,在之前poll_initwait(&table)时,pwait已经初始化为__pollwait。

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
                poll_table *p)
{
    struct poll_table_entry *entry = poll_get_entry(p);
    if (!entry)
        return;
    get_file(filp);
    entry->filp = filp;
    entry->wait_address = wait_address;
    init_waitqueue_entry(&entry->wait, current);
    add_wait_queue(wait_address,&entry->wait);
}

可见系统是在这个函数里把当前进程加到文件filp的等待队列中去的。

至此,poll调用的大部分代码已经分析完了,用一个词总结poll所做的事情就是:轮询。(有趣的是不光内核需要轮询,事件返回到用户空间后,用户代码还得继续轮询)不过在活跃事件较多的情况下,poll机制也可以发挥到极致性能,毕竟此时只需要遍历一次fd集就能返回结果,也不用睡觉了。

个快快 2019年04月11日 天气 晴

REMARKS

© 2018 MoodBlog 0.2 个快快 作品 | 参考主题: mathilda by fuzzz. | 鲁ICP备16047814号