转自:http://www.changeself.net/archives/rocketmq入门(3)拉取消息.html
RocketMQ不止可以直接推送消息,在消费端注册监听器进行监听,还可以由消费端决定自己去拉取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
/**
* PullConsumer,订阅消息
*/
public
class PullConsumer {
//Java缓存
private
static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
public
static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer
consumer = new DefaultMQPullConsumer("PullConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
//拉取订阅主题的队列,默认队列大小是4
Set<MessageQueue>
mqs = consumer.fetchSubscribeMessageQueues("TopicTestMapBody");
for
(MessageQueue mq : mqs) {
System.out.println("Consume
from the queue: " + mq);
SINGLE_MQ:while(true){
try
{
PullResult
pullResult =
consumer.pullBlockIfNotFound(mq,
null, getMessageQueueOffset(mq), 32);
List<MessageExt>
list=pullResult.getMsgFoundList();
if(list!=null&&list.size()<100){
for(MessageExt
msg:list){
System.out.println(SerializableInterface.deserialize(msg.getBody()));
}
}
System.out.println(pullResult.getNextBeginOffset());
putMessageQueueOffset(mq,
pullResult.getNextBeginOffset());
switch
(pullResult.getPullStatus()) {
case
FOUND:
//
TODO
break;
case
NO_MATCHED_MSG:
break;
case
NO_NEW_MSG:
break
SINGLE_MQ;
case
OFFSET_ILLEGAL:
break;
default:
break;
}
}
catch
(Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private
static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq,
offset);
}
private
static long getMessageQueueOffset(MessageQueue mq) {
Long
offset = offseTable.get(mq);
if
(offset != null){
System.out.println(offset);
return
offset;
}
return
0;
}
|
刚开始的没有细看PullResult对象,以为拉取到的结果没有MessageExt对象还跑到群里面问别人,犯2了
特别要注意 静态变量offsetTable的作用,拉取的是按照从offset(理解为下标)位置开始拉取,拉取N条,offsetTable记录下次拉取的offset位置