美文网首页程序员
flask-wtforms组件源码解析

flask-wtforms组件源码解析

作者: 马小跳_ | 来源:发表于2018-01-10 21:54 被阅读259次

使用wtforms

class LoginForm(Form):
    name=simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired(message='用户名不能为空'),
            validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'}
    )

    pwd=simple.StringField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空'),
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'}
    )


@user.route('/login',methods=['GET','POST'])
def login():
    if request.method=='GET':
        form=LoginForm()
        print(form)
        return render_template('login.html',form=form)
    else:
        form=LoginForm(request.form)
        if form.validate():

知识储备:

对象是类创建的,实例化对象时自动执行类的__new____init__方法,对象()执行类的__call__方法

类是type创建的,创建类的时候自动执行type__init__,类() 执行type__call__方法(封装了类的__new____init__方法)

源码流程分析:

1.创建LoginForm时,执行其元类的__init____call__方法

class LoginForm(Form):
    pass
    
class Form(with_metaclass(FormMeta, BaseForm)):
    # Form的元类是FormMeta,基类是BaseForm
    pass

"""
等价于
class Form(BaseForm,metaclass=FormMeta):
    pass
"""

def with_metaclass(meta, base=object):
    """
    FormMeta("NewBase", (BaseForm,), {})
    """
    return meta("NewBase", (base,), {})

class FormMeta(type):
    def __init__(cls, name, bases, attrs):
        """
        此时cls是LoginForm
        __init__方法旨在将_unbound_fields和_wtforms_meta字段封装到LoginForm中
        
        class LoginForm(Form):
            _unbound_fields = None
            _wtforms_meta = None
        """
        type.__init__(cls, name, bases, attrs)
        cls._unbound_fields = None
        cls._wtforms_meta = None
        
    def __call__(cls, *args, **kwargs):
        """
        实例化一个LoginForm类对象
        1.填充_unbound_fields属性
        遍历LoginForm的属性,排除掉_开头的字段
        将其他字段排序后存放到_unbound_fields属性中  
        _unbound_fields = [(name, unbound_field),(name, unbound_field),(name, unbound_field)]
        2.填充_wtforms_meta属性
        将LoginForm和它所继承的所有类中的Meta属性抽离出来,组成一个列表bases
        动态创建一个Meta类,继承自bases
        _unbound_fields = Meta
        """
        if cls._unbound_fields is None:
            fields = []
            #当前类所有的属性
            for name in dir(cls):
                if not name.startswith('_'):  # 排除掉系统自定义的属性
                    unbound_field = getattr(cls, name)  # 得到UnboundField()对象
                    if hasattr(unbound_field, '_formfield'):
                        fields.append((name, unbound_field))
            # 根据UnboundField()对象的.creation_counter进行排序
            fields.sort(key=lambda x: (x[1].creation_counter, x[0]))
            cls._unbound_fields = fields

        # Create a subclass of the 'class Meta' using all the ancestors.
        if cls._wtforms_meta is None:
            bases = []
            # __mro__代表该类的继承关系
            for mro_class in cls.__mro__:
                if 'Meta' in mro_class.__dict__:
                    bases.append(mro_class.Meta)
            cls._wtforms_meta = type('Meta', tuple(bases), {})
        return type.__call__(cls, *args, **kwargs)

2.实例化字段对象

我们还没执行到form=LoginForm()时,LoginForm里面所有的字段都已经执行加载完了,里面的字段的值都是Field实例化而来。

执行Field的__new__方法

def __new__(cls, *args, **kwargs):
    if '_form' in kwargs and '_name' in kwargs:
        return super(Field, cls).__new__(cls)
    else:
        return UnboundField(cls, *args, **kwargs)

可以知道开始的时候所有的Field对象都是UnboundField()对象,我们所写的Filed实例实际开始是这样的(注释)

class LoginForm(Form):
    name = UnboundField(StringField, *args, **kwargs) # creation_counter=1
    pwd = UnboundField(PasswordField, *args, **kwargs) # creation_counter=2

执行Form类的__init__方法:

def __init__(self, label=None, validators=None, filters=tuple(),
             description='', id=None, default=None, widget=None,
             render_kw=None, _form=None, _name=None, _prefix='',
             _translations=None, _meta=None):
             
    if _translations is not None:
        self._translations = _translations

    if _meta is not None:
        self.meta = _meta
    elif _form is not None:
        self.meta = _form.meta
    else:
        raise TypeError("Must provide one of _form or _meta")

    self.default = default
    self.description = description
    self.render_kw = render_kw
    self.filters = filters
    self.flags = Flags()
    self.name = _prefix + _name
    self.short_name = _name
    self.type = type(self).__name__
    self.validators = validators or list(self.validators)

    self.id = id or self.name
    self.label = Label(self.id, label if label is not None else self.gettext(_name.replace('_', ' ').title()))

    if widget is not None:
        self.widget = widget
    # 字段对象的验证规则
    for v in itertools.chain(self.validators, [self.widget]):
        flags = getattr(v, 'field_flags', ())
        for f in flags:
            setattr(self.flags, f, True)
 super(Form, self).__init__(self._unbound_fields, meta=meta_obj, prefix=prefix)
  1. baseForm中的__init__
def __init__(self, fields, prefix='', meta=DefaultMeta()):
        """
        :param fields:
            A dict or sequence of 2-tuples of partially-constructed fields.
        :param prefix:
            If provided, all fields will have their name prefixed with the
            value.
        :param meta:
            A meta instance which is used for configuration and customization
            of WTForms behaviors.
        """
        if prefix and prefix[-1] not in '-_;:/.':
            prefix += '-'

        self.meta = meta
        self._prefix = prefix
        self._errors = None
        self._fields = OrderedDict()

        if hasattr(fields, 'items'):
            fields = fields.items()

        translations = self._get_translations()
        extra_fields = []
        if meta.csrf:
            self._csrf = meta.build_csrf(self)
            extra_fields.extend(self._csrf.setup_form(self))

        for name, unbound_field in itertools.chain(fields, extra_fields):
            options = dict(name=name, prefix=prefix, translations=translations)
            field = meta.bind_field(self, unbound_field, options)
            self._fields[name] = field
#将fields和extra_fields链接起来
        for name, unbound_field in itertools.chain(fields, extra_fields):
            options = dict(name=name, prefix=prefix, translations=translations)
            field = meta.bind_field(self, unbound_field, options)
            self._fields[name] = field

4.执行UnboundField中的bind()方法:

class UnboundField(object):
    _formfield = True
    creation_counter = 0

    def __init__(self, field_class, *args, **kwargs):
        UnboundField.creation_counter += 1
        self.field_class = field_class
        self.args = args
        self.kwargs = kwargs
        self.creation_counter = UnboundField.creation_counter

    def bind(self, form, name, prefix='', translations=None, **kwargs):
        kw = dict(
            self.kwargs,
            _form=form,
            _prefix=prefix,
            _name=name,
            _translations=translations,
            **kwargs
        )
        return self.field_class(*self.args, **kw)

    def __repr__(self):
        return '<UnboundField(%s, %r, %r)>' % (self.field_class.__name__, self.args, self.kwargs)

在bind方法中我们可以看到,由于字段中的__new__方法,实例化时:name = simple.StringField(label='用户名'),创建的是UnboundField(cls, *args, **kwargs),当执行完bind之后,就变成执行 wtforms.fields.core.StringField(),

5.再回到BaseForm中的__init__中,将返回值添加到 self._fields[name] 中,既:

 _fields = {
    name: wtforms.fields.core.StringField(),
 }

6.执行完BaseForm的__init__后,在继续回到Form的构造方法中循环_fields,为对象设置属性

for name, field in iteritems(self._fields):
    # Set all the fields to attributes so that they obscure the class
    # attributes with the same names.
    setattr(self, name, field)

7.执行process,为字段设置默认值:self.process(formdata, obj, data=data, **kwargs),再循环执行每个字段的process方法,为每个字段设置值:

for name, field, in iteritems(self._fields):
    if obj is not None and hasattr(obj, name):
        field.process(formdata, getattr(obj, name))
    elif name in kwargs:
        field.process(formdata, kwargs[name])
    else:
        field.process(formdata)

8.执行每个字段的process方法,为字段的data和字段的raw_data赋值

Field的process

    def process(self, formdata, data=unset_value):
        self.process_errors = []
        if data is unset_value:
            try:
                data = self.default()
            except TypeError:
                data = self.default

        self.object_data = data

        try:
            self.process_data(data)
        except ValueError as e:
            self.process_errors.append(e.args[0])

        if formdata:
            try:
                if self.name in formdata:
                    self.raw_data = formdata.getlist(self.name)
                else:
                    self.raw_data = []
                self.process_formdata(self.raw_data)
            except ValueError as e:
                self.process_errors.append(e.args[0])

        try:
            for filter in self.filters:
                self.data = filter(self.data)
        except ValueError as e:
            self.process_errors.append(e.args[0])

页面上执行print(form.name) 时,打印标签,流程如下(参考自定义form组件很容易理解)

我们在前端和后端上打印的name和pwd其实是一个Filed的实例,相当于一个实例对象,我们知道直接print一个对象的时候,会调用该类的__str__方法,所以我们查看Field的__str__方法:

def __str__(self):
    return self()

我们可以看到他返回self(),对象()---->执行当前类的__call__方法:

def __call__(self, **kwargs):
    return self.meta.render_field(self, kwargs)

最终返回值是meta.render_field(self, kwargs)执行后的结果

def render_field(self, field, render_kw):
    other_kw = getattr(field, 'render_kw', None)
    if other_kw is not None:
        render_kw = dict(other_kw, **render_kw)
    return field.widget(field, **render_kw)

调用插件返回对应的Html页面代码

9.验证流程

a. 执行form的validate方法,获取钩子方法
            def validate(self):
                extra = {}
                for name in self._fields:
                    inline = getattr(self.__class__, 'validate_%s' % name, None)
                    if inline is not None:
                        extra[name] = [inline]
        
                return super(Form, self).validate(extra)
        b. 循环每一个字段,执行字段的 validate 方法进行校验(参数传递了钩子函数)
            def validate(self, extra_validators=None):
                self._errors = None
                success = True
                for name, field in iteritems(self._fields):
                    if extra_validators is not None and name in extra_validators:
                        extra = extra_validators[name]
                    else:
                        extra = tuple()
                    if not field.validate(self, extra):
                        success = False
                return success
        c. 每个字段进行验证时候
            字段的pre_validate 【预留的扩展】
            字段的_run_validation_chain,对正则和字段的钩子函数进行校验
            字段的post_validate【预留的扩展】

相关文章

网友评论

    本文标题:flask-wtforms组件源码解析

    本文链接:https://www.haomeiwen.com/subject/lieanxtx.html