Flask权限认证(Flask-Principal)

Flask-Principal主要提供两种功能:

1、权限认证 2、用户信息管理

该模块主要由Identity, Needs, Permission, and the IdentityContext.组成。

  • Identity代表用户,一般存储在session中;
  • Needs代表了一种角色,如需要管理员权限,需要验证等等;
  • Permission是接入控制的集合,它控制了用户是否有权限访问某个资源
  • IdentityContext是一个含有permission的identity上下文,一般当做装饰器用。

1、权限认证

以我的Flask项目为例,首先定义permission。

admin = Permission(RoleNeed('admin'))
moderator = Permission(RoleNeed('moderator'))
auth = Permission(RoleNeed('authenticated'))

使用方法如下:


@cost.route('/addcost', methods=("GET","POST"))
@admin.require(403)
def add_cost():
    cur_date = date.today().strftime('%Y-%m-%d')
    pass

当不是登录用户时,就会产生403异常,调用abort()函数。 从代码看出,admin.require是一个装饰器。


class  Permission(object):
    def require(self, http_exception=None):
    """Create a principal for this permission.

The principal may be used as a context manager, or a decroator.

If ``http_exception`` is passed then ``abort()`` will be called
with the HTTP exception code. Otherwise a ``PermissionDenied``
exception will be raised if the identity does not meet the
requirements.

:param http_exception: the HTTP exception code (403, 401 etc)
"""
return IdentityContext(self, http_exception)

IdentityContext是一个类装饰器,昨天刚学完装饰器(哈哈,发现Flask中把装饰器运用的炉火纯青),那它肯定含有一个__call__方法,该方法实现了封装函数的功能。代码如下:


class IdentityContext(object):
    """The context of an identity for a permission.
    The principal behaves as either a context manager or a decorator. The
    permission is checked for provision in the identity, and if available the
    flow is continued (context manager) or the function is executed (decorator)
    """

def __init__(self, permission, http_exception=None):
    self.permission = permission
    self.http_exception = http_exception

@property
def identity(self):
    """The identity of this principal
    """
    return g.identity

def can(self):
    """Whether the identity has access to the permission
    """
    return self.identity.can(self.permission)

def __call__(self, f):
    @wraps(f)
    def _decorated(*args, **kw):
        with self:
            rv = f(*args, **kw)
        return rv
    return _decorated

def __enter__(self):
    # check the permission here
    if not self.can():
        if self.http_exception:
            abort(self.http_exception, self.permission)
        raise PermissionDenied(self.permission)

def __exit__(self, *args):
    return False

上面除了实现封装之外,最重要的是__enter 方法。先说下这个方法,它和__exit 方法是实现with语句必不可少的。如在__call 调用中,with self:语句开始就自动执行__enter 方法,with语句结束,就执行__exit 方法。 这里的__enter 方法实现了鉴权功能self.can最终调用的是permission类的allows方法,代码如下:


def allows(self, identity):
    """Whether the identity can access this permission.

:param identity: The identity
"""
if self.needs and not self.needs.intersection(identity.provides):
    return False

if self.excludes and self.excludes.intersection(identity.provides):
    return False
return True

这个方法决定了是否有权限访问,它采用的方法是看看permission定义的needs与当前用户的needs 的set集合是否有交集。

我们在数据模型中定义了provides更新的方法:


class UserQuery(BaseQuery):

    def gen_identity(self, identity):
        """
        Loads user from flask.ext.principal.Identity instance and
        assigns permissions from user.

        A "user" instance is monkeypatched to the identity instance.

        If no user found then None is returned.
        """

        try:
            user = self.get(int(identity.id))
        except (ValueError, TypeError):
            user = None

        if user:
            identity.provides.update(user.provides)

        identity.user = user

        return user

@cached_property
def provides(self):
    needs = [RoleNeed('authenticated'),UserNeed(self.id)]

    if self.is_admin:
        needs.append(RoleNeed('admin'))

    if self.is_moderator:
        needs.append(RoleNeed('moderator'))

    return needs

上面会更新identity.provides属性。 那么,目前为止,问题来了,这个id是如何保存到identity类中的呢。 实现方法是在登录时会发送一个带有identity上下文的信号,如下:


@account.route('/login/',methods=['GET','POST'])
def login():
    form = LoginForm(login=request.args.get('login',None))
    if form.validate_on_submit():
        if form.is_old_user:
            session.permanent = form.remember_me
            identity_changed.send(current_app._get_current_object(),
                                  identity=Identity(form.user.id))
            flash(u'已经成功过登录')
            return redirect(url_for('frontend.index'))

上面在验证成功后会发送一个id改变的信号。此时订阅者会调用Pincipal()._on_identity_changed方法,具体在下面的用户信息管理中会讲到。

2、用户信息管理

Principal初始化:Principal(app),


def init_app(self, app):
    if hasattr(app, 'static_url_path'):
        self._static_path = app.static_url_path
    else:
        self._static_path = app.static_path

app.before_request(self._on_before_request)
identity_changed.connect(self._on_identity_changed, app)

if self.use_sessions:
    self.identity_loader(session_identity_loader)
    self.identity_saver(session_identity_saver)

从初始化代码可以看出,主要做了以下几个过程:

1、app.before_request(self._on_before_request)

实际上就是self._on_before_request被app.before_request装饰器装饰,在每次请求之前都会调用self._on_before_request。


def _on_before_request(self):
    if self._is_static_route():
        return

g.identity = AnonymousIdentity()
for loader in self.identity_loaders:
    identity = loader()
    if identity is not None:
        self.set_identity(identity)
        return

从以上代码看出,首先将全局变量g的identity设为匿名ID,这一般是对未登录用户设置的。然后从队列中取出函数session_identity_loader,如果返回了一个ID,就将session中的当前用户ID赋给全局变量,并发送一个带有当前ID上下文的信号。

def session_identity_loader():
    if 'identity.id' in session and 'identity.auth_type' in session:
        identity = Identity(session['identity.id'],
                            session['identity.auth_type'])
        return identity
def set_identity(self, identity):
    """Set the current identity.

:param identity: The identity to set
"""

self._set_thread_identity(identity)
for saver in self.identity_savers:
    saver(identity)

def _set_thread_identity(self, identity): g.identity = identity identity_loaded.send(current_app._get_current_object(), identity=identity)

在以上代码中,设置ID,主要是设置全局变量g的属性,然后发送一个信号,上下文是当前identity。然后将identity信息保存到session中saver(identity),如下:


def session_identity_saver(identity):
    session['identity.id'] = identity.id
    session['identity.auth_type'] = identity.auth_type
    session.modified = True

2、在上一个装饰器之后,会订阅一个信号identity_changed.connect(self._on_identity_changed, app),并调用self._on_identity_changed:


def _on_identity_changed(self, app, identity):
    if self._is_static_route():
        return

self.set_identity(identity)

即如果收到id发生变化的信号,就会重新设置全局ID以及session中的identity信息。

3、将session_identity_loader和session_identity_saver分别放到两个队列中。

self.identity_loader(session_identity_loader) self.identity_saver(session_identity_saver)

在我的Flask项目中,定义了如下形式:


def init_identity(app):
    Principal(app)
    @identity_loaded.connect_via(app)
    def on_identity_loaded(sender, identity):
        g.user = User.query.gen_identity(identity)

除了初始化Principal(app),还订阅了一个信号,当identity_loaded发送一个信号(在上面中介绍过,即设置ID时,会发送一个信号)时,调用该函数,即设置全局变量中g的user属性,此外还要更新当前id的provides,具体函数再上面的权限认证已提到。

--------EOF---------
微信分享/微信扫码阅读