一个集群包含若干成员,要对这些成员进行管理就必须要有一张包含所有成员的列表,当要对某个节点做操作时通过这个列表可以准确找到该节点的地址进而对该节点发送操作消息。如何维护这张包含所有成员的列表是本节要讨论的主题。
成员维护是集群的基础功能,一般划分一个独立模块或层完成此功能,它提供成员列表查询、成员维护、成员列表改变事件通知等能力。由于tribes定位于基于同等节点之间的通信,所以并不存在主节点选举的问题,它所要具备的功能是自动发现节点,即新节点加入要通知集群其他成员更新成员列表,让每个节点都能及时更新成员列表,每个节点都维护一份集群成员表。如图,节点1、节点2、节点3使用组播通过交换机各自已经维护一份成员列表,且他们隔一段时间向交换机组播自己节点消息,即心跳操作。当第四个节点加入集群组,节点四向交换机组播自己的节点消息,原理三个节点接收到后各自把节点四加入到各自的成员列表中,而原来三个节点也不断向交换机发送节点消息,节点四接收到后依次更新成员列表信息,最终达到四个节点都拥有四个节点成员信息。
看下tribes的集群是如何设计实现以上功能的,其成员列表的创建维护是基于经典的组播方式实现,每个节点都创建一个节点信息发射器和节点信息接收器,让他们运行于独立的线程中。发射器用于向组内发送自己节点的消息,而接收器则用于接收其他节点发送过来的节点消息并进行处理。要使节点之间通信能被识别就需要定义一个语义,即约定报文协议的结构,tribes的成员报文是这样定义的,两个固定值用于表示报文的开始和结束,开始标识TRIBES_MBR_BEGIN 的值为字节数组84, 82, 73, 66, 69, 83, 45, 66, 1, 0,结束标识TRIBES_MBR_END的值为字节数组84, 82, 73, 66, 69, 83, 45, 69, 1, 0,整个协议包结构为:开始标识(10bytes)+包长度(4bytes)+存活时间(8bytes)+tcp端口(4bytes)+安全端口(4bytes)+udp端口(4bytes)+host长度(1byte)+host(nbytes)+命令长度(4bytes)+命令(nbytes)+域名长度(4bytes)+域名(nbytes)+唯一会话id(16bytes)+有效负载长度(4bytes)+有效负载(nbytes)+结束标识(10bytes)。成员发射器按照协议组织成包结构并组播,接收器接收包并按照协议进行解包,根据包信息维护成员表。
下面用一段代码简单展示实现过程,由于篇幅问题包的处理省略:
public class McastService {
private MulticastSocket socket;
private String address = "228.0.0.4";
private int port = 8000;
private InetAddress addr;
private byte[] buffer = new byte[2048];
private DatagramPacket receivePacket;
private final Object sendLock = new Object();
public void start() {
try {
addr = InetAddress.getByName(address);
receivePacket = new DatagramPacket(buffer, buffer.length, addr,port);
socket.joinGroup(addr);
new ReceiverThread().start();
new SenderThread().start();
} catch (IOException e) {
}
}
public class ReceiverThread extends Thread {
public void run() {
while (true) {
try {
receive();
} catch (ArrayIndexOutOfBoundsException ax) {
}
}
}
}
public class SenderThread extends Thread {
public void run() {
while (true) {
try {
send();
} catch (Exception x) {
}
try {
Thread.sleep(1000);
} catch (Exception ignore) {
}
}
}
}
public void send() {
byte[] data = 按照成员协议组织包结构;
DatagramPacket packet = new DatagramPacket(data, data.length, addr, port);
try {
socket.send(packet);
} catch (IOException e) {
}
}
public void receive() {
try {
socket.receive(receivePacket);
解析处理成员报文。
} catch (IOException e) {
}
}
}
第一步要先执行加入组播成员操作,接着分别启动接收器线程、发射器线程,一般接收器要优先启动。发射器每隔1秒组织协议包发送心跳,组播组内成员的接收器对接收到的协议报文进行解析,按照一定的逻辑更新各自节点本地成员列表,如果成员表已包含协议包的成员则只更新存活时间等消息。
Tribes利用上述原理维护集群成员,并且由独立模块MembershipService提供成员的相关服务,例如获取集群所有成员相关信息等。
喜欢java的同学可以交个朋友: