Spring boot 整合disruptor

时间:2025-04-11 07:48:31

第一步:引入pom

<dependency>
		<groupId></groupId>
		<artifactId>disruptor</artifactId>
		<version>3.4.2</version>
</dependency>

第二步:定义MessageModel

@Data
public class MessageModel {
	private String message;
}

 

第三步:message配置管理


@Configuration
public class MQManager {

	@Autowired
	private RedisUtils redisUtils;
	
	@Bean("messageModel")
	public RingBuffer<MessageModel> messageModelRingBuffer() {
		// 定义用于事件处理的线程池,
		// Disruptor通过提供的线程来触发consumer的事件处理
		ExecutorService executor = (2);

		// 指定事件工厂
		HelloEventFactory factory = new HelloEventFactory();

		// 指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
		int bufferSize = 1024 * 256;

		// 单线程模式,获取额外的性能
		Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ,
				new BlockingWaitStrategy());

		// 设置事件业务处理器---消费者
		(new HelloEventHandler(redisUtils));

		// 启动disruptor线程
		();

		// 获取ringbuffer环,用于接取生产者生产的事件
		RingBuffer<MessageModel> ringBuffer = ();

		return ringBuffer;
	}
}

第四步:定义工厂类

public class HelloEventFactory implements EventFactory<MessageModel> {
	@Override
	public MessageModel newInstance() {
		return new MessageModel();
	}
}

第五步:定义消费端事件处理器


@Slf4j
@Component
public class HelloEventHandler implements EventHandler<MessageModel> {
	
	private RedisUtils redisUtils;
	
	public HelloEventHandler (RedisUtils redisUtils) {
		 = redisUtils;
	}
	
	@Override
	public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
		try {
			// 这里停止1000ms是为了确定消费消息是异步的
			(10);
			("sayHelloMqReceive");
			("消费者处理消息开始");
			if (event != null) {
				("消费者消费的信息是:{}", event);
			}
		} catch (Exception e) {
			("消费者处理消息失败");
		}
		("消费者处理消息结束");
	}
}

第六步:具体的实现

public interface DisruptorMqService {

	void sayHelloMq(String message);
}

@Slf4j
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {

	@Autowired
	private RingBuffer<MessageModel> messageModelRingBuffer;
	@Autowired
	private RedisUtils redisUtils;
	
	
	@Override
	public void sayHelloMq(String message) {
		("sayHelloMqSend");
		("record the message: {}", message);
		// 获取下一个Event槽的下标
		long sequence = ();
		try {
			// 给Event填充数据
			MessageModel event = (sequence);
			(message);
			("往消息队列中添加消息:{}", event);
		} catch (Exception e) {
			("failed to add event to messageModelRingBuffer for : e = {},{}", e, ());
		} finally {
			// 发布Event,激活观察者去消费,将sequence传递给改消费者
			// 注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
			(sequence);
		}
	}

}