跳到主要内容

17、Netty 源码解析 - taskQueue自定义任务

任务队列中的Task有3种典型使用场景:

1、用户程序自定义的普通任务

public class NettyServerHandlerTask01 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 比如这里有一个非常耗时长的业务 -> 异步执行 -> 将业务提交到该 channel 对应的
        // NioEventLoop 的 taskQueue 中处理

        // 解决方案1:用户程序自定义的普通任务
        // 10秒后运行
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端222", CharsetUtil.UTF_8));
                }catch (Exception ex){
                    System.out.println("发生异常:" + ex.getMessage()    );
                }
            }
        });

        // 30秒后运行
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(20 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端222", CharsetUtil.UTF_8));
                }catch (Exception ex){
                    System.out.println("发生异常:" + ex.getMessage()    );
                }
            }
        });

        System.out.println("go on...");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端1111",CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

2、用户自定义定时任务

public class NettyServerHandlerTask02 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 用户自定义定时任务 -> 该任务提交到scheduleTaskQueue
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端444", CharsetUtil.UTF_8));
                }catch (Exception ex){
                    System.out.println("发生异常:" + ex.getMessage()    );
                }
            }
        },5, TimeUnit.SECONDS);

        System.out.println("go on...");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端",CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3、非当前Reactor线程调用Channel的各种方法

例如在推送系统的业务线程里面,根据用户标识,找到对应的Channel引用,然后调用Write方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列中被异步消费

思想:可以使用一个集合管理 SocketChannel,再推送消息时,可以将业务加入到各个 channel 对应的 NioEventLoop 的 taskQueue 或者 scheduleTaskQueue