Fork me on GitHub
余鸢

Netty学习-EventLoop和Task调度

在Java早期,我们的多线程策略只不过是按需要创建和启动新的Thread,来执行并发的工作任务单元,这个粗糙的策略在高负载下表现得非常糟糕。然后Java5引入了Executor API,通过线程缓存和复用,线程池极大地提高了性能。

基本的线程池模式描述如下:

  • 从池中空闲的线程中选出一个,分配一个提交的task(一个Runnable的实现)
  • 当task完成,线程返回池中,等待复用(下一次task分配)

这个模式如图1所示。

图1 Executor执行逻辑

13

相比为每个task都创建和销毁一个线程,将线程放入池中、复用线程是一次性能的提升。但是这个模型还是无法消除上下文切换带来的开销,而这一点会随着线程数量的增加变得明显,在高负载下会变得严重。此外,在一个项目的运行过程中,因为某个应用的整体复杂度和并发需求,其他线程相关的问题也会出现。

EventLoop接口

运行task来处理一个连接在生命周期中产生的event,是任何一个网络框架的基本功能。其相应的编程结构通常被称为一个event loop, Netty采纳了这个名字,设计了接口io.netty.channel.EventLoop

EVENT/TASK执行顺序 Event和Task按FIFO(先进先出)的顺序执行。这保证了字节内容按正确的顺序被处理,免除了数据被破坏的可能性。

Netty 4中的I/O和event处理

I/O操作触发的event穿过一个包含一个或者多个ChannelHandler的ChannelPipeline。传递event的方法被ChannelHandler拦截,event按需被处理。

一个event的本质决定了它将如何被处理;它可能从网络协议栈传送数据到你的应用,或者反过来,或者做一些完全不一样的事情。但是event处理逻辑必须足够通用和灵活,来对付所有可能的情况。所以,在Netty4,所有的I/O操作和event都是由分配给EventLoop的那一个Thread来处理的。

Netty3 的I/O操作

之前发布版本里的线程模型只保证输入(之前被称为upstream)event会在所谓的I/O线程(即Netty4的EventLoop)中执行。所有输出(downstream)event被I/O或者其他任何线程处理。一开始,这个做法看起来不错,但是后来发现有问题:因为ChannelHandler中的输出event需要加上同步。简单来说,我们不能保证多个线程不会试图同时去获取一个输出event。这是有可能发生的,比如说,当你在不同的线程中,在同一个Channel上调用Channel.write(),触发了并行的输出event。

当一个输出event引发了一个输入event,另一个问题发生了。比如,当Channel.write()发生了异常,你需要产生并且触发一个exceptionCaught event。但是在Netty3的模型里,因为这是一个输入event,结果就成了你在调用线程中执行Channel.write()代码,然后把产生的异常event交给I/O线程来执行,造成了一次额外的上下文切换。

Netty4采用的线程模型,在同一个线程的EventLoop中处理所有发生的事,解决了这两个问题。这个模型提供了一个更加简单的执行结构,省去了ChannelHandler间同步的麻烦(除了那些需要在多个Channel间共享的情况)。

让我们来看下task如何被调度执行。

任务调度

有时候你需要安排一个task延迟或者周期执行。比如,你可能会想注册一个task,在客户端连接5分钟后被执行。一个常见的做法是发送心跳消息包到远端来检查连接是否还在;如果没有响应,你就知道你可以关闭这个channel了。

JDK 调度API

Java5 之前,任务调度是建立在java.util.Timer的基础上的,这个类用了一个后台线程,和标准线程有同样的限制。之后,JDK提供了java.util.concurrent包,定义了接口ScheduledExecutorService

下面的代码说明了如何用ScheduledExecutorService来运行一个延迟60秒的task。

代码1.1 用ScheduledExecutorService调度task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void schedule() {
//创建ScheduledExecutorService包含10个线程的线程池
ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
ScheduledFuture<?> future = executor.schedule(
new Runnable() { //创建一个延迟执行的Runnable
@Override
public void run() {
System.out.println("Now it is 60 seconds later");
}
}, 60, TimeUnit.SECONDS);
// do something
//
executor.shutdown();
}

虽然ScheduledExecutorService API用起来很简单,但是在高负载情况下它会带来性能的损失。

用EventLoop调度task

ScheduledExecutorService 的实现有一些局限,比如,需要创建额外的线程来管理线程池。如果要调度很多task,这一点会成为性能的瓶颈。通过采用channel的EventLoop来实现调度,Netty解决了这个问题,如下代码所示。

代码1.2 用EventLoop调度task

1
2
3
4
5
6
7
8
9
10
public static void scheduleViaEventLoop() {
Channel ch = null; // Get reference to channel
ScheduledFuture<?> future = ch.eventLoop().schedule(
new Runnable() {
@Override
public void run() {
System.out.println("Now its 60 seconds later");
}
}, 60, TimeUnit.SECONDS);//take 60秒后执行
}

60秒后,这个Runnable实例会被channel绑定的EventLoop执行。如果要让这个task每60秒跑一次,用scheduleAtFixedRate()方法,如下所示。

代码1.3 用EventLoop调度循环执行的task

1
2
3
4
5
6
7
8
9
10
public static void scheduleFixedViaEventLoop() {
Channel ch = null; // Get reference to channel
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
System.out.println("Run every 60 seconds");
}
}, 60, 60, TimeUnit.SECONDS);
}

EventLoop提供了JDK包含的所有方法,包括schedule()scheduleAtFixedRate(),如上述例子所示。完整的操作可以在ScheduledExecutorService的Javadocs中找到。

为了取消task或者检查task执行的状态,利用每个异步操作都会返回的ScheduledFuture。下面的代码说明了一个简单的取消操作。

代码1.4 用ScheduledFuture取消一个task

1
2
3
4
5
//调度task,获取返回的ScheduledFuture
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(...);
boolean mayInterruptIfRunning = false;
//取消这个task,阻止它再次运行
future.cabcel(mayInterruptIfRunning);

这些例子说明了利用Netty的调度能力我们可以获得的性能收益。然而,这都依赖于底层的线程模型。下面我们就来研究下它。

实现细节

线程管理

Netty线程模型的出色性能取决于判断当前执行线程是谁;也就是说,它是否是绑定到当前Channel和EventLoop的线程。(回忆下,EventLoop负责处理一个Channel在整个生命周期中的所有event)

如果当前调用线程(calling thread)就是分配给该Channel的EventLoop,代码被执行。否则,EventLoop将task放入一个内部的队列延后执行。当EventLoop处理它的event时,它会执行队列中的这些task。这就解释了为何任何线程都可以直接和Channel交互,而不需要为ChannelHandler加上同步。

注意每个EventLoop都有它自己task队列,独立于其他任何一个EventLoop。

图2 EventLoop执行逻辑

14

之前我们已经强调过了不能阻塞当前I/O线程的重要性。现在我们换一种方式再说一遍:“永远不要把一个长时间运行的task放到执行的队列中,因为这会阻塞同一个线程中其他task的执行。”如果你必须要执行阻塞调用或者执行长时间运行的task,我们建议采用一个专门的EventExecutor。

撇开这样一个限制情况不说,Netty采用的线程模型可以影响队列task对整个系统性能产生的效果,也可以影响传输方式采用的event处理方式。

Eventloop/线程的分配

为Channel的I/O和event提供服务的EventLoop都包含在一个EventLoopGroup中。EventLoop创建和分配的方式根据传输实现的不同而有所不同。

异步传输

异步实现只用了很少几个EventLoop(和它们关联的线程),在目前Netty的模型中,这几个EventLoop被所有Channel共享。这让很多Channel被最少数量的线程服务,而不是每个Channel分配一个线程。

EventLoopGroup负责分配一个EventLoop到每个新创建的Channel。在目前的实现中,采用循环(round-robin)策略可以满足一个平衡的分配,同一个Eventloop还可能会被分配到多个Channel。(在未来的版本,这一点可能会改变。)

一但Channel被指派了一个EventLoop,在它的整个生命周期过程中,都会用这个EventLoop(及其关联的线程)。记住这一点,因为这让你不用担心线程安全和实现ChannelHandler中的同步问题。

还有,注意EventLoop分配对ThreadLocal使用带来的影响。因为一个EventLoop通常掌管多个Channel,所以ThreadLocal对所有绑定的Channel都是同一个。实现譬如状态跟踪等功能时,ThreadLocal就成了一个槽糕的选择。但是,在一个无状态的上下文中,对于在Channel间共享大对象,或者创建开销大的对象,甚至是event时,ThreadLocal仍然有用。

阻塞传输

在这里,每个Channel都分配了一个EventLoop(和它的线程)。如果你开发过调用java.io包中的阻塞I/O的应用,你也许会碰到过这种模式。

但是像之前(异步传输)看到的一样,这个模式保证了每个Channel的I/O event只被一个线程处理——就是分配给Channel的EventLoop。这又是一个Netty设计一致性的例子;这个设计同时也有助于Netty的可靠性和易用性。