From 03997df2b18810c0e26b5518a889cf6e6985bc1e Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 29 Jan 2018 15:15:50 -0500 Subject: [PATCH 1/5] Drop getVethInfo() It was only being used in one remaining case (to get the pod IP when tearing down a pod), but we can just always use the fallback code to get that information from OVS instead. --- pkg/network/node/ovscontroller.go | 15 +++---- pkg/network/node/ovscontroller_test.go | 2 +- pkg/network/node/pod.go | 58 +------------------------- 3 files changed, 8 insertions(+), 67 deletions(-) diff --git a/pkg/network/node/ovscontroller.go b/pkg/network/node/ovscontroller.go index 0a18a2b37be5..76ac41e9a733 100644 --- a/pkg/network/node/ovscontroller.go +++ b/pkg/network/node/ovscontroller.go @@ -404,15 +404,12 @@ func (oc *ovsController) UpdatePod(sandboxID string, vnid uint32) error { return oc.setupPodFlows(ofport, podIP, podMAC, note, vnid) } -func (oc *ovsController) TearDownPod(podIP, sandboxID string) error { - if podIP == "" { - var err error - _, podIP, _, _, err = oc.getPodDetailsBySandboxID(sandboxID) - if err != nil { - // OVS flows related to sandboxID not found - // Nothing needs to be done in that case - return nil - } +func (oc *ovsController) TearDownPod(sandboxID string) error { + _, podIP, _, _, err := oc.getPodDetailsBySandboxID(sandboxID) + if err != nil { + // OVS flows related to sandboxID not found + // Nothing needs to be done in that case + return nil } if err := oc.cleanupPodFlows(podIP); err != nil { diff --git a/pkg/network/node/ovscontroller_test.go b/pkg/network/node/ovscontroller_test.go index 5e9fbd21643d..484a41fc58a5 100644 --- a/pkg/network/node/ovscontroller_test.go +++ b/pkg/network/node/ovscontroller_test.go @@ -303,7 +303,7 @@ func TestOVSPod(t *testing.T) { } // Delete - err = oc.TearDownPod("10.128.0.2", sandboxID) + err = oc.TearDownPod(sandboxID) if err != nil { t.Fatalf("Unexpected error deleting pod rules: %v", err) } diff --git a/pkg/network/node/pod.go b/pkg/network/node/pod.go index a662f0b2c90a..842e0fae8eb7 100644 --- a/pkg/network/node/pod.go +++ b/pkg/network/node/pod.go @@ -339,51 +339,6 @@ func (m *podManager) processRequest(request *cniserver.PodRequest) *cniserver.Po return result } -// For a given container, returns host veth name, container veth MAC, and pod IP -func getVethInfo(netns, containerIfname string) (string, string, string, error) { - var ( - peerIfindex int - contVeth netlink.Link - err error - podIP string - ) - - containerNs, err := ns.GetNS(netns) - if err != nil { - return "", "", "", fmt.Errorf("failed to get container netns: %v", err) - } - defer containerNs.Close() - - err = containerNs.Do(func(ns.NetNS) error { - contVeth, err = netlink.LinkByName(containerIfname) - if err != nil { - return err - } - peerIfindex = contVeth.Attrs().ParentIndex - - addrs, err := netlink.AddrList(contVeth, netlink.FAMILY_V4) - if err != nil { - return fmt.Errorf("failed to get container IP addresses: %v", err) - } - if len(addrs) == 0 { - return fmt.Errorf("container had no addresses") - } - podIP = addrs[0].IP.String() - - return nil - }) - if err != nil { - return "", "", "", fmt.Errorf("failed to inspect container interface: %v", err) - } - - hostVeth, err := netlink.LinkByIndex(peerIfindex) - if err != nil { - return "", "", "", fmt.Errorf("failed to get host veth: %v", err) - } - - return hostVeth.Attrs().Name, contVeth.Attrs().HardwareAddr.String(), podIP, nil -} - // Adds a macvlan interface to a container, if requested, for use with the egress router feature func maybeAddMacvlan(pod *kapi.Pod, netns string) error { annotation, ok := pod.Annotations[networkapi.AssignMacvlanAnnotation] @@ -679,20 +634,9 @@ func (m *podManager) update(req *cniserver.PodRequest) (uint32, error) { func (m *podManager) teardown(req *cniserver.PodRequest) error { defer PodOperationsLatency.WithLabelValues(PodOperationTeardown).Observe(sinceInMicroseconds(time.Now())) - var podIP string errList := []error{} - if err := ns.IsNSorErr(req.Netns); err != nil { - if _, ok := err.(ns.NSPathNotExistErr); !ok { - // Namespace still exists, get pod IP from the veth - _, _, podIP, err = getVethInfo(req.Netns, podInterfaceName) - if err != nil { - errList = append(errList, err) - } - } - } - - if err := m.ovs.TearDownPod(podIP, req.SandboxID); err != nil { + if err := m.ovs.TearDownPod(req.SandboxID); err != nil { errList = append(errList, err) } From ddb2b45cb1cd29619e7db88fe11fa60dfb121152 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 29 Jan 2018 15:36:37 -0500 Subject: [PATCH 2/5] Drop sandboxID as OVS flow note, use external-ids on OVS port instead This requires two OVS commands rather than one but it saves us having to parse all that dump-flows output. --- pkg/network/node/ovscontroller.go | 101 ++++++++++--------------- pkg/network/node/ovscontroller_test.go | 17 ++--- pkg/util/ovs/fake_ovs.go | 70 +++++++++++++---- pkg/util/ovs/fake_ovs_test.go | 56 ++++++++++++++ 4 files changed, 154 insertions(+), 90 deletions(-) diff --git a/pkg/network/node/ovscontroller.go b/pkg/network/node/ovscontroller.go index 76ac41e9a733..7bc52d7c94ae 100644 --- a/pkg/network/node/ovscontroller.go +++ b/pkg/network/node/ovscontroller.go @@ -3,7 +3,6 @@ package node import ( - "encoding/hex" "fmt" "net" "sort" @@ -228,11 +227,11 @@ func (oc *ovsController) ensureOvsPort(hostVeth, sandboxID string) (int, error) return oc.ovs.AddPort(hostVeth, -1, "external-ids=sandbox="+sandboxID) } -func (oc *ovsController) setupPodFlows(ofport int, podIP, podMAC, note string, vnid uint32) error { +func (oc *ovsController) setupPodFlows(ofport int, podIP, podMAC string, vnid uint32) error { otx := oc.ovs.NewTransaction() // ARP/IP traffic from container - otx.AddFlow("table=20, priority=100, in_port=%d, arp, nw_src=%s, arp_sha=%s, actions=load:%d->NXM_NX_REG0[], note:%s, goto_table:21", ofport, podIP, podMAC, vnid, note) + otx.AddFlow("table=20, priority=100, in_port=%d, arp, nw_src=%s, arp_sha=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, podIP, podMAC, vnid) otx.AddFlow("table=20, priority=100, in_port=%d, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, podIP, vnid) if oc.useConnTrack { otx.AddFlow("table=25, priority=100, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:30", podIP, vnid) @@ -256,34 +255,12 @@ func (oc *ovsController) cleanupPodFlows(podIP string) error { return otx.EndTransaction() } -func getPodNote(sandboxID string) (string, error) { - bytes, err := hex.DecodeString(sandboxID) - if err != nil { - return "", fmt.Errorf("failed to decode sandbox ID %q: %v", sandboxID, err) - } - if len(bytes) != 32 { - return "", fmt.Errorf("invalid sandbox ID %q length; expected 32 bytes", sandboxID) - } - var note string - for _, b := range bytes { - if len(note) > 0 { - note += "." - } - note += fmt.Sprintf("%02x", b) - } - return note, nil -} - func (oc *ovsController) SetUpPod(hostVeth, podIP, podMAC, sandboxID string, vnid uint32) (int, error) { - note, err := getPodNote(sandboxID) - if err != nil { - return -1, err - } ofport, err := oc.ensureOvsPort(hostVeth, sandboxID) if err != nil { return -1, err } - return ofport, oc.setupPodFlows(ofport, podIP, podMAC, note, vnid) + return ofport, oc.setupPodFlows(ofport, podIP, podMAC, vnid) } // Returned list can also be used for port names @@ -346,54 +323,52 @@ func (oc *ovsController) SetPodBandwidth(hostVeth, sandboxID string, ingressBPS, return nil } -func (oc *ovsController) getPodDetailsBySandboxID(sandboxID string) (int, string, string, string, error) { - note, err := getPodNote(sandboxID) +func (oc *ovsController) getPodDetailsBySandboxID(sandboxID string) (int, string, string, error) { + strports, err := oc.ovs.Find("interface", "ofport", "external-ids:sandbox="+sandboxID) if err != nil { - return 0, "", "", "", err + return 0, "", "", err + } else if len(strports) == 0 { + return 0, "", "", fmt.Errorf("failed to find pod details from OVS flows") + } else if len(strports) > 1 { + return 0, "", "", fmt.Errorf("found multiple ofports for sandbox ID %q: %#v", sandboxID, strports) } - flows, err := oc.ovs.DumpFlows("table=20,arp") + ofport, err := strconv.Atoi(strports[0]) if err != nil { - return 0, "", "", "", err + return 0, "", "", fmt.Errorf("could not parse ofport %q: %v", strports[0], err) } - // Find the flow with the given note and extract the podIP, ofport, and MAC from them - for _, flow := range flows { - parsed, err := ovs.ParseFlow(ovs.ParseForDump, flow) - if err != nil { - return 0, "", "", "", err - } - if !parsed.NoteHasPrefix(note) { - continue - } + flows, err := oc.ovs.DumpFlows("table=20,arp,in_port=%d", ofport) + if err != nil { + return 0, "", "", err + } else if len(flows) != 1 { + return 0, "", "", fmt.Errorf("could not find correct OVS flows for port %d", ofport) + } - macField, macOk := parsed.FindField("arp_sha") - portField, pOk := parsed.FindField("in_port") - ipField, ipOk := parsed.FindField("arp_spa") - if !macOk || !pOk || !ipOk { - continue - } + parsed, err := ovs.ParseFlow(ovs.ParseForDump, flows[0]) + if err != nil { + return 0, "", "", err + } - ofport, err := strconv.Atoi(portField.Value) - if err != nil { - return 0, "", "", "", fmt.Errorf("failed to parse ofport %q: %v", portField.Value, err) - } - if _, err := net.ParseMAC(macField.Value); err != nil { - return 0, "", "", "", fmt.Errorf("failed to parse arp_sha %q: %v", macField.Value, err) - } - podMAC := macField.Value - if net.ParseIP(ipField.Value) == nil { - return 0, "", "", "", fmt.Errorf("failed to parse arp_spa %q", ipField.Value) - } - podIP := ipField.Value + macField, macOk := parsed.FindField("arp_sha") + ipField, ipOk := parsed.FindField("arp_spa") + if !macOk || !ipOk { + return 0, "", "", fmt.Errorf("failed to parse OVS flows for sandbox ID %q", sandboxID) + } - return ofport, podIP, podMAC, note, nil + if _, err := net.ParseMAC(macField.Value); err != nil { + return 0, "", "", fmt.Errorf("failed to parse arp_sha %q: %v", macField.Value, err) + } + podMAC := macField.Value + if net.ParseIP(ipField.Value) == nil { + return 0, "", "", fmt.Errorf("failed to parse arp_spa %q", ipField.Value) } + podIP := ipField.Value - return 0, "", "", "", fmt.Errorf("failed to find pod details from OVS flows") + return ofport, podIP, podMAC, nil } func (oc *ovsController) UpdatePod(sandboxID string, vnid uint32) error { - ofport, podIP, podMAC, note, err := oc.getPodDetailsBySandboxID(sandboxID) + ofport, podIP, podMAC, err := oc.getPodDetailsBySandboxID(sandboxID) if err != nil { return err } @@ -401,11 +376,11 @@ func (oc *ovsController) UpdatePod(sandboxID string, vnid uint32) error { if err != nil { return err } - return oc.setupPodFlows(ofport, podIP, podMAC, note, vnid) + return oc.setupPodFlows(ofport, podIP, podMAC, vnid) } func (oc *ovsController) TearDownPod(sandboxID string) error { - _, podIP, _, _, err := oc.getPodDetailsBySandboxID(sandboxID) + _, podIP, _, err := oc.getPodDetailsBySandboxID(sandboxID) if err != nil { // OVS flows related to sandboxID not found // Nothing needs to be done in that case diff --git a/pkg/network/node/ovscontroller_test.go b/pkg/network/node/ovscontroller_test.go index 484a41fc58a5..3d4c4272f277 100644 --- a/pkg/network/node/ovscontroller_test.go +++ b/pkg/network/node/ovscontroller_test.go @@ -218,9 +218,7 @@ func TestOVSService(t *testing.T) { } const ( - sandboxID string = "bcb5d8d287fcf97458c48ad643b101079e3bc265a94e097e7407440716112f69" - sandboxNote string = "bc.b5.d8.d2.87.fc.f9.74.58.c4.8a.d6.43.b1.01.07.9e.3b.c2.65.a9.4e.09.7e.74.07.44.07.16.11.2f.69" - sandboxNoteAction string = "note:" + sandboxNote + sandboxID string = "bcb5d8d287fcf97458c48ad643b101079e3bc265a94e097e7407440716112f69" ) func TestOVSPod(t *testing.T) { @@ -239,7 +237,7 @@ func TestOVSPod(t *testing.T) { err = assertFlowChanges(origFlows, flows, flowChange{ kind: flowAdded, - match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", sandboxNoteAction}, + match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66"}, }, flowChange{ kind: flowAdded, @@ -267,7 +265,7 @@ func TestOVSPod(t *testing.T) { // Update err = oc.UpdatePod(sandboxID, 43) if err != nil { - t.Fatalf("Unexpected error adding pod rules: %v", err) + t.Fatalf("Unexpected error updating pod rules: %v", err) } flows, err = ovsif.DumpFlows("") @@ -277,7 +275,7 @@ func TestOVSPod(t *testing.T) { err = assertFlowChanges(origFlows, flows, flowChange{ kind: flowAdded, - match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", sandboxNoteAction}, + match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66"}, }, flowChange{ kind: flowAdded, @@ -323,7 +321,6 @@ func TestGetPodDetails(t *testing.T) { sandboxID string ip string mac string - note string errStr string } @@ -332,7 +329,6 @@ func TestGetPodDetails(t *testing.T) { sandboxID: sandboxID, ip: "10.130.0.2", mac: "4a:77:32:e4:ab:9d", - note: sandboxNote, }, } @@ -343,7 +339,7 @@ func TestGetPodDetails(t *testing.T) { t.Fatalf("Unexpected error adding pod rules: %v", err) } - ofport, ip, mac, note, err := oc.getPodDetailsBySandboxID(tc.sandboxID) + ofport, ip, mac, err := oc.getPodDetailsBySandboxID(tc.sandboxID) if err != nil { if tc.errStr != "" { if !strings.Contains(err.Error(), tc.errStr) { @@ -364,9 +360,6 @@ func TestGetPodDetails(t *testing.T) { if mac != tc.mac { t.Fatalf("unexpected mac %q (expected %q)", mac, tc.mac) } - if note != tc.note { - t.Fatalf("unexpected note %q (expected %q)", note, tc.note) - } } } diff --git a/pkg/util/ovs/fake_ovs.go b/pkg/util/ovs/fake_ovs.go index 42be09ebee38..093b827a7f7e 100644 --- a/pkg/util/ovs/fake_ovs.go +++ b/pkg/util/ovs/fake_ovs.go @@ -3,6 +3,7 @@ package ovs import ( "fmt" "sort" + "strings" ) // ovsFake implements a fake ovs.Interface for testing purposes @@ -12,10 +13,16 @@ import ( // to support enough features to make the SDN unit tests pass, and should do enough // error checking to catch bugs that have tripped us up in the past (eg, // specifying "nw_dst" without "ip"). + +type ovsPortInfo struct { + ofport int + externalIDs map[string]string +} + type ovsFake struct { bridge string - ports map[string]int + ports map[string]ovsPortInfo flows ovsFlows } @@ -25,7 +32,7 @@ func NewFake(bridge string) Interface { } func (fake *ovsFake) AddBridge(properties ...string) error { - fake.ports = make(map[string]int) + fake.ports = make(map[string]ovsPortInfo) fake.flows = make([]OvsFlow, 0) return nil } @@ -48,8 +55,8 @@ func (fake *ovsFake) GetOFPort(port string) (int, error) { return -1, err } - if ofport, exists := fake.ports[port]; exists { - return ofport, nil + if portInfo, exists := fake.ports[port]; exists { + return portInfo.ofport, nil } else { return -1, fmt.Errorf("no row %q in table Interface", port) } @@ -60,28 +67,45 @@ func (fake *ovsFake) AddPort(port string, ofportRequest int, properties ...strin return -1, err } - ofport, exists := fake.ports[port] + var externalIDs map[string]string + for _, property := range properties { + if !strings.HasPrefix(property, "external-ids=") { + continue + } + externalIDs = make(map[string]string, 1) + for _, id := range strings.Split(property[13:], ",") { + parsed := strings.Split(id, "=") + if len(parsed) != 2 { + return -1, fmt.Errorf("could not parse external-id %q", id) + } + externalIDs[parsed[0]] = parsed[1] + } + } + + portInfo, exists := fake.ports[port] if exists { - if ofport != ofportRequest && ofportRequest != -1 { - return -1, fmt.Errorf("allocated ofport (%d) did not match request (%d)", ofport, ofportRequest) + if portInfo.ofport != ofportRequest && ofportRequest != -1 { + return -1, fmt.Errorf("allocated ofport (%d) did not match request (%d)", portInfo.ofport, ofportRequest) } } else { if ofportRequest == -1 { - ofport := 1 - for _, existingPort := range fake.ports { - if existingPort >= ofport { - ofport = existingPort + 1 + portInfo.ofport = 1 + for _, existingPortInfo := range fake.ports { + if existingPortInfo.ofport >= portInfo.ofport { + portInfo.ofport = existingPortInfo.ofport + 1 } } } else { if ofportRequest < 1 || ofportRequest > 65535 { return -1, fmt.Errorf("requested ofport (%d) out of range", ofportRequest) } - ofport = ofportRequest + portInfo.ofport = ofportRequest } - fake.ports[port] = ofport + portInfo.externalIDs = externalIDs + fake.ports[port] = portInfo } - return ofport, nil + + return portInfo.ofport, nil } func (fake *ovsFake) DeletePort(port string) error { @@ -114,7 +138,23 @@ func (fake *ovsFake) Set(table, record string, values ...string) error { } func (fake *ovsFake) Find(table, column, condition string) ([]string, error) { - return make([]string, 0), nil + results := make([]string, 0) + if (table == "Interface" || table == "interface") && strings.HasPrefix(condition, "external-ids:") { + parsed := strings.Split(condition[13:], "=") + if len(parsed) != 2 { + return nil, fmt.Errorf("could not parse condition %q", condition) + } + for portName, portInfo := range fake.ports { + if portInfo.externalIDs[parsed[0]] == parsed[1] { + if column == "name" { + results = append(results, portName) + } else if column == "ofport" { + results = append(results, fmt.Sprintf("%d", portInfo.ofport)) + } + } + } + } + return results, nil } func (fake *ovsFake) Clear(table, record string, columns ...string) error { diff --git a/pkg/util/ovs/fake_ovs_test.go b/pkg/util/ovs/fake_ovs_test.go index fac67ab44d8c..7cdf741be649 100644 --- a/pkg/util/ovs/fake_ovs_test.go +++ b/pkg/util/ovs/fake_ovs_test.go @@ -39,6 +39,62 @@ func TestFakePorts(t *testing.T) { } } +func TestFind(t *testing.T) { + ovsif := NewFake("br0") + err := ovsif.AddBridge() + if err != nil { + t.Fatalf("unexpected error adding bridge: %v", err) + } + + vethA, err := ovsif.AddPort("vethA", -1, "external-ids=sandboxID=ALPHA") + if err != nil { + t.Fatalf("unexpected error adding port: %v", err) + } + vethB, err := ovsif.AddPort("vethB", -1, "external-ids=sandboxID=BETA,notSandbox=ALPHA") + if err != nil { + t.Fatalf("unexpected error adding port: %v", err) + } + vethC, err := ovsif.AddPort("vethC", -1, "external-ids=sandboxID=GAMMA,notSandbox=ALPHA") + if err != nil { + t.Fatalf("unexpected error adding port: %v", err) + } + if vethA == vethB || vethA == vethC || vethB == vethC { + t.Fatalf("port numbers are reused: %d, %d, %d", vethA, vethB, vethC) + } + + ports, err := ovsif.Find("interface", "name", "external-ids:sandboxID=ALPHA") + if err != nil { + t.Fatalf("unexpected error finding port: %v", err) + } + if len(ports) != 1 || ports[0] != "vethA" { + t.Fatalf("unexpected result finding port ALPHA's name: %#v", ports) + } + + ports, err = ovsif.Find("interface", "ofport", "external-ids:sandboxID=BETA") + if err != nil { + t.Fatalf("unexpected error finding port: %v", err) + } + if len(ports) != 1 || ports[0] != fmt.Sprintf("%d", vethB) { + t.Fatalf("unexpected result finding port BETA's ofport: %#v", ports) + } + + ports, err = ovsif.Find("interface", "name", "external-ids:notSandbox=ALPHA") + if err != nil { + t.Fatalf("unexpected error finding port: %v", err) + } + if len(ports) != 2 || (ports[0] != "vethB" && ports[0] != "vethC") || (ports[1] != "vethB" && ports[1] != "vethC") || (ports[0] == ports[1]) { + t.Fatalf("unexpected result finding notSandbox=ALPHA ports: %#v", ports) + } + + ports, err = ovsif.Find("interface", "name", "external-ids:sandboxID=DELTA") + if err != nil { + t.Fatalf("unexpected error finding port: %v", err) + } + if len(ports) != 0 { + t.Fatalf("unexpected result finding sandboxID=DELTA ports: %#v", ports) + } +} + func checkDump(ovsif Interface, filter string, cmpFlows []string) error { dumpedFlows, err := ovsif.DumpFlows(filter) if err != nil { From 2f29592d56b2d476e76b0820a47698c3542f05db Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Tue, 30 Jan 2018 14:15:52 -0500 Subject: [PATCH 3/5] Don't read back veth MAC; we already know it's based on the IP --- pkg/network/node/ovscontroller.go | 72 +++++++++++++------------- pkg/network/node/ovscontroller_test.go | 35 ++++++++----- pkg/network/node/pod.go | 5 +- 3 files changed, 61 insertions(+), 51 deletions(-) diff --git a/pkg/network/node/ovscontroller.go b/pkg/network/node/ovscontroller.go index 7bc52d7c94ae..a7a25630994b 100644 --- a/pkg/network/node/ovscontroller.go +++ b/pkg/network/node/ovscontroller.go @@ -227,40 +227,46 @@ func (oc *ovsController) ensureOvsPort(hostVeth, sandboxID string) (int, error) return oc.ovs.AddPort(hostVeth, -1, "external-ids=sandbox="+sandboxID) } -func (oc *ovsController) setupPodFlows(ofport int, podIP, podMAC string, vnid uint32) error { +func (oc *ovsController) setupPodFlows(ofport int, podIP net.IP, vnid uint32) error { otx := oc.ovs.NewTransaction() + ipstr := podIP.String() + podIP = podIP.To4() + ipmac := fmt.Sprintf("00:00:%02x:%02x:%02x:%02x/00:00:ff:ff:ff:ff", podIP[0], podIP[1], podIP[2], podIP[3]) + // ARP/IP traffic from container - otx.AddFlow("table=20, priority=100, in_port=%d, arp, nw_src=%s, arp_sha=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, podIP, podMAC, vnid) - otx.AddFlow("table=20, priority=100, in_port=%d, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, podIP, vnid) + otx.AddFlow("table=20, priority=100, in_port=%d, arp, nw_src=%s, arp_sha=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, ipstr, ipmac, vnid) + otx.AddFlow("table=20, priority=100, in_port=%d, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, ipstr, vnid) if oc.useConnTrack { - otx.AddFlow("table=25, priority=100, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:30", podIP, vnid) + otx.AddFlow("table=25, priority=100, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:30", ipstr, vnid) } // ARP request/response to container (not isolated) - otx.AddFlow("table=40, priority=100, arp, nw_dst=%s, actions=output:%d", podIP, ofport) + otx.AddFlow("table=40, priority=100, arp, nw_dst=%s, actions=output:%d", ipstr, ofport) // IP traffic to container - otx.AddFlow("table=70, priority=100, ip, nw_dst=%s, actions=load:%d->NXM_NX_REG1[], load:%d->NXM_NX_REG2[], goto_table:80", podIP, vnid, ofport) + otx.AddFlow("table=70, priority=100, ip, nw_dst=%s, actions=load:%d->NXM_NX_REG1[], load:%d->NXM_NX_REG2[], goto_table:80", ipstr, vnid, ofport) return otx.EndTransaction() } -func (oc *ovsController) cleanupPodFlows(podIP string) error { +func (oc *ovsController) cleanupPodFlows(podIP net.IP) error { + ipstr := podIP.String() + otx := oc.ovs.NewTransaction() - otx.DeleteFlows("ip, nw_dst=%s", podIP) - otx.DeleteFlows("ip, nw_src=%s", podIP) - otx.DeleteFlows("arp, nw_dst=%s", podIP) - otx.DeleteFlows("arp, nw_src=%s", podIP) + otx.DeleteFlows("ip, nw_dst=%s", ipstr) + otx.DeleteFlows("ip, nw_src=%s", ipstr) + otx.DeleteFlows("arp, nw_dst=%s", ipstr) + otx.DeleteFlows("arp, nw_src=%s", ipstr) return otx.EndTransaction() } -func (oc *ovsController) SetUpPod(hostVeth, podIP, podMAC, sandboxID string, vnid uint32) (int, error) { +func (oc *ovsController) SetUpPod(sandboxID, hostVeth string, podIP net.IP, vnid uint32) (int, error) { ofport, err := oc.ensureOvsPort(hostVeth, sandboxID) if err != nil { return -1, err } - return ofport, oc.setupPodFlows(ofport, podIP, podMAC, vnid) + return ofport, oc.setupPodFlows(ofport, podIP, vnid) } // Returned list can also be used for port names @@ -323,52 +329,46 @@ func (oc *ovsController) SetPodBandwidth(hostVeth, sandboxID string, ingressBPS, return nil } -func (oc *ovsController) getPodDetailsBySandboxID(sandboxID string) (int, string, string, error) { +func (oc *ovsController) getPodDetailsBySandboxID(sandboxID string) (int, net.IP, error) { strports, err := oc.ovs.Find("interface", "ofport", "external-ids:sandbox="+sandboxID) if err != nil { - return 0, "", "", err + return 0, nil, err } else if len(strports) == 0 { - return 0, "", "", fmt.Errorf("failed to find pod details from OVS flows") + return 0, nil, fmt.Errorf("failed to find pod details from OVS flows") } else if len(strports) > 1 { - return 0, "", "", fmt.Errorf("found multiple ofports for sandbox ID %q: %#v", sandboxID, strports) + return 0, nil, fmt.Errorf("found multiple ofports for sandbox ID %q: %#v", sandboxID, strports) } ofport, err := strconv.Atoi(strports[0]) if err != nil { - return 0, "", "", fmt.Errorf("could not parse ofport %q: %v", strports[0], err) + return 0, nil, fmt.Errorf("could not parse ofport %q: %v", strports[0], err) } flows, err := oc.ovs.DumpFlows("table=20,arp,in_port=%d", ofport) if err != nil { - return 0, "", "", err + return 0, nil, err } else if len(flows) != 1 { - return 0, "", "", fmt.Errorf("could not find correct OVS flows for port %d", ofport) + return 0, nil, fmt.Errorf("could not find correct OVS flows for port %d", ofport) } parsed, err := ovs.ParseFlow(ovs.ParseForDump, flows[0]) if err != nil { - return 0, "", "", err + return 0, nil, err } - macField, macOk := parsed.FindField("arp_sha") ipField, ipOk := parsed.FindField("arp_spa") - if !macOk || !ipOk { - return 0, "", "", fmt.Errorf("failed to parse OVS flows for sandbox ID %q", sandboxID) - } - - if _, err := net.ParseMAC(macField.Value); err != nil { - return 0, "", "", fmt.Errorf("failed to parse arp_sha %q: %v", macField.Value, err) + if !ipOk { + return 0, nil, fmt.Errorf("failed to parse OVS flows for sandbox ID %q", sandboxID) } - podMAC := macField.Value - if net.ParseIP(ipField.Value) == nil { - return 0, "", "", fmt.Errorf("failed to parse arp_spa %q", ipField.Value) + podIP := net.ParseIP(ipField.Value) + if podIP == nil { + return 0, nil, fmt.Errorf("failed to parse arp_spa %q", ipField.Value) } - podIP := ipField.Value - return ofport, podIP, podMAC, nil + return ofport, podIP, nil } func (oc *ovsController) UpdatePod(sandboxID string, vnid uint32) error { - ofport, podIP, podMAC, err := oc.getPodDetailsBySandboxID(sandboxID) + ofport, podIP, err := oc.getPodDetailsBySandboxID(sandboxID) if err != nil { return err } @@ -376,11 +376,11 @@ func (oc *ovsController) UpdatePod(sandboxID string, vnid uint32) error { if err != nil { return err } - return oc.setupPodFlows(ofport, podIP, podMAC, vnid) + return oc.setupPodFlows(ofport, podIP, vnid) } func (oc *ovsController) TearDownPod(sandboxID string) error { - _, podIP, _, err := oc.getPodDetailsBySandboxID(sandboxID) + _, podIP, err := oc.getPodDetailsBySandboxID(sandboxID) if err != nil { // OVS flows related to sandboxID not found // Nothing needs to be done in that case diff --git a/pkg/network/node/ovscontroller_test.go b/pkg/network/node/ovscontroller_test.go index 3d4c4272f277..5a405ee4fcef 100644 --- a/pkg/network/node/ovscontroller_test.go +++ b/pkg/network/node/ovscontroller_test.go @@ -4,6 +4,7 @@ package node import ( "fmt" + "net" "reflect" "sort" "strings" @@ -14,6 +15,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kapi "k8s.io/kubernetes/pkg/apis/core" + + "github.com/containernetworking/plugins/pkg/utils/hwaddr" ) func setupOVSController(t *testing.T) (ovs.Interface, *ovsController, []string) { @@ -225,7 +228,7 @@ func TestOVSPod(t *testing.T) { ovsif, oc, origFlows := setupOVSController(t) // Add - ofport, err := oc.SetUpPod("veth1", "10.128.0.2", "11:22:33:44:55:66", sandboxID, 42) + ofport, err := oc.SetUpPod(sandboxID, "veth1", net.ParseIP("10.128.0.2"), 42) if err != nil { t.Fatalf("Unexpected error adding pod rules: %v", err) } @@ -237,7 +240,7 @@ func TestOVSPod(t *testing.T) { err = assertFlowChanges(origFlows, flows, flowChange{ kind: flowAdded, - match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66"}, + match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "00:00:0a:80:00:02/00:00:ff:ff:ff:ff"}, }, flowChange{ kind: flowAdded, @@ -275,7 +278,7 @@ func TestOVSPod(t *testing.T) { err = assertFlowChanges(origFlows, flows, flowChange{ kind: flowAdded, - match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66"}, + match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "00:00:0a:80:00:02/00:00:ff:ff:ff:ff"}, }, flowChange{ kind: flowAdded, @@ -320,7 +323,6 @@ func TestGetPodDetails(t *testing.T) { type testcase struct { sandboxID string ip string - mac string errStr string } @@ -328,18 +330,17 @@ func TestGetPodDetails(t *testing.T) { { sandboxID: sandboxID, ip: "10.130.0.2", - mac: "4a:77:32:e4:ab:9d", }, } for _, tc := range testcases { _, oc, _ := setupOVSController(t) - tcOFPort, err := oc.SetUpPod("veth1", tc.ip, tc.mac, tc.sandboxID, 42) + tcOFPort, err := oc.SetUpPod(tc.sandboxID, "veth1", net.ParseIP(tc.ip), 42) if err != nil { t.Fatalf("Unexpected error adding pod rules: %v", err) } - ofport, ip, mac, err := oc.getPodDetailsBySandboxID(tc.sandboxID) + ofport, ip, err := oc.getPodDetailsBySandboxID(tc.sandboxID) if err != nil { if tc.errStr != "" { if !strings.Contains(err.Error(), tc.errStr) { @@ -354,11 +355,8 @@ func TestGetPodDetails(t *testing.T) { if ofport != tcOFPort { t.Fatalf("unexpected ofport %d (expected %d)", ofport, tcOFPort) } - if ip != tc.ip { - t.Fatalf("unexpected ip %q (expected %q)", ip, tc.ip) - } - if mac != tc.mac { - t.Fatalf("unexpected mac %q (expected %q)", mac, tc.mac) + if ip.String() != tc.ip { + t.Fatalf("unexpected ip %q (expected %q)", ip.String(), tc.ip) } } } @@ -1050,3 +1048,16 @@ func TestSyncVNIDRules(t *testing.T) { } } } + +// Ensure that CNI's IP-addressed-based MAC addresses use the IP in the way we expect +func TestSetHWAddrByIP(t *testing.T) { + ip := net.ParseIP("1.2.3.4") + hwAddr, err := hwaddr.GenerateHardwareAddr4(ip, hwaddr.PrivateMACPrefix) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + expectedHWAddr := net.HardwareAddr(append(hwaddr.PrivateMACPrefix, ip.To4()...)) + if !reflect.DeepEqual(hwAddr, expectedHWAddr) { + t.Fatalf("hwaddr.GenerateHardwareAddr4 changed behavior! (%#v != %#v)", hwAddr, expectedHWAddr) + } +} diff --git a/pkg/network/node/pod.go b/pkg/network/node/pod.go index 842e0fae8eb7..035be95ee093 100644 --- a/pkg/network/node/pod.go +++ b/pkg/network/node/pod.go @@ -539,7 +539,7 @@ func (m *podManager) setup(req *cniserver.PodRequest) (cnitypes.Result, *running } } - var hostVethName, contVethMac string + var hostVethName string err = ns.WithNetNSPath(req.Netns, func(hostNS ns.NetNS) error { hostVeth, contVeth, err := ip.SetupVeth(podInterfaceName, int(m.mtu), hostNS) if err != nil { @@ -588,7 +588,6 @@ func (m *podManager) setup(req *cniserver.PodRequest) (cnitypes.Result, *running } hostVethName = hostVeth.Name - contVethMac = contVeth.HardwareAddr.String() return nil }) if err != nil { @@ -604,7 +603,7 @@ func (m *podManager) setup(req *cniserver.PodRequest) (cnitypes.Result, *running return nil, nil, err } - ofport, err := m.ovs.SetUpPod(hostVethName, podIP.String(), contVethMac, req.SandboxID, vnid) + ofport, err := m.ovs.SetUpPod(req.SandboxID, hostVethName, podIP, vnid) if err != nil { return nil, nil, err } From 3c843d75d3a37fdd316c377e7529ed9e731796f6 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 31 Jan 2018 11:29:28 -0500 Subject: [PATCH 4/5] Pass directory rather than socket filename to CNIServer.Start() --- pkg/network/node/cniserver/cniserver.go | 32 ++++++++++--------- pkg/network/node/cniserver/cniserver_test.go | 6 ++-- pkg/network/node/node.go | 2 +- pkg/network/node/pod.go | 4 +-- pkg/network/node/pod_test.go | 14 +++++--- .../sdn-cni-plugin/sdn_cni_plugin_test.go | 4 +-- 6 files changed, 35 insertions(+), 27 deletions(-) diff --git a/pkg/network/node/cniserver/cniserver.go b/pkg/network/node/cniserver/cniserver.go index 86797a97ad52..9b5ff9418578 100644 --- a/pkg/network/node/cniserver/cniserver.go +++ b/pkg/network/node/cniserver/cniserver.go @@ -9,7 +9,7 @@ import ( "net" "net/http" "os" - "path" + "path/filepath" "strings" "github.com/golang/glog" @@ -44,9 +44,12 @@ import ( // removed and re-created with 0700 permissions each time openshift-node is // started. -// Default CNIServer unix domain socket path which the OpenShift SDN CNI -// plugin uses to talk to the CNIServer -const CNIServerSocketPath string = "/var/run/openshift-sdn/cni-server.sock" +// Default directory for CNIServer runtime files +const CNIServerRunDir string = "/var/run/openshift-sdn" + +// CNIServer socket name, and default full path +const CNIServerSocketName string = "cni-server.sock" +const CNIServerSocketPath string = CNIServerRunDir + "/" + CNIServerSocketName // Explicit type for CNI commands the server handles type CNICommand string @@ -95,19 +98,18 @@ type cniRequestFunc func(request *PodRequest) ([]byte, error) type CNIServer struct { http.Server requestFunc cniRequestFunc - path string + rundir string } -// Create and return a new CNIServer object which will listen on the given -// socket path -func NewCNIServer(socketPath string) *CNIServer { +// Create and return a new CNIServer object which will listen on a socket in the given path +func NewCNIServer(rundir string) *CNIServer { router := mux.NewRouter() s := &CNIServer{ Server: http.Server{ Handler: router, }, - path: socketPath, + rundir: rundir, } router.NotFoundHandler = http.HandlerFunc(http.NotFound) router.HandleFunc("/", s.handleCNIRequest).Methods("POST") @@ -125,25 +127,25 @@ func (s *CNIServer) Start(requestFunc cniRequestFunc) error { s.requestFunc = requestFunc // Remove and re-create the socket directory with root-only permissions - dirName := path.Dir(s.path) - if err := os.RemoveAll(s.path); err != nil && !os.IsNotExist(err) { + if err := os.RemoveAll(s.rundir); err != nil && !os.IsNotExist(err) { utilruntime.HandleError(fmt.Errorf("failed to remove old pod info socket: %v", err)) } - if err := os.RemoveAll(dirName); err != nil && !os.IsNotExist(err) { + if err := os.RemoveAll(s.rundir); err != nil && !os.IsNotExist(err) { utilruntime.HandleError(fmt.Errorf("failed to remove contents of socket directory: %v", err)) } - if err := os.MkdirAll(dirName, 0700); err != nil { + if err := os.MkdirAll(s.rundir, 0700); err != nil { return fmt.Errorf("failed to create pod info socket directory: %v", err) } // On Linux the socket is created with the permissions of the directory // it is in, so as long as the directory is root-only we can avoid // racy umask manipulation. - l, err := net.Listen("unix", s.path) + socketPath := filepath.Join(s.rundir, CNIServerSocketName) + l, err := net.Listen("unix", socketPath) if err != nil { return fmt.Errorf("failed to listen on pod info socket: %v", err) } - if err := os.Chmod(s.path, 0600); err != nil { + if err := os.Chmod(socketPath, 0600); err != nil { l.Close() return fmt.Errorf("failed to set pod info socket mode: %v", err) } diff --git a/pkg/network/node/cniserver/cniserver_test.go b/pkg/network/node/cniserver/cniserver_test.go index b499d831cc8c..b4615af7884d 100644 --- a/pkg/network/node/cniserver/cniserver_test.go +++ b/pkg/network/node/cniserver/cniserver_test.go @@ -60,9 +60,9 @@ func TestCNIServer(t *testing.T) { t.Fatalf("failed to create temp directory: %v", err) } defer os.RemoveAll(tmpDir) + socketPath := filepath.Join(tmpDir, CNIServerSocketName) - path := filepath.Join(tmpDir, "cni-server.sock") - s := NewCNIServer(path) + s := NewCNIServer(tmpDir) if err := s.Start(serverHandleCNI); err != nil { t.Fatalf("error starting CNI server: %v", err) } @@ -70,7 +70,7 @@ func TestCNIServer(t *testing.T) { client := &http.Client{ Transport: &http.Transport{ Dial: func(proto, addr string) (net.Conn, error) { - return net.Dial("unix", path) + return net.Dial("unix", socketPath) }, }, } diff --git a/pkg/network/node/node.go b/pkg/network/node/node.go index cfe7095735a0..1f2834cb07a1 100644 --- a/pkg/network/node/node.go +++ b/pkg/network/node/node.go @@ -346,7 +346,7 @@ func (node *OsdnNode) Start() error { } glog.V(2).Infof("Starting openshift-sdn pod manager") - if err := node.podManager.Start(cniserver.CNIServerSocketPath, node.localSubnetCIDR, node.networkInfo.ClusterNetworks); err != nil { + if err := node.podManager.Start(cniserver.CNIServerRunDir, node.localSubnetCIDR, node.networkInfo.ClusterNetworks); err != nil { return err } diff --git a/pkg/network/node/pod.go b/pkg/network/node/pod.go index 035be95ee093..ccea44794a3b 100644 --- a/pkg/network/node/pod.go +++ b/pkg/network/node/pod.go @@ -167,7 +167,7 @@ func getIPAMConfig(clusterNetworks []common.ClusterNetwork, localSubnet string) } // Start the CNI server and start processing requests from it -func (m *podManager) Start(socketPath string, localSubnetCIDR string, clusterNetworks []common.ClusterNetwork) error { +func (m *podManager) Start(rundir string, localSubnetCIDR string, clusterNetworks []common.ClusterNetwork) error { if m.enableHostports { iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4) m.hostportSyncer = kubehostport.NewHostportSyncer(iptInterface) @@ -180,7 +180,7 @@ func (m *podManager) Start(socketPath string, localSubnetCIDR string, clusterNet go m.processCNIRequests() - m.cniServer = cniserver.NewCNIServer(socketPath) + m.cniServer = cniserver.NewCNIServer(rundir) return m.cniServer.Start(m.handleCNIRequest) } diff --git a/pkg/network/node/pod_test.go b/pkg/network/node/pod_test.go index ca2751484291..46f55c6110b1 100644 --- a/pkg/network/node/pod_test.go +++ b/pkg/network/node/pod_test.go @@ -156,7 +156,7 @@ func TestPodManager(t *testing.T) { t.Fatalf("failed to create temp directory: %v", err) } defer os.RemoveAll(tmpDir) - socketPath := filepath.Join(tmpDir, "cni-server.sock") + socketPath := filepath.Join(tmpDir, cniserver.CNIServerSocketName) testcases := map[string]struct { operations []*operation @@ -318,7 +318,10 @@ func TestPodManager(t *testing.T) { podManager := newDefaultPodManager() podManager.podHandler = podTester _, cidr, _ := net.ParseCIDR("1.2.0.0/16") - podManager.Start(socketPath, "1.2.3.0/24", []common.ClusterNetwork{{ClusterCIDR: cidr, HostSubnetLength: 8}}) + err := podManager.Start(tmpDir, "1.2.3.0/24", []common.ClusterNetwork{{ClusterCIDR: cidr, HostSubnetLength: 8}}) + if err != nil { + t.Fatalf("could not start PodManager: %v", err) + } // Add pods to our expected pod list before kicking off the // actual pod setup to ensure we don't concurrently access @@ -408,13 +411,16 @@ func TestDirectPodUpdate(t *testing.T) { t.Fatalf("failed to create temp directory: %v", err) } defer os.RemoveAll(tmpDir) - socketPath := filepath.Join(tmpDir, "cni-server.sock") + socketPath := filepath.Join(tmpDir, cniserver.CNIServerSocketName) podTester := newPodTester(t, "update", socketPath) podManager := newDefaultPodManager() podManager.podHandler = podTester _, cidr, _ := net.ParseCIDR("1.2.0.0/16") - podManager.Start(socketPath, "1.2.3.0/24", []common.ClusterNetwork{{ClusterCIDR: cidr, HostSubnetLength: 8}}) + err = podManager.Start(tmpDir, "1.2.3.0/24", []common.ClusterNetwork{{ClusterCIDR: cidr, HostSubnetLength: 8}}) + if err != nil { + t.Fatalf("could not start PodManager: %v", err) + } op := &operation{ command: cniserver.CNI_UPDATE, diff --git a/pkg/network/sdn-cni-plugin/sdn_cni_plugin_test.go b/pkg/network/sdn-cni-plugin/sdn_cni_plugin_test.go index 8eeadbe6cb3c..10b42f3a5709 100644 --- a/pkg/network/sdn-cni-plugin/sdn_cni_plugin_test.go +++ b/pkg/network/sdn-cni-plugin/sdn_cni_plugin_test.go @@ -63,8 +63,8 @@ func TestOpenshiftSdnCNIPlugin(t *testing.T) { } defer os.RemoveAll(tmpDir) - path := filepath.Join(tmpDir, "cni-server.sock") - server := cniserver.NewCNIServer(path) + path := filepath.Join(tmpDir, cniserver.CNIServerSocketName) + server := cniserver.NewCNIServer(tmpDir) if err := server.Start(serverHandleCNI); err != nil { t.Fatalf("error starting CNI server: %v", err) } From dd7a75402cef36ddfdd20c3ec74de80349a2ceda Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 31 Jan 2018 11:30:04 -0500 Subject: [PATCH 5/5] Move remaining pod-namespace operations out of the main process --- pkg/network/node/cniserver/cniserver.go | 47 ++++++- pkg/network/node/cniserver/cniserver_test.go | 21 ++- pkg/network/node/pod.go | 78 +---------- pkg/network/sdn-cni-plugin/openshift-sdn.go | 126 ++++++++++++++++-- .../sdn-cni-plugin/sdn_cni_plugin_test.go | 15 ++- 5 files changed, 198 insertions(+), 89 deletions(-) diff --git a/pkg/network/node/cniserver/cniserver.go b/pkg/network/node/cniserver/cniserver.go index 9b5ff9418578..f3a545380d44 100644 --- a/pkg/network/node/cniserver/cniserver.go +++ b/pkg/network/node/cniserver/cniserver.go @@ -51,6 +51,15 @@ const CNIServerRunDir string = "/var/run/openshift-sdn" const CNIServerSocketName string = "cni-server.sock" const CNIServerSocketPath string = CNIServerRunDir + "/" + CNIServerSocketName +// Config file containing MTU, and default full path +const CNIServerConfigFileName string = "config.json" +const CNIServerConfigFilePath string = CNIServerRunDir + "/" + CNIServerConfigFileName + +// Server-to-plugin config data +type Config struct { + MTU uint32 `json:"mtu"` +} + // Explicit type for CNI commands the server handles type CNICommand string @@ -79,6 +88,8 @@ type PodRequest struct { SandboxID string // kernel network namespace path Netns string + // for an ADD request, the host side of the created veth + HostVeth string // Channel for returning the operation result to the CNIServer Result chan *PodResult } @@ -99,10 +110,11 @@ type CNIServer struct { http.Server requestFunc cniRequestFunc rundir string + config *Config } // Create and return a new CNIServer object which will listen on a socket in the given path -func NewCNIServer(rundir string) *CNIServer { +func NewCNIServer(rundir string, config *Config) *CNIServer { router := mux.NewRouter() s := &CNIServer{ @@ -110,6 +122,7 @@ func NewCNIServer(rundir string) *CNIServer { Handler: router, }, rundir: rundir, + config: config, } router.NotFoundHandler = http.HandlerFunc(http.NotFound) router.HandleFunc("/", s.handleCNIRequest).Methods("POST") @@ -137,6 +150,17 @@ func (s *CNIServer) Start(requestFunc cniRequestFunc) error { return fmt.Errorf("failed to create pod info socket directory: %v", err) } + // Write config file + config, err := json.Marshal(s.config) + if err != nil { + return fmt.Errorf("could not marshal config data: %v", err) + } + configPath := filepath.Join(s.rundir, CNIServerConfigFileName) + err = ioutil.WriteFile(configPath, config, os.FileMode(0444)) + if err != nil { + return fmt.Errorf("could not write config file %q: %v", configPath, err) + } + // On Linux the socket is created with the permissions of the directory // it is in, so as long as the directory is root-only we can avoid // racy umask manipulation. @@ -159,6 +183,22 @@ func (s *CNIServer) Start(requestFunc cniRequestFunc) error { return nil } +func ReadConfig(configPath string) (*Config, error) { + bytes, err := ioutil.ReadFile(configPath) + if err != nil { + if os.IsNotExist(err) { + return nil, fmt.Errorf("OpenShift SDN network process is not (yet?) available") + } else { + return nil, fmt.Errorf("could not read config file %q: %v", configPath, err) + } + } + var config Config + if err = json.Unmarshal(bytes, &config); err != nil { + return nil, fmt.Errorf("could not parse config file %q: %v", configPath, err) + } + return &config, nil +} + // Split the "CNI_ARGS" environment variable's value into a map. CNI_ARGS // contains arbitrary key/value pairs separated by ';' and is for runtime or // plugin specific uses. Kubernetes passes the pod namespace and name in @@ -206,6 +246,11 @@ func cniRequestToPodRequest(r *http.Request) (*PodRequest, error) { return nil, fmt.Errorf("missing CNI_NETNS") } + req.HostVeth, ok = cr.Env["OSDN_HOSTVETH"] + if !ok && req.Command == CNI_ADD { + return nil, fmt.Errorf("missing OSDN_HOSTVETH") + } + cniArgs, err := gatherCNIArgs(cr.Env) if err != nil { return nil, err diff --git a/pkg/network/node/cniserver/cniserver_test.go b/pkg/network/node/cniserver/cniserver_test.go index b4615af7884d..66cc45accaf0 100644 --- a/pkg/network/node/cniserver/cniserver_test.go +++ b/pkg/network/node/cniserver/cniserver_test.go @@ -62,7 +62,7 @@ func TestCNIServer(t *testing.T) { defer os.RemoveAll(tmpDir) socketPath := filepath.Join(tmpDir, CNIServerSocketName) - s := NewCNIServer(tmpDir) + s := NewCNIServer(tmpDir, &Config{MTU: 1500}) if err := s.Start(serverHandleCNI); err != nil { t.Fatalf("error starting CNI server: %v", err) } @@ -102,6 +102,7 @@ func TestCNIServer(t *testing.T) { "CNI_CONTAINERID": "adsfadsfasfdasdfasf", "CNI_NETNS": "/path/to/something", "CNI_ARGS": "K8S_POD_NAMESPACE=awesome-namespace;K8S_POD_NAME=awesome-name", + "OSDN_HOSTVETH": "vethABC", }, Config: []byte("{\"cniVersion\": \"0.1.0\",\"name\": \"openshift-sdn\",\"type\": \"openshift-sdn\"}"), }, @@ -143,6 +144,7 @@ func TestCNIServer(t *testing.T) { "CNI_COMMAND": string(CNI_ADD), "CNI_CONTAINERID": "adsfadsfasfdasdfasf", "CNI_NETNS": "/path/to/something", + "OSDN_HOSTVETH": "vethABC", }, Config: []byte("{\"cniVersion\": \"0.1.0\",\"name\": \"openshift-sdn\",\"type\": \"openshift-sdn\"}"), }, @@ -157,6 +159,7 @@ func TestCNIServer(t *testing.T) { "CNI_COMMAND": string(CNI_ADD), "CNI_CONTAINERID": "adsfadsfasfdasdfasf", "CNI_ARGS": "K8S_POD_NAMESPACE=awesome-namespace;K8S_POD_NAME=awesome-name", + "OSDN_HOSTVETH": "vethABC", }, Config: []byte("{\"cniVersion\": \"0.1.0\",\"name\": \"openshift-sdn\",\"type\": \"openshift-sdn\"}"), }, @@ -171,12 +174,28 @@ func TestCNIServer(t *testing.T) { "CNI_CONTAINERID": "adsfadsfasfdasdfasf", "CNI_NETNS": "/path/to/something", "CNI_ARGS": "K8S_POD_NAMESPACE=awesome-namespace;K8S_POD_NAME=awesome-name", + "OSDN_HOSTVETH": "vethABC", }, Config: []byte("{\"cniVersion\": \"0.1.0\",\"name\": \"openshift-sdn\",\"type\": \"openshift-sdn\"}"), }, result: nil, errorPrefix: "unexpected or missing CNI_COMMAND", }, + // Missing OSDN_HOSTVETH + { + name: "ARGS4", + request: &CNIRequest{ + Env: map[string]string{ + "CNI_COMMAND": string(CNI_ADD), + "CNI_CONTAINERID": "adsfadsfasfdasdfasf", + "CNI_NETNS": "/path/to/something", + "CNI_ARGS": "K8S_POD_NAMESPACE=awesome-namespace;K8S_POD_NAME=awesome-name", + }, + Config: []byte("{\"cniVersion\": \"0.1.0\",\"name\": \"openshift-sdn\",\"type\": \"openshift-sdn\"}"), + }, + result: nil, + errorPrefix: "missing OSDN_HOSTVETH", + }, } for _, tc := range testcases { diff --git a/pkg/network/node/pod.go b/pkg/network/node/pod.go index ccea44794a3b..4117be894ced 100644 --- a/pkg/network/node/pod.go +++ b/pkg/network/node/pod.go @@ -35,9 +35,6 @@ import ( "github.com/containernetworking/cni/pkg/invoke" cnitypes "github.com/containernetworking/cni/pkg/types" cni020 "github.com/containernetworking/cni/pkg/types/020" - cnicurrent "github.com/containernetworking/cni/pkg/types/current" - "github.com/containernetworking/plugins/pkg/ip" - "github.com/containernetworking/plugins/pkg/ipam" "github.com/containernetworking/plugins/pkg/ns" "github.com/vishvananda/netlink" @@ -180,7 +177,7 @@ func (m *podManager) Start(rundir string, localSubnetCIDR string, clusterNetwork go m.processCNIRequests() - m.cniServer = cniserver.NewCNIServer(rundir) + m.cniServer = cniserver.NewCNIServer(rundir, &cniserver.Config{MTU: m.mtu}) return m.cniServer.Start(m.handleCNIRequest) } @@ -384,6 +381,8 @@ func maybeAddMacvlan(pod *kapi.Pod, netns string) error { } } + // Note that this use of ns is safe because it doesn't call Do() or WithNetNSPath() + podNs, err := ns.GetNS(netns) if err != nil { return fmt.Errorf("could not open netns %q", netns) @@ -402,17 +401,7 @@ func maybeAddMacvlan(pod *kapi.Pod, netns string) error { if err != nil { return fmt.Errorf("failed to create macvlan interface: %v", err) } - return podNs.Do(func(netns ns.NetNS) error { - l, err := netlink.LinkByName("macvlan0") - if err != nil { - return fmt.Errorf("failed to find macvlan interface: %v", err) - } - err = netlink.LinkSetUp(l) - if err != nil { - return fmt.Errorf("failed to set macvlan interface up: %v", err) - } - return nil - }) + return nil } func createIPAMArgs(netnsPath string, action cniserver.CNICommand, id string) *invoke.Args { @@ -539,61 +528,6 @@ func (m *podManager) setup(req *cniserver.PodRequest) (cnitypes.Result, *running } } - var hostVethName string - err = ns.WithNetNSPath(req.Netns, func(hostNS ns.NetNS) error { - hostVeth, contVeth, err := ip.SetupVeth(podInterfaceName, int(m.mtu), hostNS) - if err != nil { - return fmt.Errorf("failed to create container veth: %v", err) - } - // Force a consistent MAC address based on the IP address - if err := ip.SetHWAddrByIP(podInterfaceName, podIP, nil); err != nil { - return fmt.Errorf("failed to set pod interface MAC address: %v", err) - } - // refetch to get hardware address and other properties - tmp, err := net.InterfaceByIndex(contVeth.Index) - if err != nil { - return fmt.Errorf("failed to fetch container veth: %v", err) - } - contVeth = *tmp - - // Clear out gateway to prevent ConfigureIface from adding the cluster - // subnet via the gateway - ipamResult.IP4.Gateway = nil - result030, err := cnicurrent.NewResultFromResult(ipamResult) - if err != nil { - return fmt.Errorf("failed to convert IPAM: %v", err) - } - // Add a sandbox interface record which ConfigureInterface expects. - // The only interface we report is the pod interface. - result030.Interfaces = []*cnicurrent.Interface{ - { - Name: podInterfaceName, - Mac: contVeth.HardwareAddr.String(), - Sandbox: req.Netns, - }, - } - intPtr := 0 - result030.IPs[0].Interface = &intPtr - - if err = ipam.ConfigureIface(podInterfaceName, result030); err != nil { - return fmt.Errorf("failed to configure container IPAM: %v", err) - } - - lo, err := netlink.LinkByName("lo") - if err == nil { - err = netlink.LinkSetUp(lo) - } - if err != nil { - return fmt.Errorf("failed to configure container loopback: %v", err) - } - - hostVethName = hostVeth.Name - return nil - }) - if err != nil { - return nil, nil, err - } - vnid, err := m.policy.GetVNID(req.PodNamespace) if err != nil { return nil, nil, err @@ -603,11 +537,11 @@ func (m *podManager) setup(req *cniserver.PodRequest) (cnitypes.Result, *running return nil, nil, err } - ofport, err := m.ovs.SetUpPod(req.SandboxID, hostVethName, podIP, vnid) + ofport, err := m.ovs.SetUpPod(req.SandboxID, req.HostVeth, podIP, vnid) if err != nil { return nil, nil, err } - if err := setupPodBandwidth(m.ovs, pod, hostVethName, req.SandboxID); err != nil { + if err := setupPodBandwidth(m.ovs, pod, req.HostVeth, req.SandboxID); err != nil { return nil, nil, err } diff --git a/pkg/network/sdn-cni-plugin/openshift-sdn.go b/pkg/network/sdn-cni-plugin/openshift-sdn.go index 837cb89d9cde..dc26f62f3280 100644 --- a/pkg/network/sdn-cni-plugin/openshift-sdn.go +++ b/pkg/network/sdn-cni-plugin/openshift-sdn.go @@ -19,15 +19,22 @@ import ( "github.com/containernetworking/cni/pkg/skel" "github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/types/020" + "github.com/containernetworking/cni/pkg/types/current" "github.com/containernetworking/cni/pkg/version" + "github.com/containernetworking/plugins/pkg/ip" + "github.com/containernetworking/plugins/pkg/ipam" + "github.com/containernetworking/plugins/pkg/ns" + + "github.com/vishvananda/netlink" ) type cniPlugin struct { socketPath string + hostNS ns.NetNS } -func NewCNIPlugin(socketPath string) *cniPlugin { - return &cniPlugin{socketPath: socketPath} +func NewCNIPlugin(socketPath string, hostNS ns.NetNS) *cniPlugin { + return &cniPlugin{socketPath: socketPath, hostNS: hostNS} } // Create and fill a CNIRequest with this plugin's environment and stdin which @@ -63,7 +70,11 @@ func (p *cniPlugin) doCNI(url string, req *cniserver.CNIRequest) ([]byte, error) }, } - resp, err := client.Post(url, "application/json", bytes.NewReader(data)) + var resp *http.Response + err = p.hostNS.Do(func(ns.NetNS) error { + resp, err = client.Post(url, "application/json", bytes.NewReader(data)) + return err + }) if err != nil { return nil, fmt.Errorf("failed to send CNI request: %v", err) } @@ -83,8 +94,9 @@ func (p *cniPlugin) doCNI(url string, req *cniserver.CNIRequest) ([]byte, error) // Send the ADD command environment and config to the CNI server, returning // the IPAM result to the caller -func (p *cniPlugin) CmdAdd(args *skel.CmdArgs) (types.Result, error) { - body, err := p.doCNI("http://dummy/", newCNIRequest(args)) +func (p *cniPlugin) doCNIServerAdd(req *cniserver.CNIRequest, hostVeth string) (types.Result, error) { + req.Env["OSDN_HOSTVETH"] = hostVeth + body, err := p.doCNI("http://dummy/", req) if err != nil { return nil, err } @@ -99,17 +111,102 @@ func (p *cniPlugin) CmdAdd(args *skel.CmdArgs) (types.Result, error) { return result, nil } -// Send the ADD command environment and config to the CNI server, printing -// the IPAM result to stdout when called as a CNI plugin -func (p *cniPlugin) skelCmdAdd(args *skel.CmdArgs) error { - result, err := p.CmdAdd(args) +func (p *cniPlugin) testCmdAdd(args *skel.CmdArgs) (types.Result, error) { + return p.doCNIServerAdd(newCNIRequest(args), "dummy0") +} + +func (p *cniPlugin) CmdAdd(args *skel.CmdArgs) error { + req := newCNIRequest(args) + ifname := req.Env["CNI_IFNAME"] + netns := req.Env["CNI_NETNS"] + if ifname == "" || netns == "" { + return fmt.Errorf("CNI request did not include required environment variables") + } + + config, err := cniserver.ReadConfig(cniserver.CNIServerConfigFilePath) if err != nil { return err } + + var hostVeth, contVeth net.Interface + err = ns.WithNetNSPath(netns, func(hostNS ns.NetNS) error { + hostVeth, contVeth, err = ip.SetupVeth(ifname, int(config.MTU), hostNS) + if err != nil { + return fmt.Errorf("failed to create container veth: %v", err) + } + return nil + }) + if err != nil { + return err + } + result, err := p.doCNIServerAdd(req, hostVeth.Name) + if err != nil { + return err + } + + // current.NewResultFromResult and ipam.ConfigureIface both think that + // a route with no gateway specified means to pass the default gateway + // as the next hop to ip.AddRoute, but that's not what we want; we want + // to pass nil as the next hop. So we need to clear the default gateway. + result020, err := types020.GetResult(result) + if err != nil { + return fmt.Errorf("failed to convert IPAM result: %v", err) + } + result020.IP4.Gateway = nil + + result030, err := current.NewResultFromResult(result020) + if err != nil || len(result030.IPs) != 1 || result030.IPs[0].Version != "4" { + return fmt.Errorf("failed to convert IPAM result: %v", err) + } + + // Add a sandbox interface record which ConfigureInterface expects. + // The only interface we report is the pod interface. + result030.Interfaces = []*current.Interface{ + { + Name: ifname, + Mac: contVeth.HardwareAddr.String(), + Sandbox: netns, + }, + } + index := 0 + result030.IPs[0].Interface = &index + + err = ns.WithNetNSPath(netns, func(ns.NetNS) error { + // Set up eth0 + if err := ip.SetHWAddrByIP(ifname, result030.IPs[0].Address.IP, nil); err != nil { + return fmt.Errorf("failed to set pod interface MAC address: %v", err) + } + if err := ipam.ConfigureIface(ifname, result030); err != nil { + return fmt.Errorf("failed to configure container IPAM: %v", err) + } + + // Set up lo + link, err := netlink.LinkByName("lo") + if err == nil { + err = netlink.LinkSetUp(link) + } + if err != nil { + return fmt.Errorf("failed to configure container loopback: %v", err) + } + + // Set up macvlan0 (if it exists) + link, err = netlink.LinkByName("macvlan0") + if err == nil { + err = netlink.LinkSetUp(link) + if err != nil { + return fmt.Errorf("failed to configure macvlan device: %v", err) + } + } + + return nil + }) + if err != nil { + return err + } + return result.Print() } -// Send the DEL command environment and config to the CNI server func (p *cniPlugin) CmdDel(args *skel.CmdArgs) error { _, err := p.doCNI("http://dummy/", newCNIRequest(args)) return err @@ -117,6 +214,11 @@ func (p *cniPlugin) CmdDel(args *skel.CmdArgs) error { func main() { rand.Seed(time.Now().UTC().UnixNano()) - p := NewCNIPlugin(cniserver.CNIServerSocketPath) - skel.PluginMain(p.skelCmdAdd, p.CmdDel, version.Legacy) + hostNS, err := ns.GetCurrentNS() + if err != nil { + panic(fmt.Sprintf("could not get current kernel netns: %v", err)) + } + defer hostNS.Close() + p := NewCNIPlugin(cniserver.CNIServerSocketPath, hostNS) + skel.PluginMain(p.CmdAdd, p.CmdDel, version.Legacy) } diff --git a/pkg/network/sdn-cni-plugin/sdn_cni_plugin_test.go b/pkg/network/sdn-cni-plugin/sdn_cni_plugin_test.go index 10b42f3a5709..607feb82bc98 100644 --- a/pkg/network/sdn-cni-plugin/sdn_cni_plugin_test.go +++ b/pkg/network/sdn-cni-plugin/sdn_cni_plugin_test.go @@ -15,6 +15,7 @@ import ( cniskel "github.com/containernetworking/cni/pkg/skel" cnitypes "github.com/containernetworking/cni/pkg/types" cni020 "github.com/containernetworking/cni/pkg/types/020" + "github.com/containernetworking/plugins/pkg/ns" "github.com/openshift/origin/pkg/network/node/cniserver" utiltesting "k8s.io/client-go/util/testing" @@ -64,12 +65,17 @@ func TestOpenshiftSdnCNIPlugin(t *testing.T) { defer os.RemoveAll(tmpDir) path := filepath.Join(tmpDir, cniserver.CNIServerSocketName) - server := cniserver.NewCNIServer(tmpDir) + server := cniserver.NewCNIServer(tmpDir, &cniserver.Config{MTU: 1500}) if err := server.Start(serverHandleCNI); err != nil { t.Fatalf("error starting CNI server: %v", err) } - cniPlugin := NewCNIPlugin(path) + hostNS, err := ns.GetCurrentNS() + if err != nil { + panic(fmt.Sprintf("could not get current kernel netns: %v", err)) + } + defer hostNS.Close() + cniPlugin := NewCNIPlugin(path, hostNS) expectedIP, expectedNet, _ := net.ParseCIDR("10.0.0.2/24") expectedResult = &cni020.Result{ @@ -139,7 +145,7 @@ func TestOpenshiftSdnCNIPlugin(t *testing.T) { skelArgsToEnv(tc.reqType, tc.skelArgs) switch tc.reqType { case cniserver.CNI_ADD: - result, err = cniPlugin.CmdAdd(tc.skelArgs) + result, err = cniPlugin.testCmdAdd(tc.skelArgs) case cniserver.CNI_DEL: err = cniPlugin.CmdDel(tc.skelArgs) default: @@ -148,6 +154,9 @@ func TestOpenshiftSdnCNIPlugin(t *testing.T) { clearEnv() if tc.errorPrefix == "" { + if err != nil { + t.Fatalf("[%s] expected result %v but got error: %v", tc.name, tc.result, err) + } if tc.result != nil && !reflect.DeepEqual(result, tc.result) { t.Fatalf("[%s] expected result %v but got %v", tc.name, tc.result, result) }