内容目录
3.5.4 DatagramChannel
最后一个socket通道是DatagramChannel
。正如SocketChannel
对应Socket
,ServerSocketChannel
对应ServerSocket
,每一个DatagramChannel
对象也有一个关联的DatagramSocket
对象。不过原命名模式在此并未适用:“DatagramSocketChannel”显得有点笨拙,因此采用了简洁的“DatagramChannel”名称。
正如SocketChannel
模拟连接导向的流协议(如 TCP/IP),DatagramChannel
则模拟包导向的无连接协议(如 UDP/IP):
public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel {
// 这里仅列出部分API
public static DatagramChannel open() throws IOException
public abstract DatagramSocket socket();
public abstract DatagramChannel connect(SocketAddress remote) throws IOException;
public abstract boolean isConnected();
public abstract DatagramChannel disconnect() throws IOException;
public abstract SocketAddress receive(ByteBuffer dst) throws IOException;
public abstract int send(ByteBuffer src, SocketAddress target)
public abstract int read(ByteBuffer dst) throws IOException;
public abstract long read(ByteBuffer[] dsts) throws IOException;
public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
public abstract int write(ByteBuffer src) throws IOException;
public abstract long write(ByteBuffer[] srcs) throws IOException;
public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
}
创建DatagramChannel
的模式和创建其他socket通道是一样的:调用静态的open()
方法来创建一个新实例。新DatagramChannel
会有一个可以通过调用socket()
方法获取的对等DatagramSocket
对象。DatagramChannel
对象既可以充当服务器(监听者)也可以充当客户端(发送者)。如果您希望新创建的通道负责监听,那么通道必须首先被绑定到一个端口或地址/端口组合上。绑定DatagramChannel
同绑定一个常规的DatagramSocket
没什么区别,都是委托对等socket对象上的API实现的:
DatagramChannel channel = DatagramChannel.open(); DatagramSocket socket = channel.socket(); socket.bind(new InetSocketAddress(portNumber));
DatagramChannel
是无连接的。每个数据报(datagram)都是一个自包含的实体,拥有它自己的目的地址及不依赖其他数据报的数据净荷。与面向流的的socket不同,DatagramChannel
可以发送单独的数据报给不同的目的地址。同样,DatagramChannel
对象也可以接收来自任意地址的数据包。每个到达的数据报都含有关于它来自何处的信息(源地址)。
一个未绑定的DatagramChannel
仍能接收数据包。当一个底层socket被创建时,一个动态生成的端口号就会分配给它。绑定行为要求通道关联的端口被设置为一个特定的值(此过程可能涉及安全检查或其他验证)。不论通道是否绑定,所有发送的包都含有DatagramChannel
的源地址(带端口号)。未绑定的DatagramChannel
可以接收发送给它的端口的包,通常是来回应该通道之前发出的一个包。已绑定的通道接收发送给它们所绑定的熟知端口(wellknown port)的包。数据的实际发送或接收是通过send()
和receive()
方法来实现的:
public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel {
// 这里仅列出部分API
public abstract SocketAddress receive(ByteBuffer dst) throws IOException;
public abstract int send(ByteBuffer src, SocketAddress target)
}
receive()
方法将下次将传入的数据报的数据净荷复制到预备好的ByteBuffer
中并返回一个SocketAddress
对象以指出数据来源。如果通道处于阻塞模式,receive()
可能无限期地休眠直到有包到达。如果是非阻塞模式,当没有可接收的包时则会返回null
。如果包内的数据超出缓冲区能承受的范围,多出的数据都会被悄悄地丢弃。
假如您提供的ByteBuffer
没有足够的剩余空间来存放您正在接收的数据包,没有被填充的字节都会被悄悄地丢弃。
调用send()
会发送给定ByteBuffer
对象的内容到给定SocketAddress
对象所描述的目的地址和端口,内容范围为从当前position
开始到末尾处结束。如果DatagramChannel
对象处于阻塞模式,调用线程可能会休眠直到数据报被加入传输队列。如果通道是非阻塞的,返回值要么是字节缓冲区的字节数,要么是“0”。发送数据报是一个全有或全无(all-or-nothing)的行为。如果传输队列没有足够空间来承载整个数据报,那么什么内容都不会被发送。
如果安装了安全管理器,那么每次调用send()
或receive()
时安全管理器的checkConnect()
方法都会被调用以验证目的地址,除非通道处于已连接的状态(本节后面会讨论到)。
请注意,数据报协议的不可靠性是固有的,它们不对数据传输做保证。send()
方法返回的非零值并不表示数据报到达了目的地,仅代表数据报被成功加到本地网络层的传输队列。此外,传输过程中的协议可能将数据报分解成碎片。例如,以太网不能传输超过1,500个字节左右的包。如果您的数据报比较大,那么就会存在被分解成碎片的风险,成倍地增加了传输过程中包丢失的几率。被分解的数据报在目的地会被重新组合起来,接收者将看不到碎片。但是,如果有一个碎片不能按时到达,那么整个数据报将被丢弃。
DatagramChannel
有一个connect()
方法:
public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel { // 这里仅列出部分API public abstract DatagramChannel connect(SocketAddress remote) throws IOException; public abstract boolean isConnected(); public abstract DatagramChannel disconnect() throws IOException; }
DatagramChannel
对数据报socket的连接语义不同于对流socket的连接语义。有时候,将数据报对话限制为两方是很可取的。将DatagramChannel
置于已连接的状态可以使除了它所“连接”到的地址之外的任何其他源地址的数据报被忽略。这是很有帮助的,因为不想要的包都已经被网络层丢弃了,从而避免了使用代码来接收、检查然后丢弃包的麻烦。
当DatagramChannel
已连接时,使用同样的令牌,您不可以发送包到除了指定给connect()
方法的目的地址以外的任何其他地址。试图一定要这样做的话会导致一个SecurityException
异常。
我们可以通过调用带SocketAddress对象的connect()
方法来连接一个DatagramChannel
,该SocketAddress
对象描述了DatagramChannel
远程对等体的地址。如果已经安装了一个安全管理器,那么它会进行权限检查。之后,每次send/receive时就不会再有安全检查了,因为来自或去到任何其他地址的包都是不允许的。
已连接通道会发挥作用的使用场景之一是一个客户端/服务器模式、使用UDP通讯协议的实时游戏。每个客户端都只和同一台服务器进行会话而希望忽视任何其他来源地数据包。将客户端的DatagramChannel
实例置于已连接状态可以减少按包计算的总开销(因为不需要对每个包进行安全检查)和剔除来自欺骗玩家的假包。服务器可能也想要这样做,不过需要每个客户端都有一个DatagramChannel
对象。
不同于流socket,数据报socket的无状态性质不需要同远程系统进行对话来建立连接状态。没有实际的连接,只有用来指定允许的远程地址的本地状态信息。由于此原因,DatagramChannel
上也就没有单独的finishConnect()
方法。我们可以使用isConnected()
方法来测试一个数据报通道的连接状态。
不同于SocketChannel
(必须连接了才有用并且只能连接一次),DatagramChannel
对象可以任意次数地进行连接或断开连接。每次连接都可以到一个不同的远程地址。调用disconnect()
方法可以配置通道,以便它能再次接收来自安全管理器(如果已安装)所允许的任意远程地址的数据或发送数据到这些地址上。
当一个DatagramChannel
处于已连接状态时,发送数据将不用提供目的地址而且接收时的源地址也是已知的。这意味着DatagramChannel
已连接时可以使用常规的read()
和write()
方法,包括scatter/gather形式的读写来组合或分拆包的数据:
public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel {
// 这里仅列出部分API
public abstract int read(ByteBuffer dst) throws IOException;
public abstract long read(ByteBuffer[] dsts) throws IOException;
public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
public abstract int write(ByteBuffer src) throws IOException;
public abstract long write(ByteBuffer[] srcs) throws IOException;
public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
}
read()
方法返回读取字节的数量,如果通道处于非阻塞模式的话这个返回值可能是“0”。write()
方法的返回值同send()
方法一致:要么返回您的缓冲区中的字节数量,要么返回“0”(如果由于通道处于非阻塞模式而导致数据报不能被发送)。当通道不是已连接状态时调用read()
或write()
方法,都将产生NotYetConnectedException
异常。
数据报通道不同于流socket。由于它们的有序而可靠的数据传输特性,流socket非常得有用。大多数网络连接都是流 socket(TCP/IP 就是一个显著的例子)。但是,像 TCP/IP 这样面向流的的协议为了在包导向的互联网基础设施上维护流语义必然会产生巨大的开销,并且流隐喻不能适用所有的情形。数据报的吞吐量要比流协议高很多,并且数据报可以做很多流无法完成的事情。
下面列出了一些选择数据报socket而非流socket的理由:
- 您的程序可以承受数据丢失或无序的数据。
- 您希望“发射后不管”(fire and forget)而不需要知道您发送的包是否已接收。
- 数据吞吐量比可靠性更重要。
- 您需要同时发送数据给多个接受者(多播或者广播)。
- 包隐喻比流隐喻更适合手边的任务。
如果以上特征中的一个或多个适用于您的程序,那么数据报设计对您来说就是合适的。
例 3-9 显示了如何使用DatagramChannel
发送请求到多个地址上的时间服务器。DatagramChannel
接着会等待回复(reply)的到达。对于每个返回的回复,远程时间会同本地时间进行比较。由于数据报传输不保证一定成功,有些回复可能永远不会到达。大多数Linux和Unix系统都默认提供时间服务。互联网上也有一个公共时间服务器,如time.nist.gov。防火墙或者您的ISP可能会干扰数据报传输,这是因人而异的。
/*
*例 3-9 使用DatagramChannel的时间服务客户端
*/
package com.ronsoft.books.nio.channels;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.DatagramChannel;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
import java.util.Iterator;
/**
* Request time service, per RFC 868. RFC 868
* (http://www.ietf.org/rfc/rfc0868.txt) is a very simple time protocol
* whereby one system can request the current time from another system.
* Most Linux, BSD and Solaris systems provide RFC 868 time service
* on port 37. This simple program will inter-operate with those.
* The National Institute of Standards and Technology (NIST) operates
* a public time server at time.nist.gov.
*
* The RFC 868 protocol specifies a 32 bit unsigned value be sent,
* representing the number of seconds since Jan 1, 1900. The Java
* epoch begins on Jan 1, 1970 (same as unix) so an adjustment is
* made by adding or subtracting 2,208,988,800 as appropriate. To
* avoid shifting and masking, a four-byte slice of an
* eight-byte buffer is used to send/recieve. But getLong()
* is done on the full eight bytes to get a long value.
*
* When run, this program will issue time requests to each hostname
* given on the command line, then enter a loop to receive packets.
* Note that some requests or replies may be lost, which means
* this code could block forever.
*
* @author Ron Hitchens (ron@ronsoft.com)
*/
public class TimeClient {
private static final int DEFAULT_TIME_PORT = 37;
private static final long DIFF_1900 = 2208988800L;
protected int port = DEFAULT_TIME_PORT;
protected List remoteHosts;
protected DatagramChannel channel;
public TimeClient (String [] argv) throws Exception
if (argv.length == 0) {
throw new Exception ("Usage: [ -p port ] host ...");
}
parseArgs(argv);
this.channel = DatagramChannel.open();
}
protected InetSocketAddress receivePacket (DatagramChannel channel, ByteBuffer buffer) throws Exception {
buffer.clear();
// Receive an unsigned 32-bit, big-endian value
return ((InetSocketAddress) channel.receive (buffer));
}
// Send time requests to all the supplied hosts
protected void sendRequests() throws Exception {
ByteBuffer buffer = ByteBuffer.allocate (1);
Iterator it = remoteHosts.iterator();
while (it.hasNext()) {
InetSocketAddress sa = (InetSocketAddress) it.next();
System.out.println ("Requesting time from " + sa.getHostName() + ":" + sa.getPort());
// Make it empty (see RFC868)
buffer.clear().flip();
// Fire and forget
channel.send (buffer, sa);
}
}
// Receive any replies that arrive
public void getReplies() throws Exception {
// Allocate a buffer to hold a long value
ByteBuffer longBuffer = ByteBuffer.allocate (8);
// Assure big-endian (network) byte order
longBuffer.order (ByteOrder.BIG_ENDIAN);
// Zero the whole buffer to be sure
longBuffer.putLong (0, 0);
// Position to first byte of the low-order 32 bits
longBuffer.position (4);
// Slice the buffer; gives view of the low-order 32 bits
ByteBuffer buffer = longBuffer.slice();
int expect = remoteHosts.size();
int replies = 0;
System.out.println ("");
System.out.println ("Waiting for replies...");
while (true) {
InetSocketAddress sa = receivePacket(channel, buffer);
buffer.flip();
replies++;
printTime(longBuffer.getLong(0), sa);
if (replies == expect) {
System.out.println ("All packets answered");
break;
}
// Some replies haven't shown up yet
System.out.println ("Received " + replies + " of " + expect + " replies");
}
}
// Print info about a received time reply
protected void printTime (long remote1900, InetSocketAddress sa) {
// local time as seconds since Jan 1, 1970
long local = System.currentTimeMillis() / 1000;
// remote time as seconds since Jan 1, 1970
long remote = remote1900 - DIFF_1900;
Date remoteDate = new Date (remote * 1000);
Date localDate = new Date (local * 1000);
long skew = remote - local;
System.out.println ("Reply from " + sa.getHostName() + ":" + sa.getPort());
System.out.println (" there: " + remoteDate);
System.out.println (" here: " + localDate);
System.out.print (" skew: ");
if (skew == 0) {
System.out.println ("none");
} else if (skew > 0) {
System.out.println (skew + " seconds ahead");
} else {
System.out.println ((-skew) + " seconds behind");
}
}
protected void parseArgs (String [] argv) {
remoteHosts = new LinkedList();
for (int i = 0; i < argv.length; i++) {
String arg = argv [i];
// Send client requests to the given port
if (arg.equals("-p")) {
i++;
this.port = Integer.parseInt (argv [i]);
continue;
}
// Create an address object for the hostname
InetSocketAddress sa = new InetSocketAddress(arg, port);
// Validate that it has an address
if (sa.getAddress() == null) {
System.out.println ("Cannot resolve address: " + arg);
continue;
}
remoteHosts.add(sa);
}
}
// --------------------------------------------------------------
public static void main (String [] argv) throws Exception {
TimeClient client = new TimeClient(argv);
client.sendRequests();
client.getReplies();
}
}
例 3-10 中的程序是一个RFC 868时间服务器。这段代码回答来自例 3-9 中的客户端的请求并显示出DatagramChannel
是怎样绑定到一个熟知端口然后开始监听来自客户端的请求的。该时间服务器仅监听数据报(UDP)请求。大多数Unix和Linux系统提供的rdate命令使用TCP协议连接到一个RFC 868时间服务。
/*
*例 3-10 DatagramChannel 时间服务器
*/
package com.ronsoft.books.nio.channels;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.DatagramChannel;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
/**
* Provide RFC 868 time service (http://www.ietf.org/rfc/rfc0868.txt).
* This code implements an RFC 868 listener to provide time
* service. The defined port for time service is 37. On most
* unix systems, root privilege is required to bind to ports
* below 1024. You can either run this code as root or
* provide another port number on the command line. Use
* "-p port#" with TimeClient if you choose an alternate port.
*
* Note: The familiar rdate command on unix will probably not work
* with this server. Most versions of rdate use TCP rather than UDP
* to request the time.
*
* @author Ron Hitchens (ron@ronsoft.com)
*/
public class TimeServer {
private static final int DEFAULT_TIME_PORT = 37;
private static final long DIFF_1900 = 2208988800L;
protected DatagramChannel channel;
public TimeServer (int port) throws Exception {
this.channel = DatagramChannel.open();
this.channel.socket().bind (new InetSocketAddress (port));
System.out.println ("Listening on port " + port + " for time requests");
}
public void listen() throws Exception {
// Allocate a buffer to hold a long value
ByteBuffer longBuffer = ByteBuffer.allocate (8);
// Assure big-endian (network) byte order
longBuffer.order (ByteOrder.BIG_ENDIAN);
// Zero the whole buffer to be sure
longBuffer.putLong (0, 0);
// Position to first byte of the low-order 32 bits
longBuffer.position (4);
// Slice the buffer; gives view of the low-order 32 bits
ByteBuffer buffer = longBuffer.slice();
while (true) {
buffer.clear();
SocketAddress sa = this.channel.receive (buffer);
if (sa == null) {
continue; // defensive programming
}
// Ignore content of received datagram per RFC 868
System.out.println ("Time request from " + sa);
buffer.clear(); // sets pos/limit correctly
// Set 64-bit value; slice buffer sees low 32 bits
longBuffer.putLong (0, (System.currentTimeMillis() / 1000) + DIFF_1900);
this.channel.send (buffer, sa);
}
}
// --------------------------------------------------------------
public static void main (String [] argv) throws Exception {
int port = DEFAULT_TIME_PORT;
if (argv.length > 0) {
port = Integer.parseInt (argv [0]);
}
try {
TimeServer server = new TimeServer (port);
server.listen();
} catch (SocketException e) {
System.out.println ("Can't bind to port " + port + ", try a different one");
}
}
}
0 条评论
撰写评论