- How it works(4) Tilestrata源码阅读(B
- How it works(3) Tilestrata源码阅读(A
- How it works(11) NodeODM源码阅读(B)
- How it works(6) TileStache源码阅读(B
- How it works(2) autocannon源码阅读(A
- How it works(10) NodeODM源码阅读(A)
- How it works(5) TileStache源码阅读(A
- 现有金融体系相比比特币也不是一无是处
- JavaScript和JavaScript引擎是怎么工作
- How it works(1) winston3源码阅读(A
引入
阅读Tilestrata的源码是绕不开Tilestrata-Balancer(以下简称TB)这个默认的负载均衡服务的.其特性是:
- 支持动态增加,减少节点
- 定时检查节点健康状况
- 简单的请求过滤
大多数服务会采取专业的负载均衡或者反向代理,比如常见的nginx.理论上讲,nodejs的http-proxy性能是拍马也不及nginx的,但对于地图瓦片服务,绝大多数瓶颈在瓦片的生成,而非反向代理和负载均衡的效率,直接使用node实现很大的增加了灵活性,性能实际也没什么大下降.
架构
注册与注销节点
TB本质上是个反向代理服务器,处理对内和对外两方面的请求:
- 对内的请求:接收局域网内各个Tilestrata服务的的注册与注销请求.
- 对外的请求:接收客户端发过来的地图瓦片请求.
Balancer.prototype.listen = function(callback) {
callback = callback || function() {};
var self = this;
async.parallel([
//同时开启对外对内的端口监听
this._listenPrivate.bind(this),
this._listenPublic.bind(this)
], function(err) {
if (err) {
log.error(null, err);
return self.close(function() {
callback(err);
});
}
return callback();
});
};
处理对内的请求:
Balancer.prototype._listenPrivate = function(callback) {
var self = this;
this.http_private = http.createServer(function(req, res) {
//对内通过向'nodes'接口post数据,供Tilestrata服务注册节点或注销节点
if (req.method === 'POST' && req.url === '/nodes') {
self.handleNodeRegister(req, res);
} else if (req.method === 'DELETE' && REGEX_NODES.test(req.url)) {
self.handleNodeUnregister(req, res);
} else {
res.writeHead(404, {'Content-Type': 'text/plain'});
res.write('Unrecognized URL');
res.end();
}
});
//监听对内专用端口
var port = this.options.privatePort;
var hostname = this.options.hostname;
this.http_private.listen(port, hostname, function(err) {
callback(err);
});
};
以注册节点为例(注销节点代码类似,逻辑相反),TB是被动注册的,它需要Tilestrata主动Post一个JSON格式的实体,将自身的信息传递给负载均衡服务,使TB找到这个节点,进而完成注册.要了解Tilestrata与TB交互的细节,需要看看Tilestrata中我在上一篇中没提到的有关负载均衡的代码.
Tilestrata/TileServer.js中的代码:
//构建请求实体
TileServer.prototype._buildBalancerData = function () {
var self = this;
//绑定所有的图层
return {
id: this.uuid,
version: this.version,
listen_port: this.http_port,
node_weight: this.options.balancer.node_weight || 1,
layers: Object.keys(this.layers).map(function (layerName) {
var layer = self.layer(layerName);
return {
name: layerName,
options: layer.options,
routes: Object.keys(layer.routes)
};
})
};
};
//绑定节点到TB上
TileServer.prototype.bindBalancer = function () {
var self = this;
var balancer = this.options.balancer;
if (!balancer) return;
if (this.balancer) return;
//断线重连
var recovery = this.balancer = new Recovery({
'retries': Infinity,
'min': balancer.register_mindelay || '1000 ms',
'max': balancer.register_maxdelay || '30 seconds',
'reconnect timeout': balancer.register_timeout || '30 seconds'
});
//每次断线重连都重新注册
recovery.on('reconnect', function (opts) {
log.info('balancer', 'Attempting to register with ' + balancer.host + '...');
request({
json: true,
method: 'POST',
url: 'http://' + balancer.host + '/nodes',
timeout: balancer.register_timeout || 30000,
body: self._buildBalancerData()
}, function (err, res, body) {
//处理TB返回的参数
if (err) {
var level = err.code === 'ETIMEDOUT' || err.code === 'ECONNREFUSED' ? 'warn' : 'error';
log[level]('balancer', 'Registration failed (' + err.message + ')');
return recovery.reconnected(err);
} else if (res.statusCode !== 201 && res.statusCode !== 200) {
log.warn('balancer', 'Registration failed (HTTP ' + res.statusCode + ' ' + HTTPStatus[res.statusCode] + ')');
return recovery.reconnected(new Error('Non 201 status (' + res.statusCode + ')'));
} else if (!body || typeof body !== 'object') {
log.error('balancer', 'Registration failed (invalid body)');
return recovery.reconnected(new Error('Invalid body from balancer'));
} else if (!body.token) {
log.error('balancer', 'Registration failed ("token" not provided)');
return recovery.reconnected(new Error('Invalid token'));
} else if (isNaN(body.check_interval)) {
log.error('balancer', 'Registration failed ("check_interval" not a number or missing)');
return recovery.reconnected(new Error('Invalid check_interval'));
}
log.info('balancer', res.statusCode === 201 ? 'Successfully registered' : 'Already in pool');
//获取TB返回的token
self.balancer_token = body.token;
//获取TB的存活检查时间间隔
self.balancer_check_interval = body.check_interval;
//TB连接保证
self.handleBalancerPing();
//主动重连
recovery.reconnected();
});
});
recovery.reconnect();
};
//预防TB超时未检查,自动重连
TileServer.prototype.handleBalancerPing = function () {
//每次执行健康检查时,都会执行该函数,从而重置超时计时器
//如果因为各种原因TB没有进行节点健康监测,则视为断线,重新注册给TB
var self = this;
clearTimeout(this.balancer_timeout);
this.balancer_timeout = setTimeout(function () {
log.warn('balancer', 'Balancer not checking in, re-notifying of existence...');
self.balancer.reconnect();
}, this.balancer_check_interval * 3 + Math.random() * 500);
};
Tilestrata发送的内容清楚了,TB接受到Tilestrata发送的请求,如何处理以及注册到负载均衡的呢?先看处理请求:
Balancer.prototype.handleNodeRegister = function(req, res) {
function fail(err) {
log.error(ipaddr, 'Failed to register (' + err.message + ')');
res.writeHead(500, {'Content-Type': 'text/plain'});
res.write(err.message);
return res.end();
}
var self = this;
var ipaddr = req.connection.remoteAddress;
//格式化注册实体
this._getJSONBody(req, function(err, body) {
if (err) return fail(err);
//注册节点,反馈信息给Tilestra
self.nodes.register(ipaddr, body, function(err, added) {
if (err) return fail(err);
res.writeHead(added ? 201 : 200, {'Content-Type': 'application/json'});
res.write(self.register_body);
res.end();
});
});
};
请求的处理其实没什么好说的.关键点在于节点的注册.TB是负载均衡服务,因此采用了常见的一致性哈希(hashring)来作为分配的请求的算法:
NodeList.prototype.register = function(ipaddr, body, callback) {
var self = this;
var id = body.id;
var target = ipaddr + ':' + body.listen_port;
var existing_id = this.ids_by_host[target];
var update_ring = true;
//避免重复注册
//重复注册存在双重验证:id重复,或ip+端口重复
if (this.hosts_by_id[id]) {
log.info('pool', target + ' already in pool');
return callback(null, false);
} else if (existing_id && existing_id !== id) {
//对与id不重复,但ip+端口出现重复,可能是节点重启了
//释放上一次的id,但不用刷新一致性哈希表,因为除了id,其他的一切都没变
this.unreference(existing_id);
update_ring = false;
log.warn('pool', target + ' already in pool w/different id (node did not exit cleanly): ' + existing_id + ' != ' + id);
}
var entry = {};
//权重
entry[target] = {weight: body.node_weight || 1};
this.ids_by_host[target] = id;
this.hosts_by_id[id] = target;
this.layers_by_id[id] = [];
//将信息插入一致性哈希中
body.layers.forEach(function(layer) {
self.layers_by_id[id].push(layer.name);
self.layer_options[layer.name] = layer.options;
if (update_ring) self.ring(layer.name).add(entry);
});
//监视该节点
this.watch(id);
if (update_ring) log.info('pool', 'Added ' + target);
callback(null, update_ring);
};
动态的负载均衡,需要根据节点的状态进行不同的负载指向:
//定时检测节点健康程度
NodeList.prototype.watch = function(id) {
if (this.check_timers_by_id[id]) return;
var self = this;
var unhealthy = 0;
var interval = this.options.checkInterval;
var maxUnhealthy = this.options.unhealthyCount;
var target = this.hosts_by_id[id];
//Tilestrata的获取健康度接口
var check_url = 'http://' + target + '/health';
function performCheck() {
self.check_timers_by_id[id] = setTimeout(function() {
request.get(check_url, {
timeout: interval,
headers: {
'X-TileStrataBalancer-Token': self.token
}
}, function(err, res) {
//节点不健康程度超过阈值,就注销节点
if (err || res.statusCode !== 200) ++unhealthy;
else unhealthy = Math.max(0, unhealthy-1);
if (unhealthy >= maxUnhealthy) {
log.warn(null, target + ' is unhealthy. Removing...');
self.unregister(id);
} else {
performCheck();
}
});
}, interval);
}
performCheck();
};
其中的获取健康度方法,调用了上一次我没有提到的Tilestrata中的'health'接口.
Tilestrata/routes/health.js中的代码:
module.exports = function(req, res, server) {
var status = 200;
var host = process.env.TILESTRATA_HIDEHOSTNAME ? '(hidden)' : hostname;
var data = _.extend({
ok: true,
version: server.version,
host: host,
uptime: humanizeDuration(server.uptime().duration),
uptime_s: server.uptime().duration / 1000
}, result);
if (server.options.balancer) {
var balancer_status = 'initializing';
if (server.balancer) {
if (server.balancer.reconnecting()) {
balancer_status = 'connecting';
} else {
balancer_status = 'connected';
}
}
data.balancer = {status: balancer_status};
}
//告知Tilestrata,TB已经发现其存在,可以不断地进行心跳ping
var incoming_token = req.headers && req.headers['x-tilestratabalancer-token'];
if (incoming_token && incoming_token === server.balancer_token) {
server.handleBalancerPing();
}
var resbuffer = new Buffer(JSON.stringify(data), 'utf8');
res.writeHead(status, {'Content-Length': resbuffer.length, 'Content-Type': 'application/json'});
res.write(resbuffer);
res.end();
};
可以看出,Tilestrata只会主动相应200状态,因此,只有当节点上Tilestrata因负载太大发生超时或错误时,才会让TB注销该节点.
其实如果更加智能一些,可以根据Tilestrata的profile数据中的相应指标,动态的调节该节点在TB中的权重.
处理对外的请求:
Balancer.prototype._listenPublic = function(callback) {
var self = this;
this.http_public = http.createServer(function(req, res) {
//处理特殊请求
if (req.url === '/robots.txt') {
res.writeHead(200, {'Content-Length': BUFFER_ROBOTSTXT.length, 'Content-Type': 'text/plain'});
res.write(BUFFER_ROBOTSTXT);
res.end();
} else if (req.url === '/health') {
//这里其实可以返回所有节点的健康状况
res.writeHead(200, {'Content-Type': 'application/json'});
res.write('{"ok":true}');
res.end();
} else {
//反向代理地图瓦片请求
self.handleProxyRequest(req, res, function() {
res.writeHead(404, {'Content-Type': 'text/plain'});
res.write('Unrecognized URL');
res.end();
});
}
});
//监听对外公开接口
var port = this.options.port;
var hostname = this.options.hostname;
this.http_public.listen(port, hostname, function(err) {
callback(err);
});
};
TB的主要任务就是接受客户端的地图瓦片请求,负载均衡然后进行反代:
Balancer.prototype.handleProxyRequest = function(req, res, next) {
//初步筛选一些格式非法的请求
var parts = req.url.substring(1).split('/');
if (parts.length < 5) return next();
var layer = parts[0];
var z = Number(parts[1]);
var x = Number(parts[2]);
var y = Number(parts[3]);
if (isNaN(x) || isNaN(y) || isNaN(z)) {
return next();
}
var file = parts[4];
var qspos = file.indexOf('?');
if (qspos > -1) file = file.substring(0, qspos);
//通过一致性哈希获取最适宜节点
var target = this.nodes.pick(layer, file, z, x, y);
if (!target) {
res.writeHead(404, {'Content-Type': 'text/plain'});
res.write('No servers found to handle the request');
return res.end();
}
//进行反向代理
this.proxy.web(req, res, {target: 'http://'+target});
};
核心还是最适宜节点的选取:
NodeList.prototype.pick = function(layer, file, z, x, y) {
var layer_opts = this.layer_options[layer];
if (!layer_opts) return;
//进一步根据缩放级别筛选不合理请求
if (typeof layer_opts.minZoom === 'number' && z < layer_opts.minZoom) return;
if (typeof layer_opts.maxZoom === 'number' && z > layer_opts.maxZoom) return;
//进一步根据请求范围筛选不合理请求
if (!testbbox(x, y, z, layer_opts.bbox)) return;
//可以修复偏移
var metatile = layer_opts.metatile || 1;
x -= x % metatile;
y -= y % metatile;
//这里TB不建议把文件类型加入key,
//因为对于同一个图层的不同种类文件,如.png后缀和.jpg后缀,都应放在同一个服务上,
//以提高缓存的命中率与利用率,因此不需要作为负载均衡的哈希key.
var key = layer + '/' + z + '/' + x + '/' + y;
return this.ring(layer).get(key);
};
总结
TB的实现给了Tilestrata一种很灵活的分压模式:既可以将所有图层挂载到一个Tilestrata上,启动若干个相同的实例,也可以将图层打散,启动若干实例,每个实例挂载不重复的图层.
前一种模式是很普遍的负载均衡,后一种模式则可能更适用于某些特定情况:几个不常用图层,放到一台低性能主机上,几个常用图层放到另一台高性能主机上,甚至对于常用图层,可以开若干专用节点,达到最大性能利用.
网友评论