多进程服务器
我在之前的文章中介绍了多进程的相关概念,包括进程的概念,多进程的使用等等,详情可看:
本文主要是研究一下如何实现多进程的模式,主要是pre-fork。
之前文章讲解了如何使用I/O多路复用实现同时处理多个客户端的服务器。现在就试着写如何使用多进程实现。
pre-fork的思想就是先创建一定数量的进程,然后进程同时监听某个socket,等有请求时,就按照某种机制分别处理响应的请求。
下面写的是一种比较简单的例子,基本思想就是先创建多个进程,然后同时accept。上代码:
import socket
import traceback
from basesocket import BaseSocket
import select
import os
class PreForkServer(BaseSocket):
def __init__(self,host,port,trans_type='TCP'):
super(PreForkServer,self).__init__(host,port,trans_type)
self.worker_num = 4
def run(self,connect_num=1):
self.socket.bind((self.host,self.port))
print 'start server'
self.socket.listen(connect_num)
print 'Server has started successfully!Listening at {0}'.format(self.port)
self.create_workers()
while 1:
clientsock,addr = self.socket.accept()
print 'build connection from worker {0} to client {1}'.format(os.getpid(),addr)
clientsock.close()
def create_workers(self):
for index in range(self.worker_num):
pid = os.fork()
if pid != 0:
print 'fork child process:{0}'.format(pid)
else:
continue
if __name__ == "__main__":
pre_fork = PreForkServer('10.13.27.47','5000')
pre_fork.run()
一般情况下,一个进程是只能绑定在一个端口上的,但linux本身从操作系统层面是支持了多个进程监听同一个socket的。其基本思想是把当前进程插入这个fd的等待队列然后阻塞 ,当新连接进来的时候,操作系统会唤醒这个fd的等待队列的第一个进程,只唤醒一个进程,其他的进程还是在就绪的等待状态。
上面代码的BaseSocket是我写的关于创建socket的基类,代码在IO多路复用的文章有介绍。
实际应用中,多进程服务器要比这复杂得多,现在比较流行的master-worker模式,像nginx,gunicorn等。 在gunicorn中,就主要使用了pre-fork模式,即先建立一定数目的子进程,master只负责管理,其他worker负责监听。master要动态去创建,删除,杀死子进程等。
master主要是通过信号管理它们,那么信号是至关重要的,信号是进程间相互通信的一种机制,可能学过的都知道还有队列,管道等等。Python中有一个signal模块专门用来提供相应功能。
信号本身其实很简单,都是整数,进程会根据收到的对应的数来进行响应行为,比如我们常用的kill -9 pid,就是其中一种。比较常用的信号:
编号 | 名称 | 作用 |
---|---|---|
1 | SIGHUP | 终端挂起或者终止进程。默认动作为终止进程 |
2 | SIGINT |
键盘中断
<ctrl+c>
经常会用到。默认动作为终止进程
|
3 | SIGQUIT |
键盘退出键被按下。一般用来响应
<ctrl+d>
。 默认动作终止进程
|
9 | SIGKILL | 强制退出。 shell中经常使用 |
14 | SIGALRM | 定时器超时,默认为终止进程 |
15 | SIGTERM | 程序结束信号,程序一般会清理完状态在退出,我们一般说的优雅的退出 |
这里也说一下Linux中有64种信号,不仅仅是上面这些,信号的作用不仅仅是进程间通信,也会用于其他场景,比如异常处理,当出现非法访问、空指针异常等都会发送相应的信号,空指针、内存溢出等问题,系统会向进程发送 SIGSEGV 或 SIGABRT 。系统资源不足,如内存不足、最大文件描述符不足也会发送信号。Linux的信号应用非常广泛,但我们都知道信号是异步的,信号的丢失、幂等性等问题也会是考验,当然Linux自己本身也有一定机制来保证,包括队列机制、屏蔽机制、可重入机制等等。
和信号相关的信号处理函数,Linux支持当发送信号时,进程可以自定义信号处理函数,除了SIGKILL(kill -9)和
SIGSTOP
。
更多信号可参考signal模块代码。
下面代码是写的关于signal模块的。
import os
import sys
import signal
class SigHandler(object):
SIGNALS = [
"SIGKILL",
"SIGINT",
"SIGQUIT",
"SIGTTIN",
"SIGTTOU",
"SIGHUP",
"SIGUSR1",
"SIGUSR2",
]
SIG_QUEUE = []
def init_signals(self):
for sig in self.SIGNALS:
sig_value = getattr(signal,sig)
signal.signal(sig_value,self.put)
def put(self,sig):
if len(self.SIG_QUEUE) < 9:
self.SIG_QUEUE.append(sig)
def handle_int(self,sig,frame):
print 'test int signal'
sys.exit(0)
def handle_usr1(self,sig,frame):
print 'get signal'
def run(self):
import time
print 'pid:{0}'.format(os.getpid())
#实现了信号和事件的绑定。
signal.signal(10,self.handle_usr1)
signal.signal(2,self.handle_int)
#signal.signal(9,self.handle_kill())
while 1:
time.sleep(1)
if __name__ == "__main__":
si = SigHandler()
si.run()
其实还有一个比较重要的信号:SIGCHLD,它是子进程结束后,向父进程发出的信号,该信号的默认处理动作是忽略,父进程可以自定义SIGCHLD信号的处理函数,这样父进程只需专心处理自己的工作,不必关心子进程了,子进程终止时会通知父进程,父进程在信号处理函数中调用wait清理子进程即可。gunicorn也实现了自定义的子进程结束后的方法:
signal.signal(signal.SIGCHLD, self.handle_chld)
def handle_chld(self, sig, frame):
"SIGCHLD handling"
self.reap_workers()
self.wakeup()
其实,子进程退出后都不会立即消失,还还会转为僵尸进程,然后等待父进程回收。僵尸进程是不能通过kill命令杀掉的,但可通过wait或者witpid调用结束它。gunicorn中也有相应的处理方法:
def reap_workers(self):
"""\
Reap workers to avoid zombie processes
"""
try:
while True:
wpid, status = os.waitpid(-1, os.WNOHANG)
if not wpid:
break
if self.reexec_pid == wpid:
self.reexec_pid = 0
else:
# A worker was terminated. If the termination reason was
# that it could not boot, we'll shut it down to avoid
# infinite start/stop cycles.
exitcode = status >> 8
if exitcode == self.WORKER_BOOT_ERROR:
reason = "Worker failed to boot."
raise HaltServer(reason, self.WORKER_BOOT_ERROR)
if exitcode == self.APP_LOAD_ERROR:
reason = "App failed to load."
raise HaltServer(reason, self.APP_LOAD_ERROR)
worker = self.WORKERS.pop(wpid, None)
if not worker:
continue
worker.tmp.close()
self.cfg.child_exit(self, worker)
except OSError as e:
if e.errno != errno.ECHILD:
raise
上面代码中wpid, status = os.waitpid(-1, os.WNOHANG)很关键。
通过while循环,确保不丢失信号,-1表示任何的子进程,os.WHOHANG表示立即返回。
os.waitpid方法:
等待进程id为pid的进程结束,返回一个tuple,包括进程的进程ID和退出信息(和os.wait()一样),参数options会影响该函数的行为。在默认情况下,options的值为0。
如果pid是一个正数,waitpid()请求获取一个pid指定的进程的退出信息,如果pid为0,则等待并获取当前进程组中的任何子进程的值。如果pid为-1,则等待当前进程的任何子进程,如果pid小于-1,则获取进程组id为pid的绝对值的任何一个进程。当系统调用返回-1时,抛出一个OSError异常。
下面主要研究一下gunicorn是如何用master管理worker的,是如何调度不同进程的。
http://rur.logdown.com/
参考资料:
微信分享/微信扫码阅读