
nginx配置
worker_processes 2;
error_log logs/error.log info;

events {
    worker_connections 1024;
}

http {
    # Shared memory zone through which the timer in init.lua hands the
    # upstream list to the balancer code in upstream.lua.
    lua_shared_dict _upstream 2m;
    init_worker_by_lua_file "/usr/local/openresty/nginx/conf/lua_conf/init.lua";

    include mime.types;
    default_type application/octet-stream;
    sendfile on;
    keepalive_timeout 65;

    upstream backend {
        # Placeholder address only; the real peer is selected per request
        # by the balancer_by_lua_file script below.
        server 0.0.0.1;
        balancer_by_lua_file "/usr/local/openresty/nginx/conf/lua_conf/upstream.lua";
    }

    server {
        listen 7001;
        server_name localhost;

        location / {
            root html;
            index index.html index.htm;
            proxy_pass http://backend;
        }

        error_page 500 502 503 504 /50x.html;
        location = /50x.html {
            root html;
        }
    }
}
lua文件
相关Lua库
- json解析库:https://github.com/mpx/lua-cjson
- 负载均衡库:https://github.com/openresty/lua-resty-balancer
- md5库:https://github.com/kikito/md5.lua
定时拉取upstream
在worker 0中启动一个定时器,定时从discovery拉取节点的负载信息,然后写入worker之间的共享内存中
init.lua
local http = require "resty.http"
local cjson = require "cjson"
-- Discovery API endpoint that returns the load-balancing (upstream) information
local upstream_url = "http://127.0.0.1:8081/get_upstream"
local method_ = "GET"
-- Refresh interval handed to the recurring timer below
-- (ngx.timer.every takes its delay in seconds)
local delay = 2
local new_timer = ngx.timer.every
-- Polling logic: fetch the upstream list from the discovery service and
-- publish the raw response body in the `_upstream` shared dict.
-- @param premature  true when the timer fires early (e.g. the worker is
--                   shutting down); no request is made in that case.
local update = function(premature)
    if premature then
        return
    end

    local httpc = http:new()
    local resp, err = httpc:request_uri(upstream_url, {
        method = method_,
    })
    if not resp then
        ngx.log(ngx.ERR, "get upstream err:", err)
        return
    end

    -- Only a successful response carries a usable upstream list; storing an
    -- error body would overwrite the last good list with undecodable data.
    if resp.status ~= 200 then
        ngx.log(ngx.ERR, "get upstream err: unexpected status ", resp.status)
        return
    end

    -- set() can fail (e.g. the zone is out of memory); do not drop that silently.
    local ok, set_err = ngx.shared._upstream:set("_upstream", resp.body)
    if not ok then
        ngx.log(ngx.ERR, "save upstream err:", set_err)
    end
end
-- Only worker 0 polls the discovery service; the other workers read the
-- result from the shared dict.
if 0 == ngx.worker.id() then
    -- The recurring timer fires for the first time only after `delay`
    -- seconds, so schedule one immediate run to fill the shared dict
    -- before the first requests arrive.
    local ok, err = ngx.timer.at(0, update)
    if not ok then
        ngx.log(ngx.ERR, "create timer err:", err)
    end

    ok, err = new_timer(delay, update)
    if not ok then
        ngx.log(ngx.ERR, "create timer err:", err)
        return
    end
end
检测upstream(upstream.lua)
请求时从共享内存中Load负载数据,对比上次请求负载的md5,如果不同则更新
local b = require "ngx.balancer"
local md5 = require 'md5'
local cjson = require "cjson"
local resty_roundrobin = require "resty.roundrobin"
-- Rebuild the round-robin picker when the upstream list in shared memory
-- has changed since the last request handled by this worker.
--
-- Reads the raw JSON from the `_upstream` shared dict, compares its md5 with
-- the one cached in package.loaded.upstream_md5 and, if it differs and the
-- payload is valid, stores a new resty.roundrobin instance in
-- package.loaded.upstream. On any invalid payload the previous picker (if
-- any) is left untouched and an error is logged.
--
-- NOTE(review): this is declared as a global function, as in the original;
-- consider making it `local` once nothing else depends on the global.
function server_data_update()
    local data = ngx.shared._upstream:get("_upstream")
    if not data then
        -- Nothing has been published yet (or the entry was evicted).
        ngx.log(ngx.ERR, "upstream data not found in shared dict")
        return
    end

    -- Compare digests first so unchanged data is not decoded on every request.
    local md5_new = md5.sumhexa(data)
    local md5_old = package.loaded.upstream_md5
    if md5_old == md5_new then
        ngx.log(ngx.INFO, "md5 sum upstream not update")
        return
    end

    -- cjson.decode raises an error on malformed input instead of returning
    -- nil, so it has to be called in protected mode.
    local ok, upstream_data = pcall(cjson.decode, data)
    if not ok then
        ngx.log(ngx.ERR, "cjson.decode err:", upstream_data)
        return
    end

    -- A JSON `null` decodes to a non-table sentinel, hence the type checks.
    local message = type(upstream_data) == "table" and upstream_data["message"] or nil
    if type(message) ~= "table" then
        ngx.log(ngx.ERR, "message data not found, data:", data)
        return
    end

    local server_list = {}
    for _, v in pairs(message) do
        if type(v) == "table" and type(v["addr"]) == "string" and tonumber(v["weight"]) then
            server_list[v["addr"]] = v["weight"]
        else
            ngx.log(ngx.ERR, "invalid upstream entry skipped, data:", data)
        end
    end

    -- Keep the previous picker rather than building one from an empty list.
    if next(server_list) == nil then
        ngx.log(ngx.ERR, "no valid upstream server, data:", data)
        return
    end

    -- The digest changed and the payload is valid: publish the new picker.
    ngx.log(ngx.INFO, "md5 not sum old:", md5_old, "===> new:",md5_new, " data:", data)
    package.loaded.upstream_md5 = md5_new
    package.loaded.upstream = resty_roundrobin:new(server_list)
end
-- Refresh the picker if the shared upstream list changed, then route the
-- current request to the peer it selects.
server_data_update()
local upstream = package.loaded.upstream
if not upstream then
    -- No valid upstream list has been loaded in this worker yet; fail the
    -- request with a clear log line instead of a nil-index runtime error.
    ngx.log(ngx.ERR, "no upstream available")
    return ngx.exit(500)
end
local server = upstream:find()
local ok, err = b.set_current_peer(server)
if not ok then
    ngx.log(ngx.ERR, "failed to set the current peer: ", err)
    return ngx.exit(500)
end
服务端
提供一个Discovery的接口
和一个获取进程信息的接口
package main
import (
"github.com/gin-gonic/gin"
"flag"
"fmt"
)
// port is the TCP port the HTTP server listens on; it is set from the
// -port command-line flag and echoed back by GetInfo.
var port int

// main parses the -port flag (default 8081), registers the /ping, /info and
// /get_upstream routes and serves HTTP until the server stops.
func main() {
	flag.IntVar(&port, "port", 8081, "端口")
	flag.Parse()

	r := gin.Default()
	r.GET("/ping", func(c *gin.Context) {
		c.JSON(200, gin.H{
			"message": "pong",
		})
	})
	r.GET("/info", GetInfo)
	r.GET("/get_upstream", GetUpsteam)

	// Listen on all interfaces at the configured port. Run only returns on
	// failure (e.g. the port is already in use), so surface the error
	// instead of exiting silently with status 0.
	if err := r.Run(fmt.Sprintf(":%d", port)); err != nil {
		panic(fmt.Errorf("server on port %d stopped: %v", port, err))
	}
}
// upstream is one backend node as serialized in the /get_upstream response.
type upstream struct {
// Addr is the backend address; the sample data uses the "host:port" form.
Addr string `json:"addr"`
// Weight is the node's weight; upstream.lua feeds it to the round-robin balancer.
Weight int64 `json:"weight"`
}
// GetUpsteam answers GET /get_upstream with the hard-coded list of backend
// nodes, wrapped in a JSON object under the "message" key.
func GetUpsteam(c *gin.Context) {
	nodes := []*upstream{
		{Addr: "127.0.0.1:8001", Weight: 2},
		{Addr: "127.0.0.1:8002", Weight: 1},
	}
	payload := gin.H{"message": nodes}
	c.JSON(200, payload)
}
// GetInfo answers GET /info with a JSON object whose "message" identifies
// this process by the port it was started with, e.g. "8001 SVR".
func GetInfo(c *gin.Context) {
	label := fmt.Sprintf("%d SVR", port)
	c.JSON(200, gin.H{"message": label})
}
测试
起3个进程
go run main.go -port=8081 //充当discovery(init.lua从127.0.0.1:8081拉取upstream;7001端口由nginx监听)
go run main.go -port=8001 //充当业务服务A
go run main.go -port=8002 //充当业务服务B
请求:for i in `seq 1 10`; do curl http://127.0.0.1:7001/info; echo ''; done
upstream修改前结果
&upstream{"127.0.0.1:8001",2}, &upstream{"127.0.0.1:8002",1}
{"message":"8001 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
upstream修改后结果
&upstream{"127.0.0.1:8001",1}, &upstream{"127.0.0.1:8002",1}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
网友评论