Distributed Monitoring Development 04: Client Development

Date: 2022-07-13 18:09:10

Client requirements

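Put simply, the client stores a set of plugin scripts. Using the configuration sent by the server (the monitored items and the monitoring interval for each one), it runs the plugin for each monitored item locally on a schedule and reports the collected data back to the server.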
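
Judging from how client.py consumes it, the configuration returned by the server can be pictured as a dictionary of services, each mapping to a plugin name and an interval in seconds. The following is only a sketch of that shape; the service and plugin names (`LinuxCPU`, `get_linux_cpu`, and so on) are hypothetical and not taken from the server.

```python
import json

# Hypothetical payload: service name -> [plugin name, interval in seconds].
raw = ('{"services": {"LinuxCPU": ["get_linux_cpu", 60], '
       '"LinuxMemory": ["get_linux_mem", 90]}}')

# The client merges the decoded payload into its own dictionary.
monitored_services = {}
monitored_services.update(json.loads(raw))

for service_name, (plugin_name, interval) in sorted(monitored_services["services"].items()):
    print("%s: run %s every %s s" % (service_name, plugin_name, interval))
```

The client later appends a third element, the last run time, to each list, which is how it tracks when a plugin is next due.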

Client directory layout

MonitorClient.py

Entry point; it starts main.

settings.py

Configuration file. It records the host ID, the server address and port, the API URLs, the request timeout, and the configuration refresh interval.

configs = {
    'HostID': 1,
    "Server": "localhost",
    "ServerPort": 9000,
    "urls": {
        # each entry is [path, HTTP method]
        'get_configs': ['api/client/config', 'get'],  # fetch all services to be monitored
        'service_report': ['api/client/service/report/', 'post'],
    },
    'RequestTimeout': 30,  # seconds
    'ConfigUpdateInterval': 300,  # seconds; 5 minutes by default
}
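
To make the role of the `urls` entries concrete, here is a minimal sketch of how a path and method pair from the configuration turns into an absolute URL. The helper `build_url` is hypothetical (the client does this inline in load_latest_configs and url_request), and the dictionary is a trimmed copy of the one above.

```python
configs = {
    "HostID": 1,
    "Server": "localhost",
    "ServerPort": 9000,
    "urls": {
        "get_configs": ["api/client/config", "get"],
        "service_report": ["api/client/service/report/", "post"],
    },
}


def build_url(name, suffix=None):
    """Return (method, absolute URL) for a named endpoint in configs['urls']."""
    path, method = configs["urls"][name]
    if suffix is not None:
        # e.g. the host ID is appended when fetching this host's config
        path = "%s/%s" % (path, suffix)
    url = "http://%s:%s/%s" % (configs["Server"], configs["ServerPort"], path)
    return method, url


print(build_url("get_configs", configs["HostID"]))
print(build_url("service_report"))
```

With these values the first call yields the URL `http://localhost:9000/api/client/config/1`, which is the address the client polls for its configuration.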

main.py

Startup script.

import client


class command_handler(object):
    '''Dispatch the command given on the command line (start, stop).'''

    def __init__(self, sys_args):
        self.sys_args = sys_args
        if len(self.sys_args) < 2:
            self.help_msg()  # prints the usage text and exits
        self.command_allowcator()

    def command_allowcator(self):
        '''Dispatch the user's command to the method of the same name.'''
        print(self.sys_args[1])

        if hasattr(self, self.sys_args[1]):
            func = getattr(self, self.sys_args[1])
            return func()
        else:
            print("command does not exist!")
            self.help_msg()

    def help_msg(self):
        valid_commands = '''
        start       start monitor client
        stop        stop monitor client

        '''
        exit(valid_commands)

    def start(self):
        print("going to start the monitor client")
        #exit_flag = False

        Client = client.ClientHandle()
        Client.forever_run()

    def stop(self):
        print("stopping the monitor client")
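
One caveat with dispatching through hasattr alone: any attribute name passes the check, so an argument such as `sys_args` would be looked up and then called, which fails with a TypeError. A possible variation, shown here only as a sketch, restricts dispatch to an explicit whitelist. The class name `CommandHandler`, the `allowed` tuple, and the returned strings are all hypothetical and not part of the original main.py.

```python
class CommandHandler(object):
    # Only these names may be dispatched; hasattr() alone would also accept
    # attribute names such as "sys_args" or "__init__".
    allowed = ("start", "stop")

    def __init__(self, sys_args):
        self.sys_args = sys_args

    def dispatch(self):
        """Run the requested command, or return a usage string."""
        if len(self.sys_args) < 2 or self.sys_args[1] not in self.allowed:
            return "usage: %s [%s]" % (self.sys_args[0], "|".join(self.allowed))
        return getattr(self, self.sys_args[1])()

    def start(self):
        return "starting"

    def stop(self):
        return "stopping"


print(CommandHandler(["MonitorClient.py", "start"]).dispatch())
print(CommandHandler(["MonitorClient.py", "sys_args"]).dispatch())
```

In a real entry point the list would presumably be `sys.argv`; the listing above does not show that call, so it is left out here.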

client.py

Client-side logic.

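import json
import threading
import time
import urllib
import urllib2  # Python 2 only

import settings  # adjust the import path to wherever the settings file lives
from plugins import plugin_api  # assumes the plugins directory is an importable package


class ClientHandle(object):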
    def __init__(self):
        self.monitored_services = {}

    def load_latest_configs(self):
        '''
        load the latest monitor configs from monitor server
        :return:
        '''
        request_type = settings.configs['urls']['get_configs'][1]
        url = "%s/%s" %(settings.configs['urls']['get_configs'][0], settings.configs['HostID'])
        latest_configs = self.url_request(request_type,url)
        latest_configs = json.loads(latest_configs)
        self.monitored_services.update(latest_configs)

    def forever_run(self):
        '''
        start the client program forever
        :return:
        '''
        exit_flag = False
        config_last_update_time = 0
        while not exit_flag:
            if time.time() - config_last_update_time > settings.configs['ConfigUpdateInterval']:
                self.load_latest_configs()
                print("Loaded latest config:", self.monitored_services)
                config_last_update_time = time.time()

            # start to monitor services
            for service_name, val in self.monitored_services['services'].items():
                if len(val) == 2:  # first time this service is monitored
                    self.monitored_services['services'][service_name].append(0)
                monitor_interval = val[1]
                last_invoke_time = val[2]
                if time.time() - last_invoke_time > monitor_interval:  # the plugin is due
                    print(last_invoke_time, time.time())
                    self.monitored_services['services'][service_name][2] = time.time()
                    # start a new thread for each plugin call
                    t = threading.Thread(target=self.invoke_plugin, args=(service_name, val))
                    t.start()
                    print("Going to monitor [%s]" % service_name)
                else:
                    print("Going to monitor [%s] in [%s] secs" % (
                        service_name,
                        monitor_interval - (time.time() - last_invoke_time)))

            time.sleep(1)

    def invoke_plugin(self,service_name,val):
        '''
        invoke the monitor plugin, then send the status data it returns to the monitor server
        :param service_name: name of the monitored service
        :param val: [plugin_name, monitor_interval, last_run_time]
        :return:
        '''
        plugin_name = val[0]
        if hasattr(plugin_api,plugin_name):
            func = getattr(plugin_api,plugin_name)
            plugin_callback = func()
            #print("--monitor result:",plugin_callback)

            report_data = {
                'client_id':settings.configs['HostID'],
                'service_name':service_name,
                'data':json.dumps(plugin_callback)
            }

            request_action = settings.configs['urls']['service_report'][1]
            request_url = settings.configs['urls']['service_report'][0]

            #report_data = json.dumps(report_data)
            print('---report data:',report_data)
            self.url_request(request_action,request_url,params=report_data)
        else:
            print("\033[31;1mCannot find service [%s]'s plugin name [%s] in plugin_api\033[0m"% (service_name,plugin_name ))
        print('--plugin:',val)


    def url_request(self,action,url,**extra_data):
        '''
        communicate with the monitor server over HTTP
        :param action: "get" or "post"
        :param url: which url to request from the monitor server
        :param extra_data: extra parameters to be submitted
        :return:
        '''
        abs_url = "http://%s:%s/%s" % (settings.configs['Server'],
                                       settings.configs["ServerPort"],
                                       url)
        if action in  ('get','GET'):
            print(abs_url,extra_data)
            try:
                req = urllib2.Request(abs_url)
                req_data = urllib2.urlopen(req,timeout=settings.configs['RequestTimeout'])
                callback = req_data.read()
                #print "-->server response:",callback
                return callback
            except urllib2.URLError as e:
                exit("\033[31;1m%s\033[0m"%e)

        elif action in ('post','POST'):
            #print(abs_url,extra_data['params'])
            try:
                data_encode = urllib.urlencode(extra_data['params'])
                req = urllib2.Request(url=abs_url,data=data_encode)
                res_data = urllib2.urlopen(req,timeout=settings.configs['RequestTimeout'])
                callback = res_data.read()
                callback = json.loads(callback)
                print("\033[31;1m[%s]:[%s]\033[0m response:\n%s" % (action, abs_url, callback))
                return callback
            except Exception as e:
                print('---exec',e)
                exit("\033[31;1m%s\033[0m"%e)
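
The listing above targets Python 2 (urllib2 and urllib.urlencode). For readers on Python 3, the POST branch could be approximated as follows. This is a sketch under that assumption, not the original code; the helper name `build_post` and the sample payload are hypothetical, and the request is only built, not sent.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request


def build_post(abs_url, params):
    """Build (but do not send) a form-encoded POST request."""
    body = urlencode(params).encode("utf-8")  # Python 3 needs a bytes body
    return Request(url=abs_url, data=body)


req = build_post(
    "http://localhost:9000/api/client/service/report/",
    {"client_id": 1, "service_name": "LinuxCPU", "data": json.dumps({"idle": 93.5})},
)
print(req.get_method())  # POST, because a body is attached
print(req.data)
```

Sending it would then be a call to `urllib.request.urlopen(req, timeout=...)`, mirroring the urllib2.urlopen call in url_request.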
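
load_latest_configs fetches the configuration from the server.
forever_run runs the plugin scripts periodically, starting a new thread for each invocation.
url_request defines how requests are actually submitted (GET or POST).
invoke_plugin collects the data returned by a plugin, puts it into a dictionary, and calls url_request to send it to the server.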
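
The scheduling decision inside forever_run can be isolated into a small pure function, which makes it easier to reason about. The sketch below assumes the same `[plugin_name, interval, last_run_time]` layout as client.py; the function name `due_services` and the sample services are hypothetical, and the current time is passed in rather than read from time.time().

```python
def due_services(services, now):
    """Return the names of services whose interval has elapsed,
    stamping each of them with `now` as its last run time."""
    due = []
    for name, val in services.items():
        if len(val) == 2:  # first pass: no last-run timestamp yet
            val.append(0)
        plugin_name, interval, last_run = val
        if now - last_run > interval:
            val[2] = now
            due.append(name)
    return due


services = {"LinuxCPU": ["get_linux_cpu", 60], "LinuxMemory": ["get_linux_mem", 90]}
print(sorted(due_services(services, now=1000)))  # both are due on the first pass
print(sorted(due_services(services, now=1070)))  # only the 60-second service is due
```

In the client, each name returned here would correspond to one new thread running invoke_plugin.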

 

plugins

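This is the plugin directory.

The windows and linux subdirectories hold the plugin scripts for the corresponding operating system.

plugin_api.py holds the mapping between monitored items (received from the server) and plugin scripts.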
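
The article does not list plugin_api.py itself, so the following is only a guess at its general shape: one callable per plugin name, looked up with getattr the same way invoke_plugin does. The class standing in for the module, the plugin `get_linux_cpu`, its return values, and the helper `run_plugin` are all hypothetical.

```python
import json


class plugin_api(object):
    """Stand-in for the plugin_api module: one callable per plugin name."""

    @staticmethod
    def get_linux_cpu():
        # Hypothetical metrics; a real plugin would sample the system.
        return {"status": 0, "idle": 93.5}


def run_plugin(host_id, service_name, plugin_name):
    """Look up the plugin by name and build the report payload.
    Returns None when no plugin with that name exists."""
    if not hasattr(plugin_api, plugin_name):
        return None
    result = getattr(plugin_api, plugin_name)()
    return {
        "client_id": host_id,
        "service_name": service_name,
        "data": json.dumps(result),
    }


report = run_plugin(1, "LinuxCPU", "get_linux_cpu")
print(report["service_name"], json.loads(report["data"])["idle"])
print(run_plugin(1, "LinuxDisk", "get_linux_disk"))
```

Keeping the name-to-function mapping in one place means the server only needs to send plugin names, and the client decides which script actually runs.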