信号量
from multiprocessing import Processfrom multiprocessing import Semaphoreimport timeimport randomdef ktv(i,sem): sem.acquire() print('%s走进ktv'%i) time.sleep(random.randint(1,5)) print('%s走出ktv'%i) sem.release()if __name__ == '__main__': sem = Semaphore(4) for i in range(20): p = Process(target=ktv,args=(i,sem)) p.start()
事件
from multiprocessing import Event# 一个信号可以使得所有的进程都进入阻塞状态# 也可以控制所有的进程解除阻塞# 一个事件被创建之后,默认是阻塞状态e = Event() #创建一个事件print(e.is_set()) #查看一个事件的状态,默认被设置为阻塞print(12345)e.set() #将这个事件的状态改为trueprint(e.is_set()) #查看一个事件的状态,默认被设置为阻塞e.wait() #是依据e.is_set()的值来决定是否阻塞print(12345677)e.clear() #将这个事件的状态改为Falsee.wait() #是依据e.is_set()的值来决定是否阻塞print('###############3')
set 和 clear:分别用来修改一个事件的状态。True或False # is_set 用来查看一个事件的状态 # wait 是依据事件的状态来决定自己是否阻塞 False是阻塞。True是不阻塞
# 红绿灯事件from multiprocessing import Eventfrom multiprocessing import Processimport randomimport timedef cars(e,i): if not e.is_set(): print('car%i在等等'%i) e.wait() #阻塞直到得到一个事件状态,事件状态改变成true的信号 print('car%i通过'%i) '车通行'def light(e): while True: if e.is_set(): e.clear() print('\033[31m红灯亮了\33[0m') else: e.set() print('\033[32m绿灯亮了\33[0m') time.sleep(2)if __name__ == '__main__': e = Event() traffic = Process(target=light,args=(e,)) traffic.start() for i in range(20): car = Process(target=cars,args=(e,i)) car.start() time.sleep(random.random())
队列 先进先出 # IPC(Inter-Process Communication)---队列 # 创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
from multiprocessing import Queueimport timeq = Queue(5)q.put(1)q.put(2)q.put(3)q.put(4)q.put(5)print(q.full()) #判断队列是否满了print(q.get())print(q.get())print(q.empty())print(q.get())print(q.get())print(q.get())print(q.empty()) #判断队列是否空while True: try : q.get_nowait() except: print('队列已经空了') time.sleep(1)
from multiprocessing import Queue,Processdef produce(q): q.put('hello')def consume(q): q.get()if __name__ == '__main__': q = Queue() p = Process(target=produce,args=(q,)) p.start() c = Process(target=produce,args=(q,)) c.start() print(q.get())
# 生产者消费者模型
from multiprocessing import Process,Queueimport timeimport randomdef consumer(q,name): while True: food = q.get() if food is None: print('%s获取到了一个空'%name) break print('\033[31m%s消费了%s\033[0m' %(name,food)) time.sleep(random.randint(1,3))def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) f = '%s生成了%s%s'%(name,food,i) print(f) q.put(f)if __name__ == '__main__': q = Queue(20) p1 = Process(target=producer,args=('Egon','包子',q)) p2 = Process(target=producer,args=('wusir','泔水',q)) c1 = Process(target=consumer,args=(q,'alex')) c2 = Process(target=consumer,args=(q,'jin')) p1.start() p2.start() c1.start() c2.start() p1.join() p2.join() q.put(None) q.put(None)
# JoinableQueue # 在消费者这一端: # 每次获取一个数据 # 处理一个数据 # 发送一个记号 :标志一个数据被处理成功 # 在生产者这一端: # 每一次生产一个数据 # 且每一次生产的数据都放在对列中 # 在队列中刻上了一个记号 # 当生产者全部生产完毕之后 # join信号:已经停止生产数据了,且要等待之前被刻上的记号被消费完 # 当数据都被处理完的时候,join阻塞结束 # consumer 中把所有的任务都消耗完 # producer端的join感知到,停止阻塞 # 所有的producer进程结束 # 主进程中的p.join结束 # 主进程代码结束 # 守护进程(消费者进程)结束
from multiprocessing import Process,JoinableQueueimport timeimport randomdef consumer(q,name): while True: food = q.get() if food is None: print('%s获取到了一个空'%name) break print('\033[31m%s消费了%s\033[0m' %(name,food)) time.sleep(random.randint(1,3)) q.task_done() #count-1def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) f = '%s生成了%s%s'%(name,food,i) print(f) q.put(f) q,join() #阻塞 直到一个队列中的所有数据全部被处理完毕,感知一个队列中的数据全部被执行完毕if __name__ == '__main__': q = JoinableQueue(20) p1 = Process(target=producer,args=('Egon','包子',q)) p2 = Process(target=producer,args=('wusir','泔水',q)) c1 = Process(target=consumer,args=(q,'alex')) c2 = Process(target=consumer,args=(q,'jin')) p1.start() p2.start() c1.daemon = True #设置为守护进程,主进程中的代码执行完毕之后子进程自动结束 c2.daemon = True c1.start() c2.start() p1.join() #感知一个进程的结束 p2.join()