标签:
一、heapster简介 什么是Heapster?Heapster是容器集群监控和性能分析工具,天然的撑持Kubernetes和CoreOS。
Kubernetes有个着名的监控agent---cAdvisor。在每个kubernetes Node上城市运行cAdvisor,它会收集本机以及容器的监控数据(cpu,memory,filesystem,network,uptime)。
在较新的版本中,K8S已经将cAdvisor成果集成到kubelet组件中。每个Node节点可以直接进行web访谒。
cAdvisor也供给Restful API: https://github.com/google/cadv ... pi.md
Heapster是一个收集者,将每个Node上的cAdvisor的数据进行汇总,然后导到第三方工具(如InfluxDB)。
二、heapster挪用kubelet源码分析 1、整体源码heapster整体的源码分析可以参考文章:,
上面链接中的文章中会对heapster从启动开始进行分析,主要讲述了下面内容:
main()
创建数据源
创建后端处事
创建数据processors
获取源数据并存储
heapster API创建
给上述文章的作者点个赞,思路真的非常清晰。不过笔者最存眷的点是heapster如何进行的kubelet挪用。
2、挪用kubelet源码分析从上述文章提到的 NewKubeletProvider 函数中,我们找到了创建kubeClient和kubeletClient的处所,见下面代码的飘黄部分:
func NewKubeletProvider(uri *url.URL) (MetricsSourceProvider, error) { // 创建kubernetes master及kubelet client相关的配置 kubeConfig, kubeletConfig, err := GetKubeConfigs(uri) if err != nil { return nil, err } // 创建kubeClient及kubeletClient kubeClient := kube_client.NewOrDie(kubeConfig) kubeletClient, err := NewKubeletClient(kubeletConfig) if err != nil { return nil, err } // 获取下所有的Nodes,测试下创建的client是否能正常通讯 if _, err := kubeClient.Nodes().List(kube_api.ListOptions{ LabelSelector: labels.Everything(), FieldSelector: fields.Everything()}); err != nil { glog.Errorf("Failed to load nodes: %v", err) } // 监控k8s的nodes变换 // 这里会创建协程进行watch,便于后面挪用nodeLister.List()列出所有的nodes。 // 该Watch的实现,需要看下apiServer中的实现,后面会进行讲解 lw := cache.NewListWatchFromClient(kubeClient, "nodes", kube_api.NamespaceAll, fields.Everything()) nodeLister := &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)} reflector := cache.NewReflector(lw, &kube_api.Node{}, nodeLister.Store, time.Hour) reflector.Run() // 布局在前面介绍过 return &kubeletProvider{ nodeLister: nodeLister, reflector: reflector, kubeletClient: kubeletClient, }, nil }
1)heapster源码目录我们发明这个函数地址的文件为kubelet.go,地址的包为metrics/sources/kubelet
我们可以看到,kubelet挪用相关的代码都在这个里面:
2)heapster挪用kubelet的API我们进入到NewKubeletClient函数看一下:
// 传入的是KubeletClientConfig,通过读取相关配置,来初始化client func NewKubeletClient(kubeletConfig *kubelet_client.KubeletClientConfig) (*KubeletClient, error) { transport, err := kubelet_client.MakeTransport(kubeletConfig) if err != nil { return nil, err } c := &http.Client{ // 此处可以看到,是http挪用 Transport: transport, Timeout: kubeletConfig.HTTPTimeout, } return &KubeletClient{ config: kubeletConfig, client: c, }, nil }
接下来我们看一下kubelet_client.go中KubeletClient有哪些要领:
(i)获取所有的containersInfo第一个,获取所有的container统计信息,如果对cAdvisor对照了解的话,,可以进入到该要领的返回值的ContainerInfo里面看下,长短常全面的容器监控信息(后续会有cAdvisor的源码分析,敬请等候)
func (self *KubeletClient) getAllContainers(url string, start, end time.Time) ([]cadvisor.ContainerInfo, error) {
// Request data from all subcontainers.此处是结构requestBody
request := statsRequest{
ContainerName: "http://www.mamicode.com/",
NumStats:
1,
Start:
start, // 2017-11-10T06:46:17Z 这种utc格局的时间戳
End:
end,
Subcontainers: true,
}
body, err := json.Marshal(request)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
if err != nil {
return nil, err
}
// 设置请求头
req.Header.Set("Content-Type", "application/json")
var containers map[string]cadvisor.ContainerInfo
client := self.client
if client == nil {
client = http.DefaultClient
}
err = self.postRequestAndGetValue(client, req, &containers)
if err != nil {
return nil, fmt.Errorf("failed to get all container stats from Kubelet URL %q: %v", url, err)
}
result := make([]cadvisor.ContainerInfo, 0, len(containers))
for _, containerInfo := range containers {
cont := self.parseStat(&containerInfo)
if cont != nil {
result = append(result, *cont)
}
}
return result, nil
}
我们可以看到上面的源码其实是结构了一个http的post请求。那么我们在本地通过restClient模拟一下: