From 45a145121d1f8d0be227eb64254e5ff9cc68549a Mon Sep 17 00:00:00 2001 From: 600669 Date: Thu, 25 Jan 2024 10:08:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=9A=E5=BD=93=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E5=9B=A0=E7=BD=91=E7=BB=9C=E7=A6=BB=E7=BA=BF=E5=AF=BC?= =?UTF-8?q?=E8=87=B4svc=E8=BD=AC=E5=8F=91=E6=97=B6=EF=BC=8C=E5=B0=9D?= =?UTF-8?q?=E8=AF=95=E8=AE=BF=E9=97=AE=E7=A6=BB=E7=BA=BF=E8=8A=82=E7=82=B9?= =?UTF-8?q?ep,=E8=B4=9F=E8=BD=BD=E7=A6=BB=E7=BA=BF=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E6=94=AF=E6=8C=81k8s=E7=AD=96=E7=95=A5=EF=BC=8C=E9=BB=98?= =?UTF-8?q?=E8=AE=A4=E9=9A=8F=E6=9C=BA=E7=AD=96=E7=95=A5=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/loadbalancer/loadbalancer.go | 119 +++++++++++++++++++++++++------ pkg/loadbalancer/util.go | 53 +++++++++----- 2 files changed, 132 insertions(+), 40 deletions(-) diff --git a/pkg/loadbalancer/loadbalancer.go b/pkg/loadbalancer/loadbalancer.go index a1d78bf7f..ae2042402 100644 --- a/pkg/loadbalancer/loadbalancer.go +++ b/pkg/loadbalancer/loadbalancer.go @@ -2,14 +2,6 @@ package loadbalancer import ( "fmt" - "net" - "net/http" - "reflect" - "strconv" - "sync" - "sync/atomic" - "time" - istioapi "istio.io/client-go/pkg/apis/networking/v1alpha3" istio "istio.io/client-go/pkg/clientset/versioned" istioinformers "istio.io/client-go/pkg/informers/externalversions" @@ -28,6 +20,14 @@ import ( "k8s.io/kubernetes/pkg/util/async" netutils "k8s.io/utils/net" stringslices "k8s.io/utils/strings/slices" + "math/rand" + "net" + "net/http" + "reflect" + "strconv" + "sync" + "sync/atomic" + "time" "github.com/kubeedge/edgemesh/pkg/apis/config/defaults" "github.com/kubeedge/edgemesh/pkg/apis/config/v1alpha1" @@ -111,9 +111,10 @@ func newAffinityPolicy(affinityType v1.ServiceAffinity, ttlSeconds int) *affinit } type balancerState struct { - endpoints []string // a list of "nodeName:podName:ip:port" style strings, nodeName and podName can be empty!!! 
- index int // current index into endpoints - affinity affinityPolicy + endpoints []string // a list of "nodeName:podName:ip:port" style strings, nodeName and podName can be empty!!! + noReadyEndpoints map[string]string //不可用节点ep信息 key -->nodeName:podName:ip:port value-->0 + index int // current index into endpoints + affinity affinityPolicy } const numBurstSyncs int = 2 @@ -625,15 +626,22 @@ func (lb *LoadBalancer) OnEndpointsAdd(endpoints *v1.Endpoints) { // To be safe we will call it here. A new service will only be created // if one does not already exist. state = lb.newServiceInternal(svcPort, v1.ServiceAffinity(""), 0) - state.endpoints = utilproxy.ShuffleStrings(newEndpoints) - + state.endpoints = utilproxy.ShuffleStrings(newEndpoints[0]) + noReadyEp := utilproxy.ShuffleStrings(newEndpoints[1]) + if len(state.noReadyEndpoints) == 0 { + state.noReadyEndpoints = map[string]string{} + } + for _, ep := range noReadyEp { + state.noReadyEndpoints[ep] = "" + } + klog.Infof("[OnEndpointsAdd](初始化)服务信息:%s,在线端点信息:[%s],离线端点信息:[%s]", svcPort, state.endpoints, state.noReadyEndpoints) // Reset the round-robin index. state.index = 0 // Sync the backend loadBalancer policy. 
lb.policyMutex.Lock() if policy, exists := lb.policyMap[svcPort]; exists { - policy.Sync(newEndpoints) + policy.Sync(newEndpoints[0]) } lb.policyMutex.Unlock() } @@ -658,28 +666,37 @@ func (lb *LoadBalancer) OnEndpointsUpdate(oldEndpoints *v1.Endpoints, endpoints if state != nil { curEndpoints = state.endpoints } - - if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(stringslices.Clone(curEndpoints), newEndpoints) { + curNoReadyEndpoints := make(map[string]string) + if state != nil { + curNoReadyEndpoints = state.noReadyEndpoints + } + klog.Infof("[OnEndpointsUpdate]:更新中 endpoints for service,servicePortName:%s,ready-endpoints:%s,noReady-endpoints:%s,当前ready-endpoints:%s,当前noReady-endpoints:%s", svcPort, newEndpoints[0], newEndpoints[1], curEndpoints, curNoReadyEndpoints) + if !exists || state == nil || len(curEndpoints) != len(newEndpoints[0]) || !slicesEquiv(stringslices.Clone(curEndpoints), newEndpoints[0]) || !reflect.DeepEqual(curNoReadyEndpoints, newEndpoints[1]) { klog.V(1).InfoS("LoadBalancerEX: Setting endpoints for service", "servicePortName", svcPort, "endpoints", newEndpoints) - lb.removeStaleAffinity(svcPort, newEndpoints) + lb.removeStaleAffinity(svcPort, newEndpoints[0]) // OnEndpointsUpdate can be called without NewService being called externally. // To be safe we will call it here. A new service will only be created // if one does not already exist. The affinity will be updated // later, once NewService is called. state = lb.newServiceInternal(svcPort, v1.ServiceAffinity(""), 0) - state.endpoints = utilproxy.ShuffleStrings(newEndpoints) - + state.endpoints = utilproxy.ShuffleStrings(newEndpoints[0]) + noReadyEp := utilproxy.ShuffleStrings(newEndpoints[1]) + state.noReadyEndpoints = map[string]string{} + for _, ep := range noReadyEp { + state.noReadyEndpoints[ep] = "" + } // Reset the round-robin index. state.index = 0 // Sync the backend loadBalancer policy. 
lb.policyMutex.Lock() if policy, exists := lb.policyMap[svcPort]; exists { - policy.Sync(newEndpoints) + policy.Sync(newEndpoints[0]) } lb.policyMutex.Unlock() } registeredEndpoints[svcPort] = true + klog.Infof("[OnEndpointsUpdate](已更新)服务信息:%s,在线端点信息:[%s],离线端点信息:[%s]", svcPort, state.endpoints, state.noReadyEndpoints) } // Now remove all endpoints missing from the update. @@ -705,6 +722,10 @@ func (lb *LoadBalancer) resetService(svcPort proxy.ServicePortName) { klog.V(2).InfoS("Removing endpoints service", "servicePortName", svcPort) state.endpoints = []string{} } + if len(state.noReadyEndpoints) > 0 { + klog.V(2).InfoS("LoadBalancerEX: Removing endpoints service", "servicePortName", svcPort) + state.noReadyEndpoints = map[string]string{} + } state.index = 0 state.affinity.affinityMap = map[string]*affinityState{} } @@ -824,19 +845,42 @@ func (lb *LoadBalancer) tryPickEndpoint(svcPort proxy.ServicePortName, sessionAf if !exists { return "", cliReq, false } + klog.Infof("[TryPickEndpoint]服务信息:%s,策略:%s", svcPort, policy.Name()) if exists && sessionAffinityEnabled { klog.Warningf("LoadBalancer policy conflicted with sessionAffinity: ClientIP") return "", cliReq, false } endpoint, req, err := policy.Pick(endpoints, srcAddr, netConn, cliReq) if err != nil { + klog.Warningf("[TryPickEndpoint]服务信息:%s,失败原因:%s", svcPort, err) + return "", req, false + } + klog.Infof("[TryPickEndpoint]服务信息:%s,结果:%s", svcPort, endpoint) + return endpoint, req, true +} + +// TryPickNoReadyEndpoint try to pick a service endpoint from load-balance strategy. 
+func (lb *LoadBalancer) TryPickNoReadyEndpoint(svcPort proxy.ServicePortName, endpoints []string, + srcAddr net.Addr, netConn net.Conn, cliReq *http.Request) (string, *http.Request, bool) { + lb.policyMutex.Lock() + defer lb.policyMutex.Unlock() + + policy, exists := lb.policyMap[svcPort] + if !exists { + return "", cliReq, false + } + klog.Infof("[TryPickNoReadyEndpoint]服务信息:%s,策略:%s", svcPort, policy.Name()) + endpoint, req, err := policy.Pick(endpoints, srcAddr, netConn, cliReq) + if err != nil { + klog.Warningf("[TryPickNoReadyEndpoint]服务信息:%s,失败原因:%s", svcPort, err) return "", req, false } + klog.Infof("[TryPickNoReadyEndpoint]服务信息:%s,结果:%s", svcPort, endpoint) return endpoint, req, true } func (lb *LoadBalancer) nextEndpointWithConn(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool, - netConn net.Conn, cliReq *http.Request) (string, *http.Request, error) { + netConn net.Conn, cliReq *http.Request, delNoReadyEndpoint []string) (string, *http.Request, error) { // Coarse locking is simple. We can get more fine-grained if/when we // can prove it matters. 
lb.lock.Lock() @@ -846,7 +890,37 @@ func (lb *LoadBalancer) nextEndpointWithConn(svcPort proxy.ServicePortName, srcA if !exists || state == nil { return "", cliReq, userspace.ErrMissingServiceEntry } + klog.Infof("[NextEndpoint](未清理)服务信息:%s,在线端点信息:[%s],离线端点信息:[%s]", svcPort, state.endpoints, state.noReadyEndpoints) + //移除已经验证访问过的不可用节点的ep信息 + if len(delNoReadyEndpoint) != 0 && delNoReadyEndpoint[0] != "" { + delete(state.noReadyEndpoints, delNoReadyEndpoint[0]) + } + //组装剩余不可用节点的ep信息 + var noReadyEndpoints []string + if len(state.noReadyEndpoints) != 0 { + for ep, _ := range state.noReadyEndpoints { + noReadyEndpoints = append(noReadyEndpoints, ep) + } + } + klog.Infof("[NextEndpoint](已更新)服务信息:%s,客户端信息:[%s],在线端点信息:[%s],离线端点信息:[%s]", svcPort, srcAddr.String(), state.endpoints, noReadyEndpoints) if len(state.endpoints) == 0 { + //所有ready节点都没有时使用noready节点负载 + if len(noReadyEndpoints) != 0 { + // Take the next endpoint. + endpoint, req, picked := lb.TryPickNoReadyEndpoint(svcPort, noReadyEndpoints, srcAddr, netConn, cliReq) + klog.Infof("[NextEndpoint]noReady endpoint选址,服务信息:%s,结果:%s,picked:%s", svcPort, endpoint, picked) + if picked { + //如果时noReady节点访问异常之后,自动清理对应的endpoints信息 + delNoReadyEndpoint[0] = endpoint + return endpoint, req, nil + } else { + //策略选择失败,则默认使用随机选择 + k := rand.Int() % len(noReadyEndpoints) + //如果时noReady节点访问异常之后,自动清理对应的endpoints信息 + delNoReadyEndpoint[0] = noReadyEndpoints[k] + return noReadyEndpoints[k], req, nil + } + } return "", cliReq, userspace.ErrMissingEndpoints } klog.V(4).InfoS("NextEndpoint for service", "servicePortName", svcPort, "address", srcAddr, "endpoints", state.endpoints) @@ -904,8 +978,9 @@ func (lb *LoadBalancer) nextEndpointWithConn(svcPort proxy.ServicePortName, srcA func (lb *LoadBalancer) TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protocol string, netConn net.Conn, cliReq *http.Request) (out net.Conn, err error) { sessionAffinityReset := false + delNoReadyEndpoint := make([]string, 1) for _, 
dialTimeout := range userspace.EndpointDialTimeouts { - endpoint, req, err := lb.nextEndpointWithConn(service, srcAddr, sessionAffinityReset, netConn, cliReq) + endpoint, req, err := lb.nextEndpointWithConn(service, srcAddr, sessionAffinityReset, netConn, cliReq, delNoReadyEndpoint) if err != nil { klog.ErrorS(err, "Couldn't find an endpoint for service", "service", service) return nil, err diff --git a/pkg/loadbalancer/util.go b/pkg/loadbalancer/util.go index b0b56465f..9d20eee27 100644 --- a/pkg/loadbalancer/util.go +++ b/pkg/loadbalancer/util.go @@ -43,32 +43,49 @@ func isValidEndpoint(host string, port int) bool { // buildPortsToEndpointsMap builds a map of portname -> all nodeName:podName:ip:port // for that portname. Explode Endpoints.Subsets[*] into this structure. -func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]string { - portsToEndpoints := map[string][]string{} +//key---portName +//value 二维数据,第一层两个元素 0位---ready ep信息 1位---noReady ep信息 +func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][][]string { + portsToEndpoints := map[string][][]string{} for i := range endpoints.Subsets { ss := &endpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - for i := range ss.Addresses { - addr := &ss.Addresses[i] - if isValidEndpoint(addr.IP, int(port.Port)) { - nodeName := defaults.EmptyNodeName - podName := defaults.EmptyPodName - if addr.NodeName != nil { - nodeName = *addr.NodeName - } - if addr.TargetRef != nil { - podName = addr.TargetRef.Name - } - endpoint := fmt.Sprintf("%s:%s:%s", nodeName, podName, net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port)))) - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], endpoint) - } + for m := range ss.Ports { + for n := range ss.Addresses { + addEndpoints(&ss.Addresses[n], &ss.Ports[m], portsToEndpoints, true) + } + for n := range ss.NotReadyAddresses { + addEndpoints(&ss.NotReadyAddresses[n], &ss.Ports[m], portsToEndpoints, false) } } } return 
portsToEndpoints } +func addEndpoints(addr *v1.EndpointAddress, port *v1.EndpointPort, portsToEndpoints map[string][][]string, isReady bool) { + rlt, ok := portsToEndpoints[port.Name] + if !ok { + rlt = make([][]string, 2) + portsToEndpoints[port.Name] = rlt + } + if isValidEndpoint(addr.IP, int(port.Port)) { + nodeName := defaults.EmptyNodeName + podName := defaults.EmptyPodName + if addr.NodeName != nil { + nodeName = *addr.NodeName + } + if addr.TargetRef != nil { + podName = addr.TargetRef.Name + } + endpoint := fmt.Sprintf("%s:%s:%s", nodeName, podName, net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port)))) + if isReady { + rlt[0] = append(rlt[0], endpoint) + } else { + rlt[1] = append(rlt[1], endpoint) + } + + } +} + // isSessionAffinity return true if this service is using some form of session affinity. func isSessionAffinity(affinity *affinityPolicy) bool { // Should never be empty string, but checking for it to be safe.