Ansible 多机文件分发、执行脚本并单机合并实验结果(Check point, 多线程异步执行,主机状态检测等)

时间:2021-08-22 15:21:37

简介

Ansible 是一个自动化运维工具,可以批量管理多台机器的部署与安装。本文在 Python 中调用 Ansible 实现任务的多机执行,采用 Master-Slave 架构:首先检测网络状态并剔除不可用的主机,接着进行负载均衡,再将数据集分发到各主机并异步执行;执行过程中通过 check point 机制检测任务是否执行成功,全部执行完毕后由 Master 收集所有实验结果。

Host(host.ini)

[master]
netlab_x2

[slave]
netlab_x1

Config(Part)(config.ini)

[ansible]
master_load = 5
slave_load = 1
code_home = /home/guangfa/ansible/guangfa

Python Script

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 
  4 import sys, os
  5 import ConfigParser
  6 import ansible.runner
  7 import time
  8 
  9 inventory_file = "../host.ini"
 10 database_path = "../database"
 11 config_file = "../config.ini"
 12 
 13 class DistributedByAnsible:
 14     def __init__(self):
 15         cf = ConfigParser.ConfigParser()
 16     cf.read(config_file)
 17     self.master_load = int(cf.get("ansible", "master_load"))
 18         self.slave_load = int(cf.get("ansible", "slave_load"))
 19 
 20     self.code_home_path = cf.get("ansible", "code_home")
 21     self.ansibleResult = self.code_home_path + "/ansibleResult"
 22     self.database_path = self.code_home_path + "/database"
 23 
 24 
 25     self.avail_master = []
 26     self.avail_slave = []
 27     self.loadBalancing = {}
 28 
 29     def extractHost(self,config_file_path):
 30         cf = ConfigParser.ConfigParser(allow_no_value = True)
 31         cf.read(config_file_path)
 32         s = cf.sections()
 33         #print 'section:', s
 34         ms = cf.options("master")
 35         #print 'options:', ms
 36     for x in ms:
 37         runner = ansible.runner.Runner(
 38         host_list=inventory_file,
 39             module_name='ping',
 40         module_args='',
 41         pattern=x
 42         )
 43         res = runner.run()
 44         #print res
 45         if res['dark'] and res['dark'][x] and res['dark'][x]['failed']:
 46         print "Host Error:", x, "Unreachable"
 47             continue
 48         self.avail_master.append(x)
 49     self.avail_master = list(set(self.avail_master))
 50     print self.avail_master
 51     ss = cf.options("slave")
 52     for x in ss:
 53         runner = ansible.runner.Runner(
 54             host_list=inventory_file,
 55             module_name='ping',
 56         module_args='',
 57         pattern=x
 58         )
 59         res = runner.run()
 60         #print res
 61         if res['dark'] and res['dark'][x] and res['dark'][x]['failed']:
 62             continue
 63         self.avail_slave.append(x)
 64     self.avail_slave = list(set(self.avail_slave))
 65 
 66     # make true the host is unique
 67     for host in self.avail_master:
 68         if host in self.avail_slave:
 69             self.avail_slave.remove(host)
 70     print self.avail_slave
 71     
 72     def calculateLoadBalancing(self):
 73         load_count = int(os.popen("ls %s | grep ^mygraph | wc -l" % database_path).read())
 74     self.load_count = load_count
 75     current_load_count = load_count
 76     print load_count
 77     print self.master_load, self.slave_load
 78     master_all_count = len(self.avail_master)
 79     slave_all_count = len(self.avail_slave)
 80     
 81     for m in self.avail_master:
 82         temp_count = min(current_load_count, int(load_count * (self.master_load / float(self.master_load * master_all_count + self.slave_load * slave_all_count))))
 83         print temp_count
 84         if temp_count > 0:
 85                 self.loadBalancing[m] = temp_count
 86         current_load_count -= temp_count
 87         for s in self.avail_slave:
 88         temp_count = min(current_load_count, int(load_count * (self.slave_load / float(self.master_load * master_all_count + self.slave_load * slave_all_count))))
 89         if temp_count > 0:
 90             self.loadBalancing[s] = temp_count
 91         current_load_count -= temp_count
 92 
 93     if current_load_count > 0 and master_all_count > 0:
 94         self.loadBalancing[self.avail_master[0]] += current_load_count
 95         current_load_count = 0
 96     if current_load_count > 0 and slave_all_count > 0:
 97         self.loadBalancing[self.avail_slave[0]] += current_load_count
 98         current_load_count = 0
 99     print self.loadBalancing
100     
101     def fileTransfer(self):
102         print self.code_home_path
103     print self.ansibleResult
104     if self.ansibleResult == "":
105         print "ERROR: the home of the result about ansible is null!"
106         sys.exit(1)
107     current_database_count = 0
108     #clean the directory of the result
109     for host in self.loadBalancing:
110         print host
111         runner = ansible.runner.Runner(
112             host_list=inventory_file,
113         module_name='file',
114         module_args='path=%s state=absent' % self.ansibleResult,
115         pattern=host
116         )
117         res = runner.run()
118         #print res
119         runner = ansible.runner.Runner(
120             host_list=inventory_file,
121         module_name='file',
122         module_args='path=%s state=directory mode=0750' % self.ansibleResult,
123         pattern=host
124         )
125         res = runner.run()
126         #print res
127             
128         # transfer the database
129             this_host_load_count = self.loadBalancing[host]
130         for i in range(this_host_load_count) :
131             runner = ansible.runner.Runner(
132             host_list=inventory_file,
133             module_name='copy',
134             module_args='src=%s/mygraph%s dest=%s mode=0750' % (self.database_path, current_database_count + i, self.ansibleResult),
135             pattern=host
136         )
137         res = runner.run()
138         print res
139         #print current_database_count
140         #print this_host_load_count
141         current_database_count += this_host_load_count
142 
143         # transfer the code
144         file_code_list = ["community.jar", "config.ini", "runAll.sh","lib"]
145         for f in file_code_list:
146             runner = ansible.runner.Runner(
147             host_list=inventory_file,
148             module_name='copy',
149             module_args='src=%s/%s dest=%s mode=770' % (self.code_home_path, f, self.code_home_path),
150             pattern=host
151         )
152         res = runner.run()
153         print res
154              
155         # run the code distributed
156         runner = ansible.runner.Runner(
157             host_list=inventory_file,
158         module_name="shell",
159         module_args='chdir=%s nohup bash runAll.sh %s > bash.log &' % (self.code_home_path, self.ansibleResult),
160         pattern=host
161         )
162 
163         res = runner.run()
164         print res
165         print "The Host named", host, "is running!"
166 
167     def informationTransfer(self, transfer_module_name, transfer_module_args, host):
168         print transfer_module_args
169     print transfer_module_name
170     runner = ansible.runner.Runner(
171         host_list=inventory_file,
172         module_name=transfer_module_name,
173         module_args=transfer_module_args,
174         pattern=host
175     )
176     res = runner.run()
177     print res
178 
179     def checkPoint(self):
180         os.system("rm -rf %s/output" % self.database_path)
181         os.system("mkdir -p %s/output" % self.database_path)
182         print "Begining CheckPoint"
183     host_keys = self.loadBalancing.keys()
184     #print host_keys
185     while len(host_keys) > 0:
186         for host in host_keys:
187         runner = ansible.runner.Runner(
188             host_list=inventory_file,
189             module_name='command',
190             module_args='removes=%s/success_log pwd' % self.ansibleResult,
191             pattern=host
192         )
193         res = runner.run()
194         print res
195         if res['contacted'] and res['contacted'][host] and "skipped" in res['contacted'][host]['stdout']:
196             continue
197         else:
198             print "Host", host, "finished his tasks!"
199             host_keys.remove(host)
200             self.informationTransfer("synchronize", "src=%s/output dest=%s mode=pull" % (self.ansibleResult, self.database_path), host)
201             print host_keys
202             print "to sleep"
203             time.sleep(5)
204     
205 
if __name__ == "__main__":
    distributor = DistributedByAnsible()
    distributor.extractHost(inventory_file)
    # By default only the host-availability check runs. Pass --run to also
    # balance the load, distribute data and code, launch the jobs, and wait
    # for their results.
    if "--run" in sys.argv[1:]:
        distributor.calculateLoadBalancing()
        distributor.fileTransfer()
        distributor.checkPoint()
    print("Done")
213 
214     

待完善部分

主机状态目前只在开始时检测一次,用于剔除不可用的主机;如果某台机器在脚本运行中途宕机,还需要把它的任务重新分配给另一台可用主机执行。