Java NIO 学习之Selector

概述

传统的(NIO之前)监控多个socket的Java解决方案是为每个socket创建一个线程并使得线程可以在read( )调用中阻塞,直到数据可用。这事实上将每个被阻塞的线程当作了socket监控器,将Java虚拟机的线程调度当作了通知机制。这两者本来都不是为了这种目的而设计的。程序员和Java虚拟机都为管理所有这些线程的复杂性和性能损耗付出了代价,这在线程数量的增长失控时表现得更为突出。

实际上,NIO解决这些问题的方案早已经存在,只是在NIO之前,Java程序员还无法直接利用。可以说,NIO只是将操作系统处理IO请求时提供的就绪选择等功能封装了。因此,就绪选择,多工IO,select(),poll()函数等知识是理解Selector相关类的基础,可以参考Java NIO 学习之背景知识

selector-classes.png

上图是相关类的关系图,可以再看完概念与原理后回顾这张图加以理解(我觉得这张图很好的揭示了这几个类的关系,非常形象

PS:由于本文较长且标题较多,而不同等级标题大小的区别不明显,因此建议参照右边栏的文章目录来理清本文思路

概念与原理

可选择通道(SelectableChannel)

这个抽象类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。FileChannel对象不是可选择的,因为它们没有继承SelectableChannel。所有 socket 通道都是可选择的,包括从管道(Pipe)对象的中获得的通道。SelectableChannel可以被注册到Selector对象上,同时可以指定对那个选择器而言,那种操作是感兴趣的。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次

选择器(Selector)

选择器维护着注册过的通道的集合,并对这些通道执行就绪选择。每一个Selector对象维护着三个集合,分别代表三种类型的键:

已注册的键的集合(registered key ket)

与选择器关联的已经注册的键的集合。并不是所有注册过的键都仍然有效。这个集合通过keys( )方法返回,并且可能是空的

已选择的键的集合(selected key set)

已注册的键的集合的子集。这个集合的每个成员都是相关的通道被选择器(在前一个选择操作中)判断为已经准备好的,并且包含于键的interest集合中的操作。这个集合通过selectedKeys()方法返回(并有可能是空的)。键可以直接从这个集合中移除,但不能添加。

不要将已选择的键的集合与ready集合弄混了。这是一个键的集合,每个键都关联一个已经准备好至少一种操作的通道。每个键都有一个内嵌的ready集合,指示了所关联的通道已经准备好的操作

已取消的键的集合(cancelled key set)

已注册的键的集合的子集,这个集合包含了cancel()方法被调用过的键(这个键已经被无效化),但它们还没有被注销。这个集合是选择器对象的私有成员,因而无法直接访问

select()过程详解(重点)

Selector 类的核心是选择过程。基本上来说,选择器是对 select()、 poll()等本地调用或者类似的操作系统特定的系统调用的一个包装。但是Selector所作的不仅仅是简单地向本地代码传送参数。它对每个选择操作应用了特定的过程。对这个过程的理解是合理地管理键和它们所表示的状态信息的基础。
当三种形式的select()中的任意一种被调用时,选择操作将由选择器执行。不管是哪一种形式的调用,下面步骤将被执行:
1.已取消的键的集合将会被检查。如果它是非空的,每个已取消的键的集合中的键将从另外两个集合中移除,并且相关的通道将被注销。这个步骤结束后,已取消的键的集合将是空的
2.已注册的键的集合中的键的interest集合将被检查。在这个步骤中的检查执行过后,对
interest 集合的改动不会影响剩余的检查过程

一旦就绪条件被定下来,底层操作系统将会进行查询,以确定每个通道所关心的操作的真实就绪状态。如果没有通道已经准备好,线程可能会在这时阻塞,通常会有一个超时值(具体看调用的是哪一个select()方法)

直到系统调用完成为止,这个过程可能会使得调用线程睡眠一段时间,然后当前每个通道的就绪状态将确定下来。对于那些还没准备好的通道将不会执行任何的操作。对于那些操作系统指示至少已经准备好interest集合中的一种操作的通道,将执行以下两种操作中的一种:
a.如果通道的键还没有处于已选择的键的集合中,那么键的ready集合将先被清空,再被设置为操作系统发现的当前通道已经准备好的操作的比特掩码,然后这个键会被加入到已选择的键的集合中
b.否则,也就是键在已选择的键的集合中。键的ready集合将被操作系统发现的当前已经准备好的操作的比特掩码更新。所有之前的已经不再是就绪状态的操作并不会被清除,ready集合是与之前的ready集合按位或运算得来的。也就是说,一旦键被放置于选择器的已选择的键的集合中,它的ready集合将是累积的,比特位只会被设置,不会被清理。

3.步骤2可能会花费很长时间,特别是所激发的线程处于休眠状态时。在这期间,与该选择器相关的键可能会被取消(调用SelectionKey的cancel()方法)。因否则,也就是键在已选择的键的集合中。键的ready集合将被操作系统发现的当前已经准备好的操作的比特掩码更新。所有之前的已经不再是就绪状态的操作并不会被清除,ready集合是与之前的ready集合按位或运算得来的,一旦键被放置于选择器的已选择的键的集合中,它的ready集合将是累积的。比特位只会被设置,不会被清理。此当步骤2结束时,步骤1将重新执行,以完成那些在选择进行的过程中,键被取消的通道的注销

要分清通道被选择器注销和相关键被取消(无效化)的区别

4.select操作返回的值是ready集合在步骤2中被修改的SelectionKey的数量,而不是已选择的键的集合中的通道的总数。或者说,返回值不是已准备好的通道的总数,而是从上一个 select( )调用之后进入就绪状态的通道的数量。之前的调用中就绪的,并且在本次调用中仍然就绪的通道不会被计入(我:因为ready集合没有更新),而那些在前一次调用中已经就绪但已经不再处于就绪状态的通道也不会被计入(我:因为ready集合只会被设置不会被清理,ready集合也没有更新)。这些通道可能仍然在已选择的键的集合中,但不会被计入返回值中。返回值可能是0

源码对返回值的解释是:The number of keys, possibly zero, whose ready-operation sets were updated

select()方法有三种不同的形式,具体见API详解。

停止选择过程的三种方式

线程会在阻塞的select()方法中睡眠,如果要停止选择过程,唤醒线程,有三种方法:

1.调用Selector对象的wakeup()方法
效果见API详解
有时这种延迟的唤醒行为并不是你想要的。你可能只想唤醒一个睡眠中的线程,而使得后续的选择继续正常地进行。你可以通过在调用wakeup()方法后调用selectNow()方法来绕过这个问题。尽管如此,如果你将你的代码构造为合理地关注于返回值和执行选择集合(我:下文Demo里面就没有用selectNow),那么即使下一个select()方法的调用在没有通道就绪时就立即返回,也应该不会有什么不同。不管怎么说,你应该为可能发生的事件做好准备

2.调用Selector的close()方法
如果选择器的close()方法被调用,那么任何一个在选择操作中阻塞的线程都将被唤醒,就像wakeup()方法被调用了一样。与选择器相关的通道将被注销,而键将被取消

3.调用线程的interrupt()方法
如果睡眠中的线程的interrupt()方法被调用,它的返回状态将被设置。如果被唤醒的线程之后将试图在通道上执行I/O操作,通道将立即关闭,然后线程将捕捉到一个异常,这是因为通道的中断语义。与主动调用wakeup()能优雅唤醒睡眠中的线程不一样,如果你想让一个睡眠的线程在直接中断之后继续执行,需要执行一些步骤来清理中断状态(参见Thread.interrupted()的相关文档)。Selector对象将捕捉InterruptedException异常并调用wakeup()方法

我:主动调用的一般是wakeup()吧,另外两个只是告诉我们,这两个时机也会导致select返回,让我们做好准备

这些方法中的任意一个都不会关闭任何一个相关的通道。中断一个选择器与中断一个通道是不一样的。选择器不会改变任意一个相关的通道,它只会检查它们的状态。

选择键(SelectionKey)

选择键封装了特定的通道与特定的选择器的注册关系。选择键对象被SelectableChannel.register()返回并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数的形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。

一个SelectionKey对象包含两个以整数形式进行编码的比特掩码:一个用于指示那些通道/选择器组合体所关心的操作(instrest集合),另一个表示通道准备好要执行的操作(ready 集合)。interset集合永远不会被选择器改变,但可以通过调用interestOps()方法并传入一个新的比特掩码参数来改变它。当相关的Selector上的select( )操作正在进行时改变键的interest集合,不会影响那个正在进行的选择操作。所有更改将会在select()的下一次调用中体现出来。

选择键取消与通道被注销的时机

当通道关闭时,所有相关的键会自动取消(记住,一个通道可以被注册到多个选择器上)。
当选择器关闭时,所有被注册到该选择器的通道都将被注销,并且相关的键将立即被无效化(取消)

要理解通道被选择器注销和相关的键被无效化(取消)的时机区别,请参考select()的过程详解

管理选择键

理解已选择的键的集合在选择过程所扮演的角色是合理地使用选择器的关键(参见select()过程详解,特别是选择过程的第二步),最重要的是理解当键已经不再在已选择的键的集合中时将会发生什么——不在已选择的键的集合中也就意味着,当通道上的至少一个感兴趣的操作就绪时,键的ready集合就会被清空,并且当前已经就绪的操作将会被添加到ready集合中,该键之后将被添加到已选择的键的集合中

清理一个SelectKey的ready集合的方式是将这个键从已选择的键的集合中移除。

选择过程的可拓展性

选择器可以简化用单线程同时管理多个可选择通道的实现。使用一个线程来为多个通道提供服务,通过消除管理各个线程的额外开销,可能会降低复杂性并可能大幅提升性能。但只使用一个线程来服务所有可选择的通道是否是一个好主意呢?考虑下面两种场景:

一. 在一个有n个CPU的系统(多CPU系统)上,当一个单一的线程线性地轮流处理每一个线程时,可能有n-1个cpu处于空闲状态。

二. 如果一个应用程序为大量的分布式的传感器记录信息,而且每个传感器在服务线程遍历每个就绪的通道时需要等待数秒钟。这在响应时间不重要时是可以的,但对于高优先级的连接(如操作命令),如果只用一个线程为所有通道提供服务,将不得不在队列中等待。不同的应用程序的要求也是不同的。

解决方案:
场景一:为了提高CPU利用率,我们需要用更多的线程来为通道服务。但这并不意味着要使用多个选择器,更好的策略是对所有的可选择通道只使用一个选择器,并将对就绪通道的服务委托给其他线程。只用一个线程监控通道的就绪状态并使用一个协调好的工作线程池来处理共接收到的数据。根据部署的条件,线程池的大小是可以调整的(或者它自己进行动态的调整)

在大量通道上执行就绪选择并不会有很大的开销,大多数工作是由底层操作系统完成的。

场景二:为了使某些通道比其他通道有更高的响应速度,有两种方案:
1.使用两个选择器来解决,一个为命令连接服务,另一个为普通连接服务

2.使用与第一个场景十分相似的办法来解决。与将所有准备好的通道放到同一个线程池的做法不同,通道可以根据功能由不同的工作线程来处理。它们可能可以是日志线程池,命令/控制线程池,状态请求线程池,等等

SelectableChannel相关API

register(Selector sel, int ops)

将该channel注册到给定的selector上

通道在被注册到一个选择器上之前,必须先设置为非阻塞模式,通过调用configureBlocking(false)

register(Selector sel, int ops, Object att)

将该channel注册到给定的selector上,并将att作为生成的SelectionKey的附件(attachment),参考SelectionKey的attach()方法

keyFor(Selector sel)

返回与该通道的和指定的选择器相关的键。如果它们之间没有注册关系,返回null

Selector类API

open()

实例化一个selector对象

isOpen()

测试一个selector是否处于被打开的状态

keys()

返回已注册的键的集合

select()

执行选择过程,如果没有通道就绪将一直阻塞

select(long timeout)

执行选择过程,指定超时时间为timeout,超时后返回0

selectNow()

执行选择过程,如果没有通道就绪,立即返回0

wakeup()

使选择器上的还没有返回的选择操作立即返回。如果当前没有在进行中的选择,那么下一次对 select()方法的一种形式的调用将立即返回

SelectionKey类API

attach()

返回该键对应的“附件”

attach(Object ob)

在键上放置一个“附件”,可以通过attach()获取它。可以使用null清除附件

如果选择键的存续时间很长,但附加的对象不应该存在那么长时间,请记得在完成后清理附件。否则,附加的对象将不能被垃圾回收,程序将会面临内存泄漏问题

cancel()

取消该键,即使该键失效。当键被取消时,它将被放在相关的选择器的已取消的键的集合里。注册不会立即被取消,但键会立即失效。当再次调用select()方法时(或者一个正在进行的 select()调用结束时),已取消的键的集合中的被取消的键将被清理掉,并且相应的注销也将完成。通道会被注销,而新的SelectionKey 将被返回。

要分清键被取消(无效化)与注册关系的时机区别,见“概念与原理”的select()过程详解

interestOps()

返回该SelectionKey当前的interest集合

从interest集合清除一个条目的方法:key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));,从interest集合添加一个条目的方法:key.interestOps(key.interestOps( ) | SelectionKey.OP_READ);

readyOps()

返回相关通道的已经就绪的操作的集合

通过相关的选择键的 readyOps( )方法返回的就绪状态指示只是一个提示,不是保证。底层的通道在任何时候都会不断改变。其他线程可能在通道上执行操作并影响它的就绪状态。同时,操作系统的特点也总是需要考虑的

isValid()

测试对应的通道与选择器的注册关系是否有效。

请务必在键可能被取消的情况下检查SelectionKey对象的状态。

isWritable()

测试对应的通道是否已经处于可写的就绪状态

if (key.isWritable( ))等价于if ((key.readyOps( ) & SelectionKey.OP_WRITE) != 0)

isReadable()

测试对应的通道是否已经处于可读的就绪状态

isConnectable()

isAcceptable()

Demo与解释

创建选择器

1
2
3
4
5
6
Selector selector = Selector.open( );
channel1.register (selector, SelectionKey.OP_READ);
channel2.register (selector, SelectionKey.OP_WRITE);
channel3.register (selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// Wait up to 10 seconds for a channel to become ready
readyCount = selector.select (10000);

这些代码创建了一个新的选择器,然后将三个socket通道注册到选择器上,而且感兴趣的操作各不相同。select()方法在将线程置于睡眠状态,直到这些感兴趣的事情中的一个发生或者10秒钟的时间过去。

使用select( )来为多个通道提供服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
* Code from the book 'Java NIO', edited by gene on 17/05/06
*
* Simple echo-back server which listens for incoming stream connections and
* echoes back whatever it reads. A single Selector object is used to listen to
* the server socket (to accept new connections) and all the active socket
* channels.
*
* @author Ron Hitchens (ron@ronsoft.com)
*/
public class SelectSockets {
public static int PORT_NUMBER = 1234;
private long times=1;
public static void main(String[] argv) throws Exception {
new SelectSockets().go(argv);
}
public void go(String[] argv) throws Exception {
int port = PORT_NUMBER;
if (argv.length > 0) { // Override default listen port
port = Integer.parseInt(argv[0]);
}
System.out.println("Listening on port " + port);
// Allocate an unbound server socket channel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// Get the associated ServerSocket to bind it with
ServerSocket serverSocket = serverChannel.socket();
// Create a new Selector for use below
Selector selector = Selector.open();
// Set the port the server channel will listen to
serverSocket.bind(new InetSocketAddress(port));
// Set nonblocking mode for the listening socket
serverChannel.configureBlocking(false);
// Register the ServerSocketChannel with the Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// This may block for a long time. Upon returning, the
// selected set contains keys of the ready channels.
int n = selector.select();
// TODO: 2017/5/7 delete this
System.out.println("第"+times+++"次select,返回值为"+n);
if (n == 0) {
continue; // nothing to do
}
// Get an iterator over the set of selected keys
Iterator it = selector.selectedKeys().iterator();
// Look at each key in the selected set
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
// Is a new connection coming in?
if (key.isAcceptable()) {
ServerSocketChannel server =
(ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
if (channel == null) {
continue;//could happen
}
// Set the new channel nonblocking
channel.configureBlocking(false);
// Register it with the selector
channel.register(selector, SelectionKey.OP_READ);
sayHello(channel);
}
// Is there data to read on this channel?
if (key.isReadable()) {
readDataFromSocket(key);
}
// Remove key from selected set; it's been handled
it.remove();
}
}
}
// ----------------------------------------------------------
// Use the same byte buffer for all channels. A single thread is
// servicing all the channels, so no danger of concurrent access.
private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
/**
* Sample data handler method for a channel with data ready to read.
*
* @param key A SelectionKey object associated with a channel determined by
* the selector to be ready for reading. If the channel returns
* an EOF condition, it is closed here, which automatically
* invalidates the associated key. The selector will then
* de-register the channel on the next select call.
* @throws Exception
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel) key.channel();
WritableByteChannel wbc = Channels.newChannel(System.out);
int count;
buffer.clear(); // Empty buffer
// Loop while data is available; channel is nonblocking
while ((count = socketChannel.read(buffer)) > 0) {
buffer.flip(); // Make buffer readable
// Send the data; don't assume it goes all at once
while (buffer.hasRemaining()) {
//socketChannel.write(buffer);
wbc.write(buffer);
}
buffer.clear(); // Empty buffer
}
if (count < 0) {
// Close channel on EOF, invalidates the key
socketChannel.close();
}
}
// ----------------------------------------------------------
/**
* Spew a greeting to the incoming client connection.
*
* @param channel The newly connected SocketChannel to say hello to.
*/
private void sayHello(SocketChannel channel) throws Exception {
String greeting = "Hello, this is data from the server !"+System.getProperty("line.separator") ;
buffer.clear();
buffer.put(greeting.getBytes());
buffer.flip();
channel.write(buffer);
}
}

上述代码是典型的服务器的例子,经过我的部分修改,建议看一下未经过修改的程序以及书中对该程序解读,在这里——>《Java NIO》部分代码

我根据书中给的SocketChannel用法的例子修改得到一个对应的客户端的例子,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.TimeUnit;
/**
* Code from the book 'Java NIO', edited by gene on 17/05/06
* Demonstrate asynchronous connection of a SocketChannel.
*
* @author Ron Hitchens (ron@ronsoft.com)
*/
public class ConnectAsync {
private static ByteBuffer buffer;
private static String message = "This is data from the client, please reply soon !!!"+System.getProperty("line.separator");
public static void main(String[] argv)
throws Exception {
String host = "localhost";
int port = 1234;
if (argv.length == 2) {
host = argv[0];
port = Integer.parseInt(argv[1]);
}
InetSocketAddress addr = new InetSocketAddress(host, port);
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
System.out.println("initiating connection");
sc.connect(addr);
while (!sc.finishConnect()) {
waitForConnection();
}
System.out.println("connection established !!!");
// greet to the server with the connected socket
if (sc.isConnected()) {
//假如在doSomethingUseful里面没有接收到EOF,也就是sc未被关闭
//那么每隔5秒给服务器发一次信息并接受响应
//当发送60次,也就是5分钟之后,如果还没有EOF,主动关闭连接
int times=0;
while (sc.isOpen()) {
System.out.println("第"+(++times)+"次发送信息......");
doSomethingUseful(sc);
TimeUnit.SECONDS.sleep(5);
if (times == 60) {
System.out.println("Timeout, closing the connection ...");
sc.close();
}
}
}
// The SocketChannel is still nonblocking
//sc.close();
}
private static void doSomethingUseful(SocketChannel sc) throws IOException {
//greeting
buffer = ByteBuffer.wrap(message.getBytes());
while (buffer.hasRemaining()) {
sc.write(buffer);
}
buffer.clear();
//waiting for reply
System.out.println("waiting for reply ...");
int count;
WritableByteChannel out = Channels.newChannel(System.out);
while ((count = sc.read(buffer)) > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
out.write(buffer);
}
buffer.clear(); // Empty buffer
}
if (count < 0) {
System.out.println("Have received all data, closing...");
// Close channel on EOF, invalidates the key
sc.close();
}
}
private static void waitForConnection() throws InterruptedException{
System.out.println("waiting for the establishment of the connection...");
System.out.println("Please wait...");
Thread.sleep(1000);
}
}

使用线程池来为通道提供服务

该版本的服务端可以使用与上一个例子相同的客户端程序,只要同时开启多个即可,同样,建议看一下未经过修改的程序以及书中对该程序解读,在这里——>《Java NIO》部分代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.LinkedList;
import java.util.List;
/**
* Specialization of the SelectSockets class which uses a thread pool
* to service channels. The thread pool is an ad-hoc implementation
* quickly lashed together in a few hours for demonstration purposes.
* It's definitely not production quality.
*
* @author Ron Hitchens (ron@ronsoft.com)
*/
public class SelectSocketsThreadPool extends SelectSockets
{
private static final int MAX_THREADS = 5;
private ThreadPool pool = new ThreadPool (MAX_THREADS);
// -------------------------------------------------------------
public static void main (String [] argv)
throws Exception
{
new SelectSocketsThreadPool( ).go (argv);
}
// -------------------------------------------------------------
/**
* Sample data handler method for a channel with data ready to read.
* This method is invoked from the go( ) method in the parent class.
* This handler delegates to a worker thread in a thread pool to
* service the channel, then returns immediately.
* @param key A SelectionKey object representing a channel
* determined by the selector to be ready for reading. If the
* channel returns an EOF condition, it is closed here, which
* automatically invalidates the associated key. The selector
* will then de-register the channel on the next select call.
*/
protected void readDataFromSocket (SelectionKey key)
throws Exception
{
WorkerThread worker = pool.getWorker( );
if (worker == null) {
// No threads available. Do nothing. The selection
// loop will keep calling this method until a
// thread becomes available. This design could
// be improved.
return;
}
// Invoking this wakes up the worker thread, then returns
worker.serviceChannel (key);
}
// ---------------------------------------------------------------
/**
* A very simple thread pool class. The pool size is set at
* construction time and remains fixed. Threads are cycled
* through a FIFO idle queue.
*/
private class ThreadPool
{
List idle = new LinkedList( );
ThreadPool (int poolSize)
{
// Fill up the pool with worker threads
for (int i = 0; i < poolSize; i++) {
WorkerThread thread = new WorkerThread (this);
// Set thread name for debugging. Start it.
thread.setName ("Worker" + (i + 1));
thread.start( );
idle.add (thread);
}
}
/**
* Find an idle worker thread, if any. Could return null.
*/
WorkerThread getWorker( )
{
WorkerThread worker = null;
synchronized (idle) {
if (idle.size( ) > 0) {
worker = (WorkerThread) idle.remove (0);
}
}
return (worker);
}
/**
* Called by the worker thread to return itself to the
* idle pool.
*/
void returnWorker (WorkerThread worker)
{
synchronized (idle) {
idle.add (worker);
}
}
}
/**
* A worker thread class which can drain channels and echo-back
* the input. Each instance is constructed with a reference to
* the owning thread pool object. When started, the thread loops
* forever waiting to be awakened to service the channel associated
* with a SelectionKey object.
* The worker is tasked by calling its serviceChannel( ) method
* with a SelectionKey object. The serviceChannel( ) method stores
* the key reference in the thread object then calls notify( )
* to wake it up. When the channel has been drained, the worker
* thread returns itself to its parent pool.
*/
private class WorkerThread extends Thread
{
private ByteBuffer buffer = ByteBuffer.allocate (1024);
private ThreadPool pool;
private SelectionKey key;
WorkerThread (ThreadPool pool)
{
this.pool = pool;
}
// Loop forever waiting for work to do
public synchronized void run( )
{
System.out.println (this.getName( ) + " is ready");
while (true) {
try {
// Sleep and release object lock
this.wait( );
} catch (InterruptedException e) {
e.printStackTrace( );
// Clear interrupt status
//关于interrupt机制,参考笔记中《详细分析Java中断机制》
//这里只是简单的把中断状态清除,并没有重新设置中断状态并往方法调用者抛出中断异常
Thread.interrupted( );
}
if (key == null) {
//的确有可能吧,例如wait的过程,key还没得到正确的值这个线程就被interrupt了
continue; // just in case
}
System.out.println (this.getName( )
+ " has been awakened");
try {
drainChannel (key);
} catch (Exception e) {
//drainChannel抛出的IOException也是在这里捕获
System.out.println ("Caught '"
+ e + "' closing channel");
// Close channel and nudge(用肘推,可以理解唤醒吧) selector
try {
key.channel().close( );
} catch (IOException ex) {
ex.printStackTrace( );
}
key.selector().wakeup( );
}
key = null;
// Done. Ready for more. Return to pool
this.pool.returnWorker (this);
//归还到线程池,只是相当于告诉线程池这个线程可用了(可以由getWorker返回),
// 这个while循环还是会继续等待任务,也就是还会重复——wait,再被notify,然后执行任务——的这个流程
}
}
/**
* Called to initiate a unit of work by this worker thread
* on the provided SelectionKey object. This method is
* synchronized, as is the run( ) method, so only one key
* can be serviced at a given time.
* Before waking the worker thread, and before returning
* to the main selection loop, this key's interest set is
* updated to remove OP_READ. This will cause the selector
* to ignore read-readiness for this channel while the
* worker thread is servicing it.
*/
synchronized void serviceChannel (SelectionKey key)
{
this.key = key;
//这是从interest集合清除Read的方法,这样做的目的见下文
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
//线程一开始就自己调用wait,直到自己调用serviceChannel方法
this.notify( ); // Awaken the thread
}
/**
* The actual code which drains the channel associated with
* the given key. This method assumes the key has been
* modified prior to invocation to turn off selection
* interest in OP_READ. When this method completes it
* re-enables OP_READ and calls wakeup( ) on the selector
* so the selector will resume watching this channel.
*/
void drainChannel (SelectionKey key)
throws Exception
{
SocketChannel channel = (SocketChannel) key.channel( );
WritableByteChannel wbc = Channels.newChannel(System.out);
int count;
buffer.clear( ); // Empty buffer
// Loop while data is available; channel is nonblocking
while ((count = channel.read (buffer)) > 0) {
buffer.flip(); // Make buffer readable
// Send the data; don't assume it goes all at once
while (buffer.hasRemaining()) {
//socketChannel.write(buffer);
wbc.write(buffer);
}
buffer.clear(); // Empty buffer
}
if (count < 0) {
// Close channel on EOF; invalidates the key
channel.close( );
return;
}
// Resume interest in OP_READ
//前面去除了Read,这里恢复,目的见下文
key.interestOps (key.interestOps( ) | SelectionKey.OP_READ);
// Cycle the selector so this key is active again
//使另一个线程(即执行选择操作的线程)的select操作立即返回
key.selector().wakeup( );
/*
为了弄明白这里调用wakeup的作用,可以把wakeup调用注释掉,然后会发现,服务端程序将在select过程一直阻塞
但是客户端后来不停地在发信息,按理说应该会有某个时刻这个阻塞的select会检测到通道可读就绪而返回吧?
所以wakeup在这里就发挥作用了,让select返回,再进入有select的那个死循环重新select
那么去掉wakeup然后改用有超时值的select能不能也避免这种一直阻塞的情况
并且保证通信正常呢(即客户端发的信息服务端应该很快收到,毕竟这是本地回环测试)
验证表明,的确可以避免一直阻塞,但是有明显缺陷,如果超时值设置的
过短(比客户端发信息的间隔(5s)短)则会导致(大量)没必要的select系统调用
过长,则也会出现原本的情况,即在select调用陷入阻塞,虽然有超时值,但仍使服务端表现为响应延迟(没及时收到客户端信息)
从这个例子可以看出,wakeup的作用是(在多线程的环境中)动态的调整select的返回时机防止阻塞,并且避免没必要的select调用
至于上面的“按理说应该会有某个时刻这个阻塞的select会检测到可读而返回吧?”这个疑惑,我是还没搞明白的
毕竟在单线程的版本SelectSocket中,每次select阻塞都会在客户端发信息来之后检测到可读而返回
*/
}
}
}

Because the thread doing the selection will loop back and call select( ) again almost immediately, the interest set in the key is modified to remove interest in read-readiness. This prevents the selector from repeatedly invoking readDataFromSocket( ) (because the channel will remain ready to read until the worker thread can drain the data from it). When a worker thread has finished servicing the channel, it will again update the key’s interest set to reassert an interest in read-readiness. It also does an explicit wakeup( ) on the selector. If the main thread is blocked in select( ), this causes it to resume. The selection loop will then cycle (possibly doing nothing) and reenter select( ) with the updated key.

个人理解:首先要清楚drainChannel的过程需要一定时间,并且由于选择线程和响应socket的线程分开,选择线程并不会在处理socket这个步骤阻塞而是又进入了下一次选择,这就导致有可能下一次select操作时上一次响应socket的处理还未结束。在不将Read从interest集合移除的情况下:假如drainChannel还未完成,未将对应的Channel从已选择的集合中移除,那么在下一次select操作后这个Channel对应的key还会被selectKeys()返回,接着readDataFromSocket()还会被调用,即又启动了一个新的线程来处理之前处理到中途的socketChannel