《Java NIO》部分代码

Example 4-1. Using select( ) to service multiple channels 使用 select()来为多个通道提供服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package com.ronsoft.books.nio.channels;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SelectableChannel;
import java.net.Socket;
import java.net.ServerSocket;
import java.net.InetSocketAddress;
import java.util.Iterator;
package HadoopInAction;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
* Simple echo-back server which listens for incoming stream connections
* and echoes back whatever it reads. A single Selector object is used to
* listen to the server socket (to accept new connections) and all the
* active socket channels.
*
* @author Ron Hitchens (ron@ronsoft.com)
*/
public class SelectSockets {
public static int PORT_NUMBER = 1234;
public static void main(String[] argv)
throws Exception {
new SelectSockets().go(argv);
}
public void go(String[] argv)
throws Exception {
int port = PORT_NUMBER;
if (argv.length > 0) { // Override default listen port
port = Integer.parseInt(argv[0]);
}
System.out.println("Listening on port " + port);
// Allocate an unbound server socket channel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// Get the associated ServerSocket to bind it with
ServerSocket serverSocket = serverChannel.socket();
// Create a new Selector for use below
Selector selector = Selector.open();
// Set the port the server channel will listen to
serverSocket.bind(new InetSocketAddress(port));
// Set nonblocking mode for the listening socket
serverChannel.configureBlocking(false);
// Register the ServerSocketChannel with the Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// This may block for a long time. Upon returning, the
// selected set contains keys of the ready channels.
int n = selector.select();
if (n == 0) {
continue; // nothing to do
}
// Get an iterator over the set of selected keys
Iterator it = selector.selectedKeys().iterator();
// Look at each key in the selected set
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
// Is a new connection coming in?
if (key.isAcceptable()) {
ServerSocketChannel server =
(ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
registerChannel(selector, channel,
SelectionKey.OP_READ);
sayHello(channel);
}
// Is there data to read on this channel?
if (key.isReadable()) {
readDataFromSocket(key);
}
// Remove key from selected set; it's been handled
it.remove();
}
}
}
// ----------------------------------------------------------
/**
* Register the given channel with the given selector for
* the given operations of interest
*/
protected void registerChannel(Selector selector,
SelectableChannel channel, int ops)
throws Exception {
if (channel == null) {
return; // could happen
}
// Set the new channel nonblocking
channel.configureBlocking(false);
// Register it with the selector
channel.register(selector, ops);
}
// ----------------------------------------------------------
// Use the same byte buffer for all channels. A single thread is
// servicing all the channels, so no danger of concurrent acccess.
private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
/**
* Sample data handler method for a channel with data ready to read.
*
* @param key A SelectionKey object associated with a channel
* determined by the selector to be ready for reading. If the
* channel returns an EOF condition, it is closed here, which
* automatically invalidates the associated key. The selector
* will then de-register the channel on the next select call.
*/
protected void readDataFromSocket(SelectionKey key)
throws Exception {
SocketChannel socketChannel = (SocketChannel) key.channel();
int count;
buffer.clear(); // Empty buffer
// Loop while data is available; channel is nonblocking
while ((count = socketChannel.read(buffer)) > 0) {
buffer.flip(); // Make buffer readable
// Send the data; don't assume it goes all at once
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// WARNING: the above loop is evil. Because
// it's writing back to the same nonblocking
// channel it read the data from, this code can
// potentially spin in a busy loop. In real life
// you'd do something more useful than this.
buffer.clear(); // Empty buffer
}
if (count < 0) {
// Close channel on EOF, invalidates the key
socketChannel.close();
}
}
// ----------------------------------------------------------
/**
* Spew a greeting to the incoming client connection.
*
* @param channel The newly connected SocketChannel to say hello to.
*/
private void sayHello(SocketChannel channel)
throws Exception {
buffer.clear();
buffer.put("Hi there!\r\n".getBytes());
buffer.flip();
channel.write(buffer);
}
}

书中解读如下:

例4-1实现了一个简单的服务器。它创建了ServerSocketChannel和Selector对象,并将通道注册到选择器上。我们不在注册的键中保存服务器socket的引用,因为它永远不会被注销。这个无限循环在最上面先调用了select(),这可能会无限期地阻塞。当选择结束时,就遍历选择键并检查已经就绪的通道。如果一个键指示与它相关的通道已经准备好执行一个accecpt()操作,我们就通过键获取关联的通道,并将它转换为SeverSocketChannel对象。我们都知道这么做是安全的,因为只有ServerSocketChannel支持OP_ACCEPT操作。我们也知道我们的代码只把对一个单一的ServerSocketChannel对象的OP_ACCEPT操作进行了注册。通过对服务器 socket 通道的引用,我们调用了它的accept()方法,来获取刚到达的socket 的句柄。返回的对象的类型是SocketChannel,也是一个可选择的通道类型。这时,与创建一个新线程来从新的连接中读取数据不同,我们只是简单地将 socket 同多注册到选择器上。我们通过传入 OP_READ 标记,告诉选择器我们关心新的socket通道什么时候可以准备好读取数据。如果键指示通道还没有准备好执行 accept(),我们就检查它是否准备好执行read()。任何一个这么指示的socket通道一定是之前ServerSocketChannel创建的SocketChannel 对象之一,并且被注册为只对读操作感兴趣。对于每个有数据需要读取的socket通道,我们调用一个公共的方法来读取并处理这个带有数据的socket。需要注意的是这个公共方法需要准备好以非阻塞的方式处理socket上的不完整的数据。它需要迅速地返回,以其他带有后续输入的通道能够及时地得到处理。例4-1中只是简单地对数据进行响应,将数据写回socket,传回给发送者。在循环的底部,我们通过调用Iterator(迭代器)对象的remove()方法,将键从已选择的键的集合中移除。键可以直接从 selectKeys()返回的Set中移除,但同时需要用Iterator来检查集合,您需要使用迭代器的 remove()方法来避免破坏迭代器内部的状态。

Example 4-2. Servicing channels with a thread pool 使用线程池来为通道提供服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package com.ronsoft.books.nio.channels;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.util.List;
import java.util.LinkedList;
import java.io.IOException;
/**
* Specialization of the SelectSockets class which uses a thread pool
* to service channels. The thread pool is an ad-hoc implementation
* quickly lashed together in a few hours for demonstration purposes.
* It's definitely not production quality.
*
* @author Ron Hitchens (ron@ronsoft.com)
*/
public class SelectSocketsThreadPool extends SelectSockets
{
private static final int MAX_THREADS = 5;
private ThreadPool pool = new ThreadPool (MAX_THREADS);
// -------------------------------------------------------------
public static void main (String [] argv)
throws Exception
{
new SelectSocketsThreadPool( ).go (argv);
}
// -------------------------------------------------------------
/**
* Sample data handler method for a channel with data ready to read.
* This method is invoked from the go( ) method in the parent class.
* This handler delegates to a worker thread in a thread pool to
* service the channel, then returns immediately.
* @param key A SelectionKey object representing a channel
* determined by the selector to be ready for reading. If the
* channel returns an EOF condition, it is closed here, which
* automatically invalidates the associated key. The selector
* will then de-register the channel on the next select call.
*/
protected void readDataFromSocket (SelectionKey key)
throws Exception
{
WorkerThread worker = pool.getWorker( );
if (worker == null) {
// No threads available. Do nothing. The selection
// loop will keep calling this method until a
// thread becomes available. This design could
// be improved.
return;
}
// Invoking this wakes up the worker thread, then returns
worker.serviceChannel (key);
}
// ---------------------------------------------------------------
/**
* A very simple thread pool class. The pool size is set at
* construction time and remains fixed. Threads are cycled
* through a FIFO idle queue.
*/
private class ThreadPool
{
List idle = new LinkedList( );
ThreadPool (int poolSize)
{
// Fill up the pool with worker threads
for (int i = 0; i < poolSize; i++) {
WorkerThread thread = new WorkerThread (this);
// Set thread name for debugging. Start it.
thread.setName ("Worker" + (i + 1));
thread.start( );
idle.add (thread);
}
}
/**
* Find an idle worker thread, if any. Could return null.
*/
WorkerThread getWorker( )
{
WorkerThread worker = null;
synchronized (idle) {
if (idle.size( ) > 0) {
worker = (WorkerThread) idle.remove (0);
}
}
return (worker);
}
/**
* Called by the worker thread to return itself to the
* idle pool.
*/
void returnWorker (WorkerThread worker)
{
synchronized (idle) {
idle.add (worker);
}
}
}
/**
* A worker thread class which can drain channels and echo-back
* the input. Each instance is constructed with a reference to
* the owning thread pool object. When started, the thread loops
* forever waiting to be awakened to service the channel associated
* with a SelectionKey object.
* The worker is tasked by calling its serviceChannel( ) method
* with a SelectionKey object. The serviceChannel( ) method stores
* the key reference in the thread object then calls notify( )
* to wake it up. When the channel has been drained, the worker
* thread returns itself to its parent pool.
*/
private class WorkerThread extends Thread
{
private ByteBuffer buffer = ByteBuffer.allocate (1024);
private ThreadPool pool;
private SelectionKey key;
WorkerThread (ThreadPool pool)
{
this.pool = pool;
}
// Loop forever waiting for work to do
public synchronized void run( )
{
System.out.println (this.getName( ) + " is ready");
while (true) {
try {
// Sleep and release object lock
this.wait( );
} catch (InterruptedException e) {
e.printStackTrace( );
// Clear interrupt status
this.interrupted( );
}
if (key == null) {
continue; // just in case
}
System.out.println (this.getName( )
+ " has been awakened");
try {
drainChannel (key);
} catch (Exception e) {
System.out.println ("Caught '"
+ e + "' closing channel");
// Close channel and nudge selector
try {
key.channel().close( );
} catch (IOException ex) {
ex.printStackTrace( );
}
key.selector().wakeup( );
}
key = null;
// Done. Ready for more. Return to pool
this.pool.returnWorker (this);
}
}
/**
* Called to initiate a unit of work by this worker thread
* on the provided SelectionKey object. This method is
* synchronized, as is the run( ) method, so only one key
* can be serviced at a given time.
* Before waking the worker thread, and before returning
* to the main selection loop, this key's interest set is
* updated to remove OP_READ. This will cause the selector
* to ignore read-readiness for this channel while the
* worker thread is servicing it.
*/
synchronized void serviceChannel (SelectionKey key)
{
this.key = key;
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
this.notify( ); // Awaken the thread
}
/**
* The actual code which drains the channel associated with
* the given key. This method assumes the key has been
* modified prior to invocation to turn off selection
* interest in OP_READ. When this method completes it
* re-enables OP_READ and calls wakeup( ) on the selector
* so the selector will resume watching this channel.
*/
void drainChannel (SelectionKey key)
throws Exception
{
SocketChannel channel = (SocketChannel) key.channel( );
int count;
buffer.clear( ); // Empty buffer
// Loop while data is available; channel is nonblocking
while ((count = channel.read (buffer)) > 0) {
buffer.flip( ); // make buffer readable
// Send the data; may not go all at once
while (buffer.hasRemaining( )) {
channel.write (buffer);
}
// WARNING: the above loop is evil.
// See comments in superclass.
buffer.clear( ); // Empty buffer
}
if (count < 0) {
// Close channel on EOF; invalidates the key
channel.close( );
return;
}
// Resume interest in OP_READ
key.interestOps (key.interestOps( ) | SelectionKey.OP_READ);
// Cycle the selector so this key is active again
key.selector().wakeup( );
}
}
}

书中解读如下:

由于执行选择过程的线程将重新循环并几乎立即再次调用select(),键的interest集合将被修改,并将 interest(感兴趣的操作)从读取就绪(read-rreadiness)状态中移除。这将防止选择器重复地调用 readDataFromSocket( )(因为通道仍然会准备好读取数据,直到工作线程从它那里读取数据)。当工作线程结束为通道提供的服务时,它将再次更新键的ready集合,来将interest重新放到读取就绪集合中。它也会在选择器上显式地调用wakeup()。如果主线程在select()中被阻塞,这将使它继续执行。这个选择循环会再次执行一个轮回(可能什么也没做)并带着被更新的键重新进入select()

个人觉得这个译文没有完全表达原文意思,导致很难理解,摘录原文如下,

Because the thread doing the selection will loop back and call select( ) again almost immediately, the interest set in the key is modified to remove interest in read-readiness. This prevents the selector from repeatedly invoking readDataFromSocket( ) (because the channel will remain ready to read until the worker thread can drain the data from it). When a worker thread has finished servicing the channel, it will again update the key’s interest set to reassert an interest in read-readiness. It also does an explicit wakeup( ) on the selector. If the main thread is blocked in select( ), this causes it to resume. The selection loop will then cycle (possibly doing nothing) and reenter select( ) with the updated key.

个人理解:首先要清楚drainChannel的过程需要一定时间,并且由于选择线程和响应socket的线程分开,选择线程并不会在处理socket这个步骤阻塞而是又进入了下一次选择,这就导致有可能下一次select操作时上一次响应socket的处理还未结束。在不将Read从interest集合移除的情况下:假如drainChannel还未完成,未将对应的Channel从已选择的集合中移除,那么在下一次select操作后这个Channel对应的key还会被selectKeys()返回,接着readDataFromSocket()还会被调用,即又启动了一个新的线程来处理之前处理到中途的socketChannel