The previous article covered the startup and initialization of CAT-Server.
Next we will look at how CAT-Server receives the messages reported by each client (over long-lived TCP connections), and how those messages are consumed, parsed and stored.
com.dianping.cat.analysis.TcpSocketReceiver
As described in the previous article on server startup, when CAT-Server starts it brings up Netty's NIO reactor to accept client requests. Message reception is done in this class, TcpSocketReceiver.java:
// Called when CatHomeModule starts
public void init() {
    try {
        startServer(m_port);
    } catch (Throwable e) {
        m_logger.error(e.getMessage(), e);
    }
}
/**
 * Start a Netty server.
 * @param port
 * @throws InterruptedException
 */
public synchronized void startServer(int port) throws InterruptedException {
    boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
    int threads = 24;
    ServerBootstrap bootstrap = new ServerBootstrap();
    // On Linux, use the epoll-based event loop
    m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads); // thread pool that accepts incoming connections (boss / master threads)
    m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads); // thread pool that handles I/O on accepted channels (worker / slave threads)
    bootstrap.group(m_bossGroup, m_workerGroup);
    bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("decode", new MessageDecoder()); // add the message decoder
        }
    });
    // Channel options
    bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
    bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    try {
        m_future = bootstrap.bind(port).sync(); // bind the listening port and wait synchronously for startup to complete
        m_logger.info("start netty server!");
    } catch (Exception e) {
        m_logger.error("Started Netty Server Failed:" + port, e);
    }
}
Once Netty is up, every message reported by a client is decoded, converted from a byte stream into a message tree (MessageTree), and then handed to DefaultMessageHandler.
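The decode step itself lives in the MessageDecoder registered on the pipeline above. Below is a minimal sketch of what a length-prefixed decoder of this kind looks like, assuming a 4-byte length header and a codec that turns the payload into a MessageTree; the class name LengthPrefixedTreeDecoder, the Function-based codec and the import paths of the CAT types are assumptions for illustration, not CAT's actual implementation.

import java.util.List;
import java.util.function.Function;

import com.dianping.cat.analysis.MessageHandler;
import com.dianping.cat.message.spi.MessageTree;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

// Sketch only: a length-prefixed frame decoder that hands each decoded tree to the handler.
public class LengthPrefixedTreeDecoder extends ByteToMessageDecoder {
    private final Function<ByteBuf, MessageTree> m_codec; // stand-in for CAT's real message codec
    private final MessageHandler m_handler;               // e.g. DefaultMessageHandler

    public LengthPrefixedTreeDecoder(Function<ByteBuf, MessageTree> codec, MessageHandler handler) {
        m_codec = codec;
        m_handler = handler;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;                    // length header has not fully arrived yet
        }
        in.markReaderIndex();
        int length = in.readInt();     // 4-byte body length written by the client
        if (in.readableBytes() < length) {
            in.resetReaderIndex();     // only half a frame so far: wait for more bytes
            return;
        }
        ByteBuf frame = in.readBytes(length);
        try {
            MessageTree tree = m_codec.apply(frame); // byte stream -> message tree
            m_handler.handle(tree);                  // hand the tree to the handler
        } finally {
            frame.release();
        }
    }
}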
public class DefaultMessageHandler extends ContainerHolder implements MessageHandler, LogEnabled {
    /*
     * MessageConsumer groups multiple analyzers per period (one period per whole hour) and uses them
     * to parse messages and produce the various reports (Transaction, Event, Problem, etc.).
     * One analyzer + one bounded queue + one whole-hour window make up a PeriodTask,
     * which polls that bounded queue and processes its messages.
     */
    @Inject
    private MessageConsumer m_consumer;

    private Logger m_logger;

    @Override
    public void enableLogging(Logger logger) {
        m_logger = logger;
    }

    @Override
    public void handle(MessageTree tree) {
        if (m_consumer == null) {
            m_consumer = lookup(MessageConsumer.class); // look up the MessageConsumer instance from the container
        }
        try {
            m_consumer.consume(tree); // consume the message
        } catch (Throwable e) {
            m_logger.error("Error when consuming message in " + m_consumer + "! tree: " + tree, e);
        }
    }
}
Message consumption is handled by RealtimeConsumer, the implementation of MessageConsumer:
com…RealtimeConsumer.consume(MessageTree tree)
@Override
public void consume(MessageTree tree) {
    String domain = tree.getDomain();
    String ip = tree.getIpAddress();

    if (!m_blackListManager.isBlack(domain, ip)) { // global blacklist, keyed by domain-ip
        long timestamp = tree.getMessage().getTimestamp();
        Period period = m_periodManager.findPeriod(timestamp); // find the Period covering the hour in which the message was produced

        if (period != null) {
            period.distribute(tree); // dispatch the decoded tree to every type of analyzer in turn
        } else {
            m_serverStateManager.addNetworkTimeError(1);
        }
    } else {
        m_black++;

        if (m_black % CatConstants.SUCCESS_COUNT == 0) {
            Cat.logEvent("Discard", domain);
        }
    }
}
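How does m_periodManager.findPeriod(timestamp) resolve the right hour? Roughly: each Period covers one whole hour, and the manager keeps the few periods that are currently live in memory. A minimal sketch of that lookup follows; the m_periods field and the isIn method are assumptions based on the behaviour above, not quoted CAT source.

// Sketch only: assumes the manager keeps a small in-memory list of live periods
// and that each Period knows its own hour window.
private List<Period> m_periods; // e.g. the previous, current and next hour

public Period findPeriod(long timestamp) {
    for (Period period : m_periods) {
        if (period.isIn(timestamp)) { // m_startTime <= timestamp < m_startTime + ONE_HOUR
            return period;
        }
    }
    return null; // too old or too far in the future: the caller counts a network-time error
}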
void com.dianping.cat.analysis.Period.distribute(MessageTree tree)
/**
 * Dispatch the decoded tree message to every type of analyzer in turn.
 * @param tree
 */
public void distribute(MessageTree tree) {
    m_serverStateManager.addMessageTotal(tree.getDomain(), 1); // count message volume per domain
    boolean success = true;
    String domain = tree.getDomain();

    for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
        List<PeriodTask> tasks = entry.getValue(); // the analyzer tasks for one report type
        int length = tasks.size();
        int index = 0;
        boolean manyTasks = length > 1;

        if (manyTasks) {
            index = Math.abs(domain.hashCode()) % length; // abs(hashCode) % length => a number in [0, length - 1]
        }
        PeriodTask task = tasks.get(index);
        boolean enqueue = task.enqueue(tree); // note: the same message is enqueued into the queue of each report analyzer, one type after another

        if (enqueue == false) {
            if (manyTasks) {
                task = tasks.get((index + 1) % length); // queue full: fall back to the next task of the same type
                enqueue = task.enqueue(tree);

                if (enqueue == false) {
                    success = false;
                }
            } else {
                success = false;
            }
        }
    }

    if (!success) {
        m_serverStateManager.addMessageTotalLoss(tree.getDomain(), 1); // record a message loss
    }
}
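To close the loop on the comment in DefaultMessageHandler, "one analyzer + one bounded queue + one whole-hour window", here is a minimal sketch of such a PeriodTask-style unit. The queue capacity, poll timeout and the Analyzer interface are assumptions for illustration, not CAT's exact classes: distribute() enqueues into these bounded queues, and a dedicated thread per task drains its queue and feeds each tree to its analyzer.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import com.dianping.cat.message.spi.MessageTree;

// Sketch only: one analyzer, one bounded queue, one polling loop for one hour.
public class PeriodTaskSketch implements Runnable {
    interface Analyzer {
        void process(MessageTree tree); // e.g. update the Transaction/Event/Problem report
    }

    private final BlockingQueue<MessageTree> m_queue = new ArrayBlockingQueue<>(30000);
    private final Analyzer m_analyzer;
    private volatile boolean m_active = true;

    public PeriodTaskSketch(Analyzer analyzer) {
        m_analyzer = analyzer;
    }

    // Called from Period.distribute(); returns false when the queue is full,
    // which is what makes distribute() try the next task or count a loss.
    public boolean enqueue(MessageTree tree) {
        return m_queue.offer(tree);
    }

    // Called when the hour is over, so the loop below can drain and exit.
    public void finish() {
        m_active = false;
    }

    @Override
    public void run() {
        // Drain the bounded queue, feeding every message tree to this task's analyzer.
        while (m_active || !m_queue.isEmpty()) {
            try {
                MessageTree tree = m_queue.poll(5, TimeUnit.MILLISECONDS);
                if (tree != null) {
                    m_analyzer.process(tree);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

One consequence of the hash in distribute() is worth noting: because the index is derived from the domain's hashCode, messages from the same domain normally land on the same task of a given report type, so each analyzer thread aggregates a stable subset of domains for its report (except when a full queue forces the fallback to the neighbouring task).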