package com.autonavi.tinfo.traffic.zookeeper; import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext; import com.github.zkclient.IZkChildListener;
import com.github.zkclient.IZkStateListener;
import com.github.zkclient.ZkClient; public class DistributedZookeeper {
private static final Logger logger = LoggerFactory.getLogger(DistributedZookeeper.class);
private Lock lock = new ReentrantLock();// 锁对象
private int sessionTimeout;
private int connectionTimeout;
private String zkServerList;
// private String zkServerDir = "tmc-city-root-path";
private String subNode = "tmclr";
private String curPath;
private ZkClient zkClient;
private String[] resourcePath;
private String zookeeperPath; public String getZookeeperPath() {
return zookeeperPath;
} public void setZookeeperPath(String zookeeperPath) {
this.zookeeperPath = zookeeperPath;
} private ClassPathXmlApplicationContext context = null; private void start() {
if (context == null) {
context = new ClassPathXmlApplicationContext(resourcePath);
}
} private void destroy() {
if (context != null) {
// context.registerShutdownHook(); logger.info("destroyed current application!!!");
context.stop();
context.close();
context.destroy();
context.registerShutdownHook();
context = null;
}
} public void connect() throws Exception { if (this.zkClient != null) {
this.zkClient.close();
}
this.zkClient = new ZkClient(zkServerList, sessionTimeout, connectionTimeout); if (!zkClient.exists(zookeeperPath)) {
zkClient.createPersistent(zookeeperPath, null);
}
if (curPath == null) {
curPath = zkClient.createEphemeralSequential(zookeeperPath + "/" + subNode, "monitor".getBytes());
} try {
startWatchingTopicStatus();
} catch (Exception e) {
// TODO Auto-generated catch block
logger.error(e.getMessage(), e);
logger.error("error occurs during sync data from zk");
System.exit(0);
}
Thread.sleep(2000);// */
handleMonitorNodeChange();
} public void startWatchingTopicStatus() {
ZkTopicStatusListener topicEventListener = new ZkTopicStatusListener();
ZkConnectedStatusListener connectedStatusListener = new ZkConnectedStatusListener();
try {
zkClient.subscribeChildChanges(zookeeperPath, topicEventListener);
zkClient.subscribeStateChanges(connectedStatusListener);
} catch (Exception e) {
logger.error(e.getMessage(), e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
startWatchingTopicStatus();
} } public void handleMonitorNodeChange() throws Exception {
this.lock.lock();
try {
if (zkClient == null)
return;
if (!zkClient.exists(zookeeperPath)) {
zkClient.createPersistent(zookeeperPath, null);
} // 确认curPath是否真的是列表中的最小节点
List<String> childs = zkClient.getChildren(zookeeperPath);
if (childs == null || childs.size() == 0) {
// 创建子节点
curPath = zkClient.createEphemeralSequential(zookeeperPath + "/" + subNode, "monitor".getBytes());
childs = zkClient.getChildren(zookeeperPath); }
Collections.sort(childs); String thisNode = curPath.substring((zookeeperPath + "/").length());
int index = childs.indexOf(thisNode);
if (index < 0) {
curPath = zkClient.createEphemeralSequential(zookeeperPath + "/" + subNode, "monitor".getBytes());
childs = zkClient.getChildren(zookeeperPath);
Collections.sort(childs);
thisNode = curPath.substring((zookeeperPath + "/").length());
index = childs.indexOf(thisNode);
} if (index == 0) {
// 确实是最小节点
start();
} else {
destroy();
}
} finally {
this.lock.unlock();
}
} class ZkTopicStatusListener implements IZkChildListener { @Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
handleMonitorNodeChange();
}
} class ZkConnectedStatusListener implements IZkStateListener { @Override
public void handleStateChanged(KeeperState state) throws Exception {
// TODO Auto-generated method stub ConnectedReadOnly
if (state.equals(KeeperState.SyncConnected) || state.equals(KeeperState.ConnectedReadOnly)) {
System.out.println("zookeeper start to be connected");
handleMonitorNodeChange();
} else if (state.equals(KeeperState.Disconnected)) {
destroy();
}
} @Override
public void handleNewSession() throws Exception {
// TODO Auto-generated method stub
} } public void stop() {
destroy();
if (zkClient == null) {
logger.warn("cannot shutdown already shutdown topic event watcher.");
return;
}
// stopWatchingTopicEvents();
zkClient.close();
zkClient = null;
} public void setZkServerList(String zkServerList) {
this.zkServerList = zkServerList;
} public int getSessionTimeout() {
return sessionTimeout;
} public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
} public int getConnectionTimeout() {
return connectionTimeout;
} public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
} public String[] getResourcePath() {
return resourcePath;
} public void setResourcePath(String[] resourcePath) {
this.resourcePath = resourcePath;
} public static void main(String[] args) throws Exception {
DistributedZookeeper statusMonitor = new DistributedZookeeper();
try {
if (args.length < 5) {
logger.warn("incomplete parameters.");
System.exit(0);
} // statusMonitor.setZkServerList("10.17.133.73:2181,10.17.133.73:2182,10.17.133.73:2183");
// statusMonitor.setConnectionTimeout(5000);
// statusMonitor.setSessionTimeout(5000);
// statusMonitor.setResourcePath(args); statusMonitor.setZkServerList(args[0]);
statusMonitor.setConnectionTimeout(Integer.valueOf(args[1]));
statusMonitor.setSessionTimeout(Integer.valueOf(args[2]));
statusMonitor.setZookeeperPath(args[3]);
statusMonitor.setResourcePath(Arrays.copyOfRange(args, 4, args.length)); statusMonitor.connect(); Executors.newSingleThreadExecutor().awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (Exception e) {
logger.error(e.toString(), e);
statusMonitor.stop();
System.exit(0);
} catch (OutOfMemoryError e) {
logger.error(e.toString(), e);
statusMonitor.stop();
System.exit(0);
}
} }
#!/bin/sh
#zookeeper address
zkAddress=100.69.209.30:2181,100.69.207.28:2181,100.69.193.87:2181
#connection timeout in Millseconds
connectTimeOut=60000
#session timeout in Millseconds
sessionTimeOut=60000
zookeeperPath="/tmc-city-path-aone"
x=`echo $0 | grep "^/"`
if test "${x}"; then
dir=`dirname $0`
else
pwdv=`pwd`
dir=`dirname ${pwdv}/$0`
fi
dir=`readlink -m $dir`
echo "app location : "$dir
run="nohup /opt/taobao/java/bin/java -Xms2G -Xmx6G -Duser.dir=$dir/.. -cp ${dir}/../etc:${dir}/../lib/* com.autonavi.tinfo.traffic.zookeeper.DistributedZookeeper $zkAddress $connectTimeOut $sessionTimeOut $zookeeperPath classpath:ctx/**/*.xml"
log="nohup.out"
app_dir=`echo $dir|awk -F'/' '{print $(NF-1)}'`
len=`expr ${#app_dir} / 3`
app_dir_blur=`expr substr $app_dir 1 $len`
shutdown="kill `ps -ef|grep /opt/taobao/java/bin/java |grep $dir| awk '{print $2}'`"
sd_rb="kill `ps -ef|grep /opt/taobao/java/bin/java |grep $app_dir_blur| awk '{print $2}'`"
stopApp() {
echo "starting stop ..."
pid=`ps -ef|grep /opt/taobao/java/bin/java |grep $dir| awk '{print $2}'`
echo $pid
if [ ! $pid ]; then
echo "not find process to kill"
else
kill -9 $pid
echo "kill -9 $pid successfully"
fi
}
case $1 in
start)
$run >> $log 2>&1 &
chmod 744 $log
;;
stop)
#$shutdown
stopApp
;;
restart)
$shutdown &&
$run >> $log 2>&1 &
;;
rb)
$sd_rb &&
$run >> $log 2>&1 &
;;
*)
echo "usage: run.sh [start|stop|restart]"
esac