美文网首页
python微服务sanic 使用异步zipkin(2) - 一

python微服务sanic 使用异步zipkin(2) - 一

作者: 非梦nj | 来源:发表于2019-01-31 21:57 被阅读70次

    参考:python微服务sanic 使用异步zipkin(1)

    关键字:python sanic 微服务 异步 zipkin sanic-plugin 插件 Sanic-Plugins-Framewor Pypi发布
    所需环境:python3.7, Docker, Linux or WSL

    image.png

    Sanic插件(Plugin/Extension) - sanic-zipkin已经ready,你可以轻松用pip安装啦:
    喜欢的话,github点个赞吧:https://github.com/kevinqqnj/sanic-zipkin

    pip install sanic-zipkin
    # app.py
    from sanic_zipkin import SanicZipkin, logger, sz_rpc
    sz = SanicZipkin(app)
    

    上一篇已经学会了如何在Sanic app里引入aiozipkin,来做分布式系统追踪。本篇,来讨论下,如何创建一个完整的Sanic插件,以方便自己或者分享给他人。

    先来看看插件是怎么用的:

    功能

    • adding "Request span" by default
    • if Request is from another micro-service endpoint, span will be attached (Inject/Extract) to that endpoint
    • use "logger" decorator to create span for "methods" calls
    • use "sz_rpc" method to create sub-span for RPC calls, attaching to parent span

    使用例子

    1. run examples/servic_a.py and examples/service_b.py
    2. use Docker to run zipkin or jaeger:
      docker run -d -p9411:9411 openzipkin/zipkin:latest
      or
      docker run -d -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 -p5775:5775/udp -p6831:6831/udp -p6832:6832/udp -p5778:5778 -p16686:16686 -p14268:14268 -p9411:9411 jaegertracing/all-in-one
    3. access the endpoint:
    • 最简应用:
      curl localhost:8000/ to see plugin's basic usage
    from sanic_zipkin import SanicZipkin, logger, sz_rpc
    
    app = Sanic(__name__)
    
    # initilize plugin, default parameters:
    #        zipkin_address = 'http://127.0.0.1:9411/api/v2/spans'
    #        service = __name__
    #        host = '127.0.0.1'
    #        port = 8000
    sz = SanicZipkin(app, service='service-a')
    
    @app.route("/")
    async def index(request):
        return response.json({"hello": "from index"})
    
    

    This "/" endpoint will add trace span to zipkin automatically

    • 使用装饰器装饰方法,以及链式trace
      curl localhost:8000/2 to see how to decorate methods and chain-calls
    @logger()
    async def db_access(context, data):
        await asyncio.sleep(0.1)
        print(f'db_access done. data: {data}')
        return
    
    @sz.route("/2")
    async def method_call(request, context):
        await db_access(context, 'this is method_call data')
        return response.json({"hello": 'method_call'})
    
    

    Use "@logger" decorator to generate span for methods.
    Note: in this case, you need to use "@sz.route" decorator, and pass contextparameter to method calls.

    • 微服务之间通过PRC访问:
      curl localhost:8000/3 to see how RPC calls working, both GET/POST is supported
    @logger()
    async def decorate_demo(context, data):
        await db_access(context, data)
        data = {'payload': 'rpc call data of decorate_demo'}
        rsp = await sz_rpc(context, backend_service2, data, method='GET')
        print(rsp.status, await rsp.text())
        return
    
    @sz.route("/3")
    async def rpc_call(request, context):
        await decorate_demo(context, 'this is index4 data')
        data = {'payload': 'rpc call data of rpc_call'}
        rsp = await sz_rpc(context, backend_service1, data) # default method='POST'
        print(rsp.status, await rsp.text())
        return response.json({"hello": 'rpc_call'})
    
    

    method sz_rpc just wrapper span injection to RPC POST/GET calls. In peer server, span-context will be automatically extracted and generate a chain-view in zipkin.

    1. 在Zipkin/Jaeger UI里查看Trace:


      image.png

    Sanic插件开发过程

    使用Sanic-Plugins-Framework开发,省时省力,而且充分利用Sanic异步框架的威力。

    1. 插件初始化

    • 用户可自定义的初始化变量
      继承Sanic-Plugins-Framework(SPF) 的Contextualize类型,在on_before_registered方法引用时,加载用户自定义的初始变量。
      目前支持:
      • zipkin server地址
      • 微服务名称service
      • 微服务IP, port
    from spf.plugins.contextualize import Contextualize
    
    class SanicZipkin(Contextualize):
        def __init__(self, *args, **kwargs):
            super(SanicZipkin, self).__init__(*args, **kwargs)
            self.zipkin_address = None
            self.service = None
            self.host = None
            self.port = None
    
        def on_before_registered(self, context, *args, **kwargs):
            self.zipkin_address = kwargs.get('zipkin_address', 'http://127.0.0.1:9411/api/v2/spans')
            self.service = kwargs.get('service', __name__)
            self.host = kwargs.get('host', '127.0.0.1')
            self.port = kwargs.get('port', 8000)
            _logger.info(f'SanicZipkin: before registered: service={self.service}')
    
    • 创建aiozipkin服务
      实例化sanic_zipkin,然后调用Sanic 'before_server_start'方法,初始化context.tracercontext.aio_session
      context是SPF全局可以访问的上下文变量,可以存储任何你想要共享的数据。
    sanic_zipkin = instance = SanicZipkin()
    
    @sanic_zipkin.listener('before_server_start')
    async def setup_zipkin(app, loop, context):
        endpoint = az.create_endpoint(sanic_zipkin.service, ipv4=sanic_zipkin.host,
                                    port=sanic_zipkin.port)
        context.tracer = await az.create(sanic_zipkin.zipkin_address, endpoint, 
                                    sample_rate=1.0)
        trace_config = az.make_trace_config(context.tracer)
        context.aio_session = aiohttp.ClientSession(trace_configs=[trace_config])
        context.span = []
        context.zipkin_headers = []
    

    这里context.span设计成数组,模拟堆栈FILO(先进后出),是因为考虑到链式调用时,tracer需要以parent span为基础,创建child span。同理Inject/Extract用到的context.zipkin_headers也设成数组。

    2. 创建middleware,给Request GET/POST自动添加span

    • Requst: 先用middleware装饰器来监听request消息,然后调用自定义方法request_span(request, context)来创建span。
    @sanic_zipkin.middleware(priority=2, with_context=True)
    def mw1(request, context):
        context.log(DEBUG, f'mw-request: add span and headers before request')
        span = request_span(request, context)
        context.span.append(span)
        context.zipkin_headers.append(span.context.make_headers())
    

    这里,要考虑到,当前span是new span,还是child span。
    span.append()压入堆栈。

    • 自定义方法request_span(),通过读取request里是否有zipkin_headers信息,来判断当前是其它微服务的RPC call,还是用户发起的http访问。
    def request_span(request, context):
        context.log(DEBUG, f'REQUEST json: {request.json}, args: {request.args}')
        headers = request.parsed_json.get('zipkin_headers', None) if request.json else \
                    request.args.get('zipkin_headers', None)
    
    • Response: 在一次http访问结束,返回response时,此次访问的context.spancontext.zipkin_headers弹出堆栈,以免污染到其它http访问的trace:span.pop()
    @sanic_zipkin.middleware(priority=8, attach_to='response', relative='post',
                          with_context=True)
    def mw2(request, response, context):
        context.span.pop()
        context.zipkin_headers.pop()
        context.log(DEBUG, 'mw-response: clear span/zipkin_headers after Response')
    

    3. 创建zipkin_headers,用于RPC Inject/Extract

    如果当前堆栈里有zipkin_headers,则方法request_span()创建上下文:

    def request_span(request, context):
        context.log(DEBUG, f'REQUEST json: {request.json}, args: {request.args}')
        headers = request.parsed_json.get('zipkin_headers', None) if request.json else \
                    request.args.get('zipkin_headers', None)
        if headers:
            span_context = az.make_context(headers)
            with context.tracer.new_child(span_context) as span:
                ...
    

    如果无,则创建新的span:

            with context.tracer.new_trace() as span:
                span.name(f'{request.method} {request.path}')
                ...
    

    4. methods函数,添加@logger装饰器

    对于非http request的方法函数,因为没有middleware可以监听,则需要创建新的装饰器了。
    使用@logger时,默认context作为第一个参数:

    def logger(type=None, category=None, detail=None, description=None,
                   tracing=True, level=logging.INFO, *args, **kwargs):
            def decorator(fn=None):
                @functools.wraps(fn)
                async def _decorator(*args, **kwargs):
                    # print('_decorator args: ', args, kwargs)
                    context = args[0] if len(args) > 0 and isinstance(args[0], ContextDict) else None
                    ...
    

    然后创建新的new span 或 child span。
    同时,添加(Inject)新的上下文zipkin_headers

                    span = gen_span(fn.__name__, context)
                    context.zipkin_headers.append(span.context.make_headers())
    

    执行装饰器所装饰的函数fn()
    之后,必须清除(弹出最上面)本次装饰器新增的临时span和zipkin_headers。因为当前函数外部的其它引用函数(如果有)所需要的数据,还在堆栈下面。

                    try:
                        exce = False
                        res = await fn(*args, **kwargs)
                        return res
                    except Exception as e:
                        exce = True
                        raise e
                    finally:
                        ...
                        # clean up tmp vars for this wrapper
                        context.span.pop()
                        context.zipkin_headers.pop()
    

    5. 创建帮助函数sz_rpc(),简化RPC 访问其它微服务时的Inject操作

    这个很简单的helper,负责把zipkin_headers,inject到POST/GET访问的data里,这样,对方收到http request时,就可以顺利的Extract,得到span上下文了。

    async def sz_rpc(context, url, data, method='POST'):
        data.update({'zipkin_headers': json.dumps(context.zipkin_headers[-1])})
        if method.upper() == 'POST':
            return await context.aio_session.post(url, json=data)
        else: 
            return await context.aio_session.get(url, params=data)
    

    6. 发布插件到Pypi

    插件写好了,下面就是发布了。

    • 先注册一个Pypi账号:https://pypi.org
    • 修改当前目录的结构,以及添加一些必要的标注文件:
    kevinqq@CN-00009841:/c/Users/xxx/git/sanic-zipkin$ tree -L 2
    ├── CHANGES.txt  # 版本信息,必须
    ├── LICENSE         # 必须
    ├── MANIFEST.in
    ├── README.md
    ├── dist                  # 发布时自动打的包
    │   └── sanic-zipkin-0.1.2.tar.gz
    ├── examples
    │   ├── requirements.txt
    │   ├── service_a.py
    │   └── service_b.py
    ├── requirements-dev.txt
    ├── sanic_zipkin     # 包的目录
    │   ├── __init__.py  # 必须。含版本信息和可引用的对象
    │   └── sanic_zipkin.py    # 主文件
    ├── sanic_zipkin.egg-info  # 发布时自动生成
    └── setup.py  # 发布用的程序
    
    • 发布前检查:
      python3 setup.py check

    • 发布到Pypi:
      python3 setup.py sdist upload
      此时,会让你输入Pypi的密码。
      如果收到200,则上传成功。

    • 检查是否已经可用:
      pip install sanic-zipkin

    Quetions?
    Github: https://github.com/kevinqqnj/sanic-zipkin

    相关文章

      网友评论

          本文标题:python微服务sanic 使用异步zipkin(2) - 一

          本文链接:https://www.haomeiwen.com/subject/ugilsqtx.html