多进程服务器

我在之前的文章中介绍了多进程的相关概念,包括进程的概念,多进程的使用等等,详情可看:

  1. Python多进程
  2. Linux进程
  3. gunicorn介绍

本文主要是研究一下如何实现多进程的模式,主要是pre-fork。

之前文章讲解了如何使用I/O多路复用实现同时处理多个客户端的服务器。现在就试着写如何使用多进程实现。

pre-fork的思想就是先创建一定数量的进程,然后进程同时监听某个socket,等有请求时,就按照某种机制分别处理响应的请求。

下面写的是一种比较简单的例子,基本思想就是先创建多个进程,然后同时accept。上代码:

import socket
import traceback
from basesocket import BaseSocket
import select
import os



class PreForkServer(BaseSocket):

    def __init__(self,host,port,trans_type='TCP'):
        super(PreForkServer,self).__init__(host,port,trans_type)
        self.worker_num = 4


    def run(self,connect_num=1):
        self.socket.bind((self.host,self.port))
        print 'start server'
        self.socket.listen(connect_num)
        print 'Server has started successfully!Listening at {0}'.format(self.port)
        self.create_workers()
        while 1:
            clientsock,addr = self.socket.accept()
            print 'build connection from worker {0} to client {1}'.format(os.getpid(),addr)
            clientsock.close()


    def create_workers(self):
        for index in range(self.worker_num):
            pid = os.fork()
            if pid != 0:
                print 'fork child process:{0}'.format(pid)
            else:
                continue



if __name__ == "__main__":
    pre_fork = PreForkServer('10.13.27.47','5000')
    pre_fork.run()

一般情况下,一个进程是只能绑定在一个端口上的,但linux本身从操作系统层面是支持了多个进程监听同一个socket的。其基本思想是把当前进程插入这个fd的等待队列然后阻塞 ,当新连接进来的时候,操作系统会唤醒这个fd的等待队列的第一个进程,只唤醒一个进程,其他的进程还是在就绪的等待状态。

上面代码的BaseSocket是我写的关于创建socket的基类,代码在IO多路复用的文章有介绍。

实际应用中,多进程服务器要比这复杂得多,现在比较流行的master-worker模式,像nginx,gunicorn等。      在gunicorn中,就主要使用了pre-fork模式,即先建立一定数目的子进程,master只负责管理,其他worker负责监听。master要动态去创建,删除,杀死子进程等。

master主要是通过信号管理它们,那么信号是至关重要的,信号是进程间相互通信的一种机制,可能学过的都知道还有队列,管道等等。Python中有一个signal模块专门用来提供相应功能。

信号本身其实很简单,都是整数,进程会根据收到的对应的数来进行响应行为,比如我们常用的kill -9 pid,就是其中一种。比较常用的信号:

编号 名称 作用
1 SIGHUP 终端挂起或者终止进程。默认动作为终止进程
2 SIGINT 键盘中断 <ctrl+c> 经常会用到。默认动作为终止进程
3 SIGQUIT 键盘退出键被按下。一般用来响应 <ctrl+d> 。 默认动作终止进程
9 SIGKILL 强制退出。 shell中经常使用
14 SIGALRM 定时器超时,默认为终止进程
15 SIGTERM 程序结束信号,程序一般会清理完状态在退出,我们一般说的优雅的退出

这里也说一下Linux中有64种信号,不仅仅是上面这些,信号的作用不仅仅是进程间通信,也会用于其他场景,比如异常处理,当出现非法访问、空指针异常等都会发送相应的信号,空指针、内存溢出等问题,系统会向进程发送 SIGSEGV 或 SIGABRT 。系统资源不足,如内存不足、最大文件描述符不足也会发送信号。Linux的信号应用非常广泛,但我们都知道信号是异步的,信号的丢失、幂等性等问题也会是考验,当然Linux自己本身也有一定机制来保证,包括队列机制、屏蔽机制、可重入机制等等。

和信号相关的信号处理函数,Linux支持当发送信号时,进程可以自定义信号处理函数,除了SIGKILL(kill -9)和 SIGSTOP

更多信号可参考signal模块代码。

下面代码是写的关于signal模块的。

import os
import sys
import signal


class SigHandler(object):

    SIGNALS = [
       "SIGKILL",
       "SIGINT",
       "SIGQUIT",
       "SIGTTIN",
       "SIGTTOU",
       "SIGHUP",
       "SIGUSR1",
       "SIGUSR2",
   ]

    SIG_QUEUE = []

    def init_signals(self):
       for sig in self.SIGNALS:
           sig_value = getattr(signal,sig)
           signal.signal(sig_value,self.put)


    def put(self,sig):
        if len(self.SIG_QUEUE) < 9:
            self.SIG_QUEUE.append(sig)

    def handle_int(self,sig,frame):
        print 'test int signal'
        sys.exit(0)

  
    def handle_usr1(self,sig,frame):
        print 'get signal'

    def run(self):
        import time
        print 'pid:{0}'.format(os.getpid())
        #实现了信号和事件的绑定。
        signal.signal(10,self.handle_usr1)
        signal.signal(2,self.handle_int)
        #signal.signal(9,self.handle_kill())
        while 1:
            time.sleep(1)


if __name__ == "__main__":
    si = SigHandler()
    si.run()

其实还有一个比较重要的信号:SIGCHLD,它是子进程结束后,向父进程发出的信号,该信号的默认处理动作是忽略,父进程可以自定义SIGCHLD信号的处理函数,这样父进程只需专心处理自己的工作,不必关心子进程了,子进程终止时会通知父进程,父进程在信号处理函数中调用wait清理子进程即可。gunicorn也实现了自定义的子进程结束后的方法:

signal.signal(signal.SIGCHLD, self.handle_chld)  

  def handle_chld(self, sig, frame):
        "SIGCHLD handling"
        self.reap_workers()
        self.wakeup()

其实,子进程退出后都不会立即消失,还还会转为僵尸进程,然后等待父进程回收。僵尸进程是不能通过kill命令杀掉的,但可通过wait或者witpid调用结束它。gunicorn中也有相应的处理方法:


    def reap_workers(self):
        """\
        Reap workers to avoid zombie processes
        """
        try:
            while True:
                wpid, status = os.waitpid(-1, os.WNOHANG)
                if not wpid:
                    break
                if self.reexec_pid == wpid:
                    self.reexec_pid = 0
                else:
                    # A worker was terminated. If the termination reason was
                    # that it could not boot, we'll shut it down to avoid
                    # infinite start/stop cycles.
                    exitcode = status >> 8
                    if exitcode == self.WORKER_BOOT_ERROR:
                        reason = "Worker failed to boot."
                        raise HaltServer(reason, self.WORKER_BOOT_ERROR)
                    if exitcode == self.APP_LOAD_ERROR:
                        reason = "App failed to load."
                        raise HaltServer(reason, self.APP_LOAD_ERROR)

                    worker = self.WORKERS.pop(wpid, None)
                    if not worker:
                        continue
                    worker.tmp.close()
                    self.cfg.child_exit(self, worker)
        except OSError as e:
            if e.errno != errno.ECHILD:
                raise

上面代码中wpid, status = os.waitpid(-1, os.WNOHANG)很关键。

通过while循环,确保不丢失信号,-1表示任何的子进程,os.WHOHANG表示立即返回。

os.waitpid方法:

等待进程id为pid的进程结束,返回一个tuple,包括进程的进程ID和退出信息(和os.wait()一样),参数options会影响该函数的行为。在默认情况下,options的值为0。

如果pid是一个正数,waitpid()请求获取一个pid指定的进程的退出信息,如果pid为0,则等待并获取当前进程组中的任何子进程的值。如果pid为-1,则等待当前进程的任何子进程,如果pid小于-1,则获取进程组id为pid的绝对值的任何一个进程。当系统调用返回-1时,抛出一个OSError异常。

下面主要研究一下gunicorn是如何用master管理worker的,是如何调度不同进程的。

http://rur.logdown.com/

参考资料:

深入理解Linux信号

--------EOF---------
微信分享/微信扫码阅读