我对muduo c++网络库的理解(四) - 木东驿站 - Powered by MoodBlog

CONTENT

我对muduo c++网络库的理解(四)

io事件处理

muduo支持poll和epoll,这里分析一下epoll。

Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  int numEvents = ::epoll_wait(epollfd_,
                               &*events_.begin(),
                               static_cast<int>(events_.size()),
                               timeoutMs);
  int savedErrno = errno;
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    fillActiveChannels(numEvents, activeChannels);
    if (implicit_cast<size_t>(numEvents) == events_.size())
    {
      events_.resize(events_.size()*2);
    }
  }
  return now;
}

首先调用epoll_wait,把结果存到events中,events是一个关于epoll_event的vector,这里传的是首元素的地址。然后调用fillActiveChannels把结果存到activeChannels中,如果触发io的fd数达到了最大值就进行扩容。

然后接下来就是eventloop那边的处理:

    for (Channel* channel : activeChannels_)
    {
      currentActiveChannel_ = channel;
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }

这里对每一个activeChannel都调用了handleEvent方法,在这个方法里对事件进行处理。这个方法如何知道要处理哪些事件呢?其实之前在fillActiveChannels中就已经设定,已经存储到对应的channel中。

channel->set_revents(events_[i].events);

然后会对每个channel调用handleEvent,这个方法很简单,就是根据epoll得到的事件类型调用对应的回调函数(如果存在的话)。

处理可读事件

对于可读事件使用tcpConnection::handleRead处理

void TcpConnection::handleRead(Timestamp receiveTime)
{
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
}

使用readFd从内核缓冲区读取数据到用户缓冲区inputBuffer,如果读取到的字节数大于0,就使用messageCallback处理,这是一个用户定义的回调函数,我们的业务逻辑应该在这个方法中得到执行。如果用户不定义,muduo会设置为一个默认方法defaultMessageCallback来丢弃数据。如果读取字节数等于0,说明客户端发送了FIN标志,此时muduo开始进行关闭处理。

处理可写事件

可写事件的处理方法是handleWrite,核心代码如下:

void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())
  {
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0)
    {
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0)
      {
        channel_->disableWriting();
        if (writeCompleteCallback_)
        {
          loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
        }
        if (state_ == kDisconnecting)
        {
          shutdownInLoop();
        }
      }
    }
  }
}

首先判断channel是否正在写,如果返回值为真,说明此时输出缓存区是有内容的,我们需要将这部分内容写回客户端。socket::write返回成功写入内核缓冲区的字节数,由于各种原因所致,这个字节数未必等于我们要发送内容的字节数(比如写着写着内核缓冲区就满了)。

如果n确实等于输出缓存区的可读字节,也就是完全发送了全部数据,这时候使用channel->disableWriting()关闭可写事件,其实也就是取消poll中对应该fd的事件类型。

最后会判断当前状态,如果状态是正在关闭,就调用shutdownInloop方法关闭本地写。(发送FIN标志)

void TcpConnection::shutdownInLoop()
{
    loop_->assertInLoopThread();
      if (!channel_->isWriting())
    {
        // we are not writing
        socket_->shutdownWrite();
    }
}

主动发送

上文写到socket可写而且输出缓冲区有内容时,muduo才会发送缓冲区的数据。输出缓冲区的数据从哪里来的呢?其实是调用TcpConnection::sendInLoop添加的。

void TcpConnection::sendInLoop(const void* data, size_t len)
{
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool faultError = false;
  
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      if (remaining == 0 && writeCompleteCallback_)
      {
        loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
  }

  if (!faultError && remaining > 0)
  {
    size_t oldLen = outputBuffer_.readableBytes();
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
      loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
    if (!channel_->isWriting())
    {
      channel_->enableWriting();
    }
  }
  
}

这个方法稍微有些复杂,将执行以下操作:

(1)如果当前channel没有处于写入状态,也就是没有任何发送任务时,直接调用write写入数据。这里主要考虑的是乱序问题,如果处于写入状态,说明输出缓冲区有内容,我们如果写入新内容,可能会和还未发送的旧内容混合在一起。

(2)不管第一步有没有进行,只要还有未发送完的数据(有可能一点也没发送),就把剩余内容写入输出缓冲区末尾,把channel的正在写状态打开。

个快快 2018年11月15日 天气 晴

REMARKS

© 2018 MoodBlog 0.2 个快快 作品 | 参考主题: mathilda by fuzzz. | 鲁ICP备16047814号