在broker启动的时候BrokerController如果是slave,配置了master地址更新,没有配置所有broker会想namesrv注册,从namesrv获取haServerAddr,然后更新到HAClient
当HAClient的MasterAddress不为空的时候(因为broker master和slave都构建了HAClient)会主动连接master获取SocketChannel
Master监听Slave请求的端口,默认为服务端口+1
接收slave上传的offset long类型
int pos = this.byteBufferRead.position() -(this.byteBufferRead.position() % 8) //没有理解意图
long readOffset =this.byteBufferRead.getLong(pos - 8);
this.processPostion = pos;
主从复制从哪里开始复制:如果请求时0 ,从最后一个文件开始复制
Slave启动的时候brokerController开启定时任务定时拷贝master的配置信息
SlaveSynchronize类代表slave从master同步信息(非消息)
syncTopicConfig 同步topic的配置信息
syncConsumerOffset 同步消费进度
syncDelayOffset 同步定时进度
syncSubcriptionGroupConfig 同步订阅组配7F6E
HaService类实现了HA服务,负责同步双写,异步复制功能, 这个类master和slave的broker都会实例化,
Master通过AcceptSocketService监听slave的连接,每个masterslave连接都会构建一个HAConnection对象搭建他们之间的桥梁,对于一个master多slave部署结构的会有多个HAConnection实例,
Master构建HAConnection时会构建向slave写入数据服务线程对象WriteSocketService对象和读取Slave反馈服务线程对象ReadSocketService
WriteSocketService
向slave同步commitLog数据线程,
slaveRequestOffset是每次slave同步完数据都会向master发送一个ack表示下次同步的数据的offset。
如果slave是第一次启动的话slaveRequestOffset=0, master会从最近那个commitLog文件开始同步。(如果要把master上的所有commitLog文件同步到slave的话, 把masterOffset值赋为minOffset)
向socket写入同步数据: 传输数据协议<Phy Offset> <Body Size> <Body Data>
ReadSocketService:
4.2 ReadSocketService
读取slave通过HAClient向master返回同步commitLog的物理偏移量phyOffset值
通知前端线程,如果是同步复制的话通知是否复制成功
Slave 通过HAClient建立与master的连接,
来定时汇报slave最大物理offset,默认5秒汇报一次也代表了跟master之间的心跳检测
读取master向slave写入commitlog的数据, master向slave写入数据的格式是
Slave初始化DefaultMessageStore时候会构建ReputMessageService服务线程并在启动存储服务的start方法中被启动
ReputMessageService的作用是slave从物理队列(由commitlog文件构成的MapedFileQueue)加载数据,并分发到各个逻辑队列
HA同步复制, 当msg写入master的commitlog文件后,判断maser的角色如果是同步双写SYNC_MASTER, 等待master同步到slave在返回结果
3 HA异步复制