How it works (4): Reading the Tilestrata Source (B

Author: 默而识之者 | Published: 2019-01-31 09:57

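    Introduction

    Reading the Tilestrata source inevitably leads to Tilestrata-Balancer (TB below), its default load-balancing service. Its features are:

    • Nodes can be added and removed dynamically
    • Node health is checked on a timer
    • Simple request filtering

    Most services rely on a dedicated load balancer or reverse proxy, such as the widely used nginx. In theory, Node.js's http-proxy cannot come close to nginx's performance. For a map tile service, however, the bottleneck is almost always tile generation, not the efficiency of the reverse proxy or the load balancing. Implementing the balancer directly in Node adds a lot of flexibility, and in practice the performance loss is small.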

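    Architecture

    Registering and unregistering nodes

    TB is essentially a reverse proxy server that handles two kinds of requests, internal and external:

    • Internal requests: registration and unregistration requests from the Tilestrata services on the LAN.
    • External requests: map tile requests sent by clients.
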
    Balancer.prototype.listen = function(callback) {
     callback = callback || function() {};
     var self = this;
     async.parallel([
      // Start listening on the private and public ports in parallel
      this._listenPrivate.bind(this),
      this._listenPublic.bind(this)
     ], function(err) {
      if (err) {
       log.error(null, err);
       return self.close(function() {
        callback(err);
       });
      }
      return callback();
     });
    };
    

    Handling internal requests:

    Balancer.prototype._listenPrivate = function(callback) {
     var self = this;
     this.http_private = http.createServer(function(req, res) {
      // Internally, Tilestrata services register a node by POSTing to '/nodes' and unregister it with DELETE
      if (req.method === 'POST' && req.url === '/nodes') {
       self.handleNodeRegister(req, res);
      } else if (req.method === 'DELETE' && REGEX_NODES.test(req.url)) {
       self.handleNodeUnregister(req, res);
      } else {
       res.writeHead(404, {'Content-Type': 'text/plain'});
       res.write('Unrecognized URL');
       res.end();
      }
     });
     // Listen on the private (internal-only) port
     var port = this.options.privatePort;
     var hostname = this.options.hostname;
     this.http_private.listen(port, hostname, function(err) {
      callback(err);
     });
    };
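
The dispatch in _listenPrivate comes down to a small decision on the request method and URL. A minimal sketch of that decision as a pure function follows. The article never shows REGEX_NODES, so the pattern used here (/nodes/<id>) and the helper name routePrivate are assumptions for illustration, not TB's actual code.

```javascript
// Hypothetical pattern: REGEX_NODES is not shown in the article, so this
// shape (/nodes/<id>) is only an assumption made for illustration.
const REGEX_NODES = /^\/nodes\/([^/?]+)$/;

// Decide what the private listener should do with a request.
// Returns a plain object instead of touching req/res, so it is easy to test.
function routePrivate(method, url) {
  if (method === 'POST' && url === '/nodes') {
    return { action: 'register' };
  }
  const match = method === 'DELETE' ? REGEX_NODES.exec(url) : null;
  if (match) {
    return { action: 'unregister', id: match[1] };
  }
  // Everything else gets the 404 "Unrecognized URL" branch.
  return { action: 'notFound' };
}

console.log(routePrivate('POST', '/nodes'));
console.log(routePrivate('DELETE', '/nodes/abc-123'));
console.log(routePrivate('GET', '/nodes'));
```

Keeping the routing decision separate from the response writing is one way to unit-test the private API without opening a port.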
    

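    Take node registration as an example (unregistration uses similar code with the reverse logic). TB registers nodes passively: Tilestrata has to POST a JSON body describing itself to the load balancer so that TB can find the node and complete the registration. To understand how Tilestrata and TB interact, we need to look at the load-balancing code in Tilestrata that I did not cover in the previous post.
    Code from Tilestrata/TileServer.js: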

    // Build the request body
    TileServer.prototype._buildBalancerData = function () {
     var self = this;
     // Include every layer
     return {
      id: this.uuid,
      version: this.version,
      listen_port: this.http_port,
      node_weight: this.options.balancer.node_weight || 1,
      layers: Object.keys(this.layers).map(function (layerName) {
       var layer = self.layer(layerName);
       return {
        name: layerName,
        options: layer.options,
        routes: Object.keys(layer.routes)
       };
      })
     };
    };
    
    // Bind this node to TB
    TileServer.prototype.bindBalancer = function () {
     var self = this;
     var balancer = this.options.balancer;
     if (!balancer) return;
     if (this.balancer) return;
     // Reconnect after a lost connection
     var recovery = this.balancer = new Recovery({
      'retries': Infinity,
      'min': balancer.register_mindelay || '1000 ms',
      'max': balancer.register_maxdelay || '30 seconds',
      'reconnect timeout': balancer.register_timeout || '30 seconds'
     });
     // Re-register on every reconnect attempt
     recovery.on('reconnect', function (opts) {
      log.info('balancer', 'Attempting to register with ' + balancer.host + '...');
      
      request({
       json: true,
       method: 'POST',
       url: 'http://' + balancer.host + '/nodes',
       timeout: balancer.register_timeout || 30000,
       body: self._buildBalancerData()
      }, function (err, res, body) {
       // Handle TB's response
       if (err) {
        var level = err.code === 'ETIMEDOUT' || err.code === 'ECONNREFUSED' ? 'warn' : 'error';
        log[level]('balancer', 'Registration failed (' + err.message + ')');
        return recovery.reconnected(err);
       } else if (res.statusCode !== 201 && res.statusCode !== 200) {
        log.warn('balancer', 'Registration failed (HTTP ' + res.statusCode + ' ' + HTTPStatus[res.statusCode] + ')');
        return recovery.reconnected(new Error('Non 201 status (' + res.statusCode + ')'));
       } else if (!body || typeof body !== 'object') {
        log.error('balancer', 'Registration failed (invalid body)');
        return recovery.reconnected(new Error('Invalid body from balancer'));
       } else if (!body.token) {
        log.error('balancer', 'Registration failed ("token" not provided)');
        return recovery.reconnected(new Error('Invalid token'));
       } else if (isNaN(body.check_interval)) {
        log.error('balancer', 'Registration failed ("check_interval" not a number or missing)');
        return recovery.reconnected(new Error('Invalid check_interval'));
       }
    
       log.info('balancer', res.statusCode === 201 ? 'Successfully registered' : 'Already in pool');
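       // Store the token returned by TB
       self.balancer_token = body.token;
       // Store TB's health-check interval
       self.balancer_check_interval = body.check_interval;
       // Arm the watchdog that expects TB's health checks
       self.handleBalancerPing();
       // Tell Recovery that this reconnect attempt succeeded
       recovery.reconnected();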
      });
     });
    
     recovery.reconnect();
    };
    
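    // Watchdog: if TB stops checking in, re-register automatically
    TileServer.prototype.handleBalancerPing = function () {
     // Called on every health check from TB, which resets the timeout.
     // If TB stops health-checking this node for any reason, treat the connection as lost and re-register with TB.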
     var self = this;
     clearTimeout(this.balancer_timeout);
     this.balancer_timeout = setTimeout(function () {
      log.warn('balancer', 'Balancer not checking in, re-notifying of existence...');
      self.balancer.reconnect();
     }, this.balancer_check_interval * 3 + Math.random() * 500);
    };
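
The two rules in this excerpt, what counts as a valid registration reply and how long the node waits before assuming TB has forgotten it, can be isolated as pure functions. This is a sketch that mirrors the checks above. The function names and the sample values are assumptions, and the status-code message is reworded.

```javascript
// Mirror of the checks in the 'reconnect' callback. Returns an error
// message, or null when the reply is an acceptable registration response.
function checkRegistrationResponse(statusCode, body) {
  if (statusCode !== 201 && statusCode !== 200) {
    return 'Unexpected status (' + statusCode + ')';
  }
  if (!body || typeof body !== 'object') return 'Invalid body from balancer';
  if (!body.token) return 'Invalid token';
  if (isNaN(body.check_interval)) return 'Invalid check_interval';
  return null;
}

// Watchdog delay as computed in handleBalancerPing: three check intervals
// plus up to 500 ms of random jitter. `random` is injectable so the result
// can be made deterministic in tests.
function watchdogDelay(checkIntervalMs, random = Math.random) {
  return checkIntervalMs * 3 + random() * 500;
}

// Sample values below are made up for illustration.
console.log(checkRegistrationResponse(201, { token: 't0k3n', check_interval: 5000 }));
console.log(checkRegistrationResponse(200, { token: 't0k3n' }));
console.log(watchdogDelay(5000, () => 0));
```

Because the delay is three intervals long, a single missed or slow health check does not trigger re-registration. The jitter presumably keeps many nodes from re-registering at the same instant.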
    

    Now that it is clear what Tilestrata sends, how does TB handle the request and add the node to the load balancer? First, the request handling:

    Balancer.prototype.handleNodeRegister = function(req, res) {
     function fail(err) {
      log.error(ipaddr, 'Failed to register (' + err.message + ')');
      res.writeHead(500, {'Content-Type': 'text/plain'});
      res.write(err.message);
      return res.end();
     }
    
     var self = this;
     var ipaddr = req.connection.remoteAddress;
     // Parse the JSON registration body
     this._getJSONBody(req, function(err, body) {
      if (err) return fail(err);
      // Register the node and send the result back to Tilestrata
      self.nodes.register(ipaddr, body, function(err, added) {
       if (err) return fail(err);
       res.writeHead(added ? 201 : 200, {'Content-Type': 'application/json'});
       res.write(self.register_body);
       res.end();
      });
     });
    };
    

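    There is not much to say about the request handling. The key part is node registration. As a load balancer, TB uses the familiar consistent-hashing approach (hashring) as its algorithm for distributing requests: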

    NodeList.prototype.register = function(ipaddr, body, callback) {
     var self = this;
     var id = body.id;
     var target = ipaddr + ':' + body.listen_port;
     var existing_id = this.ids_by_host[target];
     var update_ring = true;
     // Avoid duplicate registration
     // Two checks for duplicates: the same id, or the same ip+port
     if (this.hosts_by_id[id]) {
      log.info('pool', target + ' already in pool');
      return callback(null, false);
     } else if (existing_id && existing_id !== id) {
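      // The id is new but the ip+port already exists: the node has probably restarted
      // Release the old id, but leave the consistent-hash ring alone, since nothing except the id has changed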
      this.unreference(existing_id);
      update_ring = false;
      log.warn('pool', target + ' already in pool w/different id (node did not exit cleanly): ' + existing_id + ' != ' + id);
     }
    
     var entry = {};
     // Weight
     entry[target] = {weight: body.node_weight || 1};
    
     this.ids_by_host[target] = id;
     this.hosts_by_id[id] = target;
     this.layers_by_id[id] = [];
     // Add the node to the consistent-hash ring of each layer it serves
     body.layers.forEach(function(layer) {
      self.layers_by_id[id].push(layer.name);
      self.layer_options[layer.name] = layer.options;
      if (update_ring) self.ring(layer.name).add(entry);
     });
     // Start watching this node
     this.watch(id);
     if (update_ring) log.info('pool', 'Added ' + target);
     callback(null, update_ring);
    };
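
To make the role of the ring concrete, here is a minimal weighted consistent-hash ring. It is a sketch of the general technique, not the hashring package's actual algorithm. The class name TinyRing, the MD5-based hash, the replica count, and the node addresses are all assumptions.

```javascript
const crypto = require('crypto');

// Hash a string to an unsigned 32-bit integer (first 4 bytes of its MD5 digest).
function hash32(str) {
  return crypto.createHash('md5').update(str).digest().readUInt32BE(0);
}

class TinyRing {
  constructor(replicas = 40) {
    this.replicas = replicas; // virtual points per unit of weight
    this.points = [];         // [{hash, node}], kept sorted by hash
  }

  // A heavier node gets more points, hence a larger share of the keys.
  add(node, weight = 1) {
    for (let i = 0; i < this.replicas * weight; i++) {
      this.points.push({ hash: hash32(node + '#' + i), node: node });
    }
    this.points.sort((a, b) => a.hash - b.hash);
  }

  remove(node) {
    this.points = this.points.filter((p) => p.node !== node);
  }

  // Binary search for the first point at or after the key's hash,
  // wrapping around to the first point when the key is past the last one.
  get(key) {
    if (this.points.length === 0) return undefined;
    const h = hash32(key);
    let lo = 0;
    let hi = this.points.length;
    while (lo < hi) {
      const mid = (lo + hi) >> 1;
      if (this.points[mid].hash < h) lo = mid + 1;
      else hi = mid;
    }
    return this.points[lo % this.points.length].node;
  }
}

const ring = new TinyRing();
ring.add('10.0.0.1:8080');
ring.add('10.0.0.2:8080', 2); // node_weight 2
const keys = [];
for (let x = 0; x < 200; x++) keys.push('basemap/5/' + x + '/0');

const before = keys.map((k) => ring.get(k));
ring.add('10.0.0.3:8080');
const after = keys.map((k) => ring.get(k));
const moved = keys.filter((k, i) => before[i] !== after[i]);
console.log(moved.length + ' of ' + keys.length + ' keys moved, all of them to the new node');
```

The property that matters for a tile cache is visible in the last lines: adding a node only reassigns keys to the new node, so the tiles already cached on the existing nodes stay where they are.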
    

    Dynamic load balancing has to route load differently depending on the state of each node:

    // Periodically check the node's health
    NodeList.prototype.watch = function(id) {
     if (this.check_timers_by_id[id]) return;
    
     var self = this;
     var unhealthy = 0;
     var interval = this.options.checkInterval;
     var maxUnhealthy = this.options.unhealthyCount;
     var target = this.hosts_by_id[id];
     // Tilestrata's health endpoint
     var check_url = 'http://' + target + '/health';
    
     function performCheck() {
      self.check_timers_by_id[id] = setTimeout(function() {
       request.get(check_url, {
        timeout: interval,
        headers: {
         'X-TileStrataBalancer-Token': self.token
        }
       }, function(err, res) {
        // Unregister the node once its unhealthy count reaches the threshold
        if (err || res.statusCode !== 200) ++unhealthy;
        else unhealthy = Math.max(0, unhealthy-1);
        if (unhealthy >= maxUnhealthy) {
         log.warn(null, target + ' is unhealthy. Removing...');
         self.unregister(id);
        } else {
         performCheck();
        }
       });
      }, interval);
     }
    
     performCheck();
    };
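
The counter logic in performCheck is easy to misread: a successful check does not reset the counter, it only lowers it by one. The sketch below replays a sequence of check results through the same arithmetic. The function name firstRemoval and the threshold of 3 are assumptions, since the article does not show the default for unhealthyCount.

```javascript
// Replay a sequence of health-check results (true = healthy) and return the
// index of the check that triggers removal, or -1 if the node survives.
function firstRemoval(results, maxUnhealthy) {
  let unhealthy = 0;
  for (let i = 0; i < results.length; i++) {
    if (!results[i]) ++unhealthy;
    else unhealthy = Math.max(0, unhealthy - 1);
    if (unhealthy >= maxUnhealthy) return i;
  }
  return -1;
}

// Three consecutive failures: removed on the third check (index 2).
console.log(firstRemoval([false, false, false], 3));
// Alternating failures and successes never reach the threshold.
console.log(firstRemoval([false, true, false, true, false, true], 3));
// One success in the middle only delays removal (counter: 1, 2, 1, 2, 3).
console.log(firstRemoval([false, false, true, false, false], 3));
```

A node that fails more often than it succeeds is therefore removed eventually, while an occasional failure is tolerated.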
    

    The health check calls Tilestrata's 'health' endpoint, which I did not cover last time.
    Code from Tilestrata/routes/health.js:

    // Excerpt: hostname, result, _ and humanizeDuration are defined outside the code shown here
    module.exports = function(req, res, server) {
     var status = 200;
     var host = process.env.TILESTRATA_HIDEHOSTNAME ? '(hidden)' : hostname;
     var data = _.extend({
      ok: true,
      version: server.version,
      host: host,
      uptime: humanizeDuration(server.uptime().duration),
      uptime_s: server.uptime().duration / 1000
     }, result);
    
     if (server.options.balancer) {
      var balancer_status = 'initializing';
      if (server.balancer) {
       if (server.balancer.reconnecting()) {
        balancer_status = 'connecting';
       } else {
        balancer_status = 'connected';
       }
      }
      data.balancer = {status: balancer_status};
     }
    
     // A request carrying TB's token means TB knows this node and is checking it: reset the watchdog
     var incoming_token = req.headers && req.headers['x-tilestratabalancer-token'];
     if (incoming_token && incoming_token === server.balancer_token) {
      server.handleBalancerPing();
     }
    
     // Note: new Buffer() is deprecated in current Node.js; Buffer.from() is the replacement
     var resbuffer = new Buffer(JSON.stringify(data), 'utf8');
     res.writeHead(status, {'Content-Length': resbuffer.length, 'Content-Type': 'application/json'});
     res.write(resbuffer);
     res.end();
    };
    

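    As you can see, Tilestrata itself only ever responds with status 200, so TB unregisters a node only when Tilestrata on that node times out or fails, for example under excessive load.
    A smarter design could adjust the node's weight in TB dynamically, based on the relevant metrics in Tilestrata's profile data.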

    Handling external requests:

    Balancer.prototype._listenPublic = function(callback) {
     var self = this;
     this.http_public = http.createServer(function(req, res) {
      // Handle special requests
      if (req.url === '/robots.txt') {
       res.writeHead(200, {'Content-Length': BUFFER_ROBOTSTXT.length, 'Content-Type': 'text/plain'});
       res.write(BUFFER_ROBOTSTXT);
       res.end();
      } else if (req.url === '/health') {
       // This could actually return the health of all nodes
       res.writeHead(200, {'Content-Type': 'application/json'});
       res.write('{"ok":true}');
       res.end();
      } else {
       // Reverse-proxy map tile requests
       self.handleProxyRequest(req, res, function() {
        res.writeHead(404, {'Content-Type': 'text/plain'});
        res.write('Unrecognized URL');
        res.end();
       });
      }
     });
     // Listen on the public port
     var port = this.options.port;
     var hostname = this.options.hostname;
     this.http_public.listen(port, hostname, function(err) {
      callback(err);
     });
    };
    

    TB's main job is to accept map tile requests from clients, load-balance them, and reverse-proxy them:

    Balancer.prototype.handleProxyRequest = function(req, res, next) {
     // First pass: reject malformed requests
     var parts = req.url.substring(1).split('/');
     if (parts.length < 5) return next();
     var layer = parts[0];
     var z = Number(parts[1]);
     var x = Number(parts[2]);
     var y = Number(parts[3]);
     if (isNaN(x) || isNaN(y) || isNaN(z)) {
      return next();
     }
    
     var file = parts[4];
     var qspos = file.indexOf('?');
     if (qspos > -1) file = file.substring(0, qspos);
     // Pick the most suitable node via consistent hashing
     var target = this.nodes.pick(layer, file, z, x, y);
     if (!target) {
      res.writeHead(404, {'Content-Type': 'text/plain'});
      res.write('No servers found to handle the request');
      return res.end();
     }
     // Reverse-proxy the request
     this.proxy.web(req, res, {target: 'http://'+target});
    };
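
The URL handling at the top of handleProxyRequest can be lifted into a standalone function to see exactly which URLs survive the first filter. This sketch follows the same steps as the excerpt. The function name parseTileUrl and the sample URLs are assumptions.

```javascript
// Parse "/<layer>/<z>/<x>/<y>/<file>[?query]" the same way the excerpt does.
// Returns null for anything that would fall through to the 404 handler.
function parseTileUrl(url) {
  const parts = url.substring(1).split('/');
  if (parts.length < 5) return null;

  const layer = parts[0];
  const z = Number(parts[1]);
  const x = Number(parts[2]);
  const y = Number(parts[3]);
  if (isNaN(x) || isNaN(y) || isNaN(z)) return null;

  // Drop the query string from the file name.
  let file = parts[4];
  const qspos = file.indexOf('?');
  if (qspos > -1) file = file.substring(0, qspos);

  return { layer: layer, z: z, x: x, y: y, file: file };
}

console.log(parseTileUrl('/basemap/3/2/1/tile.png?v=2'));
console.log(parseTileUrl('/favicon.ico'));
console.log(parseTileUrl('/basemap/a/2/1/tile.png'));
```

One quirk is worth knowing: Number('') is 0, so an empty path segment such as in '/basemap//2/1/tile.png' passes this filter with z = 0. The zoom and bounding-box checks in pick are what catch requests like that.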
    

    The core, again, is choosing the most suitable node:

    NodeList.prototype.pick = function(layer, file, z, x, y) {
     var layer_opts = this.layer_options[layer];
     if (!layer_opts) return;
    
     // Reject requests outside the layer's zoom range
     if (typeof layer_opts.minZoom === 'number' && z < layer_opts.minZoom) return;
     if (typeof layer_opts.maxZoom === 'number' && z > layer_opts.maxZoom) return;
    
     // Reject requests outside the layer's bounding box
     if (!testbbox(x, y, z, layer_opts.bbox)) return;
    
     // Snap x/y to the origin of the metatile so that every tile of one metatile hashes to the same node
     var metatile = layer_opts.metatile || 1;
     x -= x % metatile;
     y -= y % metatile;
    
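     // TB deliberately leaves the file type out of the key:
     // different file types of the same tile (e.g. .png and .jpg) should land on the same server
     // to improve cache hit rate and utilization, so the file type is not part of the hash key.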
     var key = layer + '/' + z + '/' + x + '/' + y;
     return this.ring(layer).get(key);
    };
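
A worked example of the key construction shows what the metatile snapping does. The helper name ringKey and the layer name are assumptions. The arithmetic is the same as in pick.

```javascript
// Build the hash key for a tile, snapping x/y down to the origin of the
// metatile that contains it (metatile = 1 leaves the coordinates unchanged).
function ringKey(layer, z, x, y, metatile = 1) {
  x -= x % metatile;
  y -= y % metatile;
  return layer + '/' + z + '/' + x + '/' + y;
}

// With a 4x4 metatile, tiles (10,7) and (11,6) share the origin (8,4).
console.log(ringKey('basemap', 5, 10, 7, 4)); // basemap/5/8/4
console.log(ringKey('basemap', 5, 11, 6, 4)); // basemap/5/8/4
// (12,6) belongs to the next metatile along x.
console.log(ringKey('basemap', 5, 12, 6, 4)); // basemap/5/12/4
// Without metatiling every tile has its own key.
console.log(ringKey('basemap', 5, 10, 7));    // basemap/5/10/7
```

Since all tiles of one metatile produce the same key, they are routed to the same node, which lets that node render the metatile once and serve its tiles from cache.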
    

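    Summary

    TB's implementation gives Tilestrata a flexible way to spread load: you can mount all layers on a single Tilestrata setup and start several identical instances, or you can split the layers up and start several instances that each mount a different set of layers.
    The first mode is ordinary load balancing. The second may suit certain situations better: put a few rarely used layers on a low-performance host and a few frequently used layers on a high-performance one, or even give a frequently used layer several dedicated nodes to get the most out of the hardware.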
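
Both modes fall out of the fact that register() adds a node to one ring per layer it announces. The sketch below groups nodes by layer to show which nodes can serve which layer in each mode. The function name poolsByLayer, the layer names, and the addresses are assumptions for illustration.

```javascript
// Group node addresses by the layers they announce at registration,
// mirroring how register() adds a node to one ring per layer.
function poolsByLayer(nodes) {
  const pools = {};
  for (const node of nodes) {
    for (const layer of node.layers) {
      (pools[layer] = pools[layer] || []).push(node.target);
    }
  }
  return pools;
}

// Mode 1: identical instances, every node serves every layer.
const replicated = poolsByLayer([
  { target: '10.0.0.1:8080', layers: ['roads', 'satellite'] },
  { target: '10.0.0.2:8080', layers: ['roads', 'satellite'] }
]);
console.log(replicated);

// Mode 2: layers split up, with two dedicated nodes for the busy layer.
const split = poolsByLayer([
  { target: '10.0.0.1:8080', layers: ['roads'] },
  { target: '10.0.0.2:8080', layers: ['satellite'] },
  { target: '10.0.0.3:8080', layers: ['satellite'] }
]);
console.log(split);
```

In the second mode a request for 'roads' can only ever reach the first node, while 'satellite' requests are spread over the other two.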

    Original article: https://www.haomeiwen.com/subject/zhhksqtx.html