Java NIO初探(一)

@author stormma
@date 2017-11-19


生命不息,奋斗不止


前言

NIO(Non-Blocking IO), 现称为非阻塞IO, 早期曾被解释为New-IO, 相比于BIO(Blocking IO)来说,NIO是非阻塞的, 举个很简单的例子, 就是一个线程可以管理多个连接, 这较BIO来有什么进步的地方呢? 假如现在有个聊天服务器, 使用BIO通信的形式, 我们往往是一个连接开启一个线程, 并且阻塞(accept()), 直到进行IO再进行操作!对了,就是直到进行IO的时候在进行读写处理。这有什么问题吗?
假如现在我们的聊天服务器有很多用户同一时间要建立连接, 那么依据BIO通信我们应该同时创建很多线程进行连接监听处理IO操作。但是有个问题,就是在阻塞的情况下,我们创建的线程大多都是wait状态的,这造成了很多的资源的浪费,这也注定了我们不可能处理高并发的用户请求,是有一个瓶颈限制我们不能同时很多用户的请求。假如在你硬件条件的支持范围内,你创建了足够多的线程,但是资源利用率不高,这些都是NIO出现的原因。今天的主角就是NIO,这篇文章主要讲解NIO的用法以及简单的源码分析。简单的源码分析是让你了解为什么会提供这样的接口,更快速的记住接口的用户以及作用。

NIO与BIO的区别

NIO与我们传统的IO有什么区别呢?总结有三:

面向流与面向缓冲

NIO是面向缓冲区(Buffer)的, 而BIO是面向Stream(流)的。通俗点讲, BIO面向流就是从Stream中读一个或者多个字节, 直至读取完所有的字节为止。NIO的缓冲区是直接在内存中分配一个指定字节的内存区域, 使用游标的形式读写操作。

阻塞与非阻塞

前面我们说了, BIO就是阻塞IO,那么为什么是阻塞的呢?就是当一个线程去调用read()或者write()方法的时候, 在没有读写需求的情况下,线程的被迫等待状态就是阻塞。那么相反,NIO肯定是使用了一些不一样的设计来解决阻塞这个问题使它变成Non-Blocking的。其实NIO实现非阻塞的功能很简单,引入了一个叫做Channel(通道)和选择器(Selector)的概念,再加上缓冲区就组成了NIO。读取数据的时候, 从Channel读取到Buffer中。如果要写入,同理把Buffer中的数据写入到Channel就行了。其次对于非阻塞的实现, 一个Selector可以绑定多个Channel,单个线程通过Selector的一些方法去管理这些Channel,这样就实现了非阻塞。也就是说,如果一个Channel没有数据读取或者写入的话,此时的线程不会被阻塞,而线程可以自个干自个的私事。这不就是资源利用最大化吗。

NIO可以实现异步IO通信

接下来我们着重讲解一下NIO中三个重要的概念: Buffer、Channel、Selector

Buffer

前面我说过了, Buffer其实是一块按照指定字节分配好的内存区域。在Buffer中, 有四个概念你需要知道是怎么回事!如下图:

容量(Capacity)

Buffer是个抽象类, 我们挑一个它的实现类来看看, capacity到底是干什么的?这里选用ByteBuffer这个来一探究竟。

上图中ByteBuffer中这么多方法里面我们可以看到有两个构造函数, 还有一个allocate()函数!我们依次来看。

ByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset)

1
2
3
4
5
6
7
8
// Creates a new buffer with the given mark, position, limit, capacity,
// backing array, and array offset
ByteBuffer(int mark, int pos, int lim, int cap,
byte[] hb, int offset) {
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}

上面的构造函数调用了父类的,并且初始化了两个实例域。关于这两个实例域, 源码的解释如下:

1
2
3
4
5
6
// These fields are declared here rather than in Heap-X-Buffer in order to
// reduce the number of virtual method invocations needed to access these
// values, which is especially costly when coding small buffers.
//
final byte[] hb; // Non-null only for heap buffers
final int offset;

其中hb是一个byte[]数组,也就是我们的缓冲区开辟的内存区域。至于分配的位置, 是在堆(heap)上。上面的图中我们还看到了allocateDirect()这样一个函数,这个函数和allocate()唯一的不同就是分配缓冲区在直接内存(不懂直接内存的去看一下前面的两篇JVM深入探究的文章,这里就不介绍了)。

接下来我们去Buffer这个抽象类的构造函数看看究竟做了哪些事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Creates a new buffer with the given mark, position, limit, and capacity,
// after checking invariants.
//
Buffer(int mark, int pos, int lim, int cap) { // package-private
if (cap < 0)
throw new IllegalArgumentException("Negative capacity: " + cap);
this.capacity = cap;
limit(lim);
position(pos);
if (mark >= 0) {
if (mark > pos)
throw new IllegalArgumentException("mark > position: ("
+ mark + " > " + pos + ")");
this.mark = mark;
}
}

上面的代码初始化了一个我们前面讲的Buffer的四个概念概念—容量(Capacity)、上限(Limit)、标记(Mark)和位置(Position)。

下面我们去看ByteBufferallocate()函数的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Allocates a new byte buffer.
*
* <p> The new buffer's position will be zero, its limit will be its
* capacity, its mark will be undefined, and each of its elements will be
* initialized to zero. It will have a {@link #array backing array},
* and its {@link #arrayOffset array offset} will be zero.
*
* @param capacity
* The new buffer's capacity, in bytes
*
* @return The new byte buffer
*
* @throws IllegalArgumentException
* If the <tt>capacity</tt> is a negative integer
*/
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}

HeapByteBuffer构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// For speed these fields are actually declared in X-Buffer;
// these declarations are here as documentation
/*
protected final byte[] hb;
protected final int offset;
*/
HeapByteBuffer(int cap, int lim) { // package-private
super(-1, 0, lim, cap, new byte[cap], 0);
/*
hb = new byte[cap];
offset = 0;
*/
}

HeapByteBuffer是ByteBuffer的子类,然后又调用了ByteBuffer的构造函数,就是我们最上面的那个,传了一个byte[]数组,这个就是HeapByteBuffer在堆上给我们开辟的缓冲区。现在很明白了, 容量就是这个byte数组的大小。也就是我们缓冲区最多能写多少个字节的数据。

上限(Limit)

上面的源码我们可以看出Limit初始化为容量的大小。但是这个属性是用来干什么的呢?我这里先告诉你,后面我们进一步解释。其实Limit是:

第一个不应该读取或写入的元素的索引。所以它的取值范围应该是(0 - capacity)

这句话确很绕口,等我们后面看完这些概念,你就会明白是怎么回事了。

位置(Position)

下一个要读取或写入的元素的索引。

根据它初始化为0,我们应该可以预测出,就是开始写入数据或者读取的数据的位置。

标记(Mark)

记录当前Position。

好了, 我们已经大致了解完了Buffer中四个属性的概念了。现在我们来理解一下Buffer是怎么读写数据的(为什么会用一个缓冲区进行读写两个操作呢?)。

为什么会用一个缓冲区进行读写两个操作呢?怎么实现的?

依据这个问题, 我们很容易想到有个区分条件, 所以Buffer中有个模式的概念, 分为模式和模式, 至于怎么转换我们后面会讲到。

上图来源于网络

上面这张图解释的很清楚, 我们用模式的概念来区分读写操作, 使用Position来标记我们在特定模式下要操作的第一个位置。而Limit, 顾名思义, 是我们的上限,限制。所以特定模式下,我们操作缓冲区的范围应当是postion-limit

至此, 我们已经完全解释清楚了Buffer中这四个最重要的概念以及Buffer的工作原理。

不, 不对, 我还没有告诉你怎么进行模式转换。Buffer给我们提供了一个flip()函数, 来进行读写模式的转换。如往常一样, 我们大致看一下这个函数的实现。

flip()

1
2
3
4
5
6
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

代码很简单, 就是改变了我前面费尽周折解释的四个属性。我们来模拟一下这个读写模式转换。

  1. 假如此刻我们在进行首次写入数据, 0 -> 12(包括index = 12)是我们写入的数据, 此时position = 13(原因我们看下面源码), limit = len(hb)
  2. 现在我们调用flip()函数进行模式转换, 那么依据上面的代码我们应该修改limit = 13, position = 0进行读模式, 而0 - 12正是我们要读取的数据。

上面模拟了flip()函数的实现。为什么第一次写入完数据之后position = 13呢?原因很简单。

1
2
3
4
5
final int nextPutIndex() {
if (position >= limit)
throw new BufferOverflowException();
return position++;
}

写入的会调用nextPutIndex这个函数, 每次都是position++。由于篇幅的原因, 这里就不深入分析源码了。代码很简单, 读者可以自己按照这个步骤一步一步查看源码实现。这会是一个好习惯!

Channel

下一个概念Channel(通道)

那么什么是Channel?能干什么?

可以通过它读取和写入数据。相比BIO,通道就像是流。正如前面提到的,所有数据都通过Buffer对象来处理。你永远不会将字节直接写入通道中,相反,您是将数据写入包含一个或者多个字节的缓冲区。同样,你不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。所以很显然, 所有的读操作都是通过Channel往Buffer中写, 写操作都是从Buffer中往Channel中写。

Channel是一个接口, 提供以下两个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Tells whether or not this channel is open.
*
* @return <tt>true</tt> if, and only if, this channel is open
*/
public boolean isOpen();
/**
* Closes this channel.
*
* <p> After a channel is closed, any further attempt to invoke I/O
* operations upon it will cause a {@link ClosedChannelException} to be
* thrown.
*
* <p> If this channel is already closed then invoking this method has no
* effect.
*
* <p> This method may be invoked at any time. If some other thread has
* already invoked it, however, then another invocation will block until
* the first invocation is complete, after which it will return without
* effect. </p>
*
* @throws IOException If an I/O error occurs
*/
public void close() throws IOException;

至此, 我们用一个完整的代码来实现用NIO的API形式来读取文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* {@code FileChannelDemo.class} test java.nio.channels.FileChannel
* @author stormma
* @date 2017/11/21
*/
public class FileChannelDemo {
private static final Logger logger = LoggerFactory.getLogger(FileChannelDemo.class);
public static void main(String[] args) throws IOException {
RandomAccessFile file = new RandomAccessFile("test.txt", "rw");
FileChannel channel = file.getChannel();
// allocate for buffer with 128 bytes memory;
ByteBuffer buffer = ByteBuffer.allocate(128);
// read bytes from channel
int count = channel.read(buffer);
while (count != -1) {
buffer.flip();
while (buffer.hasRemaining()) {
logger.info("{}", (char) buffer.get());
}
buffer.clear();
count = channel.read(buffer);
}
}
}

如果你认真看了我前面的讲解, 详细理解上面这个例子一点都不难。

我想我有必要带你分析一下channel的read()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public int read(ByteBuffer var1) throws IOException {
this.ensureOpen();
// 判断文件权限属性
if (!this.readable) {
throw new NonReadableChannelException();
} else {
// 获得排它锁
Object var2 = this.positionLock;
synchronized(this.positionLock) {
int var3 = 0;
int var4 = -1;
try {
this.begin();
var4 = this.threads.add();
if (!this.isOpen()) {
byte var12 = 0;
return var12;
} else {
do {
// 读取数据
var3 = IOUtil.read(this.fd, var1, -1L, this.nd);
} while(var3 == -3 && this.isOpen());
// 返回读取数据的大小
int var5 = IOStatus.normalize(var3);
return var5;
}
} finally {
this.threads.remove(var4);
this.end(var3 > 0);
assert IOStatus.check(var3);
}
}
}
}

这样,我确信上面的demo代码对你来说已经很好理解了。

Selector

前面我们说过了, Selector是管理Channel的, 然后线程通过Selector对Channel的管理。

下面, 我们熟悉一下Selector的Api

创建一个Selector

1
Selector selector = Selector.open();

注册Channel

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);

注: FileChannel是阻塞的, 所以不能结合Selector来使用。

因为NIO是事件驱动的, 所以channel.register的第二个参数是一个事件。

可选值为:

1
2
3
4
Selectionkey.OP_READ // 读事件 1 << 0
Selectionkey.OP_WRITE // 写事件 1 << 2
Selectionkey.OP_CONNECT // 连接事件 1 << 3
Selectionkey.OP_ACCEPT // 接收就绪 1 << 4

我们前面说过, Selector管理多个Channel, 那么怎么管理? 就是基于事件驱动的形式来管理, 注册Channel结束之后, 如果哪个事件就绪之后, 就可以进行操作。你可能有点疑虑, 为啥注册时候只能注册一个事件呢?上面的四种事件类型都是int的常量。你只要对你感兴趣的常量进行位或运算即可。比如

SelectionKey key = channel.register(selector, Selectionkey.OP_READ | Selectionkey.OP_CONNECT);

interest集合

interest集合是你所选择的感兴趣的事件集合。它的值就是注册时传入的参数,我们可以用按为与运算把每个事件取出来:

1
2
boolean isInterestConnect = (selectionKey.interestOps() & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT;
...

ready集合

ready集合是通道已经准备就绪的操作的集合。就是你可以操作的事件。在一次选择(Selection)之后,你会首先访问这个ready set。

同理interest集合检查状态是就绪

1
boolean isReadyConnect = (selectionKey.readyOps() & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT;

当然也可以

1
2
3
4
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

通过Selector操作Channel

得到SelectionKey集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
遍历
得到Channel
Channel channel = selectionKey.channel();
得到Selector
Selector selector = selectionKey.selector();

结尾

本文简单介绍了一下NIO的三个核心概念, 由于篇幅的原因, NIO介绍, 将分为两篇文章来介绍, 下一篇会从具体的代码示例进行分析, 并且分析一些源码实现。