业务监控-指标监控(v1)

时间:2021-07-29 18:54:37

  最近做了指标监控系统的后台,包括需求调研、代码coding、调试调优测试等,穿插其他杂事等前后花了一个月左右。

  指标监控指的是用户通过接口上传某些指标信息,并且通过配置阈值公式和告警规则等信息监测自己上传指标的准确性。程序方面,接口和前台采用go + redis + mysql,后台python + mysql。

  这个系统的难度主要在于数据量较大,需要在1分钟处理5w+个指标,对接口和后台程序处理并发的性能要求较高。前台则是展示问题,包括核心指标、我的订阅、指标列表、异常列表等。
  系统设计:  

1、指标的判断只支持数值的判断

2、指标的阈值公式包括对当前值V的判断,根据上一次值L求环比数据,根据历史数据求同比数据

3、告警规则采用fibonacci告警规则,分为F0,F1,F2,F3,F5,O1,O2,O5等N种规则

  程序主要代码:

1、心跳判断根据经验设置异常公式


## hbtChannel = 0, 10, 30, 60, 360, 720


## hbtDiffer = 5, 20, 45, 60, 180, 240

@classmethod
def checkHbtInfo(cls):
    """检查心跳异常公式配置"""
    if len(cls.hbtChannelList) != len(cls.hbtDifferList) or len(cls.hbtChannelList) <= 1:#至少2种阶梯
        return -1
    min = 0
    for i in range(len(cls.hbtChannelList)):#层层递增
        try:
            sum = int(cls.hbtChannelList[i]) + int(cls.hbtDifferList[i])
            if 0 == i and 0 != int(cls.hbtChannelList[i]): #从0开始
                return -1
            if min > sum:
                return -1
            min = sum
        except ValueError:
            return -1
    return 0

2、获取告警规则(fibonacci)

def getAlertChannel(self, alertRule):
    """获取告警通道"""
    channel = []
    maxNum = CMyConf.MAX_ALERT_COUNT + 1
    if "F0" == alertRule:#前20次告警
        channel = [i for i in range(1, maxNum)]
    elif "F1" == alertRule:
        channel = [self.getFibonacciNum(i) for i in range(2, maxNum)]
    elif "F2" == alertRule:
        channel = [self.getFibonacciNum(i) for i in range(3, maxNum)]
    elif "F3" == alertRule:
        channel = [self.getFibonacciNum(i) for i in range(4, maxNum)]
    elif "F5" == alertRule:
        channel = [self.getFibonacciNum(i) for i in range(5, maxNum)]
    elif "O1" == alertRule:#第1次告警
        channel = [1]
    elif "O2" == alertRule:
        channel = [2]
    elif "O5" == alertRule:
        channel = [5]
    elif "O10" == alertRule:
        channel = [10]
    else:
        channel = []
    return channel                                                            

3、解析模块

def analyzerThreshold(self, itemId, indexId, threshold, period, periodUnit, value, lastValue):
    """得到自定义公式的值, code, results, global, warnCount"""
    if "" == threshold.strip():#空阈值处理
        return 0, False, {}
    retTh, globalVar = self.checkThreshold(itemId, indexId, threshold, period, periodUnit, value, lastValue)
    if "" != retTh:
        try:
            results = eval(retTh, {}, globalVar)
        except Exception, e:
            errMsg = ("WARNING:自定义公式配置错误! "
                    "详细:itemId=%d, indexId=%d, period=%d%s, threshold=%s, Exception=%s" %
                    (itemId, indexId, period, periodUnit, threshold, e))
            g_logFd.writeFormatMsg(CMyLog.LEVEL_WARN, errMsg)
            return -1, False, {}
        return 0, results, globalVar
    errMsg = ("WARNING:自定义公式配置错误! "
            "详细:itemId=%d, indexId=%d, period=%d%s, threshold=%s" %
            (itemId, indexId, period, periodUnit, threshold))
    g_logFd.writeFormatMsg(CMyLog.LEVEL_WARN, errMsg)
    return -1, False, {}
    

其中 checkThreshold 这个函数比较麻烦,用来将自定义公式转换成python公式,当然自定义公式每个人都可以设立不同的规则,只要能求到同比、环比的值自然可以。例如我的规则是支持 V>0 || ((+H < 0.5) && T>0.5, 1d, 5, MAX )这种公式,则需要根据公式求出H,T,abs(T)等值,然后转成python可以解析的公式,通过eval来解析结果。当然上面的程序中的globalVar其实是local这个形参,只是懒得修改名字而已。最后通过这个结果来判断是否需要告警。

4、多进程模块

pool = multiprocessing.Pool(CMyConf.processNum)
server = multiprocessing.Manager()

indexIdList = []
warnChangeIndexList = server.list()
alertIndexList = server.list()
indexCount = server.dict() #测试是否全部进程都跑完整
hbtItemInfo = server.list()
#print len(self.indexInfo)
handleResults = []

i = 0
for indexId in self.indexInfo:
    indexIdList.append(indexId)
    i += 1
    if i == CMyConf.PROCESS_INDEX_NUM:#5000个
        handleResults.append(pool.apply_async(handlePartIndex,
                args=(self, indexIdList, warnChangeIndexList, hbtItemInfo, alertIndexList, indexCount)))
        indexIdList = []
        i = 0
handleResults.append(pool.apply_async(handlePartIndex,
        args=(self, indexIdList,  warnChangeIndexList, hbtItemInfo, alertIndexList, indexCount)))
pool.close()
pool.join()

其中handlePartIndex是一个进程处理函数,作用比较单纯,就是调用前面的函数解析指标,判断是否信息出错和心跳、数值异常等。

进程池在这个程序中发挥的作用其实并不怎么多,相反为了测试这个多进程花了几天的时间,真的是得不偿失,最后速度也就比单进程提高几秒,因为几万个数据解析的瓶颈并不是在解析模块,而是在数据库的写模块。其实当时本意也是想借这个机会用下进程池,既然都写了就还是用着吧,反正进程数才开了那么几个,不占资源。相信以后数据达到十几万个多进程的优势就会显现出来了。不过用完这个多进程模块,觉得python在进程共享方面做得还是比较不错的,以后发挥的空间还比较大。至少比多线程好用多,那个GIL也真是坑。

5、mysql load命令

cmd = ("mysql -u%s -p%s -h%s itemMonitor --local-infile=1 -e \"load data local infile '%s' "
         "replace into table myCurrentWarning character set utf8 fields terminated by ',' enclosed by '\\\"' lines terminated by '\\n'\"" %
       (CMyConf.dbIMUser, CMyConf.dbIMPass, CMyConf.dbIMHost, sqlFileName))
g_logFd.writeFormatMsg(CMyLog.LEVEL_INFO, cmd)

这个命令替换一个for循环直行replace into真是太棒了,尝试了两三个钟才写出这个命令,也要感谢万能的google.hk。本来30s的replace into语句,瞬间变成3s,性能虽然没有网上说的20倍那么多,也是杠杠的。估计是因为replace一次等于两句sql吧,速度缩减到10倍。哈哈。赞一个。

6、求N个月以前的时间

curTime_t = datetime.datetime.now()
targetMonth = curTime_t.month - num #num个月前
if 0 == targetMonth:
    tmpMon = 12
elif targetMonth > 0:
    tmpMon = targetMonth
else:
    tmpMon = (abs(targetMonth)/12*12 + 12 + targetMonth)
targetMonthNext = targetMonth + 1#保存下个月的
if 0 == targetMonthNext:
    tmpMonNext = 12
elif targetMonthNext > 0:
    tmpMonNext = targetMonthNext
else:
    tmpMonNext = (abs(targetMonthNext)/12*12 + 12 + targetMonthNext)

endTime = curTime_t
try:
    endTime = endTime.replace(year = endTime.year if targetMonth > 0 else endTime.year - int(-targetMonth/12) -1,
            month = tmpMon, day = endTime.day)
except:
    endTime = endTime.replace(year = endTime.year if targetMonth > 0 else endTime.year - int(-targetMonth/12) -1,
            month = tmpMonNext, #下个月1号
            day = 1)#12月31,11月份没31
    endTime += datetime.timedelta(days=-1) # 时间减1天

比较麻烦的是timedelta里面没有月的,只能自己写,还需要考虑某一天可能不存在。

总体后台就这么几个模块比较重要,虽然不难,但花在调试的时间还真是不少,主要在自定义公式设计和多进程吧。效果是现在30s前后能跑5w个数据左右,以后10w+的数据估计多进程就该发挥作用了。

  可以后续优化:告警规则优化、告警内容支持发报表等。