并发编程之多进程
之前介绍过Python多线程,对于计算密集型任务,Python多线程不再起作用,而应该使用多进程。 Python多进程我的理解是在一个父进程下fork出多个进程。父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。
1、创建子进程:
from multiprocessing import Process,Pool
import os
import time
def run(name):
print 'current process:%s in %s start' % (name,os.getpid())
time.sleep(3)
print 'process %s in %s ends' % (name,os.getpid())
if __name__ == '__main__':
print 'Parent process:%s start' % os.getpid()
p1 = Process(target=run,args=('haibo',))
p2 = Process(target=run,args=('lina',))
p1.start()
p2.start()
p1.join()
p2.join()
print 'Parent process:%s end' % os.getpid()
输出:
Parent process:8284 start
current process:haibo in 8804 start
current process:lina in 7356 start
process haibo in 8804 ends
process lina in 7356 ends
Parent process:8284 end
父进程 8284创建了两个子进程,8804和7456。
2、进程池
如果要创建多个进程应该使用进程池。
import multiprocessing as m
from multiprocessing import Process,Pool
import os
import time
def run(name):
print 'current process:%s in %s start' % (name,os.getpid())
time.sleep(3)
print 'process %s in %s ends' % (name,os.getpid())
def main():
print 'Parent process:%s start' % os.getpid()
pool = Pool()
for i in range(m.cpu_count()+1):
pool.apply_async(run,args=(i,))
pool.close()
pool.join()
print 'Parent process:%s end' % os.getpid()
if __name__ == '__main__':
main()
输出:
Parent process:7804 start
current process:0 in 4936 start
current process:1 in 8496 start
current process:2 in 8112 start
current process:3 in 6684 start
process 0 in 4936 ends
current process:4 in 4936 start
process 1 in 8496 ends
process 2 in 8112 ends
process 3 in 6684 ends
process 4 in 4936 ends
Parent process:7804 end
从输出结果可以看出,第5个进程没有立即执行,这是因为该电脑只有4核,第五个得至少等到一个进程结束之后才能执行。可以看到,多进程可以充分发挥多行CPU的优势。
进程池的计算上面用了apply_async,除此之外,还有apply,map,map_async。如果使用appy,那么会阻塞当前进程,待执行之后,再执行其他进程。
for i in range(m.cpu_count()+1):
pool.apply(run,args=(i,))
输出:
Parent process:6468 start
current process:0 in 7864 start
process 0 in 7864 ends
current process:1 in 5912 start
process 1 in 5912 ends
current process:2 in 8868 start
process 2 in 8868 ends
current process:3 in 8308 start
process 3 in 8308 ends
current process:4 in 7864 start
process 4 in 7864 ends
Parent process:6468 end
从结果看到,1执行完之后才能执行进程2,以此类推。
map的用法:
for i in range(m.cpu_count()+1):
pool.apply(run,args=(i,))
替换成:
pool.map(run,range(m.cpu_count()+1)
3、多进程之间相互通信
首先理解进程间的通信。进程间通信主要有Pipe和传统IPC(消息队列、共享内存、信号量等。) inux下进程间通信的几种主要手段简介:
- 管道(Pipe)及有名管道(named pipe):管道可用于具有亲缘关系进程间的通信,可以使用管道将一个进程的输出和另一个进程的输入连接起来,从而利用文件操作API来管理进程间通信。在shell中,我们经常利用管道将多个进程连接在一起,从而让各个进程协作,实现复杂的功能。
- 信号(Signal):信号是比较复杂的通信方式,用于通知接受进程有某种事件发生,除了用于进程间通信外,进程还可以发送信号给进程本身;linux除了支持Unix早期信号语义函数sigal外,还支持语义符合Posix.1标准的信号函数sigaction(实际上,该函数是基于BSD的,BSD为了实现可靠信号机制,又能够统一对外接口,用sigaction函数重新实现了signal函数);
- 报文(Message)队列(消息队列):消息队列是消息的链接表,包括Posix消息队列system V消息队列。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。消息队列克服了信号承载信息量少,管道只能承载无格式字节流以及缓冲区大小受限等缺点。
- 共享内存:使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。
- 信号量(semaphore):主要作为进程间以及同一进程不同线程之间的同步手段。
- 套接口(Socket):更为一般的进程间通信机制,可用于不同机器之间的进程间通信。起初是由Unix系统的BSD分支开发出来的,但现在一般可以移植到其它类Unix系统上:Linux和System V的变种都支持套接字。
Python进程间的通信使用Queue和PIPE实现进程间通信,Queue即FIFO。
1)Queue(即FIFO)
from multiprocessing import Process,Pool,Pipe,Queue,Manager
import os
import time
def input_data(q):
for i in range(5):
print 'process %s Put data %d to Queue' %(os.getpid(),i)
q.put(i)
time.sleep(1)
def get_data(q):
while True:
try:
value = q.get(True,timeout=3)
finally:
pass
print 'process %s Get data %s from Queue' % (os.getpid(),value)
def main():
q = Queue()
p1 = Process(target=input_data,args=(q,))
p2 = Process(target=get_data,args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
print 'main function ends'
if __name__ == '__main__':
main()
输出:
process 10128 Put data 0 to Queue
process 10128 Put data 1 to Queue
process 10124 Get data 0 from Queue
process 10124 Get data 1 from Queue
process 10128 Put data 2 to Queue
process 10128 Put data 3 to Queue
process 10124 Get data 2 from Queue
process 10124 Get data 3 from Queue
process 10128 Put data 4 to Queue
process 10124 Get data 4 from Queue
value = q.get(True),设为True(默认也是True),会一直阻塞,直到取得一个元素,如果设置timeout,那么待等到一段时间之后,会抛出Empty异常。
2)Pipe:
def input_data(pipe):
while True:
for i in xrange(1000):
print 'process %s send data %d' %(os.getpid(),i)
pipe.send(i)
time.sleep(1)
def get_data(pipe):
while True:
print 'process %s receive data %s' % (os.getpid(),pipe.recv())
time.sleep(1)
输出:
process 5248 send data 0
process 9356 receive data 0
process 5248 send data 1
process 9356 receive data 1
process 5248 send data 2
process 9356 receive data 2
process 5248 send data 3
process 9356 receive data 3
Pipe即一个进程的输出当做另外一个进程的输入。
数据共享
当多进程并发执行的时候,最好是尽量避免有数据共享发生,但如果真的需要进行数据共享的时候,multiprocessing模块提供了很多的方法。
1、共享内存
数据可以通过Value或者Array存在共享内存映射中。
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
这些数据是进程安全的。
2、服务器进程Server process
multiprocessing.Manager类会创建以一个manager类,该类控制着一个支持多种Python 对象的服务进程,而且该类允许其他进程使用代理来操作这些Python对象。
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
输出:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服务进程要比共享内存更加灵活,它支持多种不同类型的Python对象。此外,单个manager类可以为同一网络中多个电脑之间实现共享。不过服务进程要比共享内存要慢。
多进程的优缺点
多进程本身是占用资源的,且进程间的切换也是相当消耗的,因此对于IO密集型的任务,不应该选择多进程。此外,多进程之间的变量是不能共享的,可以通过几种进程间通信方式实现。
上面说了多进程的缺点,但多进程在计算密集型任务下是相当牛逼的。
微信分享/微信扫码阅读