内容目录
4.5 选择过程的可扩展性
我多次提到选择器可以简化用单线程同时管理多个可选择通道的实现。使用一个线程来为多个通道提供服务,通过消除管理各个线程的额外开销,可能会降低复杂性并可能大幅提升性能。但只使用一个线程来服务所有可选择的通道是否是一个好主意呢?这要看情况。
对单CPU的系统而言这可能是一个好主意,因为在任何情况下都只有一个线程能够运行。通过消除在线程之间进行上下文切换带来的额外开销,总吞吐量可以得到提高。但对于一个多CPU的系统呢?在一个有n个CPU的系统上,当一个单一的线程线性地轮流处理每一个线程时,可能有n-1个cpu处于空闲状态。
那么让不同道请求不同的服务类的办法如何?想象一下,如果一个应用程序为大量的分布式的传感器记录信息。每个传感器在服务线程遍历每个就绪的通道时需要等待数秒钟。这在响应时间不重要时是可以的。但对于高优先级的连接(如操作命令),如果只用一个线程为所有通道提供服务,将不得不在队列中等待。不同的应用程序的要求也是不同的。您采用的策略会受到您尝试解决的问题的影响。
在第一个场景中,如果您想要将更多的线程来为通道提供服务,请抵抗住使用多个选择器的欲望。在大量通道上执行就绪选择并不会有很大的开销,大多数工作是由底层操作系统完成的。管理多个选择器并随机地将通道分派给它们当中的一个并不是这个问题的合理的解决方案。这只会形成这个场景的一个更小的版本。
一个更好的策略是对所有的可选择通道使用一个选择器,并将对就绪通道的服务委托给其他线程。您只用一个线程监控通道的就绪状态并使用一个协调好的工作线程池来处理共接收到的数据。根据部署的条件,线程池的大小是可以调整的(或者它自己进行动态的调整)。对可选择通道的管理仍然是简单的,而简单的就是好的。
第二个场景中,某些通道要求比其他通道更高的响应速度,可以通过使用两个选择器来解决:一个为命令连接服务,另一个为普通连接服务。但这种场景也可以使用与第一个场景十分相似的办法来解决。与将所有准备好的通道放到同一个线程池的做法不同,通道可以根据功能由不同的工作线程来处理。它们可能可以是日志线程池,命令/控制线程池,状态请求线程池,等等。
例4-2的代码是例4-1的一般性的选择循环的扩展。它覆写了readDataFromSocket()
方法,并使用线程池来为准备好数据用于读取的通道提供服务。与在主线程中同步地读取数据不同,这个版本的实现将SelectionKey
对象传递给为其服务的工作线程。
/*
*例4-2.使用线程池来为通道提供服务
*/
package com.ronsoft.books.nio.channels;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.util.List;
import java.util.LinkedList;
import java.io.IOException;
/**
* Specialization of the SelectSockets class which uses a thread pool to service
* channels. The thread pool is an ad-hoc implementation quicky lashed togther
* in a few hours for demonstration purposes. It's definitely not production
* quality.
*
* @author Ron Hitchens (ron@ronsoft.com)
*/
public class SelectSocketsThreadPool extends SelectSockets {
private static final int MAX_THREADS = 5;
private ThreadPool pool = new ThreadPool(MAX_THREADS);
public static void main(String[] argv) throws Exception {
new SelectSocketsThreadPool().go(argv);
}
// -------------------------------------------------------------
/**
* Sample data handler method for a channel with data ready to read. This
* method is invoked from the go() method in the parent class. This handler
* delegates to a worker thread in a thread pool to service the channel,
* then returns immediately.
*
* @param key
* A SelectionKey object representing a channel determined by the
* selector to be ready for reading. If the channel returns an
* EOF condition, it is closed here, which automatically
* invalidates the associated key. The selector will then
* de-register the channel on the next select call.
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
WorkerThread worker = pool.getWorker();
if (worker == null) {
// No threads available. Do nothing. The selection
// loop will keep calling this method until a
// thread becomes available. This design could
// be improved.
return;
}
// Invoking this wakes up the worker thread, then returns
worker.serviceChannel(key);
}
// ---------------------------------------------------------------
/**
* A very simple thread pool class. The pool size is set at construction
* time and remains fixed. Threads are cycled through a FIFO idle queue.
*/
private class ThreadPool {
List idle = new LinkedList();
ThreadPool(int poolSize) {
// Fill up the pool with worker threads
for (int i = 0; i < poolSize; i++) {
WorkerThread thread = new WorkerThread(this);
// Set thread name for debugging. Start it.
thread.setName("Worker" + (i + 1));
thread.start();
idle.add(thread);
}
}
/**
* Find an idle worker thread, if any. Could return null.
*/
WorkerThread getWorker() {
WorkerThread worker = null;
synchronized (idle) {
if (idle.size() > 0) {
worker = (WorkerThread) idle.remove(0);
}
}
return worker;
}
/**
* Called by the worker thread to return itself to the idle pool.
*/
void returnWorker(WorkerThread worker) {
synchronized (idle) {
idle.add(worker);
}
}
}
/**
* A worker thread class which can drain channels and echo-back the input.
* Each instance is constructed with a reference to the owning thread pool
* object. When started, the thread loops forever waiting to be awakened to
* service the channel associated with a SelectionKey object. The worker is
* tasked by calling its serviceChannel() method with a SelectionKey
* object. The serviceChannel() method stores the key reference in the
* thread object then calls notify() to wake it up. When the channel has
147
* been drained, the worker thread returns itself to its parent pool.
*/
private class WorkerThread extends Thread {
private ByteBuffer buffer = ByteBuffer.allocate(1024);
private ThreadPool pool;
private SelectionKey key;
WorkerThread(ThreadPool pool) {
this.pool = pool;
}
// Loop forever waiting for work to do
public synchronized void run() {
System.out.println(this.getName() + " is ready");
while (true) {
try {
// Sleep and release object lock
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
// Clear interrupt status
this.interrupted();
}
if (key == null) {
continue; // just in case
}
System.out.println(this.getName() + " has been awakened");
try {
drainChannel(key);
} catch (Exception e) {
System.out.println("Caught '" + e + "' closing channel");
// Close channel and nudge selector
try {
key.channel().close();
} catch (IOException ex) {
ex.printStackTrace();
}
key.selector().wakeup();
}
key = null;
// Done. Ready for more. Return to pool
this.pool.returnWorker(this);
}
}
/**
* Called to initiate a unit of work by this worker thread on the
* provided SelectionKey object. This method is synchronized, as is the
* run() method, so only one key can be serviced at a given time.
* Before waking the worker thread, and before returning to the main
* selection loop, this key's interest set is updated to remove OP_READ.
* This will cause the selector to ignore read-readiness for this
* channel while the worker thread is servicing it.
*/
synchronized void serviceChannel(SelectionKey key) {
this.key = key;
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
this.notify(); // Awaken the thread
}
/**
* The actual code which drains the channel associated with the given
* key. This method assumes the key has been modified prior to
* invocation to turn off selection interest in OP_READ. When this
* method completes it re-enables OP_READ and calls wakeup( ) on the
* selector so the selector will resume watching this channel.
*/
void drainChannel(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
int count;
buffer.clear(); // Empty buffer
// Loop while data is available; channel is nonblocking
while ((count = channel.read(buffer)) > 0) {
buffer.flip(); // make buffer readable
// Send the data; may not go all at once
while (buffer.hasRemaining()) {
channel.write(buffer);
}
// WARNING: the above loop is evil.
// See comments in superclass.
buffer.clear(); // Empty buffer
}
if (count < 0) {
// Close channel on EOF; invalidates the key
channel.close();
return;
}
// Resume interest in OP_READ
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
// Cycle the selector so this key is active again
key.selector().wakeup();
}
}
}
由于执行选择过程的线程将重新循环并几乎立即再次调用select()
,键的interest集合将被修改,并将interest(感兴趣的操作)从读取就绪(read-rreadiness)状态中移除。这将防止选择器重复地调用readDataFromSocket()
(因为通道仍然会准备好读取数据,直到工作线程从它那里读取数据)。当工作线程结束为通道提供的服务时,它将再次更新键的ready集合,来将interest重新放到读取就绪集合中。它也会在选择器上显式地嗲用wakeup()
。如果主线程在select()
中被阻塞,这将使它继续执行。这个选择循环会再次执行一个轮回(可能什么也没做)并带着被更新的键重新进入select()
。
0 条评论
撰写评论