博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
第三十二天- 管道 进程池
阅读量:5098 次
发布时间:2019-06-13

本文共 4967 字,大约阅读时间需要 16 分钟。

 

1.管道

  进程间通信(IPC)方式二:管道(不推荐使用,了解即可),端口易导致数据不安全的情况出现。

1 from multiprocessing import Pipe,Process 2  3  4 def func(conn1,conn2): 5     msg = conn1.recv()  # 接收了conn2传递的 6     # msg1 = conn2.recv()  # 接收了conn1传递的 7     print('>>>',msg) 8     # print('>>>',msg1) 9 10 11 if __name__ == '__main__':12     # 拿到管道的两端,双工通信方式,两端都可以收发消息13     conn1,conn2 = Pipe()  # 必须在Process之前产生管道14     p = Process(target=func,args=(conn1,conn2,))  # 管道给子进程15     p.start()16     conn1.send('hello')17     conn1.close()18     conn2.send('小子')19     conn2.close()20 21     print('进程结束')22 23 # 注意管道不用了就关闭防止异常

 

 

2.共享数据

  进程之间数据共享的模块之一Manager模块(少用):

  进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的虽然进程间数据独立,但可以通过Manager实现数据共享:

1 from multiprocessing import Manager,Process,Lock 2  3  4 def func1(dic,loc): 5     # loc.acquire()  # 不加锁易出错 6     dic['num'] -= 1 7     # loc.release() 8  9 10 if __name__ == '__main__':11     m = Manager()12     loc = Lock()13     dic = m.dict({
'num':100})14 p_list = []15 for i in range(100):16 p = Process(target=func1, args=(dic,loc))17 p_list.append(p)18 p.start()19 20 [pp.join() for pp in p_list]21 22 print('>>>>>',dic['num'])23 # 共享时不加锁,很可能导致同一个数据被多个子进程取用,数据是不安全的,且超多进程消耗大量资源易导致卡死.
基于Manager的数据共享

  多进程共同去处理共享数据的时候,就和我们多进程同时去操作一个文件中的数据是一样的,不加锁就会出现错误的结果,进程不安全的,所以也需要加锁

 

总结:进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。

 

 

3.进程池 Pool

  创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也需要消耗时间。开启成千上万的进程,操作系统无法让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行切换并且记录每个进程的执行节点,也就是记录上下文(各种变量等等乱七八糟的东西,虽然你看不到,但是操作系统都要做),这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程,这就需要用到进程池:

  定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

 

创建方法:

Pool([numprocess [,initializer [, initargs]]]):创建进程池

参数介绍:

1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None3 initargs:是要传给initializer的参数组

常用方法:

p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。'''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。'''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''    p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用主要方法介绍

 

1 import time 2 from multiprocessing import Process,Pool 3  4  5 def func1(i): 6     num = 0 7     for j in range(5): 8         num += i 9 10 11 if __name__ == '__main__':12     pool = Pool(6)  # 创建进程池13 14     p_list = []15     start_time = time.time()16     for i in range(500):17         p = Process(target=func1,args=(i,))18         p_list.append(p)19         p.start()20 21     [pp.join() for pp in p_list]22     end_time = time.time()23     print('耗时:',end_time-start_time)24 25     s_time = time.time()26     pool.map(func1,range(500))  # map27     e_time = time.time()28     print('耗时:',e_time - s_time)  # 耗时远远小于直接开500进程
进程池 简单应用

 

apply同步方法:

1 from multiprocessing import Process,Pool 2 import time 3  4  5 def func1(i): 6     num = 0 7     for j in range(3): 8         num += i 9     time.sleep(1)10     print(num)11     return num12 13 14 if __name__ == '__main__':15     pool = Pool(6)16 17     for i in range(10):18         res = pool.apply(func1,args=(i,))  # apply 进程同步/串行方法 效率低,不常用19         # print(res)
apply 进程同步/串行方法

 

apply_async异步方法:

1 from multiprocessing import Process,Pool 2 import time 3  4  5 def func1(i): 6     num = 0 7     for j in range(5): 8         num += i 9     time.sleep(1)10     # print('>>>>>',num)11     return num12 13 14 if __name__ == '__main__':15     pool = Pool(6)16 17     red_list = []18     for i in range(10):19         res = pool.apply_async(func1,args=(i,))20         red_list.append(res)21 22     pool.close()  # 不是关闭,只是锁定进程池,告诉主进程不会再添加数据进去23     pool.join()  # 等待子程序执行完24 25     for ress in red_list:26         print(ress.get())  # get方法取出返回值num 按添加顺序取出已保存在缓存区的结果 所以是顺序打印出的
View Code

 

回调函数:运用时注意一点,回调函数的形参执行有一个,如果你的执行函数有多个返回值,那么也可以被回调函数的这一个形参接收,接收的是一个元祖,包含着你执行函数的所有返回值。

1 from multiprocessing import Pool,Process 2 import time,os 3  4  5 def func1(n): 6     # print('子进程的pid:',os.getpid()) 7     return n*n 8  9 10 def func2(i):11     res = i**212     # print('callback的pid:',os.getpid())13     print(res)14     return res15 16 17 if __name__ == '__main__':18     pool = Pool(4)19     pool.apply_async(func1,args=(3,),callback=func2)  # callback把前面的返回值作参数传给后面20     # print('主进程的pid:',os.getpid())  # 主进程执行了callback21     pool.close()22     pool.join()
回调函数 callback

 

 

4.总结

  进程之间的通信:队列、管道、数据共享也算

  信号量和事件也相当于锁,也是全局的,所有进程都能拿到这些锁的状态,进程之间这些锁啊信号量啊事件啊等等的通信,其实底层还是socekt,只不过是基于文件的socket通信,而不是跟上面的数据共享啊空间共享啊之类的机制,我们之前学的是基于网络的socket通信,socket的两个家族,一个文件的一个网络的,所以如果说这些锁之类的报错,可能你看到的就是类似于socket的错误。工作中常用的是锁,信号量和事件不常用,但是信号量和事件面试的时候会问到(做了解)

 

转载于:https://www.cnblogs.com/xi1419/p/10041082.html

你可能感兴趣的文章
浅谈unique列上插入重复值的MySQL解决方案
查看>>
hdu 4859(思路题)
查看>>
11.2.0.4 sql*loader/oci direct load导致kpodplck wait before retrying ORA-54
查看>>
sql server 2008空间释放
查看>>
团队-科学计算器-最终总结
查看>>
树的遍历 TJUACM 3988 Password
查看>>
UVA 725 - Division
查看>>
bzoj1798: [Ahoi2009]Seq 维护序列seq(线段树)
查看>>
day5
查看>>
Palindrome
查看>>
窗体中拖动panel,并且不会拖动至窗体外部程序实现方法。
查看>>
vb中从域名得到IP及从IP得到域名
查看>>
一步步跨过学习中一道道的坎
查看>>
RxJava入门优秀博客推荐
查看>>
基于Selenium2+Java的UI自动化(5) - 执行JavaScript脚本
查看>>
bc https://en.wikipedia.org/wiki/Gossip_protocol
查看>>
saltstack---自动化运维平台
查看>>
Java注释@interface的用法【转】
查看>>
妙味——操作元素属性的几种方法
查看>>
Ring 0 Inline Hook
查看>>