paramiko堡垒机、线程及锁

时间:2022-01-22 07:48:55

1.使用paramiko实现ssh连接和scp拷贝

开发堡垒机之前,先来学习Python的paramiko模块,该模块机遇SSH用于连接远程服务器并执行相关操作

1.1 SSHClient

用于连接远程服务器并执行基本命令

基于用户名密码连接:

#SSH1:
import paramiko # 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='c1.salt.com', port=22, username='root', password='') # 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果,标准输出和标准错误哪个有值就输出哪个。 res = stdout.read()
err = stderr.read()
result = res if res else err
# 关闭连接
ssh.close() #SSH2:
import paramiko
transport = paramiko.Transport(('hostname', 22)) transport.connect(username='root', password='')
ssh = paramiko.SSHClient()
ssh._transport = transport
stdin, stdout, stderr = ssh.exec_command('df')
print(stdout.read() )
transport.close()

基于公钥密钥连接:

原理跟ssh免密钥登录一样,手动指定私钥文件位置,把公钥ssh-copy-id到对端机器。

SSH1:
import paramiko private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') # 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机,如果没有这一行,会出现下图的错误;这其实是一个安全认证的环节,ssh第一次连接的时候会让输入一个“yes/no”,输入yes将主机存入know_hosts。
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='c1.salt.com', port=22, username='root', key=private_key) # 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
res = stdout.read()
err = stderr.read()
result = res if res else err
# 关闭连接
ssh.close() SSH2:
import paramiko private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') transport = paramiko.Transport(('hostname', 22))
transport.connect(username='root', pkey=private_key) ssh = paramiko.SSHClient()
ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('df') transport.close() SSHClient 封装 Transport

paramiko堡垒机、线程及锁

paramiko堡垒机、线程及锁

1.2 SFTPClient

用于连接远程服务器并执行上传下载

基于用户名密码上传下载

import paramiko

transport = paramiko.Transport(('hostname',22))
transport.connect(username='root',password='') sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path') transport.close()

基于公钥密钥上传下载

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='root', pkey=private_key ) sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path') transport.close()

demo:

#!/usr/bin/env python

import paramiko
import uuid class Haproxy(object): def __init__(self):
self.host = '172.16.103.191'
self.port = 22
self.username = 'root'
self.pwd = ''
self.__k = None def create_file(self):
file_name = str(uuid.uuid4())
with open(file_name,'w') as f:
f.write('gogogo')
return file_name def run(self):
self.connect()
self.upload()
self.rename()
self.close() def connect(self):
transport = paramiko.Transport((self.host,self.port))
transport.connect(username=self.username,password=self.pwd)
self.__transport = transport def close(self): self.__transport.close() def upload(self):
# 连接,上传
file_name = self.create_file() sftp = paramiko.SFTPClient.from_transport(self.__transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put(file_name, '/home/root/tttttttttttt.py') def rename(self): ssh = paramiko.SSHClient()
ssh._transport = self.__transport
# 执行命令
stdin, stdout, stderr = ssh.exec_command('mv /home/root/tttttttttttt.py /home/root/ooooooooo.py')
# 获取命令结果
result = stdout.read() ha = Haproxy()
ha.run()

2. 进程与线程

2.1 什么是线程(thread)?

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

不同进程之间的数据不共享,A进程不能访问B进程的内存数据。

进程是程序所需资源的集合,进程创建线程来调度CPU,进程不能直接调度CPU。

一个进程中的所有线程共享同一块内存空间,线程之间是平等的,线程可以创建线程。

2.2 什么是进程(process)?

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

2.3  进程与线程的区别?

线程共享内存,进程内存隔离;

同一进程中的线程可以直接交流,进程与进程要想通信必须通过一个中间进程处理;

创建线程很简单,创建新进程需要对父进程进行一次克隆;

线程可以操作和控制同一进程里的其他线程,进程只能操作子进程;

修改主线程可能会影响其他线程;修改进程数据不会对子进程产生影响(进程之间的数据独立)。

3. Python threading模块

3.1 线程有2种调用方式,如下

直接调用

import threading
import time
def run(t):
print('t:',t)
time.sleep(5)
## 这样会同时输出t:t1和t:t2
t1 = threading.Thread(target=run,args=("t1",))
t2 = threading.Thread(target=run,args=("t2",))
t1.start()
t2.start() ##这样会先输出t:t1,过5秒后输出t:t2
run('t1')
run('t2')

继承式调用(没有什么特别的,知道就行)

import threading
import time class MyThread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num = num def run(self):#定义每个线程要运行的函数
print("running on number:%s" %self.num)
time.sleep(3)
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()

3.2 同时启动N个线程

import threading
import time
def run(t):
print('t:',t)
time.sleep(5) for i in range(50):
t = threading.Thread(target=run,args=("t-%s" % i,))
t.start() #瞬间输出结果,因为这50个线程是并行运行的。

3.3 计算所有线程执行完毕的耗时

3.3.1 直接用time做减法是不行的

import threading
import time
def run(t):
print('t:',t)
time.sleep(5)
print('')
time1 = time.time()
for i in range(3):
t = threading.Thread(target=run,args=("t-%s" % i,))
t.start()
print(time.time() - time1) #思路:开始创建线程时记录一个时间,线程启动完毕后记录一个时间,然后相减,事实上这样做是不行的。
#结果:
t: t-0
t: t-1
t: t-2
0.0012934207916259766 #for循环创建了线程就会继续往下走,不会等待线程执行完毕才往下走;所以虽然run()里sleep了5秒,但是这里的总耗时才这么小。
1111111111111111111 #打印了时间,过5秒后才打印的111111...
1111111111111111111
1111111111111111111 #出现上面的结果是因为,主线程只管创建线程执行任务,它创建完了就会继续往下执行,不会等待所有线程都执行完毕后才继续往下执行代码。

3.3.2 join()的作用是线程执行完毕后才往下走

import threading
import time
def run(t):
print('t:',t)
time.sleep(5)
print('')
t1 = threading.Thread(target=run,args=("t1",))
t2 = threading.Thread(target=run,args=("t2",))
t1.start()
t1.join() #只有t1这个线程执行完毕后,才执行后面的t2.start和print
t2.start() #执行了这行代码,不等线程执行结束,直接执行后面的print
print('thread done.') #结果:
t: t1
1111111111111111111
t: t2
thread done.
1111111111111111111

3.3.3 统计线程的耗时

import threading
import time
def run(t):
print('t:',t)
time.sleep(5)
print('')
thread_list = [] #定义一个列表存储所有的线程对象
time1 = time.time()
for i in range(3):
t = threading.Thread(target=run,args=("t-%s" % i,))
t.start()
thread_list.append(t)
for i in thread_list: #循环列表里的对象,每个都执行一次join()方法
i.join() print(time.time() - time1) #结果:
t: t-0
t: t-1
t: t-2
1111111111111111111
1111111111111111111
1111111111111111111
5.005059480667114 #耗时5秒多,因为每个线程会sleep5秒。

3.3.4 threading.current_thread(),threading.active_count()

current_thread()打印当前线程是哪个;threading.active_count()打印活动线程。

import threading
import time
def run(t):
print('t:',t)
time.sleep(2)
print('')
thread_list = []
time1 = time.time()
for i in range(3):
t = threading.Thread(target=run,args=("t-%s" % i,))
print(threading.current_thread)
t.start()
thread_list.append(t)
# for i in thread_list:
# i.join() print(time.time() - time1,threading.current_thread(),threading.active_count()) 结果:
<function current_thread at 0x7fe7e3043510> #方法线程
t: t-0
<function current_thread at 0x7fe7e3043510>
<function current_thread at 0x7fe7e3043510>
t: t-1
0.009331941604614258 <_MainThread(MainThread, started 140634049431360)> 4 #主线程;活动线程4个(三个执行run()方法的线程和一个主线程)
t: t-2
1111111111111111111
1111111111111111111
1111111111111111111

3.4 setDaemon(True)设置守护线程

线程与创建该线程的线程虽然是父子关系,但是身份等价;但是守护线程与创建它的线程身份是不等价的,守护线程像是大臣,而守护线程的父线程像是皇帝,皇帝死了,大臣也跟着陪葬了。

意思是当创建守护线程的父线程执行完毕后,会立刻回收所以守护线程,哪怕守护线程没有执行完毕。

import threading
import time
def run(t):
print('t:',t)
time.sleep(2)
print('')
for i in range(3):
t = threading.Thread(target=run,args=("t-%s" % i,))
# t.setDaemon(True) #注释了这样,线程就单纯是线程,不是守护线程
t.start() #结果:
t: t-0
t: t-1
t: t-2
1111111111111111111
1111111111111111111
1111111111111111111 import threading
import time
def run(t):
print('t:',t)
time.sleep(2)
print('')
for i in range(3):
t = threading.Thread(target=run,args=("t-%s" % i,))
t.setDaemon(True) #将线程设置为守护线程
t.start()
#结果:
t: t-0
t: t-1
t: t-2 #没有上面的11111了,因为当主线程执行完毕后,就立刻结束程序了,所有的守护线程都销毁了。

4. Python GIL(Global Interpreter Lock) 

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的核心意思就是,在CPython中(注意,仅仅是CPython),无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行,所以GIL算是CPython的一个缺陷。

由于python诞生于1989年,当时的CPU还无需考虑多核的问题,所以python最初设计时也没考虑极致的使用多核,就会造成现在的CPython这个缺陷,在2000年的时候,曾有人试图取消GIL,但发现取消后程序变得更慢了,因为既然没有了GIL,就必须自己设计如何让线程利用各个CPU核心。到如今,要取消GIL已经是不可能的事了,因为它是Python的一个底层东西,很多东西都依赖它,一旦取消,基本上要重写很多很多东西。

那如何绕开GIL呢?

需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL,只要不选择CPython作为执行环境即可。

5. 线程锁(互斥锁Mutex)


在ubuntu、mac系统上,使用Python2.x执行下面程序,每次执行的结果可能会不同:

#!/usr/bin/env python

import threading
import time def run():
global num
time.sleep(2)
num += 1 #每个线程执行一次num加1
num = 0 for i in range(1000):
t = threading.Thread(target=run)
t.start()
time.sleep(3)
print(num) #结果:
有时候是1000,有时候是小于1000的数。

但是上面的代码在centos或者python3.X上执行输出的结果就一直是正确的:1000。

出现结果不同的的情况,原因见下:

count=0,A线程的任务是执行count++,B线程的任务也是执行count++;首先A线程会申请GIL锁,然后获取到CPU的使用权限,经过一定时间(在python2.X的时候是每执行100条机器指令就释放GIL锁)释放了GIL锁,但是这时候count++这个操作还没执行完呢;A线程释放完毕后,B线程申请GIL锁,然后读取到count=0,然后操作count++,经过一定时间后释放了GIL锁;然后A线程继续申请GIL锁,继续运算count++,操作完毕后释放GIL锁,并将结果反馈给count,此时的count=1;然后B申请GIL锁,继续执行count++(注意此时B线程已经读取到了count=0,所以即使A线程执行完毕后修改了count=1,B线程也不会重新读),操作完毕后释放GIL锁,并将结果反馈给count,此时的还是count=1(0+1=1)。所以A线程和B线程虽然都执行了,但是结果不如人意,正确的应该是count=2,结果却是count=1。

paramiko堡垒机、线程及锁

这种问题怎么解决呢?在线程操作某个变量时,锁住这个变量,操作完毕后,解锁这个变量。代码见下:

#!/usr/bin/env python

import threading
import time def run():
global num
lock.acquire() #加锁;加锁后注意不要加sleep,不然就变成单线程了,执行完sleep后再执行下一个线程这样。
num += 1
lock.release() #解锁
num = 0
lock = threading.Lock() #定义锁
for i in range(1000):
t = threading.Thread(target=run)
t.start()
time.sleep(3)
print(num)

注意:只有在ubuntu或者mac上使用python2.X时才会出现上面问题,python3.X不会,可能是自动加了锁。

6. RLock(递归锁)

import threading,time

def run1():
print("grab the first part data")
lock.acquire()
global num
num +=1
lock.release()
return num
def run3():
lock.acquire()
res = run1()
print('--------between run1-----')
lock.release() if __name__ == '__main__': num = 0
lock = threading.RLock() #RLock 递归锁;
for i in range(10):
t = threading.Thread(target=run3)
t.start() while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num) #如果锁里面嵌套锁,就必须用threading.RLock() ,如果用threading.Lock()就会导致程序找不到出来的锁,使程序陷入死循环。
#可以试试将上面的RLock()改成Lock(),程序就死循环了。

递归锁的话类似于建了一个锁与钥匙的字典:

locks={
'door1':'lock1',
'door2':'lock2'
}

7. Semaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

import threading
import time def run(n):
semaphore.acquire()
print('num:',n)
time.sleep(3)
semaphore.release() semaphore = threading.BoundedSemaphore(5)
for i in range(22):
t = threading.Thread(target=run,args=(i,))
t.start() #结果:
num: 0
num: 1
num: 2
num: 3
num: 4 num: 5
num: 9
num: 7
num: 6
num: 8 num: 14
num: 12
num: 10
num: 13
num: 11 num: 16
num: 17
num: 15
num: 18
num: 19 num: 20
num: 21
#五个五个的输出;表面上看着好像是5个一块的,其实是每执行完一个线程,就会有新的线程被执行,并不是前5个线程都执行完了,再执行之后的5个线程。

8. Events

通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

event.set() 设置信号
event.clear() 清空信号

event.wait() 堵塞程序直到检测到已经设置了信号

event.is_set() 检测是否设置了信号

8.1 编写一个简单的红绿灯

import threading
import time
def lighter():
count = 0
event.set()
while True:
if count >= 3 and count <= 5:
event.clear()
time.sleep(1)
print('now is red,please waiting.')
elif count > 5:
event.set()
time.sleep(1)
print('now is green,you can go.')
count = 0
else:
time.sleep(1)
print('now is green,you can go.')
count +=1 event = threading.Event()
l1 = threading.Thread(target=lighter)
l1.start() #结果:
now is green,you can go.
now is green,you can go.
now is green,you can go.
now is red,please waiting.
now is red,please waiting.
now is red,please waiting.
now is green,you can go.
now is green,you can go.
now is green,you can go.
now is red,please waiting.
now is red,please waiting.
now is red,please waiting.

8.2 红灯停,绿灯行

import threading
import time
def lighter():
count = 0
event.set()
while True:
if count >= 3 and count <= 5:
event.clear()
time.sleep(1)
print('now is red,please waiting.')
elif count > 5:
event.set()
time.sleep(1)
print('now is green,you can go.')
count = 0
else:
time.sleep(1)
print('now is green,you can go.')
count +=1
time.sleep(1)
def car(carname):
while True:
if event.is_set():
print('gogogo.')
time.sleep(1)
else:
print('nonono.')
event.wait() event = threading.Event()
l1 = threading.Thread(target=lighter)
l1.start()
c1 = threading.Thread(target=car,args=('tesla',))
c1.start()
#结果:
gogogo.
now is green,you can go.
gogogo.
gogogo.
now is green,you can go.
gogogo.
gogogo.
now is green,you can go.
gogogo.
nonono.
now is red,please waiting.
now is red,please waiting.
now is red,please waiting.

9. queue队列

队列两个作用:解耦和提高运行效率。

比如所有学生要到老师那里拷数据,如果没有队列,就是所有人都排队,然后一个人一个人的来拷,这样学生和老师都浪费时间;有队列的话,学生把硬盘放到桌子上,老师及其N个助手并行的拿硬盘拷数据,并把拷好的硬盘放到另一个桌子上;学生随时可以来取硬盘,发现拷贝完成的桌子上有自己的硬盘,就代表拷贝好了,可以取走。

解耦:学生只负责放硬盘和去硬盘;老师只负责拷贝数据;双方互不影响。

提高效率:以前是所有学生拿着等着老师拷贝数据,添加队列后,学生把硬盘放在一起,所以老师可以让其助手也帮忙拷贝数据,助手和老师并行拿硬盘→ 拷贝数据→ 放硬盘 。

队列是一个容器,列表也是容器,它俩最主要的区别是:列表的数据被取走后,数据还存在列表里;队列的数据被取走后,该数据就在队列里消失了。

9.1 队列的三种模式

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

9.2 Queue

Queue.qsize() #获取队列长度
Queue.empty() #return True if empty
Queue.full() # return True if full
Queue.put(item, block=True, timeout=None) #block=True表示如果队列满了,还继续往里面put数据,就会导致队列卡死,除非其他线程取走一个数据才行。
Queue.put_nowait(item) #put数据时如果队列满了,程序不会卡死,而是会触发一个“queue.Full”异常。
Queue.get(block=True, timeout=None) #取数据,block作用同上,如果没有数据了还继续get,就会导致程序卡死,除非有其他线程往队列里添加了数据。
Queue.get_nowait() #get数据时如果队列满了,程序不会卡死,而是会触发一个“queue.Empty”异常。 ###示例代码如下:
import queue q = queue.Queue(maxsize=3)
q.put(1)
q.put(2)
q.put_nowait(3) print(q.get())
print(q.get())
print(q.get_nowait())
#结果:
1
2
3

9.3 LifoQueue

###示例代码如下:
import queue q = queue.Queue(maxsize=3)
q.put(1)
q.put(2)
q.put_nowait(3) print(q.get())
print(q.get())
print(q.get_nowait())
#结果:
3
2
1

9.4 PriorityQueue

import queue

q = queue.PriorityQueue()
q.put((-1,'zrx'))
q.put((13,'aqq'))
q.put((9,'zsc'))
print(q.get())
print(q.get())
print(q.get())
#结果:
(-1, 'zrx')
(9, 'zsc')
(13, 'aqq') #put()只接收一个参数,所以要传入一个元组;元组的第一个元素是比较优先级的对象,可以是数字或者字母,数字的话,数字越小越靠前;字母的话,按a-z排序;不能数字和字母混合比较。

10. 生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

下面来学习一个最基本的生产者消费者模型的例子

import time
import threading
import queue q = queue.Queue(maxsize=10) def producer():
count = 0
while True:
q.put("bone")
print('produce a bone : %s' % count)
count += 1
time.sleep(1)
def consumer(name):
while True:
q.get()
print('%s eat a bone.' % name)
time.sleep(1)
p1 = threading.Thread(target=producer)
p1.start()
c1 = threading.Thread(target=consumer,args=('zsc',))
c2 = threading.Thread(target=consumer,args=('ZSC',))
c1.start()
c2.start()