Vert.x的eventBus实例

时间:2021-11-14 18:01:48



定义消息实体:

package com.vertxzhuss.example.eventbus;

/**
 * 自定义的消息类型
 * @author zhu shunshan
 * start
 */
public class Message {
	
	
	private String type;
	
	private Object body;

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public Object getBody() {
		return body;
	}

	public void setBody(Object body) {
		this.body = body;
	}
	
	

}

定义消息编解码器:

package com.vertxzhuss.example.eventbus;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;

public class MsgCodec implements MessageCodec<Message, Message> {

	/**
	 * 将消息实体封装到Buffer用于传输
	 * 
	 * 实现方式:
	 * 			使用对象流从对象中获取Byte数组然后追加到Buffer
	 */
	@Override
	public void encodeToWire(Buffer buffer, Message s) {
		final ByteArrayOutputStream b = new ByteArrayOutputStream();
		ObjectOutputStream o;
		try {
			o = new ObjectOutputStream(b);
			o.writeObject(s);
			o.close();
			buffer.appendBytes(b.toByteArray());
		} catch (IOException e) {
			e.printStackTrace();
		}

	}

	/**
	 * 从buffer中获取传输的消息实体
	 */
	@Override
	public Message decodeFromWire(int pos, Buffer buffer) {
		 final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes());
	     ObjectInputStream o = null;
	     Message msg = null;
		try {
			o = new ObjectInputStream(b);
			msg = (Message) o.readObject();
		} catch (IOException | ClassNotFoundException e) {
			e.printStackTrace();
		}
		return msg;
	}

	/**
	 * 如果是本地消息则直接返回
	 */
	@Override
	public Message transform(Message s) {
		return s;
	}

	/**
	 * 编解码器的名称:
	 * 				必须唯一,用于发送消息时识别编解码器,以及取消编解码器
	 */
	@Override
	public String name() {
		return "UMsgCodec";
	}

	/**
	 * 用于识别是否是用户编码器
	 * 自定义编解码器通常使用-1
	 */
	@Override
	public byte systemCodecID() {
		return -1;
	}

}

定义发送Verticle:

package com.vertxzhuss.example.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;

/**
 * 发送消息的EventBus
 * @author zhu shunshan
 *
 * start
 */
public class MainVerticle extends AbstractVerticle {
	
	
	public static void main(String[] args) {
		
		DeploymentOptions deployment = new DeploymentOptions();
		deployment.setWorker(true);
		
		try {
			Vertx.vertx().deployVerticle(MainVerticle.class.getName(), deployment);
		} catch (Throwable t) {
			t.printStackTrace();
		}
	}
	

	@Override
	public void start() throws Exception {
		vertx.deployVerticle(WorkVerticle.class.getName(), new DeploymentOptions().setWorker(true));
		//注册一个编解码器到eventbus:主要用于实体传输时的编解码。
//		vertx.eventBus().registerCodec(codec)
		//注册一个默认的编解码器到EventBus
//		vertx.eventBus().registerDefaultCodec(clazz, codec)
		
		//订阅消息
//		vertx.eventBus().consumer(address)
		//在指定的地址上创建一个消息订阅者,
//		vertx.eventBus().consumer(address, handler ->{
//			
//		});
		
		Message message = new Message();
		message.setType("get");
		message.setBody("我来请求数据!!");
		
		//发送给msg.test地址一个请求数据的消息,然后通过replyHandler来接受反馈
		vertx.eventBus().registerDefaultCodec(Message.class, new MsgCodec());
		vertx.eventBus().send("msg.test", message, replyHandler->{
			Message msg = (Message) replyHandler.result().body();
			System.out.println(msg.getBody());
			
		});
		
	}
	
	
	

}

定义接收消息的Verticle:

package com.vertxzhuss.example.eventbus;

import io.vertx.core.AbstractVerticle;

public class WorkVerticle extends AbstractVerticle {

	@Override
	public void start() throws Exception {
		
		/**
		 * 接受来自msg.test地址的消息并处理,handler处理
		 */
		vertx.eventBus().consumer("msg.test", handler->{
			Message msg =  (Message) handler.body();
			System.out.println(msg.getBody());
			
			msg.setBody("消息已收到!!!这是反馈消息");
			
			//消息反馈
			handler.reply(msg);
			
			
		});
		
//		MessageConsumer<Message>  mc = 	vertx.eventBus().consumer("");
//		mc.
	}

}