并发编程之多进程

之前介绍过Python多线程,对于计算密集型任务,Python多线程不再起作用,而应该使用多进程。 Python多进程我的理解是在一个父进程下fork出多个进程。父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

1、创建子进程:

from multiprocessing import  Process,Pool
import os
import time

def run(name):
    print 'current process:%s in %s start' % (name,os.getpid())
    time.sleep(3)
    print 'process %s in %s ends' % (name,os.getpid())



if __name__ == '__main__':
    print 'Parent process:%s start' % os.getpid()
    p1 = Process(target=run,args=('haibo',))
    p2 = Process(target=run,args=('lina',))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print 'Parent process:%s end' % os.getpid()
输出:
Parent process:8284 start
current process:haibo in 8804 start
current process:lina in 7356 start
process haibo in 8804 ends
process lina in 7356 ends
Parent process:8284 end

父进程 8284创建了两个子进程,8804和7456。

2、进程池

如果要创建多个进程应该使用进程池。

import multiprocessing as m
from multiprocessing import  Process,Pool
import os
import time

def run(name):
    print 'current process:%s in %s start' % (name,os.getpid())
    time.sleep(3)
    print 'process %s in %s ends' % (name,os.getpid())


def main():
    print 'Parent process:%s start' % os.getpid()
    pool = Pool()
    for i in range(m.cpu_count()+1):
        pool.apply_async(run,args=(i,))
    pool.close()
    pool.join()
    print 'Parent process:%s end' % os.getpid()


if __name__ == '__main__':
    main()
输出:
Parent process:7804 start
current process:0 in 4936 start
current process:1 in 8496 start
current process:2 in 8112 start
current process:3 in 6684 start
process 0 in 4936 ends
current process:4 in 4936 start
process 1 in 8496 ends
process 2 in 8112 ends
process 3 in 6684 ends
process 4 in 4936 ends
Parent process:7804 end

从输出结果可以看出,第5个进程没有立即执行,这是因为该电脑只有4核,第五个得至少等到一个进程结束之后才能执行。可以看到,多进程可以充分发挥多行CPU的优势。

进程池的计算上面用了apply_async,除此之外,还有apply,map,map_async。如果使用appy,那么会阻塞当前进程,待执行之后,再执行其他进程。

 for i in range(m.cpu_count()+1):
        pool.apply(run,args=(i,))
 输出:
Parent process:6468 start
current process:0 in 7864 start
process 0 in 7864 ends
current process:1 in 5912 start
process 1 in 5912 ends
current process:2 in 8868 start
process 2 in 8868 ends
current process:3 in 8308 start
process 3 in 8308 ends
current process:4 in 7864 start
process 4 in 7864 ends
Parent process:6468 end

从结果看到,1执行完之后才能执行进程2,以此类推。

map的用法:

for i in range(m.cpu_count()+1):
        pool.apply(run,args=(i,))
替换成:
  pool.map(run,range(m.cpu_count()+1)

3、多进程之间相互通信

首先理解进程间的通信。进程间通信主要有Pipe和传统IPC(消息队列、共享内存、信号量等。) inux下进程间通信的几种主要手段简介:

  1. 管道(Pipe)及有名管道(named pipe):管道可用于具有亲缘关系进程间的通信,可以使用管道将一个进程的输出和另一个进程的输入连接起来,从而利用文件操作API来管理进程间通信。在shell中,我们经常利用管道将多个进程连接在一起,从而让各个进程协作,实现复杂的功能。
  2. 信号(Signal):信号是比较复杂的通信方式,用于通知接受进程有某种事件发生,除了用于进程间通信外,进程还可以发送信号给进程本身;linux除了支持Unix早期信号语义函数sigal外,还支持语义符合Posix.1标准的信号函数sigaction(实际上,该函数是基于BSD的,BSD为了实现可靠信号机制,又能够统一对外接口,用sigaction函数重新实现了signal函数);
  3. 报文(Message)队列(消息队列):消息队列是消息的链接表,包括Posix消息队列system V消息队列。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。消息队列克服了信号承载信息量少,管道只能承载无格式字节流以及缓冲区大小受限等缺点。
  4. 共享内存:使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。
  5. 信号量(semaphore):主要作为进程间以及同一进程不同线程之间的同步手段。
  6. 套接口(Socket):更为一般的进程间通信机制,可用于不同机器之间的进程间通信。起初是由Unix系统的BSD分支开发出来的,但现在一般可以移植到其它类Unix系统上:Linux和System V的变种都支持套接字。

Python进程间的通信使用Queue和PIPE实现进程间通信,Queue即FIFO。

1)Queue(即FIFO)

from multiprocessing import  Process,Pool,Pipe,Queue,Manager
import os
import time


def input_data(q):
    for i in range(5):
        print 'process %s Put data %d to Queue' %(os.getpid(),i)
        q.put(i)
        time.sleep(1)


def get_data(q):
    while True:
        try:
            value = q.get(True,timeout=3)
        finally:
            pass
        print 'process %s Get data %s from Queue' % (os.getpid(),value)


def main():
    q = Queue()
    p1 = Process(target=input_data,args=(q,))
    p2 = Process(target=get_data,args=(q,))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print 'main function ends'

if __name__ == '__main__':
    main()
输出:
process 10128 Put data 0 to Queue
process 10128 Put data 1 to Queue
process 10124 Get data 0 from Queue
process 10124 Get data 1 from Queue
process 10128 Put data 2 to Queue
process 10128 Put data 3 to Queue
process 10124 Get data 2 from Queue
process 10124 Get data 3 from Queue
process 10128 Put data 4 to Queue
process 10124 Get data 4 from Queue

value = q.get(True),设为True(默认也是True),会一直阻塞,直到取得一个元素,如果设置timeout,那么待等到一段时间之后,会抛出Empty异常。

2)Pipe:

def input_data(pipe):
    while True:
        for i in xrange(1000):
            print 'process %s send data %d' %(os.getpid(),i)
            pipe.send(i)
            time.sleep(1)



def get_data(pipe):
    while True:
        print 'process %s receive data %s' % (os.getpid(),pipe.recv())
        time.sleep(1)
输出:
process 5248 send data 0
process 9356 receive data 0
process 5248 send data 1
process 9356 receive data 1
process 5248 send data 2
process 9356 receive data 2
process 5248 send data 3
process 9356 receive data 3

Pipe即一个进程的输出当做另外一个进程的输入。

数据共享

当多进程并发执行的时候,最好是尽量避免有数据共享发生,但如果真的需要进行数据共享的时候,multiprocessing模块提供了很多的方法。

1、共享内存

数据可以通过Value或者Array存在共享内存映射中。

    from multiprocessing import Process, Value, Array

      def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])
       3.1415927
       [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
    这些数据是进程安全的。

2、服务器进程Server process

multiprocessing.Manager类会创建以一个manager类,该类控制着一个支持多种Python 对象的服务进程,而且该类允许其他进程使用代理来操作这些Python对象。

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

输出:

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务进程要比共享内存更加灵活,它支持多种不同类型的Python对象。此外,单个manager类可以为同一网络中多个电脑之间实现共享。不过服务进程要比共享内存要慢。

 

多进程的优缺点

多进程本身是占用资源的,且进程间的切换也是相当消耗的,因此对于IO密集型的任务,不应该选择多进程。此外,多进程之间的变量是不能共享的,可以通过几种进程间通信方式实现。

上面说了多进程的缺点,但多进程在计算密集型任务下是相当牛逼的。

--------EOF---------
本文微信分享/扫码阅读