美文网首页
celery 7 优秀开源项目kombu源码分析之registr

celery 7 优秀开源项目kombu源码分析之registr

作者: 星丶雲 | 来源:发表于2020-12-25 17:11 被阅读0次

    有多年后台开发的工程师想必清楚,Celery本身涉及到的技术点其实在业界应用是很广泛的。Celery能这么流行,我们先排除没有进行技术深入下的盲从,和它诞生的非常早以外,我认为这和项目的内部设计的非常好也是有关的。

    接下来分析Celery使用的Kombu库中的一些设计实现让大家对这个优秀项目更了解,并从中学习可扩展开发的实践。

    Kombu是什么?
    当一个项目变得越来越复杂,就要考虑只保留核心,并把其他部分分拆到不同的项目中以便减少未来的维护和开发的成本。Flask、IPython都是这样做的。

    Kombu是一个把消息传递封装成统一接口的库。

    Celery一开始先支持的RabbitMQ,也就是使用AMQ协议。由于要支持越来越多的消息代理,但是这些消息代理是不支持AMQ协议的,需要一个东西把所有的消息代理的处理方式统一起来,甚至可以理解为把它们「伪装成支持AMQ协议」。Kombu的最初的实现叫做carrot, 后来经过重构才成了Kombu。

    registry

    registry也就是「注册」,有按需加入的意思,在Python标准库和一些优秀开源项目中都有应用。我们先看个django的[场景]

    ### source code start
    from itertools import chain
    
    
    class CheckRegistry:
    
        def __init__(self):
            self.registered_checks = []
            self.deployment_checks = []
    
        def register(self, check=None, *tags, **kwargs):
            kwargs.setdefault('deploy', False)
    
            def inner(check):
                check.tags = tags
                if kwargs['deploy']:
                    if check not in self.deployment_checks:
                        self.deployment_checks.append(check)
                elif check not in self.registered_checks:
                    self.registered_checks.append(check)
                return check
    
            if callable(check):
                return inner(check)
            else:
                if check:
                    tags += (check, )
                return inner
    
        def tag_exists(self, tag, include_deployment_checks=False):
            return tag in self.tags_available(include_deployment_checks)
    
        def tags_available(self, deployment_checks=False):
            return set(chain(*[check.tags for check in self.get_checks(deployment_checks) if hasattr(check, 'tags')]))
    
        def get_checks(self, include_deployment_checks=False):
            checks = list(self.registered_checks)
            if include_deployment_checks:
                checks.extend(self.deployment_checks)
            return checks
    
    
    registry = CheckRegistry()
    register = registry.register
    tag_exists = registry.tag_exists
    
    ### source code end
    @register('mytag', 'another_tag')
    def my_check(apps, **kwargs):
        pass
    
    
    print tag_exists('another_tag')
    print tag_exists('not_exists_tag')
    

    可以看到每次用registry.register都能动态的添加新的tag,最后还用 register= registry.register这样的方式列了个别名。执行结果如下:

    ❯ python django_example.py
    True
    False
    

    <meta charset="utf-8">

    kombu库包含对消息的序列化和反序列化工作的实现,可以同时支持多种序列化方案,如pickle、json、yaml和msgpack。假如你从前没有写过这样可扩展的项目,可能想的是每种的方案的loads和dumps都封装一遍,然后用一个大的if/elif/else来控制最后的序列化如何执行。

    那么在kombu里面是怎么用的呢?

    import codecs
    from collections import namedtuple
    
    codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder'))
    
    class SerializerNotInstalled(Exception):
        pass
    
    
    class SerializerRegistry(object):
        def __init__(self):
            self._encoders = {}
            self._decoders = {}
            self._default_encode = None
            self._default_content_type = None
            self._default_content_encoding = None
    
        def register(self, name, encoder, decoder, content_type,
                     content_encoding='utf-8'):
            if encoder:
                self._encoders[name] = codec(
                    content_type, content_encoding, encoder,
                )
            if decoder:
                self._decoders[content_type] = decoder
    
        def _set_default_serializer(self, name):
            try:
                (self._default_content_type, self._default_content_encoding,
                 self._default_encode) = self._encoders[name]
            except KeyError:
                raise SerializerNotInstalled(
                    'No encoder installed for {0}'.format(name))
    
        def dumps(self, data, serializer=None):
            if serializer and not self._encoders.get(serializer):
                raise SerializerNotInstalled(
                    'No encoder installed for {0}'.format(serializer))
    
            if not serializer and isinstance(data, unicode):
                payload = data.encode('utf-8')
                return 'text/plain', 'utf-8', payload
    
            if serializer:
                content_type, content_encoding, encoder = \
                    self._encoders[serializer]
            else:
                encoder = self._default_encode
                content_type = self._default_content_type
                content_encoding = self._default_content_encoding
    
            payload = encoder(data)
            return content_type, content_encoding, payload
    
        def loads(self, data, content_type, content_encoding):
            content_type = (content_type if content_type
                            else 'application/data')
            content_encoding = (content_encoding or 'utf-8').lower()
    
            if data:
                decode = self._decoders.get(content_type)
                if decode:
                    return decode(data)
            return data
    
    
    registry = SerializerRegistry()
    dumps = registry.dumps
    loads = registry.loads
    register = registry.register
    

    其实kombu还实现了unregister限于篇幅我就不展开了。现在我们想添加yaml的支持,只需要加这样一个函数:

    def register_yaml():
        try:
            import yaml
            registry.register('yaml', yaml.safe_dump, yaml.safe_load,
                              content_type='application/x-yaml',
                              content_encoding='utf-8')
        except ImportError:
    
            def not_available(*args, **kwargs):
                """Raise SerializerNotInstalled.
                Used in case a client receives a yaml message, but yaml
                isn't installed.
                """
                raise SerializerNotInstalled(
                    'No decoder installed for YAML. Install the PyYAML library')
            registry.register('yaml', None, not_available, 'application/x-yaml')
    
    
    register_yaml()
    

    这样就支持yaml了。如果希望默认使用yaml来序列化,可以执行:

    registry._set_default_serializer('yaml')
    

    是不是非常好扩展,如果哪天我希望去掉对pickle(安全问题),就可以直接注释对应的函数就好了。写个小例子试验下

    yaml_data = """\
    float: 3.1415926500000002
    int: 10
    list: [george, jerry, elaine, cosmo]
    string: The quick brown fox jumps over the lazy dog
    unicode: "Th\\xE9 quick brown fox jumps over th\\xE9 lazy dog"
    """
    
    content_type, content_encoding, payload = dumps(yaml_data, serializer='yaml')
    print content_type, content_encoding
    
    assert loads(payload, content_type=content_type, content_encoding=content_encoding) == yaml_data
    

    运行的结果就是:

    相关文章

      网友评论

          本文标题:celery 7 优秀开源项目kombu源码分析之registr

          本文链接:https://www.haomeiwen.com/subject/vwpvsktx.html