Distributed Monitoring CAT Source Code Analysis: cat-client

Posted: 2022-04-10 10:17:56

Package structure of the cat-client module

└─com
    ├─dianping
    │  └─cat
    │      ├─build
    │      ├─configuration
    │      ├─log4j
    │      ├─message
    │      │  ├─internal
    │      │  ├─io
    │      │  └─spi
    │      │      ├─codec
    │      │      └─internal
    │      ├─servlet
    │      └─status
    └─site
        ├─helper
        └─lookup
            └─util

Client module architecture diagram


Class diagram


Reading approach:

We'll walk through the source code by debugging a simple test case.

// Obtain a Transaction via the static factory method
Transaction t = Cat.newTransaction("logTransaction", "logTransaction");

TimeUnit.SECONDS.sleep(30);
t.setStatus("0");
t.complete();

Now let's focus on the key code:

Cat.java

private static Cat s_instance = new Cat();

private static volatile boolean s_init = false;

private static void checkAndInitialize() {
    if (!s_init) {
        synchronized (s_instance) {
            if (!s_init) {
                initialize(new File(getCatHome(), "client.xml"));
                log("WARN", "Cat is lazy initialized!");
                s_init = true;
            }
        }
    }
}

private Cat() {
}

public static MessageProducer getProducer() {
    checkAndInitialize();

    return s_instance.m_producer;
}

Cat lazy initialization

As the code shows, the Cat instance itself is created at class-load time, so there is exactly one Cat object in memory (static Cat s_instance = new Cat();). However, the object is not yet fully initialized: its configuration has not been loaded. Whenever Cat is used, it first fetches the producer, and before returning it checks whether the client configuration has been loaded (checkAndInitialize).

checkAndInitialize() uses double-checked locking (note that s_init is declared volatile, which is what makes the pattern safe) so that the Cat configuration is loaded exactly once.

1. cat-client first uses Plexus (a fairly old IoC container) to load the configuration file /META-INF/plexus/plexus.xml and initialize the IoC container (see the lookup sketch after this list).

2. It then uses the ../../client.xml file to populate the Cat object's configuration, and starts the following four daemon threads, each described in detail later:

cat-StatusUpdateTask: reports basic client information (JVM data and the like) once a minute

cat-merge-atomic-task: checks for and merges atomic messages

cat-TcpSocketSender-ChannelManager: maintains and checks the NIO connection to the server

cat-TcpSocketSender: sends messages to the server
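For readers who have never used Plexus, the sketch below shows what "initializing the IoC container and looking a component up by role" means in plain plexus-container-default terms. It is illustrative only: cat-client actually boots its container through the unidal foundation-service wrapper mentioned below, and both the descriptor path and the MessageProducer role are used here merely as an example.

import org.codehaus.plexus.ContainerConfiguration;
import org.codehaus.plexus.DefaultContainerConfiguration;
import org.codehaus.plexus.DefaultPlexusContainer;
import org.codehaus.plexus.PlexusContainer;

import com.dianping.cat.message.MessageProducer;

public class PlexusLookupSketch {
    public static void main(String[] args) throws Exception {
        // Point the container at the component descriptor shipped inside cat-client.
        ContainerConfiguration configuration = new DefaultContainerConfiguration()
                .setContainerConfiguration("META-INF/plexus/plexus.xml");
        PlexusContainer container = new DefaultPlexusContainer(configuration);

        // Components are resolved by role (usually an interface) declared in the XML descriptor.
        MessageProducer producer = container.lookup(MessageProducer.class);
        System.out.println(producer.getClass().getName());
    }
}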

CatClientModule

public class CatClientModule extends AbstractModule {
    public static final String ID = "cat-client";

    @Override
    protected void execute(final ModuleContext ctx) throws Exception {
        ctx.info("Current working directory is " + System.getProperty("user.dir"));

        // initialize milli-second resolution level timer
        MilliSecondTimer.initialize();

        // tracking thread start/stop; a classic hook is added here for cleanup when thread pools shut down
        Threads.addListener(new CatThreadListener(ctx));

        // warm up Cat
        Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());

        // bring up TransportManager
        ctx.lookup(TransportManager.class);

        ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);

        if (clientConfigManager.isCatEnabled()) {
            // start status update task
            StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);

            Threads.forGroup("cat").start(statusUpdateTask);
            LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms

            // MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);
            // Threads.forGroup("cat").start(mmapReaderTask);
        }
    }
}

The actual Plexus IoC initialization and loading logic lives in org\unidal\framework\foundation-service\2.5.0\foundation-service-2.5.0.jar; take a look if you are interested.
Once this preparation is done, the actual message construction takes place:

Message construction

@Override
public Transaction newTransaction(String type, String name) {
    // this enable CAT client logging cat message without explicit setup
    if (!m_manager.hasContext()) {
        // setup() builds the Context: ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
        m_manager.setup();
    }

    if (m_manager.isMessageEnabled()) {
        DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);

        // Fill the constructed message into the Context (Context.m_tree and Context.m_stack);
        // we will look at the Context class shortly.
        m_manager.start(transaction, false);
        return transaction;
    } else {
        return NullMessage.TRANSACTION;
    }
}

........

@Override
public void start(Transaction transaction, boolean forked) {
    Context ctx = getContext();

    if (ctx != null) {
        ctx.start(transaction, forked);

        if (transaction instanceof TaggedTransaction) {
            TaggedTransaction tt = (TaggedTransaction) transaction;

            m_taggedTransactions.put(tt.getTag(), tt);
        }
    } else if (m_firstMessage) {
        m_firstMessage = false;
        m_logger.warn("CAT client is not enabled because it's not initialized yet");
    }
}
......
public void start(Transaction transaction, boolean forked) {
    if (!m_stack.isEmpty()) {
        Transaction parent = m_stack.peek();

        addTransactionChild(transaction, parent);
    } else {
        // The first transaction becomes the root of the tree; nested transactions
        // created later are attached to it as children.
        m_tree.setMessage(transaction);
    }

    if (!forked) {
        m_stack.push(transaction);
    }
}

At this point, the message has been constructed.
Now let's look at the Context class, a package-private inner class of DefaultMessageManager.

Context.java

class Context {
    private MessageTree m_tree; // a MessageTree is created when the Context is constructed

    private Stack<Transaction> m_stack;

    private int m_length;

    private boolean m_traceMode;

    private long m_totalDurationInMicros; // for truncate message

    private Set<Throwable> m_knownExceptions;

    public Context(String domain, String hostName, String ipAddress) {
        m_tree = new DefaultMessageTree();
        m_stack = new Stack<Transaction>();

        Thread thread = Thread.currentThread();
        String groupName = thread.getThreadGroup().getName();

        m_tree.setThreadGroupName(groupName);
        m_tree.setThreadId(String.valueOf(thread.getId()));
        m_tree.setThreadName(thread.getName());

        m_tree.setDomain(domain);
        m_tree.setHostName(hostName);
        m_tree.setIpAddress(ipAddress);
        m_length = 1;
        m_knownExceptions = new HashSet<Throwable>();
    }

Each thread builds its own Context object and stores it in a ThreadLocal. The Context mainly holds the current message tree m_tree and the stack m_stack that nested messages are pushed onto.
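
To see how this per-thread stack turns nested instrumentation into a single tree, here is a short usage sketch. It is only an illustration: Cat.newTransaction, setStatus, Cat.logError and complete are the public client API discussed in this article, while the transaction types and names are made up.

import com.dianping.cat.Cat;
import com.dianping.cat.message.Transaction;

public class NestedTransactionDemo {
    public static void main(String[] args) {
        // Outer transaction: becomes the root of the MessageTree (m_tree.setMessage(...)).
        Transaction root = Cat.newTransaction("URL", "/order/create");
        try {
            // Inner transaction: its parent is found on m_stack, so it is attached as a child.
            Transaction sql = Cat.newTransaction("SQL", "OrderDao.insert");
            try {
                // ... business code ...
                sql.setStatus(Transaction.SUCCESS);
            } catch (Exception e) {
                sql.setStatus(e.getClass().getSimpleName());
                Cat.logError(e);
            } finally {
                sql.complete(); // pops the inner transaction off m_stack
            }
            root.setStatus(Transaction.SUCCESS);
        } catch (Exception e) {
            root.setStatus(e.getClass().getSimpleName());
            Cat.logError(e);
        } finally {
            root.complete(); // m_stack becomes empty, so the whole tree is flushed
        }
    }
}

Only the outermost, standalone transaction triggers the upload: as end() and Context.end() below will show, the tree is handed to flush() only once the stack becomes empty.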


Back to our original unit-test code:
Transaction t = Cat.newTransaction("logTransaction", "logTransaction");
This single line initializes the client-side Plexus IoC container, loads and initializes cat-client, starts the four daemon threads, and returns a Transaction object.

        t.setStatus("0");//很简单,就是这是一个属性值
t.complete();


transaction.complete();

complete的具体代码如下:

........
public void complete() {
    try {
        if (isCompleted()) {
            // complete() was called more than once
            DefaultEvent event = new DefaultEvent("cat", "BadInstrument");

            event.setStatus("TransactionAlreadyCompleted");
            event.complete();
            addChild(event);
        } else {
            m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L;

            setCompleted(true);

            if (m_manager != null) {
                m_manager.end(this);
            }
        }
    } catch (Exception e) {
        // ignore
    }
}
........
@Override
public void end(Transaction transaction) {
    Context ctx = getContext();

    if (ctx != null && transaction.isStandalone()) {
        if (ctx.end(this, transaction)) {
            m_context.remove();
        }
    }
}
........

public boolean end(DefaultMessageManager manager, Transaction transaction) {
    if (!m_stack.isEmpty()) {
        // Pop the top of the Context's m_stack; it is LIFO, so this is the most recent transaction.
        Transaction current = m_stack.pop();

        if (transaction == current) {
            m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);
        } else {
            while (transaction != current && !m_stack.empty()) {
                m_validator.validate(m_stack.peek(), current);

                current = m_stack.pop();
            }
        }

        if (m_stack.isEmpty()) { // the Context of the current thread has no more open transactions
            MessageTree tree = m_tree.copy();

            m_tree.setMessageId(null); // reset m_tree
            m_tree.setMessage(null);

            if (m_totalDurationInMicros > 0) {
                adjustForTruncatedTransaction((Transaction) tree.getMessage());
            }

            manager.flush(tree); // put the message tree into the consumer queue
            return true;
        }
    }

    return false;
}
........
public void flush(MessageTree tree) {
    if (tree.getMessageId() == null) {
        // Generate a globally unique id for the message tree (a snowflake-style scheme).
        tree.setMessageId(nextMessageId());
    }

    MessageSender sender = m_transportManager.getSender();

    if (sender != null && isMessageEnabled()) {
        sender.send(tree);

        reset(); // clear the Context stored in the ThreadLocal
    } else {
        m_throttleTimes++;

        if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
            m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
        }
    }
}
........
private Context getContext() {
    if (Cat.isInitialized()) {
        Context ctx = m_context.get(); // the ThreadLocal stores one Context per thread

        if (ctx != null) {
            return ctx;
        } else {
            if (m_domain != null) {
                ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
            } else {
                ctx = new Context("Unknown", m_hostName, "");
            }

            m_context.set(ctx);
            return ctx;
        }
    }

    return null;
}

// TcpSocketSender.send(MessageTree tree)

private MessageQueue m_queue = new DefaultMessageQueue(SIZE);

private MessageQueue m_atomicTrees = new DefaultMessageQueue(SIZE);

@Override
public void send(MessageTree tree) {
    if (isAtomicMessage(tree)) {
        boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    } else {
        boolean result = m_queue.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    }
}

At this point, the constructed message tree has been placed in a blocking queue, waiting to be uploaded.

Now let's look at the code that uploads messages to the server. The cat-TcpSocketSender thread watches the consumer queue and consumes it (uploading to the server):

TcpSocketSender.java

@Override
public void run() {
    m_active = true;

    while (m_active) {
        ChannelFuture channel = m_manager.channel();

        if (channel != null && checkWritable(channel)) {
            try {
                MessageTree tree = m_queue.poll();

                if (tree != null) {
                    sendInternal(tree); // encode with Netty NIO and send to the server over TCP
                    tree.setMessage(null);
                }
            } catch (Throwable t) {
                m_logger.error("Error when sending message over TCP socket!", t);
            }
        } else {
            long current = System.currentTimeMillis();
            long oldTimestamp = current - HOUR;

            while (true) {
                try {
                    MessageTree tree = m_queue.peek();

                    if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {
                        MessageTree discradTree = m_queue.poll();

                        if (discradTree != null) {
                            m_statistics.onOverflowed(discradTree);
                        }
                    } else {
                        break;
                    }
                } catch (Exception e) {
                    m_logger.error(e.getMessage(), e);
                    break;
                }
            }

            try {
                Thread.sleep(5);
            } catch (Exception e) {
                // ignore it
                m_active = false;
            }
        }
    }
}

private void sendInternal(MessageTree tree) {
    ChannelFuture future = m_manager.channel();
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K

    System.out.println(tree);

    m_codec.encode(tree, buf); // encode, then send

    int size = buf.readableBytes();
    Channel channel = future.channel();

    channel.writeAndFlush(buf);
    if (m_statistics != null) {
        m_statistics.onBytes(size);
    }
}

Next, let's take a closer look at the daemon threads that are started as cat-client loads:

cat-merge-atomic-task

Continuing from above: messages that match the following check are treated as atomic messages and placed into the m_atomicTrees queue, which this daemon thread watches and consumes.
The code is as follows:

TcpSocketSender.java

private MessageQueue m_atomicTrees = new DefaultMessageQueue(SIZE);

......

private boolean isAtomicMessage(MessageTree tree) {
    Message message = tree.getMessage(); // take the message from the tree

    if (message instanceof Transaction) { // a Transaction-type message
        String type = message.getType();

        // types starting with "Cache." or equal to "SQL" are atomic
        if (type.startsWith("Cache.") || "SQL".equals(type)) {
            return true;
        } else {
            return false;
        }
    } else {
        return true;
    }
    // In other words: Transaction messages whose type starts with "Cache." or is "SQL",
    // as well as all non-Transaction messages, are considered atomic messages.
}

......

public void send(MessageTree tree) {
    if (isAtomicMessage(tree)) { // atomic messages go to m_atomicTrees
        boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree); // handle queue overflow
        }
    } else {
        boolean result = m_queue.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    }
}
......
public class DefaultMessageQueue implements MessageQueue {
    private BlockingQueue<MessageTree> m_queue;

    private AtomicInteger m_count = new AtomicInteger();

    public DefaultMessageQueue(int size) {
        m_queue = new LinkedBlockingQueue<MessageTree>(size);
    }

    @Override
    public boolean offer(MessageTree tree) {
        return m_queue.offer(tree);
    }

    @Override
    public boolean offer(MessageTree tree, double sampleRatio) {
        if (tree.isSample() && sampleRatio < 1.0) { // the message is sampled and sampleRatio is below 1.0
            if (sampleRatio > 0) {
                // Apply the sample ratio: only a fraction of the messages is kept
                // for subsequent consumption and upload.
                int count = m_count.incrementAndGet();

                if (count % (1 / sampleRatio) == 0) {
                    return offer(tree);
                }
            }
            return false;
        } else { // no sampling filter; enqueue directly
            return offer(tree);
        }
    }

    @Override
    public MessageTree peek() {
        return m_queue.peek();
    }

    @Override
    public MessageTree poll() {
        try {
            return m_queue.poll(5, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return null;
        }
    }

    @Override
    public int size() {
        return m_queue.size();
    }
}
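
To make the sampling arithmetic concrete, here is a small standalone sketch (not CAT code) of the same rule used by offer(MessageTree, double): with sampleRatio = 0.1, every tenth sampled message passes the count % (1 / sampleRatio) == 0 check and is enqueued.

import java.util.concurrent.atomic.AtomicInteger;

public class SampleRatioDemo {
    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger();
        double sampleRatio = 0.1; // keep roughly 10% of sampled messages
        int kept = 0;

        for (int i = 0; i < 100; i++) {
            // Same arithmetic as DefaultMessageQueue.offer(tree, sampleRatio).
            if (count.incrementAndGet() % (1 / sampleRatio) == 0) {
                kept++;
            }
        }
        System.out.println("kept " + kept + " of 100"); // prints: kept 10 of 100
    }
}

Note that the modulo is computed in floating point, so for ratios whose reciprocal is not exactly representable the equality test can fire less often than the nominal rate.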

Next, let's look at how this daemon thread consumes the queue:

......

private boolean shouldMerge(MessageQueue trees) {
    MessageTree tree = trees.peek(); // peek at the head element without removing it

    if (tree != null) {
        long firstTime = tree.getMessage().getTimestamp();
        int maxDuration = 1000 * 30;

        // Merge when the head message has been waiting for more than 30 seconds,
        // or when the queue has accumulated at least MAX_CHILD_NUMBER (200) messages.
        if (System.currentTimeMillis() - firstTime > maxDuration || trees.size() >= MAX_CHILD_NUMBER) {
            return true;
        }
    }
    return false;
}

......

@Override
public void run() {
    while (true) {
        if (shouldMerge(m_atomicTrees)) {
            // Merge the messages in m_atomicTrees into a single message tree, then put it
            // into m_queue to be consumed normally by the cat-TcpSocketSender thread.
            MessageTree tree = mergeTree(m_atomicTrees);
            boolean result = m_queue.offer(tree);

            if (!result) {
                logQueueFullInfo(tree);
            }
        } else {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}

.....

private MessageTree mergeTree(MessageQueue trees) {
    int max = MAX_CHILD_NUMBER;
    // A "_CatMergeTree" transaction is created as the container for the merged messages.
    DefaultTransaction tran = new DefaultTransaction("_CatMergeTree", "_CatMergeTree", null);
    MessageTree first = trees.poll(); // remove the first tree from the head of the queue

    tran.setStatus(Transaction.SUCCESS);
    tran.setCompleted(true);
    tran.addChild(first.getMessage());
    tran.setTimestamp(first.getMessage().getTimestamp());
    long lastTimestamp = 0;
    long lastDuration = 0;

    // Keep taking messages from the head of m_atomicTrees and, under a single messageId,
    // merge them all into one Transaction message.
    while (max >= 0) {
        MessageTree tree = trees.poll(); // remove the next tree from the head of the queue

        if (tree == null) {
            tran.setDurationInMillis(lastTimestamp - tran.getTimestamp() + lastDuration);
            break;
        }
        lastTimestamp = tree.getMessage().getTimestamp();
        if (tree.getMessage() instanceof DefaultTransaction) {
            lastDuration = ((DefaultTransaction) tree.getMessage()).getDurationInMillis();
        } else {
            lastDuration = 0;
        }
        tran.addChild(tree.getMessage());
        m_factory.reuse(tree.getMessageId());
        max--;
    }
    ((DefaultMessageTree) first).setMessage(tran);
    return first;
}

TcpSocketSender-ChannelManager daemon thread

This thread uses the router IPs configured on the server side to detect whether the routing has changed and to keep the connection healthy.

@Override
public void run() {
    while (m_active) {
        // make save message id index asyc
        m_idfactory.saveMark();
        // Check the connection information as required and connect over a long-lived TCP connection.
        checkServerChanged();

        // Pick one server ip from the server-side router configuration and get its connection.
        ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
        List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();

        // Check whether the current connection is healthy; if not, keep trying other connections.
        // When none of the default-server ips can be reached, fall back to the backup-server ips.
        doubleCheckActiveServer(activeFuture);
        reconnectDefaultServer(activeFuture, serverAddresses);

        try {
            Thread.sleep(10 * 1000L); // check every 10 seconds
        } catch (InterruptedException e) {
        }
    }
}
....

private void checkServerChanged() {
    // Re-check the connection info every 30 iterations, or whenever there is no healthy connection.
    if (shouldCheckServerConfig(++m_count)) {
        Pair<Boolean, String> pair = routerConfigChanged();

        if (pair.getKey()) {
            String servers = pair.getValue();
            List<InetSocketAddress> serverAddresses = parseSocketAddress(servers);
            ChannelHolder newHolder = initChannel(serverAddresses, servers); // establish the connection

            if (newHolder != null) {
                if (newHolder.isConnectChanged()) {
                    ChannelHolder last = m_activeChannelHolder;

                    m_activeChannelHolder = newHolder;
                    closeChannelHolder(last);
                    m_logger.info("switch active channel to " + m_activeChannelHolder);
                } else {
                    m_activeChannelHolder = newHolder;
                }
            }
        }
    }
}

StatusUpdateTask daemon thread

This thread is straightforward: much like a traditional agent, it reports various application information (OS data, MXBean data, and so on) once a minute. In addition, every time the thread starts it reports a Reboot message to indicate a restart.
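
As a rough illustration of the kind of JVM data such a heartbeat can carry, here is a small standalone sketch using the standard java.lang.management MXBeans. This is not the actual StatusUpdateTask code, only the sort of metrics it collects before reporting them to the server.

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;

public class JvmStatusSketch {
    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();

        // OS-level information
        System.out.println("os.arch = " + os.getArch());
        System.out.println("availableProcessors = " + os.getAvailableProcessors());
        System.out.println("systemLoadAverage = " + os.getSystemLoadAverage());

        // JVM memory and thread information
        System.out.println("heapUsed = " + memory.getHeapMemoryUsage().getUsed());
        System.out.println("heapMax = " + memory.getHeapMemoryUsage().getMax());
        System.out.println("threadCount = " + threads.getThreadCount());

        // Uptime, which pairs naturally with the Reboot message mentioned above
        System.out.println("uptimeMillis = " + runtime.getUptime());
    }
}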