A Practical Example of Concurrent Programming
Having previously covered multiprocessing, multithreading, and coroutines, today I compare their performance on a real task.
My own need: compute daily statistics from several categories of logs, including daily IPs, daily UV, and daily PV. I combined multiprocessing with multithreading:
Processes: one process per log category (IP, PV, UV).
Threads: within each process, one thread per log file.
1. Multiprocessing
A process pool is used:
```python
def main():
    start = time.time()
    pool = multiprocessing.Pool(4)
    for logtype in ['ip', 'pv', 'uv']:
        pool.apply_async(log_fork, args=(logtype,))
    pool.close()
    pool.join()
    """
    # Alternative: spawn the processes by hand instead of using a pool.
    processes = []
    for logtype in ['ip', 'pv', 'uv']:
        p = multiprocessing.Process(target=log_fork, args=(logtype,))
        processes.append(p)
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    """
    print('the parent process:{0} ends,it costs {1}'.format(os.getpid(), time.time() - start))
```
The pool is created with 4 worker processes; apply_async submits the three log types to it.
2. Multithreading
Each category contains many log files, and the work is I/O-bound, so the files within a category are handled with multiple threads.
```python
def log_fork(logtype):
    print('the process:{0} deals with {1}'.format(os.getpid(), logtype))
    types = ['ip', 'pv', 'uv']
    if logtype not in types:
        return
    logs = search(logtype)
    pool = ThreadPool(10)
    pool.map(getattr(LogRead(), 'deal_%s' % logtype), logs)
    pool.close()
    pool.join()
```
This uses a thread pool: Pool here comes from multiprocessing.dummy, which exposes the multiprocessing API but is backed by threads. Its map method is straightforward and was covered in the earlier multithreading post.
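A self-contained sketch of the same pattern; fetch is a hypothetical stand-in for the deal_* methods, with a 0.1 s sleep simulating the I/O:

```python
from multiprocessing.dummy import Pool as ThreadPool  # thread-backed, same API
import time

def fetch(name):
    # Hypothetical stand-in for deal_ip/deal_pv/deal_uv: sleep simulates I/O.
    time.sleep(0.1)
    return name.upper()

logs = ['a_ip.log', 'b_ip.log', 'c_ip.log', 'd_ip.log']
pool = ThreadPool(4)
start = time.time()
results = pool.map(fetch, logs)  # blocks until every file is handled, keeps order
pool.close()
pool.join()
elapsed = time.time() - start    # ~0.1 s: the four sleeps overlap across threads
```

Because the jobs spend their time sleeping (i.e. waiting on I/O), four threads finish in roughly the time of one job, which is exactly why threads suit this workload despite the GIL.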
3. Coroutines
I spent the whole morning testing and could not get the coroutine version to work, which left me puzzled. In the evening I looked it up and found the cause: it is how gevent's scheduling works.
Calling time.sleep() does not yield to gevent at all: it blocks the whole thread. Only gevent's own gevent.sleep() tells libev that the current greenlet is blocked, so another greenlet can be scheduled.
The other option is gevent's monkey patching, which rewrites the blocking parts of the relevant libraries to go through libev, so that ordinary calls become cooperative and gevent can take effect.
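A minimal sketch of the monkey-patch route, assuming gevent is installed: after monkey.patch_all(), the stock time.sleep yields to the hub, so the three greenlets' sleeps overlap instead of running back to back.

```python
from gevent import monkey
monkey.patch_all()  # patch the stdlib before anything else imports it

import time
import gevent

def job(n):
    time.sleep(0.2)  # patched: yields to the gevent hub instead of blocking
    return n * n

start = time.time()
greenlets = [gevent.spawn(job, n) for n in range(3)]
gevent.joinall(greenlets)
elapsed = time.time() - start          # ~0.2 s, not 0.6 s: sleeps ran concurrently
results = [g.value for g in greenlets]
```

Without the patch_all() call, elapsed would be about 0.6 s, since each unpatched time.sleep blocks the only OS thread.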
```python
spawns = [gevent.spawn(getattr(LogRead(), 'deal_%s' % logtype), log) for log in logs]
gevent.joinall(spawns)
```
You can also use a coroutine pool: gevent.pool.Pool, whose interface is much like multiprocessing's Pool.
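A sketch of that variant, again assuming gevent is installed; handle is a hypothetical stand-in for the deal_* methods:

```python
import gevent
from gevent.pool import Pool as GeventPool

def handle(log):
    gevent.sleep(0.1)  # gevent's own sleep yields; time.sleep would block the hub
    return 'done:' + log

logs = ['x_uv.log', 'y_uv.log', 'z_uv.log']
pool = GeventPool(10)             # at most 10 greenlets run at once
results = pool.map(handle, logs)  # same call shape as multiprocessing.Pool.map
```

The pool size caps concurrency, which matters if each greenlet opens a file or a network connection.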
The full source code:
```python
# encoding:utf-8
__author__ = 'haibo'

import re
import os
import gevent
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
from gevent.pool import Pool as GeventPool
import time
import requests
from datetime import date


def search(logtype):
    directory = '/mnt/logs/'
    p = re.compile(r'^(.*top5).*_%s\.log$' % logtype)
    data = []
    for rootdir, dirs, files in os.walk(directory):
        for f in files:
            if p.search(f):
                filename = directory + f
                data.append(filename)
    return data[0:5]


class LogRead(object):

    def __init__(self):
        self.domain = 'http://hbnnforever.cn'
        self.auth = ()
        self.day = date.today().strftime('%Y-%m-%d')

    def deal_ip(self, filename):
        print(filename)
        with open(filename, 'r') as f:
            time.sleep(0.5)

    def deal_pv(self, filename):
        print(filename)
        time.sleep(2)
        with open(filename, 'r') as f:
            for line in f.readlines():
                line = line.strip()
                # print(line)
                """
                visit_day = filename.split(os.sep)[3].split('_')[0]
                print(visit_day)
                print(line)
                requests.post(self.domain + '/api/visitpv/',
                              data={
                                  'pv': line,
                                  'visit_day': visit_day,
                              },
                              auth=self.auth)
                """

    def deal_uv(self, filename):
        with open(filename, 'r') as f:
            for line in f.readlines():
                uv = line.strip()
                visit_day = filename.split(os.sep)[3].split('_')[0]
                print('uv:', uv)
                requests.post(self.domain + '/api/visituv/',
                              data={
                                  'uv': uv,
                                  'visit_day': visit_day,
                              },
                              auth=self.auth)


def log_fork(logtype):
    print('the process:{0} deals with {1}'.format(os.getpid(), logtype))
    types = ['ip', 'pv', 'uv']
    if logtype not in types:
        return
    logs = search(logtype)
    time.sleep(2)
    if not logs:
        return
    pool = ThreadPool(10)
    pool.map(getattr(LogRead(), 'deal_%s' % logtype), logs)
    pool.close()
    pool.join()
    """
    # Coroutine variants:
    pool = GeventPool(10)
    pool.map(getattr(LogRead(), 'deal_%s' % logtype), logs)

    spawns = [gevent.spawn(getattr(LogRead(), 'deal_%s' % logtype), log) for log in logs]
    gevent.joinall(spawns)

    # Serial version, for comparison:
    for log in logs:
        getattr(LogRead(), 'deal_%s' % logtype)(log)
    """
    print('the process:{0} ends'.format(os.getpid()))


def main():
    start = time.time()
    pool = multiprocessing.Pool(4)
    for logtype in ['ip', 'pv', 'uv']:
        pool.apply_async(log_fork, args=(logtype,))
    pool.close()
    pool.join()
    print('the parent process:{0} ends,it costs {1}'.format(os.getpid(), time.time() - start))


if __name__ == "__main__":
    main()
```
--------EOF---------