Java的NIO及Netty

目录:

  1. Java NIO
  2. Netty介绍
  3. Netty在Dubbo中的应用

一、Java NIO

NIO(New IO),和Linux提到的NIO(Non-Blocking IO)并不是一个概念。它主要是利用了IO多路复用的机制。

关于IO模型的知识,之前已经在另外一篇文章中写了: 并发编程之I/O多路复用及几种I/O模式 。通过IO多路复用可以实现Socket的非阻塞操作。后续的Netty也是基于IO多路复用模型实现的。

而IO多路复用机制就是依赖于事件多路分解机制实现的。我们都知道一个完整I/O操作主要有两步:

1、等待数据准备好(写到内核空间);

2、从内科空间拷贝到用户空间;

那么整个NIO主要有几个事件:读就绪、写就绪、有新请求。

事件分发就是在合适的时机,将准备好的读写事件分发给已经注册的事件处理器上。

多路复用就是利用一个Selector不段地去选择一个就绪的事件,并执行。在整个运行期间,选择哪个就绪事件这个函数是阻塞的。

目前比较著名的事件分发模式是Reactor和Proactor。Java的NIO就是基于Reactor模型。

在说Reactor之前,说下NIO的基本概念和核心。

NIO最重要的三大部分:Buffer,Channel,Selector。

Buffer:

在NIO编程中,数据的读写全部都是在Buffer中进行的,它是内存中的一块儿缓冲区,被Java封装成了一个Bugger对象,内部是个数组,属性包括容量,上限,标记,位置等等。配合Channel实现了面向缓冲区的双向处理方式。相比于流处理,因为它是面向整个缓冲区的,块处理更能反映底层操作系统的真实情况,且其中的直接缓冲区还可以减少一次拷贝。此外,Buffer本身提供了非常丰富的方法,如清楚,标记,压缩等等。

Channel:

从单词可以看到,这是一个通道,网络数据的读写都要通过Channel,它是一个双向的通道,两个方向都可以读写,且是全双工的,两个方向可以同时读写。

Java中的Channel对象主要分为两大类,一是用于网络数据操作的SelectableChannel和文件操作的FileChannel。

Selector:

NIO操作是基于IO多路复用的,因此Selector是很重要的一个组件。一个Selector会不断地轮询已经注册的Channel事件,当某个Channel有已经就绪的IO事件,就会进行事件分发。从而实现非阻塞IO。

看下一个基本NIO操作的时序图(来自Netty权威指南):

接下来说下Reactor。

Reactor的基本思想就是不断地监听事件,并将准备好的事件分发给对应的事件处理器。它主要由两个部分组成:Reactor和Handlers。

Reactor:分发合适的handler;

Handlers:执行非阻塞操作。

Reactor主要分3种:单线程Reactor,多线程Reactor以及Worker线程Reactor。

1、单线程

Reactor对象通过select不断监听事件,收到事件后会进行dispatch分发,当有请求过来会分给acceptor并创建handler做后续处理;当不是新请求,就直接分给对应的Hander进行处理。

目前Redis采用的是该模式。该模式的缺点就是不能充分利用多核CPU,因为它是单线程的。

Redis服务器中有两类事件,文件事件和时间事件。

  • 文件事件(file event):Redis客户端通过socket与Redis服务器连接,而文件事件就是服务器对套接字操作的抽象。例如,客户端发了一个GET命令请求,对于Redis服务器来说就是一个文件事件。Redis的协议本身发送的是文本。
  • 时间事件(time event):服务器定时或周期性执行的事件。例如,定期执行RDB持久化。

不过要补充一点,Redis4.0开始也引入了多线程。主要是为了解决一些耗时较长的一些操作,从而提高性能。比如删除大key啊。涉及到的命令有Unlink,FlushAll Async,FlushDB Async.从名字也可以看到,这些命令在执行时,会开启一个异步线程去做处理,而不会长时间阻塞到当前线程。

此外Nginx采用的是多进程+Reactor单线程的方式来处理的,它自己实现了一个Master进程+多个Master进程,从而实现并发。

很多网上的都说Nginx是多Reactor多线程的模式,我也不知道从哪里得来的,Nginx的多进程和多Reactor不是一个概念。因为主进程不会accept连接,而是由子进程的reactor来accept连接,通过锁来控制一次只有一个子进程accept,子进程accept成功后就放到自己的reactor进行处理。Nginx的Master是为了管理这些Worker的。它只是将单Reactor单线程,fork了多个进程而已。

2、多线程

多线程版本和单线程的区别是将实际上做处理业务的非IO操作的业务,如编解码,计算等,放到其他线程处理,这样能充分发挥多核的作用。

虽然多线程版本能发挥多核的作用,但Socket操作不是线程安全的,比如读Socket,因此一般情况下单此按成就可满足了,如果想要多线程,可考虑另一种模式:Proactor

3、多Reactor多线程

用负载均衡的思想去匹配CPU和I/O的速率,

更详细的可见: Scalable IO in Java

二、Netty介绍

由于Java的原生NIO并不友好,需要开发者做很多额外工作,且容易出bug,因此应运而生了一个Netty,它完美解决了Java原生NIO的问题。

我就直接把权威指南关于Netty的优势粘贴过来把。

Netty可以配置使用上述三种Reactor模型中的任何一种。

Netty使用的是Reactor模型,该模型底层是基于IO多路复用的epoll机制,而该机制实现了同步非阻塞的。但是Netty框架实现了异步IO,说异步主要是针对用户来说的,它的异步是通过框架层面实现的,而不是底层IO的读写操作本身。因为Netty中所有的I/O操作都是异步的,所有的调用都会立即返回一个ChannelFuture结果,该结果不一定成功,调用者可以通过该对象获取状态。比如IO操作刚开始时,会新建一个ChannelFuture对象,当完成时,对象状态是已完成。ChannelFuture继承自并发包的Future,该类在线程池的章节有过介绍。

所以,这里请注意,Netty所说的异步和底层采用的epoll的同步非阻塞并不冲突。

常见操作方法:

  • isDone  判断当前操作是否完成。
  • isSuccess 判断已完成的当前操作是否成功。
  • getCause  获取已完成的当前操作失败原因。
  • isCancelled 判断已完成的当前操作是否取消。
  • addListener 注册监听器,当操作已完成(isDone 方法返回true),将会通知指定的监听器。

下面是一张Netty的业务架构图(来自网络):

看到上面的结构,也可以看出它就是多Reactor多线程模型的变种。当客户端请求到达时,Boss Group主要用来accept请求,并进行请求分发(注册Channel到WorkGroup的NIOEventGroup);WorkGroup用来进行I/O事件的处理,它类似一个线程池。

Boss Group和WorkGroup本质都是NioEventLoopGroup,它内部维护了一个线程池,由多个EventLoop组成。而EventLoop除了IO处理外,还负责定时任务以及系统的Task。

Netty解决了原生NIO导致的epoll bug(出现空查询,死循环)。基本思想时会对空查询进行计数,如果超过了N次,即产生了epollBug,Netty会重新建立Selector。

public final class NioEventLoop extends SingleThreadEventLoop {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
    private static final int CLEANUP_INTERVAL = 256;
    private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
    private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        public int get() throws Exception {
            return NioEventLoop.this.selectNow();
        }
    };
    private Selector selector;    //Selector选择器
    private Selector unwrappedSelector;
    private SelectedSelectionKeySet selectedKeys;    //key
    private final SelectorProvider provider;
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private final SelectStrategy selectStrategy;
    private volatile int ioRatio = 50;
    private int cancelledKeys;
    private boolean needsToSelectAgain;

 
 }

好,到目前为止,上述所述内容都属于通信调度层,有Reactor,Channel,NIOEventLoop等等,负责请求的建立,调度,该层将数据写到内存缓冲区。此后会将后续的读,写等事件触发到PipeLine,后续操作由PipeLine完成。

一个Channel对应一个ChannelPipeLine,而ChannelPipeLine内部又维护了一个双向链表,存储了一系列的Handler。ChannelPipe的作用就是保证有序得执行入站和出站事件,可以选择哪些事件进行拦截或处理。在一系列Handler处理过程中,在任何一个节点都可能拦截或结束。

Channel在上面也提到过,它是进行网络数据读写的I/O接口,Netty并没有直接使用NIO的Channel,而是自己又实现了Channel,因为它要自实现一些具体功能,比如关联pipeline等,即其具备的功能更全。最重要的两个子类:NioSocketChannel和NioServerSocketChannel。

看一下AbstractChannel的属性,有id,有pipeline,有eventloop(选择器),还有Unsafe。Unsafe类是真正操作IO操作的对象。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
  
    private final Channel parent;
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final AbstractChannel.CloseFuture closeFuture = new AbstractChannel.CloseFuture(this);
    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
    private volatile EventLoop eventLoop;
    private volatile boolean registered;
    private boolean closeInitiated;
    private boolean strValActive;
    private String strVal;

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        this.id = this.newId();
        this.unsafe = this.newUnsafe();
        this.pipeline = this.newChannelPipeline();
    }

}

每一个Channel都具备的方法,从方法名都可以看出其含义来:

 public boolean isRegistered() {
        return this.registered;
    }

    
    public ChannelFuture bind(SocketAddress localAddress) {
        return this.pipeline.bind(localAddress);
    }

    public ChannelFuture connect(SocketAddress remoteAddress) {
        return this.pipeline.connect(remoteAddress);
    }

    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return this.pipeline.connect(remoteAddress, localAddress);
    }

    public ChannelFuture disconnect() {
        return this.pipeline.disconnect();
    }

    public ChannelFuture close() {
        return this.pipeline.close();
    }

    public ChannelFuture deregister() {
        return this.pipeline.deregister();
    }

    public Channel flush() {
        this.pipeline.flush();
        return this;
    }

    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return this.pipeline.bind(localAddress, promise);
    }

    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return this.pipeline.connect(remoteAddress, promise);
    }

    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        return this.pipeline.connect(remoteAddress, localAddress, promise);
    }

    public ChannelFuture disconnect(ChannelPromise promise) {
        return this.pipeline.disconnect(promise);
    }

    public ChannelFuture close(ChannelPromise promise) {
        return this.pipeline.close(promise);
    }

    public ChannelFuture deregister(ChannelPromise promise) {
        return this.pipeline.deregister(promise);
    }

其实在Channel中一个重要的问题就是并发引起的线程安全,比如注册Channel到EventLoop时,不是当前线程发起的,而是其他线程发起的,那么就可能引起问题。针对这个问题,Nety会将其封装成一个Task放到消息队列中,由IO线程完成。比如注册代码:

  public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            } else if (AbstractChannel.this.isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
            } else if (!AbstractChannel.this.isCompatible(eventLoop)) {
                promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            } else {
                AbstractChannel.this.eventLoop = eventLoop;
                //如果是当前线程,直接注册
                if (eventLoop.inEventLoop()) {
                    this.register0(promise);
                   //如果不是当前线程,封装成Task
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            public void run() {
                                AbstractUnsafe.this.register0(promise);
                            }
                        });
                    } catch (Throwable var4) {
                        AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
                        this.closeForcibly();
                        AbstractChannel.this.closeFuture.setClosed();
                        this.safeSetFailure(promise, var4);
                    }
                }

            }
        }

ChannelHandler用来处理或者拦截IO操作,比如编解码或者其他自定义的操作。看例子:


    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); //解码,将ByteBuf二进制数据解码成字符串并向后传播
        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));  //内置编码
        pipeline.addLast("handler", new NettyServerHandler());     //自定义
    }

//自定义的Handler

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object s) throws Exception {
        log.info("netty server 成功接收来自:{}的消息:{}",channelHandlerContext.channel().remoteAddress(),s);
        channelHandlerContext.writeAndFlush("nettyServer已经成功接收到您的消息!\n");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端[ip:" + ctx.channel().remoteAddress() + "]连接");
        ctx.writeAndFlush("CONNECT SUCCESS\n");
        super.channelActive(ctx);
    }

    /**
     * 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        try {
            log.error("Netty服务出现异常", cause);
            // 当出现异常就关闭连接
            ctx.close().sync();
        } catch (Exception e) {
            log.error("Netty服务出现异常", e);
        }

    }

}

ChannelHandler主要分为入站和出站。

  • ChannelInboundHandler 用于处理入站 I/O 事件。
  • ChannelOutboundHandler 用于处理出站 I/O 操作。

对应的Adapter:

  • ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
  • ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。
  • ChannelDuplexHandler 用于处理入站和出站事件。

这里我们说下ChannelHandler的安全性。假如一个Handler被注册到不同的线程中,如果多个线程都要执行该Handler势必会出现线程安全额问题。针对这个问题,Netty的做法是检查这个Handler是否Sharable注解的,只有被Sharable注解的才可能加到不同的Channel中。

此外,对于读写操作。Netty4只有一个NioEventLoop线程来处理这个操作,业务耗时ChannelHandler被I/O线程串行执行,所以执行效率低。Netty3在消息发送线程模型上,充分利用业务线程的并行编码和ChanelHandler的优势,在一个周期T内可以处理N条业务消息。

下面时ChannelPipeLine的处理流程:

其中入站事件一般由IO线程触发,比如TCP链路建立,读事件,异常通知等。出站事件一般由用户主动发起,发送消息等。

我们建立一个Netty Server的基本代码:

@Component
@Slf4j
public class YPNettyServer {

    private EventLoopGroup boss = new NioEventLoopGroup();

    private EventLoopGroup work = new NioEventLoopGroup();

    /**
     * 最好不要用注解启动,否则会阻塞,影响springboot server
     *
     * @throws InterruptedException
     */
//    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss,work)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(6666))
                .option(ChannelOption.SO_BACKLOG,1024)
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                .childOption(ChannelOption.TCP_NODELAY,true)
                .childHandler(new NettyServerInitializer());

        try {
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            log.info("nettyServer开始监听接口:6666");
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

NIOEventLoopGroup

三、Netty在Dubbo中的实践

Dubbo官方文档介绍得非常详细: Dubbo服务调用过程

在服务端,我们通常使用Service注解来暴露dubbo服务,在springboot启动时,会创建一个ServiceBean。

该类继承自InitializingBean,ApplicationListener等。

该类中的onApplicationEvent会在启动过程中被调用,会调用export方法,该方法会调用DubboProtocol的openServer,接着调用createServer,该方法中会利用Dubbo的SPI机制,调用NettyServer.关于DubboSPI的介绍文章: Dubbo源码之SPI机制

  private ExchangeServer createServer(URL url) {
        url = url.addParameterIfAbsent("channel.readonly.sent", Boolean.TRUE.toString());
        url = url.addParameterIfAbsent("heartbeat", String.valueOf(60000));
        String str = url.getParameter("server", "netty");
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        } else {
            url = url.addParameter("codec", "dubbo");

            ExchangeServer server;
            try {
                server = Exchangers.bind(url, this.requestHandler);
            } catch (RemotingException var5) {
                throw new RpcException("Fail to start server(url: " + url + ") " + var5.getMessage(), var5);
            }

            str = url.getParameter("client");
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }

            return server;
        }
    }

最终会调用NettyServer的doOpen.

protected void doOpen() throws Throwable {
        this.bootstrap = new ServerBootstrap();
        this.bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        this.workerGroup = new NioEventLoopGroup(this.getUrl().getPositiveParameter("iothreads", Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true));
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(this.getUrl(), this);
        this.channels = nettyServerHandler.getChannels();
        ((ServerBootstrap)this.bootstrap.group(this.bossGroup, this.workerGroup).channel(NioServerSocketChannel.class)).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE).childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(new ChannelInitializer<NioSocketChannel>() {
            protected void initChannel(NioSocketChannel ch) throws Exception {
                NettyCodecAdapter adapter = new NettyCodecAdapter(NettyServer.this.getCodec(), NettyServer.this.getUrl(), NettyServer.this);
                ch.pipeline().addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder()).addLast("handler", nettyServerHandler);
            }
        });
        ChannelFuture channelFuture = this.bootstrap.bind(this.getBindAddress());
        channelFuture.syncUninterruptibly();
        this.channel = channelFuture.channel();
    }

这个方法和我在上面建立的简单的Netty服务器区别不大,还是很简单的。

Dubbo采用的是一个主Reactor,多个从Reactor多线程模型。

当服务端收到客户端的请求时,首先Dubbo首先解码数据,解码过程不说了,可以参考官方文档,解码后会封装一个Request对象,然后将请求发送至DisPatcher分发调度器,然后分发器会将请求分发至线程池,线程池负责执行具体任务。

Netty4选择的默认分发策略是all,即将所有消息都分发到业务线程池中执行。

 public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, "DubboServerHandler")));
    }


 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(((Dispatcher)ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()).dispatch(handler, url)));
    }

除了上面所说的Handler,还有一个DecodeHandler.

在上面的createServer代码,还有一句:

server = Exchangers.bind(url, this.requestHandler);
 public static Exchanger getExchanger(String type) {
        return (Exchanger)ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

利用Dubbo SPI机制,加载HeaderExchanger对象。

  public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}));
    }

如果只暴露一个Dubbo服务端口,那么一只有一个NettyHandler。此外设置了用来处理编解码、连接建立和消息读写等的Handler,即MultiMessageHandler,HeartbeatHandler以及分发器实现类,比如AllChannelHandler,还有DecodecHandler。

MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodecHandler-> HeaderExchangeHandler。

处理流程图(来自网络):

具体流程:

当解码器将请求封装成Request对象后,NettyServerHandler的channelRead会收到信息;

随后会调用ChannelHandler的recevied方法,依次是:

- MultiMessageHandler.received;主要处理批量消息;

- HeartbeatHandler.received;心跳消息;

- AllChannelHandler.received;Dispatcher分发器

-任务被派发之后,就会具体调用服务了,这里旧不赘述了。

看下AllChannelHandler的received方法:

 public void received(Channel channel, Object message) throws RemotingException {
        //线程池
        ExecutorService cexecutor = this.getExecutorService();

        try {
            cexecutor.execute(new ChannelEventRunnable(channel, this.handler, ChannelState.RECEIVED, message));
        } catch (Throwable var8) {
            if (message instanceof Request && var8 instanceof RejectedExecutionException) {
                Request request = (Request)message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + this.url.getIp() + "," + this.url.getPort() + ") threadpool is exhausted ,detail msg:" + var8.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus((byte)100);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }

            throw new ExecutionException(message, channel, this.getClass() + " error when process received event .", var8);
        }
    }

其线程池实例也是通过Dubbo SPI机制加载:

 public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        this.executor = (ExecutorService)((ThreadPool)ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension()).getExecutor(url);
        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if ("consumer".equalsIgnoreCase(url.getParameter("side"))) {
            componentKey = "consumer";
        }

        DataStore dataStore = (DataStore)ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), this.executor);
    }

Dubbo很好地融合了Netty,选择了多Reactor多线程的模型,充分提高了并发性能。看了里面的代码,觉得Dubbo真的很不错。接下来要好好学习一下Dubbo。

参考文章:

--------EOF---------
微信分享/微信扫码阅读