import threading,queue
import time
# 最简单的线程并发
# def run(*args):
# print("threading", args)
# time.sleep(2)
#
# run("t1")
# run("t2")
#
# t3 = threading.Thread(target=run, args=("t3",)) #必须使用, 保证其是一个元组
# t4 = threading.Thread(target=run, args=("t4",))
# t3.start()
# t4.start()
# 通过类的形式
# class MyThread(threading.Thread):
# def __init__(self,n):
# super(MyThread, self).__init__()
# self.n = n
#
# def run(self):
# print("thereading", self.n)
# time.sleep(2)
#
# t5 = MyThread("t5")
# t6 = MyThread("t6")
# t5.start()
# t6.start()
#一次性启动多个线程 主线程和子线程是并行执行
# def run(*args):
# print("threading", args)
# time.sleep(2)
# print("threading done".center(50, "-"), args)
# start_time = time.time()
# for i in range(50):
# t = threading.Thread(target=run, args=("t%s"% i,))
# t.start()
# end_time = time.time()
# print("cost time %s" % (end_time - start_time))
# 如何等所有子线程执行完再进行? ->程序的串行 join()方法
# 如何实现 线程并行 主线程串行? ->join方法的合理使用
# def run(*args):
# print("threading", args, threading.current_thread(),threading.active_count())
# time.sleep(2)
# print("threading done", args)
#
# start_time = time.time()
# #start 50 threads
# t_joins = []
# for i in range(50):
# t = threading.Thread(target=run, args=(i,))
# t_joins.append(t)
# t.start()
# for i in t_joins:
# i.join()
# print("thread done".center(40, '-'), threading.current_thread() )
# end_time = time.time()
# print("cost %s" % (end_time-start_time))
# 守护进程(守护线程) 非守护线程执行完毕就退出 不等所有线程执行完毕
# def run(*args):
# print("threading", args, threading.current_thread(),threading.active_count())
# time.sleep(2)
# print("threading done", args)
#
# start_time = time.time()
# #start 50 threads
# t_joins = []
# for i in range(50):
# t = threading.Thread(target=run, args=(i,))
# t.setDaemon(True) #设置守护线程 (非守护线程执行完毕即终止)
# t_joins.append(t) #避免join导致等待影响接下来的行动 先加入列表 然后执行
# t.start()
# # for i in t_joins:
# # i.join()
# print("thread done".center(40, '-'), threading.current_thread() )
# end_time = time.time()
# print("cost %s" % (end_time-start_time))
# 全局解释器锁GIL python调用的线程是C接口的原生线程 pthread... (语言的基石部分)
# 同一时间只有一个线程拿到数据-->cpython的缺陷 pypy git即时编译(提前预编译)
#################################################################################
# 用户程序锁保证同一时间只有一个线程真正的数据
# 全局解释器锁保证同一时间只有一个线程执行
# 怎么加用户程序锁: THREAD LOCK (2.x)
# def run(*args):
# lock.acquire()
# print("thread %s" % args)
# global num
# num += 1
# time.sleep(1)
# lock.release()
#
# lock = threading.Lock() #开启用户锁->会导致串行->不要sleep,计算量小
# num = 0
# t_joins = []
# for i in range(50):
# t = threading.Thread(target=run, args=(i,))
# t_joins.append(t)
# t.start()
# for i in t_joins:
# i.join() #不加join数据出错? 线程未执行完程序就关闭
# print("num:", num)
# 进两道门锁两次 出去的时候开两次门 =>出错:锁被弄混 --->lock.Rlock()
# 线程锁(互斥锁)Mutex
# 信号量(Semaphore) 互斥锁是指同时只有一个线程更改数据 信号量是指同时允许一定数量的线程更改数据
# mysql线程池 socketserver连接限制
# def run(n):
# semaphore.acquire()
# time.sleep(1)
# print("run the threading %s" % n )
# semaphore.release()
#
# if __name__ == "__main__":
# semaphore = threading.BoundedSemaphore(3)
# for i in range(21):
# t = threading.Thread(target=run, args=(i, ))
# t.start()
# while threading.active_count() != 1:
# pass
# else:
# print("all thread done".center(20, '-'))
###########################################################################
# 事件 : 由一个事件导致另一个事件的连锁变化 threading.Event set(标志位被设置代表绿灯) wait clear(标志位被清空代表红灯) is_set
# 实现红绿灯
# event = threading.Event()
# def lighter():
# count = 0
# while True:
# if 5 <= count < 10:
# print("red light is on")
# event.clear() #清除标记 , 红灯->禁止通行
# elif count < 20:
# print("green light is on")
# event.set() #设置标记 , 绿灯->允许通行
# elif count == 20:
# count = 0
# time.sleep(1)
# count += 1
#
# light = threading.Thread(target=lighter)
# light.start()
#
# def car(name):
# while True:
# if event.is_set():
# print("[%s] drive ..." % name)
# time.sleep(1)
# else:
# print("[%s] stop ..." % name)
# event.wait()
#
# car = threading.Thread(target=car, args=("Lambo",))
# car.start()
################################################################################
# 消息队列(python自带队列)一个有顺序的容器
# 为什么需要队列? 1.提高运行效率(不用干等着) 2.实现解耦(紧耦合->松耦合)
# 列表也有顺序 为什么还需要队列? 列表->复制了一块硬盘 队列->数据只有一份,取走了就没了
# class queue.Queue() #FIFO
# class queue.LifoQueue(maxsize=0) #last in fisrt out
# class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
# Queue.qsize()
# Queue.empty() #return True if empty
# Queue.full() # return True if full
# Queue.put(item, block=True, timeout=None)
# Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
#
# Queue.put_nowait(item)
# Equivalent to put(item, False).
#
# Queue.get(block=True, timeout=None)
# Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
#
# Queue.get_nowait()
# Equivalent to get(False).
#
# Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
#
# Queue.task_done()
# Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
#
# If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
#
# Raises a ValueError if called more times than there were items placed in the queue.
#
# Queue.join() block直到queue被消费完毕
################################################################################
# 生产者消费者模型
# 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
#
# 为什么要使用生产者和消费者模式
# 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
#
# 什么是生产者消费者模式
# 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
# q = queue.Queue(maxsize=10)
# def Producer():
# while True:
# for i in range(10):
# print("生产了一坨粑粑%s"% i)
# q.put("粑粑%s" % i)
# time.sleep(1)
#
# def Consumer(name):
# while True:
# print("%s 吃了 %s" % (name, q.get()))
# time.sleep(2)
#
# p = threading.Thread(target=Producer)
# c1 = threading.Thread(target=Consumer, args=("阿西坝", ))
# c2 = threading.Thread(target=Consumer, args=("奥巴马", ))
# p.start()
# c1.start()
# c2.start()
# Python Paramiko 模块: 该模块基于SSH连接远程服务器并执行相关操作->批量管理12# 每台linux机器上都有一个ssh客户端## ssh 密钥## ssh -keygen .ssh/authen...## ssh-copy-id "-p55222 root@10.0.0.31"## 公钥密钥登录->?## 进程/线程# 计算机中 所有指令的执行是由cpu来控制 cpu负责运算 线程就是操作系统能够进行调度的最小单位 cpu->运算速度# 简单理解: 线程->是操作系统的最小调度单位,是一串指令的集合 每一个程序的内存是独立的,不能互相直接访问# 进程:QQ作为整体 暴露给os操作系统来调用管理, 里面包含对各种资源的调用 内存的管理, 网络接口的调用等;(对各种资源管理的集合:就可以称为进程)# 进程要操作cpu 必须要先创建一个线程 进程本身不能执行 (进程->屋子 线程->屋子的空间,人,...)# 进程是资源的集合 线程是一串执行的指令## 书页 行数 字数# 一核只能同时干一件事 为什么会出现多核的幻觉? 是因为上下文的切换, 运算太快了# 一个执行的上下文就是一个线程# 所有在同一个进程的线程可以共享同一块内存空间 (访问同一块数据的问题)# 进程->一个程序的实例 一个资源的集合 唯一的进程标识符(pid)## 当启动一个进程时候会先启动一个线程 线程独立## 线程和进程的区别是什么>?# 1) 进程快还是线程快? 无可比性 进程要执行要通过线程# 2) 启动一个线程快还是一个进程快? 启动一个线程块 进程是资源的集合# 3) 线程共享内存空间 进程的内存是独立的# 4) 父进程生成两个子进程 每生成一个子进程相当于克隆一份父进程 两个子进程之间的数据不共享# 主进程的线程生成子线程->子县城数据和主线程一致# 5) 同一个进程的线程之间可以直接交流 资源的共享 信息的独立# 进程之间是相互独立的 两个进程相通信必须通过一个中间代理来实现# 6) 创建新线程容易创建 创建新进程需要对其父进程进行一次克隆# 7) 一个线程可以控制和操作同一个进程里的其他线程 但是进程只能操作子进程# Python GIL(global interpreter lock) 全局解释器锁# 无论你启动多少个线程 多少个cpu python在执行的时候 会淡定的在同一时刻只允许一个线程运行