我对muduo c++网络库的理解(二) - 木东驿站 - Powered by MoodBlog

CONTENT

我对muduo c++网络库的理解(二)

开始运行

muduo典型启动方式如下:

Loop loop;
Server server(&loop,addr);
server.start();
loop.loop()

(1)在主线程创建一个loop

(2)在主线程创建server对象,绑定loop

(3)调用server.start()

(4)让loop开始循环

那么start应该是完成了事件循环前的准备,start方法代码如下:

void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)
  {
    threadPool_->start(threadInitCallback_);

    assert(!acceptor_->listenning());
    loop_->runInLoop(
        std::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}

start做了两件事:

(1)启动线程池,具体见 https://www.supmers.com/view.php?article=122 

(2)在主线程eventloop的方法队列中添加Acceptor::listen()方法,因为此时处于该loop所在的线程,所以runInloop不会将该方法添加到队列中,而是直接运行。

void Acceptor::listen()
{
    loop_->assertInLoopThread();
    listenning_ = true;
    acceptSocket_.listen();
    acceptChannel_.enableReading();
}

执行listen时,除了进行listen系统调用将socket套接字由主动连接变为被动连接(在内核维护一个队列用于处理新连接),还执行了enableReadding。这个方法是为acceptChannel注册可读事件,并将其添加到poll中,这样执行poll时就能监听到这个channel的可读事件了。对于该socket来说,触发可读时,往往意味着新连接的到来。

做完上面的事情,accptor就准备好了,系统开始被动的接受新连接。此时我们只有主线程中的eventloop具有一个channel,muduo只有一个任务:等待第一个用户连接到来。然后使用loop.lopp()启动当前线程的无限循环开始执行这个任务。


处理新连接

在accptor的构造方法中有这样一句代码:

acceptChannel_.setReadCallback(
std::bind(&Acceptor::handleRead, this));

对于acceptChannel来讲,可读事件就是新连接的到来,所以此时会回调handleRead这个方法。

  loop_->assertInLoopThread();
  InetAddress peerAddr;
  //FIXME loop until no more
  int connfd = acceptSocket_.accept(&peerAddr);
  if (connfd >= 0)
  {
    // string hostport = peerAddr.toIpPort();
    // LOG_TRACE << "Accepts of " << hostport;
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);
    }
    else
    {
      sockets::close(connfd);
    }
  }
  else
  {
    LOG_SYSERR << "in Acceptor::handleRead";
    // Read the section named "The special problem of
    // accept()ing when you can't" in libev's doc.
    // By Marc Lehmann, author of libev.
    if (errno == EMFILE)
    {
      ::close(idleFd_);
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
  }

首先创建一个地址peerAddr,用来存储正在连接我们的客户端地址,然后调用socket.accept接受了这个连接,其实就是取得一个已经完成三次握手的fd。如果存在新连接回调就执行,否则就关闭连接。

在muduo提供的server类中,一开始就给accptor绑定了新连接回调,所以默认情况下这个回调不会为空。(我暂时想不出什么情况下这个回调会为空……)

acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));

newConnection里面的主要代码如下:

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;
  InetAddress localAddr(sockets::getLocalAddr(sockfd));

  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

做了以下工作:

(1)从线程池里拿一条线程中的eventloop,如果线程池为空,这里拿到的是主线程。

(2)递增connid为新连接取名。

(3)为新连接创建一个TcpConnection指针(shared_ptr)

(4)把新连接指针插入connections

(5)为新连接设置用户提供的各种事件回调

(6)在新连接所在的eventloop中的方法队列添加方法connecEstablished,等待执行。


void TcpConnection::connectEstablished()
{
    loop_->assertInLoopThread();
    assert(state_ == kConnecting);
    setState(kConnected);
    channel_->tie(shared_from_this());
    channel_->enableReading();

    connectionCallback_(shared_from_this());
}

这是处理新连接的最后一个方法了,在新连接所在的eventloop中执行。主要任务就是在新连接的channel(时间处理器)中打开可读事件。最后有一个connection回调,muduo给这个回调设置了一个默认值,打印新连接的相关信息。用户可以自定义这个回调。

至此,新连接的处理就结束了。

个快快 2018年11月13日 天气 晴

REMARKS

© 2018 MoodBlog 0.2 个快快 作品 | 参考主题: mathilda by fuzzz. | 鲁ICP备16047814号