Flask权限认证(Flask-Principal)
Flask-Principal主要提供两种功能:
1、权限认证 2、用户信息管理
该模块主要由Identity, Needs, Permission, and the IdentityContext.组成。
- Identity代表用户,一般存储在session中;
- Needs代表了一种角色,如需要管理员权限,需要验证等等;
- Permission是接入控制的集合,它控制了用户是否有权限访问某个资源
- IdentityContext是一个含有permission的identity上下文,一般当做装饰器用。
1、权限认证
以我的Flask项目为例,首先定义permission。
admin = Permission(RoleNeed('admin'))
moderator = Permission(RoleNeed('moderator'))
auth = Permission(RoleNeed('authenticated'))
使用方法如下:
@cost.route('/addcost', methods=("GET","POST"))
@admin.require(403)
def add_cost():
cur_date = date.today().strftime('%Y-%m-%d')
pass
当不是登录用户时,就会产生403异常,调用abort()函数。 从代码看出,admin.require是一个装饰器。
class Permission(object): def require(self, http_exception=None): """Create a principal for this permission.
The principal may be used as a context manager, or a decroator. If ``http_exception`` is passed then ``abort()`` will be called with the HTTP exception code. Otherwise a ``PermissionDenied`` exception will be raised if the identity does not meet the requirements. :param http_exception: the HTTP exception code (403, 401 etc) """ return IdentityContext(self, http_exception)
IdentityContext是一个类装饰器,昨天刚学完装饰器(哈哈,发现Flask中把装饰器运用的炉火纯青),那它肯定含有一个__call__方法,该方法实现了封装函数的功能。代码如下:
class IdentityContext(object): """The context of an identity for a permission. The principal behaves as either a context manager or a decorator. The permission is checked for provision in the identity, and if available the flow is continued (context manager) or the function is executed (decorator) """
def __init__(self, permission, http_exception=None): self.permission = permission self.http_exception = http_exception @property def identity(self): """The identity of this principal """ return g.identity def can(self): """Whether the identity has access to the permission """ return self.identity.can(self.permission) def __call__(self, f): @wraps(f) def _decorated(*args, **kw): with self: rv = f(*args, **kw) return rv return _decorated def __enter__(self): # check the permission here if not self.can(): if self.http_exception: abort(self.http_exception, self.permission) raise PermissionDenied(self.permission) def __exit__(self, *args): return False
上面除了实现封装之外,最重要的是__enter 方法。先说下这个方法,它和__exit 方法是实现with语句必不可少的。如在__call 调用中,with self:语句开始就自动执行__enter 方法,with语句结束,就执行__exit 方法。 这里的__enter 方法实现了鉴权功能self.can最终调用的是permission类的allows方法,代码如下:
这个方法决定了是否有权限访问,它采用的方法是看看permission定义的needs与当前用户的needs 的set集合是否有交集。def allows(self, identity): """Whether the identity can access this permission.
:param identity: The identity """ if self.needs and not self.needs.intersection(identity.provides): return False if self.excludes and self.excludes.intersection(identity.provides): return False return True
我们在数据模型中定义了provides更新的方法:
class UserQuery(BaseQuery):
def gen_identity(self, identity):
"""
Loads user from flask.ext.principal.Identity instance and
assigns permissions from user.
A "user" instance is monkeypatched to the identity instance.
If no user found then None is returned.
"""
try:
user = self.get(int(identity.id))
except (ValueError, TypeError):
user = None
if user:
identity.provides.update(user.provides)
identity.user = user
return user
@cached_property
def provides(self):
needs = [RoleNeed('authenticated'),UserNeed(self.id)]
if self.is_admin:
needs.append(RoleNeed('admin'))
if self.is_moderator:
needs.append(RoleNeed('moderator'))
return needs
上面会更新identity.provides属性。 那么,目前为止,问题来了,这个id是如何保存到identity类中的呢。 实现方法是在登录时会发送一个带有identity上下文的信号,如下:
@account.route('/login/',methods=['GET','POST'])
def login():
form = LoginForm(login=request.args.get('login',None))
if form.validate_on_submit():
if form.is_old_user:
session.permanent = form.remember_me
identity_changed.send(current_app._get_current_object(),
identity=Identity(form.user.id))
flash(u'已经成功过登录')
return redirect(url_for('frontend.index'))
上面在验证成功后会发送一个id改变的信号。此时订阅者会调用Pincipal()._on_identity_changed方法,具体在下面的用户信息管理中会讲到。
2、用户信息管理
Principal初始化:Principal(app),
从初始化代码可以看出,主要做了以下几个过程:def init_app(self, app): if hasattr(app, 'static_url_path'): self._static_path = app.static_url_path else: self._static_path = app.static_path
app.before_request(self._on_before_request) identity_changed.connect(self._on_identity_changed, app) if self.use_sessions: self.identity_loader(session_identity_loader) self.identity_saver(session_identity_saver)
1、app.before_request(self._on_before_request)
实际上就是self._on_before_request被app.before_request装饰器装饰,在每次请求之前都会调用self._on_before_request。
从以上代码看出,首先将全局变量g的identity设为匿名ID,这一般是对未登录用户设置的。然后从队列中取出函数session_identity_loader,如果返回了一个ID,就将session中的当前用户ID赋给全局变量,并发送一个带有当前ID上下文的信号。def _on_before_request(self): if self._is_static_route(): return
g.identity = AnonymousIdentity() for loader in self.identity_loaders: identity = loader() if identity is not None: self.set_identity(identity) return
def session_identity_loader(): if 'identity.id' in session and 'identity.auth_type' in session: identity = Identity(session['identity.id'], session['identity.auth_type']) return identity def set_identity(self, identity): """Set the current identity.
:param identity: The identity to set """ self._set_thread_identity(identity) for saver in self.identity_savers: saver(identity)
def _set_thread_identity(self, identity): g.identity = identity identity_loaded.send(current_app._get_current_object(), identity=identity)
在以上代码中,设置ID,主要是设置全局变量g的属性,然后发送一个信号,上下文是当前identity。然后将identity信息保存到session中saver(identity),如下:
def session_identity_saver(identity):
session['identity.id'] = identity.id
session['identity.auth_type'] = identity.auth_type
session.modified = True
2、在上一个装饰器之后,会订阅一个信号identity_changed.connect(self._on_identity_changed, app),并调用self._on_identity_changed:
即如果收到id发生变化的信号,就会重新设置全局ID以及session中的identity信息。def _on_identity_changed(self, app, identity): if self._is_static_route(): return
self.set_identity(identity)
3、将session_identity_loader和session_identity_saver分别放到两个队列中。
self.identity_loader(session_identity_loader) self.identity_saver(session_identity_saver)
在我的Flask项目中,定义了如下形式:
def init_identity(app):
Principal(app)
@identity_loaded.connect_via(app)
def on_identity_loaded(sender, identity):
g.user = User.query.gen_identity(identity)
除了初始化Principal(app),还订阅了一个信号,当identity_loaded发送一个信号(在上面中介绍过,即设置ID时,会发送一个信号)时,调用该函数,即设置全局变量中g的user属性,此外还要更新当前id的provides,具体函数再上面的权限认证已提到。
微信分享/微信扫码阅读