4.2.3 Java NIO

1.简介

自从J2SE 1.4版本以来,JDK发布了全新的I/O类库,简称NIO(New IO)。它不但引入了全新的高效的I/O机制,同时引入了基于Reactor设计模式的多路复用异步模式。NIO的包中主要包含了以下几种抽象数据类型。

❑Channel(通道):NIO把它支持的I/O对象抽象为Channel。它模拟了通信连接,类似于原I/O中的流(Stream),用户可以通过它读取和写入数据。目前已知的实例类有SocketChannel、ServerSocketChannel、DatagramChannel、FileChannel等。

❑Buffer(缓冲区):Buffer是一块连续的内存区域,一般作为Channel收发数据的载体出现。所有数据都通过Buffer对象来处理。用户永远不会将字节直接写入通道中,相反,需将数据写入包含一个或者多个字节的缓冲区;同样,也不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。

❑Selector(选择器):Selector类提供了监控一个或多个通道当前状态的机制。只要Channel向Selector注册了某种特定事件,Selector就会监听这些事件是否会发生,一旦发生某个事件,便会通知对应的Channel。使用选择器,借助单一线程,就可对数量庞大的活动I/O通道实施监控和维护,具体如图4-4所示。

4.2.3 Java NIO - 图1

图 4-4 新I/O模型,单个线程阻塞等待客户端请求

2.常用类

(1)Buffer相关类

java. nio包公开了Buffer API,使得Java程序员可以直接控制和运用缓存区。所有缓冲区包含以下3个属性。

❑capacity:缓冲区的末位值。它表明了缓冲区最多可以保存多少数据。

❑limit:表示缓冲区的当前存放数据的终点。不能对超过limit的区域进行读写数据。

❑position:下一个读写单元的位置。每次读写缓冲区时,均会修改该值,为下一次读写数据做准备。

这三个属性的大小关系是capacity≥limit≥position≥0。

如图4-5所示,Buffer有两种不同的工作模式——写模式和读模式。在写模式下,limit与capacity相同,position随着写入数据增加,逐渐增加到limit,因此,0到position之间的数据即为已经写入的数据;在读模式下,limit初始指向position所在位置,position随着数据的读取,逐渐增加到limit,则0到position之间的数据即为已经读取的数据。函数flip()可将写模式转化为读模式,其他常用函数如下。

❑clear():重置Buffer,即将limit设为capacity,而position为0。

❑hasRemaining()/remaining():分别用于判断Buffer是否有剩余空间和获取Buffer剩余空间,其中剩余空间大小即为limit-position。

❑capacity()/limit()/position():分别用于获取Buffer的capacity、limit和position属性的值。

❑limit(int newLimit)/position(newPosition):分别用于设置Buffer的limit和position属性。

4.2.3 Java NIO - 图2

图 4-5 Buffer的写模式和读模式

java. nio.Buffer是一个抽象类,不能被实例化。除boolean类型外,每种基本类型都有对应的具体的Buffer类,其中最基本、最常用的是ByteBuffer。它存放的数据单元是字节。它并没有提供直接的构造函数,而是提供了以下两个静态工厂方法:


//方法1 创建一个Heap Buffer,其空间分配在JVM的堆上,和其他对象一样,由GC回收

static ByteBuffer allocate(int capacity)

/*方法2 创建一个Direct Buffer,并通过底层的JNI调用C Runtime Time的malloc函数分配空

间,可看作“内核空间”。其创建代价比Heap Buffer大,但更高效。*/

static ByteBuffer allocateDirect(int capacity)


每个具体类均提供了一系列读写Buffer的函数,想进一步了解的读者可查看JDK Document[1]

(2)Channel相关类

java. nio提供了多种Channel实现,其中,最常用的是以SelectableChannel为基类的通道。SelectableChannel是一种支持阻塞I/O和非阻塞I/O的通道,它的主要方法如下:

❑SelectableChannel configureBlocking(boolean block)throws IOException。

〇作用:设置当前SelectableChannel的阻塞模式。

〇参数含义:block表示是否将SelectableChannel设置为阻塞模式。

〇返回值:SelectableChannel对象本身的引用,相当于“return this”。

❑SelectionKey register(Selector sel, int ops)throws ClosedChannelException。

〇作用:将当前Channel注册到一个Selector中。

〇参数含义:sel表示要注册的Selector;ops表示注册事件。

〇返回值:与注册Channel关联的SelectionKey对象,用于跟踪被注册事件。

SelectableChannel的两个子类是ServerSocketChannel和SocketChannel,它们分别是ServerSocket和Socket的替代类。

ServerSocketChannel主要用于监听TCP连接,它提供了以下3个最常用的方法。

❑ServerSocketChannel open()throws IOException:用于创建ServerSocketChannel的静态工厂方法。其返回的ServerSocketChannel对象没有与任何本地端口号绑定,处于阻塞状态。

❑SocketChannel accept()throws IOException:接收来自客户端的连接。当ServerSocket-Channel设置为阻塞模式时,该函数一直会处于阻塞状态,直到有客户端请求到达或者抛出异常。一旦有客户端请求出现,则会返回一个处于阻塞模式的SocketChannel。

❑ServerSocket socket():返回一个与ServerSocketChannel关联的ServerSocket对象。注意,每个ServerSocketChannel对象都有一个ServerSocket对象与之关联。

SocketChannel可看作Socket的替代类,但功能比Socket更加强大。同ServerSocket-Channel类似,它提供了静态工厂方法open()(创建对象)和socket()方法(返回与SocketChannel关联的Socket对象)。它的其他常用方法如下。

❑boolean connect(SocketAddress remote)throws IOException:连接Channel对应的Socket。如果SocketChannel处于阻塞模式,则直接返回结果;否则,进入阻塞状态,直到连接成功或者抛出异常。

❑int read(ByteBuffer dst)throws IOException:将当前Channel中的数据读取到ByteBuffer中。在阻塞和非阻塞模式下,该函数实现方式不同:在非阻塞模式下,遵从能读取多少数据就读取多少数据的原则,总是立即返回结果;而在阻塞模式下,将尝试一直读取数据,直到ByteBuffer被填满,到达输入流末尾或者抛出异常。该函数的返回值为实际读取的数据字节数。

❑int write(ByteBuffer src)throws IOException:将ByteBuffer中的数据写入Channel中。与read函数类似,在阻塞模式和非阻塞模式下,该函数实现方式不同:在非阻塞模式下,遵从能输出多少数据就输出多少数据的原则,总是立即返回结果;在阻塞模式下,会尝试着将所有数据写入Channel,如果底层的网络缓冲区容纳不了这么多字节,则会阻塞至可写入所有数据或者抛出异常。

当调用write方法将一个Heap Buffer中的数据写入某个Channel(或者调用read方法将Channel中的数据读入一个Heap Buffer对象)时,Sun Java底层实现中使用了Direct Buffer暂时对数据进行缓冲,大体步骤为:JVM初始创建一个固定大小的Direct Buffer,并将数据写入该Buffer,如果Buffer大小不够,则再创建一个更大的Direct Buffer,并将之前的Direct Buffer中的内容复制到新的Direct Buffer中,依此类推,直到将数据全部写入。很明显,该过程涉及大量内存复制操作,会明显降低性能。此外,由于Direct Buffer所占内存不会被马上释放,因此会造成内存使用骤升。为解决该问题,可将写入的数据分成固定大小(比如8KB)的chunk,并以chunk为单位写入Direct Buffer[2],代码如代码清单4-2所示。

代码清单4-2 将ByteBuffer中的数据以chunk为单位写入Direct Buffer


……

int NIO_BUFFER_LIMIT=8*1024;//chunk大小:8KB

//将Buffer中的数据写入Channel中,其中Channel处于非阻塞模式

int channelWrite(WritableByteChannel channel,

ByteBuffer buffer)throws IOException{

//如果缓冲区中的数据小于8KB,则直接写到Channel中,否则以chunk为单位写入

return(buffer.remaining()<=NIO_BUFFER_LIMIT)?

channel.write(buffer):channelIO(null, channel, buffer);

}

private static int channelIO(ReadableByteChannel readCh,

WritableByteChannel writeCh,

ByteBuffer buf)throws IOException{

int originalLimit=buf.limit();

int initialRemaining=buf.remaining();

int ret=0;

while(buf.remaining()>0){

try{

int ioSize=Math.min(buf.remaining(),NIO_BUFFER_LIMIT);

buf.limit(buf.position()+ioSize);

ret=(readCh==null)?writeCh.write(buf):readCh.read(buf);

//非阻塞模式下,write或者read函数对应的网络缓冲区满后,会直接返回

//返回值为实际写入或者读取的数据

if(ret<ioSize){

break;

}

}finally{

buf.limit(originalLimit);

}

}

int nBytes=initialRemaining-buf.remaining();

return(nBytes>0)?nBytes:ret;

}


图4-6阐释了按照代码清单4-2中的逻辑,ByteBuffer中的数据写入Channel过程中,其内部各个属性的变化情况。

4.2.3 Java NIO - 图3

图 4-6 将ByteBuffer中的数据以chunk为单位写入Channel过程

(3)Selector类

Selector可监听ServerSocketChannel和SocketChannel注册的特定事件,一旦某个事件发生,则会通知对应的Channel。SelectableChannel的register()方法负责注册事件,该方法返回一个SelectionKey对象,该对象即为用于跟踪这些注册事件的句柄。

Selector中常用的方法如下。

❑static Selector open():一个静态工厂方法,可用于创建Selector对象。

❑int select(long timeout):该方法等待并返回发生的事件。一旦某个注册的事件发生,就会返回对应的SelectionKey的数目,否则,一直处于阻塞状态,直到以下四种情况之一发生:至少一个事件发生;其他线程调用了Selector的wakeup()方法;当前执行select()方法的线程被中断;超出等待时间timeout,如果不设置等待时间,则表示永远不会超时。

❑set selectedKeys():Selector捕获的已经发生事件对应的SelectionKey集合。

❑Selector wakeup():立刻唤醒当前处于阻塞状态的Selector。常见应用场景是,线程A调用Selector对象的select()方法,阻塞等待某个注册事件发生,线程B通过调用wakeup()函数可立刻唤醒线程A,使其从select()方法中返回。

(4)SelectionKey类

ServerSocketChannel或SocketChannel通过register()方法向Selector注册事件时,register()方法会创建一个SelectionKey对象,用于跟踪注册事件。在SelectionKey中定义了4种事件,分别用以下4个整型常量表示。

❑SelectionKey. OP_ACCEPT:接收(accept)连接就绪事件,表示服务器端接收到了客户端连接。

❑SelectionKey. OP_CONNECT:连接就绪事件,表示客户端与服务器端的连接已经建立成功。

❑SelectionKey. OP_READ:读就绪事件,表示通道中已经有了可读数据,可执行读操作了。

❑SelectionKey. OP_WRITE:写就绪事件,表示可向通道中写入数据了。

通常而言,ServerSocketChannel对象向Selector中注册SelectionKey.OP_ACCEPT事件,而SocketChannel对象向Selector中注册SelectionKey.OP_CONNECT、SelectionKey.OP_READ和SelectionKey.OP_WRITE三种事件。

SelectionKey类中比较重要的方法如下。

❑Object attach(Object ob):为当前SelectionKey关联一个Object类型的对象。每个SelectionKey只能关联一个对象。

❑Object attachment():获取当前SelectionKey关联的Object对象。

❑SelectableChannel channel():返回与当前SelectionKey关联的SelectableChannel对象。

(5)应用实例——echoServer

echoServer是一个采用NIO编写的服务器。它接收客户端发送过来的字符串,不经任何处理直接再次返回给客户端。

echoServer由五个程序段组成,分别如下。

1)初始化:


public Selector initSelector()throws IOException{

address=new InetSocketAddress(bindAddress, port);

//创建一个ServerSocketChannel,并将之设为非阻塞模式

acceptChannel=ServerSocketChannel.open();

acceptChannel.configureBlocking(false);

//将Server Socket绑定到address地址上,并设置监听队列长度

acceptChannel.socket().bind(address, backlogLength);

Selector selector=Selector.open();

//向Selector注册ServerSocketChannel,注册事件为SelectionKey.OP_ACCEPT

acceptChannel.register(selector, SelectionKey.OP_ACCEPT);

return selector;

}


2)监听客户端事件,并调用对应事件处理函数:


public void run(){

while(true){

SelectionKey key=null;

try{

selector.select();//处于阻塞状态,直到有新事件发生

Iterator<SelectionKey>iter=selector.selectedKeys().iterator();

while(iter.hasNext()){

key=iter.next();

iter.remove();

if(!key.isValid()){

continue;

}

if(key.isAcceptable()){

doAccept(key);//新客户端要求建立连接

}else if(key.isReadable()){

receive(key);//从客户端接收数据

}else if(key.isWritable()){

send(key);//向客户端发送数据

}

key=null;

}

}catch(Exception e){}

}

}


3)接受新客户端连接请求:


private void doAccept(SelectionKey key)throws IOException{

ServerSocketChannel serverSocketChannel=(ServerSocketChannel)key.channel();

//接受连接请求,并将之设置为非阻塞模式

SocketChannel socketChannel=serverSocketChannel.accept();

socketChannel.configureBlocking(false);

//将新的SocketChannel注册到Selector中,一旦有需要读或者写的数据,就会通知相应的程序

SelectionKey clientKey=

socketChannel.register(this.selector, SelectionKey.OP_READ);

ByteBuffer buffer=ByteBuffer.allocate(8192);

clientKey.attach(buffer);//关联一个缓冲区

}


4)处理读数据请求:


private void receive(SelectionKey key)throws IOException{

SocketChannel socketChannel=(SocketChannel)key.channel();

ByteBuffer readBuffer=(ByteBuffer)key.attachment();

socketChannel.read(readBuffer);

if(numRead>0){

readBuffer.flip();

key.interestOps(SelectionKey.OP_WRITE);//切换至OP_WRITE

}

}


5)处理写数据请求:


private void send(SelectionKey key)throws IOException{

SocketChannel socketChannel=(SocketChannel)key.channel();

ByteBuffer writeBuffer=(ByteBuffer)key.attachment();

socketChannel.write(writeBuffer);

if(writeBuffer.remaining()==0){//写完成后,切换至SelectionKey.OP_READ

writeBuffer.clear();

key.interestOps(SelectionKey.OP_READ);

}

}


[1]http://docs. oracle.com/javase/1.4.2/docs/guide/nio/

[2]可参考https://issues.apache.org/jira/browse/HADOOP-4797下的相关文档。