RocketMQ 自己的整理和理解

时间:2025-02-21 08:34:09
每个人的想法不同,  RocketMQ 介绍的时候就说 是阿里从他们使用的上 解耦出来 近一步简化 便捷的  目的当然是 让其能快速入手和开发 
 
 如果不是在项目设计层面上   只是使用的话   从Git上下载该项目的源码 其中有一个包是专门的测试 实例的   只需要照猫画虎 使用就可以了
不能有中文路径!

不能有中文路径!
  
不能有中文路径!    


关系

 两个接口 
 
    interface MQProducer  //生产者接口 
           
    {
           实现该接口的只有一个 默认的 DefaultMQProducer 
           
           DefaultMQProducer 实现 MQProducer 接口的时候 还继承了 ClientConfig类 (客户端配置类)
               可以配置如  sendMsgTimeout 超时时间  producerGroup  消息最大多少 超过多少压缩等等
           
           
           
           关键方法 :
                send(Message) 发送一个消息到MQ
                
                   这个方法其实是调用 DefaultMQProducer构造方法 创建的  defaultMQProducerImpl 类对象的 send(..)方法 
                  
                  defaultMQProducerImpl 类 才是真正发送消息的核心类 
                  
                   方法 --》 sendDefaultImpl方法

                      sendDefaultImpl --》  tryToFindTopicPublishInfo 来检测映射 队列是否存在 是否正常
                      {
                               final Segment<K,V>[] segments;  这个 键值
                            
                               不存在 不正常 :
                                     创建一个 TopicPublishInfo 到 segments 映射文件 同时 将 Topic (队列) 信息 更新到NameServer中
                      }     
                         
                      sendDefaultImpl --》   通过设置是失败重复次数  和  超时时间 来从新发送消息  
                       
                                详细  for (; times < 失败重复次数 && (结束时间 - 开始时间) < 配置的超时时间; times++) 
                         
                      sendDefaultImpl --》sendKernelImpl  装载 配置 信息 
                     
                                      --》sendKernelImpl --》().sendMessage()
                                       
                                           MQClientInstance mQClientFactory  对象  是在  DefaultMQProducer start启动方式时候初始化的
                                           
                                           ().getAndCreateMQClientInstance(,rpcHook);
                                       
                                                       
                                      --》                 --》sendMessage
                                                            {    
                                                                MQClientInstance  --》  MQClientAPIImpl mQClientAPIImpl 
                                                               
                                                                     ()  --> sendMessageSync
                                                                              switch (communicationMode)  同步 异步  单向 处理  默认是 同步
                                                            }
                                            

                                        后续返回  SendResult sendResult  改类型描述当时 发送MQ 的最终状态
                                             
                  
                  Message 消息的 Topic 不能为空  
                    
           
           (); 关闭 shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
           
    }

     
     
     发送消息负载的问题  
     {
     
        看源码  是通过循环从 namesrv 获取的到 Topic 路由消息 (也就是有几个broker 每个 broker 有几个队列)
        然后  记录当前发送过的 +1
     
     
        备注 : 队列数量 小于 消费者数量  多余的消费者将不起做用
     }
     
     
     
     关于顺序消息发送 的问题
     {
         环境: 1 下单 2 付款 3 收货 3个状态 ,  普通模式下 发送到队列中的 是轮询队列 将3个消息分别发送到多个队列中。
         
         很可能会照成出现 先消费 2 在消费 1 流程错乱的情况  当然可以业务层处理 但是业务层处理比较麻烦
         
         
         顺序消费的发送的原理 :
          
                我们自己指定 消息将要添加的队列 
         
                SendResult sendResult = (msg,
                        new MessageQueueSelector()
                        {
                            @Override
                            public MessageQueue select(List<MessageQueue> mqs,
                                    Message msg, Object arg)
                            {
                                Integer id = (Integer) arg;
                                int index = id % (); // 通过取于来 讲 同一个订单号 访入同一个队列中  
                                // 前提是 队列数量没有变动
                                return (index);

                            }
                        }, “10001”); // orderID “10001”是传递给回调方法的 自定义数据 
     
     
                    List<MessageQueue> mqs 就是从namesrv 获取的所有队列
     }
      
     
     
    备注   
            // 订单 Id 
            String orderId = "20034568923546"; 
            (orderId); 
            // Keys
            每个消息在业务局面的唯一标识码   通过 topic,key 来查询返条消息内容,以及消息被谁消费
            查询的时候 非常重要
     
     
    
    消费者
    interface MQConsumer      
    {
    
        // 回溯消费
        {
            mqadmin resetOffsetByTime 命令  
            改方式 是通过消费的日志来恢复的  但是只能通过 消费的组来恢复  恢复消息后 也只能用改组来从新消费 
            
            -s : 时间戳的问题  可以是 毫秒 或者是从什么时候开始
        
        }
    
    
    
        //拉取模式
        interface MQPullConsumer:
        {
            
            
            
        }
        
        
        
        
        
        
        // 接收模式
        
        长轮询模式  一次获取一批 消息 
        
        
        记录  
             批量和单条 内部实现   还是获取了所有的 可以获取到的队列消息 放入集合中 判断集合大小是否 大于设置的单次消费数量
             
             小于
                直接将其 消息集合 放入执行回调方法中
             大于
                使用的是For 循环 来单条处理
                
                
                
        
        
        
        
        interface MQPushConsumer:
        {
            class DefaultMQPushConsumer  extends ClientConfig implements MQPushConsumer
            
            DefaultMQPushConsumer 包含很多可以配置的信息   详情见文档 
            其中最主要的 有几个
                 messageModel  消息模型   支持以下两种  1、集群消费 2、广播消费
                messageListener  消息监听器
                consumeThreadMin 消费线程池数量 默认10
                consumeThreadMax 消费线程池数量 默认20
                
                重要的是  消费线程池 !  这就说明  我发布一个 消费应用 消费逻辑就可以 N 个 处理! 不用自己搞了有没有!!
                安默认的来算  20个消费逻辑  可以配置  而且还 可以横向 增加 消费应用  只要保持是一个组就可以了
                
                难怪会在文档中 特意话一个 性能图啊!!
                
                
            
            
            应用通常吐 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法
            
            MessageListenerOrderly 这个是有序的
            MessageListenerConcurrently 这个是无序的
            
            关键方法 DefaultMQPushConsumer registerMessageListener(new implements MessageListenerConcurrently)
            {
                public void registerMessageListener(MessageListenerConcurrently messageListener) {
                     = messageListener; // 给自己复制一个 消费逻辑类对象  方法后续查询 替换修改等
                    
                    关键方法
                      (messageListener);
                    // 将消费逻辑类告诉 调用者类 
                } 
            }
            
            关键方法 start  
            
            ()  -->  ()
            {
                  来记录设置当前程序运行状态  来做多态 
                 
                 checkConfig() 检查配置 初始化赋值 
                
                 copySubscription() 拷贝订阅者信息 赋值 消费逻辑类 
                 
                 
                 // 有就获取 没有就创建一个
                  =  ().getAndCreateMQClientInstance(,);
                        
                
                
                 接着初始化一系列信息
                 
                 // 加载消费进度
                 ();
                 // 该方法有两个实现   
                 
                 一个是本地 () 获取数据 
                 {
                     //获取文件字符串
                     String content = MixAll.file2String();
                     OffsetSerializeWrapper offsetSerializeWrapper =(content, );
                        
                        可以看出 淘宝使用的是JSON
                         
                 }
                
                 
                 
                 
                 if (() instanceof MessageListenerOrderly)
                 else if (() instanceof MessageListenerConcurrently)
                 判断 消费逻辑类 实现那个接口 创建对应的 ConsumeMessageOrderlyService 对象
                    
                ConsumeMessageConcurrentlyService  该实现为空
                
                
                本地 
                ()
                {
                     
                     
                     
                     创建并执行一个周期性的动作成为了第一个在给定的初始延迟之后,随后用给定的时期,这是执行后将开始initialDelay然后initialDelay +,然后initialDelay + 2 *时期,等等。如果任何执行任务遇到异常,后续执行的镇压。否则,只会终止的任务通过取消或终止执行器。如果执行这个任务花费的时间比其期,然后后续执行可能会迟到,但不会同时执行。
                     //就是一个定时器
                     
                     (new Runnable() {
                                        @Override
                                        public void run() {
                                            ();
                                        }
                    }, 1000 * 1, , );
                    
                    scheduleAtFixedRate 应该是一个线程池管理  
                    
                    不用去关心 scheduleAtFixedRate 方法  看  ()
                    {
                        
                        ().lockAll()
                        是
                          () //  将读取到的消息上锁 
                    }
                    
                     
                }
                
                
                // 最关键的服务启动了
                // 正在的启动了
                (); 
                {
                     synchronized (this){
                     
                            //Start request-response channel  启动请求-响应通道
                            ();
                            //Start various schedule tasks    开始各种安排任务  启动定时任务   其中就包含 获取到MQ消息消费的 回调方法
                            ();
                            //Start pull service              开始拉取服务
                            ();
                            //Start rebalance service         启动负载均衡  //  该服务非常重要
                            ();
                            //Start push service              开始推动服务
                            ().start(false); 
                     } 
                } 
            }
             
            
        }
        
        指定 group 
        订阅 topic 
        注册消息监听处理器,当消息到来时消费消息 
        消费端 Start 
         
            复制订阅关系 
             
            初始化 rebalance 变量 
             
            构建 offsetStore 消费进度存储对象 
             
            启动消费消息服务 
             
            向 mqClientFactory 注册本消费者 
             
            启动 client 端远程通信 
            
            * 加载消费进度 Loand() 
             
            * 启动定时任务  
             
                定时获取 nameserver 地址  
                 
                定时从 nameserver 获取 topic 路由信息  
                 
                定时清理下线的 borker  
                 
                定时向所有 broker 发送心跳信息, (包括订阅关系)  
                 
                * 定时持久化 Consumer 消费进度(广播存储到本地,集群存储到 Broker)  
                PS:  这里也是是个关键   持久化消费进度 是用来记录当前 组的消费情况 可以做到 回溯消费  和宕机等情况下 启动后接着上次执行消费 
                 
                统计信息打点  
                 
                动态调整消费线程池 
             
            启动拉消息服务 PullMessageService 
             
            启动消费端负载均衡服务 RebalanceService 
             
            从 namesrv 更新 topic 路由信息 
             
            向所有 broker 发送心跳信息, (包括订阅关系) 
             
            唤醒 Rebalance 服务线程                                     
        
        
    }
    
    
    // 有些懒得看了 直接看别人 得了
    
     消费端负载均衡 
    {
       这个也是个重点 
       
       消费端会通过 RebalanceService 线程,10 秒钟做一次基于 topic 下的所有队列负载 
       
       消费端 遍历自己所有的 Topic 获取 Topic 下所有的 队列  (一个Topic 包含对个队列  默认是 4 个 有别于其他MQ )
       
       从 broker 获取当前 组(group)的所有消费端( 有心跳的) 
       获取队列集合Set<MessageQueue> mqSet
       现在队列分配策略实例  AllocateMessageQueueStrategy 执行分配算法
       {
          1:平均分配算法 :
                其实是类似于分页的算法 
                将所有 queue 排好序类似于记录 
                将所有消费端 consumer 排好序,相当于页数 
                然后获取当前 consumer 所在页面应该分配到的 queue 
          2:按照配置来分配队列 :
               消费服务启动的时候 就指定好了要消费的 是哪个队列
          3:按照机房来配置队列 :
                Consumer 启动的时候会指定在哪些机房的消息  (应该是指定 broker)
                获取指定机房的 queue 
                然后在执行如 1)平均算法 
       }
 
        根据分配队列的结果更新 ProccessQueueTable<MessageQueue, ProcessQueue> 
        {
            比对 mqSet 将多余的队列删除, 当 broker 当机或者添加,会导致分配到 mqSet 变化, 
 
            添加新增队列, 比对 mqSet,给新增的 messagequeue 构建长轮询对象 PullRequest 对象,会从 broker 获取消费的进度 构建这个队列的 ProcessQueue 
            将 PullRequest 对象派发到长轮询拉消息服务(单线程异步拉取) 
            
            
            注:ProcessQueue 正在被消费的队列,  
            
                        (1) 长轮询拉取到消息都会先存储到 ProcessQueue 的 TreeMap<Long, MessageExt> 集合中,消费调后会删除掉,用来控制 consumer 消息堆积,  TreeMap<Long, MessageExt> key 是消息在此 ConsumeQueue 队列中索引 
                        
                        (2) 对于顺序消息消费 处理  
                        
                        locked 属性:当 consumer 端向 broker 申请锁队列成功后设置 true,  只有被锁定 的 processqueue 才能被执行消费  rollback: 将消费在 msgTreeMapTemp 中的消息,放回 msgTreeMap 重新消费 
                        
                        commit: 将临时表 msgTreeMapTemp 数据清空,代表消费完成,放回最大偏移
                        值 
                        
                        (3) 这里是个 TreeMap,对 key 即消息的 offset 进行排序,这个样可以使得消息进 行顺序消费
                        
                        
                 
        }
    
    
    
    }
    
    
    
    
    长轮询 
    {
                    Rocketmq 的消息是由 consumer 端主动到 broker拉取的, consumer 向 broker 发送拉消息
                    请求, PullMessageService 服务通过一个线程将阻塞队列 LinkedBlockingQueue<PullRequest>
                    中的 PullRequest 到 broker 拉取消息 
                     
                    DefaultMQPushConsumerImpl 的 pullMessage(pullRequest)方法执行向 broker 拉消息动作 
                    1. 获取 ProcessQueue 判读是否 drop 的, drop 为 true 返回 
                    2. 给 ProcessQueue 设置拉消息时间戳 
                    3. 流量控制,正在消费队列中消息(未被消费的)超过阀值,稍后在执行拉消息 
                    4. 流量控制,正在消费队列中消息的跨度超过阀值(默认 2000) ,稍后在消费 
                    5. 根据 topic 获取订阅关系 
                    6. 构建拉消息回调对象 PullBack, 从 broker 拉取消息(异步拉取)返回结果是回调 
                    7. 从内存中获取 commitOffsetValue  //TODO 这个值跟  区别 
                    8. 构建 sysFlag   pull 接口用到的 flag 
                    9. 调底层通信层向 broker 发送拉消息请求 
                    如果 master 压力过大,会建议去 slave 拉取消息 
                    如果是到 broker 拉取消息清楚实时提交标记位,因为 slave 不允许实时提交消费进
                    度,可以定时提交   
                    //TODO 关于 master 拉消息实时提交指的是什么? 
                    10. 拉到消息后回调 PullCallback 
                    处理 broker 返回结果 pullResult 
                      
                    更新从哪个 broker(master 还是 slave)拉取消息 
                      
                    反序列化消息 
                      
                    消息过滤 
                      
                    消息中放入队列最大最小 offset, 方便应用来感知消息堆积度 
                    将消息加入正在处理队列 ProcessQueue 
                    将消息提交到消费消息服务 ConsumeMessageService 
                    流控处理, 如果 pullInterval 参数大于 0 (拉消息间隔,如果为了降低拉取速度,
                    可以设置大于 0 的值) , 延迟再执行拉消息,  如果 pullInterval 为 0 立刻在执行拉
                    消息动作     
    
    
            看图 人家花的不错 很明了
    }
           
    push 消息
    {
    
      PS:
           长轮询向broker拉取消息是批量拉取的,  默认设置批量的值为pullBatchSize = 32, 可配置 
          
          消费端 consumer 构建一个消费消息任务 ConsumeRequest 消费一批消息的个数是 可配置的 consumeMessageBatchMaxSize = 1, 默认批量个数为一个 
              也就是说 每次传递给回调方法的 参数 消息集合 的解释
              
          ConsumeRequest 任务 run 方法执行 
 
 
            判断 proccessQueue 是否被 droped 的, 废弃直接返回,不在消费消息  
             
            构建并行消费上下文  
             
            给消息设置消费失败时候的 retry topic,当消息发送失败的时候发送到 topic
            为%RETRY%groupname 的队列中 
             
             
            调 MessageListenerConcurrently 监听器的 consumeMessage 方法消费消息,返回消
            费结果 
              
            如果 ProcessQueue 的 droped 为 true,不处理结果,不更新 offset, 但其实这里消
            费端是消费了消息的,这种情况感觉有被重复消费的风险 
              
            处理消费结果  :
            
                    消费成功,  对于批次消费消息, 返回消费成功并不代表所有消息都消费成功,
                    但是消费消息的时候一旦遇到消费消息失败直接放回,根据 ackIndex 来标记
                    成功消费到哪里了  
             
             
                    消费失败, ackIndex 设置为-1 
                    广播模式发送失败的消息丢弃, 广播模式对于失败重试代价过高,对整个集
                    群性能会有较大影响,失败重试功能交由应用处理 
                    集群模式, 将消费失败的消息一条条的发送到 broker 的重试队列中去,如果
                    此时还有发送到重试队列发送失败的消息, 那就在 cosumer 的本地线程定时 5
                    秒钟以后重试重新消费消息, 在走一次上面的消费流程。 
             
             
            删除正在消费的队列 processQueue 中本次消费的消息,放回消费进度 
             
             
            更新消费进度, 这里的更新只是一个内存 offsetTable 的更新,后面有定时任务定
            时更新到 broker 上去  
      
      

      
      PS:
            关于消费成功 和 失败的  问题  
            
                在集群模式下   回调方法设置为消费失败  会将当前消费的失败消息 发送到 broker 的容错度列中  等待N次+ 从新消费 。
      
      
      push 消费-顺序消费消息 
 
                        顺序消费服务 ConsumeMessageConcurrentlyService 构建的时候  
                        
                        构建一个线程池来接收消费请求 ConsumeRequest  
                         
                        构建一个单线程的本地线程, 用来稍后定时重新消费 ConsumeRequest, 用来执行
                        定时周期性(一秒)钟锁队列任务  
                         
                        周期性锁队列 lockMQPeriodically  
                         
                        获取正在消费队列列表 ProcessQueueTable 所有 MesssageQueue,  构建根据 broker
                        归类成 MessageQueue 集合 Map<brokername, Set<MessageQueue>>  
                         
                        遍历 Map<brokername, Set<MessageQueue>>的 brokername, 获取 broker 的 master
                        机器地址, 将brokerName的Set<MessageQueue>发送到broker请求锁定这些队列。  在broker
                        端锁定队列,其实就是在 broker 的 queue 中标记一下消费端,表示这个 queue 被某个 client
                        锁定。 Broker 会返回成功锁定队列的集合, 根据成功锁定的 MessageQueue,设置对应的正
                        在处理队列 ProccessQueue 的 locked 属性为 true 没有锁定设置为 false  
                         
                        通过长轮询拉取到消息后会提交到消息服务 ConsumeMessageOrderlyService, 
                        ConsumeMessageOrderlyService 的 submitConsumeRequest 方法构建 ConsumeRequest 任务提
                        交到线程池。ConsumeRequest 是由 ProcessQueue 和 Messagequeue 组成。 
                         
                        ConsumeRequest 任务的 run 方法 
                         
                        判断 proccessQueue 是否被 droped 的, 废弃直接返回,不在消费消息 
                         
                        每个 messagequeue 都会生成一个队列锁来保证在当前 consumer 内,同一个队列串行
                        消费,  
                         
                        判断 processQueue 的 lock 属性是否为 true, lock 属性是否过期, 如果为 false 或者过期,
                        放到本地线程稍后锁定在消费。 如果 lock 为 true 且没有过期,开始消费消息 
                         
                        计算任务执行的时间如果大于一分钟且线程数小于队列数情况下,将 processqueue, 
                        messagequeue 重新构建 ConsumeRequest 加到线程池 10ms 后在消费,这样防止个别队列被
                        饿死 
                         
                        获取客户端的消费批次个数,默认一批次为一条 
                         
                        从 proccessqueue 获取批次消息,  (batchSize) , 从 msgTreeMap
                        中移除消息放到临时 map 中 msgTreeMapTemp, 这个临时 map 用来回滚消息和 commit 消
                        息来实现事物消费 
                         
                        调回调接口消费消息,返回状态对象 ConsumeOrderlyStatus 
                         
                        根据消费状态,处理结果 
                        1) 非事物方式,自动提交 
                        消息消息状态为 success: 调用  方法  
                         
                        获取 msgTreeMapTemp 的最后一个 key,表示提交的 offset  
                         
                        清空 msgTreeMapTemp 的消息,已经成功消费 
                        2) 事物提交,由用户来控制提交回滚(精卫专用) 
                             更新消费进度,  这里的更新只是一个内存 offsetTable 的更新, 后面有定时任务定时更
                        新到 broker 上去 
      
      
    }        
           
           
        关闭
        {

         shutdown 
         
                DefaultMQPushConsumerImpl  关闭消费端 
                 
                关闭消费线程 
                 
                将分配到的 Set<MessageQueue>的消费进度保存到 broker 
                利 用
                DefaultMQPushConsumerImpl
                获 取
                ProcessQueueTable<MessageQueue, 
                ProcessQueue>的 keyset 的 messagequeue 去获取 
                <MessageQueue, AutomicLong>Map 中的消费进
                度, 
                offsetTable 中 的 messagequeue 的 值, 在 update 的时候如果 没有对应 的
                Messagequeue 会构建, 但是也会 rebalance 的时候将没有分配到的 messagequeue
                删除 
                rebalance 会将 offsettable 中没有分配到 messagequeue 删除,  但是在从 offsettable
                删除之前会将 offset 保存到 broker 
                 
                Unregiser 客户端 
                 
                pullMessageService 关闭 
                 
                scheduledExecutorService 关闭,关闭一些客户端的起的定时任务 
                 
                mqClientApi 关闭 
                 
                rebalanceService 关闭 
        
        
        }

 补充 一
消息的延迟
    {
        通过测试程序可以看出  通过设置 message 的DelayTimeLevel 可以设置消息延迟处理

    }

    消息重试机制  容错机制
    {

        通过源码可以看出 消费方法的返回对象 只有两个值 
        
        CONSUME_SUCCESS // 消费成功
        
        RECONSUME_LATER // 消费失败,稍后重试  
        
        
        CONSUME_SUCCESS 无异议  
        
        关键是 RECONSUME_LATER 
            
            我们可以通过 RECONSUME_LATER 来容错。 阿里提供的这个  重试机制 是通过添加到一个错误队列中 设置期  DelayTimeLevel 来实现的
            
            第一次消费的时候  打印 MessageExt 没有 properties属性的详细信息  返回 RECONSUME_LATER 稍后重试
            
            [queueId=0, storeSize=106, queueOffset=0, sysFlag=0, bornTimestamp=1458803327047, bornHost=/10.10.12.27:41697, storeTimestamp=1458803327059, storeHost=/10 .10.12.27:10911, msgId=0A0A0C1B00002A9F0000000000031F10, commitLogOffset=204560, bodyCRC=910247988, reconsumeTimes=0, preparedTransactionOffset=0, toStrin g()=Message [topic=Topic2, flag=0, properties={MAX_OFFSET=1, MIN_OFFSET=0}, body=9]]

            第二次消费的时候 
            
            [queueId=0, storeSize=260, queueOffset=0, sysFlag=0, bornTimestamp=1458803327047, bornHost=/10.10.12.27:41697, storeTimestamp=1458803516104, storeHost=/10 .10.12.27:10911, msgId=0A0A0C1B00002A9F0000000000032079, commitLogOffset=204921, bodyCRC=910247988, reconsumeTimes=1, preparedTransactionOffset=0, toStrin g()=Message [topic=Topic2, flag=0, properties={ORIGIN_MESSAGE_ID=0A0A0C1B00002A9F0000000000031F10, DELAY=3, REAL_TOPIC=%RETRY%ConsumerGroupName, WAIT=fals e, RETRY_TOPIC=Topic2, MAX_OFFSET=1, MIN_OFFSET=0, REAL_QID=0}, body=9]]

            可以看出 消息  虽然  queueId 是相同的值 0 但是  msgId 却变了 ! 同时用rocketmq-console 来监控 该 消费者 你会发现 多了个 Topic  %RETRY%ConsumerGroupName  
            
            所有 可以得出一个结论  
            
            我们返回 消费失败,稍后重试  RECONSUME_LATER  消息会回到 broker 同时创建一条相同的消息 访如   %RETRY%ConsumerGroupName  
            同时 设置 该 消息的 延迟消费 每次延迟时间 +1  
            
            我觉得可以通过 reconsumeTimes 来做一个简单的容错  获取 当前消费的 次数  是否大于 设定值   大于就说明其是死信  记录到异常数据库 
            
            
    }


 
 

备注问题:
背景:
生产端 使用 linux 服务器 (UTF-8 编码)
 Message me = new Message();
							("中国人".getBytes());
							(me);
消费端 使用   Windows 服务器 (GBK 编码)
							MessageExt msg = (0);
							String strBody = new String(());
			问题 :
					生产端无问题,消费端 存在 字符集 编码问题 。
			
			原因 :
					生产端发送给MQ 的数据是 字节 !  getBytes() 不指定字节格式 会默认使用 本地系统编码格式  linux下通常是 UTF-8 格式
					消费端由于是Windows 本地系统的编码格式是 GBK 格式 。
					new String(()); 方法 不指定编码格式 使用的也是 本地系统编码格式 也就是 GBK格式
					
					可能会说 直接 GBK转换UTF-8就好了,但是! GBK 对应的是2个字节  UTF-8 对应的是3个字节  当出现 3个字的中文或者 特殊符号的时候 
					转换过程中会 主动 2补1 所有 “中国人”  这里 人 字就会乱码 
					
					String iso = new String(("UTF-8"), "ISO-8859-1");
					  strBody = new String(("ISO-8859-1"), "UTF-8");
				    
					上面这种解决方法在 测试方法中有效  在消费端 具体消费类中的消费方法 并未生效  
					
					这里希望有大神可以指出为什么!?
					
					
			解决方法:
			
					MessageExt msg = (0);
					strBody = new String((), "UTF-8");   
					
					在第一次 字节转换成字符串的时候 就指定 该字节按照 UTF-8 格式转换!  
					
			PS: 
					虽然解决方法很简单,但是 稍微不注意 就会跳过这里啊  劲量做到统一开发环境啊!
 
 
      消费端 多实例问题
经过试验,一个消费 组 只能处理一个 Topic 下的一个 Tags  !
 





努力或许不会有收获,但是不努力却一定不会有收获!