Vert.x 3学习笔记---03

时间:2021-11-14 18:01:54

The Event Bus

概述

本文介绍event bus。
The event bus is the nervous system of Vert.x.
每一个Vertx实例都只有一个event bus实例,可以使用eventBus方法获取。event bus容许你的应用程序的各个部分(相同或者不同语言编写的,相同或者不同的vertx或者vertx的其他部分)进行通讯。
The event bus allows different parts of your application to communicate with each other irrespective of what language they are written in, and whether they’re in the same Vert.x instance, or in a different Vert.x instance.
它甚至允许客户端JavaScript(浏览器中)去通讯,只要在同一个event bus中。
It can even be bridged to allow client side JavaScript running in a browser to communicate on the same event bus.

The event bus forms a distributed peer-to-peer messaging system spanning multiple server nodes and multiple browsers.

The event bus supports publish/subscribe, point to point, and request-response messaging.

The event bus API is very simple. It basically involves registering handlers, unregistering handlers and sending and publishing messages.

The Theory 理论知识

Addressing

在event bus上,消息被发送到一个地址。
在vertx中,一个地址就是一个字符串。任何字符串都是合法的。例如:europe.news.feed1, acme.games.pacman, sausages, and X。

Handlers

在handler中消息被接收。你在一个地址上面注册一个handler。
不同的handler可以注册在同一个地址上。
一个handler可以注册在不同的地址上。

Publish / subscribe messaging

event bus支持发布/订阅消息。

Point to point and Request-Response messaging

event bus支持点对点和请求/响应传递消息。

Best-effort delivery 尽力投递

vertx内置的message传递的类型为原始/简单的类型,字符串,buffer。最佳实践类型格式为json。
当然我们并不强制你必须使用json。event bus也是可以扩展的,可以支持任意的对象,只要你为这些对象实现了codec

The Event Bus API

Getting the event bus

EventBus eb = vertx.eventBus();

There is a single instance of the event bus per Vert.x instance.
每一个vertx实例都有一个单实例的event bus对象。

Registering Handlers

EventBus eb = vertx.eventBus();

eb.consumer("news.uk.sport", message -> {
System.out.println("I have received a message: " + message.body());
});

如上面的代码,这是注册handler的一个简单的方法,通过consumer方法。调用consumer()后返回的是一个MessageConsumer对象。这个对象经常用在unregister the handler(卸载), or use the handler as a stream(流).

EventBus eb = vertx.eventBus();

MessageConsumer<String> consumer = eb.consumer("news.uk.sport");
consumer.handler(message -> {
System.out.println("I have received a message: " + message.body());
});

当然我们也可以按如上代码使用这个对象。先返回一个没有handler集合的MessageConsumer对象,然后再设置相应的handler。

当我们在一个集群化的event bus上面注册一个handler时,所有的节点都完成,可能需要花费一定的时间。如果我们希望能够在handler注册完成时得到通知,我们可以注册一个completion handler:

consumer.completionHandler(res -> {
if (res.succeeded()) {
System.out.println("The handler registration has reached all nodes");
} else {
System.out.println("Registration failed!");
}
});

Un-registering(卸载) Handlers

卸载调用 unregister方法。如果是集群化的event bus上需要监听是否完成的话:

consumer.unregister(res -> {
if (res.succeeded()) {
System.out.println("The handler un-registration has reached all nodes");
} else {
System.out.println("Un-registration failed!");
}
});

Publishing messages

发布一条消息是很简单的。可以通过调用publish方法来实现:

eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball");

Sending messages

send message是点对点的发送模式。它只发送到一个handler上。这个handler的选择是非严格的循环方式。

eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");

Setting headers on messages

设置消息的头部
在sending or publishing消息时,这是通过DeliveryOptions类来完成的。

DeliveryOptions options = new DeliveryOptions();
options.addHeader("some-header", "some-value");
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball", options);

Message ordering

vertx将传递给任何特定的处理程序的消息,以发件人发出顺序。

The Message object

消息对象就是message实例,body就是我们发送的内容,headers就是我们设置的消息的头部。

Acknowledging messages / sending replies

确认消息/发送回复

有些情况下,确认收到一个消息的通知是很有用的。这样的话,message需要调用reply方法。

The receiver:

MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");
consumer.handler(message -> {
System.out.println("I have received a message: " + message.body());
message.reply("how interesting!");
});

The sender:

eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {
if (ar.succeeded()) {
System.out.println("Received reply: " + ar.result().body());
}
});

reply可以包含一个有用的内容。
Some examples:

A simple message consumer which implements a service which returns the time of the day would acknowledge with a message containing the time of day in the reply body

A message consumer which implements a persistent queue, might acknowledge with true if the message was successfully persisted in storage, or false if not.

A message consumer which processes an order might acknowledge with true when the order has been successfully processed so it can be deleted from the database

Sending with timeouts

当发送一个message时,如果超过一定的时间(DeliveryOptions中定义时间)还没有收到reply,我们可以通过reply handler来接收,一个错误的结果。

Send Failures

消息发送会失败,如果失败的话,我们的reply handler会接收到一个错误的返回结果。

Message Codecs

上面提到event bus可以发送任意的对象,只要我们实现这个对象的codec。

eventBus.registerCodec(myCodec);

DeliveryOptions options = new DeliveryOptions().setCodecName(myCodec.name());

eventBus.send("orders", new MyPOJO(), options);//通过options设置,只对这一次发送管用。

如果你发送的消息始终是这个类型的,可以如下:

eventBus.registerDefaultCodec(MyPOJO.class, myCodec);

eventBus.send("orders", new MyPOJO());

取消这个消息编码,用unregisterCodec方法。

Clustered Event Bus

The event bus doesn’t just exist in a single Vert.x instance. By clustering different Vert.x instances together on your network they can form a single, distributed, event bus.

Clustering programmatically

If you’re creating your Vert.x instance programmatically you get a clustered event bus by configuring the Vert.x instance as clustered;

VertxOptions options = new VertxOptions();
Vertx.clusteredVertx(options, res -> {
if (res.succeeded()) {
Vertx vertx = res.result();
EventBus eventBus = vertx.eventBus();
System.out.println("We now have a clustered event bus: " + eventBus);
} else {
System.out.println("Failed: " + res.cause());
}
});

You should also make sure you have a ClusterManager implementation on your classpath, for example the default HazelcastClusterManager.

Clustering on the command line

You can run Vert.x clustered on the command line with

vertx run my-verticle.js -cluster

Automatic clean-up in verticles

If you’re registering event bus handlers from inside verticles, those handlers will be automatically unregistered when the verticle is undeployed.
卸载verticle时,注册在其里面的handler都会被自动的卸载。