第一步:引入pom
<dependency>
<groupId></groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
第二步:定义MessageModel
@Data
public class MessageModel {
private String message;
}
第三步:message配置管理
@Configuration
public class MQManager {
@Autowired
private RedisUtils redisUtils;
@Bean("messageModel")
public RingBuffer<MessageModel> messageModelRingBuffer() {
// 定义用于事件处理的线程池,
// Disruptor通过提供的线程来触发consumer的事件处理
ExecutorService executor = (2);
// 指定事件工厂
HelloEventFactory factory = new HelloEventFactory();
// 指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
int bufferSize = 1024 * 256;
// 单线程模式,获取额外的性能
Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ,
new BlockingWaitStrategy());
// 设置事件业务处理器---消费者
(new HelloEventHandler(redisUtils));
// 启动disruptor线程
();
// 获取ringbuffer环,用于接取生产者生产的事件
RingBuffer<MessageModel> ringBuffer = ();
return ringBuffer;
}
}
第四步:定义工厂类
public class HelloEventFactory implements EventFactory<MessageModel> {
@Override
public MessageModel newInstance() {
return new MessageModel();
}
}
第五步:定义消费端事件处理器
@Slf4j
@Component
public class HelloEventHandler implements EventHandler<MessageModel> {
private RedisUtils redisUtils;
public HelloEventHandler (RedisUtils redisUtils) {
= redisUtils;
}
@Override
public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
try {
// 这里停止1000ms是为了确定消费消息是异步的
(10);
("sayHelloMqReceive");
("消费者处理消息开始");
if (event != null) {
("消费者消费的信息是:{}", event);
}
} catch (Exception e) {
("消费者处理消息失败");
}
("消费者处理消息结束");
}
}
第六步:具体的实现
public interface DisruptorMqService {
void sayHelloMq(String message);
}
@Slf4j
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {
@Autowired
private RingBuffer<MessageModel> messageModelRingBuffer;
@Autowired
private RedisUtils redisUtils;
@Override
public void sayHelloMq(String message) {
("sayHelloMqSend");
("record the message: {}", message);
// 获取下一个Event槽的下标
long sequence = ();
try {
// 给Event填充数据
MessageModel event = (sequence);
(message);
("往消息队列中添加消息:{}", event);
} catch (Exception e) {
("failed to add event to messageModelRingBuffer for : e = {},{}", e, ());
} finally {
// 发布Event,激活观察者去消费,将sequence传递给改消费者
// 注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
(sequence);
}
}
}