Implementing dynamic load balancing with nginx (OpenResty)

Author: 一抹圆弧 | Published 2021-05-17 16:12

nginx configuration

worker_processes  2;
error_log  logs/error.log  info;

events {
    worker_connections  1024;
}

http {
    lua_shared_dict _upstream 2m;   # shared dict used to pass upstream info between workers
    init_worker_by_lua_file "/usr/local/openresty/nginx/conf/lua_conf/init.lua";
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  65;

    upstream backend {
        server 0.0.0.1;   # placeholder; the real peer is set in balancer_by_lua
        balancer_by_lua_file "/usr/local/openresty/nginx/conf/lua_conf/upstream.lua";
    }
    server {
        listen       7001;
        server_name  localhost;
        location / {
            root   html;
            index  index.html index.htm;
            proxy_pass http://backend;
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

Lua files

Related Lua libraries

The Lua code below uses lua-resty-http, lua-cjson, the md5 library (for md5.sumhexa) and lua-resty-balancer (for resty.roundrobin), plus ngx.balancer from lua-resty-core.

Periodically pulling the upstream

A timer is started in worker 0 that periodically pulls the nodes' load information from the discovery service and writes it into a shared dict that all workers can read.
init.lua

local http = require "resty.http"
local cjson = require "cjson"
-- API that returns the load-balancing information
local upstream_url = "http://127.0.0.1:8081/get_upstream"
local method_ = "GET"

-- update interval in seconds
local delay = 2
local new_timer = ngx.timer.every

-- polling callback
local update = function(premature)
    if not premature then
        local httpc = http.new()
        local resp, err = httpc:request_uri(upstream_url, {
            method = method_,
            })
        if not resp then
            ngx.log(ngx.ERR, "get upstream err:", err)
        else
            ngx.shared._upstream:set("_upstream", resp.body)
        end
    end
end

-- only worker 0 starts the timer; the other workers just read the shared dict
if 0 == ngx.worker.id() then
    local ok, err = new_timer(delay, update)
    if not ok then
        ngx.log(ngx.ERR, "create timer err:", err)
        return
    end
end
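
For reference, the value the timer writes into the shared dict is the raw response body of the discovery endpoint. With the Go discovery server shown later in this post, it looks roughly like this:

{"message":[{"addr":"127.0.0.1:8001","weight":2},{"addr":"127.0.0.1:8002","weight":1}]}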

Detecting upstream changes

On each request, the balancer loads the upstream data from the shared dict, compares its MD5 with the MD5 of the previously seen data, and rebuilds the round-robin balancer if it has changed.

upstream.lua

local b = require "ngx.balancer"
local md5 = require 'md5'
local cjson = require "cjson"
local resty_roundrobin = require "resty.roundrobin"

local function server_data_update()
    local data = ngx.shared._upstream:get("_upstream")
    if not data then
        ngx.log(ngx.ERR, "upstream data not found in shared dict")
        return
    end
    local ok, upstream_data = pcall(cjson.decode, data)
    if not ok or not upstream_data then
        ngx.log(ngx.ERR, "cjson.decode err, data:", data)
        return
    end
    local message = upstream_data["message"]
    if not message then
        ngx.log(ngx.ERR, "message data not found, data:", data)
        return
    end

    local md5_new = md5.sumhexa(data)
    local md5_old = package.loaded.upstream_md5
    -- if the md5 is unchanged, the data has not been updated
    if md5_old == md5_new then
        ngx.log(ngx.INFO, "upstream unchanged, md5: ", md5_new)
        return
    end
    ngx.log(ngx.INFO, "upstream changed, old md5: ", md5_old, " ===> new: ", md5_new, " data: ", data)

    package.loaded.upstream_md5 = md5_new
    local server_list = {}
    for _, v in pairs(message) do
        server_list[v["addr"]] = v["weight"]
    end
    package.loaded.upstream = resty_roundrobin:new(server_list)
end

server_data_update()
local upstream = package.loaded.upstream
if not upstream then
    ngx.log(ngx.ERR, "no upstream peers available yet")
    return ngx.exit(500)
end
local server = upstream:find()
assert(b.set_current_peer(server))
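
The weighted selection itself is done by resty.roundrobin from lua-resty-balancer. Below is a minimal standalone sketch of its behaviour, runnable with the resty command-line utility assuming lua-resty-balancer is installed; the addresses and weights simply mirror the example payload above and are not part of upstream.lua:

local resty_roundrobin = require "resty.roundrobin"

-- address -> weight table, as built in server_data_update()
local server_list = {
    ["127.0.0.1:8001"] = 2,
    ["127.0.0.1:8002"] = 1,
}
local rr = resty_roundrobin:new(server_list)

-- each find() call returns one address; with these weights roughly two
-- out of every three calls pick 127.0.0.1:8001
for _ = 1, 6 do
    print(rr:find())
end

This 2:1 distribution is what shows up in the first test run below.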

Server side

It exposes a discovery endpoint (/get_upstream) and an endpoint that reports which process handled the request (/info).

package main

import (
    "flag"
    "fmt"

    "github.com/gin-gonic/gin"
)

var port int

func main() {
    flag.IntVar(&port, "port", 8081, "port to listen on")
    flag.Parse()
    r := gin.Default()
    r.GET("/ping", func(c *gin.Context) {
        c.JSON(200, gin.H{
            "message": "pong",
        })
    })
    r.GET("/info", GetInfo)
    r.GET("/get_upstream", GetUpsteam)
    r.Run(fmt.Sprintf(":%d", port)) // listen and serve on 0.0.0.0:<port>
}

type upstream struct {
    Addr string     `json:"addr"`
    Weight int64    `json:"weight"`
}

func GetUpstream(c *gin.Context) {
    c.JSON(200, gin.H{
        "message": []*upstream{&upstream{"127.0.0.1:8001", 2}, &upstream{"127.0.0.1:8002", 1}},
    })
}

func GetInfo(c *gin.Context) {
    c.JSON(200, gin.H{
        "message": fmt.Sprintf("%d SVR", port),
    })
}

Testing

Start three Go processes:

go run main.go -port=8081  // acts as the discovery service
go run main.go -port=8001  // acts as backend service A
go run main.go -port=8002  // acts as backend service B
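
OpenResty itself must also be running with the configuration shown at the top. Assuming a default installation under /usr/local/openresty (consistent with the Lua file paths used above), something like:

/usr/local/openresty/bin/openresty -t    # check the configuration
/usr/local/openresty/bin/openresty       # start nginx, listening on port 7001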

Request: for i in `seq 1 10`; do curl http://127.0.0.1:7001/info; echo ''; done

Results before modifying the upstream

upstream{"127.0.0.1:8001",2}, &upstream{"127.0.0.1:8002",1}

{"message":"8001 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}

Results after modifying the upstream (weight of 127.0.0.1:8001 changed from 2 to 1)

upstream{"127.0.0.1:8001",1}, &upstream{"127.0.0.1:8002",1}

{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
{"message":"8002 SVR"}
{"message":"8001 SVR"}
