多进程
多线程
协程
paramiko模块
1、基于UDP的套接字
UDP是面向数据报的,不是面向连接的
from socket import *UDP服务端
udp_server=socket(AF_INET,SOCK_DGRAM)
udp_server.bind(('127.0.0.1',8080))
while True:
data,client_addr=udp_server.recvfrom(1024)
print(data,client_addr)
udp_server.sendto(data.upper(),client_addr)
from socket import *UDP客户端
udp_client=socket(AF_INET,SOCK_DGRAM)
while True:
msg=input('>>: ').strip()
udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080))
data,server_addr=udp_client.recvfrom(1024)
print(data.decode('utf-8'))
基于UDP的套接字不会发生粘包现象
from socket import *UDP服务端
udp_server=socket(AF_INET,SOCK_DGRAM)
udp_server.bind(('127.0.0.1',8080))
data1,client_addr=udp_server.recvfrom(3)
print('data1',data1)
data2,client_addr=udp_server.recvfrom(1024)
print('data2',data2)
from socket import *UDP客户端
udp_client=socket(AF_INET,SOCK_DGRAM)
udp_client.sendto('hello'.encode('utf-8'),('127.0.0.1',8080))
udp_client.sendto('world'.encode('utf-8'),('127.0.0.1',8080))
并发的UDP套接字
#UDP服务端
import socketserverclass MyUDPhandler(socketserver.BaseRequestHandler):
def handle(self):
print(self.request)
self.request[1].sendto(self.request[0].upper(),self.client_address)
if __name__ == '__main__':
s=socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyUDPhandler)
s.serve_forever()
#UDP客户端
from socket import *
udp_client=socket(AF_INET,SOCK_DGRAM)
while True:
msg=input('>>: ').strip()
udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080))
data,server_addr=udp_client.recvfrom(1024)
print(data.decode('utf-8'))
2、进程理论知识
进程是对正在运行程序的一个抽象。
#一 操作系统的作用:
1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
2:管理、调度进程,并且将多个进程对硬件的竞争变得有序
进程与程序的区别:
程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。
注:同一个程序执行两次,那也是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,一个可以播放饭岛爱。
同步执行:一个进程在执行某个任务时,另外一个进程必须等待其执行完毕,才能继续执行
异步执行:一个进程在执行某个任务时,另外一个进程无需等待其执行完毕,就可以继续执行,当有消息返回时,系统会通知后者进行处理,这样可以提高执行效率
#开启进程的方式一
from multiprocessing import Process
import time
def work(name):
print('task <%s> is runing' %name)
time.sleep(0.5)
print('task <%s> is done' % name)
if __name__ == '__main__': #windows系统开启子进程一定要写在main函数下。
# Process(target=work,kwargs={'name':'egon'})
p1=Process(target=work,args=('egon',)) #一定要加,表示此为元组
p2=Process(target=work,args=('alex',))
p1.start()
p2.start()
print('主')
#join方法 待子进程运行完后主进程开始运行
from multiprocessing import Process
import time
def work(name):
print('task <%s> is runing' %name)
time.sleep(0.5)
print('task <%s> is done' % name)
if __name__ == '__main__':
p1=Process(target=work,args=('egon',))
p2=Process(target=work,args=('alex',))
p3=Process(target=work,args=('yuanhao',))
p_l = [p1, p2, p3]
for p in p_l:
p.start()
for p in p_l:
p.join()
# p1.join() #主进程等,等待p1运行结束
# p2.join() #主进程等,等待p2运行结束
# p3.join() #主进程等,等待p3运行结束
print('主')
#开启子进程的方式二
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
print('task <%s> is runing' % self.name)
time.sleep(0.5)
print('task <%s> is done' % self.name)
if __name__ == '__main__':
p=MyProcess('egon')
p.start()
print('主')
#并发的套接字通讯
#服务端
from multiprocessing import Process
from socket import *
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('127.0.0.1',8080))
s.listen(5)
def talK(conn,addr):
while True:
try:
data=conn.recv(1024)
if not data:break
conn.send(data.upper())
except Exception:
break
conn.close()
if __name__ == '__main__':
while True:
conn,addr=s.accept()
p=Process(target=talK,args=(conn,addr)) #链接循环使用开启通讯循环子进程方式
p.start()
s.close()
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
c.close()
#Process对象的其他方法和属性
from multiprocessing import Process
import time,os
def work():
print('parent:%s task <%s> is runing' %(os.getppid(),os.getpid()))
time.sleep(1)
print('parent:%s task <%s> is done' %(os.getppid(),os.getpid()))
if __name__ == '__main__':
p1=Process(target=work,args=('egon',),name='123123') #指定进程名p1.start()
p1.terminate() #强制中止进程 如果p1有子进程,会出现僵尸进程
p1.is_alive() #True os过了一段时间才能回收p1进程
p1.name #进程名
p1.pid #进程号
os.getpid() #当前进程的pid
os.getppid #父进程的pid
windows cmd tasklist|findstr python(pycharm)
守护进程daemon
#守护进程daemon
#其一:守护进程会在主进程代码执行结束后就终止
#其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
from multiprocessing import Processimport time
def work(name):
print('task <%s> is runing' %name)
time.sleep(0.5)
print('task <%s> is done' % name)
if __name__ == '__main__':
p1=Process(target=work,args=('egon',))
p1.daemon = True #子进程start之前必须要要指定daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
p1.start()
print('主')
#主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
if __name__ == '__main__':
p1=Process(target=foo)
p2=Process(target=bar)
p1.daemon=True
p1.start()
p2.start()
print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止
同步锁mutex
#竞争带来的结果就是错乱,如何控制,就是加锁处理
#同步锁mutexfrom multiprocessing import Process,Lock
import time
def work(name,mutex):
mutex.acquire() #加锁
print('task <%s> is runing' %name)
time.sleep(2)
print('task <%s> is done' % name)
mutex.release() #解锁
if __name__ == '__main__':
mutex=Lock()
p1=Process(target=work,args=('egon',mutex)) #参数要指定mutex
p2=Process(target=work,args=('alex',mutex))
p1.start()
p2.start()
print('主')
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低
2.需要自己加锁处理
模拟抢票
#db.txt {"count": 1} #序列化需要用双引号
import json
import os
import time
from multiprocessing import Process,Lock
def search():
dic=json.load(open('db.txt'))
print('\033[32m[%s] 看到剩余票数<%s>\033[0m' %(os.getpid(),dic['count']))
def get_ticket():
dic = json.load(open('db.txt'))
time.sleep(0.5) #模拟读数据库的网络延迟
if dic['count'] > 0:
dic['count']-=1
time.sleep(0.5) # 模拟写数据库的网络延迟
json.dump(dic,open('db.txt','w'))
print('\033[31m%s 购票成功\033[0m' %os.getpid())
def task(mutex):
search()
mutex.acquire()
get_ticket()
mutex.release()
if __name__ == '__main__':
mutex=Lock()
for i in range(10):
p=Process(target=task,args=(mutex,))
p.start()
共享数据
from multiprocessing import Process,Manager,Lock
def task(dic,mutex):
with mutex:
dic['count']-=1
if __name__ == '__main__':
mutex=Lock()
m=Manager()
dic=m.dict({'count':100})
p_l=[]
for i in range(10):
p=Process(target=task,args=(dic,mutex))
p_l.append(p)
p.start()
for p in p_l:
p.join()
print(dic)
队列
#为此mutiprocessing模块为我们提供了基于消息的IPC通信机制:队列和管道。
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
from multiprocessing import Queueq=Queue(3)
q.put('first')
q.put('second')
q.put('third')
# q.put('fourth') #满了会一直卡在这
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) #空的话会一直卡在这
#了解
q=Queue(3)
q.put('first',block=False)
q.put('second',block=False)
q.put('third',block=False)
# q.put_nowait('fourth') == #q.put('fourth',block=False) #满了不会卡在这,会抛出一个异常
q.put('fourth',timeout=3) #指定超时时间
生产者消费者模型
from multiprocessing import Process,Queue
import time,os
def producer(q,name):
for i in range(3):
time.sleep(1)
res='%s%s' %(name,i)
q.put(res)
print('\033[45m<%s> 生产了 [%s]\033[0m' %(os.getpid(),res))
def consumer(q):
while True:
res=q.get()
if res is None:break
time.sleep(1.5)
print('\033[34m<%s> 吃了 [%s]\033[0m' % (os.getpid(), res))
if __name__ == '__main__':
q=Queue()
#生产者们:即厨师们
p1=Process(target=producer,args=(q,'包子'))
p2=Process(target=producer,args=(q,'饺子'))
p3=Process(target=producer,args=(q,'馄饨'))
#消费者们:即吃货们
c1=Process(target=consumer,args=(q,))
c2=Process(target=consumer,args=(q,))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join() #待生产者们运行完毕
q.put(None) #队列中放入结束指定符,几个消费者就放入几个None
q.put(None)
print('主')
Joinable生产者消费者模型
#Joinablequeue
#消费者发消息给生产者
from multiprocessing import Process, JoinableQueue
import time, os
def producer(q, name):
for i in range(3):
time.sleep(1)
res = '%s%s' % (name, i)
q.put(res)
print('\033[45m<%s> 生产了 [%s]\033[0m' % (os.getpid(), res))
q.join() #待进程运行完毕
def consumer(q):
while True:
res = q.get()
time.sleep(1.5)
print('\033[34m<%s> 吃了 [%s]\033[0m' % (os.getpid(), res))
q.task_done() #消费者发消息给生产者
if __name__ == '__main__':
q = JoinableQueue()
# 生产者们:即厨师们
p1 = Process(target=producer, args=(q, '包子'))
p2 = Process(target=producer, args=(q, '饺子'))
p3 = Process(target=producer, args=(q, '馄饨'))
# 消费者们:即吃货们
c1 = Process(target=consumer, args=(q,))
c2 = Process(target=consumer, args=(q,))
c1.daemon=True #消费者为守护进程
c2.daemon=True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
print('主')
进程池
创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程
#进程池Poolfrom multiprocessing import Pool
import os,time
def work(n):
print('task <%s> is runing' %os.getpid())
time.sleep(2)
return n**2
if __name__ == '__main__':
# print(os.cpu_count()) #CPU个数获得方式
p=Pool(4) #进程个数设置为CPU个数 #要创建的进程数,如果省略,将默认使用cpu_count()的值
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,)) #异步方式提交任务
res_l.append(res)
#异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
p.close() #不允许再给进程池加任务 join前必须要执行close,否则程序有问题 p.join() #主进程等待进程池中任务执行结束 for res in res_l: print(res.get()) #从进程池中获得任务执行的结果 ##使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
进程池之回调函数
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
#回调函数callback=
#进程池异步方式提交任务,进程池结果使用回调函数处理任务执行结果
import requests #pip3 install requests
import os,time
from multiprocessing import Pool
def get_page(url):
print('<%s> get :%s' %(os.getpid(),url))
respone = requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}
def parse_page(dic):
print('<%s> parse :%s' %(os.getpid(),dic['url']))
time.sleep(0.5)
res='url:%s size:%s\n' %(dic['url'],len(dic['text'])) #模拟解析网页内容
with open('db.txt','a') as f:
f.write(res)
if __name__ == '__main__':
p=Pool(4)
urls = [
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
]
for url in urls:
p.apply_async(get_page,args=(url,),callback=parse_page)
p.close()
p.join()
print('主进程pid:',os.getpid())
进程池控制并发的套接字通信
#服务端
from multiprocessing import Pool
import os
from socket import *
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('127.0.0.1',8080))
s.listen(5)
def talK(conn,addr):
print(os.getpid())
while True:
try:
data=conn.recv(1024)
if not data:break
conn.send(data.upper())
except Exception:
break
conn.close()
if __name__ == '__main__':
p=Pool(4)
while True:
conn,addr=s.accept()
p.apply_async(talK,args=(conn,addr))
s.close()
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
c.close()
paramiko模块
paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作
pip3 install paramiko #在python3中
#用户名密码方式远程连接服务器执行命令获取结果
import paramiko
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='120.92.84.249', port=22, username='root', password='123QWEasd')
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
print(result.decode('utf-8'))
# 关闭连接
ssh.close()
#公私钥方式远程连接服务器执行命令获取结果
# import paramiko
private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key)
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
print(result.decode('utf-8'))
# 关闭连接
ssh.close()
#paramiko利用sftp上传下载文件
import paramiko
transport = paramiko.Transport(('120.92.84.249', 22))
transport.connect(username='root', password='123QWEasd')
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('id_rsa', '/tmp/test.rsa')
# 将remove_path 下载到本地 local_path
# sftp.get('remove_path', 'local_path')
transport.close()