内容目录
3.6 管道
java.nio.channels
包中含有一个名为Pipe
(管道)的类。广义上讲,管道就是一个用来在两个实体之间单向传输数据的导管。管道的概念对于 Unix(和类 Unix)操作系统的用户来说早就很熟悉了。Unix 系统中,管道被用来连接一个进程的输出和另一个进程的输入。Pipe
类实现一个管道范例,不过它所创建的管道是进程内(在Java虚拟机进程内部)而非进程间使用的。参见图 3-10。
Pipe
类创建一对提供环回机制的Channel
对象。这两个通道的远端是连接起来的,以便任何写在SinkChannel
对象上的数据都能出现在SourceChannel
对象上。图 3-11 显示了Pipe
的类层级。
package java.nio.channels; public abstract class Pipe { public static Pipe open() throws IOException public abstract SourceChannel source(); public abstract SinkChannel sink(); public static abstract class SourceChannel extends AbstractSelectableChannel implements ReadableByteChannel, ScatteringByteChannel public static abstract class SinkChannel extends AbstractSelectableChannel implements WritableByteChannel, GatheringByteChannel }
Pipe
实例是通过调用不带参数的Pipe.open()
工厂方法来创建的。Pipe
类定义了两个嵌套的通道类来实现管路。这两个类是 Pipe.SourceChannel
(管道负责读的一端)和 Pipe.SinkChannel
(管道负责写的一端)。这两个通道实例是在Pipe
对象创建的同时被创建的,可以通过在 Pipe
对象上分别调用source()
和sink()
方法来取回。
此时,您可能在想管道到底有什么作用。您不能使用Pipe
在操作系统级的进程间建立一个类Unix 管道(您可以使用SocketChannel
来建立)。Pipe
的source通道和sink通道提供类似java.io.PipedInputStream
和java.io.PipedOutputStream
所提供的功能,不过它们可以执行全部的通道语义。请注意,SinkChannel
和SourceChannel
都由AbstractSelectableChannel
引申而来(所以也是从SelectableChannel
引申而来),这意味着pipe通道可以同选择器一起使用(参见第四章)。
管道可以被用来仅在同一个Java虚拟机内部传输数据。虽然有更加有效率的方式来在线程之间传输数据,但是使用管道的好处在于封装性。生产者线程和用户线程都能被写道通用的Channel API中。根据给定的通道类型,相同的代码可以被用来写数据到一个文件、socket 或管道。选择器可以被用来检查管道上的数据可用性,如同在socket通道上使用那样地简单。这样就可以允许单个用户线程使用一个Selector
来从多个通道有效地收集数据,并可任意结合网络连接或本地工作线程使用。因此,这些对于可伸缩性、冗余度以及可复用性来说无疑都是意义重大的。
Pipes的另一个有用之处是可以用来辅助测试。一个单元测试框架可以将某个待测试的类连接到管道的“写”端并检查管道的“读”端出来的数据。它也可以将被测试的类置于通道的“读”端并将受控的测试数据写进其中。两种场景对于回归测试都是很有帮助的。
管路所能承载的数据量是依赖实现的(implementation-dependent)。唯一可保证的是写到SinkChannel
中的字节都能按照同样的顺序在SourceChannel
上重现。例 3-11 诠释了如何使用管道。
/*
*例 3-11 工作线程对一个管道进行写操作
*/
package com.ronsoft.books.nio.channels;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.Pipe;
import java.nio.channels.Channels;
import java.util.Random;
/**
* Test Pipe objects using a worker thread.
*
* Created April, 2002
* @author Ron Hitchens (ron@ronsoft.com)
*/
public class PipeTest {
public static void main (String [] argv) throws Exception {
// Wrap a channel around stdout
WritableByteChannel out = Channels.newChannel(System.out);
// Start worker and get read end of channel
ReadableByteChannel workerChannel = startWorker(10);
ByteBuffer buffer = ByteBuffer.allocate(100);
while (workerChannel.read(buffer) >= 0) {
buffer.flip();
out.write(buffer);
buffer.clear();
}
}
// This method could return a SocketChannel or
// FileChannel instance just as easily
private static ReadableByteChannel startWorker (int reps) throws Exception {
Pipe pipe = Pipe.open();
Worker worker = new Worker(pipe.sink(), reps);
worker.start();
return (pipe.source());
}
// -----------------------------------------------------------------
/**
* A worker thread object which writes data down a channel.
* Note: this object knows nothing about Pipe, uses only a
* generic WritableByteChannel.
*/
private static class Worker extends Thread {
WritableByteChannel channel;
private int reps;
Worker(WritableByteChannel channel, int reps) {
this.channel = channel;
this.reps = reps;
}
// Thread execution begins here
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(100);
try {
for (int i = 0; i < this.reps; i++) {
doSomeWork (buffer);
// channel may not take it all at once
while (channel.write(buffer) > 0) {
// empty
}
}
this.channel.close();
} catch (Exception e) {
// easy way out; this is demo code
e.printStackTrace();
}
}
private String [] products = {"No good deed goes unpunished", "To be, or what?", "No matter where you go, there you are", "Just say \"Yo\"", "My karma ran over my dogma"};
private Random rand = new Random();
private void doSomeWork (ByteBuffer buffer) {
int product = rand.nextInt(products.length);
buffer.clear();
buffer.put(products[product].getBytes());
buffer.put("\r\n".getBytes());
buffer.flip();
}
}
}
0 条评论
撰写评论