Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 97 additions & 22 deletions pkg/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,6 @@ package loadbalancer

import (
"fmt"
"net"
"net/http"
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"

istioapi "istio.io/client-go/pkg/apis/networking/v1alpha3"
istio "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions"
Expand All @@ -28,6 +20,14 @@ import (
"k8s.io/kubernetes/pkg/util/async"
netutils "k8s.io/utils/net"
stringslices "k8s.io/utils/strings/slices"
"math/rand"
"net"
"net/http"
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/kubeedge/edgemesh/pkg/apis/config/defaults"
"github.com/kubeedge/edgemesh/pkg/apis/config/v1alpha1"
Expand Down Expand Up @@ -111,9 +111,10 @@ func newAffinityPolicy(affinityType v1.ServiceAffinity, ttlSeconds int) *affinit
}

type balancerState struct {
endpoints []string // a list of "nodeName:podName:ip:port" style strings, nodeName and podName can be empty!!!
index int // current index into endpoints
affinity affinityPolicy
endpoints []string // a list of "nodeName:podName:ip:port" style strings, nodeName and podName can be empty!!!
noReadyEndpoints map[string]string //不可用节点ep信息 key -->nodeName:podName:ip:port value-->0
index int // current index into endpoints
affinity affinityPolicy
}

const numBurstSyncs int = 2
Expand Down Expand Up @@ -625,15 +626,22 @@ func (lb *LoadBalancer) OnEndpointsAdd(endpoints *v1.Endpoints) {
// To be safe we will call it here. A new service will only be created
// if one does not already exist.
state = lb.newServiceInternal(svcPort, v1.ServiceAffinity(""), 0)
state.endpoints = utilproxy.ShuffleStrings(newEndpoints)

state.endpoints = utilproxy.ShuffleStrings(newEndpoints[0])
noReadyEp := utilproxy.ShuffleStrings(newEndpoints[1])
if len(state.noReadyEndpoints) == 0 {
state.noReadyEndpoints = map[string]string{}
}
for _, ep := range noReadyEp {
state.noReadyEndpoints[ep] = ""
}
klog.Infof("[OnEndpointsAdd](初始化)服务信息:%s,在线端点信息:[%s],离线端点信息:[%s]", svcPort, state.endpoints, state.noReadyEndpoints)
// Reset the round-robin index.
state.index = 0

// Sync the backend loadBalancer policy.
lb.policyMutex.Lock()
if policy, exists := lb.policyMap[svcPort]; exists {
policy.Sync(newEndpoints)
policy.Sync(newEndpoints[0])
}
lb.policyMutex.Unlock()
}
Expand All @@ -658,28 +666,37 @@ func (lb *LoadBalancer) OnEndpointsUpdate(oldEndpoints *v1.Endpoints, endpoints
if state != nil {
curEndpoints = state.endpoints
}

if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(stringslices.Clone(curEndpoints), newEndpoints) {
curNoReadyEndpoints := make(map[string]string)
if state != nil {
curNoReadyEndpoints = state.noReadyEndpoints
}
klog.Infof("[OnEndpointsUpdate]:更新中 endpoints for service,servicePortName:%s,ready-endpoints:%s,noReady-endpoints:%s,当前ready-endpoints:%s,当前noReady-endpoints:%s", svcPort, newEndpoints[0], newEndpoints[1], curEndpoints, curNoReadyEndpoints)
if !exists || state == nil || len(curEndpoints) != len(newEndpoints[0]) || !slicesEquiv(stringslices.Clone(curEndpoints), newEndpoints[0]) || !reflect.DeepEqual(curNoReadyEndpoints, newEndpoints[1]) {
klog.V(1).InfoS("LoadBalancerEX: Setting endpoints for service", "servicePortName", svcPort, "endpoints", newEndpoints)
lb.removeStaleAffinity(svcPort, newEndpoints)
lb.removeStaleAffinity(svcPort, newEndpoints[0])
// OnEndpointsUpdate can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// if one does not already exist. The affinity will be updated
// later, once NewService is called.
state = lb.newServiceInternal(svcPort, v1.ServiceAffinity(""), 0)
state.endpoints = utilproxy.ShuffleStrings(newEndpoints)

state.endpoints = utilproxy.ShuffleStrings(newEndpoints[0])
noReadyEp := utilproxy.ShuffleStrings(newEndpoints[1])
state.noReadyEndpoints = map[string]string{}
for _, ep := range noReadyEp {
state.noReadyEndpoints[ep] = ""
}
// Reset the round-robin index.
state.index = 0

// Sync the backend loadBalancer policy.
lb.policyMutex.Lock()
if policy, exists := lb.policyMap[svcPort]; exists {
policy.Sync(newEndpoints)
policy.Sync(newEndpoints[0])
}
lb.policyMutex.Unlock()
}
registeredEndpoints[svcPort] = true
klog.Infof("[OnEndpointsUpdate](已更新)服务信息:%s,在线端点信息:[%s],离线端点信息:[%s]", svcPort, state.endpoints, state.noReadyEndpoints)
}

// Now remove all endpoints missing from the update.
Expand All @@ -705,6 +722,10 @@ func (lb *LoadBalancer) resetService(svcPort proxy.ServicePortName) {
klog.V(2).InfoS("Removing endpoints service", "servicePortName", svcPort)
state.endpoints = []string{}
}
if len(state.noReadyEndpoints) > 0 {
klog.V(2).InfoS("LoadBalancerEX: Removing endpoints service", "servicePortName", svcPort)
state.noReadyEndpoints = map[string]string{}
}
state.index = 0
state.affinity.affinityMap = map[string]*affinityState{}
}
Expand Down Expand Up @@ -824,19 +845,42 @@ func (lb *LoadBalancer) tryPickEndpoint(svcPort proxy.ServicePortName, sessionAf
if !exists {
return "", cliReq, false
}
klog.Infof("[TryPickEndpoint]服务信息:%s,策略:%s", svcPort, policy.Name())
if exists && sessionAffinityEnabled {
klog.Warningf("LoadBalancer policy conflicted with sessionAffinity: ClientIP")
return "", cliReq, false
}
endpoint, req, err := policy.Pick(endpoints, srcAddr, netConn, cliReq)
if err != nil {
klog.Warningf("[TryPickEndpoint]服务信息:%s,失败原因:%s", svcPort, err)
return "", req, false
}
klog.Infof("[TryPickEndpoint]服务信息:%s,结果:%s", svcPort, endpoint)
return endpoint, req, true
}

// TryPickNoReadyEndpoint try to pick a service endpoint from load-balance strategy.
func (lb *LoadBalancer) TryPickNoReadyEndpoint(svcPort proxy.ServicePortName, endpoints []string,
srcAddr net.Addr, netConn net.Conn, cliReq *http.Request) (string, *http.Request, bool) {
lb.policyMutex.Lock()
defer lb.policyMutex.Unlock()

policy, exists := lb.policyMap[svcPort]
if !exists {
return "", cliReq, false
}
klog.Infof("[TryPickNoReadyEndpoint]服务信息:%s,策略:%s", svcPort, policy.Name())
endpoint, req, err := policy.Pick(endpoints, srcAddr, netConn, cliReq)
if err != nil {
klog.Warningf("[TryPickNoReadyEndpoint]服务信息:%s,失败原因:%s", svcPort, err)
return "", req, false
}
klog.Infof("[TryPickNoReadyEndpoint]服务信息:%s,结果:%s", svcPort, endpoint)
return endpoint, req, true
}

func (lb *LoadBalancer) nextEndpointWithConn(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool,
netConn net.Conn, cliReq *http.Request) (string, *http.Request, error) {
netConn net.Conn, cliReq *http.Request, delNoReadyEndpoint []string) (string, *http.Request, error) {
// Coarse locking is simple. We can get more fine-grained if/when we
// can prove it matters.
lb.lock.Lock()
Expand All @@ -846,7 +890,37 @@ func (lb *LoadBalancer) nextEndpointWithConn(svcPort proxy.ServicePortName, srcA
if !exists || state == nil {
return "", cliReq, userspace.ErrMissingServiceEntry
}
klog.Infof("[NextEndpoint](未清理)服务信息:%s,在线端点信息:[%s],离线端点信息:[%s]", svcPort, state.endpoints, state.noReadyEndpoints)
//移除已经验证访问过的不可用节点的ep信息
if len(delNoReadyEndpoint) != 0 && delNoReadyEndpoint[0] != "" {
delete(state.noReadyEndpoints, delNoReadyEndpoint[0])
}
//组装剩余不可用节点的ep信息
var noReadyEndpoints []string
if len(state.noReadyEndpoints) != 0 {
for ep, _ := range state.noReadyEndpoints {
noReadyEndpoints = append(noReadyEndpoints, ep)
}
}
klog.Infof("[NextEndpoint](已更新)服务信息:%s,客户端信息:[%s],在线端点信息:[%s],离线端点信息:[%s]", svcPort, srcAddr.String(), state.endpoints, noReadyEndpoints)
if len(state.endpoints) == 0 {
//所有ready节点都没有时使用noready节点负载
if len(noReadyEndpoints) != 0 {
// Take the next endpoint.
endpoint, req, picked := lb.TryPickNoReadyEndpoint(svcPort, noReadyEndpoints, srcAddr, netConn, cliReq)
klog.Infof("[NextEndpoint]noReady endpoint选址,服务信息:%s,结果:%s,picked:%s", svcPort, endpoint, picked)
if picked {
//如果时noReady节点访问异常之后,自动清理对应的endpoints信息
delNoReadyEndpoint[0] = endpoint
return endpoint, req, nil
} else {
//策略选择失败,则默认使用随机选择
k := rand.Int() % len(noReadyEndpoints)
//如果时noReady节点访问异常之后,自动清理对应的endpoints信息
delNoReadyEndpoint[0] = noReadyEndpoints[k]
return noReadyEndpoints[k], req, nil
}
}
return "", cliReq, userspace.ErrMissingEndpoints
}
klog.V(4).InfoS("NextEndpoint for service", "servicePortName", svcPort, "address", srcAddr, "endpoints", state.endpoints)
Expand Down Expand Up @@ -904,8 +978,9 @@ func (lb *LoadBalancer) nextEndpointWithConn(svcPort proxy.ServicePortName, srcA
func (lb *LoadBalancer) TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protocol string,
netConn net.Conn, cliReq *http.Request) (out net.Conn, err error) {
sessionAffinityReset := false
delNoReadyEndpoint := make([]string, 1)
for _, dialTimeout := range userspace.EndpointDialTimeouts {
endpoint, req, err := lb.nextEndpointWithConn(service, srcAddr, sessionAffinityReset, netConn, cliReq)
endpoint, req, err := lb.nextEndpointWithConn(service, srcAddr, sessionAffinityReset, netConn, cliReq, delNoReadyEndpoint)
if err != nil {
klog.ErrorS(err, "Couldn't find an endpoint for service", "service", service)
return nil, err
Expand Down
53 changes: 35 additions & 18 deletions pkg/loadbalancer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,49 @@ func isValidEndpoint(host string, port int) bool {

// buildPortsToEndpointsMap builds a map of portname -> all nodeName:podName:ip:port
// for that portname. Explode Endpoints.Subsets[*] into this structure.
func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][]string {
portsToEndpoints := map[string][]string{}
//key---portName
//value 二维数据,第一层两个元素 0位---ready ep信息 1位---noReady ep信息
func buildPortsToEndpointsMap(endpoints *v1.Endpoints) map[string][][]string {
portsToEndpoints := map[string][][]string{}
for i := range endpoints.Subsets {
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
if isValidEndpoint(addr.IP, int(port.Port)) {
nodeName := defaults.EmptyNodeName
podName := defaults.EmptyPodName
if addr.NodeName != nil {
nodeName = *addr.NodeName
}
if addr.TargetRef != nil {
podName = addr.TargetRef.Name
}
endpoint := fmt.Sprintf("%s:%s:%s", nodeName, podName, net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))))
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], endpoint)
}
for m := range ss.Ports {
for n := range ss.Addresses {
addEndpoints(&ss.Addresses[n], &ss.Ports[m], portsToEndpoints, true)
}
for n := range ss.NotReadyAddresses {
addEndpoints(&ss.NotReadyAddresses[n], &ss.Ports[m], portsToEndpoints, false)
}
}
}
return portsToEndpoints
}

func addEndpoints(addr *v1.EndpointAddress, port *v1.EndpointPort, portsToEndpoints map[string][][]string, isReady bool) {
rlt, ok := portsToEndpoints[port.Name]
if !ok {
rlt = make([][]string, 2)
portsToEndpoints[port.Name] = rlt
}
if isValidEndpoint(addr.IP, int(port.Port)) {
nodeName := defaults.EmptyNodeName
podName := defaults.EmptyNodeName
if addr.NodeName != nil {
nodeName = *addr.NodeName
}
if addr.TargetRef != nil {
podName = addr.TargetRef.Name
}
endpoint := fmt.Sprintf("%s:%s:%s", nodeName, podName, net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))))
if isReady {
rlt[0] = append(rlt[0], endpoint)
} else {
rlt[1] = append(rlt[1], endpoint)
}

}
}

// isSessionAffinity return true if this service is using some form of session affinity.
func isSessionAffinity(affinity *affinityPolicy) bool {
// Should never be empty string, but checking for it to be safe.
Expand Down