RocketMQ入门(3)拉取消息

时间:2023-03-08 22:30:59
RocketMQ入门(3)拉取消息

转自:http://www.changeself.net/archives/rocketmq入门(3)拉取消息.html

RocketMQ不止可以直接推送消息,在消费端注册监听器进行监听,还可以由消费端决定自己去拉取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
* PullConsumer,订阅消息
*/
public
class
PullConsumer
{
    //Java缓存
    private
static
final
Map<MessageQueue,
Long>
offseTable
=
new
HashMap<MessageQueue,
Long>();
    public
static
void
main(String[]
args)
throws
MQClientException
{
        DefaultMQPullConsumer
consumer
=
new
DefaultMQPullConsumer("PullConsumerGroup");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
                //拉取订阅主题的队列,默认队列大小是4
        Set<MessageQueue>
mqs
=
consumer.fetchSubscribeMessageQueues("TopicTestMapBody");
        for
(MessageQueue
mq
:
mqs)
{
            System.out.println("Consume
from the queue: "
+
mq);
            SINGLE_MQ:while(true){
                try
{
                    PullResult
pullResult
=
                            consumer.pullBlockIfNotFound(mq,
null,
getMessageQueueOffset(mq),
32);
                    List<MessageExt>
list=pullResult.getMsgFoundList();
                    if(list!=null&&list.size()<100){
                        for(MessageExt
msg:list){
                            System.out.println(SerializableInterface.deserialize(msg.getBody()));
                        }
                    }
                    System.out.println(pullResult.getNextBeginOffset());
                    putMessageQueueOffset(mq,
pullResult.getNextBeginOffset());
                    switch
(pullResult.getPullStatus())
{
                    case
FOUND:
                        //
TODO
                        break;
                    case
NO_MATCHED_MSG:
                        break;
                    case
NO_NEW_MSG:
                        break
SINGLE_MQ;
                    case
OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                }
                catch
(Exception
e)
{
                    e.printStackTrace();
                }
            
}
        }
        consumer.shutdown();
    }
    private
static
void
putMessageQueueOffset(MessageQueue
mq,
long
offset)
{
        offseTable.put(mq,
offset);
    }
    private
static
long
getMessageQueueOffset(MessageQueue
mq)
{
        Long
offset
=
offseTable.get(mq);
        if
(offset
!=
null){
            System.out.println(offset);
            return
offset;
        }
        return
0;
    }

刚开始的没有细看PullResult对象,以为拉取到的结果没有MessageExt对象还跑到群里面问别人,犯2了

特别要注意  静态变量offsetTable的作用,拉取的是按照从offset(理解为下标)位置开始拉取,拉取N条,offsetTable记录下次拉取的offset位置