app.models 源代码

# -*- coding:utf-8 -*-
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin, AnonymousUserMixin,current_user
from flask import current_app, request, url_for
from . import login_manager
from . import db
from datetime import datetime
from jieba.analyse import ChineseAnalyzer

DEFAULT_AVATAR_URL = "https://ws1.sinaimg.cn/large/647dc635jw1fb6f78kot1j20b40b4mx1.jpg"

[文档]class Permission: """ 列出了要支持的用户角色以及定义角色使用的权限位。 ============== ========== 值(int) 说明 ============== ========== BORROW 借阅 RETURN 归还 MODERATE_MOVIE 修改影片 ADMINISTER 超级管理员 ============== ========== """ BORROW = 0x01 RETURN = 0x02 MODERATE_MOVIE = 0x04 ADMINISTER = 0x80
[文档]class Role(db.Model): """ 类变量 ``__tablename__`` 定义在数据库中使用的表名。 ================= =========== 列名 说明 ================= =========== id 序号 name 角色名 default 默认值 permissions 权限位 ================= =========== """ __tablename__ = 'roles' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) default = db.Column(db.Boolean, default=False, index=True) permissions = db.Column(db.Integer) users = db.relationship('User', backref='role', lazy='dynamic') def __repr__(self): return '<Role %r>' % self.name @staticmethod
[文档] def insert_roles(): """ .. note:: 将角色插入到数据库 ``insert_roles()`` 函数并不直接创建新角色对象,而是通过角色名查找现有的角色, 然后再进行更新。只有当数据库中没有某个角色名时才会创建新角色对象。 如此一来,如果以后更新了角色列表,就可以执行更新操作了。 要想添加新角色,或者修改角色的权限,修改 ``roles`` 数组,再运行函数即可。 """ roles = { 'User': (Permission.BORROW | Permission.RETURN, True), 'Moderator': (Permission.BORROW | Permission.RETURN | Permission.MODERATE_MOVIE, False), 'Administrator': (0xff, False) } for r in roles: role = Role.query.filter_by(name=r).first() if role is None: role = Role(name=r) role.permissions = roles[r][0] role.default = roles[r][1] db.session.add(role) db.session.commit()
[文档]class Record(db.Model): """ ================= =============== 列名 说明 ================= =============== customer_id 客户序号 movie_id 电影序号 timestamp 借阅时间 ================= =============== """ __tablename__ = 'records' customer_id = db.Column(db.Integer, db.ForeignKey('users.id'), primary_key=True) movie_id = db.Column(db.Integer, db.ForeignKey('movies.id'), primary_key=True) timestamp = db.Column(db.DateTime, index=True, default=datetime.now)
[文档]class Movie(db.Model): """ 类变量 ``__tablename__`` 定义在数据库中使用的表名。 类变量 ``__searchable__`` 定义可以搜索的列。 类变量 ``__analyzer__`` 定义搜索使用的分词器。 ==================== ================= 列名 说明 ==================== ================= id 序号 title 电影名 original_title 借阅时间 directors 导演 casts 主演 genres 类型 year 上映年份 rating 评分 images 封面图片 alt 豆瓣链接 amount 库存 counts 借阅次数 ==================== ================= """ __tablename__ = 'movies' __searchable__ = ['title', 'original_title'] __analyzer__ = ChineseAnalyzer() id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(64), unique=True, index=True) original_title = db.Column(db.String(64), unique=True, index=True) directors = db.Column(db.String(64)) casts = db.Column(db.String(64)) genres = db.Column(db.String(64)) year = db.Column(db.Integer) rating = db.Column(db.Float, default='0.0') images = db.Column(db.String(64)) alt = db.Column(db.String(64)) amount = db.Column(db.Integer,default=200) counts = db.Column(db.Integer,default=0) movie = db.relationship('Record', foreign_keys=[Record.movie_id], backref=db.backref('movie',lazy='joined'), lazy='dynamic',cascade='all, delete-orphan')
[文档] def to_json(self): """ 获取 ``movies`` 数据的 ``dict`` 格式,用于转成 ``json`` 格式生成 ``API`` :rtype: dict """ json_movie = { 'title': self.title, 'original_title': self.original_title, 'directors': self.directors.split(' / '), 'casts': self.casts.split(' / '), 'genres': self.genres.split(' / '), 'year': self.year, 'rating': self.rating, 'images': self.images, 'api': url_for('api.get_movie', id=self.id, _external=True), 'douban_alt': self.alt, 'alt': url_for('main.movie', id=self.id, _external=True), } return json_movie
def __repr__(self): return '<Movie %r>' % self.title
[文档] def can(self): return self.amount > 0
[文档]class User(UserMixin, db.Model): """ 类变量 ``__tablename__`` 定义在数据库中使用的表名。 ==================== =================== 列名 说明 ==================== =================== id 序号 eamil 邮箱 username 用户名 role_id 角色序号 password_hash 密码哈希值 confirmed 是否验证 amount 最大借阅数量 avatar_url 头像地址 ==================== =================== .. note:: 使用 ``UserMixin`` 使用 ``flask_login`` 中的 ``UserMixin`` 代替自己实现的用户方法 ``is_authenticated()`` 如果用户已经登录, 必须返回 ``True`` , 否则返回 ``False``。 ``is_active()`` 如果允许用户登录, 必须返回 ``True`` , 否则返回 ``False``, 如果要禁用账户, 可以返回 ``False``。 ``is_anonymous()`` 对普通用户必须返回 ``False``。 ``get_id()`` 必须返回用户的唯一标识符, 使用 ``Unicode`` 编码字符串。 """ __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) email = db.Column(db.String(64), unique=True, index=True) username = db.Column(db.String(64), unique=True, index=True) role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) password_hash = db.Column(db.String(128)) confirmed = db.Column(db.Boolean, default=False) amount = db.Column(db.Integer, default=7) avatar_url = db.Column(db.String(64), default=DEFAULT_AVATAR_URL) customer = db.relationship('Record', foreign_keys=[Record.customer_id], backref=db.backref('customer',lazy='joined'), lazy='dynamic',cascade='all, delete-orphan')
[文档] def borrow(self, movie, user): """ 租借 """ if not self.is_borrowing(movie): if movie.can() and self.can_borrow(): movie.amount -= 1 movie.counts += 1 self.amount -= 1 r = Record(customer_id=user.id, movie_id=movie.id) db.session.add(r)
[文档] def return_movie(self, movie): """ 归还 """ r = self.customer.filter_by(movie_id=movie.id).first() if r: movie.amount += 1 self.amount += 1 db.session.delete(r)
[文档] def is_borrowing(self, movie): """ 判断当前影片是否正在被用户借阅 """ return self.customer.filter_by(movie_id=movie.id).first() is not None
@property def borrowed_movies(self): """ 当前用户借阅中的影片列表 :rtype: list """ r = self.customer.filter_by(customer_id=current_user.id).all() if r: movies = [Movie.query.filter_by(id=i.movie_id).first() for i in r] return movies else: return None
[文档] def can_borrow(self): """ 判断用户是否可以借阅 :rtype: bool """ return self.amount > 0
def __repr__(self): return '<User %r>' % self.username
[文档] def generate_auth_token(self, expriation): """ 使用编码后的用户 ``id`` 字段值生成一个签名令牌, 还指定了以秒为单位的过期时间。 :rtype: json """ s = Serializer(current_app.config['SECRET_KEY'], expires_in=expriation) return s.dumps({'id': self.id})
@staticmethod
[文档] def verify_auth_token(token): """ 如果令牌可用就返回对应的用户 """ s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return None return User.query.get(data['id'])
@property def password(self): """ 拒绝用户读取 password 属性的值 """ raise AttributeError('password is not a readable attribute') @password.setter def password(self, password): """ .. note:: 设置密码 计算密码散列值的函数通过名为 ``password`` 的只写属性实现。 调用 ``Werkzeug`` 提供的 ``generate_password_hash()`` 函数, 并把得到的结果赋值给 ``password_hash`` 字段。 """ self.password_hash = generate_password_hash(password)
[文档] def verify_password(self, password): """ .. note:: 验证密码 ``verify_password`` 方法接受一个参数(即密码), 将其传给 ``Werkzeug`` 提供的 ``check_password_hash()`` 函数。 和存储在 ``User`` 模型中的密码散列值进行比对。 如果这个方法返回 ``True``, 就表明密码是正确的。 """ return check_password_hash(self.password_hash, password)
[文档] def generate_confirmation_token(self, expiration=3600): """ 生成一个用于确认账户令牌, 有效期默认为一小时 """ s = Serializer(current_app.config['SECRET_KEY'], expiration) return s.dumps({'confirm': self.id})
[文档] def confirm(self, token): """ .. note:: 检验令牌 如果检验通过, 则把 ``confirmed`` 属性设为 ``True``。 除了检验令牌, ``confirm()`` 方法还检查令牌中的 ``id`` 是否和存储在 ``current_user`` 中的已登录用户匹配。 如此一来, 即使恶意用户知道如何生成签名令牌, 也无法确认别人的账户。 """ s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('confirm') != self.id: return False self.confirmed = True db.session.add(self) return True
[文档] def generate_reset_token(self, expiration=3600): """ 生成一个用于重置密码的令牌, 有效期默认为一小时 """ s = Serializer(current_app.config['SECRET_KEY'], expiration) return s.dumps({'reset': self.id})
[文档] def reset_password(self, token, new_password): """ 重置密码 """ s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('reset') != self.id: return False self.password = new_password db.session.add(self) return True
[文档] def generate_email_change_token(self, new_email, expiration=3600): """ 生成一个用于修改邮箱的令牌, 有效期默认为一小时 """ s = Serializer(current_app.config['SECRET_KEY'], expiration) return s.dumps({'change_email': self.id, 'new_email': new_email})
[文档] def change_email(self, token): """ 修改邮箱 """ s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('change_email') != self.id: return False new_email = data.get('new_email') if new_email is None: return False if self.query.filter_by(email=new_email).first() is not None: return False self.email = new_email db.session.add(self) return True
def __repr__(self): return '<User %r>' % self.username def __init__(self, **kwagrs): """ .. note:: 赋予用户对应的角色 用户在程序中注册账户时,会被赋予适当的角色。 大多数用户在注册时赋予的角色都是``用户``,因为这是默认角色。 唯一的例外是管理员,管理员在最开始就应该赋予``管理员``角色。 管理员由保存在设置变量 ``FLASKY_ADMIN`` 中的电子邮件地址识别, 只要这个电子邮件地址出现在注册请求中, 就会被赋予正确的角色。 """ super(User, self).__init__(**kwagrs) if self.role is None: if self.email == current_app.config['FLASKY_ADMIN']: self.role = Role.query.filter_by(permissions=0xff).first() if self.role is None: self.role = Role.query.filter_by(default=True).first()
[文档] def can(self, permissions): """ .. note:: ``can()`` 方法在请求和赋予角色这两种权限之间进行位与操作。 如果角色中包含请求的所有权限位,则返回 ``True``,表示允许用户执行此项操作。 """ return self.role is not None and \ (self.role.permissions & permissions) == permissions
[文档] def is_administrator(self): """ 检查管理员权限 """ return self.can(Permission.ADMINISTER)
@login_manager.user_loader
[文档]def load_user(user_id): """ .. note:: 加载用户的回调函数接收以 Unicode 字符串形式表示的用户标识符。 如果能找到用户,这个函数必须返回用户对象,否则应该返回 None 。 """ return User.query.get(int(user_id))
[文档]class AnonymousUser(AnonymousUserMixin):
[文档] def can(self, permissions): return False
[文档] def is_administrator(self): return False
login_manager.anonymous_user = AnonymousUser