用JGroups实现一个分布式任务分发系统

时间:2022-06-03 06:06:10

该文章来源于Bela Ban 与2008年发布在JGroups官网的博客翻译,原文请查看
原文

前言

JGroups 是一个由Java实现的集群通信类库,目前已经迭代了很多版本。其在分布式系统中应用广泛,包括JBOSS,ElasticMQ,等业界知名的软件。关于JGroups的详细说明可以查看JGroups官网

以下内容来自Bela Ban原博客的翻译

JGroups

JGroups 是一个集群通讯类库,应用可以用JGroups
加入一个集群,发送消息给集群中的其他节点。同时也可以在集群节点离开或者加入集群的时候获得通知(包括节点崩溃).

JGroups 的任务是提供可靠的消息发送机制,它和JMS不同,它不关心,消息主题,队列等。

JGroups 最主要的特点是支持灵活多变的协议栈配置。应用可以按照自己的喜好将配置编辑到一个XML文件中。比如,应用可以很容易的实现压缩协议的配置。

或者它可以删除碎片,因为它的消息总是小于65K(超过UDP),或者因为它使用TCP作为传输

另一个应用程序可能会添加加密和身份验证,因此消息是加密的,只有节点表示有效的X.509证书可以加入集群。

应用程序甚至可以*地编写自己的协议(或扩展现有的协议),并将它们添加到配置中。例如,为了审计或统计数据,添加一个跟踪所有发送和接收到的消息的协议。

JGroups 架构图

用JGroups实现一个分布式任务分发系统

客户端的主要API是Channal,它用于发送和接收消息。

当消息发送以后,它将陪投递到协议栈,堆栈是一个协议列表,每个协议都有机会对消息进行处理。

例如,分段协议可能检查消息的大小。如果消息大于配置的大小,则可能将其分割为多个较小的消息,并将其发送到堆栈中。

在接收端,分段协议将对片段进行排队,直到接收到所有的片段,然后将它们组合到原始消息中并将其传递下去。

JGroups附带的协议可以划分为以下类别:

  • 传输:发送和接收消息。UDP使用IP多播和/或UDP数据报。TCP使用TCP连接。

  • 发现:节点的初始发现

  • 合并:在网络分区恢复后,将子集群合并回一个分区

  • 故障检测:监视集群节点和潜在崩溃或挂起的通知

  • 可靠性:确保消息不会丢失,只接收一次,并按照发送方发送的顺序接收。这是通过为每个消息分配序列号,并通过在消息丢失的情况下重新传输来完成的。

  • 稳定性:节点必须缓冲所有消息(用于重新传输)。稳定协议确保定期(或基于累积的大小),清除所有集群节点接收到的消息,这样它们就可以实现垃圾回收。

  • 组成员管理:跟踪集群中的节点,并通知应用节点的加入或离开(包括崩溃)

  • 流量控制:确保发送者不能发送消息的速度超过接收方在的处理速度。这是防止内存溢出的必要条件。

  • 分段:将大型消息片段分割为较小的消息,并在接收方重新组装

  • 状态传输:确保集群的共享状态(例如,所有HTTP会话)被正确地转移到一个新节点

  • 压缩:压缩消息并在接收方对其进行减压

  • 身份验证:防止未经授权的节点加入集群

  • 加密:消息加密

JGroups 主要API org.jgroups.JChannel

public class JChannel extends Channel {
public JChannel(String properties) throws ChannelException;
public void setReceiver(Receiver r);
public void connect(String cluster_name) throws ChannelException;
public void send(Message msg) throws ChannelException;
public View getView();
public Address getLocalAddress();
public void disconnect();
public void close();
}

创建一个channal 并调用 connect() 方法来加入一个集群

Channel ch=new JChannel(“/home/bela/udp.xml”);
ch.setReceiver(new ReceiverAdapter() {
public void receive(Message msg) {}
});
ch.connect(“demo-cluster”);

这将创建一个在/home/bela/udp.xml 中定义的 channal,如果应用需要不同的设置可以在udp.xml中修改,并作为构造函数的参数传入。

然后我们设置一个 Receiver ,它将会在消息接收以后被回调,最后我们加入一个集群,所有的channal都是类似的配置,并加入相同的集群。

当一个节点加入一个集群以后它就可以接收来自集群成员发送的消息。

Receiver 接口有两个方法:

void receive(Message msg);
void viewAccepted(View new_view);

receive() 方法将在消息到达的时候被回调。它有一个 org.jgroups.Message 类型的参数。

public class Message implements Streamable {
protected Address dest_addr=null;
protected Address src_addr=null;
private byte[] buf=null;

public byte[] getBuffer();
public void setBuffer(byte[] b);
}

message 中定义了一个目的地地址和一个发送方地址,一个Address 是在集群中标明一个节点的不透明类型。如果目的地地址是一个null则意味着这条消息将被发送到集群中所有的节点中去,反之则将发送给集群中唯一的一个节点。

当接收到消息时,应用程序可以调用getBuffer() 将消息检索到字节缓冲区,然后将其反序列化为对应用程序有意义的数据。

viewAccepted() 方法将在某个节点加入集群或者离开集群的时候被回调。它有一个唯一的参数 View ,它本质上是一个地址列表,所有集群节点都以完全相同的顺序接收视图,因此当我们有一个集群a、B、C时,节点有以下视图:

  • A: {A,B,C}
  • B: {A,B,C}
  • C: {A,B,C}

如果此时集群中有一个新的节点D加入,那么此时集群列表应该是 {A,B,C,D}。如果 B 节点奔溃那么此时集群中所有的节点将收到 {A,C,D} 视图,正如我们所看到的,视图中的节点是根据连接时间排序的。最老的节点总是第一个。

现在我们来讨论一下JChannel的其余方法。

send()方法接收消息并将其发送给所有集群节点,如果消息的目的地为null,或者如果目的地是非null,则将其发送给单个节点。应用程序需要将它们的数据编组到一个字节缓冲区,并通过message.setbuffer()将其放入消息中。

要得到当前视图,我们可以使用channel.getview(),并获取一个节点的本地地址,我们调用channel.getlocaladdress()。

调用 disconnect() 方法离开一个集群。

close() 用于销毁一个channal,通道一旦被关闭则无法被再次打开。调用disconnect() 方法将会在所有节点中创建一个视图,viewAccepted() 方法将会在所有节点中执行。

分布式任务系统

我们的想法非常简单,我们有一个由若干节点组成的集群,并且集群中的每个节点可以提交任务集群中的任何一个节点上执行,所以每个节点是平等的。简单的理解就是一个节点既可以提交任务也可以执行任务。在一个真实的系统中,客户端可以通过TCP 或者RMI 连接到集群中任何一个节点上,并提交一个任务,包括它自己。

当提交一个任务以后我们会产生一个随机的整数用来映射到集群中的某个节点上去,我们称其为秩。这个秩其实就是节点在 view中的位置,因为view中的位置在所有节点中都是相同的。秩是节点的唯一标识。

任务是通过多播在集群中执行的,每个节点都将任务添加到由任务及其提交者(JGroups)地址组成的散列map中。

每个节点比较任务的秩与自己的秩是否相同,如果秩与自己的秩匹配则执行任务 并将结果返回给提交者,反之则什么也不做。

当任务提交者接受到一个执行结果,它将通过多播的方式告诉集群中的所有节点,移除这个任务在map中。

如果此时接受任务的节点崩溃,则我们可以通过map找到哪些任务被提交给了这个节点,这个时候我们需要重新生成一个任务秩,并重新提交这个任务给其他节点。

如果master节点在提交了任务以后奔溃,同时没有收到任务执行结果,则此时所有的slave移除所有由此master提交的任务,因为任务执行结果对这个master来讲是毫无意义的。

下图是所有任务在集群中的分发示意图

用JGroups实现一个分布式任务分发系统

代码实现