内容目录
4.3.3 管理选择键
既然我们已经理解了问题的各个部分是怎样结合在一起的,那么是时候看看它们在正常的使用中是如何交互的了。为了有效地利用选择器和键提供的信息,合理地管理键是非常重要的。
选择是累积的。一旦一个选择器将一个键添加到它的已选择的键的集合中,它就不会移除这个键。并且,一旦一个键处于已选择的键的集合中,这个键的ready集合将只会被设置,而不会被清理。乍一看,这好像会引起麻烦,因为选择操作可能无法表现出已注册的通道的正确状态。它提供了极大的灵活性,但把合理地管理键以确保它们表示的状态信息不会变得陈旧的任务交给了程序员。
合理地使用选择器的秘诀是理解选择器维护的选择键集合所扮演的角色。(参见 4.3.1 小节,特别是选择过程的第二步。)最重要的部分是当键已经不再在已选择的键的集合中时将会发生什么。当通道上的至少一个感兴趣的操作就绪时,键的ready集合就会被清空,并且当前已经就绪的操作将会被添加到ready集合中。该键之后将被添加到已选择的键的集合中。
清理一个SelectKey
的ready集合的方式是将这个键从已选择的键的集合中移除。选择键的就绪状态只有在选择器对象在选择操作过程中才会修改。处理思想是只有在已选择的键的集合中的键才被认为是包含了合法的就绪信息的。这些信息将在键中长久地存在,直到键从已选择的键的集合中移除,以通知选择器您已经看到并对它进行了处理。如果下一次通道的一些感兴趣的操作发生时,键将被重新设置以反映当时通道的状态并再次被添加到已选择的键的集合中。
这种框架提供了很多灵活性。通常的做法是在选择器上调用一次select操作(这将更新已选择的键的集合),然后遍历selectKeys()
方法返回的键的集合。在按顺序进行检查每个键的过程中,相关的通道也根据键的就绪集合进行处理。然后键将从已选择的键的集合中被移除(通过在Iterator
对象上调用remove()
方法),然后检查下一个键。完成后,通过再次调用select()
方法重复这个循环。例 4-1 中的代码是典型的服务器的例子。
/*
*例 4-1. 使用 select()来为多个通道提供服务
*/
package com.ronsoft.books.nio.channels;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SelectableChannel;
import java.net.Socket;
import java.net.ServerSocket;
import java.net.InetSocketAddress;
import java.util.Iterator;
/**
* Simple echo-back server which listens for incoming stream connections and
* echoes back whatever it reads. A single Selector object is used to listen to
* the server socket (to accept new connections) and all the active socket
* channels.
*
* @author Ron Hitchens (ron@ronsoft.com)
*/
public class SelectSockets {
public static int PORT_NUMBER = 1234;
public static void main(String[] argv) throws Exception {
new SelectSockets().go(argv);
}
public void go(String[] argv) throws Exception {
int port = PORT_NUMBER;
if (argv.length > 0) { // Override default listen port
port = Integer.parseInt(argv[0]);
}
System.out.println("Listening on port " + port);
// Allocate an unbound server socket channel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// Get the associated ServerSocket to bind it with
ServerSocket serverSocket = serverChannel.socket();
// Create a new Selector for use below
Selector selector = Selector.open();
// Set the port the server channel will listen to
serverSocket.bind(new InetSocketAddress(port));
// Set nonblocking mode for the listening socket
serverChannel.configureBlocking(false);
// Register the ServerSocketChannel with the Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// This may block for a long time. Upon returning, the
// selected set contains keys of the ready channels.
int n = selector.select();
if (n == 0) {
continue; // nothing to do
}
// Get an iterator over the set of selected keys
Iterator it = selector.selectedKeys().iterator();
// Look at each key in the selected set
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
// Is a new connection coming in?
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
registerChannel(selector, channel, SelectionKey.OP_READ);
sayHello(channel);
}
// Is there data to read on this channel?
if (key.isReadable()) {
readDataFromSocket(key);
}
// Remove key from selected set; it's been handled
it.remove();
}
}
}
// ----------------------------------------------------------
/**
* Register the given channel with the given selector for the given
* operations of interest
*/
protected void registerChannel(Selector selector,
SelectableChannel channel, int ops) throws Exception {
if (channel == null) {
return; // could happen
}
// Set the new channel nonblocking
channel.configureBlocking(false);
// Register it with the selector
channel.register(selector, ops);
}
// ----------------------------------------------------------
// Use the same byte buffer for all channels. A single thread is
// servicing all the channels, so no danger of concurrent acccess.
private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
/**
* Sample data handler method for a channel with data ready to read.
*
* @param key
* A SelectionKey object associated with a channel determined by
* the selector to be ready for reading. If the channel returns
* an EOF condition, it is closed here, which automatically
* invalidates the associated key. The selector will then
* de-register the channel on the next select call.
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel) key.channel();
int count;
buffer.clear(); // Empty buffer
// Loop while data is available; channel is nonblocking
while ((count = socketChannel.read(buffer)) > 0) {
buffer.flip(); // Make buffer readable
// Send the data; don't assume it goes all at once
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// WARNING: the above loop is evil. Because
// it's writing back to the same nonblocking
// channel it read the data from, this code can
// potentially spin in a busy loop. In real life
// you'd do something more useful than this.
buffer.clear(); // Empty buffer
}
if (count < 0) {
// Close channel on EOF, invalidates the key
socketChannel.close();
}
}
// ----------------------------------------------------------
/**
* Spew a greeting to the incoming client connection.
*
* @param channel
* The newly connected SocketChannel to say hello to.
*/
private void sayHello(SocketChannel channel) throws Exception {
buffer.clear();
buffer.put("Hi there!\r\n".getBytes());
buffer.flip();
channel.write(buffer);
}
}
例 4-1 实现了一个简单的服务器。它创建了ServerSocketChannel
和Selector
对象,并将通道注册到选择器上。我们不在注册的键中保存服务器socket的引用,因为它永远不会被注销。这个无限循环在最上面先调用了select()
,这可能会无限期地阻塞。当选择结束时,就遍历选择键并检查已经就绪的通道。
如果一个键指示与它相关的通道已经准备好执行一个accecpt()
操作,我们就通过键获取关联的通道,并将它转换为SeverSocketChannel
对象。我们都知道这么做是安全的,因为只有ServerSocketChannel
支持OP_ACCEPT
操作。我们也知道我们的代码只把对一个单一的ServerSocketChannel
对象的OP_ACCEPT
操作进行了注册。通过对服务器socket通道的引用,我们调用了它的accept()
方法,来获取刚到达的socket的句柄。返回的对象的类型是SocketChannel
,也是一个可选择的通道类型。这时,与创建一个新线程来从新的连接中读取数据不同,我们只是简单地将socket同多注册到选择器上。我们通过传入OP_READ
标记,告诉选择器我们关心新的socket通道什么时候可以准备好读取数据。
如果键指示通道还没有准备好执行accept()
,我们就检查它是否准备好执行read()
。任何一个这么指示的socket通道一定是之前ServerSocketChannel
创建的SocketChannel
对象之一,并且被注册为只对读操作感兴趣。对于每个有数据需要读取的socket通道,我们调用一个公共的方法来读取并处理这个带有数据的socket。需要注意的是这个公共方法需要准备好以非阻塞的方式处理socket上的不完整的数据。它需要迅速地返回,以其他带有后续输入的通道能够及时地得到处理。例 4-1 中只是简单地对数据进行响应,将数据写回socket,传回给发送者。
在循环的底部,我们通过调用Iterator
(迭代器)对象的remove()
方法,将键从已选择的键的集合中移除。键可以直接从selectKeys()
返回的Set
中移除,但同时需要用Iterator
来检查集合,您需要使用迭代器的remove()
方法来避免破坏迭代器内部的状态。
0 条评论
撰写评论