User guide for 4.x

User guide for 4.x

前言

令人头疼的问题

现在我们一般使用通用应用程序或者类库互相通信。例如,我们经常使用一个 HTTP 客户端类库从 WEB 服务器上获取信息,或者通过 Web 服务来实现远程方法调用。然而,一个通用的协议或者其实现有时候并没有良好的伸缩性。这就像是我们不会使用一个通用的 HTTP 服务器来传输大文件,e-mail 信息,或者金融信息和多玩家游戏数据那样的实时性要求很高的信息。一个通用型的最优化的协议实现要求什么呢?例如,你可能想实现一个 HTTP 服务器,需要同时用于基于 AJAX的 聊天应用,提供流媒体服务或者大文件传输功能。你也许会想针对自己的需求设计并实一整套新的协议。另外有一种不可避免的情况就是,为了与一个旧系统进行通信,你必须得处理一个旧系统专属的协议。在这种情况下,我们怎么在不牺牲稳定性和性能的前提下快速实现我们的协议?

解决方法

对于快速开发可维护、高性能并且伸缩性良好的协议服务器和客户端,Netty 提供了一种异步事件驱动网络应用框架。
换句话说,Netty 是一个 NIO 客户端服务器框架,它让我们开发像协议服务器或客户端等这些网络应用变得快速而简单。它极大的简化了像 TCP 和 UDP 套接字等网络编程。
“快速且容易”并不意味着会导致维护或性能问题。Netty 是在充分考虑了大量的网络协议如 FTP,SMTP,HTTP 和 大量的历史遗留的基于二进制和基于文本的协议之后被设计出来的。因此,Netty 成功地实现了易开发、高性能、高稳定性、强可伸缩性这些优点。
一些用户可能也发现了其他的声称有和 Netty 相同优势的网络应用框架,所以你会问 Netty 与那些框架有什么不同呢。答案就是 Netty 依据的哲学。在 API 和具体实现上,Netty 是在充分考虑了用户使用体验后设计出来的。虽然有时候并不现实,但在阅读了指南和 API 后,你将会意识到 Netty 让你的生活变得简单的哲学。

开始跑两步

这章主要围绕 Netty 核心构造的讲解,并辅以可以让你快速上手的简单示例。当你阅读到本章末尾时,你就可以写一个基于 Netty 的客户端和服务器。
如果你更喜欢自顶向下的学习方式,你也可以从第二章-架构概览看起,看完第二章后再回到这里。

前景提示

运行本章介绍的实例有两个前提——最新版本的 Netty 和 1.6以上的 JDK 版本。最新的 Netty 版本可以在下载页面找到。为了可以下载到正确的 JDK 版本,请参考你喜欢的 JDK 供应商的网站。
在你阅读学习本章的过程中,对于本章提到的类你可能有很多疑问。当你想知道这些类的详细信息时,可以去查阅 API 文档。为了方便学习者,本文中所有的类都链接到了在线的 API 文档。当然,如果你发现了任何异常、错误的语法以及排版错误,或者你有更好地改进该文档的想法请毫不犹豫的联系Netty 社区来告知我们。

写一个拒绝一切的服务器

最简单的协议服务器并不是“Hello,World!”而是DISCARD
为了实现 DISCARD 协议,我们需要做的仅仅是忽略所有接收到的数据。让我们直接从处理器实现开始,它是 Netty 生成的,用来处理 I/O 事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
  1. DiscardServerHandler 继承了 ChannelInboundHandlerAdapter,而 ChannelInboundHandlerAdapter 实现了 ChannelInboundHandlerChannelInboundHandler 提供了大量的我们可以重写的事件处理方法。就目前的问题,我们只需要继承 ChannelInboundHandlerAdapter 就够了,而不需要自己实现接口 ChannelInboundHandler
  2. 本例中,我们重写了 channelRead() 事件处理方法。当接收到来自客户端的新的数据后,这个方法就会被调用。在这个例子中,接收到的信息的类型是 ByteBuf
  3. 为了实现 DISCARD 协议服务器,处理器必须忽略接收到的信息。ByteBuf 是一个不得不通过 release() 方法释放的引用计数对象。请记住:必须在处理器中释放掉任何传入处理器的引用记数对象。一般地,channelRead() 处理方法的实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
    // Do something with msg
    } finally {
    ReferenceCountUtil.release(msg);
    }
    }
  4. exceptionCaught() 事件处理方法会在 Netty 产生了 I/O 错误或者处理器的实现方法中产生了异常时被调用。大多数情况下,应该以日志记录捕获的异常,并将其相关的通信通道关闭掉,但实际的实现依赖于用户自己处理异常的情景。例如,也许你想要在关闭通信连接之前发送一个附带错误码的错误信息响应。

到目前为止,我们已经实现了 DISCARD 服务器的第一部分。现在剩下的是在 main() 方法中使用 DiscardServerHandler 启动服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package io.netty.example.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
  1. NioEventLoopGroup 是一个处理 I/O 操作的多线程循环事件处理器。为了应对不同的传输类型,Netty 提供了大量的 EventLoopGroup 的实现。本例中,我们正在实现一个服务端应用,因此两个 NioEventLoopGroup 将被用到。第一个,通常被称为 ‘boss’,接收一个即将到来的连接。第二个,通常被称为 ‘worker’,当 ‘boss’ 接受了连接并向 ‘worker’ 注册了这个连接,’worker’ 就会处理连接上的流量。使用多少个线程以及这些线程怎么被映射到 Channels 取决于 EventLoopGroup 的实现,甚至也可以通过构造方法来配置。
  2. ServerBootstrap 是一个可以帮我们建立一个服务器的工具类。你可以通过直接使用 Channel 来建立服务器。然而,那个过程是极其繁杂无聊的,并且大多数情况下我们都不必这么做。
  3. 这里,我们使用了 NioServerSocketChannel 类来初始化一个可以接收连接的 Channel
  4. 处理器每次都将使用一个新的 ChannelChannelInitializer 是个特殊的处理器,旨在帮一个用户配置一个新的 Channel。对于一个新的 Channel,大多数时候我们更想通过配置其 ChannelPipeline 来添加像 DiscardServerHandler 这样的处理器来实现网络应用。随着网络应用程序变得逐渐复杂,我们更可能是添加更多的处理器到 ‘pipeline’ 上和一些匿名类到顶级类上。
  5. 你也可以添加一下参数到 Channel 的实现上。这里我们正在写一个 TCP/IP 服务器,因此我们可以设置像 tcpNodelaykeepAlive 等套接字选项。可以去查阅 ChannelOptionChannelConfig 实现的 API 文档去学习如何配置 ChannelOptionS。
  6. 不知道你是否已经注意到 option()childOption() 了?option() 是为了设置接收即将到来的连接的 NioServerSocketChannelchildOption() 是为了设置从父 ServerChannel 那里接受的 ChannelS,在这里是 NioServerSocketChannel
  7. 现在我们准备去跑一下这个程序。剩下的就是绑定端口号并启动这个服务器。在这里,我们绑定了端口号 8080。你现在可以随意调用 bind() 方法多次去绑定不同的地址。

恭喜!你已经编码完成了你的第一个基于 Netty 的服务器(Ps:这个例子并不好玩!)。

查看接收到的数据

现在我们已经写好了我们第一个服务器,我们需要测试下它是否可以正常工作。最简单的测试方法是使用 telnet 命令。例如,你可以在命令行中输入 telnet localhost 8080 并输入一些内容。
然而,我们可以认为服务器已经可以正常工作了吗?因为它是一个拒绝一切的服务器,所以我们并不能就这么理所当让的认为它是正常工作的。我们不会从服务器得到任何响应信息。因此,为了证明服务器是正常工作的,我们要去修改代码——将收到的信息打印出来。
我们已经知道:在接收到数据时,channelRead() 方法将被调用。让我们再往 channelRead() 方法中添加些代码:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}

  1. 这里的无限循环实际上是相当简单的:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 在这里是否要调用 in.release() 是可选的

完整的代码在 Netty 发行版的io.netty.example.discard中。

写一个回显服务器

到目前为止,我们已经丢掉了接收到的数据,并没有任何响应。然而,一个服务器通常要对请求做出一些响应的。接下来通过实现一个ECHO让我们学习怎么向客户端返回响应信息——即将客户端发来的数据原封不动的返回回去。
回显服务器与拒绝一切的服务器唯一的不同在于它将接收到的数据返回而不是将其打印在控制台。因此,我们仅仅需要修改下 channelRead() 方法:

1
2
3
4
5
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}

  1. ChannelHandlerContext 对象提供很多操作方法可以让用户触发各种各样的 I/O 事件。这里,我们调用了 write(Object) 方法来一字不差的输出接收到的信息。请注意:这里我们并没有像 DISCARD 那样手动释放接收到的信息。因为当信息全部写完到连线信道后 Netty 会自动帮我们释放它。
  2. ctx.write(Object) 并没有将信息全部写到连线信道。它仅是内部缓存了而已,下面的 ctx.flush() 才是将信息全部写入到连线信道。当然了,你也可以直接使用 ctx.writeAndFlush(msg)

如果你再次运行 telnet 命令测试的话,你就会发现无论你发送了什么就能接收到什么。
完整的代码在 Netty 发行版的io.netty.example.echo中。

写一个时间服务器

这个小节中我们将会实现基于一个TIME协议的服务器。它与之前的例子是不同的,无论服务器端接收到什么请求,它都将返回一个包含着 32 个比特位的整型消息,并且一旦响应发送出去就关闭连接。通过本例,你将会学习到怎么构造并发送一个消息,发送完成之后关闭连接。
因为我们要实现的是忽略所有接收到的信息并返回一个信息,所以我们这里不能再使用 channelRead() 方法了。相应地,我们应该重写 channelActive() 方法。下面是该方法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

  1. 正如上文的解释,当一个连接建立后并可以进行传输流量时,channelActive() 将会被调用。接下来我们向通信连线上写入了代表着当前时间的 32 个比特位的整型数字。
  2. 为了写入一个新的消息,我们需要分配一个新的 buffer ,用来存储新消息。因为我们需要写入一个 32 比特位的整数,所以我们需要新建一个至少包含 4 个字节的ByteBuf类型。通过 ChannelHandlerContext.alloc() 我们获取到当前的ByteBufAllocator并分配一个新的 buffer。
  3. 通常,我们是写入一个构造好的信息。
    但是等等,flip 在哪呢?过去我们使用 NIO 发送一个消息之前,是不是会经常调用 java.nio.ByteBuffer.flip() 呢? 因为其实索引点和结束索引点,所以ByteBuf 并没有这样的方法;一个用来读操作,另一个用来写操作。当你向 ByteBuf 写入一些内容,写索引值将会增加,而读索引值并没有变。读索引值和写索引值分别代表代表着信息起始索引值和结束索引值。
    相比之下,NIO buffer 没有提供一个清空信息的方法来获取信息的起始索引值和结束索引值。如果你忘记调用 flip 方法就会很蛋疼,因为这样是没有任何数据被发送的。Netty 中就不会有这种问题,因为我们有起始索引值和结束索引值并且有不同的操作。你就会发现这使你的生活变得更容易——不必在时刻记着 flip
    其他需要注意的地方就是 ChannelHandlerContext.write() (和 writeAndFlush())方法会返回一个ChannelFutureChannelFuture 代表一个尚未发生的 I/O 操作。这也就意味着,任何请求操作可能都尚未发生,因为在 Netty 中任何操作都是异步的嘛。例如,下面的代码可能在消息被发送之前就关闭了连接:
    1
    2
    3
    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

因为你需要在ChannelFuture完成之后调用 close() 方法,正如上面代码所示,我们在 ChannelFuture 中添加了监听器,并注册了其操作完成的监听方法。请注意,close() 方法也许不会立即关闭连接,并且它会返回一个ChannelFuture

  1. 我们怎么捕获到一个写入要求完成事件呢?这时我们只需要在 Channel Future 上添加一个监听器ChannelFutureListener。这里我们新建了一个匿名的ChannelFutureListener在操作完成之后关闭连接。
  2. 可选的,你可以用一个预定义的监听器来简化代码:
    1
    f.addListener(ChannelFutureListener.CLOSE);

为了测试我们的时间服务器是否正常工作,我们使用 UNIX 下的 rdate 命令:

1
$ rdate -o <port> -p <host>

就是我们在 main() 方法中指定的端口号, 则一般是 localhost。

写一个时间客户端

不像 DISCARD 和 ECHO 服务器,我们需要一个客户端来将 32 个比特位的数据转换成一个人类可读的日期。这个小节里,我们将探讨如何确定时间服务器是正常工作的,并且学习使用 Netty 如何写一个时间客户端。
使用 Netty 写一个客户端和一个服务器有且仅有的不同就是用到的BootstrapChannel的实现的不同。请看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}

  1. BootstrapServerBootstrap是相似的,除了它是像客户端或无连接的非服务类型的通信信道。
  2. 如果你仅仅为Bootstrap设置了一个EventLoopGroup,那么这个EventLoopGroup将会充当 boss 和 worker 的角色。当然,boss worker 在客户端是没用的。
  3. 相应地,NioSocketChannel 也替代了NioServerSocketChannel被用来在客户端创建Channel
  4. 请注意这里我们没有使用 childOption() 方法,因为客户端SocketChannel没有一个父亲类。
  5. 在客户端,我们也应该使用 connect() 方法代替 bind() 方法。

正如你看到的,客户端的代码和服务器端的确实有很多不同。那么ChannelHandler该怎么实现呢?它应该从服务器端接收一个 32 位的整数,并将其转换为人类可读的形式打印出来,最后关闭连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

  1. 依照 TCP/IP 协议,Netty 将接收到的数据读入一个ByteBuf

客户端代码看起来很简单并且和服务器端代码看起来没太多不同。然而,这个处理器有时会出现异常 IndexOutOfBoundsException 。我们将在下一小节中讨论这个问题。

处理一个基于流传输的问题

套接字缓存的一个小告警

在使用像 TCP/IP 等基于流的传输时,接收到的数据被存储在套接字缓存中。不幸的是,基于流的缓存保存的不是一个分包队列而是字节队列。这意味着,即使你使用两个相互独立的包发送了两个消息,操作系统也仅仅是将其作为一批字节对待而已,并未对这两个包做区分。因此,客户端读到的数据并不能保证就是服务器写入的信息。例如,假设操作系统的 TCP/IP 栈已经收到了三个包:

因为是基于流的协议,所以我们的应用程序有很高的可能性像下面分成帧的形式读取它们:

因此,一个接收方,无论是服务器端还是客户端都应该整理接收到的数据,将其转换为有意义的并方便应用程序理解的帧格式。上例中,接收到的数据的整理后的帧格式就像下面这样:

第一个解决方法

现在再让我们看看 TIME 客户端的例子。这个例子中就有这个问题。一个 32 位的整数是量非常小的数据,并且通常情况下它是不会被分帧的。然而,问题是它有可能会被分帧,并且随着流量的增加分帧的可能性也随之增加。
最简单的解决方法是创建一个内部累加缓存,直到从内部缓存中得到 4 个字节的数字。修复了问题的 TimeClientHandler 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

  1. ChannelHandler 有两个生命周期方法:handlerAdded()handlerRemoved() 。你可以在这两个方法中进行一些不会阻塞太长时间的初始化工作或者收尾工作 。
  2. 首先,所有接收到的数据都应该累计缓存到 buf。
  3. 然后,处理器首先要检测 buf 是否有足够的数据(本例中即 4 个字节),然后再处理实际的业务逻辑。要不然,当接收到更多的数据时,Netty 将会再次调用 ChannelRead() 方法,最终所有的 4 字节都将会被缓存。

第二个解决方法

尽管第一个解决方法已经完美地解决了这个问题,但是修改后的处理器代码看起来并不是那么清晰。想象下如果一个更复杂的协议需要传输多个不同长度的字段。你的ChannelInboundHandler实现很快就会变得不可维护。
正如你可以猜到的,你可以增加超过一个ChannelHandlerChannelPipeline,因此,你可以分离出一个ChannelHandler作为单独的模块来降低应用程序的复杂度。例如,你可以将 TimeClientHandler 分割成两个处理器:

  • 用来处理分帧问题的 TimeDecoder,和
  • 刚开始的最简单的 TimeClientHandler 版本

幸运的是,Netty 提供了一个扩展类来帮助你写第一个处理器:

1
2
3
4
5
6
7
8
9
10
11
12
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}

  1. ByteToMessageDecoderChannelInboundHandler的一个实现,使用ByteToMessageDecoder可以很容易的处理分帧问题。
  2. 无论何时接收到数据,ByteToMessageDecoder都会调用内部附带可维护的缓存的 decode() 方法。
  3. 在没有接收到足够的数据时,decode() 方法被设计成不会接收任何数据。并且,只要有接收到了更多的数据,decode() 方法会被再次调用。
  4. 如果 decode() 方法在 out 中添加了一个对象,则说明解析器已经成功解析了一个消息。ByteToMessageDecoder将会丢弃已读取的内部缓存数据。请牢记:你不需要自己解析多个消息。ByteToMessageDecoder会一直调用 decode() 方法,直到它不再向 out 中添加任何数据。

现在我们有更多的处理器要插入到ChannelPipeline,我们应该修改ChannelInitizlizer的实现:

1
2
3
4
5
6
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});

如果你对 Netty 足够了解,也许你想试试可以再次简化解析器的ReplayingDecoder。如果需要了解详情,你需要去查阅 API 文档。

1
2
3
4
5
6
7
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}

另外,Netty 提供了 out-of-the-box 解析器,可以帮助你轻松地实现更多的协议,并让处理器的实现有更高的可维护性。欲知详情,请参考如下包中的例子:

使用 POJO 代替 ByteBuf

到目前为止,我们学习到的所有例子都是使用 ByteBuf 作为协议消息的数据结构。这个小节中,我们将使用 POJO 来代替 ByteBuf 来改进 TIME 协议服务器和客户端。
ChannelHandlerS中使用 POJO 的效果是明显的;通过分离处理器中的ByteBuf,来提高处理器的可维护性和重用性。在 TIME 客户端和服务器的例子中,我们仅仅是读取一个 32 位的整数,所以直接使用 ByteBuf 并没有直接的问题。然而,当你实现一个真正的协议服务器时你将会发现从你的处理器中分离 ByteBuf 是非常必要的。
首先,让我们定义一个新的类型 UnixTime:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package io.netty.example.time;
import java.util.Date;
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}

我们现在可以重构 TimeDecoder 产生一个 UnixTime 对象来替换 ByteBuf

1
2
3
4
5
6
7
8
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}

跟随着解析器的更新,TimeClientHandler 也不再使用ByteBuf

1
2
3
4
5
6
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}

非常简单优雅是不是?相同的改造同样可以应用于服务器端。让我们先更新 TimeServerHandler:

1
2
3
4
5
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}

现在,剩下的就是缺少一个编码器。编码器实现了ChannelOutboundHandler将 UnixTime 转换成ByteBuf。相对于写解析器而言,写一个编码器是非常简单的,因为它不需要处理包分帧:

1
2
3
4
5
6
7
8
9
10
11
package io.netty.example.time;
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}

  1. 这里有一些需要注意的地方。
    首先,我们传入ChannelPromise来将编码后的数据写入连线通道中。
    其次,我们没有调用 ctx.flush() 方法。这个任务放在了另外一个处理器里面。
    为了进一步简化代码,你可以使用MessageToByteEncoder
    1
    2
    3
    4
    5
    6
    public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
    out.writeInt((int)msg.value());
    }
    }

最后的任务就是将 TimeEncoder 在 TimeserverHandler 之前插入到ChannelPipeline中。

关闭你的应用

关闭一个 Netty 应用通常是非常简单的,你要做的仅仅是通过 shutdownGracefully() 方法来关闭掉EventLoopGroupS。那个方法会返回一个Future来提醒你EventLoopGroup已经完成关闭,并且所有属于这个EventLoopGroupChannelS也会同时关闭。

总结

本章我们快速学习了使用 Netty 来写一个正常工作的网络应用程序。
未来的章节里将会有关于 Netty 的更详细的信息。同时我们也鼓励你查阅在io.netty.example包中的例子。
请记住:社区期待你的问题反馈,以及改进 Netty 或者其文档的想法。