Netty(2):EventLoop

EventLoop

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

它的继承关系比较复杂

EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

所以我们一般不直接操作EventLoop,而是通过EventLoopGroup去获取EventLoop

创建EventLoopGroup并获取EventLoop

        //1.创建事件循环组
        DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(2);//普通任务,定时任务
        System.out.println(eventLoopGroup.next());
        System.out.println(eventLoopGroup.next());
        System.out.println(eventLoopGroup.next());
        System.out.println(eventLoopGroup.next());

        //默认线程数
        //        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        //                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();//io任务,普通任务,定时任务
        //执行普通任务
        eventExecutors.next().submit(() -> {
            System.out.println("abc");
        }).get();

        //执行定时任务
        eventExecutors.next().scheduleAtFixedRate(()-> System.out.println("111"),
                1L, 1L, TimeUnit.SECONDS);

测试结果:

? 优雅关闭

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

演示 NioEventLoop 处理 io 事件

server端:

public class EventLoopServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf)msg;
                                System.out.println(Thread.currentThread().getName() + ":" +buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                }).bind(8080);
    }
}

client端:

public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //1.创建启动器
        Channel channel = new Bootstrap()
                //添加eventLoop
                .group(new NioEventLoopGroup())
                //选择客户端channel实现
                .channel(NioSocketChannel.class)
                //添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    //在连接建立后被调用
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                //连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync() //阻塞方法,直到连接建立
                .channel();//代表连接对象
        System.out.println(channel);
    }
}

通过client端向server端发送数据:

可以发现当前当前客户端的所有数据请求都由服务器端的同一个EventLoop去处理。

再复制一个client端,继续发送数据,可观察到换了一个EventLoop

分工细化

group方法除了传递一个参数外,还能传递第二个参数:

    @Override
    public ServerBootstrap group(EventLoopGroup group) {
        return group(group, group);
    }

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        ObjectUtil.checkNotNull(childGroup, "childGroup");
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

parentGroup只负责ServerSocketChannel上的accept事件,childGroup只负责socketChannel上的读写

修改服务器端代码:

        new ServerBootstrap()
                //parentGroup只负责ServerSocketChannel上的accept事件 childGroup只负责socketChannel上的读写
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(5))
                //.group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf)msg;
                                System.out.println(Thread.currentThread().getName() + ":" +buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                }).bind(8080);

进一步分工细化:

        EventLoopGroup group = new DefaultEventLoopGroup();
        new ServerBootstrap()
                //parentGroup只负责ServerSocketChannel上的accept事件 childGroup只负责socketChannel上的读写
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(5))
                //.group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf)msg;
                                System.out.println(Thread.currentThread().getName() + ":" +buf.toString(Charset.defaultCharset()));
                                ctx.fireChannelRead(msg);//把消息传递给下一个Handler
                            }
                        }).addLast(group, "handler2", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf)msg;
                                System.out.println(Thread.currentThread().getName() + ":" +buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                }).bind(8080);

运行效果:可以看到,nio 工人和 非 nio 工人也分别绑定了 channel