Skip to content

Commit e567505

Browse files
committed
Track node online/offline state in master egress IP allocator
and don't allocate egress IPs to offline nodes
1 parent 9d1e4a1 commit e567505

File tree

3 files changed

+220
-7
lines changed

3 files changed

+220
-7
lines changed

pkg/network/common/egressip.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -442,22 +442,32 @@ func (eit *EgressIPTracker) SetNodeOffline(nodeIP string, offline bool) {
442442
eit.egressIPChanged(eg)
443443
}
444444
}
445+
446+
if node.requestedCIDRs.Len() != 0 {
447+
eit.updateEgressCIDRs = true
448+
}
449+
445450
eit.syncEgressIPs()
446451
}
447452

453+
func (eit *EgressIPTracker) lookupNodeIP(ip string) string {
454+
eit.Lock()
455+
defer eit.Unlock()
456+
457+
if node := eit.nodesByNodeIP[ip]; node != nil {
458+
return node.sdnIP
459+
}
460+
return ip
461+
}
462+
448463
// Ping a node and return whether or not it is online. We do this by trying to open a TCP
449464
// connection to the "discard" service (port 9); if the node is offline, the attempt will
450465
// time out with no response (and we will return false). If the node is online then we
451466
// presumably will get a "connection refused" error; the code below assumes that anything
452467
// other than timing out indicates that the node is online.
453468
func (eit *EgressIPTracker) Ping(ip string, timeout time.Duration) bool {
454-
eit.Lock()
455-
defer eit.Unlock()
456-
457469
// If the caller used a public node IP, replace it with the SDN IP
458-
if node := eit.nodesByNodeIP[ip]; node != nil {
459-
ip = node.sdnIP
460-
}
470+
ip = eit.lookupNodeIP(ip)
461471

462472
conn, err := net.DialTimeout("tcp", ip+":9", timeout)
463473
if conn != nil {
@@ -477,6 +487,9 @@ func (eit *EgressIPTracker) findEgressIPAllocation(ip net.IP, allocation map[str
477487
otherNodes := false
478488

479489
for _, node := range eit.nodes {
490+
if node.offline {
491+
continue
492+
}
480493
egressIPs, exists := allocation[node.nodeName]
481494
if !exists {
482495
continue
@@ -524,7 +537,7 @@ func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
524537
break
525538
}
526539
}
527-
if found {
540+
if found && !node.offline {
528541
allocation[node.nodeName] = append(allocation[node.nodeName], egressIP)
529542
}
530543
// (We set alreadyAllocated even if the egressIP will be removed from

pkg/network/common/egressip_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,20 @@ func (w *testEIPWatcher) assertNoChanges() error {
7373
return w.assertChanges()
7474
}
7575

76+
func (w *testEIPWatcher) flushChanges() {
77+
w.changes = []string{}
78+
}
79+
80+
func (w *testEIPWatcher) assertUpdateEgressCIDRsNotification() error {
81+
for _, change := range w.changes {
82+
if change == "update egress CIDRs" {
83+
w.flushChanges()
84+
return nil
85+
}
86+
}
87+
return fmt.Errorf("expected change \"update egress CIDRs\", got %#v", w.changes)
88+
}
89+
7690
func setupEgressIPTracker(t *testing.T) (*EgressIPTracker, *testEIPWatcher) {
7791
watcher := &testEIPWatcher{}
7892
return NewEgressIPTracker(watcher), watcher
@@ -998,3 +1012,98 @@ func TestEgressNodeRenumbering(t *testing.T) {
9981012
t.Fatalf("%v", err)
9991013
}
10001014
}
1015+
1016+
func TestEgressCIDRAllocationOffline(t *testing.T) {
1017+
eit, w := setupEgressIPTracker(t)
1018+
1019+
// Create nodes...
1020+
updateHostSubnetEgress(eit, &networkapi.HostSubnet{
1021+
HostIP: "172.17.0.3",
1022+
EgressIPs: []string{},
1023+
EgressCIDRs: []string{"172.17.0.0/24", "172.17.1.0/24"},
1024+
})
1025+
updateHostSubnetEgress(eit, &networkapi.HostSubnet{
1026+
HostIP: "172.17.0.4",
1027+
EgressIPs: []string{},
1028+
EgressCIDRs: []string{"172.17.0.0/24"},
1029+
})
1030+
updateHostSubnetEgress(eit, &networkapi.HostSubnet{
1031+
HostIP: "172.17.0.5",
1032+
EgressIPs: []string{},
1033+
EgressCIDRs: []string{"172.17.1.0/24"},
1034+
})
1035+
1036+
// Create namespaces
1037+
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
1038+
NetID: 100,
1039+
EgressIPs: []string{"172.17.0.100"},
1040+
})
1041+
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
1042+
NetID: 101,
1043+
EgressIPs: []string{"172.17.0.101"},
1044+
})
1045+
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
1046+
NetID: 102,
1047+
EgressIPs: []string{"172.17.0.102"},
1048+
})
1049+
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
1050+
NetID: 200,
1051+
EgressIPs: []string{"172.17.1.200"},
1052+
})
1053+
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
1054+
NetID: 201,
1055+
EgressIPs: []string{"172.17.1.201"},
1056+
})
1057+
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
1058+
NetID: 202,
1059+
EgressIPs: []string{"172.17.1.202"},
1060+
})
1061+
1062+
// In a perfect world, we'd get 2 IPs on each node, but depending on processing
1063+
// order, this isn't guaranteed. Eg, if the three 172.17.0.x nodes get processed
1064+
// first, we could get two of them on node-3 and one on node-4. Then the first two
1065+
// 172.17.1.x nodes get assigned to node-5, and the last one could go to either
1066+
// node-3 or node-5. Regardless of order, node-3 is guaranteed to get at least
1067+
// two nodes since there's no way either node-4 or node-5 could be assigned a
1068+
// third IP if node-3 still only had one.
1069+
allocation := eit.ReallocateEgressIPs()
1070+
node3ips := allocation["node-3"]
1071+
node4ips := allocation["node-4"]
1072+
node5ips := allocation["node-5"]
1073+
if len(node3ips) < 2 || len(node4ips) == 0 || len(node5ips) == 0 ||
1074+
len(node3ips) + len(node4ips) + len(node5ips) != 6 {
1075+
t.Fatalf("Bad IP allocation: %#v", allocation)
1076+
}
1077+
updateAllocations(eit, allocation)
1078+
1079+
w.flushChanges()
1080+
1081+
// Now take node-3 offline
1082+
eit.SetNodeOffline("172.17.0.3", true)
1083+
err := w.assertUpdateEgressCIDRsNotification()
1084+
if err != nil {
1085+
t.Fatalf("%v", err)
1086+
}
1087+
1088+
// First reallocation should empty out node-3
1089+
allocation = eit.ReallocateEgressIPs()
1090+
if node3ips, ok := allocation["node-3"]; !ok || len(node3ips) != 0 {
1091+
t.Fatalf("Bad IP allocation: %#v", allocation)
1092+
}
1093+
updateAllocations(eit, allocation)
1094+
1095+
err = w.assertUpdateEgressCIDRsNotification()
1096+
if err != nil {
1097+
t.Fatalf("%v", err)
1098+
}
1099+
1100+
// Next reallocation should reassign egress IPs to node-4 and node-5
1101+
allocation = eit.ReallocateEgressIPs()
1102+
node3ips = allocation["node-3"]
1103+
node4ips = allocation["node-4"]
1104+
node5ips = allocation["node-5"]
1105+
if len(node3ips) != 0 || len(node4ips) != 3 || len(node5ips) != 3 {
1106+
t.Fatalf("Bad IP allocation: %#v", allocation)
1107+
}
1108+
updateAllocations(eit, allocation)
1109+
}

pkg/network/master/egressip.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"sync"
66
"time"
77

8+
"github.com/golang/glog"
9+
810
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
911
"k8s.io/apimachinery/pkg/util/sets"
1012
utilwait "k8s.io/apimachinery/pkg/util/wait"
@@ -24,6 +26,15 @@ type egressIPManager struct {
2426

2527
updatePending bool
2628
updatedAgain bool
29+
30+
monitorNodes map[string]*egressNode
31+
stop chan struct{}
32+
}
33+
34+
// egressNode is the health-monitoring record kept for one node that the
// egress IP manager polls.
type egressNode struct {
	// ip is the address handed to the tracker's Ping and SetNodeOffline.
	ip string
	// offline is true once the node has been reported offline to the tracker.
	offline bool
	// retries counts failed pings seen while the node is still considered
	// online; it is compared against maxRetries.
	retries int
}
2839

2940
func newEgressIPManager() *egressIPManager {
@@ -76,13 +87,20 @@ func (eim *egressIPManager) maybeDoUpdateEgressCIDRs() (bool, error) {
7687
// we won't process that until this reallocation is complete.
7788

7889
allocation := eim.tracker.ReallocateEgressIPs()
90+
monitorNodes := make(map[string]*egressNode, len(allocation))
7991
for nodeName, egressIPs := range allocation {
8092
resultErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
8193
hs, err := eim.hostSubnetInformer.Lister().Get(nodeName)
8294
if err != nil {
8395
return err
8496
}
8597

98+
if node := eim.monitorNodes[hs.HostIP]; node != nil {
99+
monitorNodes[hs.HostIP] = node
100+
} else {
101+
monitorNodes[hs.HostIP] = &egressNode{ip: hs.HostIP}
102+
}
103+
86104
oldIPs := sets.NewString(hs.EgressIPs...)
87105
newIPs := sets.NewString(egressIPs...)
88106
if !oldIPs.Equal(newIPs) {
@@ -96,9 +114,82 @@ func (eim *egressIPManager) maybeDoUpdateEgressCIDRs() (bool, error) {
96114
}
97115
}
98116

117+
eim.monitorNodes = monitorNodes
118+
if len(monitorNodes) > 0 {
119+
if eim.stop == nil {
120+
eim.stop = make(chan struct{})
121+
go eim.poll(eim.stop)
122+
}
123+
} else {
124+
if eim.stop != nil {
125+
close(eim.stop)
126+
eim.stop = nil
127+
}
128+
}
129+
99130
return true, nil
100131
}
101132

133+
// Tuning knobs for node health polling.
const (
	// pollInterval is the minimum time between the start of two regular
	// polling passes, and the ping timeout used during a regular pass.
	pollInterval = 5 * time.Second
	// repollInterval is the ping timeout used during a retry pass.
	repollInterval = 1 * time.Second
	// maxRetries is the number of failed pings tolerated before a node is
	// declared offline.
	maxRetries = 2
)
138+
139+
func (eim *egressIPManager) poll(stop chan struct{}) {
140+
retry := false
141+
for {
142+
select {
143+
case <-stop:
144+
return
145+
default:
146+
}
147+
148+
start := time.Now()
149+
retry := eim.check(retry)
150+
if !retry {
151+
// If less than pollInterval has passed since start, then sleep until it has
152+
time.Sleep(start.Add(pollInterval).Sub(time.Now()))
153+
}
154+
}
155+
}
156+
157+
func (eim *egressIPManager) check(retrying bool) bool {
158+
var timeout time.Duration
159+
if retrying {
160+
timeout = repollInterval
161+
} else {
162+
timeout = pollInterval
163+
}
164+
165+
needRetry := false
166+
for _, node := range eim.monitorNodes {
167+
if retrying && node.retries == 0 {
168+
continue
169+
}
170+
171+
online := eim.tracker.Ping(node.ip, timeout)
172+
if node.offline && online {
173+
glog.Infof("Node %s is back online", node.ip)
174+
node.offline = false
175+
eim.tracker.SetNodeOffline(node.ip, false)
176+
} else if !node.offline && !online {
177+
node.retries++
178+
if node.retries > maxRetries {
179+
glog.Warningf("Node %s is offline", node.ip)
180+
node.retries = 0
181+
node.offline = true
182+
eim.tracker.SetNodeOffline(node.ip, true)
183+
} else {
184+
glog.V(2).Infof("Node %s may be offline... retrying", node.ip)
185+
needRetry = true
186+
}
187+
}
188+
}
189+
190+
return needRetry
191+
}
192+
102193
// ClaimEgressIP intentionally does nothing.
// NOTE(review): presumably present only to satisfy the watcher interface the
// egress IP tracker calls back into — confirm against the interface definition.
func (eim *egressIPManager) ClaimEgressIP(vnid uint32, egressIP, nodeIP string) {
}
104195

0 commit comments

Comments
 (0)