0%

Netty的优化

共享Handler

1
2
3
4
5
6
7
8
9
10
11
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new Spliter());//帧解码器
ch.pipeline().addLast(new PacketDecoder());//数据包解码器
ch.pipeline().addLast(new HandlerA());//业务Handler
ch.pipeline().addLast(new HandlerB());//业务Handler
ch.pipeline().addLast(new HandlerC());//业务Handler
ch.pipeline().addLast(new PacketEncoder());//数据包编码器
}
});

每次有新连接到来的时候,都会调用 ChannelInitializerinitChannel() 方法,然后这里相关的 handler 都会被 new 一次。许多 handler,他们内部都是没有成员变量的,也就是说是无状态的,我们完全可以使用单例模式,即调用 pipeline().addLast() 方法的时候,都直接使用单例,不需要每次都 new,提高效率,也避免了创建很多小的对象。

单例改造:对于无状态Handler,使用单例模式,多个channel共享一个实例

1
2
3
4
5
6
7
8
9
10
11
12

// 1. 加上注解标识,表明该 handler 是可以多个 channel 共享的
@ChannelHandler.Sharable
public class HandlerA extends SimpleChannelInboundHandler<APacket> {

// 2. 构造单例
public static final HandlerA INSTANCE = new HandlerA();

protected HandlerA() {
}

}

@ChannelHandler.Sharable 显示地告诉 Netty,这个 handler 是支持多个 channel 共享的,否则会报错。

合并编解码器

Netty 内部提供了一个类,叫做 MessageToMessageCodec,使用它可以让我们的编解码操作放到一个类里面去实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@ChannelHandler.Sharable
public class PacketCodecHandler extends MessageToMessageCodec<ByteBuf,Packet> {

public final static PacketCodecHandler INSTANCE=new PacketCodecHandler();
@Override
protected void encode(ChannelHandlerContext ctx, Packet msg, List<Object> out) throws Exception {
out.add(PacketCodeC.DEFAULT.encode(ctx.alloc(),msg));
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(PacketCodeC.DEFAULT.decode(msg));
}
}

PacketCodecHandler一般是一个无状态的 handler,因此,同样可以使用单例模式来实现。我们需要实现 MessageToMessageCodecdecode()encode() 方法,decode 是将二进制数据 ByteBuf 转换为 java 对象 Packet,而 encode 操作是一个相反的过程,在 encode() 方法里面,我们需要传入了 channel 的 内存分配器,手工分配了 ByteBuf

缩短事件传播路径

合并平行 handler

在很多情况下,每次 decode 出来一个指令对象之后,其实只会在一个指令 handler 上进行处理,因此,我们其实可以把这么多的指令 handler 压缩为一个 handler实现路由功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@ChannelHandler.Sharable
public class RouteHandler extends SimpleChannelInboundHandler<Packet> {
public static final RouteHandler INSTANCE = new RouteHandler();

private Map<Byte, SimpleChannelInboundHandler<? extends Packet>> routeMap;

private IMHandler() {
routeMap = new HashMap<Byte, SimpleChannelInboundHandler<? extends Packet>>(){{
put(CommandA, HandlerA.INSTANCE);
put(CommandB, HandlerB.INSTANCE);
put(CommandC, HandlerC.INSTANCE);

}}
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {
routeMap.get(packet.getCommand()).channelRead(ctx, packet);
}
}

经过上述优化方法后,我们的pipeline最终变成这样:

1
2
3
4
5
6
7
8
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new Spliter());//帧解码器
ch.pipeline().addLast(new PacketCodecHandler.INSTANCE());//数据包编解码器
ch.pipeline().addLast(RouteHandler.INSTANCE);//路由
}
});

第一个Handler是帧解码器 Spliter,我们是无法使用单例模式进行改造的,因为他内部实现是与每个 channel 有关,每个 Spliter 需要维持每个 channel 当前读到的数据,也就是说他是有状态的。

更改事件传播源

如果你的 outBound 类型的 handler 较多,在写数据的时候能用 ctx.writeAndFlush() 就用这个方法。

ctx.writeAndFlush() 是从 pipeline 链中的当前节点开始往前找到第一个 outBound 类型的 handler 把对象往前进行传播,如果这个对象确认不需要经过其他 outBound 类型的 handler 处理,就使用这个方法。

image.png

ctx.channel().writeAndFlush() 是从 pipeline 链中的最后一个 outBound 类型的 handler 开始,把对象往前进行传播,如果你确认当前创建的对象需要经过后面的 outBound 类型的 handler,那么就调用此方法。

image.png

减少阻塞主线程的操作

通常我们的应用程序会涉及到数据库或者网络,比如以下这个例子

1
2
3
4
5
6
protected void channelRead0(ChannelHandlerContext ctx, T packet) {
// 1. balabala 一些逻辑
// 2. 数据库或者网络等一些耗时的操作
// 3. writeAndFlush()
// 4. balabala 其他的逻辑
}

由于Netty是基于事件驱动的异步网络框架,一个 handler 中的 channelRead0() 方法阻塞了 NIO 线程,最终都会拖慢绑定在该 NIO 线程上的其他所有的 channel。对于耗时的操作,我们需要把这些耗时的操作丢到我们的业务线程池或利用消息队列中去处理,这样,就可以避免一些耗时的操作影响 Netty 的 NIO 线程,从而影响其他的 channel