OIO与NIO
标签:Java基础

OIO

对于普通的Socket:

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author liuyao
 * @date 2018/07/31
 */
public class OioServerThreadPool {
    public static void main(String[] args) throws IOException {
        ExecutorService executorService= Executors.newCachedThreadPool();
        ServerSocket server=new ServerSocket(10000);
        System.out.println("Server Started");
        while (true){
//            在accpet会阻塞
            Socket socket=server.accept();
            System.out.println("Get a socket");
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    handler(socket);
                }
            });
        }
    }

    private static void handler(Socket socket) {
        try {
            byte[] bytes=new byte[1024];
            InputStream inputStream=socket.getInputStream();
            while (true){
//                在read方法也会阻塞
                int read=inputStream.read(bytes);
                if (read != -1){
                    System.out.println(new String(bytes,0,read));
                }else {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                socket.close();
                System.out.println("Socket Close");
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }
}

上诉方法会在等待连接和等待输入的时候会发生阻塞,我们可以采用线程池的方法解决,但是一个新的连接来,就产生一个线程,势必会浪费资源。

NIO

NIO的一个关键组件就是Channel(通道),Channel有点类似于流,一个Channel可以和文件或者网络Socket对应,如果Channel对应着一个Socket,那么往这个Socket里面写数据就等同于向Socket里面写数据。

和Channel一起使用的另外一个组件就是Buffer,可以简单的把Buffer理解成一个内存区域或者Byte数组。数组需要包装成Buffer的形式才能和Channel进行交互(写入或者读出)。

另外一个和Channel密切相关的时Selector(选择器),在Channel的众多实现中,有个SelectableChannel实现,表示可被选择的通道。任何一个SelectableChannel都可以将自己注册到一个Selector中。这样Channel就可以被Selector管理,而一个Channel可以管理多个SelectableChannel。当SelectableChannel的数据准备好时,Selector就会接到通知,得到那些已经准备好的数据。而SocketChannel就是SelectableChannel的一种。

726kAf.png

可以看到一个Selector由一个一个线程管理,一个SelectableChannel可以表示一个客户端连接,这样就构成了由一个或少数线程来处理大量客户端连接的结构。当没有客户端准备好的时候,Selector就处于等待状态,一旦有一个客户端准备好了,Selector就能立即得到通知,进行数据处理。

package com.liuyao.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * @author liuyao
 * @date 2018/07/31
 */

public class NIOServer {
    // 通道管理器
    private Selector selector;

    /**
     * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
     *
     * @param port 绑定的端口号
     * @throws IOException
     */
    public void initServer(int port) throws IOException {
        // 获得一个ServerSocket通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置通道为非阻塞
        serverChannel.configureBlocking(false);
        // 将该通道对应的ServerSocket绑定到port端口
        serverChannel.socket().bind(new InetSocketAddress(port));
        // 获得一个通道管理器
        this.selector = Selector.open();
        // 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
        // 当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
     *
     * @throws IOException
     */
    public void listen() throws IOException {
        System.out.println("服务端启动成功!");
        // 轮询访问selector
        while (true) {
            // 当注册的事件到达时,方法返回;否则,该方法会一直阻塞
            selector.select();
            // 获得selector中选中的项的迭代器,选中的项为注册的事件
            Iterator<?> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // 删除已选的key,以防重复处理
                ite.remove();

                handler(key);
            }
        }
    }

    /**
     * 处理请求
     *
     * @param key
     * @throws IOException
     */
    public void handler(SelectionKey key) throws IOException {

        // 客户端请求连接事件
        if (key.isAcceptable()) {
            handlerAccept(key);
            // 获得了可读的事件
        } else if (key.isReadable()) {
            handelerRead(key);
        }
    }

    /**
     * 处理连接请求
     *
     * @param key
     * @throws IOException
     */
    public void handlerAccept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // 获得和客户端连接的通道
        SocketChannel channel = server.accept();
        // 设置成非阻塞
        channel.configureBlocking(false);

        // 在这里可以给客户端发送信息哦
        System.out.println("新的客户端连接");
        // 在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
        channel.register(this.selector, SelectionKey.OP_READ);
    }

    /**
     * 处理读的事件
     *
     * @param key
     * @throws IOException
     */
    public void handelerRead(SelectionKey key) throws IOException {
        // 服务器可读取消息:得到事件发生的Socket通道
        SocketChannel channel = (SocketChannel) key.channel();
        // 创建读取的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = channel.read(buffer);
        if (read > 0) {
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("服务端收到信息:" + msg);

            //回写数据
            ByteBuffer outBuffer = ByteBuffer.wrap("好的".getBytes());
            channel.write(outBuffer);// 将消息回送给客户端
        } else {
            System.out.println("客户端关闭");
            key.cancel();
        }
    }

    /**
     * 启动服务端测试
     *
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.initServer(10000);
        server.listen();
    }

}

下面是上面类的内部的函数调用关系图:

一些疑问:

1、客户端关闭的时候会抛出异常,死循环
解决方案:

int read = channel.read(buffer);
		if(read > 0){
			byte[] data = buffer.array();
			String msg = new String(data).trim();
			System.out.println("服务端收到信息:" + msg);
			//回写数据
			ByteBuffer outBuffer = ByteBuffer.wrap("好的".getBytes());
			channel.write(outBuffer);// 将消息回送给客户端
		}else{
			System.out.println("客户端关闭");
			key.cancel();
		}

2、selector.select();阻塞,那为什么说nio是非阻塞的IO?

selector.select()
selector.select(1000);不阻塞
selector.wakeup();也可以唤醒selector
selector.selectNow();也可以立马返还

3、SelectionKey.OP_WRITE是代表什么意思

OP_WRITE表示底层缓冲区是否有空间,是则响应返还true

同步与异步

同步

就是一个任务A的完成需要依赖另外一个任务B时,只有等待被依赖的任务B完成后,源任务A才能才能完成,这是一种可靠的任务序列,要成功都成功,要失败都失败。两个任务状态保存一致。

异步

异步不需要等待被依赖B的任务完成,只是通知被依赖B的任务要完成什么工作,源任务A继续执行,只要自己完成了,任务就算完成了。至于被依赖任务B最终什么时候完成,源任务A也无法确定,所以他是不可靠的任务序列。

阻塞与非阻塞

阻塞与非阻塞主要是从CPU的消耗上来说的

阻塞

阻塞就是CPU停下来等待一个慢的操作完成以后,CPU才接着完成其他的工作。

非阻塞

非阻塞就是在慢的操作执行时,CPU去做其他工作,等这个慢的操作完成时,CPU再接着完成后续的操作。

非阻塞会存在多个线程之间的切换,所需的时间与等待阻塞时间需要做一个衡量。

  • 7 min read

CONTRIBUTORS


  • 7 min read