1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# -*- coding: utf-8 -*-
import paramiko
import threading
def run(host_ip, username, password, command):
ssh = paramiko.SSHClient()
try :
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host_ip, 22 , username, password)
print ( '===================exec on [%s]=====================' % host_ip)
stdin, stdout, stderr = ssh.exec_command(command, timeout = 300 )
out = stdout.readlines()
for o in out:
print (o.strip( '\n' ))
except Exception as ex:
print ( 'error, host is [%s], msg is [%s]' % (host_ip, ex.message))
finally :
ssh.close()
if __name__ = = '__main__' :
# 将需要批量执行命令的host ip地址填到这里
# eg: host_ip_list = ['IP1', 'IP2']
host_ip_list = [ '147.116.20.19' ]
for _host_ip in host_ip_list:
# 用户名,密码,执行的命令填到这里
run(_host_ip, 'tzgame' , 'tzgame@1234' , 'df -h' )
run(_host_ip, 'tzgame' , 'tzgame@1234' , 'ping -c 5 220.181.38.148' )
|
pycrypto,由于 paramiko 模块内部依赖pycrypto,所以先下载安装pycrypto
1
2
|
pip3 install pycrypto
pip3 install paramiko
|
(1)基于用户名和密码的连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import paramiko
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname = 'c1.salt.com' , port = 22 , username = 'GSuser' , password = '123' )
# 执行命令
stdin, stdout, stderr = ssh.exec_command( 'ls' )
# 获取命令结果
result = stdout.read()
# 关闭连接
ssh.close()
|
(2)基于公钥秘钥连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import paramiko
private_key = paramiko.RSAKey.from_private_key_file( '/home/auto/.ssh/id_rsa' )
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname = 'c1.salt.com' , port = 22 , username = 'wupeiqi' , key = private_key)
# 执行命令
stdin, stdout, stderr = ssh.exec_command( 'df' )
# 获取命令结果
result = stdout.read()
# 关闭连接
ssh.close()
|
SFTPClient:
用于连接远程服务器并进行上传下载功能。
(1)基于用户名密码上传下载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import paramiko
transport = paramiko.Transport(( 'hostname' , 22 ))
transport.connect(username = 'GSuser' ,password = '123' )
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put( '/tmp/location.py' , '/tmp/test.py' )
# 将remove_path 下载到本地 local_path
sftp.get( 'remove_path' , 'local_path' )
transport.close()
|
(2)基于公钥秘钥上传下载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import paramiko
private_key = paramiko.RSAKey.from_private_key_file( '/home/auto/.ssh/id_rsa' )
transport = paramiko.Transport(( 'hostname' , 22 ))
transport.connect(username = 'GSuser' , pkey = private_key )
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put( '/tmp/location.py' , '/tmp/test.py' )
# 将remove_path 下载到本地 local_path
sftp.get( 'remove_path' , 'local_path' )
transport.close()
|
下面是多线程执行版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
#!/usr/bin/python
#coding:utf-8
import threading
import subprocess
import os
import sys
sshport = 13131
log_path = 'update_log'
output = {}
def execute(s, ip, cmd, log_path_today):
with s:
cmd = '''ssh -p%s root@%s -n "%s" ''' % (sshport, ip, cmd)
ret = subprocess.Popen(cmd, shell = True , stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
output[ip] = ret.stdout.readlines()
if __name__ = = "__main__" :
if len (sys.argv) ! = 3 :
print "Usage: %s config.ini cmd" % sys.argv[ 0 ]
sys.exit( 1 )
if not os.path.isfile(sys.argv[ 1 ]):
print "Usage: %s is not file!" % sys.argv[ 1 ]
sys.exit( 1 )
cmd = sys.argv[ 2 ]
f = open (sys.argv[ 1 ], 'r' )
list = f.readlines()
f.close()
today = datetime.date.today()
log_path_today = '%s/%s' % (log_path,today)
if not os.path.isdir(log_path_today):
os.makedirs(log_path_today)
threading_num = 100
if threading_num > len ( list ):
threading_num = len ( list )
s = threading.Semaphore(threading_num)
for line in list :
ip = line.strip()
t = threading.Thread(target = execute,args = (s, ip,cmd,log_path_today))
t.setDaemon( True )
t.start()
main_thread = threading.currentThread()
for t in threading. enumerate ():
if t is main_thread:
continue
t.join()
for ip,result in output.items():
print "%s: " % ip
for line in result:
print " %s" % line.strip()
print "Done!"
|
以上脚本读取两个参数,第一个为存放IP的文本,第二个为shell命令
执行效果如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
# -*- coding:utf-8 -*-
import requests
from requests.exceptions import RequestException
import os, time
import re
from lxml import etree
import threading
lock = threading.Lock()
def get_html(url):
response = requests.get(url, timeout = 10 )
# print(response.status_code)
try :
if response.status_code = = 200 :
# print(response.text)
return response.text
else :
return None
except RequestException:
print ( "请求失败" )
# return None
def parse_html(html_text):
html = etree.HTML(html_text)
if len (html) > 0 :
img_src = html.xpath( "//img[@class='photothumb lazy']/@data-original" ) # 元素提取方法
# print(img_src)
return img_src
else :
print ( "解析页面元素失败" )
def get_image_pages(url):
html_text = get_html(url) # 获取搜索url响应内容
# print(html_text)
if html_text is not None :
html = etree.HTML(html_text) # 生成XPath解析对象
last_page = html.xpath( "//div[@class='pages']//a[last()]/@href" ) # 提取最后一页所在href链接
print (last_page)
if last_page:
max_page = re. compile (r '(\d+)' , re.S).search(last_page[ 0 ]).group() # 使用正则表达式提取链接中的页码数字
print (max_page)
print ( type (max_page))
return int (max_page) # 将字符串页码转为整数并返回
else :
print ( "暂无数据" )
return None
else :
print ( "查询结果失败" )
def get_all_image_url(page_number):
base_url = 'https://imgbin.com/free-png/naruto/'
image_urls = []
for i in range ( 1 , page_number):
url = base_url + str (i) # 根据页码遍历请求url
try :
html = get_html(url) # 解析每个页面的内容
if html:
data = parse_html(html) # 提取页面中的图片url
# print(data)
# time.sleep(3)
if data:
for j in data:
image_urls.append({
'name' : x,
'value' : j
})
x + = 1 # 每提取一个图片url,标识x增加1
except RequestException as f:
print ( "遇到错误:" , f)
continue
# print(image_urls)
return image_urls
def get_image_content(url):
try :
r = requests.get(url, timeout = 15 )
if r.status_code = = 200 :
return r.content
return None
except RequestException:
return None
def main(url, image_name):
semaphore.acquire() # 加锁,限制线程数
print ( '当前子线程: {}' . format (threading.current_thread().name))
save_path = os.path.dirname(os.path.abspath( '.' )) + '/pics/'
try :
file_path = '{0}/{1}.jpg' . format (save_path, image_name)
if not os.path.exists(file_path): # 判断是否存在文件,不存在则爬取
with open (file_path, 'wb' ) as f:
f.write(get_image_content(url))
f.close()
print ( '第{}个文件保存成功' . format (image_name))
else :
print ( "第{}个文件已存在" . format (image_name))
semaphore.release() # 解锁imgbin-多线程-重写run方法.py
except FileNotFoundError as f:
print ( "第{}个文件下载时遇到错误,url为:{}:" . format (image_name, url))
print ( "报错:" , f)
raise
except TypeError as e:
print ( "第{}个文件下载时遇到错误,url为:{}:" . format (image_name, url))
print ( "报错:" , e)
class MyThread(threading.Thread):
"""继承Thread类重写run方法创建新进程"""
def __init__( self , func, args):
"""
:param func: run方法中要调用的函数名
:param args: func函数所需的参数
"""
threading.Thread.__init__( self )
self .func = func
self .args = args
def run( self ):
print ( '当前子线程: {}' . format (threading.current_thread().name))
self .func( self .args[ 0 ], self .args[ 1 ])
# 调用func函数
# 因为这里的func函数其实是上述的main()函数,它需要2个参数;args传入的是个参数元组,拆解开来传入
if __name__ = = '__main__' :
start = time.time()
print ( '这是主线程:{}' . format (threading.current_thread().name))
urls = get_all_image_url( 5 ) # 获取所有图片url列表
thread_list = [] # 定义一个列表,向里面追加线程
semaphore = threading.BoundedSemaphore( 5 ) # 或使用Semaphore方法
for t in urls:
# print(i)
m = MyThread(main, (t[ "value" ], t[ "name" ])) # 调用MyThread类,得到一个实例
thread_list.append(m)
for m in thread_list:
m.start() # 调用start()方法,开始执行
for m in thread_list:
m.join() # 子线程调用join()方法,使主线程等待子线程运行完毕之后才退出
end = time.time()
print (end - start)
# get_image_pages(<a href="https://imgbin.com/free-png/Naruto" rel="external nofollow">https://imgbin.com/free-png/Naruto</a>)
|
以上就是python ssh 执行shell命令的示例的详细内容,更多关于python ssh 执行shell命令的资料请关注服务器之家其它相关文章!
原文链接:https://www.cnblogs.com/chengxuyonghu/archive/2004/01/13/13638870.html