Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions agent/pkg/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"k8s.io/klog/v2"

beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/edgemesh/agent/pkg/dns/controller"
"github.com/kubeedge/edgemesh/common/util"
)

Expand Down Expand Up @@ -67,9 +66,10 @@ func lookup(serviceURL string) (ip string, exist bool) {
// Here serviceURL is a domain name which has at least a "." suffix. So here we need trim it.
serviceURL = strings.TrimSuffix(serviceURL, ".")
name, namespace := util.SplitServiceKey(serviceURL)
ip, err := controller.APIConn.GetSvcIP(namespace, name)
if err != nil {
klog.Errorf("service `%s.%s` reverse clusterIP error: %v", name, namespace, err)

ip = util.GetSvcIP(namespace, name)
if ip == "" {
klog.Errorf("service `%s.%s` reverse clusterIP error", name, namespace)
return "", false
}
klog.Infof("dns server parse %s ip %s", serviceURL, ip)
Expand Down
4 changes: 4 additions & 0 deletions agent/pkg/proxy/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ type EdgeProxyConfig struct {
Enable bool `json:"enable,omitempty"`
// SubNet indicates the subnet of proxier, which equals to k8s service-cluster-ip-range
SubNet string `json:"subNet,omitempty"`
// FakeSubNet indicates the fake subnet of headless service, which is used to support using domain to access the headless service
// default 9.251.0.0/16
FakeSubNet string `json:"fakeSubNet,omitempty"`
// ListenInterface indicates the listen interface of edgeproxy
// do not allow users to configure manually
ListenInterface string `json:"listenInterface,omitempty"`
Expand All @@ -18,6 +21,7 @@ type EdgeProxyConfig struct {
func NewEdgeProxyConfig() *EdgeProxyConfig {
return &EdgeProxyConfig{
Enable: false,
FakeSubNet: "9.251.0.0/16",
ListenPort: 40001,
}
}
59 changes: 51 additions & 8 deletions agent/pkg/proxy/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"fmt"
"strconv"
"strings"
"sync"

Expand All @@ -13,14 +14,18 @@ import (
"github.com/kubeedge/edgemesh/common/informers"
)

const None = "None"
const (
None = "None"
defaultNetworkPrefix = "9.251."
)

var (
APIConn *ProxyController
once sync.Once
)

type ProxyController struct {
FakeIPIndex uint16
svcInformer cache.SharedIndexInformer
svcEventHandlers map[string]cache.ResourceEventHandlerFuncs // key: service event handler name

Expand All @@ -32,6 +37,7 @@ type ProxyController struct {
func Init(ifm *informers.Manager) {
once.Do(func() {
APIConn = &ProxyController{
FakeIPIndex: uint16(1), // avoid 0.0
svcInformer: ifm.GetKubeFactory().Core().V1().Services().Informer(),
svcEventHandlers: make(map[string]cache.ResourceEventHandlerFuncs),
svcPortsByIP: make(map[string]string),
Expand Down Expand Up @@ -62,6 +68,14 @@ func (c *ProxyController) SetServiceEventHandlers(name string, handlerFuncs cach
c.Unlock()
}

func (c *ProxyController) setFakeIPIndex(newIndex uint16) {
c.FakeIPIndex = newIndex
}

func (c *ProxyController) getFakeIPIndex() uint16 {
return c.FakeIPIndex
}

func getSvcPorts(svc *v1.Service) string {
svcPorts := ""
svcName := svc.Namespace + "." + svc.Name
Expand Down Expand Up @@ -94,7 +108,8 @@ func (c *ProxyController) svcAdd(obj interface{}) {
svcName := svc.Namespace + "." + svc.Name
ip := svc.Spec.ClusterIP
if ip == "" || ip == None {
return
ip = GetFakeIP()
klog.Warningf("[EdgeMesh] use fakeIP for service %s, fakeIP:%s", svcName, ip)
}
c.addOrUpdateService(svcName, ip, svcPorts)
}
Expand All @@ -108,10 +123,18 @@ func (c *ProxyController) svcUpdate(oldObj, newObj interface{}) {
svcPorts := getSvcPorts(svc)
svcName := svc.Namespace + "." + svc.Name
ip := svc.Spec.ClusterIP
if ip == "" || ip == None {
return

clusterIP := ip
old, ok := c.ipBySvc[svcName]
if ok {
clusterIP = old
} else {
if ip == "" || ip == None {
clusterIP = GetFakeIP()
klog.Warningf("[EdgeMesh] use fakeIP for service %s, fakeIP:%s", svcName, clusterIP)
}
}
c.addOrUpdateService(svcName, ip, svcPorts)
c.addOrUpdateService(svcName, clusterIP, svcPorts)
}

func (c *ProxyController) svcDelete(obj interface{}) {
Expand All @@ -122,9 +145,6 @@ func (c *ProxyController) svcDelete(obj interface{}) {
}
svcName := svc.Namespace + "." + svc.Name
ip := svc.Spec.ClusterIP
if ip == "" || ip == None {
return
}
c.deleteService(svcName, ip)
}

Expand Down Expand Up @@ -159,3 +179,26 @@ func (c *ProxyController) GetSvcPorts(ip string) string {
c.RUnlock()
return svcPorts
}

func GetFakeIP() (ip string) {
for {
index := APIConn.getFakeIPIndex() + 1
APIConn.setFakeIPIndex(index)

ip = defaultNetworkPrefix + getSubNet(index)

_, ok := APIConn.svcPortsByIP[ip]
if !ok {
break
}

}
return
}

// getSubNet converts uint16 to "uint8.uint8"
func getSubNet(subNet uint16) string {
arg1 := uint64(subNet & 0x00ff)
arg2 := uint64((subNet & 0xff00) >> 8)
return strconv.FormatUint(arg2, 10) + "." + strconv.FormatUint(arg1, 10)
}
44 changes: 43 additions & 1 deletion agent/pkg/proxy/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type Proxier struct {
// serviceCIDR is kubernetes service-cluster-ip-range
serviceCIDR string

// headlessSvcCIDR is fake headless-service-cluster-ip-range
headlessSvcCIDR string

// protoProxies represents the protocol that requires proxy
protoProxies []protocol.ProtoProxy

Expand All @@ -64,14 +67,15 @@ type Proxier struct {
dnatRules []iptablesJumpChain
}

func NewProxier(subnet string, protoProxies []protocol.ProtoProxy, kubeClient kubernetes.Interface) (proxier *Proxier, err error) {
func NewProxier(subnet, fakeSubNet string, protoProxies []protocol.ProtoProxy, kubeClient kubernetes.Interface) (proxier *Proxier, err error) {
primaryProtocol := utiliptables.ProtocolIPv4
execer := utilexec.New()
iptInterface := utiliptables.New(execer, primaryProtocol)
proxier = &Proxier{
iptables: iptInterface,
kubeClient: kubeClient,
serviceCIDR: subnet,
headlessSvcCIDR: fakeSubNet,
protoProxies: protoProxies,
ignoreRules: make([]iptablesJumpChain, 0),
expiredIgnoreRules: make([]iptablesJumpChain, 0),
Expand Down Expand Up @@ -226,6 +230,10 @@ func (proxier *Proxier) createProxyRules() (proxyRules, dnatRules []iptablesJump
return []string{"-p", string(protoName), "-d", proxier.serviceCIDR}
}

fakeProxyRuleArgs = func(protoName protocol.ProtoName) []string {
return []string{"-p", string(protoName), "-d", proxier.headlessSvcCIDR}
}

dnatRuleArgs = func(protoName protocol.ProtoName, serverAddr string) []string {
return []string{"-p", string(protoName), "-j", "DNAT", "--to-destination", serverAddr}
}
Expand All @@ -240,6 +248,13 @@ func (proxier *Proxier) createProxyRules() (proxyRules, dnatRules []iptablesJump
comment: proxyRuleComment(proto.GetName()),
extraArgs: proxyRuleArgs(proto.GetName()),
})
proxyRules = append(proxyRules, iptablesJumpChain{
table: utiliptables.TableNAT,
dstChain: newChainName,
srcChain: meshRootChain,
comment: proxyRuleComment(proto.GetName()),
extraArgs: fakeProxyRuleArgs(proto.GetName()),
})
dnatRules = append(dnatRules, iptablesJumpChain{
table: utiliptables.TableNAT,
dstChain: "DNAT",
Expand Down Expand Up @@ -405,6 +420,14 @@ func (proxier *Proxier) CleanResidue() {
if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, nonIfiRuleArgs...); err != nil {
klog.V(4).ErrorS(err, "Failed clean residual non-interface rule %v", nonIfiRuleArgs)
}
nonIfiRuleArgs = strings.Split(fmt.Sprintf("-p tcp -d %s -j EDGE-MESH", proxier.headlessSvcCIDR), " ")
if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPrerouting, nonIfiRuleArgs...); err != nil {
klog.V(4).Error(err, "Failed clean residual non-interface rule %v", nonIfiRuleArgs)
}
if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, nonIfiRuleArgs...); err != nil {
klog.V(4).Error(err, "Failed clean residual non-interface rule %v", nonIfiRuleArgs)
}


// clean up interface iptables rules
ifiList := []string{"docker0", "cni0"}
Expand All @@ -417,6 +440,15 @@ func (proxier *Proxier) CleanResidue() {
if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, outboundRuleAgrs...); err != nil {
klog.V(4).ErrorS(err, "Failed clean residual outbound rule %v", outboundRuleAgrs)
}

inboundRuleArgs = strings.Split(fmt.Sprintf("-p tcp -d %s -i %s -j EDGE-MESH", proxier.headlessSvcCIDR, ifi), " ")
if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPrerouting, inboundRuleArgs...); err != nil {
klog.V(4).Error(err, "Failed clean residual inbound rule %v", inboundRuleArgs)
}
outboundRuleAgrs = strings.Split(fmt.Sprintf("-p tcp -d %s -o %s -j EDGE-MESH", proxier.headlessSvcCIDR, ifi), " ")
if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, outboundRuleAgrs...); err != nil {
klog.V(4).Error(err, "Failed clean residual outbound rule %v", outboundRuleAgrs)
}
}

// clean up ip routes
Expand All @@ -425,6 +457,11 @@ func (proxier *Proxier) CleanResidue() {
klog.Errorf("parse subnet(serviceCIDR) error: %v", err)
return
}
dst1, err := netlink.ParseIPNet(proxier.headlessSvcCIDR)
if err != nil {
klog.Errorf("parse subnet(headlessSvcCIDR) error: %v", err)
return
}

// try to delete the route that may exist
for _, ifi := range ifiList {
Expand All @@ -433,6 +470,11 @@ func (proxier *Proxier) CleanResidue() {
if err := netlink.RouteDel(&route); err != nil {
klog.V(4).ErrorS(err, "Failed delete route %v", route)
}

route = netlink.Route{Dst: dst1, Gw: gw}
if err := netlink.RouteDel(&route); err != nil {
klog.V(4).ErrorS(err, "Failed delete route %v", route)
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion agent/pkg/proxy/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newEdgeProxy(c *config.EdgeProxyConfig, ifm *informers.Manager) (proxy *Edg

// new proxier
protoProxies := []protocol.ProtoProxy{proxy.TCPProxy}
proxy.Proxier, err = NewProxier(proxy.Config.SubNet, protoProxies, ifm.GetKubeClient())
proxy.Proxier, err = NewProxier(proxy.Config.SubNet, proxy.Config.FakeSubNet, protoProxies, ifm.GetKubeClient())
if err != nil {
return proxy, fmt.Errorf("new proxier err: %v", err)
}
Expand Down
8 changes: 8 additions & 0 deletions common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/klog/v2"

"github.com/kubeedge/edgemesh/agent/pkg/proxy/controller"
)

const (
Expand Down Expand Up @@ -82,3 +84,9 @@ func FetchPublicIP() string {
func IsNotFoundError(err error) bool {
return strings.Contains(err.Error(), "not found")
}

// GetSvcIP returns the ip by given service name
func GetSvcIP(namespace, name string) string {
svcName := namespace + "." + name
return controller.APIConn.GetSvcIP(svcName)
}