流(Stream)
在Vert.x中有一些对象允许读取或写入数据。在以前的版本中,stream.adoc包是唯一操作Buffer的包。从现在开始,流将不与任何Buffer藕合并且可与其他类别的对象协同工作。
在Vert.x中,写调用会立既返回,内部对写进行队列排序。因此很难明白是向一个对象写快还是直接向底层资源写快,写队列可以无限扩充,直到内存耗尽。为了解决这个问题,在Vert.x API中的一些对象提供了一个简单的流控制能力。任何流控制的对象会被写入到实现WriteStream的流中,同样任何流控制对象都可以从实现了ReadStream的流中读取。下面是一个从ReadStream中读取数据然后将数据写到WriteStream的例子。一个简单的例子是从NetSocket中读取,然后写回同一个NetSocket,因为NetSocket同时实现了ReadStream和WriteStream。注意在ReadStream和WriteStream之间必须是兼容的对象,包括HTTP 请求,HTTP响应,异步文件I/O,WebSocket等。一个幼稚的做法是直接从NetSocket读取数据并立既写回NetSocket:
NetServer server = vertx.createNetServer(
newNetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
//Write the data straight back
sock.write(buffer);
});
}).listen();
上在的例子有一个问题:如果从Socket中读取数据的速度快与写回socket的速度,数据将会在NetSocket的队列中累积,最终超出RAM的容量。这可能会发生,例如,如果另一端Socket的客户端不能读的足够快,将会给连接产生明显的背压。
因为NetSocket实现了WriteStream,我们在写之前,可以检查WriteStream是否满了。
NetServer server = vertx.createNetServer(
newNetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
if(!sock.writeQueueFull()) {
sock.write(buffer);
}
});
}).listen();
这个例出不会内存溢出,在写队列满时,我们最终会丢失数据。真正想做的是在写队列满时,可以暂停NetSocket。
NetServer server = vertx.createNetServer(
newNetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
sock.write(buffer);
if(sock.writeQueueFull()) {
sock.pause();
}
});
}).listen();
NetSocket现在在文件满时可以暂停,但是还需要在写队列处理无任务后从暂停中恢复过来。
NetServer server = vertx.createNetServer(
newNetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
sock.write(buffer);
if(sock.writeQueueFull()) {
sock.pause();
sock.drainHandler(done -> {
sock.resume();
});
}
});
}).listen();
drainHandler事件处理器在写队列准备接收更多数据时将被调用,这将恢复NetSocket并允许更多数据 被读取。
在写Vert.x应用时这种做法很普遍,所以我们提供帮助类Pump,它为我们做了这些工作。你仅需要其提供ReadStream和WriteStream然后启动它。
NetServer server = vertx.createNetServer(
newNetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
Pump.pump(sock,sock).start();
}).listen();
这与上面复杂例子实际上做了同样的事。现在让我们详细研究一下ReadSteam和WriteStream上的方法:
ReadStream
HttpClientResponse, DatagramSocket,HttpClientRequest, HttpServerFileUpload, HttpServerRequest,HttpServerRequestStream, MessageConsumer, NetSocket, NetSocketStream,WebSocket, WebSocketStream, TimeoutStream, AsyncFile 这些类实现了ReadStream。
功能:
· handler:从ReadStream接收数据的处理器
· pause:暂停处理器。在暂停时,在处理器中不会有数据项被接收。
· resume:恢复处理器。如果一些数据项到达,处理器将被调用。
· exceptionHandler:如果在ReadStream上产生异常将会调用。
· endHandler:在流的到达尾部会被调用。当文件流到达尾(EOF)时会被调用,或者在请求到达尾部时(如果流是HTTP请求),或者连接被关闭时(如果是TCP socket)都会被调用。
WriteStream
HttpClientRequest, HttpServerResponseWebSocket, NetSocket, AsyncFile, PacketWritestream 和MessageProducer实现了WriteStream。
功能:
· write:向WriteStream写一个对象。这个方法从不会阻塞。写是内部队列化的并异步写到底层的资源。
· setWriteQueueMaxSize:设置写入队列认为满时候的对象数,达到此数时,方法writeQueueFull会返回true。注意这个,在写队列认为满的情况下,如果调用write方法,数据还将被接收并入队。实际的这个数字依赖于流的实现,对Buffer,这个大小代表了实际写入的字节数而不是缓冲器的个数。
· writeQueueFull:如果写队列认为满了,返回true。
· exceptionHandler:如果在WriteStream上出现异常,此方法将会被调用。
· drainHandler:如果WriteStream的写队列不再被认为满的时候,将请用此处下器。
Pump(泵接)
Pump接口有下列方法:
· start:开始泵接
· stop:停止泵接。在停止模式下,泵会打开。
· setWriteQueueMaxSize:这与WriteStream上的setWriteQueueMaxSize方法一个意思。
一个泵可以被打开和停止多次。在泵第一次创建时,它是停止的,需要调用start()方法打开它。
记录解析器
记录解析器器为解析固定大小的记录和顺序字节分隔的协义提供方便。记录解析器可以按照设定将输入的缓冲序列转换成结构化缓冲序列。(指固定大小或者被分隔的记录)。例如,如果有一个简单以‘\n’分隔的ASCII文本协议并输入象下面的一样:
buffer1:HELLO\nHOW ARE Y
buffer2:OU?\nI AM
buffer3: DOING OK
buffer4:\n
记录解析器会解析出:
buffer1:HELLO
buffer2:HOW ARE YOU?
buffer3:I AM DOING OK
让我研究一下相关的代码:
final RecordParser parser =RecordParser.newDelimited("\n", h -> {
System.out.println(h.toString());
});
parser.handle(Buffer.buffer("HELLO\nHOWARE Y"));
parser.handle(Buffer.buffer("OU?\nIAM"));
parser.handle(Buffer.buffer("DOINGOK"));
parser.handle(Buffer.buffer("\n"));
也可以象下面代码一产生固定大小的数据块
RecordParser.newFixed(4, h -> {
System.out.println(h.toString());
});
获取多细节,查看RecordParser类。
线程安全
从不同的线程访问Vert.x对象,大多数Vert.x对象是安全的。当然这些对象如果在创建它们的上文中访问,性能是最优的。例如,部署了一个Verticle,它创建了一个NetServer,此NetServer在其处理器中提供了一个NetSocket实例,最好总是从这个Verticle的事件循不中访问那个socket实例。如果坚持标准的Vert.x的verticle部署模式并避免在verticles之间共享对象,那就不必关心这个事情了。
度量SPI
默认Vert.x不记录任何度量信息。相返Vert.x为其他提代了一个SPI供实现,哪些可以被添加到类路径。此度量SPI是一个高级特性,实现它可从Vert.x中抓取事件并收集度量信息。关于这个的更多信息,可以查询API 文档。
如果是嵌入的Vert.x,使用setFactory方法,可以编程指定一个度量工厂。
OSGI
Vert.x内核被打包进一个OSGi bundle。所以可以在OSGiR4.2上的任何环境中使用,如Apache Felix,EclipseEquinox。这个Bundle导出为io.vertx.core*
然而,这个Bundle有一些对Jakson和Netty的依赖。为了获取vert.x内核Bundle需部署以下包:
· JacksonAnnotation [2.6.0,3)
· JacksonCore [2.6.2,3)
· JacksonDatabind [2.6.2,3)
· NettyBuffer [4.0.31,5)
· NettyCodec [4.0.31,5)
· Netty Codec/Socks[4.0.31,5)
· NettyCodec/Common [4.0.31,5)
· NettyCodec/Handler [4.0.31,5)
· NettyCodec/Transport [4.0.31,5)
这是一个在Apache Felix5.2.0上的工作部署:
14|Active | 1|Jackson-annotations (2.6.0)
15|Active | 1|Jackson-core (2.6.2)
16|Active | 1|jackson-databind (2.6.2)
18|Active | 1|Netty/Buffer (4.0.31.Final)
19|Active | 1|Netty/Codec (4.0.31.Final)
20|Active | 1|Netty/Codec/HTTP(4.0.31.Final)
21|Active | 1|Netty/Codec/Socks(4.0.31.Final)
22|Active | 1|Netty/Common (4.0.31.Final)
23|Active | 1|Netty/Handler(4.0.31.Final)
24|Active | 1|Netty/Transport(4.0.31.Final)
25|Active | 1|Netty/Transport/SCTP(4.0.31.Final)
26|Active | 1|Vert.x Core (3.1.0)
在Equinox上,可能需要用下面的框架属性(eclipse.bundle.setTCCL=false)禁用ContextFinder.