How it works(4) Tilestrata源码阅读(B

作者: 默而识之者 | 来源:发表于2019-01-31 09:57 被阅读10次

引入

阅读Tilestrata的源码是绕不开Tilestrata-Balancer(以下简称TB)这个默认的负载均衡服务的.其特性是:

  • 支持动态增加,减少节点
  • 定时检查节点健康状况
  • 简单的请求过滤

大多数服务会采取专业的负载均衡或者反向代理,比如常见的nginx.理论上讲,nodejs的http-proxy性能是拍马也不及nginx的,但对于地图瓦片服务,绝大多数瓶颈在瓦片的生成,而非反向代理和负载均衡的效率,直接使用node实现很大的增加了灵活性,性能实际也没什么大下降.

架构

注册与注销节点

TB本质上是个反向代理服务器,处理对内和对外两方面的请求:

  • 对内的请求:接收局域网内各个Tilestrata服务的的注册与注销请求.
  • 对外的请求:接收客户端发过来的地图瓦片请求.
Balancer.prototype.listen = function(callback) {
 callback = callback || function() {};
 var self = this;
 async.parallel([
  //同时开启对外对内的端口监听
  this._listenPrivate.bind(this),
  this._listenPublic.bind(this)
 ], function(err) {
  if (err) {
   log.error(null, err);
   return self.close(function() {
    callback(err);
   });
  }
  return callback();
 });
};

处理对内的请求:

Balancer.prototype._listenPrivate = function(callback) {
 var self = this;
 this.http_private = http.createServer(function(req, res) {
  //对内通过向'nodes'接口post数据,供Tilestrata服务注册节点或注销节点
  if (req.method === 'POST' && req.url === '/nodes') {
   self.handleNodeRegister(req, res);
  } else if (req.method === 'DELETE' && REGEX_NODES.test(req.url)) {
   self.handleNodeUnregister(req, res);
  } else {
   res.writeHead(404, {'Content-Type': 'text/plain'});
   res.write('Unrecognized URL');
   res.end();
  }
 });
 //监听对内专用端口
 var port = this.options.privatePort;
 var hostname = this.options.hostname;
 this.http_private.listen(port, hostname, function(err) {
  callback(err);
 });
};

以注册节点为例(注销节点代码类似,逻辑相反),TB是被动注册的,它需要Tilestrata主动Post一个JSON格式的实体,将自身的信息传递给负载均衡服务,使TB找到这个节点,进而完成注册.要了解Tilestrata与TB交互的细节,需要看看Tilestrata中我在上一篇中没提到的有关负载均衡的代码.
Tilestrata/TileServer.js中的代码:

//构建请求实体
TileServer.prototype._buildBalancerData = function () {
 var self = this;
 //绑定所有的图层
 return {
  id: this.uuid,
  version: this.version,
  listen_port: this.http_port,
  node_weight: this.options.balancer.node_weight || 1,
  layers: Object.keys(this.layers).map(function (layerName) {
   var layer = self.layer(layerName);
   return {
    name: layerName,
    options: layer.options,
    routes: Object.keys(layer.routes)
   };
  })
 };
};
//绑定节点到TB上
TileServer.prototype.bindBalancer = function () {
 var self = this;
 var balancer = this.options.balancer;
 if (!balancer) return;
 if (this.balancer) return;
 //断线重连
 var recovery = this.balancer = new Recovery({
  'retries': Infinity,
  'min': balancer.register_mindelay || '1000 ms',
  'max': balancer.register_maxdelay || '30 seconds',
  'reconnect timeout': balancer.register_timeout || '30 seconds'
 });
 //每次断线重连都重新注册
 recovery.on('reconnect', function (opts) {
  log.info('balancer', 'Attempting to register with ' + balancer.host + '...');
  
  request({
   json: true,
   method: 'POST',
   url: 'http://' + balancer.host + '/nodes',
   timeout: balancer.register_timeout || 30000,
   body: self._buildBalancerData()
  }, function (err, res, body) {
   //处理TB返回的参数
   if (err) {
    var level = err.code === 'ETIMEDOUT' || err.code === 'ECONNREFUSED' ? 'warn' : 'error';
    log[level]('balancer', 'Registration failed (' + err.message + ')');
    return recovery.reconnected(err);
   } else if (res.statusCode !== 201 && res.statusCode !== 200) {
    log.warn('balancer', 'Registration failed (HTTP ' + res.statusCode + ' ' + HTTPStatus[res.statusCode] + ')');
    return recovery.reconnected(new Error('Non 201 status (' + res.statusCode + ')'));
   } else if (!body || typeof body !== 'object') {
    log.error('balancer', 'Registration failed (invalid body)');
    return recovery.reconnected(new Error('Invalid body from balancer'));
   } else if (!body.token) {
    log.error('balancer', 'Registration failed ("token" not provided)');
    return recovery.reconnected(new Error('Invalid token'));
   } else if (isNaN(body.check_interval)) {
    log.error('balancer', 'Registration failed ("check_interval" not a number or missing)');
    return recovery.reconnected(new Error('Invalid check_interval'));
   }

   log.info('balancer', res.statusCode === 201 ? 'Successfully registered' : 'Already in pool');
   //获取TB返回的token
   self.balancer_token = body.token;
   //获取TB的存活检查时间间隔
   self.balancer_check_interval = body.check_interval;
   //TB连接保证
   self.handleBalancerPing();
   //主动重连
   recovery.reconnected();
  });
 });

 recovery.reconnect();
};
//预防TB超时未检查,自动重连
TileServer.prototype.handleBalancerPing = function () {
 //每次执行健康检查时,都会执行该函数,从而重置超时计时器
 //如果因为各种原因TB没有进行节点健康监测,则视为断线,重新注册给TB
 var self = this;
 clearTimeout(this.balancer_timeout);
 this.balancer_timeout = setTimeout(function () {
  log.warn('balancer', 'Balancer not checking in, re-notifying of existence...');
  self.balancer.reconnect();
 }, this.balancer_check_interval * 3 + Math.random() * 500);
};

Tilestrata发送的内容清楚了,TB接受到Tilestrata发送的请求,如何处理以及注册到负载均衡的呢?先看处理请求:

Balancer.prototype.handleNodeRegister = function(req, res) {
 function fail(err) {
  log.error(ipaddr, 'Failed to register (' + err.message + ')');
  res.writeHead(500, {'Content-Type': 'text/plain'});
  res.write(err.message);
  return res.end();
 }

 var self = this;
 var ipaddr = req.connection.remoteAddress;
 //格式化注册实体
 this._getJSONBody(req, function(err, body) {
  if (err) return fail(err);
  //注册节点,反馈信息给Tilestra
  self.nodes.register(ipaddr, body, function(err, added) {
   if (err) return fail(err);
   res.writeHead(added ? 201 : 200, {'Content-Type': 'application/json'});
   res.write(self.register_body);
   res.end();
  });
 });
};

请求的处理其实没什么好说的.关键点在于节点的注册.TB是负载均衡服务,因此采用了常见的一致性哈希(hashring)来作为分配的请求的算法:

NodeList.prototype.register = function(ipaddr, body, callback) {
 var self = this;
 var id = body.id;
 var target = ipaddr + ':' + body.listen_port;
 var existing_id = this.ids_by_host[target];
 var update_ring = true;
 //避免重复注册
 //重复注册存在双重验证:id重复,或ip+端口重复
 if (this.hosts_by_id[id]) {
  log.info('pool', target + ' already in pool');
  return callback(null, false);
 } else if (existing_id && existing_id !== id) {
  //对与id不重复,但ip+端口出现重复,可能是节点重启了
  //释放上一次的id,但不用刷新一致性哈希表,因为除了id,其他的一切都没变
  this.unreference(existing_id);
  update_ring = false;
  log.warn('pool', target + ' already in pool w/different id (node did not exit cleanly): ' + existing_id + ' != ' + id);
 }

 var entry = {};
 //权重
 entry[target] = {weight: body.node_weight || 1};

 this.ids_by_host[target] = id;
 this.hosts_by_id[id] = target;
 this.layers_by_id[id] = [];
 //将信息插入一致性哈希中
 body.layers.forEach(function(layer) {
  self.layers_by_id[id].push(layer.name);
  self.layer_options[layer.name] = layer.options;
  if (update_ring) self.ring(layer.name).add(entry);
 });
 //监视该节点
 this.watch(id);
 if (update_ring) log.info('pool', 'Added ' + target);
 callback(null, update_ring);
};

动态的负载均衡,需要根据节点的状态进行不同的负载指向:

//定时检测节点健康程度
NodeList.prototype.watch = function(id) {
 if (this.check_timers_by_id[id]) return;

 var self = this;
 var unhealthy = 0;
 var interval = this.options.checkInterval;
 var maxUnhealthy = this.options.unhealthyCount;
 var target = this.hosts_by_id[id];
 //Tilestrata的获取健康度接口
 var check_url = 'http://' + target + '/health';

 function performCheck() {
  self.check_timers_by_id[id] = setTimeout(function() {
   request.get(check_url, {
    timeout: interval,
    headers: {
     'X-TileStrataBalancer-Token': self.token
    }
   }, function(err, res) {
    //节点不健康程度超过阈值,就注销节点
    if (err || res.statusCode !== 200) ++unhealthy;
    else unhealthy = Math.max(0, unhealthy-1);
    if (unhealthy >= maxUnhealthy) {
     log.warn(null, target + ' is unhealthy. Removing...');
     self.unregister(id);
    } else {
     performCheck();
    }
   });
  }, interval);
 }

 performCheck();
};

其中的获取健康度方法,调用了上一次我没有提到的Tilestrata中的'health'接口.
Tilestrata/routes/health.js中的代码:

module.exports = function(req, res, server) {
 var status = 200;
 var host = process.env.TILESTRATA_HIDEHOSTNAME ? '(hidden)' : hostname;
 var data = _.extend({
  ok: true,
  version: server.version,
  host: host,
  uptime: humanizeDuration(server.uptime().duration),
  uptime_s: server.uptime().duration / 1000
 }, result);

 if (server.options.balancer) {
  var balancer_status = 'initializing';
  if (server.balancer) {
   if (server.balancer.reconnecting()) {
    balancer_status = 'connecting';
   } else {
    balancer_status = 'connected';
   }
  }
  data.balancer = {status: balancer_status};
 }

 //告知Tilestrata,TB已经发现其存在,可以不断地进行心跳ping
 var incoming_token = req.headers && req.headers['x-tilestratabalancer-token'];
 if (incoming_token && incoming_token === server.balancer_token) {
  server.handleBalancerPing();
 }

 var resbuffer = new Buffer(JSON.stringify(data), 'utf8');
 res.writeHead(status, {'Content-Length': resbuffer.length, 'Content-Type': 'application/json'});
 res.write(resbuffer);
 res.end();
};

可以看出,Tilestrata只会主动相应200状态,因此,只有当节点上Tilestrata因负载太大发生超时或错误时,才会让TB注销该节点.
其实如果更加智能一些,可以根据Tilestrata的profile数据中的相应指标,动态的调节该节点在TB中的权重.

处理对外的请求:

Balancer.prototype._listenPublic = function(callback) {
 var self = this;
 this.http_public = http.createServer(function(req, res) {
  //处理特殊请求
  if (req.url === '/robots.txt') {
   res.writeHead(200, {'Content-Length': BUFFER_ROBOTSTXT.length, 'Content-Type': 'text/plain'});
   res.write(BUFFER_ROBOTSTXT);
   res.end();
  } else if (req.url === '/health') {
   //这里其实可以返回所有节点的健康状况
   res.writeHead(200, {'Content-Type': 'application/json'});
   res.write('{"ok":true}');
   res.end();
  } else {
   //反向代理地图瓦片请求
   self.handleProxyRequest(req, res, function() {
    res.writeHead(404, {'Content-Type': 'text/plain'});
    res.write('Unrecognized URL');
    res.end();
   });
  }
 });
 //监听对外公开接口
 var port = this.options.port;
 var hostname = this.options.hostname;
 this.http_public.listen(port, hostname, function(err) {
  callback(err);
 });
};

TB的主要任务就是接受客户端的地图瓦片请求,负载均衡然后进行反代:

Balancer.prototype.handleProxyRequest = function(req, res, next) {
 //初步筛选一些格式非法的请求
 var parts = req.url.substring(1).split('/');
 if (parts.length < 5) return next();
 var layer = parts[0];
 var z = Number(parts[1]);
 var x = Number(parts[2]);
 var y = Number(parts[3]);
 if (isNaN(x) || isNaN(y) || isNaN(z)) {
  return next();
 }

 var file = parts[4];
 var qspos = file.indexOf('?');
 if (qspos > -1) file = file.substring(0, qspos);
 //通过一致性哈希获取最适宜节点
 var target = this.nodes.pick(layer, file, z, x, y);
 if (!target) {
  res.writeHead(404, {'Content-Type': 'text/plain'});
  res.write('No servers found to handle the request');
  return res.end();
 }
 //进行反向代理
 this.proxy.web(req, res, {target: 'http://'+target});
};

核心还是最适宜节点的选取:

NodeList.prototype.pick = function(layer, file, z, x, y) {
 var layer_opts = this.layer_options[layer];
 if (!layer_opts) return;

 //进一步根据缩放级别筛选不合理请求
 if (typeof layer_opts.minZoom === 'number' && z < layer_opts.minZoom) return;
 if (typeof layer_opts.maxZoom === 'number' && z > layer_opts.maxZoom) return;

 //进一步根据请求范围筛选不合理请求
 if (!testbbox(x, y, z, layer_opts.bbox)) return;

 //可以修复偏移
 var metatile = layer_opts.metatile || 1;
 x -= x % metatile;
 y -= y % metatile;

 //这里TB不建议把文件类型加入key,
 //因为对于同一个图层的不同种类文件,如.png后缀和.jpg后缀,都应放在同一个服务上,
 //以提高缓存的命中率与利用率,因此不需要作为负载均衡的哈希key.
 var key = layer + '/' + z + '/' + x + '/' + y;
 return this.ring(layer).get(key);
};

总结

TB的实现给了Tilestrata一种很灵活的分压模式:既可以将所有图层挂载到一个Tilestrata上,启动若干个相同的实例,也可以将图层打散,启动若干实例,每个实例挂载不重复的图层.
前一种模式是很普遍的负载均衡,后一种模式则可能更适用于某些特定情况:几个不常用图层,放到一台低性能主机上,几个常用图层放到另一台高性能主机上,甚至对于常用图层,可以开若干专用节点,达到最大性能利用.

相关文章

网友评论

    本文标题:How it works(4) Tilestrata源码阅读(B

    本文链接:https://www.haomeiwen.com/subject/zhhksqtx.html