定义消息实体:
package com.vertxzhuss.example.eventbus; /** * 自定义的消息类型 * @author zhu shunshan * start */ public class Message { private String type; private Object body; public String getType() { return type; } public void setType(String type) { this.type = type; } public Object getBody() { return body; } public void setBody(Object body) { this.body = body; } }
定义消息编解码器:
package com.vertxzhuss.example.eventbus; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; public class MsgCodec implements MessageCodec<Message, Message> { /** * 将消息实体封装到Buffer用于传输 * * 实现方式: * 使用对象流从对象中获取Byte数组然后追加到Buffer */ @Override public void encodeToWire(Buffer buffer, Message s) { final ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o; try { o = new ObjectOutputStream(b); o.writeObject(s); o.close(); buffer.appendBytes(b.toByteArray()); } catch (IOException e) { e.printStackTrace(); } } /** * 从buffer中获取传输的消息实体 */ @Override public Message decodeFromWire(int pos, Buffer buffer) { final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes()); ObjectInputStream o = null; Message msg = null; try { o = new ObjectInputStream(b); msg = (Message) o.readObject(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } return msg; } /** * 如果是本地消息则直接返回 */ @Override public Message transform(Message s) { return s; } /** * 编解码器的名称: * 必须唯一,用于发送消息时识别编解码器,以及取消编解码器 */ @Override public String name() { return "UMsgCodec"; } /** * 用于识别是否是用户编码器 * 自定义编解码器通常使用-1 */ @Override public byte systemCodecID() { return -1; } }
定义发送Verticle:
package com.vertxzhuss.example.eventbus; import io.vertx.core.AbstractVerticle; import io.vertx.core.DeploymentOptions; import io.vertx.core.Vertx; /** * 发送消息的EventBus * @author zhu shunshan * * start */ public class MainVerticle extends AbstractVerticle { public static void main(String[] args) { DeploymentOptions deployment = new DeploymentOptions(); deployment.setWorker(true); try { Vertx.vertx().deployVerticle(MainVerticle.class.getName(), deployment); } catch (Throwable t) { t.printStackTrace(); } } @Override public void start() throws Exception { vertx.deployVerticle(WorkVerticle.class.getName(), new DeploymentOptions().setWorker(true)); //注册一个编解码器到eventbus:主要用于实体传输时的编解码。 // vertx.eventBus().registerCodec(codec) //注册一个默认的编解码器到EventBus // vertx.eventBus().registerDefaultCodec(clazz, codec) //订阅消息 // vertx.eventBus().consumer(address) //在指定的地址上创建一个消息订阅者, // vertx.eventBus().consumer(address, handler ->{ // // }); Message message = new Message(); message.setType("get"); message.setBody("我来请求数据!!"); //发送给msg.test地址一个请求数据的消息,然后通过replyHandler来接受反馈 vertx.eventBus().registerDefaultCodec(Message.class, new MsgCodec()); vertx.eventBus().send("msg.test", message, replyHandler->{ Message msg = (Message) replyHandler.result().body(); System.out.println(msg.getBody()); }); } }
定义接收消息的Verticle:
package com.vertxzhuss.example.eventbus; import io.vertx.core.AbstractVerticle; public class WorkVerticle extends AbstractVerticle { @Override public void start() throws Exception { /** * 接受来自msg.test地址的消息并处理,handler处理 */ vertx.eventBus().consumer("msg.test", handler->{ Message msg = (Message) handler.body(); System.out.println(msg.getBody()); msg.setBody("消息已收到!!!这是反馈消息"); //消息反馈 handler.reply(msg); }); // MessageConsumer<Message> mc = vertx.eventBus().consumer(""); // mc. } }