本文介绍了java远程连接调用Rabbitmq,分享给大家,希望此文章对各位有所帮助。
打开IDEA创建一个maven工程(Java就可以了)。
pom.xml文件如下
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
< project xmlns = " http://maven.apache.org/POM/4.0.0 " xmlns:xsi = " http://www.w3.org/2001/XMLSchema-instance "
xsi:schemaLocation = " http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd " >
< modelVersion >4.0.0</ modelVersion >
< groupId >com.zhenqi</ groupId >
< artifactId >rabbitmq-study</ artifactId >
< version >1.0-SNAPSHOT</ version >
< packaging >jar</ packaging >
< name >rabbitmq-study</ name >
< url > http://maven.apache.org </ url >
< properties >
< project.build.sourceEncoding >UTF-8</ project.build.sourceEncoding >
</ properties >
< dependencies >
< dependency >
< groupId >junit</ groupId >
< artifactId >junit</ artifactId >
< version >4.12</ version >
< scope >test</ scope >
</ dependency >
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
< dependency >
< groupId >com.rabbitmq</ groupId >
< artifactId >amqp-client</ artifactId >
< version >4.1.0</ version >
< exclusions >
< exclusion >
< groupId >org.slf4j</ groupId >
< artifactId >slf4j-api</ artifactId >
</ exclusion >
</ exclusions >
</ dependency >
< dependency >
< groupId >org.slf4j</ groupId >
< artifactId >slf4j-log4j12</ artifactId >
< version >1.7.21</ version >
</ dependency >
< dependency >
< groupId >commons-lang</ groupId >
< artifactId >commons-lang</ artifactId >
< version >2.6</ version >
</ dependency >
</ dependencies >
</ project >
|
为了能远程访问rabbitmq,则需要编辑 /etc/rabbitmq/rabbitmq.conf,添加以下内容。
1
|
2
3
|
[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
]
|
添加administrator角色
1
|
|
rabbitmqctl set_user_tags openstack administrator
|
创建抽象队列 EndPoint.java
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
package com.zhenqi;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Created by wuming on 2017/7/16.
*/
public abstract class EndPoint {
protected Channel channel;
protected Connection connection;
protected String endPointName;
public EndPoint(String endpointName) throws Exception {
this .endPointName = endpointName;
//创建一个连接工厂 connection factory
ConnectionFactory factory = new ConnectionFactory();
//设置rabbitmq-server服务IP地址
factory.setHost( "192.168.146.128" );
factory.setUsername( "openstack" );
factory.setPassword( "rabbitmq" );
factory.setPort( 5672 );
factory.setVirtualHost( "/" );
//得到 连接
connection = factory.newConnection();
//创建 channel实例
channel = connection.createChannel();
channel.queueDeclare(endpointName, false , false , false , null );
}
/**
* 关闭channel和connection。并非必须,因为隐含是自动调用的。
* @throws IOException
*/
public void close() throws Exception{
this .channel.close();
this .connection.close();
}
}
|
生产者Producer.java
生产者类的任务是向队列里写一条消息
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
package com.zhenqi;
import org.apache.commons.lang.SerializationUtils;
import java.io.Serializable;
/**
* Created by wuming on 2017/7/16.
*/
public class Producer extends EndPoint {
public Producer(String endpointName) throws Exception {
super (endpointName);
}
public void sendMessage(Serializable object) throws Exception {
channel.basicPublish( "" ,endPointName, null , SerializationUtils.serialize(object));
}
}
|
消费者QueueConsumer.java
消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package com.zhenqi;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Created by wuming on 2017/7/16.
*/
public class QueueConsumer extends EndPoint implements Runnable, Consumer {
private Logger LOG=Logger.getLogger(QueueConsumer. class );
public QueueConsumer(String endpointName) throws Exception {
super (endpointName);
}
public void handleConsumeOk(String s) {
}
public void handleCancelOk(String s) {
}
public void handleCancel(String s) throws IOException {
}
public void handleShutdownSignal(String s, ShutdownSignalException e) {
}
public void handleRecoverOk(String s) {
LOG.info( "Consumer " +s + " registered" );
}
public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte [] bytes) throws IOException {
Map map = (HashMap) SerializationUtils.deserialize(bytes);
LOG.info( "Message Number " + map.get( "message number" ) + " received." );
}
public void run() {
try {
channel.basicConsume(endPointName, true , this );
} catch (IOException e){
e.printStackTrace();
}
}
}
|
测试
运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package com.zhenqi;
import java.util.HashMap;
/**
* Created by wuming on 2017/7/16.
*/
public class TestRabbitmq {
public static void main(String[] args){
try {
QueueConsumer consumer = new QueueConsumer( "queue" );
Thread consumerThread = new Thread(consumer);
consumerThread.start();
Producer producer = new Producer( "queue" );
for ( int i = 0 ; i < 100000 ; i++){
HashMap message = new HashMap();
message.put( "message number" , i);
producer.sendMessage(message);
System.out.println( "Message Number " + i + " sent." );
}
} catch (Exception e){
e.printStackTrace();
}
}
}
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://blog.csdn.net/coco2d_x2014/article/details/75213318?utm_source=tuicool&utm_medium=referral