跳到主要内容

42、Netty 源码解析 - 任务加入异步线程池

一、基本说明

1、在 Netty 中做耗时的,不可预料的操作,比如数据库、网络请求,会严重影响 Netty 对 Socket 的处理速度。
2、解决方案就是将耗时任务添加到异步线程池中。但是就添加线程池这步操作来讲,可以有2种方式:

  • 处理耗时业务的第一种方式----handler中加入线程池
  • 处理耗时业务的第二种方式----Context中添加线程池

二、handler中加入线程池

2.1 业务代码

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    // group 充当业务线程池,可以将任务提交到该线程池中
    // 这里创建了 16 个线程
    static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        System.out.println("EchoServerHandler 的线程是=" + Thread.currentThread().getName());

        // 将任务提交到 group 线程池
        group.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                // 接收客户端消息
                ByteBuf buf = (ByteBuf) msg;
                byte[] bytes = new byte[buf.readableBytes()];
                buf.readBytes(bytes);
                String body = new String(bytes, "UTF-8");
                // 休眠 10 秒
                Thread.sleep(10 * 1000);
                System.out.println("group.submit 的 call 的线程是=" + Thread.currentThread().getName());
                // 回复消息
                ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端222", CharsetUtil.UTF_8));
                return null;
            }
        });
        
        System.out.println("go on");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

在channelRead 方法中,模拟了一个耗时 10秒的操作,这里我们将这个任务提交到了一个自定义的业务线程池(group)中,这样,就不会阻塞 Netty 的 IO 线程。

2.2 整个程序处理过程

 

1、当 IO 线程轮询到一个 socket 事件后,交给 IO 线程开始处理,当走到耗时 handler 的时候,将耗时任务交给业务线程池。
2、当耗时任务执行完毕再执行 pipeline write 方法的时候(代码中使用的是ctx.writeAndFlush),会将这个任务交给 IO 线程

2.3 write方法源码(AbstractChannelHandlerContext)

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

1、当判断下个 outbound 的 executor 线程不是当前线程的时候,会将当前的工作封装成 task,然后放入队列中,等待 IO 任务执行完毕后执行队列中的任务。
2、当我们使用了group.submit(new Callable() {},在 handler 中加入线程池,就会进入到 safeExecute(executor, task, promise, m); 如果去掉这段代码,而使用普通方式来执行耗时的业务,那么就不会进入到 safeExecute(executor, task, promise, m);

三、Context中加入线程池

3.1 业务代码

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    // 创建业务线程池
    // 这里创建了有两个线程的线程池
    static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     //p.addLast(new EchoServerHandler());
                     // 说明:如果在 addLast 添加 handler 时,前面有指定
                     // EventExecutorGroup ,那么该 handler 会优先加入到该线程池中
                     p.addLast(group,new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

1、handler 中的代码就使用普通的方式来处理耗时业务。
2、当我们在调用 addLast 方法添加线程池后,handler 将优先使用这个线程池,如果不添加,将使用 IO 线程。
3、当走到 AbstractChannelHandlerContext 的 invokeChannelRead 方法的时候,executor.inEventLoop() 是不会通过的,因为当前线程是 IO 线程,Context(也就是Handler)的 executor 是业务线程,所以会异步执行。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
     final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
      EventExecutor executor = next.executor();
      if (executor.inEventLoop()) {
          next.invokeChannelRead(m);
      } else {
          executor.execute(new Runnable() {
              @Override
              public void run() {
                  next.invokeChannelRead(m);
              }
          });
      }
  }

4、验证时,如果去掉 p.addLast(group,new EchoServerHandler()); 改为 p.addLast(new EchoServerHandler()); 就不会进行异步执行

四、两种方式的比较

1、第一种方式在 handler 中添加异步,可能更加的自由,比如如果需要访问数据库,那就异步,如果不需要,那就不异步。异步会拖长接口响应时间,因为需要将任务放进 mpscTask 中。如果 IO 时间很短,task 很多,可能一个循环下来,都没时间执行整个 task,导致响应时间达不到指标
2、第二种方式是 Netty 标准方式(即加入到队列),但是,这么做会将整个 handler 都交给业务线程池。不论耗时不耗时,都加入到队列,不够灵活