Scope: from constructing a DefaultMQProducer to DefaultMQProducer.start().
I. Test code
import java.text.SimpleDateFormat;
import java.util.Date;
// RocketMQ imports (MQClientException, DefaultMQProducer, SendResult, Message, RemotingHelper)
// are omitted; their package prefix depends on the client version in use.

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 1. instantiate
        producer.setNamesrvAddr("101.200.143.74:9876;123.56.70.138:9876"); // 2. set the name server addresses
        producer.start(); // 3. start the producer
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        String currentTime = df.format(new Date());
        for (int i = 0; i < 10000000; i++) {
            try {
                Thread.sleep(1000);
                Message msg = new Message("topic-1", // topic
                        currentTime.getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
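This section covers the path from constructing a DefaultMQProducer to DefaultMQProducer.start().
II. Sequence diagram
The two most important objects are DefaultMQProducerImpl and mQClientFactory (an instance of class MQClientInstance).
0. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName") instantiates the producer and links DefaultMQProducer with DefaultMQProducerImpl, so that each holds a reference to the other.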
This calls:
    public DefaultMQProducer(final String producerGroup) {
        this(producerGroup, null);
    }
which delegates to:
    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }
defaultMQProducerImpl is a field of DefaultMQProducer, declared final and transient (so it is not serialized). The constructor passes the DefaultMQProducer instance itself to DefaultMQProducerImpl, which is how the two classes come to reference each other.
Inside DefaultMQProducerImpl:
Field: private final DefaultMQProducer defaultMQProducer;
The constructor called above:
    public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
        this.defaultMQProducer = defaultMQProducer;
        this.rpcHook = rpcHook;
    }
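The mutual reference between the two classes can be illustrated with a minimal, self-contained sketch. Facade and FacadeImpl are hypothetical stand-ins for DefaultMQProducer and DefaultMQProducerImpl; nothing here is RocketMQ code.

```java
// Hypothetical names: Facade and FacadeImpl stand in for
// DefaultMQProducer and DefaultMQProducerImpl.
public class MutualReferenceSketch {

    static class Facade {
        private final String group;
        // transient: skipped by Java serialization, like the original field
        private final transient FacadeImpl impl;

        Facade(String group) {
            this.group = group;
            this.impl = new FacadeImpl(this); // hand "this" to the implementation
        }

        String getGroup() { return group; }

        FacadeImpl getImpl() { return impl; }

        // The facade only delegates; the real work lives in the implementation.
        String start() { return impl.start(); }
    }

    static class FacadeImpl {
        private final Facade facade; // back-reference to the owning facade

        FacadeImpl(Facade facade) { this.facade = facade; }

        Facade getFacade() { return facade; }

        String start() { return "started " + facade.getGroup(); }
    }

    public static void main(String[] args) {
        Facade f = new Facade("ProducerGroupName");
        System.out.println(f.start());
        System.out.println(f.getImpl().getFacade() == f);
    }
}
```

The implementation reads configuration (here, the group name) through its back-reference, while callers only ever touch the facade.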
1. defaultMQProducer.start(): DefaultMQProducer delegates to DefaultMQProducerImpl.start().
2. DefaultMQProducerImpl.start(): starts the producer and validates its configuration.
1) DefaultMQProducer.start() calls this.defaultMQProducerImpl.start(); // defaultMQProducerImpl is a final, transient (not serialized) field of DefaultMQProducer.
2) That in turn calls the overload start(true):
    public void start() throws MQClientException {
        this.start(true);
    }
3) Walking through start(true):
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                // mQClientFactory (an MQClientInstance) registers this producer.
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                }
                // ConcurrentHashMap that maps each topic to its publish info (TopicPublishInfo).
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                if (startFactory) {
                    mQClientFactory.start(); // start the MQ client instance
                }
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                        this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "//
                        + this.serviceState//
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
            default:
                break;
        }
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }
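The field is initialized as private ServiceState serviceState = ServiceState.CREATE_JUST, so the first call to start() enters the CREATE_JUST branch of the switch. The state is set to ServiceState.START_FAILED right away, so if any later step throws, the producer is left in START_FAILED; it becomes RUNNING only once every step has succeeded.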
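The state guard described above can be sketched in isolation. This is a simplified illustration, not the RocketMQ implementation: the failDuringStart flag is a hypothetical stand-in for any start-up step that throws, and IllegalStateException replaces MQClientException.

```java
public class StartGuardSketch {

    enum ServiceState { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private ServiceState state = ServiceState.CREATE_JUST;

    // failDuringStart is a hypothetical switch simulating a start-up step that throws.
    public void start(boolean failDuringStart) {
        switch (state) {
            case CREATE_JUST:
                // Pessimistic: assume failure until every step has succeeded.
                state = ServiceState.START_FAILED;
                if (failDuringStart) {
                    throw new IllegalStateException("start-up step failed");
                }
                state = ServiceState.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY all refuse a second start.
                throw new IllegalStateException("service state not OK: " + state);
        }
    }

    public ServiceState getState() { return state; }

    public static void main(String[] args) {
        StartGuardSketch service = new StartGuardSketch();
        service.start(false);
        System.out.println(service.getState());
    }
}
```

A second call to start() on the same object throws, which matches the "maybe started once" message in the original code.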
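this.checkConfig() is entered next:
Validators.checkGroup(this.defaultMQProducer.getProducerGroup()) first validates the producer group name: it must be non-empty, match the allowed-character pattern, and be no longer than 255 characters.
The group name also must not be "DEFAULT_PRODUCER", which is a built-in system group name. "CLIENT_INNER_PRODUCER" is built in as well, but it is the group of the client's own internal producer (created in the MQClientInstance constructor), so it has to pass this check.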
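The checks listed above can be sketched as follows. The regular expression is an assumption about what "allowed-character pattern" means; the authoritative pattern and limits are in RocketMQ's Validators class, and IllegalArgumentException is used here in place of MQClientException.

```java
import java.util.regex.Pattern;

public class GroupNameCheckSketch {

    // Assumed pattern: letters, digits, '%', '|', '_' and '-'. Check Validators for the real one.
    private static final Pattern VALID = Pattern.compile("^[%|a-zA-Z0-9_-]+$");
    private static final int MAX_LENGTH = 255;
    private static final String RESERVED = "DEFAULT_PRODUCER";

    public static void checkGroup(String group) {
        if (group == null || group.trim().isEmpty()) {
            throw new IllegalArgumentException("the group name is blank");
        }
        if (!VALID.matcher(group).matches()) {
            throw new IllegalArgumentException("the group name contains illegal characters");
        }
        if (group.length() > MAX_LENGTH) {
            throw new IllegalArgumentException("the group name is longer than " + MAX_LENGTH);
        }
        if (RESERVED.equals(group)) {
            throw new IllegalArgumentException("the group name is reserved: " + RESERVED);
        }
    }

    // Convenience wrapper that turns the exception into a boolean.
    public static boolean isValid(String group) {
        try {
            checkGroup(group);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("ProducerGroupName"));
        System.out.println(isValid("DEFAULT_PRODUCER"));
    }
}
```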
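this.defaultMQProducer.changeInstanceNameToPID():
Changes the instance name of the current client (the producer) to the process ID.
ClientConfig is the parent class of DefaultMQProducer, and changeInstanceNameToPID() is a method of ClientConfig.
It obtains the JVM process ID and uses it as the producer's instanceName.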
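For readers wondering how a JVM can find its own process ID, here is one common approach, shown as a sketch. It relies on the runtime MXBean name having the form "pid@hostname", which is conventional on HotSpot but not guaranteed by the specification. Whether RocketMQ's helper works exactly this way is not shown in this article.

```java
import java.lang.management.ManagementFactory;

public class PidSketch {

    // Parses the leading number out of a "pid@hostname" string; returns -1 if the format differs.
    public static int parsePid(String runtimeName) {
        try {
            return Integer.parseInt(runtimeName.substring(0, runtimeName.indexOf('@')));
        } catch (RuntimeException e) {
            // Covers a missing '@' (bad substring index) and a non-numeric prefix.
            return -1;
        }
    }

    public static int currentPid() {
        return parsePid(ManagementFactory.getRuntimeMXBean().getName());
    }

    public static void main(String[] args) {
        // The printed value could then serve as the instance name.
        System.out.println(String.valueOf(currentPid()));
    }
}
```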
String clientId = clientConfig.buildMQClientId();
Assigns an ID to the client. Rule: client IP@JVM process ID.
The clientId is then used to look up the MQClientInstance in factoryTable:
MQClientInstance instance = this.factoryTable.get(clientId);
3.1 MQClientManager.getInstance() returns the MQClientManager instance:
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
3.2 MQClientManager.getAndCreateMQClientInstance creates mQClientFactory, an instance of class MQClientInstance:
This is where an MQClientInstance is instantiated.
Method body:
    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                    new MQClientInstance(clientConfig.cloneClientConfig(),
                            this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
            } else {
                // TODO log
            }
        }
        return instance;
    }
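The method above is a get-or-create lookup on a concurrent map: read first, create on a miss, and let putIfAbsent decide which instance wins if two threads race. A minimal sketch of the same pattern follows; the Client class and the sample ID are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ClientRegistrySketch {

    // Hypothetical stand-in for MQClientInstance.
    static class Client {
        final String id;
        final int index;

        Client(String id, int index) {
            this.id = id;
            this.index = index;
        }
    }

    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<String, Client>();
    private final AtomicInteger indexGenerator = new AtomicInteger();

    public Client getOrCreate(String clientId) {
        Client instance = table.get(clientId);
        if (instance == null) {
            instance = new Client(clientId, indexGenerator.getAndIncrement());
            // putIfAbsent returns the existing value if another thread registered first.
            Client prev = table.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev; // discard ours and share the winner
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        ClientRegistrySketch registry = new ClientRegistrySketch();
        Client a = registry.getOrCreate("10.0.0.1@1234"); // sample ID, made up
        Client b = registry.getOrCreate("10.0.0.1@1234");
        System.out.println(a == b);
    }
}
```

Because the key is the client ID, every producer or consumer in the same JVM that builds the same ID ends up sharing one instance.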
The instance is created by:
    instance = new MQClientInstance(clientConfig.cloneClientConfig(),
            this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
The MQClientInstance constructor (which initializes all of the components held by MQClientInstance):
    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig; // the cloned clientConfig is assigned to this MQClientInstance
        this.instanceIndex = instanceIndex; // index of this instance
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
        // service used by the client to communicate with the name server
        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
        }
        this.clientId = clientId;
        this.mQAdminImpl = new MQAdminImpl(this); // admin service
        this.pullMessageService = new PullMessageService(this); // pull service
        this.rebalanceService = new RebalanceService(this); // rebalance (load balancing) service
        this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
        this.defaultMQProducer.resetClientConfig(clientConfig);
        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
        log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", //
                this.instanceIndex, //
                this.clientId, //
                this.clientConfig, //
                MQVersion.getVersionDesc(MQVersion.CurrentVersion), RemotingCommand.getSerializeTypeConfigInThisServer());
    }
updateNameServerAddressList updates the name server addresses. It is called above on mQClientAPIImpl (class MQClientAPIImpl):
    public void updateNameServerAddressList(final String addrs) {
        List<String> lst = new ArrayList<String>();
        String[] addrArray = addrs.split(";");
        if (addrArray != null) {
            for (String addr : addrArray) {
                lst.add(addr);
            }
            // update the name server address list held by the remoting client
            this.remotingClient.updateNameServerAddressList(lst);
        }
    }
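The parsing step can be tried on its own with the address string from the test code. This sketch goes slightly beyond the method above: trimming whitespace and skipping empty entries are additions made here for robustness, not behavior of the original.

```java
import java.util.ArrayList;
import java.util.List;

public class NamesrvAddrSketch {

    // Splits a ';'-separated address string into a list of "host:port" entries.
    public static List<String> parse(String addrs) {
        List<String> result = new ArrayList<String>();
        for (String addr : addrs.split(";")) {
            String trimmed = addr.trim();
            if (!trimmed.isEmpty()) { // added here: ignore blank entries
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> list = parse("101.200.143.74:9876;123.56.70.138:9876");
        for (String addr : list) {
            System.out.println(addr);
        }
    }
}
```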
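3.3 mQClientFactory.registerProducer():
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
4.1 mQClientFactory.start():
mQClientFactory starts all of its tasks.
4.2 this.mQClientAPIImpl.start();
Starts the communication service between the client and the name server.
4.3 this.startScheduledTask();
Starts all scheduled tasks.
4.4 this.pullMessageService.start();
Starts the pull service.
4.5 this.rebalanceService.start();
Starts the rebalance (load balancing) service.
4.6 this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
Starts the client's internal producer (group CLIENT_INNER_PRODUCER). Passing false makes start() skip mQClientFactory.start(), because the factory is already in the middle of starting.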
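The role of the startFactory flag in steps 4.1 to 4.6 is easier to see in a stripped-down sketch. Factory and Producer are hypothetical stand-ins for MQClientInstance and DefaultMQProducerImpl, and the log entries merely name the steps; none of this is RocketMQ code.

```java
import java.util.ArrayList;
import java.util.List;

public class StartOrderSketch {

    static class Factory {
        final List<String> log = new ArrayList<String>();
        // The factory owns an internal producer, as in the MQClientInstance constructor.
        private final Producer inner = new Producer("CLIENT_INNER_PRODUCER", this);

        void start() {
            log.add("remoting");       // 4.2
            log.add("scheduledTasks"); // 4.3
            log.add("pull");           // 4.4
            log.add("rebalance");      // 4.5
            // 4.6: false, otherwise the inner producer would re-enter Factory.start()
            inner.start(false);
        }
    }

    static class Producer {
        final String group;
        final Factory factory;

        Producer(String group, Factory factory) {
            this.group = group;
            this.factory = factory;
        }

        void start(boolean startFactory) {
            if (startFactory) {
                factory.start();
            }
            factory.log.add("producer:" + group);
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory();
        Producer user = new Producer("ProducerGroupName", factory);
        user.start(true); // the user's producer drives the factory start
        System.out.println(factory.log);
    }
}
```

The user's producer calls start(true) and drives the whole start-up, while the internal producer is started from inside the factory with start(false), so the factory's start sequence runs only once.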