Shared File System Master Slave: Full Configuration and Test

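Date: 2024-11-04 18:07:38
This test runs two brokers against a shared file store on a single machine.
The complete configuration files for the two brokers are as follows.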

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
   
    http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!--  
    Create a dynamic network of brokers
    For more information, see:
    
    http://activemq.apache.org/networks-of-brokers.html
    
    To run this example network of ActiveMQ brokers run
    
    $ bin/activemq console xbean:conf/activemq-dynamic-network-broker1.xml
    
    and
    
    $ bin/activemq console xbean:conf/activemq-dynamic-network-broker2.xml
    
    in separate consoles
 -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="dynamic-broker2" dataDirectory="${activemq.base}/data">

        <!-- Destination specific policies using destination names or wildcards -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb">
                        <deadLetterStrategy>
                          <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
                        </deadLetterStrategy>
                    </policyEntry>
                    <policyEntry topic=">" producerFlowControl="true" memoryLimit="20mb">
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
        <managementContext>
            <managementContext createConnector="true" connectorPort="1100"/>
        </managementContext>

        <!--
            Configure network connector to use multicast protocol
            For more information, see
            
            http://activemq.apache.org/multicast-transport-reference.html
        -->
        <networkConnectors>
          <networkConnector uri="multicast://default"
            dynamicOnly="true" 
            networkTTL="3" 
            prefetchSize="1" 
            decreaseNetworkConsumerPriority="true" />
        </networkConnectors>
        <persistenceAdapter>
            <!--<kahaDB directory="${activemq.base}/data/dynamic-broker2/kahadb" />-->
            <kahaDB directory="D:/"/>
        </persistenceAdapter>

        <!--  The maximum amount of space the broker will use before slowing down producers -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb" name="foo"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors ActiveMQ will listen to
            Configure discovery URI to use multicast protocol
        -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />
        </transportConnectors>

    </broker>

</beans>

The other broker

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
   
    http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!--  
    Create a dynamic network of brokers
    For more information, see:
    
    http://activemq.apache.org/networks-of-brokers.html
    
    To run this example network of ActiveMQ brokers run
    
    $ bin/activemq console xbean:conf/activemq-dynamic-network-broker1.xml
    
    and
    
    $ bin/activemq console xbean:conf/activemq-dynamic-network-broker2.xml
    
    in separate consoles
 -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="dynamic-broker1" dataDirectory="${activemq.base}/data">

        <!-- Destination specific policies using destination names or wildcards -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb">
                        <deadLetterStrategy>
                          <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
                        </deadLetterStrategy>
                    </policyEntry>
                    <policyEntry topic=">" producerFlowControl="true" memoryLimit="20mb">
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
        <managementContext>
            <managementContext createConnector="true"/>
        </managementContext>

        <!--
            Configure network connector to use multicast protocol
            For more information, see
            
            http://activemq.apache.org/multicast-transport-reference.html
        -->
        <networkConnectors>
          <networkConnector uri="multicast://default"
            dynamicOnly="true"
            networkTTL="3"
            prefetchSize="1"
            decreaseNetworkConsumerPriority="true" />
        </networkConnectors>

        <persistenceAdapter>
            <!--<kahaDB directory="${activemq.base}/data/dynamic-broker1/kahadb"/>-->
            <kahaDB directory="D:/"/> 
        </persistenceAdapter>
        
        <!--  The maximum amount of space the broker will use before slowing down producers -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb" name="foo"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors ActiveMQ will listen to
            Configure discovery URI to use multicast protocol
        -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" discoveryUri="multicast://default" />
        </transportConnectors>

    </broker>

</beans>

In the two configuration files above, the key point is that both brokers point their KahaDB store at the same directory:
<kahaDB directory="D:/"/> 
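Shared File System Master Slave relies on an exclusive lock on the shared store: whichever broker holds the lock acts as master, and the other waits. The following is a minimal sketch of that idea using plain java.nio file locking. It is not ActiveMQ's actual implementation; the class name SharedStoreLockSketch, the method tryBecomeMaster, and the temporary lock file are all hypothetical and exist only for illustration.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustrative only: models "whoever holds the lock file is master". */
final class SharedStoreLockSketch implements AutoCloseable {
    private final FileChannel channel;
    private FileLock lock; // non-null while this instance plays the master role

    SharedStoreLockSketch(Path lockFile) throws IOException {
        this.channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    /** Tries once to take the exclusive lock; returns true if this instance is now master. */
    boolean tryBecomeMaster() throws IOException {
        if (lock != null) {
            return true;
        }
        try {
            lock = channel.tryLock(); // null if another process holds the lock
        } catch (OverlappingFileLockException heldInThisJvm) {
            lock = null; // another channel in this JVM already holds the lock
        }
        return lock != null;
    }

    /** Releases the lock, as happens when the master stops. */
    @Override
    public void close() throws IOException {
        if (lock != null) {
            lock.release();
            lock = null;
        }
        channel.close();
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical stand-in for a lock file inside the shared store directory.
        Path lockFile = Files.createTempFile("kahadb-sketch", ".lock");
        try (SharedStoreLockSketch broker2 = new SharedStoreLockSketch(lockFile)) {
            SharedStoreLockSketch broker1 = new SharedStoreLockSketch(lockFile);
            System.out.println("broker1 master? " + broker1.tryBecomeMaster());
            System.out.println("broker2 master? " + broker2.tryBecomeMaster());
            broker1.close(); // simulate stopping the master
            System.out.println("broker2 master after failover? " + broker2.tryBecomeMaster());
        } finally {
            Files.deleteIfExists(lockFile);
        }
    }
}
```

In the real setup the two brokers are separate processes; the sketch runs both roles in one JVM only so that it can be executed on its own.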

Start the two brokers separately.

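Clients connect with: private static final String URL="failover:(tcp://localhost:61616,tcp://localhost:61618)"; that is, the broker on port 61616 is initially the master and the one on 61618 is the slave. This assumes the broker on 61616 is started first, since the first broker to lock the shared store becomes master.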
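To see which of the two brokers is currently serving clients, a plain TCP probe of the two ports from the failover URL can help. The sketch below assumes that a waiting slave does not accept client connections until it takes over, which is the usual behaviour for shared-storage master/slave but is not shown in the logs here. The class and method names (BrokerProbeSketch, isListening, firstListening) are hypothetical, and only the JDK is used, not the ActiveMQ client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;
import java.util.Optional;

/** Illustrative only: checks which broker port currently accepts TCP connections. */
final class BrokerProbeSketch {

    /** Returns true if a TCP connection to the address succeeds within timeoutMillis. */
    static boolean isListening(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(address, timeoutMillis);
            return true;
        } catch (IOException refusedOrTimedOut) {
            return false;
        }
    }

    /** Returns the first address in list order that accepts connections, if any. */
    static Optional<InetSocketAddress> firstListening(List<InetSocketAddress> brokers,
                                                      int timeoutMillis) {
        for (InetSocketAddress broker : brokers) {
            if (isListening(broker, timeoutMillis)) {
                return Optional.of(broker);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Same hosts and ports as in the failover URL above.
        List<InetSocketAddress> brokers = List.of(
                new InetSocketAddress("localhost", 61616),
                new InetSocketAddress("localhost", 61618));
        System.out.println(firstListening(brokers, 500)
                .map(address -> "accepting connections: " + address)
                .orElse("no broker reachable"));
    }
}
```

Running it before and after stopping the master should show the reachable port moving from 61616 to 61618, if the assumption about the slave holds.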

1. Verify that messages can be sent and received normally.

Start the producer to send messages. Log output:

[Thread-1] Sending message: 'Message: 0 sent at: Wed Oct 30 09:46:42 CST 2013  ...'
[Thread-1] Sending message: 'Message: 1 sent at: Wed Oct 30 09:46:52 CST 2013 ...'
[Thread-1] Sending message: 'Message: 2 sent at: Wed Oct 30 09:47:02 CST 2013 ...'
[Thread-1] Sending message: 'Message: 3 sent at: Wed Oct 30 09:47:12 CST 2013 ...'
[Thread-1] Sending message: 'Message: 4 sent at: Wed Oct 30 09:47:22 CST 2013 ...'
[Thread-1] Sending message: 'Message: 5 sent at: Wed Oct 30 09:47:32 CST 2013 ...'

Start the consumer to receive messages. Log output:

[Thread-1] Received: 'Message: 0 sent at: Wed Oct 30 09:46:42 CST 2013  ...' (length 255)
[Thread-1] Received: 'Message: 1 sent at: Wed Oct 30 09:46:52 CST 2013 ...' (length 255)
[Thread-1] Received: 'Message: 2 sent at: Wed Oct 30 09:47:02 CST 2013 ...' (length 255)
[Thread-1] Received: 'Message: 3 sent at: Wed Oct 30 09:47:12 CST 2013 ...' (length 255)
[Thread-1] Received: 'Message: 4 sent at: Wed Oct 30 09:47:22 CST 2013 ...' (length 255)
[Thread-1] Received: 'Message: 5 sent at: Wed Oct 30 09:47:32 CST 2013 ...' (length 255)

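This shows that sending and receiving work normally.

2. Verify that when the master goes down, the slave takes over message processing, and check whether any messages are lost.

First stop the consumer and note the sequence number of the last message it received (here it is 18; after the restart we check whether reception resumes from the next number). The producer keeps sending messages throughout.

Stop the master on port 61616, then start the consumer again. The consumer resumes receiving from message 19.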

Connecting to URL: failover:(tcp://localhost:61616,tcp://localhost:61618)
Consuming queue: TOOL.DEFAULT.SJ
Using a non-durable subscription
Running 1 parallel threads
log4j:WARN No appenders could be found for logger (org.apache.activemq.transport.failover.FailoverTransport).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[Thread-1] Received: 'Message: 19 sent at: Wed Oct 30 09:49:52 CST 2013 ...' (length 255)

This shows that after the master goes down, the slave continues processing messages without any manual intervention.
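The check in step 2 was done by reading the log by eye. As a rough sketch, the same comparison can be automated by extracting the sequence numbers from the consumer's "Received:" lines and listing any numbers missing after the last one seen before the restart. The class SequenceGapCheckSketch and its methods are hypothetical helpers, and the sketch assumes messages arrive in increasing order, as they do in the logs above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: finds gaps in the message numbers of a consumer log. */
final class SequenceGapCheckSketch {
    // Matches lines such as: [Thread-1] Received: 'Message: 19 sent at: ...'
    private static final Pattern RECEIVED = Pattern.compile("Received: 'Message: (\\d+) ");

    /** Extracts message numbers from consumer log lines, in order of appearance. */
    static List<Integer> sequenceNumbers(List<String> logLines) {
        List<Integer> numbers = new ArrayList<>();
        for (String line : logLines) {
            Matcher matcher = RECEIVED.matcher(line);
            if (matcher.find()) {
                numbers.add(Integer.parseInt(matcher.group(1)));
            }
        }
        return numbers;
    }

    /** Returns the numbers skipped between lastSeen and the highest number received. */
    static List<Integer> missingAfter(int lastSeen, List<String> logLines) {
        List<Integer> missing = new ArrayList<>();
        int expected = lastSeen + 1;
        for (int number : sequenceNumbers(logLines)) {
            for (; expected < number; expected++) {
                missing.add(expected); // a number the consumer never logged
            }
            if (number == expected) {
                expected++;
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Lines taken from the consumer output after the restart.
        List<String> log = List.of(
                "log4j:WARN Please initialize the log4j system properly.",
                "[Thread-1] Received: 'Message: 19 sent at: Wed Oct 30 09:49:52 CST 2013 ...' (length 255)");
        System.out.println("missing: " + missingAfter(18, log)); // prints "missing: []"
    }
}
```

An empty list means the consumer picked up exactly where it stopped, which matches the result observed in this test.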