diff --git a/cns/client/client_test.go b/cns/client/client_test.go index 0a93dd876..d4d845aaa 100644 --- a/cns/client/client_test.go +++ b/cns/client/client_test.go @@ -314,7 +314,7 @@ func TestCNSClientPodContextApi(t *testing.T) { addTestStateToRestServer(t, secondaryIps) - podInfo := cns.NewPodInfo("", "", podName, podNamespace) + podInfo := cns.NewPodInfo("some-guid-1", "abc-eth0", podName, podNamespace) orchestratorContext, err := json.Marshal(podInfo) assert.NoError(t, err) @@ -344,7 +344,7 @@ func TestCNSClientDebugAPI(t *testing.T) { addTestStateToRestServer(t, secondaryIps) - podInfo := cns.NewPodInfo("", "", podName, podNamespace) + podInfo := cns.NewPodInfo("some-guid-1", "abc-eth0", podName, podNamespace) orchestratorContext, err := json.Marshal(podInfo) assert.NoError(t, err) diff --git a/cns/cnireconciler/podinfoprovider.go b/cns/cnireconciler/podinfoprovider.go index 3fabac9e7..fd237ebb6 100644 --- a/cns/cnireconciler/podinfoprovider.go +++ b/cns/cnireconciler/podinfoprovider.go @@ -51,19 +51,22 @@ func newCNIPodInfoProvider(exec exec.Interface) (cns.PodInfoByIPProvider, error) } // cniStateToPodInfoByIP converts an AzureCNIState dumped from a CNI exec -// into a PodInfo map, using the first endpoint IP as the key in the map. +// into a PodInfo map, using the endpoint IPs as keys in the map. +// for pods with multiple IPs (such as in dualstack cases), this means multiple keys in the map +// will point to the same pod information. func cniStateToPodInfoByIP(state *api.AzureCNIState) (map[string]cns.PodInfo, error) { podInfoByIP := map[string]cns.PodInfo{} for _, endpoint := range state.ContainerInterfaces { - if _, ok := podInfoByIP[endpoint.IPAddresses[0].IP.String()]; ok { - return nil, errors.Wrap(cns.ErrDuplicateIP, endpoint.IPAddresses[0].IP.String()) + for _, epIP := range endpoint.IPAddresses { + podInfo := cns.NewPodInfo(endpoint.ContainerID, endpoint.PodEndpointId, endpoint.PodName, endpoint.PodNamespace) + + ipKey := epIP.IP.String() + if prevPodInfo, ok := podInfoByIP[ipKey]; ok { + return nil, errors.Wrapf(cns.ErrDuplicateIP, "duplicate ip %s found for different pods: pod: %+v, pod: %+v", ipKey, podInfo, prevPodInfo) + } + + podInfoByIP[ipKey] = podInfo } - podInfoByIP[endpoint.IPAddresses[0].IP.String()] = cns.NewPodInfo( - endpoint.ContainerID, - endpoint.PodEndpointId, - endpoint.PodName, - endpoint.PodNamespace, - ) } return podInfoByIP, nil } diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index 0fe021b60..5e2af46f5 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "fmt" + "net" "net/http" "net/http/httptest" "reflect" @@ -274,50 +275,110 @@ func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMo return len(programmedNCs), nil } -// This API will be called by CNS RequestController on CRD update. -func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) types.ResponseCode { - logger.Printf("Reconciling NC state with CreateNCRequest: [%v], PodInfo [%+v], NNC: [%+v]", ncRequest, podInfoByIP, nnc) - // check if ncRequest is null, then return as there is no CRD state yet - if ncRequest == nil { +func (service *HTTPRestService) ReconcileIPAMState(ncReqs []*cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) types.ResponseCode { + logger.Printf("Reconciling CNS IPAM state with nc requests: [%+v], PodInfo [%+v], NNC: [%+v]", ncReqs, podInfoByIP, nnc) + // if no nc reqs, there is no CRD state yet + if len(ncReqs) == 0 { logger.Printf("CNS starting with no NC state, podInfoMap count %d", len(podInfoByIP)) return types.Success } - // If the NC was created successfully, then reconcile the assigned pod state - returnCode := service.CreateOrUpdateNetworkContainerInternal(ncRequest) - if returnCode != types.Success { - return returnCode - } - - // now parse the secondaryIP list, if it exists in PodInfo list, then assign that ip. - for _, secIPConfig := range ncRequest.SecondaryIPConfigs { - if podInfo, exists := podInfoByIP[secIPConfig.IPAddress]; exists { - logger.Printf("SecondaryIP %+v is assigned to Pod. %+v, ncId: %s", secIPConfig, podInfo, ncRequest.NetworkContainerid) - - jsonContext, err := podInfo.OrchestratorContext() - if err != nil { - logger.Errorf("Failed to marshal KubernetesPodInfo, error: %v", err) - return types.UnexpectedError - } - - ipconfigsRequest := cns.IPConfigsRequest{ - DesiredIPAddresses: []string{secIPConfig.IPAddress}, - OrchestratorContext: jsonContext, - InfraContainerID: podInfo.InfraContainerID(), - PodInterfaceID: podInfo.InterfaceID(), - } - - if _, err := requestIPConfigsHelper(service, ipconfigsRequest); err != nil { - logger.Errorf("AllocateIPConfig failed for SecondaryIP %+v, podInfo %+v, ncId %s, error: %v", secIPConfig, podInfo, ncRequest.NetworkContainerid, err) - return types.FailedToAllocateIPConfig - } - } else { - logger.Printf("SecondaryIP %+v is not assigned. ncId: %s", secIPConfig, ncRequest.NetworkContainerid) + // first step in reconciliation is to create all the NCs in CNS, no IP assignment yet. + for _, ncReq := range ncReqs { + returnCode := service.CreateOrUpdateNetworkContainerInternal(ncReq) + if returnCode != types.Success { + return returnCode } } - err := service.MarkExistingIPsAsPendingRelease(nnc.Spec.IPsNotInUse) + // index all the secondary IP configs for all the nc reqs, for easier lookup later on. + allSecIPsIdx := make(map[string]*cns.CreateNetworkContainerRequest) + for i := range ncReqs { + for _, secIPConfig := range ncReqs[i].SecondaryIPConfigs { + allSecIPsIdx[secIPConfig.IPAddress] = ncReqs[i] + } + } + + // we now need to reconcile IP assignment. + // considering that a single pod may have multiple ips (such as in dual stack scenarios) + // and that IP assignment in CNS (as done by requestIPConfigsHelper) does not allow + // updates (it returns the existing state if one already exists for the pod's interface), + // we need to assign all IPs for a pod interface or name+namespace at the same time. + // + // iterating over single IPs is not appropriate then, since assignment for the first IP for + // a pod will prevent the second IP from being added. the following function call transforms + // pod info indexed by ip address: + // + // { + // "10.0.0.1": podInfo{interface: "aaa-eth0"}, + // "fe80::1": podInfo{interface: "aaa-eth0"}, + // } + // + // to pod IPs indexed by pod key (interface or name+namespace, depending on scenario): + // + // { + // "aaa-eth0": podIPs{v4IP: 10.0.0.1, v6IP: fe80::1} + // } + // + // such that we can iterate over pod interfaces, and assign all IPs for it at once. + podKeyToPodIPs, err := newPodKeyToPodIPsMap(podInfoByIP) if err != nil { + logger.Errorf("could not transform pods indexed by IP address to pod IPs indexed by interface: %v", err) + return types.UnexpectedError + } + + for podKey, podIPs := range podKeyToPodIPs { + var ( + desiredIPs []string + ncIDs []string + ) + + var ips []net.IP + if podIPs.v4IP != nil { + ips = append(ips, podIPs.v4IP) + } + + if podIPs.v6IP != nil { + ips = append(ips, podIPs.v6IP) + } + + for _, ip := range ips { + if ncReq, ok := allSecIPsIdx[ip.String()]; ok { + logger.Printf("secondary ip %s is assigned to pod %+v, ncId: %s ncVersion: %s", ip, podIPs, ncReq.NetworkContainerid, ncReq.Version) + desiredIPs = append(desiredIPs, ip.String()) + ncIDs = append(ncIDs, ncReq.NetworkContainerid) + } else { + // it might still be possible to see host networking pods here (where ips are not from ncs) if we are restoring using the kube podinfo provider + // todo: once kube podinfo provider reconcile flow is removed, this line will not be necessary/should be removed. + logger.Errorf("ip %s assigned to pod %+v but not found in any nc", ip, podIPs) + } + } + + if len(desiredIPs) == 0 { + // this may happen for pods in the host network + continue + } + + jsonContext, err := podIPs.OrchestratorContext() + if err != nil { + logger.Errorf("Failed to marshal KubernetesPodInfo, error: %v", err) + return types.UnexpectedError + } + + ipconfigsRequest := cns.IPConfigsRequest{ + DesiredIPAddresses: desiredIPs, + OrchestratorContext: jsonContext, + InfraContainerID: podIPs.InfraContainerID(), + PodInterfaceID: podIPs.InterfaceID(), + } + + if _, err := requestIPConfigsHelper(service, ipconfigsRequest); err != nil { + logger.Errorf("requestIPConfigsHelper failed for pod key %s, podInfo %+v, ncIds %v, error: %v", podKey, podIPs, ncIDs, err) + return types.FailedToAllocateIPConfig + } + } + + if err := service.MarkExistingIPsAsPendingRelease(nnc.Spec.IPsNotInUse); err != nil { logger.Errorf("[Azure CNS] Error. Failed to mark IPs as pending %v", nnc.Spec.IPsNotInUse) return types.UnexpectedError } @@ -325,6 +386,54 @@ func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkCon return 0 } +var ( + errIPParse = errors.New("parse IP") + errMultipleIPPerFamily = errors.New("multiple IPs per family") +) + +// newPodKeyToPodIPsMap groups IPs by interface id and returns them indexed by interface id. +func newPodKeyToPodIPsMap(podInfoByIP map[string]cns.PodInfo) (map[string]podIPs, error) { + podKeyToPodIPs := make(map[string]podIPs) + + for ipStr, podInfo := range podInfoByIP { + id := podInfo.Key() + + ips, ok := podKeyToPodIPs[id] + if !ok { + ips.PodInfo = podInfo + } + + ip := net.ParseIP(ipStr) + switch { + case ip == nil: + return nil, errors.Wrapf(errIPParse, "could not parse ip string %q on pod %+v", ipStr, podInfo) + case ip.To4() != nil: + if ips.v4IP != nil { + return nil, errors.Wrapf(errMultipleIPPerFamily, "multiple ipv4 addresses (%v, %v) associated to pod %+v", ips.v4IP, ip, podInfo) + } + + ips.v4IP = ip + case ip.To16() != nil: + if ips.v6IP != nil { + return nil, errors.Wrapf(errMultipleIPPerFamily, "multiple ipv6 addresses (%v, %v) associated to pod %+v", ips.v6IP, ip, podInfo) + } + + ips.v6IP = ip + } + + podKeyToPodIPs[id] = ips + } + + return podKeyToPodIPs, nil +} + +// podIPs are all the IPs associated with a pod, along with pod info +type podIPs struct { + cns.PodInfo + v4IP net.IP + v6IP net.IP +} + // GetNetworkContainerInternal gets network container details. func (service *HTTPRestService) GetNetworkContainerInternal( req cns.GetNetworkContainerRequest, diff --git a/cns/restserver/internalapi_test.go b/cns/restserver/internalapi_test.go index 2cf3b34a0..0cc6b19fc 100644 --- a/cns/restserver/internalapi_test.go +++ b/cns/restserver/internalapi_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "math/rand" + "net" "os" "reflect" "strconv" @@ -22,6 +23,7 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/exp/maps" ) @@ -73,16 +75,20 @@ func TestReconcileNCStatePrimaryIPChangeShouldFail(t *testing.T) { }, } - // now try to reconcile the state where the NC primary IP has changed - resp := svc.ReconcileNCState(&cns.CreateNetworkContainerRequest{ - NetworkContainerid: ncID, - IPConfiguration: cns.IPConfiguration{ - IPSubnet: cns.IPSubnet{ - IPAddress: "10.0.2.0", // note this IP has changed - PrefixLength: 24, + ncReqs := []*cns.CreateNetworkContainerRequest{ + { + NetworkContainerid: ncID, + IPConfiguration: cns.IPConfiguration{ + IPSubnet: cns.IPSubnet{ + IPAddress: "10.0.2.0", // note this IP has changed + PrefixLength: 24, + }, }, }, - }, map[string]cns.PodInfo{}, &v1alpha.NodeNetworkConfig{}) + } + + // now try to reconcile the state where the NC primary IP has changed + resp := svc.ReconcileIPAMState(ncReqs, map[string]cns.PodInfo{}, &v1alpha.NodeNetworkConfig{}) assert.Equal(t, types.PrimaryCANotSame, resp) } @@ -117,15 +123,19 @@ func TestReconcileNCStateGatewayChange(t *testing.T) { }, } - // now try to reconcile the state where the NC gateway has changed - resp := svc.ReconcileNCState(&cns.CreateNetworkContainerRequest{ - NetworkContainerid: ncID, - NetworkContainerType: cns.Kubernetes, - IPConfiguration: cns.IPConfiguration{ - IPSubnet: ncPrimaryIP, - GatewayIPAddress: newGW, // note this IP has changed + ncReqs := []*cns.CreateNetworkContainerRequest{ + { + NetworkContainerid: ncID, + NetworkContainerType: cns.Kubernetes, + IPConfiguration: cns.IPConfiguration{ + IPSubnet: ncPrimaryIP, + GatewayIPAddress: newGW, // note this IP has changed + }, }, - }, map[string]cns.PodInfo{}, &v1alpha.NodeNetworkConfig{}) + } + + // now try to reconcile the state where the NC gateway has changed + resp := svc.ReconcileIPAMState(ncReqs, map[string]cns.PodInfo{}, &v1alpha.NodeNetworkConfig{}) assert.Equal(t, types.Success, resp) // assert the new state reflects the gateway update @@ -322,7 +332,7 @@ func TestReconcileNCWithEmptyState(t *testing.T) { expectedNcCount := len(svc.state.ContainerStatus) expectedAssignedPods := make(map[string]cns.PodInfo) - returnCode := svc.ReconcileNCState(nil, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ + returnCode := svc.ReconcileIPAMState(nil, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ Status: v1alpha.NodeNetworkConfigStatus{ Scaler: v1alpha.Scaler{ BatchSize: batchSize, @@ -372,7 +382,7 @@ func TestReconcileNCWithEmptyStateAndPendingRelease(t *testing.T) { return pendingIPs }() req := generateNetworkContainerRequest(secondaryIPConfigs, "reconcileNc1", "-1") - returnCode := svc.ReconcileNCState(req, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ + returnCode := svc.ReconcileIPAMState([]*cns.CreateNetworkContainerRequest{req}, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ Spec: v1alpha.NodeNetworkConfigSpec{ IPsNotInUse: pending, }, @@ -398,8 +408,8 @@ func TestReconcileNCWithExistingStateAndPendingRelease(t *testing.T) { secondaryIPConfigs[ipID.String()] = secIPConfig } expectedAssignedPods := map[string]cns.PodInfo{ - "10.0.0.6": cns.NewPodInfo("", "", "reconcilePod1", "PodNS1"), - "10.0.0.7": cns.NewPodInfo("", "", "reconcilePod2", "PodNS1"), + "10.0.0.6": cns.NewPodInfo("some-guid-1", "58a0b427-eth0", "reconcilePod1", "PodNS1"), + "10.0.0.7": cns.NewPodInfo("some-guid-2", "45b9b555-eth0", "reconcilePod2", "PodNS1"), } pendingIPIDs := func() map[string]cns.PodInfo { numPending := rand.Intn(len(secondaryIPConfigs)) + 1 //nolint:gosec // weak rand is sufficient in test @@ -419,7 +429,7 @@ func TestReconcileNCWithExistingStateAndPendingRelease(t *testing.T) { req := generateNetworkContainerRequest(secondaryIPConfigs, "reconcileNc1", "-1") expectedNcCount := len(svc.state.ContainerStatus) - returnCode := svc.ReconcileNCState(req, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ + returnCode := svc.ReconcileIPAMState([]*cns.CreateNetworkContainerRequest{req}, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ Spec: v1alpha.NodeNetworkConfigSpec{ IPsNotInUse: maps.Keys(pendingIPIDs), }, @@ -451,12 +461,12 @@ func TestReconcileNCWithExistingState(t *testing.T) { req := generateNetworkContainerRequest(secondaryIPConfigs, "reconcileNc1", "-1") expectedAssignedPods := map[string]cns.PodInfo{ - "10.0.0.6": cns.NewPodInfo("", "", "reconcilePod1", "PodNS1"), - "10.0.0.7": cns.NewPodInfo("", "", "reconcilePod2", "PodNS1"), + "10.0.0.6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "10.0.0.7": cns.NewPodInfo("some-guid-2", "def-eth0", "reconcilePod2", "PodNS1"), } expectedNcCount := len(svc.state.ContainerStatus) - returnCode := svc.ReconcileNCState(req, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ + returnCode := svc.ReconcileIPAMState([]*cns.CreateNetworkContainerRequest{req}, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ Status: v1alpha.NodeNetworkConfigStatus{ Scaler: v1alpha.Scaler{ BatchSize: batchSize, @@ -475,6 +485,188 @@ func TestReconcileNCWithExistingState(t *testing.T) { validateNCStateAfterReconcile(t, req, expectedNcCount+1, expectedAssignedPods, nil) } +func TestReconcileCNSIPAMWithDualStackPods(t *testing.T) { + restartService() + setEnv(t) + setOrchestratorTypeInternal(cns.KubernetesCRD) + + secIPv4Configs := make(map[string]cns.SecondaryIPConfig) + secIPv6Configs := make(map[string]cns.SecondaryIPConfig) + + offset := 6 + for i := 0; i < 4; i++ { + ipv4 := fmt.Sprintf("10.0.0.%d", offset) + secIPv4Configs[uuid.New().String()] = newSecondaryIPConfig(ipv4, -1) + + ipv6 := fmt.Sprintf("fe80::%d", offset) + secIPv6Configs[uuid.New().String()] = newSecondaryIPConfig(ipv6, -1) + + offset++ + } + + ipv4NC := generateNetworkContainerRequest(secIPv4Configs, "reconcileNc1", "-1") + ipv6NC := generateNetworkContainerRequest(secIPv6Configs, "reconcileNc2", "-1") + + podByIP := map[string]cns.PodInfo{ + "10.0.0.6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "fe80::6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + + "10.0.0.7": cns.NewPodInfo("some-guid-2", "def-eth0", "reconcilePod2", "PodNS1"), + "fe80::7": cns.NewPodInfo("some-guid-2", "def-eth0", "reconcilePod2", "PodNS1"), + } + + ncReqs := []*cns.CreateNetworkContainerRequest{ipv4NC, ipv6NC} + + returnCode := svc.ReconcileIPAMState(ncReqs, podByIP, &v1alpha.NodeNetworkConfig{ + Status: v1alpha.NodeNetworkConfigStatus{ + Scaler: v1alpha.Scaler{ + BatchSize: batchSize, + ReleaseThresholdPercent: releasePercent, + RequestThresholdPercent: requestPercent, + }, + }, + Spec: v1alpha.NodeNetworkConfigSpec{ + RequestedIPCount: initPoolSize, + }, + }) + + require.Equal(t, types.Success, returnCode) + + validateIPAMStateAfterReconcile(t, ncReqs, podByIP) +} + +func TestReconcileCNSIPAMWithMultipleIPsPerFamilyPerPod(t *testing.T) { + restartService() + setEnv(t) + setOrchestratorTypeInternal(cns.KubernetesCRD) + + secIPv4Configs := make(map[string]cns.SecondaryIPConfig) + secIPv6Configs := make(map[string]cns.SecondaryIPConfig) + + offset := 6 + for i := 0; i < 4; i++ { + ipv4 := fmt.Sprintf("10.0.0.%d", offset) + secIPv4Configs[uuid.New().String()] = newSecondaryIPConfig(ipv4, -1) + + ipv6 := fmt.Sprintf("fe80::%d", offset) + secIPv6Configs[uuid.New().String()] = newSecondaryIPConfig(ipv6, -1) + + offset++ + } + + ipv4NC := generateNetworkContainerRequest(secIPv4Configs, "reconcileNc1", "-1") + ipv6NC := generateNetworkContainerRequest(secIPv6Configs, "reconcileNc2", "-1") + + podByIP := map[string]cns.PodInfo{ + "10.0.0.6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "10.0.0.7": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "fe80::6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + } + + ncReqs := []*cns.CreateNetworkContainerRequest{ipv4NC, ipv6NC} + + returnCode := svc.ReconcileIPAMState(ncReqs, podByIP, &v1alpha.NodeNetworkConfig{ + Status: v1alpha.NodeNetworkConfigStatus{ + Scaler: v1alpha.Scaler{ + BatchSize: batchSize, + ReleaseThresholdPercent: releasePercent, + RequestThresholdPercent: requestPercent, + }, + }, + Spec: v1alpha.NodeNetworkConfigSpec{ + RequestedIPCount: initPoolSize, + }, + }) + + require.Equal(t, types.UnexpectedError, returnCode) +} + +func TestPodIPsIndexedByInterface(t *testing.T) { + tests := []struct { + name string + input map[string]cns.PodInfo + expectedErr error + expectedOutput map[string]podIPs + }{ + { + name: "happy path", + input: map[string]cns.PodInfo{ + "10.0.0.6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "fe80::6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "10.0.0.7": cns.NewPodInfo("some-guid-2", "def-eth0", "reconcilePod2", "PodNS2"), + "fe80::7": cns.NewPodInfo("some-guid-2", "def-eth0", "reconcilePod2", "PodNS2"), + }, + expectedOutput: map[string]podIPs{ + "reconcilePod1:PodNS1": { + PodInfo: cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + v4IP: net.IPv4(10, 0, 0, 6), + v6IP: []byte{0xfe, 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x06}, + }, + "reconcilePod2:PodNS2": { + PodInfo: cns.NewPodInfo("some-guid-2", "def-eth0", "reconcilePod2", "PodNS2"), + v4IP: net.IPv4(10, 0, 0, 7), + v6IP: []byte{0xfe, 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x07}, + }, + }, + }, + { + name: "multiple ipv4 on single stack pod", + input: map[string]cns.PodInfo{ + "10.0.0.6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "10.0.0.7": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + }, + expectedErr: errMultipleIPPerFamily, + }, + { + name: "multiple ipv4 on dual stack pod", + input: map[string]cns.PodInfo{ + "10.0.0.6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "10.0.0.7": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "fe80::6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + }, + expectedErr: errMultipleIPPerFamily, + }, + { + name: "multiple ipv6 on single stack pod", + input: map[string]cns.PodInfo{ + "fe80::6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "fe80::7": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + }, + expectedErr: errMultipleIPPerFamily, + }, + { + name: "multiple ipv6 on dual stack pod", + input: map[string]cns.PodInfo{ + "10.0.0.6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "fe80::6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + "fe80::7": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + }, + expectedErr: errMultipleIPPerFamily, + }, + { + name: "malformed ip", + input: map[string]cns.PodInfo{ + "10.0.0.": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"), + }, + expectedErr: errIPParse, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + out, err := newPodKeyToPodIPsMap(tt.input) + if tt.expectedErr != nil { + require.ErrorIs(t, err, tt.expectedErr) + return + } + + require.Equal(t, tt.expectedOutput, out) + }) + } +} + func TestReconcileNCWithExistingStateFromInterfaceID(t *testing.T) { restartService() setEnv(t) @@ -500,7 +692,7 @@ func TestReconcileNCWithExistingStateFromInterfaceID(t *testing.T) { } expectedNcCount := len(svc.state.ContainerStatus) - returnCode := svc.ReconcileNCState(req, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ + returnCode := svc.ReconcileIPAMState([]*cns.CreateNetworkContainerRequest{req}, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ Status: v1alpha.NodeNetworkConfigStatus{ Scaler: v1alpha.Scaler{ BatchSize: batchSize, @@ -519,7 +711,7 @@ func TestReconcileNCWithExistingStateFromInterfaceID(t *testing.T) { validateNCStateAfterReconcile(t, req, expectedNcCount+1, expectedAssignedPods, nil) } -func TestReconcileNCWithSystemPods(t *testing.T) { +func TestReconcileCNSIPAMWithKubePodInfoProvider(t *testing.T) { restartService() setEnv(t) setOrchestratorTypeInternal(cns.KubernetesCRD) @@ -536,14 +728,16 @@ func TestReconcileNCWithSystemPods(t *testing.T) { } req := generateNetworkContainerRequest(secondaryIPConfigs, uuid.New().String(), "-1") + // the following pod info constructors leave container id and interface id blank on purpose. + // this is to simulate the information we get from the kube info provider expectedAssignedPods := make(map[string]cns.PodInfo) expectedAssignedPods["10.0.0.6"] = cns.NewPodInfo("", "", "customerpod1", "PodNS1") - // Allocate non-vnet IP for system pod + // allocate non-vnet IP for pod in host network expectedAssignedPods["192.168.0.1"] = cns.NewPodInfo("", "", "systempod", "kube-system") expectedNcCount := len(svc.state.ContainerStatus) - returnCode := svc.ReconcileNCState(req, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ + returnCode := svc.ReconcileIPAMState([]*cns.CreateNetworkContainerRequest{req}, expectedAssignedPods, &v1alpha.NodeNetworkConfig{ Status: v1alpha.NodeNetworkConfigStatus{ Scaler: v1alpha.Scaler{ BatchSize: batchSize, @@ -813,6 +1007,51 @@ func validateNCStateAfterReconcile(t *testing.T, ncRequest *cns.CreateNetworkCon } } +func validateIPAMStateAfterReconcile(t *testing.T, ncReqs []*cns.CreateNetworkContainerRequest, expectedAssignedIPs map[string]cns.PodInfo) { + t.Helper() + + interfaceIdx, err := newPodKeyToPodIPsMap(expectedAssignedIPs) + require.NoError(t, err, "expected IPs contain incorrect state") + + assert.Len(t, svc.PodIPIDByPodInterfaceKey, len(interfaceIdx), "unexepected quantity of interfaces in CNS") + + for ipAddress, podInfo := range expectedAssignedIPs { + podIPUUIDs, ok := svc.PodIPIDByPodInterfaceKey[podInfo.Key()] + assert.Truef(t, ok, "no pod uuids for pod info key %s", podInfo.Key()) + + for _, ipID := range podIPUUIDs { + ipConfigstate, ok := svc.PodIPConfigState[ipID] + assert.Truef(t, ok, "ip id %s not found in CNS pod ip id index", ipID) + assert.Equalf(t, types.Assigned, ipConfigstate.GetState(), "ip address %s not marked as assigned to pod: %+v, ip config state: %+v", ipAddress, podInfo, ipConfigstate) + + nc, ok := svc.state.ContainerStatus[ipConfigstate.NCID] + assert.Truef(t, ok, "nc id %s in ip config state %+v not found in CNS container status index", nc, ipConfigstate) + + _, ok = nc.CreateNetworkContainerRequest.SecondaryIPConfigs[ipID] + assert.Truef(t, ok, "secondary ip id %s not found in nc request", ipID) + } + } + + allSecIPsIdx := make(map[string]*cns.CreateNetworkContainerRequest) + for i := range ncReqs { + for _, secIPConfig := range ncReqs[i].SecondaryIPConfigs { + allSecIPsIdx[secIPConfig.IPAddress] = ncReqs[i] + } + } + + // validate rest of secondary IPs in available state + for _, ncReq := range ncReqs { + for secIPID, secIPConfig := range ncReq.SecondaryIPConfigs { + secIPConfigState, ok := svc.PodIPConfigState[secIPID] + assert.True(t, ok) + + if _, isAssigned := expectedAssignedIPs[secIPConfig.IPAddress]; !isAssigned { + assert.Equal(t, types.Available, secIPConfigState.GetState()) + } + } + } +} + func createNCReqInternal(t *testing.T, secondaryIPConfigs map[string]cns.SecondaryIPConfig, ncID, ncVersion string) cns.CreateNetworkContainerRequest { req := generateNetworkContainerRequest(secondaryIPConfigs, ncID, ncVersion) returnCode := svc.CreateOrUpdateNetworkContainerInternal(req) diff --git a/cns/service/main.go b/cns/service/main.go index 6c267b7eb..ed9dfae70 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1017,13 +1017,13 @@ type nodeNetworkConfigGetter interface { Get(context.Context) (*v1alpha.NodeNetworkConfig, error) } -type ncStateReconciler interface { - ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) cnstypes.ResponseCode +type ipamStateReconciler interface { + ReconcileIPAMState(ncRequests []*cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) cnstypes.ResponseCode } // TODO(rbtr) where should this live?? // reconcileInitialCNSState initializes cns by passing pods and a CreateNetworkContainerRequest -func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ncReconciler ncStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error { +func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ipamReconciler ipamStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error { // Get nnc using direct client nnc, err := cli.Get(ctx) if err != nil { @@ -1043,11 +1043,20 @@ func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, return errors.New("failed to init CNS state: no NCs found in NNC CRD") } + // Get previous PodInfo state from podInfoByIPProvider + podInfoByIP, err := podInfoByIPProvider.PodInfoByIP() + if err != nil { + return errors.Wrap(err, "provider failed to provide PodInfoByIP") + } + + ncReqs := make([]*cns.CreateNetworkContainerRequest, len(nnc.Status.NetworkContainers)) + // For each NC, we need to create a CreateNetworkContainerRequest and use it to rebuild our state. for i := range nnc.Status.NetworkContainers { - var ncRequest *cns.CreateNetworkContainerRequest - var err error - + var ( + ncRequest *cns.CreateNetworkContainerRequest + err error + ) switch nnc.Status.NetworkContainers[i].AssignmentMode { //nolint:exhaustive // skipping dynamic case case v1alpha.Static: ncRequest, err = nncctrl.CreateNCRequestFromStaticNC(nnc.Status.NetworkContainers[i]) @@ -1059,17 +1068,15 @@ func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, return errors.Wrapf(err, "failed to convert NNC status to network container request, "+ "assignmentMode: %s", nnc.Status.NetworkContainers[i].AssignmentMode) } - // Get previous PodInfo state from podInfoByIPProvider - podInfoByIP, err := podInfoByIPProvider.PodInfoByIP() - if err != nil { - return errors.Wrap(err, "provider failed to provide PodInfoByIP") - } - // Call cnsclient init cns passing those two things. - if err := restserver.ResponseCodeToError(ncReconciler.ReconcileNCState(ncRequest, podInfoByIP, nnc)); err != nil { - return errors.Wrap(err, "failed to reconcile NC state") - } + ncReqs[i] = ncRequest } + + // Call cnsclient init cns passing those two things. + if err := restserver.ResponseCodeToError(ipamReconciler.ReconcileIPAMState(ncReqs, podInfoByIP, nnc)); err != nil { + return errors.Wrap(err, "failed to reconcile CNS IPAM state") + } + return nil }