Skip to content

Instantly share code, notes, and snippets.

@leblancd
Created August 24, 2017 16:32
Show Gist options
  • Save leblancd/0b4ae3b6203bd1c04c12c3f0a51fd06b to your computer and use it in GitHub Desktop.
Save leblancd/0b4ae3b6203bd1c04c12c3f0a51fd06b to your computer and use it in GitHub Desktop.
Rebased diffs for PR #48551 (not including unit tests)
diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go
index 6f2e666..f023041 100644
--- a/pkg/proxy/iptables/proxier.go
+++ b/pkg/proxy/iptables/proxier.go
@@ -27,6 +27,7 @@ import (
"fmt"
"net"
"reflect"
+ "regexp"
"strconv"
"strings"
"sync"
@@ -166,12 +167,26 @@ type endpointsInfo struct {
chainName utiliptables.Chain
}
+// Returns just the IP part of an IP:port or IP endpoint string. If the IP
+// part is an IPv6 address enclosed in brackets (e.g. "[fd00:1::5]:9999"),
+// then the brackets are stripped as well.
+func ipPart(s string) string {
+ if index := strings.LastIndex(s, ":"); index != -1 {
+ ip := s[0:index]
+ // Strip off any surrounding brackets.
+ re := regexp.MustCompile(`\[(.*)\]`)
+ match := re.FindStringSubmatch(ip)
+ if match != nil {
+ ip = match[1]
+ }
+ return ip
+ }
+ return s
+}
+
// Returns just the IP part of the endpoint.
func (e *endpointsInfo) IPPart() string {
- if index := strings.Index(e.endpoint, ":"); index != -1 {
- return e.endpoint[0:index]
- }
- return e.endpoint
+ return ipPart(e.endpoint)
}
// Returns the endpoint chain name for a given endpointsInfo.
@@ -311,15 +326,26 @@ func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previo
return len(scm.items) > 0
}
+// ipPort returns a string of the form "<ip>:<port>" (for IPv4)
+// or "[<ip>]:<port>" (for IPv6) for a given IP address and port
+func ipPort(ipStr string, port int) string {
+ ip := net.ParseIP(ipStr)
+ if ip.To4() == nil {
+ ipStr = "[" + ipStr + "]"
+ }
+ return fmt.Sprintf("%s:%d", ipStr, port)
+}
+
func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
existingPorts := sets.NewString()
for svcPortName, info := range other {
+ clusterIPPort := ipPort(info.clusterIP.String(), info.port)
existingPorts.Insert(svcPortName.Port)
_, exists := (*sm)[svcPortName]
if !exists {
- glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
+ glog.V(1).Infof("Adding new service port %q at %s/%s", svcPortName, clusterIPPort, info.protocol)
} else {
- glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
+ glog.V(1).Infof("Updating existing service port %q at %s/%s", svcPortName, clusterIPPort, info.protocol)
}
(*sm)[svcPortName] = info
}
@@ -413,7 +439,8 @@ type localPort struct {
}
func (lp *localPort) String() string {
- return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol)
+ ipPort := ipPort(lp.ip, lp.port)
+ return fmt.Sprintf("%q (%s/%s)", lp.desc, ipPort, lp.protocol)
}
type closeable interface {
@@ -939,10 +966,7 @@ type endpointServicePair struct {
}
func (esp *endpointServicePair) IPPart() string {
- if index := strings.Index(esp.endpoint, ":"); index != -1 {
- return esp.endpoint[0:index]
- }
- return esp.endpoint
+ return ipPart(esp.endpoint)
}
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
@@ -960,6 +984,16 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
}
}
+// hostAddress returns a host address of the form <ip-address>/32 for
+// IPv4 and <ip-address>/128 for IPv6
+func hostAddress(ip net.IP) string {
+ len := 32
+ if ip.To4() == nil {
+ len = 128
+ }
+ return fmt.Sprintf("%s/%d", ip.String(), len)
+}
+
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// This assumes proxier.mu is NOT held
@@ -1177,7 +1211,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
"-m", protocol, "-p", protocol,
- "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
+ "-d", hostAddress(svcInfo.clusterIP),
"--dport", strconv.Itoa(svcInfo.port),
)
if proxier.masqueradeAll {
@@ -1231,7 +1265,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
"-m", protocol, "-p", protocol,
- "-d", fmt.Sprintf("%s/32", externalIP),
+ "-d", hostAddress(net.ParseIP(externalIP)),
"--dport", strconv.Itoa(svcInfo.port),
)
// We have to SNAT packets to external IPs.
@@ -1257,7 +1291,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol,
- "-d", fmt.Sprintf("%s/32", externalIP),
+ "-d", hostAddress(net.ParseIP(externalIP)),
"--dport", strconv.Itoa(svcInfo.port),
"-j", "REJECT",
)
@@ -1283,7 +1317,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
"-m", protocol, "-p", protocol,
- "-d", fmt.Sprintf("%s/32", ingress.IP),
+ "-d", hostAddress(net.ParseIP(ingress.IP)),
"--dport", strconv.Itoa(svcInfo.port),
)
// jump to service firewall chain
@@ -1321,7 +1355,7 @@ func (proxier *Proxier) syncProxyRules() {
// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
// Need to add the following rule to allow request on host.
if allowFromNode {
- writeLine(proxier.natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...)
+ writeLine(proxier.natRules, append(args, "-s", hostAddress(net.ParseIP(ingress.IP)), "-j", string(chosenChain))...)
}
}
@@ -1404,7 +1438,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol,
- "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
+ "-d", hostAddress(svcInfo.clusterIP),
"--dport", strconv.Itoa(svcInfo.port),
"-j", "REJECT",
)
@@ -1471,7 +1505,7 @@ func (proxier *Proxier) syncProxyRules() {
)
// Handle traffic that loops back to the originator with SNAT.
writeLine(proxier.natRules, append(args,
- "-s", fmt.Sprintf("%s/32", endpoints[i].IPPart()),
+ "-s", hostAddress(net.ParseIP(endpoints[i].IPPart())),
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment