一、阿里云官网---帮助文档
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh
按照官网步骤,创建Topic、申请发布(生产者)、申请订阅(消费者)
二、代码
1、配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public class MqConfig {
/**
* 启动测试之前请替换如下 XXX 为您的配置
*/
public static final String PUBLIC_TOPIC = "test" ; //公网测试
public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER" ;
public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE" ;
public static final String ACCESS_KEY = "123" ;
public static final String SECRET_KEY = "123" ;
public static final String TAG = "" ;
public static final String THREAD_NUM = "25" ; //消费端线程数
/**
* ONSADDR 请根据不同Region进行配置
* 公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
* 公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
* 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
* 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
*/
public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal" ;
}
|
ONSADDR 阿里云用 公有云生产,测试用公网
不同的业务可以设置不同的tag,但是如果发送消息量大的话,建议新建TOPIC
2、生产者
方式1:
配置文件:producer.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
<? xml version = "1.0" encoding = "UTF-8" ?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
< beans >
< bean id = "producer" class = "com.aliyun.openservices.ons.api.bean.ProducerBean"
init-method = "start" destroy-method = "shutdown" >
< property name = "properties" >
< map >
< entry key = "ProducerId" value = "" /> <!-- PID,请替换 -->
< entry key = "AccessKey" value = "" /> <!-- ACCESS_KEY,请替换 -->
< entry key = "SecretKey" value = "" /> <!-- SECRET_KEY,请替换 -->
<!--PropertyKeyConst.ONSAddr 请根据不同Region进行配置
公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal -->
< entry key = "ONSAddr" value = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal" />
</ map >
</ property >
</ bean >
</ beans >
|
启动方式1,在使用类的全局里设置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
//初始化生产者
private ApplicationContext ctx;
private ProducerBean producer;
@Value ( "${producerConfig.enabled}" ) //开关,spring配置项,true为开启,false关闭
private boolean producerConfigEnabled;
@PostConstruct
public void init(){
if ( true == producerConfigEnabled) {
ctx = new ClassPathXmlApplicationContext( "producer.xml" );
producer = (ProducerBean) ctx.getBean( "producer" );
}
}
|
PS:最近发现一个坑,如果producer用上面这种方式启动的话,一旦启动的多了,会造成fullGC,所以可以换成下面这种注解方式启动,在用到的地方手动start、shutdown
方式2:配置类(不需要xml)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
@Configuration
public class ProducerBeanConfig {
@Value ( "${openservices.ons.producerBean.producerId}" )
private String producerId;
@Value ( "${openservices.ons.producerBean.accessKey}" )
private String accessKey;
@Value ( "${openservices.ons.producerBean.secretKey}" )
private String secretKey;
private ProducerBean producerBean;
@Value ( "${openservices.ons.producerBean.ONSAddr}" )
private String ONSAddr;
@Bean
public ProducerBean oneProducer() {
ProducerBean producerBean = new ProducerBean();
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ProducerId, producerId);
properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
producerBean.setProperties(properties);
return producerBean;
}
}
|
PS:经过这次双11发现,以上2种方式在大数据量,多线程情况下都不太适用, 性能很差,所以推荐用3
方式3:(不需要xml)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
@Component
public class ProducerBeanSingleTon {
@Value ( "${openservices.ons.producerBean.producerId}" )
private String producerId;
@Value ( "${openservices.ons.producerBean.accessKey}" )
private String accessKey;
@Value ( "${openservices.ons.producerBean.secretKey}" )
private String secretKey;
@Value ( "${openservices.ons.producerBean.ONSAddr}" )
private String ONSAddr;
private static Producer producer;
private static class SingletonHolder {
private static final ProducerBeanSingleTon INSTANCE = new ProducerBeanSingleTon();
}
private ProducerBeanSingleTon (){}
public static final ProducerBeanSingleTon getInstance() {
return SingletonHolder.INSTANCE;
}
@PostConstruct
public void init(){
// producer 实例配置初始化
Properties properties = new Properties();
//您在控制台创建的Producer ID
properties.setProperty(PropertyKeyConst.ProducerId, producerId);
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
//设置发送超时时间,单位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000" );
// 设置 TCP 接入域名(此处以公共云生产环境为例)
properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可
producer.start();
}
public Producer getProducer(){
return producer;
}
}
|
spring配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
consumerConfig.enabled = true
producerConfig.enabled = true #方式1:
scheduling.enabled = false
#方式2、3:rocketMQ \u516C\u7F51\u914D\u7F6E
openservices.ons.producerBean.producerId = pid
openservices.ons.producerBean.accessKey =
openservices.ons.producerBean.secretKey =
openservices.ons.producerBean.ONSAddr = 公网、杭州公有云生产
|
方式1投递消息代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
try {
String jsonC = JsonUtils.toJson(elevenMessage);
Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
SendResult sendResult = producer.send(message);
if (sendResult != null ) {
logger.info(".Send mq message success!”;
} else {
logger.warn( ".sendResult is null........." );
}
} catch (Exception e) {
logger.warn( "DoubleElevenAllPreService" );
Thread.sleep( 1000 ); //如果有异常,休眠1秒
}
|
方式2投递消息代码:(可以每发1000个启动/关闭一次)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
producerBean.start();
try {
String jsonC = JsonUtils.toJson(elevenMessage);
Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
SendResult sendResult = producer.send(message);
if (sendResult != null ) {
logger.info(".Send mq message success!”;
} else {
logger.warn( ".sendResult is null........." );
}
} catch (Exception e) {
logger.warn( "DoubleElevenAllPreService" );
Thread.sleep( 1000 ); //如果有异常,休眠1秒
}
producerBean.shutdown();
|
方式3:投递消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
try {
String jsonC = JsonUtils.toJson(elevenMessage);
Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
Producer producer = ProducerBeanSingleTon.getInstance().getProducer();
SendResult sendResult = producer.send(message);
if (sendResult != null ) {
logger.info( "DoubleElevenMidService.Send mq message success! Topic is:" ”;
} else {
logger.warn( "DoubleElevenMidService.sendResult is null........." );
}
} catch (Exception e) {
logger.error( "DoubleElevenMidService Thread.sleep 1 s___error is " +e.getMessage(), e);
Thread.sleep( 1000 ); //如果有异常,休眠1秒
}
|
发送消息的代码一定要捕获异常,不然会重复发送。
这里的TOPIC用自己创建的,elevenMessage是要发送的内容,我这里是自己建的对象
3、消费者
配置启动类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@Configuration
@ConditionalOnProperty (value = "consumerConfig.enabled" , havingValue = "true" , matchIfMissing = true )
public class ConsumerConfig {
private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name());
@Bean
public Consumer consumerFactory(){ //不同消费者 这里不能重名
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID);
consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);
consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);
//consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM);
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener()); //new对应的监听器
consumer.start();
logger.info( "ConsumerConfig start success." );
return consumer;
}
}
|
CID和ONSADDR一点要选对,用自己的,消费者线程数等可以在这里配置
创建消息监听器类,消费消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
@Component
public class MessageListener implements MessageListener {
private Logger logger = LoggerFactory.getLogger( "remind" );
protected static ElevenReposity elevenReposity;
@Resource
public void setElevenReposity(ElevenReposity elevenReposity){
MessageListener .elevenReposity=elevenReposity;
}
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
if (message.getTopic().equals( "自己的TOPIC" )){ //避免消费到其他消息 json转换报错
try {
byte [] body = message.getBody();
String res = new String(body);
//res 是生产者传过来的消息内容
//业务代码
} else {
logger.warn( "!" );
}
} catch (Exception e) {
logger.error( "MessageListener.consume error:" + e.getMessage(), e);
}
logger.info("MessageListener.Receive message”);
//如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
return Action.CommitMessage;
} else {
logger.warn();
return Action.ReconsumeLater;
}
}
|
注意,由于消费者是多线程的,所以对象要用static+set注入,把对象的级别提升到进程,这样多个线程就可以共用,但是无法调用父类的方法和变量
消费者状态可以查看消费者是否连接成功,消费是否延迟,消费速度等
重置消费位点可以清空所有消息
三、注意事项
1、发送的消息体 最大为256KB
2、消息最多存在3天
3、消费端默认线程数是20
4、如果运行过程中出现java挂掉或者cpu占用异常高,可以在发送消息的时候,每发送1000条让线程休息1s
5、本地测试或启动的时候,把ONSADDR换成公网,不然报错无法启动
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://my.oschina.net/u/3670641/blog/1560267