Alibaba Cloud Cloud-Provider
An open-source project; see GitHub.
It mainly provides the LoadBalancer functionality for Services.
Source Code Analysis
The source code is fairly simple: it is just a set of custom controllers. By default it includes the following:
- Node controller: maintains node information
- Route controller: adds Pod CIDRs to the VPC route table
- Service controller: provisions/releases load balancers for Services of type LoadBalancer
- NLB controller: similar to the CLB controller, but it manages Network Load Balancers; see the Alibaba Cloud documentation
Node Controller
When using a cloud vendor's cloud-provider, the kubelet must be started with --cloud-provider=external.
With this flag set, the kubelet adds a taint to the node; see the kubelet code:
if kl.externalCloudProvider {
	taint := v1.Taint{
		Key:    cloudproviderapi.TaintExternalCloudProvider,
		Value:  "true",
		Effect: v1.TaintEffectNoSchedule,
	}
	nodeTaints = append(nodeTaints, taint)
}
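For reference, the taint key comes from the k8s.io/cloud-provider/api package; to the best of my knowledge it is defined as follows:
// Constant from k8s.io/cloud-provider/api: nodes started with
// --cloud-provider=external carry this taint until a cloud controller
// manager initializes them.
const TaintExternalCloudProvider = "node.cloudprovider.kubernetes.io/uninitialized"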
The node controller here fetches the node's IP, status, and other information from the cloud vendor, then removes this taint so the node becomes usable:
func (m *ReconcileNode) syncCloudNode(node *corev1.Node) error {
	var cloudTaint *v1.Taint
	for _, taint := range node.Spec.Taints {
		if taint.Key == api.TaintExternalCloudProvider {
			cloudTaint = &taint
		}
	}
	if cloudTaint == nil {
		klog.V(5).Infof("node %s is registered without cloud taint. return ok", node.Name)
		return nil
	}
	return m.doAddCloudNode(node)
}
As you can see, once the taint is gone the node controller no longer processes the node; otherwise the node still needs to be initialized.
- First, find the ECS instance that corresponds to this node:
func findCloudECS(ins prvd.IInstance, node *v1.Node) (*prvd.NodeAttribute, error) {
	if node.Spec.ProviderID != "" {
		return findCloudECSById(ins, node)
	} else {
		return findCloudECSByIp(ins, node)
	}
}
The providerID has the format cn-hangzhou.i-v98dklsmnxkkgiiil7, i.e. REGION.NODEID.
If providerID is set, the controller looks up the ECS instance with that node ID in the region.
If it is not set, the controller looks up the ECS instance in the VPC whose private IP matches the node's internal IP:
var internalIP []string
for _, addr := range node.Status.Addresses {
	if addr.Type == v1.NodeInternalIP {
		internalIP = append(internalIP, addr.Address)
	}
}
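For illustration, a provider ID in the REGION.NODEID form could be split with a helper like the one below (a sketch, not the project's actual parser; assumes the "strings" and "fmt" packages):
// splitProviderID is a hypothetical helper that splits a provider ID of the
// form "REGION.INSTANCEID", e.g. "cn-hangzhou.i-v98dklsmnxkkgiiil7".
func splitProviderID(providerID string) (region, instanceID string, err error) {
	parts := strings.SplitN(providerID, ".", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", fmt.Errorf("unexpected provider id format: %q", providerID)
	}
	return parts[0], parts[1], nil
}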
- Once the ECS instance is found, the node can be updated, mainly its labels and taints:
func setFields(node *v1.Node, ins *prvd.NodeAttribute, cfgRoute bool) {
	var modifiers []nodeModifier
	if ins.InstanceType != "" {
		modify := func(n *v1.Node) {
			n.Labels["beta.kubernetes.io/instance-type"] = ins.InstanceType
			n.Labels["node.kubernetes.io/instance-type"] = ins.InstanceType
		}
		modifiers = append(modifiers, modify)
	}
	if ins.Zone != "" {
		modify := func(n *v1.Node) {
			n.Labels["failure-domain.beta.kubernetes.io/zone"] = ins.Zone
			n.Labels["topology.kubernetes.io/zone"] = ins.Zone
		}
		modifiers = append(modifiers, modify)
	}
	if ins.Region != "" {
		modify := func(n *v1.Node) {
			n.Labels["failure-domain.beta.kubernetes.io/region"] = ins.Region
			n.Labels["topology.kubernetes.io/region"] = ins.Region
		}
		modifiers = append(modifiers, modify)
	}
	if node.Spec.ProviderID == "" && ins.InstanceID != "" {
		prvdId := fmt.Sprintf("%s.%s", ins.Region, ins.InstanceID)
		modify := func(n *v1.Node) {
			n.Spec.ProviderID = prvdId
		}
		modifiers = append(modifiers, modify)
	}
	modifiers = append(modifiers, removeCloudTaints)
	for _, modify := range modifiers {
		modify(node)
	}
}
These are the labels involved; as you can see, they are essentially the attributes of the ECS instance:
beta.kubernetes.io/instance-type
node.kubernetes.io/instance-type
failure-domain.beta.kubernetes.io/zone
topology.kubernetes.io/zone
failure-domain.beta.kubernetes.io/region
topology.kubernetes.io/region
Then the cloud taint is removed and node.Spec.ProviderID is set to REGION.NODEID.
- Finally, the node's status is updated, mainly the status.addresses field:
func findAddress(instance *ecs.Instance) []v1.NodeAddress {
	var addrs []v1.NodeAddress
	if len(instance.PublicIpAddress.IpAddress) > 0 {
		for _, ipaddr := range instance.PublicIpAddress.IpAddress {
			addrs = append(addrs, v1.NodeAddress{Type: v1.NodeExternalIP, Address: ipaddr})
		}
	}
	if instance.EipAddress.IpAddress != "" {
		addrs = append(addrs, v1.NodeAddress{Type: v1.NodeExternalIP, Address: instance.EipAddress.IpAddress})
	}
	if len(instance.InnerIpAddress.IpAddress) > 0 {
		for _, ipaddr := range instance.InnerIpAddress.IpAddress {
			addrs = append(addrs, v1.NodeAddress{Type: v1.NodeInternalIP, Address: ipaddr})
		}
	}
	if len(instance.VpcAttributes.PrivateIpAddress.IpAddress) > 0 {
		for _, ipaddr := range instance.VpcAttributes.PrivateIpAddress.IpAddress {
			addrs = append(addrs, v1.NodeAddress{Type: v1.NodeInternalIP, Address: ipaddr})
		}
	}
	return addrs
}
This collects the ECS instance's public and private IPs: public IPs become the ExternalIP entries in status.addresses, and private IPs become the InternalIP entries.
The hostname that the kubelet set on the node by default is then appended as the Hostname entry in status.addresses:
func setHostnameAddress(node *v1.Node, addrs []v1.NodeAddress) []v1.NodeAddress {
	// Check if a hostname address exists in the cloud provided addresses
	hostnameExists := false
	for i := range addrs {
		if addrs[i].Type == v1.NodeHostName {
			hostnameExists = true
		}
	}
	// If hostname was not present in cloud provided addresses, use the hostname
	// from the existing node (populated by kubelet)
	if !hostnameExists {
		for _, addr := range node.Status.Addresses {
			if addr.Type == v1.NodeHostName {
				addrs = append(addrs, addr)
			}
		}
	}
	return addrs
}
Once the addresses are assembled, they are written back to the node through a patch:
diff := func(copy runtime.Object) (client.Object, error) {
	nins := copy.(*corev1.Node)
	nins.Status.Addresses = cloudNode.Addresses
	return nins, nil
}
err := helper.PatchM(m.client, node, diff, helper.PatchStatus)
Route Controller
The route controller exists mainly to solve cross-node Pod-to-Pod connectivity.
It is controlled by a cloud-provider configuration flag, and routes are configured by default:
fs.BoolVar(&cfg.ConfigureCloudRoutes, flagConfigureCloudRoutes, true, "Should CIDRs allocated by allocate-node-cidrs be configured on the cloud provider.")
What gets written is each node's Pod CIDR:
func getIPv4RouteForNode(node *v1.Node) (*net.IPNet, string, error) {
	var (
		ipv4CIDR    *net.IPNet
		ipv4CIDRStr string
		err         error
	)
	for _, podCidr := range append(node.Spec.PodCIDRs, node.Spec.PodCIDR) {
		if podCidr != "" {
			_, ipv4CIDR, err = net.ParseCIDR(podCidr)
			ipv4CIDRStr = ipv4CIDR.String()
			if len(ipv4CIDR.Mask) == net.IPv4len {
				ipv4CIDRStr = ipv4CIDR.String()
				break
			}
		}
	}
	return ipv4CIDR, ipv4CIDRStr, nil
}
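As a quick sanity check of the len(ipv4CIDR.Mask) == net.IPv4len test above (a standalone sketch, not project code): net.ParseCIDR returns a 4-byte mask for IPv4 CIDRs and a 16-byte mask for IPv6 ones, so the check picks out the IPv4 Pod CIDR.
package main

import (
	"fmt"
	"net"
)

func main() {
	_, v4, _ := net.ParseCIDR("172.20.1.0/24") // IPv4 Pod CIDR
	_, v6, _ := net.ParseCIDR("fd00:1::/64")   // IPv6 CIDR
	fmt.Println(len(v4.Mask) == net.IPv4len)   // true: mask is 4 bytes
	fmt.Println(len(v6.Mask) == net.IPv4len)   // false: mask is 16 bytes
}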
Next the VPC route table is looked up. The VPC ID can be fetched directly from Alibaba Cloud's metadata server (see the metadata discussion in the terway series of posts):
func getRouteTables(ctx context.Context, providerIns prvd.Provider) ([]string, error) {
	vpcId, err := providerIns.VpcID()
	tables, err := providerIns.ListRouteTables(ctx, vpcId)
	if len(tables) > 1 {
		return nil, fmt.Errorf("multiple route tables found by vpc id[%s], length(tables)=%d", ctrlCfg.CloudCFG.Global.VpcID, len(tables))
	}
	if len(tables) == 0 {
		return nil, fmt.Errorf("no route tables found by vpc id[%s]", ctrlCfg.CloudCFG.Global.VpcID)
	}
	return tables, nil
}
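For reference, a minimal sketch of reading the VPC ID from the ECS metadata server (100.100.100.200 is Alibaba Cloud's metadata endpoint; treat the exact path and this helper as assumptions, not the project's own code; assumes "net/http", "io" and "strings"):
// vpcIDFromMetadata fetches the VPC ID of the current ECS instance from the metadata service.
func vpcIDFromMetadata() (string, error) {
	resp, err := http.Get("http://100.100.100.200/latest/meta-data/vpc-id")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return strings.TrimSpace(string(body)), nil
}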
Finally, the Pod CIDR is added as an entry in the route table:
func (r *VPCProvider) CreateRoute(
	ctx context.Context, table string, provideID string, destinationCIDR string,
) (*model.Route, error) {
	createRouteEntryRequest := vpc.CreateCreateRouteEntryRequest()
	createRouteEntryRequest.RouteTableId = table
	createRouteEntryRequest.DestinationCidrBlock = destinationCIDR
	createRouteEntryRequest.NextHopType = model.RouteNextHopTypeInstance
	_, instance, err := util.NodeFromProviderID(provideID)
	if err != nil {
		return nil, fmt.Errorf("invalid provide id: %v, err: %v", provideID, err)
	}
	createRouteEntryRequest.NextHopId = instance
	_, err = r.auth.VPC.CreateRouteEntry(createRouteEntryRequest)
	if err != nil {
		return nil, fmt.Errorf("error create route entry for %s, %s, error: %v", provideID, destinationCIDR, err)
	}
	return &model.Route{
		Name:            fmt.Sprintf("%s-%s", provideID, destinationCIDR),
		DestinationCIDR: destinationCIDR,
		ProviderId:      provideID,
	}, nil
}
Looking at the route entry: the destination is the node's Pod CIDR and the next hop is that node's ECS instance ID. In other words, every packet destined for that Pod CIDR is forwarded to the corresponding ECS instance. That is how cross-node Pod traffic is enabled, entirely through VPC routing.
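As an illustration, a call like the following (table ID, provider ID and CIDR are made up) adds a route that sends a node's Pod CIDR to its ECS instance:
// Hypothetical usage of CreateRoute; all IDs below are made up.
route, err := vpcProvider.CreateRoute(
	ctx,
	"vtb-xxxxxxxxxxxxxxxxx",            // VPC route table ID
	"cn-hangzhou.i-v98dklsmnxkkgiiil7", // provider ID: REGION.NODEID
	"172.20.1.0/24",                    // the node's Pod CIDR
)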
Service Controller
Service is the main object under control, and many annotations are supported; see the Alibaba Cloud CLB documentation.
The analysis below focuses on the two main flows.
Creating a CLB
This scenario corresponds to creating a Service with type: LoadBalancer.
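For example, a minimal Service of this kind could be constructed like this with client-go types (the name, namespace and selector are made up):
// Minimal sketch of a LoadBalancer-type Service using client-go types
// (k8s.io/api/core/v1, metav1 and intstr).
svc := &corev1.Service{
	ObjectMeta: metav1.ObjectMeta{
		Name:      "nginx",
		Namespace: "default",
	},
	Spec: corev1.ServiceSpec{
		Type:     corev1.ServiceTypeLoadBalancer,
		Selector: map[string]string{"app": "nginx"},
		Ports: []corev1.ServicePort{
			{Port: 80, TargetPort: intstr.FromInt(80), Protocol: corev1.ProtocolTCP},
		},
	},
}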
- First, a dedicated finalizer is added to the Service, so that when deleting the CLB only Services carrying this finalizer need to be handled
if err := m.finalizerManager.AddFinalizers(req.Ctx, req.Service, helper.ServiceFinalizer); err != nil {
}
- Next, the Service annotations are parsed to obtain the basic load balancer settings, such as public or internal address type, billing method, and bandwidth
func (mgr *LoadBalancerManager) BuildLocalModel(reqCtx *svcCtx.RequestContext, mdl *model.LoadBalancer) error {
	mdl.LoadBalancerAttribute.AddressType = model.AddressType(reqCtx.Anno.Get(annotation.AddressType))
	mdl.LoadBalancerAttribute.InternetChargeType = model.InternetChargeType(reqCtx.Anno.Get(annotation.ChargeType))
	mdl.LoadBalancerAttribute.InstanceChargeType = model.InstanceChargeType(reqCtx.Anno.Get(annotation.InstanceChargeType))
	mdl.LoadBalancerAttribute.LoadBalancerSpec = model.LoadBalancerSpecType(reqCtx.Anno.Get(annotation.Spec))
	bandwidth := reqCtx.Anno.Get(annotation.Bandwidth)
	if bandwidth != "" {
		i, err := strconv.Atoi(bandwidth)
		mdl.LoadBalancerAttribute.Bandwidth = i
	}
	mdl.LoadBalancerAttribute.LoadBalancerSpec = model.LoadBalancerSpecType(reqCtx.Anno.Get(annotation.Spec))
	if reqCtx.Anno.Get(annotation.LoadBalancerId) != "" {
		mdl.LoadBalancerAttribute.LoadBalancerId = reqCtx.Anno.Get(annotation.LoadBalancerId)
		mdl.LoadBalancerAttribute.IsUserManaged = true
	}
	mdl.LoadBalancerAttribute.LoadBalancerName = reqCtx.Anno.Get(annotation.LoadBalancerName)
	mdl.LoadBalancerAttribute.VSwitchId = reqCtx.Anno.Get(annotation.VswitchId)
	mdl.LoadBalancerAttribute.MasterZoneId = reqCtx.Anno.Get(annotation.MasterZoneID)
	mdl.LoadBalancerAttribute.SlaveZoneId = reqCtx.Anno.Get(annotation.SlaveZoneID)
	mdl.LoadBalancerAttribute.ResourceGroupId = reqCtx.Anno.Get(annotation.ResourceGroupId)
	mdl.LoadBalancerAttribute.AddressIPVersion = model.AddressIPVersionType(reqCtx.Anno.Get(annotation.IPVersion))
	mdl.LoadBalancerAttribute.DeleteProtection = model.FlagType(reqCtx.Anno.Get(annotation.DeleteProtection))
	mdl.LoadBalancerAttribute.ModificationProtectionStatus =
		model.ModificationProtectionType(reqCtx.Anno.Get(annotation.ModificationProtection))
	mdl.LoadBalancerAttribute.Tags = reqCtx.Anno.GetLoadBalancerAdditionalTags()
	mdl.LoadBalancerAttribute.Address = reqCtx.Anno.Get(annotation.IP)
	return nil
}
- Then the backend server group is built by parsing the Service's Endpoints, collecting the Pod IPs and container ports listed there
func setBackendsFromEndpoints(candidates *backend.EndpointWithENI, vgroup model.VServerGroup) []model.BackendAttribute {
	var backends []model.BackendAttribute
	if len(candidates.Endpoints.Subsets) == 0 {
		return nil
	}
	for _, ep := range candidates.Endpoints.Subsets {
		var backendPort int
		if vgroup.ServicePort.TargetPort.Type == intstr.Int {
			backendPort = vgroup.ServicePort.TargetPort.IntValue()
		} else {
			for _, p := range ep.Ports {
				if p.Name == vgroup.ServicePort.Name {
					backendPort = int(p.Port)
					break
				}
			}
			if backendPort == 0 {
				klog.Warningf("%s cannot find port according port name: %s", vgroup.VGroupName, vgroup.ServicePort.Name)
			}
		}
		for _, addr := range ep.Addresses {
			backends = append(backends, model.BackendAttribute{
				NodeName: addr.NodeName,
				ServerIp: addr.IP,
				// set backend port to targetPort by default
				// if backend type is ecs, update backend port to nodePort
				Port:        backendPort,
				Description: vgroup.VGroupName,
			})
		}
	}
	return backends
}
The backend server group is built differently for three cases.
1) The Service specifies externalTrafficPolicy: Local
As we know, in this mode only the nodes that actually run the Pods are added to the backend pool, so it is enough to add the Pods listed in the Endpoints.
Here ServerId is the ECS instance ID of the node, ServerIp is cleared, and traffic reaches the Pods through the NodePort:
for _, backend := range initBackends {
	backend.ServerId = id
	backend.ServerIp = ""
	backend.Type = model.ECSBackendType
	// for ECS backend type, port should be set to NodePort
	backend.Port = int(vgroup.ServicePort.NodePort)
	ecsBackends = append(ecsBackends, backend)
}
Finally the weight of each backend server in the pool is set: a node's weight is the number of Pods running on it:
func podNumberAlgorithm(mode helper.TrafficPolicy, backends []model.BackendAttribute) []model.BackendAttribute {
	// LocalTrafficPolicy
	ecsPods := make(map[string]int)
	for _, b := range backends {
		ecsPods[b.ServerId] += 1
	}
	for i := range backends {
		backends[i].Weight = ecsPods[backends[i].ServerId]
	}
	return backends
}
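A quick worked example (instance IDs made up, and assuming a helper.LocalTrafficPolicy constant analogous to the Cluster/ENI ones shown later): two Pods on i-aaa and one Pod on i-bbb yield weights 2 and 1.
// Worked example for the Local policy: weight equals the Pod count per ECS instance.
backends := []model.BackendAttribute{
	{ServerId: "i-aaa"}, // Pod 1 on node i-aaa
	{ServerId: "i-aaa"}, // Pod 2 on node i-aaa
	{ServerId: "i-bbb"}, // Pod 3 on node i-bbb
}
backends = podNumberAlgorithm(helper.LocalTrafficPolicy, backends)
// backends[0].Weight == 2, backends[1].Weight == 2, backends[2].Weight == 1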
2) The Service specifies externalTrafficPolicy: Cluster
As we know, in this mode every node in the cluster is added to the backend pool:
func (mgr *VGroupManager) buildClusterBackends(reqCtx *svcCtx.RequestContext, candidates *backend.EndpointWithENI, vgroup model.VServerGroup) ([]model.BackendAttribute, error) {
	// 1. add ecs backends. add all cluster nodes.
	for _, node := range candidates.Nodes {
		ecsBackends = append(
			ecsBackends,
			model.BackendAttribute{
				ServerId:    id,
				Weight:      DefaultServerWeight,
				Port:        int(vgroup.ServicePort.NodePort),
				Type:        model.ECSBackendType,
				Description: vgroup.VGroupName,
			},
		)
	}
	return setWeightBackends(helper.ClusterTrafficPolicy, backends, vgroup.VGroupWeight), nil
}
Every backend server then gets the same weight:
func podNumberAlgorithm(mode helper.TrafficPolicy, backends []model.BackendAttribute) []model.BackendAttribute {
	if mode == helper.ClusterTrafficPolicy {
		for i := range backends {
			backends[i].Weight = 100
		}
		return backends
	}
}
3) The third case uses ENI backends, enabled with the annotation service.beta.kubernetes.io/backend-type: eni
This corresponds to the Terway network plugin assigning ENI IPs directly to Pods, so the Pods themselves are added to the backend pool:
func updateENIBackends(reqCtx *svcCtx.RequestContext, mgr *VGroupManager, backends []model.BackendAttribute, ipVersion model.AddressIPVersionType) (
	[]model.BackendAttribute, error) {
	vpcId, err := mgr.cloud.VpcID()
	vpcCIDRs, err := mgr.cloud.DescribeVpcCIDRBlock(context.TODO(), vpcId, ipVersion)
	var ips []string
	for _, b := range backends {
		ips = append(ips, b.ServerIp)
	}
	result, err := mgr.cloud.DescribeNetworkInterfaces(vpcId, ips, ipVersion)
	var skipIPs []string
	for i := range backends {
		eniid, ok := result[backends[i].ServerIp]
		if !ok {
			// backends whose ENI cannot be found are skipped (error handling trimmed in this excerpt)
			skipIPs = append(skipIPs, backends[i].ServerIp)
			continue
		}
		// for ENI backend type, port should be set to targetPort (default value), no need to update
		backends[i].ServerId = eniid
		backends[i].Type = model.ENIBackendType
	}
	return backends, nil
}
Each backend server again gets the same weight:
func podNumberAlgorithm(mode helper.TrafficPolicy, backends []model.BackendAttribute) []model.BackendAttribute {
	if mode == helper.ENITrafficPolicy {
		for i := range backends {
			backends[i].Weight = 100
		}
		return backends
	}
}
- Finally, the load balancer listeners are built, again from the Service annotations; for the supported annotations see the Alibaba Cloud CLB documentation
func (mgr *ListenerManager) BuildLocalModel(reqCtx *svcCtx.RequestContext, mdl *model.LoadBalancer) error {
	for _, port := range reqCtx.Service.Spec.Ports {
		listener, err := mgr.buildListenerFromServicePort(reqCtx, port)
		mdl.Listeners = append(mdl.Listeners, listener)
	}
	return nil
}
At this point the load balancer model has been fully derived from the Service. It consists of three parts:
- Basic attributes, parsed from the annotations
- The backend server groups, derived from the Endpoints
- Listeners, parsed from the annotations
With the model built, the next question is whether the load balancer needs to be created. How is that decided?
First, check whether the Service is already bound to an existing load balancer, i.e. whether the annotation service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id is set.
If that annotation is present, the controller first looks the load balancer up in SLB:
func (mgr *LoadBalancerManager) Find(reqCtx *svcCtx.RequestContext, mdl *model.LoadBalancer) error {
	// 1. set load balancer id
	if reqCtx.Anno.Get(annotation.LoadBalancerId) != "" {
		mdl.LoadBalancerAttribute.LoadBalancerId = reqCtx.Anno.Get(annotation.LoadBalancerId)
	}
	// 2. set default loadbalancer name
	// it's safe to set loadbalancer name which will be overwritten in FindLoadBalancer func
	mdl.LoadBalancerAttribute.LoadBalancerName = reqCtx.Anno.GetDefaultLoadBalancerName()
	// 3. set default loadbalancer tag
	// filter tags using logic operator OR, so only TAGKEY tag can be added
	mdl.LoadBalancerAttribute.Tags = []tag.Tag{
		{
			Key:   "kubernetes.do.not.delete",
			Value: reqCtx.Anno.GetDefaultLoadBalancerName(),
		},
	}
	return mgr.cloud.FindLoadBalancer(reqCtx.Ctx, mdl)
}
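For example, a Service that reuses an existing CLB would carry an annotation like this (reusing the svc variable from the earlier sketch; the CLB ID is made up):
// Sketch: reusing an existing CLB via annotation (ID is made up).
svc.Annotations = map[string]string{
	"service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id": "lb-bp1xxxxxxxxxxxxxxxxx",
}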
If no existing load balancer is specified, one is created automatically from the model parsed above:
if remote.LoadBalancerAttribute.LoadBalancerId == "" {
	if err := m.slbMgr.Create(reqCtx, local); err != nil {
	}
}
The name of an automatically created load balancer is derived from the Service UID:
func (n *AnnotationRequest) GetDefaultLoadBalancerName() string {
	//GCE requires that the name of a load balancer starts with a lower case letter.
	ret := "a" + string(n.Service.UID)
	ret = strings.Replace(ret, "-", "", -1)
	//AWS requires that the name of a load balancer is shorter than 32 bytes.
	if len(ret) > 32 {
		ret = ret[:32]
	}
	return ret
}
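A quick worked example of the naming rule (the UID is made up):
// Worked example of GetDefaultLoadBalancerName (UID made up):
//   Service UID:  2afdcdc4-8a3b-4ef0-9b3c-6d1f0a2b3c4d
//   "a" + UID  :  a2afdcdc48a3b4ef09b3c6d1f0a2b3c4d   (dashes removed, 33 bytes)
//   truncated  :  a2afdcdc48a3b4ef09b3c6d1f0a2b3c4    (first 32 bytes)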
At this point, whether it was created automatically or reused, there is now a usable load balancer.
Its backend server groups and listeners still have to be configured, and those come from the model parsed above.
Note that when an existing load balancer is reused, its current server groups and listeners are compared against the parsed ones, and the existing load balancer is updated to match the parsed state.
For server groups, for example, the sync iterates over the groups in local (the parsed model) and looks for a matching group in remote (the load balancer as it exists in SLB). If a match is found, it is updated to match local; if not, the group is created, its backend servers are added, and it is appended to remote:
func (m *ModelApplier) applyVGroups(reqCtx *svcCtx.RequestContext, local *model.LoadBalancer, remote *model.LoadBalancer) error {
	var errs []error
	for i := range local.VServerGroups {
		found := false
		var old model.VServerGroup
		for _, rv := range remote.VServerGroups {
			// for reuse vgroup case, find by vgroup id first
			if local.VServerGroups[i].VGroupId != "" &&
				local.VServerGroups[i].VGroupId == rv.VGroupId {
				found = true
				old = rv
				break
			}
			// find by vgroup name
			if local.VServerGroups[i].VGroupId == "" &&
				local.VServerGroups[i].VGroupName == rv.VGroupName {
				found = true
				local.VServerGroups[i].VGroupId = rv.VGroupId
				old = rv
				break
			}
		}
		// update
		if found {
			if err := m.vGroupMgr.UpdateVServerGroup(reqCtx, local.VServerGroups[i], old); err != nil {
			}
		}
		// create
		if !found {
			err := m.vGroupMgr.CreateVServerGroup(reqCtx, &local.VServerGroups[i], remote.LoadBalancerAttribute.LoadBalancerId)
			if err := m.vGroupMgr.BatchAddVServerGroupBackendServers(reqCtx, local.VServerGroups[i],
				local.VServerGroups[i].Backends); err != nil {
			}
			remote.VServerGroups = append(remote.VServerGroups, local.VServerGroups[i])
		}
	}
}
After the load balancer is configured, labels are added to the Service; the HASH value is computed from the Service's fields:
service.beta.kubernetes.io/hash
service.k8s.alibaba/loadbalancer-id
Then the load balancer's address is written into the Service status:
if len(newStatus.Ingress) == 0 {
	newStatus.Ingress = append(newStatus.Ingress,
		v1.LoadBalancerIngress{
			IP: lb.LoadBalancerAttribute.Address,
		})
}
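Putting it together, a reconciled Service might end up carrying something like the following (all values made up, again reusing the svc variable from the earlier sketch):
// Sketch of the labels and status on a reconciled Service; values are made up.
svc.Labels = map[string]string{
	"service.beta.kubernetes.io/hash":     "e79bc9f0...",             // hash of the Service fields
	"service.k8s.alibaba/loadbalancer-id": "lb-bp1xxxxxxxxxxxxxxxxx", // the CLB created or reused
}
svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{
	{IP: "47.96.0.10"}, // the CLB address
}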
Deleting a CLB
This flow is triggered when the Service is deleted or its type is changed:
func NeedDeleteLoadBalancer(svc *v1.Service) bool {
	return svc.DeletionTimestamp != nil || svc.Spec.Type != v1.ServiceTypeLoadBalancer
}
At this point only Services that carry the finalizer are processed, and only automatically created load balancers are deleted:
if helper.NeedDeleteLoadBalancer(reqCtx.Service) {
	if !local.LoadBalancerAttribute.IsUserManaged {
		err := m.slbMgr.Delete(reqCtx, remote)
		remote.LoadBalancerAttribute.LoadBalancerId = ""
		remote.LoadBalancerAttribute.Address = ""
		return nil
	}
}
Afterwards the Service's labels, status, and finalizer are removed.
NLB Controller
This is the last controller enabled by default; it manages Alibaba Cloud NLB (Network Load Balancer), and its code flow is almost identical to the CLB one.
The differences in the end result come only from the product-level differences between NLB and CLB.