您的浏览器过于古老 & 陈旧。为了更好的访问体验, 请 升级你的浏览器
Ready 发布于2013年08月17日 10:45

原创 Java nio入门教程详解(十七)

2597 次浏览 读完需要≈ 17 分钟

内容目录

3.2 Scatter/Gather

通道提供了一种被称为Scatter/Gather的重要新功能(有时也被称为矢量 I/O)。Scatter/Gather是一个简单却强大的概念(参见 1.4.1.1 节),它是指在多个缓冲区上实现一个简单的 I/O 操作。对于一个write操作而言,数据是从几个缓冲区按顺序抽取(称为gather)并沿着通道发送的。缓冲区本身并不需要具备这种 gather 的能力(通常它们也没有此能力)。该gather过程的效果就好比全部缓冲区的内容被连结起来,并在发送数据前存放到一个大的缓冲区中。对于 read 操作而言,从通道读取的数据会按顺序被散布(称为 scatter)到多个缓冲区,将每个缓冲区填满直至通道中的数据或者缓冲区的最大空间被消耗完。

大多数现代操作系统都支持本地矢量I/O(native vectored I/O)。当您在一个通道上请求一个Scatter/Gather操作时,该请求会被翻译为适当的本地调用来直接填充或抽取缓冲区。这是一个很大的进步,因为减少或避免了缓冲区拷贝和系统调用。Scatter/Gather 应该使用直接的 ByteBuffers 以从本地 I/O 获取最大性能优势。

将scatter/gather接口添加到图 3-3 的UML类图中可以得到图 3-4。下面的代码描述了scatter是如何扩展读操作的,以及gather是如何基于写操作构建的:

public interface ScatteringByteChannel extends ReadableByteChannel{
    public long read (ByteBuffer[] dsts) throws IOException;
    public long read (ByteBuffer[] dsts, int offset, int length) throws IOException;
}
public interface GatheringByteChannel extends WritableByteChannel{
    public long write(ByteBuffer[] srcs) throws IOException;
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
}

图 3-4 Scatter/Gather 接口

从上图您可以看到,这两个接口都添加了两种以缓冲区阵列作为参数的新方法。另外,每种方法都提供了一种带offset和length参数的形式。让我们先来理解一下怎样使用方法的简单形式。在下面的代码中,我们假定channel连接到一个有 48 字节数据等待读取的socket上:

ByteBuffer header = ByteBuffer.allocateDirect (10);
ByteBuffer body = ByteBuffer.allocateDirect (80);
ByteBuffer [] buffers = { header, body };
int bytesRead = channel.read (buffers);

一旦read()方法返回,bytesRead就被赋予值48,header缓冲区将包含前10个从通道读取的字节而body缓冲区则包含接下来的38个字节。通道会自动地将数据scatter到这两个缓冲区中。缓冲区已经被填充了(尽管此例中body缓冲区还有空间填充更多数据),那么将需要被flip以便其中数据可以被抽取。在类似这样的例子中,我们可能并不会费劲去flip这个header缓冲区而是以绝对get的方式随机访问它以检查各种header字段;不过body缓冲区会被flip并传递到另一个通道的write()方法上,然后在通道上发送出去。例如:

switch (header.getShort(0)) {
    case TYPE_PING:
    break;
    case TYPE_FILE:
        body.flip( );
        fileChannel.write (body);
    break;
    default:
        logUnknownPacket (header.getShort(0), header.getLong(2), body);
    break;
}

同样,很简单地,我们可以用一个gather操作将多个缓冲区的数据组合并发送出去。使用相同的缓冲区,我们可以像下面这样汇总数据并在一个 socket 通道上发送包:

body.clear();
body.put("FOO".getBytes()).flip(); // "FOO" as bytes
header.clear();
header.putShort (TYPE_FILE).putLong (body.limit()).flip();
long bytesWritten = channel.write(buffers);

以上代码从传递给 write()方法的 buffers 阵列所引用的缓冲区中 gather 数据,然后沿着通道发送了总共 13 个字节。 图 3-5 描述了一个gather写操作。数据从缓冲区阵列引用的每个缓冲区中 gather 并被组合成沿着通道发送的字节流。

图 3-5 一个使用了四个缓冲区的gather写操作

图 3-6 描述了一个scatter读操作。从通道传输来的数据被scatter到所列缓冲区,依次填充每个缓冲区(从缓冲区的 position 处开始到 limit 处结束)。这里显示的 position 和 limit 值是读操作开始之前的。

图 3-6 一个使用了四个缓冲区的 scatter 读操作

带offset和length参数版本的read()和write()方法使得我们可以使用缓冲区阵列的子集缓冲区。这里的 offset 值指哪个缓冲区将开始被使用,而不是指数据的offset。这里的length参数指示要使用的缓冲区数量。举个例子,假设我们有一个五元素的fiveBuffers阵列,它已经被初始化并引用了五个缓冲区,下面的代码将会写第二个、第三个和第四个缓冲区的内容:

int bytesRead = channel.write (fiveBuffers, 1, 3);

使用得当的话,Scatter/Gather会是一个极其强大的工具。它允许您委托操作系统来完成辛苦活:将读取到的数据分开存放到多个存储桶(bucket)或者将不同的数据区块合并成一个整体。这是一个巨大的成就,因为操作系统已经被高度优化来完成此类工作了。它节省了您来回移动数据的工作,也就避免了缓冲区拷贝和减少了您需要编写、调试的代码数量。既然您基本上通过提供数据容器引用来组合数据,那么按照不同的组合构建多个缓冲区阵列引用,各种数据区块就可以以不同的方式来组合了。例 3-2 很好地诠释了这一点:

/*
 *例 3-2 以 gather 写操作来集合多个缓冲区的数据
 */
package com.ronsoft.books.nio.channels;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.io.FileOutputStream;
import java.util.Random;
import java.util.List;
import java.util.LinkedList;
/**
* Demonstrate gathering write using many buffers.
* @author Ron Hitchens (ron@ronsoft.com)
*/
public class Marketing{
    private static final String DEMOGRAPHIC = "blahblah.txt";
    // "Leverage frictionless methodologies"
    public static void main (String [] argv) throws Exception {
        int reps = 10;
        if (argv.length > 0) {
            reps = Integer.parseInt (argv [0]);
        }
        FileOutputStream fos = new FileOutputStream (DEMOGRAPHIC);
        GatheringByteChannel gatherChannel = fos.getChannel( );
        // Generate some brilliant marcom, er, repurposed content
        ByteBuffer [] bs = utterBS (reps);
        // Deliver the message to the waiting market
        while (gatherChannel.write (bs) > 0) {
            // Empty body
            // Loop until write( ) returns zero
        }
        System.out.println ("Mindshare paradigms synergized to " + DEMOGRAPHIC);
        fos.close();
    }
    // ------------------------------------------------
    // These are just representative; add your own
    private static String [] col1 = {"Aggregate", "Enable", "Leverage", "Facilitate", "Synergize", "Repurpose", "Strategize", "Reinvent", "Harness"};
    private static String [] col2 = {"cross-platform", "best-of-breed", "frictionless", "ubiquitous", "extensible", "compelling",   "mission-critical", "collaborative", "integrated"};
    private static String [] col3 = {"methodologies", "infomediaries", "platforms", "schemas", "mindshare", "paradigms", "functionalities", "web services", "infrastructures"};
    private static String newline = System.getProperty ("line.separator");
    
    // The Marcom-atic 9000
    private static ByteBuffer [] utterBS (int howMany) throws Exception {
        List list = new LinkedList( );
        for (int i = 0; i < howMany; i++) {
            list.add (pickRandom (col1, " "));
            list.add (pickRandom (col2, " "));
            list.add (pickRandom (col3, newline));
        }
        ByteBuffer [] bufs = new ByteBuffer [list.size()];
        list.toArray (bufs);
        return (bufs);
    }
    // The communications director
    private static Random rand = new Random();
    // Pick one, make a buffer to hold it and the suffix, load it with
    // the byte equivalent of the strings (will not work properly for
    // non-Latin characters), then flip the loaded buffer so it's ready
    // to be drained
    
    private static ByteBuffer pickRandom (String [] strings, String suffix) throws Exception {
        String string = strings [rand.nextInt (strings.length)];
        int total = string.length() + suffix.length();
        ByteBuffer buf = ByteBuffer.allocate (total);
        buf.put (string.getBytes ("US-ASCII"));
        buf.put (suffix.getBytes ("US-ASCII"));
        buf.flip();
        return (buf);
    }
}

下面是实现 Marketing 类的输出。虽然这种输出没什么意义,但是gather写操作却能让我们非常高效地把它生成出来。

Aggregate compelling methodologies
Harness collaborative platforms
Aggregate integrated schemas
Aggregate frictionless platforms
Enable integrated platforms
Leverage cross-platform functionalities
Harness extensible paradigms
Synergize compelling infomediaries
Repurpose cross-platform mindshare
Facilitate cross-platform infomediaries

Java nio入门教程详解(十八)

  • CodePlayer技术交流群1
  • CodePlayer技术交流群2

0 条评论

撰写评论