Netty(2):EventLoop
EventLoop
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
- 提供了 parent 方法来看看自己属于哪个 EventLoopGroup
EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
所以我们一般不直接操作EventLoop,而是通过EventLoopGroup去获取EventLoop。
创建EventLoopGroup并获取EventLoop
//1.创建事件循环组
DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(2);//普通任务,定时任务
System.out.println(eventLoopGroup.next());
System.out.println(eventLoopGroup.next());
System.out.println(eventLoopGroup.next());
System.out.println(eventLoopGroup.next());
//默认线程数
// DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
// "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();//io任务,普通任务,定时任务
//执行普通任务
eventExecutors.next().submit(() -> {
System.out.println("abc");
}).get();
//执行定时任务
eventExecutors.next().scheduleAtFixedRate(()-> System.out.println("111"),
1L, 1L, TimeUnit.SECONDS);
测试结果:
? 优雅关闭
优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
演示 NioEventLoop 处理 io 事件
server端:
public class EventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
System.out.println(Thread.currentThread().getName() + ":" +buf.toString(Charset.defaultCharset()));
}
});
}
}).bind(8080);
}
}
client端:
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
//1.创建启动器
Channel channel = new Bootstrap()
//添加eventLoop
.group(new NioEventLoopGroup())
//选择客户端channel实现
.channel(NioSocketChannel.class)
//添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
//在连接建立后被调用
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync() //阻塞方法,直到连接建立
.channel();//代表连接对象
System.out.println(channel);
}
}
通过client端向server端发送数据:
可以发现当前当前客户端的所有数据请求都由服务器端的同一个EventLoop去处理。
再复制一个client端,继续发送数据,可观察到换了一个EventLoop
分工细化
group方法除了传递一个参数外,还能传递第二个参数:
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
ObjectUtil.checkNotNull(childGroup, "childGroup");
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
parentGroup只负责ServerSocketChannel上的accept事件,childGroup只负责socketChannel上的读写
修改服务器端代码:
new ServerBootstrap()
//parentGroup只负责ServerSocketChannel上的accept事件 childGroup只负责socketChannel上的读写
.group(new NioEventLoopGroup(), new NioEventLoopGroup(5))
//.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
System.out.println(Thread.currentThread().getName() + ":" +buf.toString(Charset.defaultCharset()));
}
});
}
}).bind(8080);
进一步分工细化:
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
//parentGroup只负责ServerSocketChannel上的accept事件 childGroup只负责socketChannel上的读写
.group(new NioEventLoopGroup(), new NioEventLoopGroup(5))
//.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
System.out.println(Thread.currentThread().getName() + ":" +buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg);//把消息传递给下一个Handler
}
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
System.out.println(Thread.currentThread().getName() + ":" +buf.toString(Charset.defaultCharset()));
}
});
}
}).bind(8080);
运行效果:可以看到,nio 工人和 非 nio 工人也分别绑定了 channel