From f4a3394be09620bff00bd69cf5ee3ce9c2387f31 Mon Sep 17 00:00:00 2001 From: muxuelan Date: Thu, 25 Nov 2021 14:59:07 +0800 Subject: [PATCH] none Signed-off-by: muxuelan --- agent/pkg/dns/dns.go | 8 ++-- agent/pkg/proxy/config/config.go | 4 ++ agent/pkg/proxy/controller/controller.go | 59 ++++++++++++++++++++---- agent/pkg/proxy/iptables.go | 44 +++++++++++++++++- agent/pkg/proxy/module.go | 2 +- common/util/util.go | 8 ++++ 6 files changed, 111 insertions(+), 14 deletions(-) diff --git a/agent/pkg/dns/dns.go b/agent/pkg/dns/dns.go index 977501ab1..04c9c01d0 100644 --- a/agent/pkg/dns/dns.go +++ b/agent/pkg/dns/dns.go @@ -10,7 +10,6 @@ import ( "k8s.io/klog/v2" beehiveContext "github.com/kubeedge/beehive/pkg/core/context" - "github.com/kubeedge/edgemesh/agent/pkg/dns/controller" "github.com/kubeedge/edgemesh/common/util" ) @@ -67,9 +66,10 @@ func lookup(serviceURL string) (ip string, exist bool) { // Here serviceURL is a domain name which has at least a "." suffix. So here we need trim it. serviceURL = strings.TrimSuffix(serviceURL, ".") name, namespace := util.SplitServiceKey(serviceURL) - ip, err := controller.APIConn.GetSvcIP(namespace, name) - if err != nil { - klog.Errorf("service `%s.%s` reverse clusterIP error: %v", name, namespace, err) + + ip = util.GetSvcIP(namespace, name) + if ip == "" { + klog.Errorf("service `%s.%s` reverse clusterIP error", name, namespace) return "", false } klog.Infof("dns server parse %s ip %s", serviceURL, ip) diff --git a/agent/pkg/proxy/config/config.go b/agent/pkg/proxy/config/config.go index e168e742e..7f61551d6 100644 --- a/agent/pkg/proxy/config/config.go +++ b/agent/pkg/proxy/config/config.go @@ -7,6 +7,9 @@ type EdgeProxyConfig struct { Enable bool `json:"enable,omitempty"` // SubNet indicates the subnet of proxier, which equals to k8s service-cluster-ip-range SubNet string `json:"subNet,omitempty"` + // FakeSubNet indicates the fake subnet of headless service, which is used to support using domain to access the headless service + // default 9.251.0.0/16 + FakeSubNet string `json:"fakeSubNet,omitempty"` // ListenInterface indicates the listen interface of edgeproxy // do not allow users to configure manually ListenInterface string `json:"listenInterface,omitempty"` @@ -18,6 +21,7 @@ type EdgeProxyConfig struct { func NewEdgeProxyConfig() *EdgeProxyConfig { return &EdgeProxyConfig{ Enable: false, + FakeSubNet: "9.251.0.0/16", ListenPort: 40001, } } diff --git a/agent/pkg/proxy/controller/controller.go b/agent/pkg/proxy/controller/controller.go index 8806d438c..f8cdb7ea0 100644 --- a/agent/pkg/proxy/controller/controller.go +++ b/agent/pkg/proxy/controller/controller.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "strconv" "strings" "sync" @@ -13,7 +14,10 @@ import ( "github.com/kubeedge/edgemesh/common/informers" ) -const None = "None" +const ( + None = "None" + defaultNetworkPrefix = "9.251." +) var ( APIConn *ProxyController @@ -21,6 +25,7 @@ var ( ) type ProxyController struct { + FakeIPIndex uint16 svcInformer cache.SharedIndexInformer svcEventHandlers map[string]cache.ResourceEventHandlerFuncs // key: service event handler name @@ -32,6 +37,7 @@ type ProxyController struct { func Init(ifm *informers.Manager) { once.Do(func() { APIConn = &ProxyController{ + FakeIPIndex: uint16(1), // avoid 0.0 svcInformer: ifm.GetKubeFactory().Core().V1().Services().Informer(), svcEventHandlers: make(map[string]cache.ResourceEventHandlerFuncs), svcPortsByIP: make(map[string]string), @@ -62,6 +68,14 @@ func (c *ProxyController) SetServiceEventHandlers(name string, handlerFuncs cach c.Unlock() } +func (c *ProxyController) setFakeIPIndex(newIndex uint16) { + c.FakeIPIndex = newIndex +} + +func (c *ProxyController) getFakeIPIndex() uint16 { + return c.FakeIPIndex +} + func getSvcPorts(svc *v1.Service) string { svcPorts := "" svcName := svc.Namespace + "." + svc.Name @@ -94,7 +108,8 @@ func (c *ProxyController) svcAdd(obj interface{}) { svcName := svc.Namespace + "." + svc.Name ip := svc.Spec.ClusterIP if ip == "" || ip == None { - return + ip = GetFakeIP() + klog.Warningf("[EdgeMesh] use fakeIP for service %s, fakeIP:%s", svcName, ip) } c.addOrUpdateService(svcName, ip, svcPorts) } @@ -108,10 +123,18 @@ func (c *ProxyController) svcUpdate(oldObj, newObj interface{}) { svcPorts := getSvcPorts(svc) svcName := svc.Namespace + "." + svc.Name ip := svc.Spec.ClusterIP - if ip == "" || ip == None { - return + + clusterIP := ip + old, ok := c.ipBySvc[svcName] + if ok { + clusterIP = old + } else { + if ip == "" || ip == None { + clusterIP = GetFakeIP() + klog.Warningf("[EdgeMesh] use fakeIP for service %s, fakeIP:%s", svcName, clusterIP) + } } - c.addOrUpdateService(svcName, ip, svcPorts) + c.addOrUpdateService(svcName, clusterIP, svcPorts) } func (c *ProxyController) svcDelete(obj interface{}) { @@ -122,9 +145,6 @@ func (c *ProxyController) svcDelete(obj interface{}) { } svcName := svc.Namespace + "." + svc.Name ip := svc.Spec.ClusterIP - if ip == "" || ip == None { - return - } c.deleteService(svcName, ip) } @@ -159,3 +179,26 @@ func (c *ProxyController) GetSvcPorts(ip string) string { c.RUnlock() return svcPorts } + +func GetFakeIP() (ip string) { + for { + index := APIConn.getFakeIPIndex() + 1 + APIConn.setFakeIPIndex(index) + + ip = defaultNetworkPrefix + getSubNet(index) + + _, ok := APIConn.svcPortsByIP[ip] + if !ok { + break + } + + } + return +} + +// getSubNet converts uint16 to "uint8.uint8" +func getSubNet(subNet uint16) string { + arg1 := uint64(subNet & 0x00ff) + arg2 := uint64((subNet & 0xff00) >> 8) + return strconv.FormatUint(arg2, 10) + "." + strconv.FormatUint(arg1, 10) +} diff --git a/agent/pkg/proxy/iptables.go b/agent/pkg/proxy/iptables.go index c38f1201c..56c622dc6 100644 --- a/agent/pkg/proxy/iptables.go +++ b/agent/pkg/proxy/iptables.go @@ -54,6 +54,9 @@ type Proxier struct { // serviceCIDR is kubernetes service-cluster-ip-range serviceCIDR string + // headlessSvcCIDR is fake headless-service-cluster-ip-range + headlessSvcCIDR string + // protoProxies represents the protocol that requires proxy protoProxies []protocol.ProtoProxy @@ -64,7 +67,7 @@ type Proxier struct { dnatRules []iptablesJumpChain } -func NewProxier(subnet string, protoProxies []protocol.ProtoProxy, kubeClient kubernetes.Interface) (proxier *Proxier, err error) { +func NewProxier(subnet, fakeSubNet string, protoProxies []protocol.ProtoProxy, kubeClient kubernetes.Interface) (proxier *Proxier, err error) { primaryProtocol := utiliptables.ProtocolIPv4 execer := utilexec.New() iptInterface := utiliptables.New(execer, primaryProtocol) @@ -72,6 +75,7 @@ func NewProxier(subnet string, protoProxies []protocol.ProtoProxy, kubeClient ku iptables: iptInterface, kubeClient: kubeClient, serviceCIDR: subnet, + headlessSvcCIDR: fakeSubNet, protoProxies: protoProxies, ignoreRules: make([]iptablesJumpChain, 0), expiredIgnoreRules: make([]iptablesJumpChain, 0), @@ -226,6 +230,10 @@ func (proxier *Proxier) createProxyRules() (proxyRules, dnatRules []iptablesJump return []string{"-p", string(protoName), "-d", proxier.serviceCIDR} } + fakeProxyRuleArgs = func(protoName protocol.ProtoName) []string { + return []string{"-p", string(protoName), "-d", proxier.headlessSvcCIDR} + } + dnatRuleArgs = func(protoName protocol.ProtoName, serverAddr string) []string { return []string{"-p", string(protoName), "-j", "DNAT", "--to-destination", serverAddr} } @@ -240,6 +248,13 @@ func (proxier *Proxier) createProxyRules() (proxyRules, dnatRules []iptablesJump comment: proxyRuleComment(proto.GetName()), extraArgs: proxyRuleArgs(proto.GetName()), }) + proxyRules = append(proxyRules, iptablesJumpChain{ + table: utiliptables.TableNAT, + dstChain: newChainName, + srcChain: meshRootChain, + comment: proxyRuleComment(proto.GetName()), + extraArgs: fakeProxyRuleArgs(proto.GetName()), + }) dnatRules = append(dnatRules, iptablesJumpChain{ table: utiliptables.TableNAT, dstChain: "DNAT", @@ -405,6 +420,14 @@ func (proxier *Proxier) CleanResidue() { if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, nonIfiRuleArgs...); err != nil { klog.V(4).ErrorS(err, "Failed clean residual non-interface rule %v", nonIfiRuleArgs) } + nonIfiRuleArgs = strings.Split(fmt.Sprintf("-p tcp -d %s -j EDGE-MESH", proxier.headlessSvcCIDR), " ") + if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPrerouting, nonIfiRuleArgs...); err != nil { + klog.V(4).Error(err, "Failed clean residual non-interface rule %v", nonIfiRuleArgs) + } + if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, nonIfiRuleArgs...); err != nil { + klog.V(4).Error(err, "Failed clean residual non-interface rule %v", nonIfiRuleArgs) + } + // clean up interface iptables rules ifiList := []string{"docker0", "cni0"} @@ -417,6 +440,15 @@ func (proxier *Proxier) CleanResidue() { if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, outboundRuleAgrs...); err != nil { klog.V(4).ErrorS(err, "Failed clean residual outbound rule %v", outboundRuleAgrs) } + + inboundRuleArgs = strings.Split(fmt.Sprintf("-p tcp -d %s -i %s -j EDGE-MESH", proxier.headlessSvcCIDR, ifi), " ") + if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPrerouting, inboundRuleArgs...); err != nil { + klog.V(4).Error(err, "Failed clean residual inbound rule %v", inboundRuleArgs) + } + outboundRuleAgrs = strings.Split(fmt.Sprintf("-p tcp -d %s -o %s -j EDGE-MESH", proxier.headlessSvcCIDR, ifi), " ") + if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, outboundRuleAgrs...); err != nil { + klog.V(4).Error(err, "Failed clean residual outbound rule %v", outboundRuleAgrs) + } } // clean up ip routes @@ -425,6 +457,11 @@ func (proxier *Proxier) CleanResidue() { klog.Errorf("parse subnet(serviceCIDR) error: %v", err) return } + dst1, err := netlink.ParseIPNet(proxier.headlessSvcCIDR) + if err != nil { + klog.Errorf("parse subnet(headlessSvcCIDR) error: %v", err) + return + } // try to delete the route that may exist for _, ifi := range ifiList { @@ -433,6 +470,11 @@ func (proxier *Proxier) CleanResidue() { if err := netlink.RouteDel(&route); err != nil { klog.V(4).ErrorS(err, "Failed delete route %v", route) } + + route = netlink.Route{Dst: dst1, Gw: gw} + if err := netlink.RouteDel(&route); err != nil { + klog.V(4).ErrorS(err, "Failed delete route %v", route) + } } } } diff --git a/agent/pkg/proxy/module.go b/agent/pkg/proxy/module.go index 32cac4f3b..1ed286eb9 100644 --- a/agent/pkg/proxy/module.go +++ b/agent/pkg/proxy/module.go @@ -44,7 +44,7 @@ func newEdgeProxy(c *config.EdgeProxyConfig, ifm *informers.Manager) (proxy *Edg // new proxier protoProxies := []protocol.ProtoProxy{proxy.TCPProxy} - proxy.Proxier, err = NewProxier(proxy.Config.SubNet, protoProxies, ifm.GetKubeClient()) + proxy.Proxier, err = NewProxier(proxy.Config.SubNet, proxy.Config.FakeSubNet, protoProxies, ifm.GetKubeClient()) if err != nil { return proxy, fmt.Errorf("new proxier err: %v", err) } diff --git a/common/util/util.go b/common/util/util.go index 09d34b21b..f16c47633 100644 --- a/common/util/util.go +++ b/common/util/util.go @@ -12,6 +12,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/klog/v2" + + "github.com/kubeedge/edgemesh/agent/pkg/proxy/controller" ) const ( @@ -82,3 +84,9 @@ func FetchPublicIP() string { func IsNotFoundError(err error) bool { return strings.Contains(err.Error(), "not found") } + +// GetSvcIP returns the ip by given service name +func GetSvcIP(namespace, name string) string { + svcName := namespace + "." + name + return controller.APIConn.GetSvcIP(svcName) +}