通过对ClusterManager 源码的学习,对其Cluster的 整体思路有了一些认识。
在ClusterManager 中有一段静态代码段,如下:
static {其主要功能是 建立一个 属性 监听器,并插入到 PropertyEventDispatcher 中。PropertyEventDispatcher 顾名思义,就是一个属性事件分发器,当openfire系统中的某个属性发生变换后,PropertyEventDispatcher 会通知其保存的各个PropertyEventListener(当然包含此处建立的这个)。
// Listen for clustering property changes (e.g. enabled/disabled)
PropertyEventDispatcher.addListener(new PropertyEventListener() {
public void propertySet(String property, Map<String, Object> params) { /* ignore */ }
public void propertyDeleted(String property, Map<String, Object> params) { /* ignore */ }
public void xmlPropertyDeleted(String property, Map<String, Object> params) { /* ignore */ }
public void xmlPropertySet(String property, Map<String, Object> params) {
if (ClusterManager.CLUSTER_PROPERTY_NAME.equals(property)) {
if (Boolean.parseBoolean((String) params.get("value"))) {
// Reload/sync all Jive properties
JiveProperties.getInstance().init();
ClusterManager.startup();
} else {
ClusterManager.shutdown();
}
}
}
});
在这里,属性监听器 的主要功能 体现在 xmlPropertySet 函数中,即 监听 clustering.enabled 属性值的变化,如果 为变为true,则重载所有Jive属性值,并调
用 ClusterManager.startup() 来启动cluster。 否则,则关闭cluster,即执行 ClusterManager.shutdown()。
下面来看看ClusterManager.startup() 的代码:
public static synchronized void startup() {
if (isClusteringEnabled() && !isClusteringStarted()) {
initEventDispatcher();
CacheFactory.startClustering();
}
}
可以看出,在Cluster启用并且未被启动的前提下,startup 函数执行 两个函数: initEventDispatcher() 和 CacheFactory.startClustering()
private static void initEventDispatcher() {
if (dispatcher == null || !dispatcher.isAlive()) {
dispatcher = new Thread("ClusterManager events dispatcher") {
@Override
public void run() {
// exit thread if/when clustering is disabled
while (ClusterManager.isClusteringEnabled()) {
try {
Event event = events.take();
EventType eventType = event.getType();
// Make sure that CacheFactory is getting this events first (to update cache structure)
if (event.getNodeID() == null) {
// Replace standalone caches with clustered caches and migrate data
if (eventType == EventType.joined_cluster) {
CacheFactory.joinedCluster();
} else if (eventType == EventType.left_cluster) {
CacheFactory.leftCluster();
}
}
// Now notify rest of the listeners
for (ClusterEventListener listener : listeners) {
try {
switch (eventType) {
case joined_cluster: {
if (event.getNodeID() == null) {
listener.joinedCluster();
}
else {
listener.joinedCluster(event.getNodeID());
}
break;
}
case left_cluster: {
if (event.getNodeID() == null) {
listener.leftCluster();
}
else {
listener.leftCluster(event.getNodeID());
}
break;
}
case marked_senior_cluster_member: {
listener.markedAsSeniorClusterMember();
break;
}
default:
break;
}
}
catch (Exception e) {
Log.error(e.getMessage(), e);
}
}
// Mark event as processed
event.setProcessed(true);
} catch (Exception e) {
Log.warn(e.getMessage(), e);
}
}
}
};
dispatcher.setDaemon(true);
dispatcher.start();
}
}
在 initEventDispatcher 内部,实质上是创建了一个后台线程,用来处理ClusterManager 所持有的 事件队列中的事件。处理方式无非是根据event的类型,来 相应的通知 监听者进行处理。事件的类型 在ClusterManager中定义如下:
private enum EventType {
/**
* This JVM joined a cluster.
*/
joined_cluster,
/**
* This JVM is no longer part of the cluster.
*/
left_cluster,
/**
* This JVM is now the senior cluster member.
*/
marked_senior_cluster_member
}
处理完成之后调用event.setProcessed(true);
在此之后,调用CacheFactory.startClustering()来真正启动Cluster。其代码如下:
public static void startClustering() {
if (isClusteringAvailable()) {
clusteringStarting = clusteredCacheFactoryStrategy.startCluster();
}
if (clusteringStarting) {
if (statsThread == null) {
// Start a timing thread with 1 second of accuracy.
statsThread = new Thread("Cache Stats") {
private volatile boolean destroyed = false;
@Override
public void run() {
XMPPServer.getInstance().addServerListener(new XMPPServerListener() {
public void serverStarted() {}
public void serverStopping() {
destroyed = true;
}
});
ClusterManager.addListener(new ClusterEventListener() {
public void joinedCluster() {}
public void joinedCluster(byte[] nodeID) {}
public void leftCluster() {
destroyed = true;
ClusterManager.removeListener(this);
}
public void leftCluster(byte[] nodeID) {}
public void markedAsSeniorClusterMember() {}
});
// Run the timer indefinitely.
while (!destroyed && ClusterManager.isClusteringEnabled()) {
// Publish cache stats for this cluster node (assuming clustering is
// enabled and there are stats to publish).
try {
cacheFactoryStrategy.updateCacheStats(caches);
}
catch (Exception e) {
log.error(e.getMessage(), e);
}
try {
// Sleep 10 seconds.
sleep(10000);
}
catch (InterruptedException ie) {
// Ignore.
}
}
statsThread = null;
log.debug("Cache stats thread terminated.");
}
};
statsThread.setDaemon(true);
statsThread.start();
}
}
}
在代码中进行了相关判断,并首先调用了clusteredCacheFactoryStrategy.startCluster() 方法。关于clusteredCacheFactoryStrategy的初始化方式在 缓存学习一节中有讲到。
而后设置了一个后台线程,在该线程中,首先添加服务器监听事件,用来监听服务器的关闭事件,之后添加集群事件监听器,用来监听节点的“离开集群”事件。最后,以10秒为周期 调用cacheFactoryStrategy.updateCacheStats(caches)来更新缓存。
至此,集群功能启动完毕