前言
前几天在学习Redis Cluster
模式的时候,突然想到如果把它的集群模式应用在T-io
上也是挺有意思的一件事情。
Redis 集群简介
Redis Cluster
中有 N 台实例,每个实例负责部分 Slot,总共有 16384 个Slot,然后客户端连接的时候,需要根据操作的Key计算出所在的Slot和服务实例地址,然后直接执行或者返回MOVE命令等。实例之间的元数据更新使用Gossip
协议。简单一张图了解一下:
Tio集群
我设计的很简单,就是一个多实例集群,没有主从关系,集群之间通过 伪Gossip(因为我也不知道咋实现,┭┮﹏┭┮)协议通讯。我的设计思路是这样的。
从图中可以看出,每个实例会和集群中的其他的某几个实例相连,通过信息扩散的方式达到最终集群的完整性。比如A只知道B的存在,但是它最终会根据B知道C的存在。或者C 根据B知道A的存在。
实现原理
每个实例在启动之后会加载集群的其他某几个节点。然后上线之后通过 ClientChannelContext
与另外的集群节点进行通讯。模仿Redis
设计了如下几个命令:
MEET
在一个实例上线之后,会发送 MEET
命令给其他已知实例。其他实例收到命令之后回复PONG
命令。
PING
实例会定期在实例列表中选择若干实例发送PING命令,对方实例回复PONG命令,交换信息
PONG
在接收到 MEET,PING
命令之后回复PONG
命令
FAIL
在创建 ClientChannelContext
对象之后,可以通过ClientAioListener
获取连接回调,如果连接失败,那么实例将向其他在线实例发送FAIL
命令,当本节点收到的FAILE
命令数大于节点总数的一半时,认为该节点确实已经宕掉了,那么在执行UNAVAILABLE
命令
UNAVAILABLE
如果说FAIL命令是病危通知书,那么 UNAVAILABLE
就是 确认死亡。
代码设计实现
TioClusterServer
继承自 TioServer
,新增start方法,增加其他集群业务,例如初始化集群上下文信息
ClusterServerTioConfig
继承自 ServerTioConfig
,为了约束客户端传参类型。
TioClusterNode
集群节点,包含 IP,Port,创建时间,状态 等信息,同时负责创建自己的ClientChannelContext
TioClusterContext
集群上下文,负责集群节点的创建,集群数据更新工作,同时负责调度各个节点的事件处理。
ClusterCommandExecutor
集群命令发送执行器,负责将各个命令发送出去
ClusterAioHandler,ClusterAioListener,ClusterServerAioHandler,ClusterServerAioListener
这几个类大家就很熟悉了,就是用于编解码的处理类。
集群启动流程
使用Gossip协议有个缺点,集群上线之后,可能会有很大的延迟发现。实例越多,延迟越大。更何况我自己实现的伪 Gossip 协议,暂且不提。
代码解析
在集群上下文中,维护了一个节点的集合,节点的增删改查都是通过它来实现。
/**
* 当前集群的所有节点
*/
private ConcurrentHashMap<String, TioClusterNode> clusterNodes = new ConcurrentHashMap<>(10);
初始化集群
单例,集群实例中只有一个大管家就是它,全局可用。
/**
* 初始化集群
*/
public TioClusterContext(ClusterCommandExecutor clusterCommandExecutor) {
if (tioClusterContext == null) {
synchronized (this) {
if (tioClusterContext == null) {
this.startTime = SystemTimer.currentTimeMillis();
this.clusterCommandExecutor = clusterCommandExecutor;
this.clusterProperty = TioClusterProperty.getClusterProperty();
initCurrentNode();
initClusterNodes();
tioClusterContext = this;
}
}
}
}
通知节点选择
根据配置文件选择要发送消息的N个节点,原则如下,距离上次更新时间超过配置文件中的超时时间的节点优先。然后轮询选择下一个节点,防止某个节点可能较长时间未被选中(也可以通过时上次更新时间排序解决),达到配置文件的节点数,停止。
public List<TioClusterNode> selectNotifyNodes(boolean containsCurrent,boolean containsUnavailable) {
List<TioClusterNode> nodes = new LinkedList<>();
TioClusterNode[] nodesInArray = getClusterNodesInArray(containsCurrent);
//根据时间间隔查找下一个节点
for (TioClusterNode node : nodesInArray) {
if ((SystemTimer.currentTimeMillis() - node.getUpdateTime()) >= clusterProperty.getNotifyTimeInterval()
&& nodes.size() < clusterProperty.getNotifyNodesMaxCount()
&& (containsCurrent || !isCurrentNode(node))
&& !nodes.contains(node)
&& (containsUnavailable || !node.isUnAvailable())) {
nodes.add(node);
}
}
int maxCount = nodesInArray.length;
int lastCount = (clusterProperty.getNotifyNodesMaxCount() > maxCount ? maxCount : clusterProperty.getNotifyNodesMaxCount()) - nodes.size();
int tryMaxCount = lastCount;
int tryCount = 0;
//很容易出现死循环,做好边界
while (lastCount > 0 && tryCount < tryMaxCount) {
TioClusterNode nextNode = RoundRibbonNodeSelector.nextPingNode(nodesInArray);
if (nextNode != null && !nodes.contains(nextNode)) {
if (containsUnavailable || !nextNode.isUnAvailable()) {
nodes.add(nextNode);
}
lastCount--;
}
tryCount++;
}
return nodes;
}
命令发送
发送命令很简单,就是调用 Tio 的Send方法。
private void send(TioClusterNode node, Consumer<ChannelContext> consumer){
ClientChannelContext channelContext = node.getClientChannelContext();
if (node.connectionActivated()) {
consumer.accept(channelContext);
}
}
@Override
public void meet(TioClusterNode other) {
send(other, channelContext -> Tio.send(channelContext, ClusterPacketBuilder.buildMeetPacket()));
}
命令编码
由于格式固定,所以命令的内容就不直接用 JSON 格式传输,根据命令的传输内容不同,分别组装不同的命令。
public ByteBuffer build( final ClusterPacket clusterPacket) {
this.clusterPacket = clusterPacket;
byte[] body = body();
ByteBuffer buffer = ByteBuffer.allocate(bufferLength());
//command
buffer.put(clusterPacket.command().getType());
//fromServerLength
buffer.putShort(fromServerLength());
//bodyLength
buffer.putInt(body.length);
//fromServer
buffer.put(fromServerBytes);
buffer.put(body);
return buffer;
}
//此方法由具体的命令类实现,每个命令所传输的内容不同,格式也不同
protected abstract byte[] body();
AioHandler 解耦
由于我们在内部处理集群消息的时候需要用到AioHandler,所以这个是避免不了的,但是用户又有自己的消息要处理,所以,我又增加了一层,将ClusterServerAioHandler
作为抽象类,增加若干抽象方法,然后具体的用户消息的处理放到新增的抽象方法里,其他类同理,不在阐述。
public abstract class ClusterServerAioHandler implements ServerAioHandler {
private static final Logger logger = LoggerFactory.getLogger(ClusterServerAioHandler.class);
/**
* 用户自定义解码
*/
public abstract Packet clusterDecode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException;
/**
* 用户自定义编码
*/
public abstract ByteBuffer clusterEncode(Packet packet, TioConfig tioConfig, ChannelContext channelContext);
/**
* 用户自定义消息处理
*/
public abstract void clusterHandler(Packet packet, String packetJsonString, ChannelContext channelContext) throws Exception;
/**
* 处理集群消息解码
* */
@Override
public final Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
Packet clusterPacket = ClusterPacketDecoder.decode(buffer, limit, position, readableLength, channelContext);
if (clusterPacket != null) {
return clusterPacket;
}
//reset buffer because cluster has already read a byte for cluster command
buffer.position(position);
return clusterDecode(buffer, limit, position, readableLength, channelContext);
}
/**
* 处理集群消息编码
* */
@Override
public final ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
ByteBuffer buffer = ClusterPacketEncoder.encode(packet, tioConfig, channelContext);
if (buffer != null) {
return buffer;
}
//当集群解析的Buffer为NULL时候,说明是非集群内部消息,交给用户自定义处理
return clusterEncode(packet, tioConfig, channelContext);
}
/**
*
* 处理集群消息*/
@Override
public final void handler(Packet packet, ChannelContext channelContext) throws Exception {
//这里留空,后边讲解
}
}
集群消息的转发
暂时还没有将 slot 的概念派上用场,先用了普通的取模方式。此方法是根据业务主键获取所在节点的方法。比如 ID为1 的用户连接的服务节点可能为 127.0.0.1:7002,ID为2的用户连接的服务节点为127.0.0.1:7001.至于细节可以屏蔽,比如通过NGINX让用户无感知到底连接了哪台服务器。
/**
* 根据 hashCode 获取该值所绑定的节点,如果集群机器增加或者删除可能会导致节点错误,发消息失败(暂不考虑)
* */
public static TioClusterNode getClusterServerNode(Object object) {
int hashCode = object.hashCode();
//获取集群活跃节点
List<TioClusterNode> nodes = TioClusterContext.currentContext().getActivateNodes();
int length = nodes.size();
//取模
int index = hashCode % length;
return nodes.get(index);
}
发送消息
private static boolean sendToUserInternal(TioConfig tioConfig,String userId, Packet packet) {
TioClusterNode node = getClusterServerNode(userId);
//如果是本机,直接发送到本机即可
if (TioClusterContext.currentContext().currentServer().equals(node) && tioConfig != null) {
return Tio.sendToUser(tioConfig, userId, packet);
} else {
//这里需要包装成为ClusterPacket,因为集群内部的消息是通过ClusterPacket实现的
ClusterPacket userPacket = ClusterPacketBuilder.buildUserPacket(packet);
return Tio.send(node.getClientChannelContext(), userPacket);
}
}
上文中的代码很简单,如果获取到的节点是本机节点,那么很幸运,直接执行发送即可。否则,需要将Packet包装到ClusterPacket
中,因为集群内部的消息都是通过它来流通的。新增 USER 命令。它的包编码很简单,转JSON。
@Override
protected byte[] body() {
Packet userPacket = ((ClusterUserPacket) clusterPacket).getPacketBody().getUserPacket();
String json = Json.toJson(userPacket);
return SafeStringEncoder.stringToBytes(json);
}
在上文中留空的地方代码如下:
@Override
public final void handler(Packet packet, ChannelContext channelContext) throws Exception {
//第一种情况,内部消息转发
if (packet instanceof ClusterPacket) {
ClusterPacket clusterPacket = (ClusterPacket) packet;
logger.info("[{}] RECEIVED [{}] FROM [{}] ", TioClusterContext.currentContext().currentServer().serverName(),clusterPacket.command().getCmd() ,clusterPacket.getPacketBody().getFromNode().serverName());
//当命令为USER时,说明此消息是从其他节点过来的用户消息
if (clusterPacket.command() == ClusterCommand.USER) {
ClusterUserPacket clusterUserPacket = (ClusterUserPacket) packet;
//由于框架并不知道用户使用了具体的Packet类型,所以,这里提供了 JSON 和 对象的两种传递方式,在这里 getuserPakcet 是为NULL的。
clusterHandler(clusterUserPacket.getPacketBody().getUserPacket(), clusterUserPacket.getPacketBody().getUserPacketString(), channelContext);
} else {
ClusterObjectFactory.getPacketHandler(clusterPacket.command()).handle(clusterPacket, channelContext);
}
} else {
//第二种情况,直接走用户消息处理
clusterHandler(packet, null, channelContext);
}
}
测试环节
- 1 编写 ServerAioHandler,ServerAioListener,正如上文所讲,需要继承ClusterServerAioHandler
public class MyClusterServerAioHandler extends ClusterServerAioHandler {}
public class MyClusterServerAioListener extends ClusterServerAioListener{}
- 2 编写starter
public static void main(String[] args) throws IOException {
start();
}
private static void start() throws IOException {
//这里要实例化 ClusterSververTioConfig
tioConfig = new ClusterServerTioConfig(new MyClusterServerAioHandler(), new MyClusterServerAioListener());
TioClusterServer tioServer = new TioClusterServer(tioConfig);
tioServer.start();
}
- 3 编写配置文件
tio.server.ip=127.0.0.1
tio.server.port=7001
tio.cluster.server.nodes=127.0.0.1:7002
tio.cluster.server.notify.max.count=4
tio.cluster.server.notify.interval=20000
tio.cluster.server.notify.retry.count=1
tio.cluster.server.notify.timeout=20000
tio.server.ip=127.0.0.1
tio.server.port=7002
tio.cluster.server.nodes=127.0.0.1:7001
tio.cluster.server.notify.max.count=4
tio.cluster.server.notify.interval=20000
tio.cluster.server.notify.retry.count=1
tio.cluster.server.notify.timeout=20000
4 启动服务,查看日志
日志中可以看出,刚开始一个节点是连不通的,但是该节点上线之后发送了MEET命令。
现在两个节点都上线了,集群目前正常。5 启动两个客户端,一个用户1,一个用户2 ,用户1 连接 7002,用户2 连接 7001
public static void main(String[] args) throws Exception {
String name = System.getenv("TIO-CLUSTER-FILE-NAME");
Prop prop = new Prop(name + ".properties");
ClientTioConfig config = new ClientTioConfig(new MyClientAioHandler(), new MyClientAioListener(), null);
config.setHeartbeatTimeout(0);
config.setName("testClient");
TioClient tioClient = new TioClient(config);
ClientChannelContext channelContext = tioClient.connect(new Node(prop.get("tio.server.ip"), prop.getInt("tio.server.port")), 20000);
String userId = prop.get("tio.userid");
Tio.send(channelContext, MyPacket.handshakePacket(userId));
while (true) {
Thread.sleep(5000);
Tio.send(channelContext, MyPacket.helloPacket(prop.get("tio.touserid"), "hello there ,I'm user " + userId + ",I come from " + prop.get("tio.server.port")));
}
}
总结
本文通过参考Redis集群模式实现了一个简单的集群功能。开发过程中遇到的问题:
- 集群节点的发现功能,Gossip协议的实现,需要优化,优化节点算法
- 细节的处理,写着写着可能发现隐藏着死循环!!!
- 并发的处理,在这方面由于能力原因处理较少,应该还有优化空间
- 消息类的解耦和封装,还有很多优化空间
此集群实现可能出现的问题:
集群节点过多的时候,一个实例可能维护多个 client ,再加上各种命令的发送,会比较浪费系统资源。可以通过合理的设置超时时间,通知节点的个数等来优化,我觉得也可以采用 RocketMQ的NameServer的方式去实现
思路虽然不算错误,但是个人感觉,依赖第三方组件也是一种稳妥的做法,Redis发布订阅他不香吗?,Zookeeper管理集群节点不好吗?
其实这都不是问题,最主要的是享受思考和开发的过程,通常会几个小时最后发现是自己犯了一个小错误导致的。不过还好,结局还是在接受范围内。
最后本文也花费了我接近两个小时的时间,分享出我的想法和开发历程。希望能给各位带来收获,另外也希望各位大牛提出更好的意见和方案。谢谢大家,回家喽~~