4.2.3 Java NIO
1.简介
自从J2SE 1.4版本以来,JDK发布了全新的I/O类库,简称NIO(New IO)。它不但引入了全新的高效的I/O机制,同时引入了基于Reactor设计模式的多路复用异步模式。NIO的包中主要包含了以下几种抽象数据类型。
❑Channel(通道):NIO把它支持的I/O对象抽象为Channel。它模拟了通信连接,类似于原I/O中的流(Stream),用户可以通过它读取和写入数据。目前已知的实例类有SocketChannel、ServerSocketChannel、DatagramChannel、FileChannel等。
❑Buffer(缓冲区):Buffer是一块连续的内存区域,一般作为Channel收发数据的载体出现。所有数据都通过Buffer对象来处理。用户永远不会将字节直接写入通道中,相反,需将数据写入包含一个或者多个字节的缓冲区;同样,也不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。
❑Selector(选择器):Selector类提供了监控一个或多个通道当前状态的机制。只要Channel向Selector注册了某种特定事件,Selector就会监听这些事件是否会发生,一旦发生某个事件,便会通知对应的Channel。使用选择器,借助单一线程,就可对数量庞大的活动I/O通道实施监控和维护,具体如图4-4所示。
图 4-4 新I/O模型,单个线程阻塞等待客户端请求
2.常用类
(1)Buffer相关类
java. nio包公开了Buffer API,使得Java程序员可以直接控制和运用缓存区。所有缓冲区包含以下3个属性。
❑capacity:缓冲区的末位值。它表明了缓冲区最多可以保存多少数据。
❑limit:表示缓冲区的当前存放数据的终点。不能对超过limit的区域进行读写数据。
❑position:下一个读写单元的位置。每次读写缓冲区时,均会修改该值,为下一次读写数据做准备。
这三个属性的大小关系是capacity≥limit≥position≥0。
如图4-5所示,Buffer有两种不同的工作模式——写模式和读模式。在写模式下,limit与capacity相同,position随着写入数据增加,逐渐增加到limit,因此,0到position之间的数据即为已经写入的数据;在读模式下,limit初始指向position所在位置,position随着数据的读取,逐渐增加到limit,则0到position之间的数据即为已经读取的数据。函数flip()可将写模式转化为读模式,其他常用函数如下。
❑clear():重置Buffer,即将limit设为capacity,而position为0。
❑hasRemaining()/remaining():分别用于判断Buffer是否有剩余空间和获取Buffer剩余空间,其中剩余空间大小即为limit-position。
❑capacity()/limit()/position():分别用于获取Buffer的capacity、limit和position属性的值。
❑limit(int newLimit)/position(newPosition):分别用于设置Buffer的limit和position属性。
图 4-5 Buffer的写模式和读模式
java. nio.Buffer是一个抽象类,不能被实例化。除boolean类型外,每种基本类型都有对应的具体的Buffer类,其中最基本、最常用的是ByteBuffer。它存放的数据单元是字节。它并没有提供直接的构造函数,而是提供了以下两个静态工厂方法:
//方法1 创建一个Heap Buffer,其空间分配在JVM的堆上,和其他对象一样,由GC回收
static ByteBuffer allocate(int capacity)
/*方法2 创建一个Direct Buffer,并通过底层的JNI调用C Runtime Time的malloc函数分配空
间,可看作“内核空间”。其创建代价比Heap Buffer大,但更高效。*/
static ByteBuffer allocateDirect(int capacity)
每个具体类均提供了一系列读写Buffer的函数,想进一步了解的读者可查看JDK Document[1]。
(2)Channel相关类
java. nio提供了多种Channel实现,其中,最常用的是以SelectableChannel为基类的通道。SelectableChannel是一种支持阻塞I/O和非阻塞I/O的通道,它的主要方法如下:
❑SelectableChannel configureBlocking(boolean block)throws IOException。
〇作用:设置当前SelectableChannel的阻塞模式。
〇参数含义:block表示是否将SelectableChannel设置为阻塞模式。
〇返回值:SelectableChannel对象本身的引用,相当于“return this”。
❑SelectionKey register(Selector sel, int ops)throws ClosedChannelException。
〇作用:将当前Channel注册到一个Selector中。
〇参数含义:sel表示要注册的Selector;ops表示注册事件。
〇返回值:与注册Channel关联的SelectionKey对象,用于跟踪被注册事件。
SelectableChannel的两个子类是ServerSocketChannel和SocketChannel,它们分别是ServerSocket和Socket的替代类。
ServerSocketChannel主要用于监听TCP连接,它提供了以下3个最常用的方法。
❑ServerSocketChannel open()throws IOException:用于创建ServerSocketChannel的静态工厂方法。其返回的ServerSocketChannel对象没有与任何本地端口号绑定,处于阻塞状态。
❑SocketChannel accept()throws IOException:接收来自客户端的连接。当ServerSocket-Channel设置为阻塞模式时,该函数一直会处于阻塞状态,直到有客户端请求到达或者抛出异常。一旦有客户端请求出现,则会返回一个处于阻塞模式的SocketChannel。
❑ServerSocket socket():返回一个与ServerSocketChannel关联的ServerSocket对象。注意,每个ServerSocketChannel对象都有一个ServerSocket对象与之关联。
SocketChannel可看作Socket的替代类,但功能比Socket更加强大。同ServerSocket-Channel类似,它提供了静态工厂方法open()(创建对象)和socket()方法(返回与SocketChannel关联的Socket对象)。它的其他常用方法如下。
❑boolean connect(SocketAddress remote)throws IOException:连接Channel对应的Socket。如果SocketChannel处于阻塞模式,则直接返回结果;否则,进入阻塞状态,直到连接成功或者抛出异常。
❑int read(ByteBuffer dst)throws IOException:将当前Channel中的数据读取到ByteBuffer中。在阻塞和非阻塞模式下,该函数实现方式不同:在非阻塞模式下,遵从能读取多少数据就读取多少数据的原则,总是立即返回结果;而在阻塞模式下,将尝试一直读取数据,直到ByteBuffer被填满,到达输入流末尾或者抛出异常。该函数的返回值为实际读取的数据字节数。
❑int write(ByteBuffer src)throws IOException:将ByteBuffer中的数据写入Channel中。与read函数类似,在阻塞模式和非阻塞模式下,该函数实现方式不同:在非阻塞模式下,遵从能输出多少数据就输出多少数据的原则,总是立即返回结果;在阻塞模式下,会尝试着将所有数据写入Channel,如果底层的网络缓冲区容纳不了这么多字节,则会阻塞至可写入所有数据或者抛出异常。
当调用write方法将一个Heap Buffer中的数据写入某个Channel(或者调用read方法将Channel中的数据读入一个Heap Buffer对象)时,Sun Java底层实现中使用了Direct Buffer暂时对数据进行缓冲,大体步骤为:JVM初始创建一个固定大小的Direct Buffer,并将数据写入该Buffer,如果Buffer大小不够,则再创建一个更大的Direct Buffer,并将之前的Direct Buffer中的内容复制到新的Direct Buffer中,依此类推,直到将数据全部写入。很明显,该过程涉及大量内存复制操作,会明显降低性能。此外,由于Direct Buffer所占内存不会被马上释放,因此会造成内存使用骤升。为解决该问题,可将写入的数据分成固定大小(比如8KB)的chunk,并以chunk为单位写入Direct Buffer[2],代码如代码清单4-2所示。
代码清单4-2 将ByteBuffer中的数据以chunk为单位写入Direct Buffer
……
int NIO_BUFFER_LIMIT=8*1024;//chunk大小:8KB
//将Buffer中的数据写入Channel中,其中Channel处于非阻塞模式
int channelWrite(WritableByteChannel channel,
ByteBuffer buffer)throws IOException{
//如果缓冲区中的数据小于8KB,则直接写到Channel中,否则以chunk为单位写入
return(buffer.remaining()<=NIO_BUFFER_LIMIT)?
channel.write(buffer):channelIO(null, channel, buffer);
}
private static int channelIO(ReadableByteChannel readCh,
WritableByteChannel writeCh,
ByteBuffer buf)throws IOException{
int originalLimit=buf.limit();
int initialRemaining=buf.remaining();
int ret=0;
while(buf.remaining()>0){
try{
int ioSize=Math.min(buf.remaining(),NIO_BUFFER_LIMIT);
buf.limit(buf.position()+ioSize);
ret=(readCh==null)?writeCh.write(buf):readCh.read(buf);
//非阻塞模式下,write或者read函数对应的网络缓冲区满后,会直接返回
//返回值为实际写入或者读取的数据
if(ret<ioSize){
break;
}
}finally{
buf.limit(originalLimit);
}
}
int nBytes=initialRemaining-buf.remaining();
return(nBytes>0)?nBytes:ret;
}
图4-6阐释了按照代码清单4-2中的逻辑,ByteBuffer中的数据写入Channel过程中,其内部各个属性的变化情况。
图 4-6 将ByteBuffer中的数据以chunk为单位写入Channel过程
(3)Selector类
Selector可监听ServerSocketChannel和SocketChannel注册的特定事件,一旦某个事件发生,则会通知对应的Channel。SelectableChannel的register()方法负责注册事件,该方法返回一个SelectionKey对象,该对象即为用于跟踪这些注册事件的句柄。
Selector中常用的方法如下。
❑static Selector open():一个静态工厂方法,可用于创建Selector对象。
❑int select(long timeout):该方法等待并返回发生的事件。一旦某个注册的事件发生,就会返回对应的SelectionKey的数目,否则,一直处于阻塞状态,直到以下四种情况之一发生:至少一个事件发生;其他线程调用了Selector的wakeup()方法;当前执行select()方法的线程被中断;超出等待时间timeout,如果不设置等待时间,则表示永远不会超时。
❑set selectedKeys():Selector捕获的已经发生事件对应的SelectionKey集合。
❑Selector wakeup():立刻唤醒当前处于阻塞状态的Selector。常见应用场景是,线程A调用Selector对象的select()方法,阻塞等待某个注册事件发生,线程B通过调用wakeup()函数可立刻唤醒线程A,使其从select()方法中返回。
(4)SelectionKey类
ServerSocketChannel或SocketChannel通过register()方法向Selector注册事件时,register()方法会创建一个SelectionKey对象,用于跟踪注册事件。在SelectionKey中定义了4种事件,分别用以下4个整型常量表示。
❑SelectionKey. OP_ACCEPT:接收(accept)连接就绪事件,表示服务器端接收到了客户端连接。
❑SelectionKey. OP_CONNECT:连接就绪事件,表示客户端与服务器端的连接已经建立成功。
❑SelectionKey. OP_READ:读就绪事件,表示通道中已经有了可读数据,可执行读操作了。
❑SelectionKey. OP_WRITE:写就绪事件,表示可向通道中写入数据了。
通常而言,ServerSocketChannel对象向Selector中注册SelectionKey.OP_ACCEPT事件,而SocketChannel对象向Selector中注册SelectionKey.OP_CONNECT、SelectionKey.OP_READ和SelectionKey.OP_WRITE三种事件。
SelectionKey类中比较重要的方法如下。
❑Object attach(Object ob):为当前SelectionKey关联一个Object类型的对象。每个SelectionKey只能关联一个对象。
❑Object attachment():获取当前SelectionKey关联的Object对象。
❑SelectableChannel channel():返回与当前SelectionKey关联的SelectableChannel对象。
(5)应用实例——echoServer
echoServer是一个采用NIO编写的服务器。它接收客户端发送过来的字符串,不经任何处理直接再次返回给客户端。
echoServer由五个程序段组成,分别如下。
1)初始化:
public Selector initSelector()throws IOException{
address=new InetSocketAddress(bindAddress, port);
//创建一个ServerSocketChannel,并将之设为非阻塞模式
acceptChannel=ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
//将Server Socket绑定到address地址上,并设置监听队列长度
acceptChannel.socket().bind(address, backlogLength);
Selector selector=Selector.open();
//向Selector注册ServerSocketChannel,注册事件为SelectionKey.OP_ACCEPT
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
return selector;
}
2)监听客户端事件,并调用对应事件处理函数:
public void run(){
while(true){
SelectionKey key=null;
try{
selector.select();//处于阻塞状态,直到有新事件发生
Iterator<SelectionKey>iter=selector.selectedKeys().iterator();
while(iter.hasNext()){
key=iter.next();
iter.remove();
if(!key.isValid()){
continue;
}
if(key.isAcceptable()){
doAccept(key);//新客户端要求建立连接
}else if(key.isReadable()){
receive(key);//从客户端接收数据
}else if(key.isWritable()){
send(key);//向客户端发送数据
}
key=null;
}
}catch(Exception e){}
}
}
3)接受新客户端连接请求:
private void doAccept(SelectionKey key)throws IOException{
ServerSocketChannel serverSocketChannel=(ServerSocketChannel)key.channel();
//接受连接请求,并将之设置为非阻塞模式
SocketChannel socketChannel=serverSocketChannel.accept();
socketChannel.configureBlocking(false);
//将新的SocketChannel注册到Selector中,一旦有需要读或者写的数据,就会通知相应的程序
SelectionKey clientKey=
socketChannel.register(this.selector, SelectionKey.OP_READ);
ByteBuffer buffer=ByteBuffer.allocate(8192);
clientKey.attach(buffer);//关联一个缓冲区
}
4)处理读数据请求:
private void receive(SelectionKey key)throws IOException{
SocketChannel socketChannel=(SocketChannel)key.channel();
ByteBuffer readBuffer=(ByteBuffer)key.attachment();
socketChannel.read(readBuffer);
if(numRead>0){
readBuffer.flip();
key.interestOps(SelectionKey.OP_WRITE);//切换至OP_WRITE
}
}
5)处理写数据请求:
private void send(SelectionKey key)throws IOException{
SocketChannel socketChannel=(SocketChannel)key.channel();
ByteBuffer writeBuffer=(ByteBuffer)key.attachment();
socketChannel.write(writeBuffer);
if(writeBuffer.remaining()==0){//写完成后,切换至SelectionKey.OP_READ
writeBuffer.clear();
key.interestOps(SelectionKey.OP_READ);
}
}
[1]http://docs. oracle.com/javase/1.4.2/docs/guide/nio/
[2]可参考https://issues.apache.org/jira/browse/HADOOP-4797下的相关文档。