CNS - consider multiple IPs for a pod in reconciliation after restart (#2079)

* modifying reconcile function to take multiple ncs instead of one. we need this because for reconciliation in dual stack cases both IPs for a pod which can come from distinct ncs must be added at the same time

* adding comments, and renaming functions and types, to make the intent clearer

* adding some dummy values to cns.NewPodInfo invocations in tests, instead of empty strings since we need distinct interface ids

* adding a basic test for dual stack reconciliation

* adding validation for multiple ips for the same ip family on the same pod, which is not allowed

* changing direct use of interface id to pod key, which is better for reconcile flows using information from kubernetes instead of cni

* fixing comments to use host network terminology instead of system pod

* improving error message on duplicate ip from cni found; improving readability of error handling on ip parsing

* only checking for pod ips that are actually set on a pod
This commit is contained in:
Ramiro 2023-08-01 18:49:21 -07:00 коммит произвёл GitHub
Родитель 0e4fbdd3cf
Коммит c8005ed94b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 448 добавлений и 90 удалений

Просмотреть файл

@ -314,7 +314,7 @@ func TestCNSClientPodContextApi(t *testing.T) {
addTestStateToRestServer(t, secondaryIps)
podInfo := cns.NewPodInfo("", "", podName, podNamespace)
podInfo := cns.NewPodInfo("some-guid-1", "abc-eth0", podName, podNamespace)
orchestratorContext, err := json.Marshal(podInfo)
assert.NoError(t, err)
@ -344,7 +344,7 @@ func TestCNSClientDebugAPI(t *testing.T) {
addTestStateToRestServer(t, secondaryIps)
podInfo := cns.NewPodInfo("", "", podName, podNamespace)
podInfo := cns.NewPodInfo("some-guid-1", "abc-eth0", podName, podNamespace)
orchestratorContext, err := json.Marshal(podInfo)
assert.NoError(t, err)

Просмотреть файл

@ -51,19 +51,22 @@ func newCNIPodInfoProvider(exec exec.Interface) (cns.PodInfoByIPProvider, error)
}
// cniStateToPodInfoByIP converts an AzureCNIState dumped from a CNI exec
// into a PodInfo map, using the first endpoint IP as the key in the map.
// into a PodInfo map, using the endpoint IPs as keys in the map.
// for pods with multiple IPs (such as in dualstack cases), this means multiple keys in the map
// will point to the same pod information.
func cniStateToPodInfoByIP(state *api.AzureCNIState) (map[string]cns.PodInfo, error) {
podInfoByIP := map[string]cns.PodInfo{}
for _, endpoint := range state.ContainerInterfaces {
if _, ok := podInfoByIP[endpoint.IPAddresses[0].IP.String()]; ok {
return nil, errors.Wrap(cns.ErrDuplicateIP, endpoint.IPAddresses[0].IP.String())
for _, epIP := range endpoint.IPAddresses {
podInfo := cns.NewPodInfo(endpoint.ContainerID, endpoint.PodEndpointId, endpoint.PodName, endpoint.PodNamespace)
ipKey := epIP.IP.String()
if prevPodInfo, ok := podInfoByIP[ipKey]; ok {
return nil, errors.Wrapf(cns.ErrDuplicateIP, "duplicate ip %s found for different pods: pod: %+v, pod: %+v", ipKey, podInfo, prevPodInfo)
}
podInfoByIP[ipKey] = podInfo
}
podInfoByIP[endpoint.IPAddresses[0].IP.String()] = cns.NewPodInfo(
endpoint.ContainerID,
endpoint.PodEndpointId,
endpoint.PodName,
endpoint.PodNamespace,
)
}
return podInfoByIP, nil
}

Просмотреть файл

@ -8,6 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"reflect"
@ -274,50 +275,110 @@ func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMo
return len(programmedNCs), nil
}
// This API will be called by CNS RequestController on CRD update.
func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) types.ResponseCode {
logger.Printf("Reconciling NC state with CreateNCRequest: [%v], PodInfo [%+v], NNC: [%+v]", ncRequest, podInfoByIP, nnc)
// check if ncRequest is null, then return as there is no CRD state yet
if ncRequest == nil {
func (service *HTTPRestService) ReconcileIPAMState(ncReqs []*cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) types.ResponseCode {
logger.Printf("Reconciling CNS IPAM state with nc requests: [%+v], PodInfo [%+v], NNC: [%+v]", ncReqs, podInfoByIP, nnc)
// if no nc reqs, there is no CRD state yet
if len(ncReqs) == 0 {
logger.Printf("CNS starting with no NC state, podInfoMap count %d", len(podInfoByIP))
return types.Success
}
// If the NC was created successfully, then reconcile the assigned pod state
returnCode := service.CreateOrUpdateNetworkContainerInternal(ncRequest)
if returnCode != types.Success {
return returnCode
}
// now parse the secondaryIP list, if it exists in PodInfo list, then assign that ip.
for _, secIPConfig := range ncRequest.SecondaryIPConfigs {
if podInfo, exists := podInfoByIP[secIPConfig.IPAddress]; exists {
logger.Printf("SecondaryIP %+v is assigned to Pod. %+v, ncId: %s", secIPConfig, podInfo, ncRequest.NetworkContainerid)
jsonContext, err := podInfo.OrchestratorContext()
if err != nil {
logger.Errorf("Failed to marshal KubernetesPodInfo, error: %v", err)
return types.UnexpectedError
}
ipconfigsRequest := cns.IPConfigsRequest{
DesiredIPAddresses: []string{secIPConfig.IPAddress},
OrchestratorContext: jsonContext,
InfraContainerID: podInfo.InfraContainerID(),
PodInterfaceID: podInfo.InterfaceID(),
}
if _, err := requestIPConfigsHelper(service, ipconfigsRequest); err != nil {
logger.Errorf("AllocateIPConfig failed for SecondaryIP %+v, podInfo %+v, ncId %s, error: %v", secIPConfig, podInfo, ncRequest.NetworkContainerid, err)
return types.FailedToAllocateIPConfig
}
} else {
logger.Printf("SecondaryIP %+v is not assigned. ncId: %s", secIPConfig, ncRequest.NetworkContainerid)
// first step in reconciliation is to create all the NCs in CNS, no IP assignment yet.
for _, ncReq := range ncReqs {
returnCode := service.CreateOrUpdateNetworkContainerInternal(ncReq)
if returnCode != types.Success {
return returnCode
}
}
err := service.MarkExistingIPsAsPendingRelease(nnc.Spec.IPsNotInUse)
// index all the secondary IP configs for all the nc reqs, for easier lookup later on.
allSecIPsIdx := make(map[string]*cns.CreateNetworkContainerRequest)
for i := range ncReqs {
for _, secIPConfig := range ncReqs[i].SecondaryIPConfigs {
allSecIPsIdx[secIPConfig.IPAddress] = ncReqs[i]
}
}
// we now need to reconcile IP assignment.
// considering that a single pod may have multiple ips (such as in dual stack scenarios)
// and that IP assignment in CNS (as done by requestIPConfigsHelper) does not allow
// updates (it returns the existing state if one already exists for the pod's interface),
// we need to assign all IPs for a pod interface or name+namespace at the same time.
//
// iterating over single IPs is not appropriate then, since assignment for the first IP for
// a pod will prevent the second IP from being added. the following function call transforms
// pod info indexed by ip address:
//
// {
// "10.0.0.1": podInfo{interface: "aaa-eth0"},
// "fe80::1": podInfo{interface: "aaa-eth0"},
// }
//
// to pod IPs indexed by pod key (interface or name+namespace, depending on scenario):
//
// {
// "aaa-eth0": podIPs{v4IP: 10.0.0.1, v6IP: fe80::1}
// }
//
// such that we can iterate over pod interfaces, and assign all IPs for it at once.
podKeyToPodIPs, err := newPodKeyToPodIPsMap(podInfoByIP)
if err != nil {
logger.Errorf("could not transform pods indexed by IP address to pod IPs indexed by interface: %v", err)
return types.UnexpectedError
}
for podKey, podIPs := range podKeyToPodIPs {
var (
desiredIPs []string
ncIDs []string
)
var ips []net.IP
if podIPs.v4IP != nil {
ips = append(ips, podIPs.v4IP)
}
if podIPs.v6IP != nil {
ips = append(ips, podIPs.v6IP)
}
for _, ip := range ips {
if ncReq, ok := allSecIPsIdx[ip.String()]; ok {
logger.Printf("secondary ip %s is assigned to pod %+v, ncId: %s ncVersion: %s", ip, podIPs, ncReq.NetworkContainerid, ncReq.Version)
desiredIPs = append(desiredIPs, ip.String())
ncIDs = append(ncIDs, ncReq.NetworkContainerid)
} else {
// it might still be possible to see host networking pods here (where ips are not from ncs) if we are restoring using the kube podinfo provider
// todo: once kube podinfo provider reconcile flow is removed, this line will not be necessary/should be removed.
logger.Errorf("ip %s assigned to pod %+v but not found in any nc", ip, podIPs)
}
}
if len(desiredIPs) == 0 {
// this may happen for pods in the host network
continue
}
jsonContext, err := podIPs.OrchestratorContext()
if err != nil {
logger.Errorf("Failed to marshal KubernetesPodInfo, error: %v", err)
return types.UnexpectedError
}
ipconfigsRequest := cns.IPConfigsRequest{
DesiredIPAddresses: desiredIPs,
OrchestratorContext: jsonContext,
InfraContainerID: podIPs.InfraContainerID(),
PodInterfaceID: podIPs.InterfaceID(),
}
if _, err := requestIPConfigsHelper(service, ipconfigsRequest); err != nil {
logger.Errorf("requestIPConfigsHelper failed for pod key %s, podInfo %+v, ncIds %v, error: %v", podKey, podIPs, ncIDs, err)
return types.FailedToAllocateIPConfig
}
}
if err := service.MarkExistingIPsAsPendingRelease(nnc.Spec.IPsNotInUse); err != nil {
logger.Errorf("[Azure CNS] Error. Failed to mark IPs as pending %v", nnc.Spec.IPsNotInUse)
return types.UnexpectedError
}
@ -325,6 +386,54 @@ func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkCon
return 0
}
var (
	// errIPParse indicates that an IP address string from the pod info map could not be parsed.
	errIPParse = errors.New("parse IP")
	// errMultipleIPPerFamily indicates that a single pod was associated with more than one
	// IP of the same family (e.g. two IPv4 addresses), which is not allowed.
	errMultipleIPPerFamily = errors.New("multiple IPs per family")
)
// newPodKeyToPodIPsMap transforms pod info indexed by IP address into podIPs
// (at most one IPv4 and one IPv6 per pod) indexed by pod key. It returns an
// error when an IP string cannot be parsed, or when a pod is associated with
// more than one IP of the same family.
func newPodKeyToPodIPsMap(podInfoByIP map[string]cns.PodInfo) (map[string]podIPs, error) {
	result := make(map[string]podIPs)
	for ipStr, podInfo := range podInfoByIP {
		key := podInfo.Key()
		entry, found := result[key]
		if !found {
			// first IP seen for this pod: record its info
			entry.PodInfo = podInfo
		}
		ip := net.ParseIP(ipStr)
		if ip == nil {
			return nil, errors.Wrapf(errIPParse, "could not parse ip string %q on pod %+v", ipStr, podInfo)
		}
		if ip.To4() != nil {
			if entry.v4IP != nil {
				return nil, errors.Wrapf(errMultipleIPPerFamily, "multiple ipv4 addresses (%v, %v) associated to pod %+v", entry.v4IP, ip, podInfo)
			}
			entry.v4IP = ip
		} else {
			// any successfully parsed non-IPv4 address is IPv6
			if entry.v6IP != nil {
				return nil, errors.Wrapf(errMultipleIPPerFamily, "multiple ipv6 addresses (%v, %v) associated to pod %+v", entry.v6IP, ip, podInfo)
			}
			entry.v6IP = ip
		}
		result[key] = entry
	}
	return result, nil
}
// podIPs are all the IPs associated with a pod, along with pod info
type podIPs struct {
	cns.PodInfo
	// v4IP is the pod's IPv4 address, if it has one (nil otherwise)
	v4IP net.IP
	// v6IP is the pod's IPv6 address, if it has one (nil otherwise)
	v6IP net.IP
}
// GetNetworkContainerInternal gets network container details.
func (service *HTTPRestService) GetNetworkContainerInternal(
req cns.GetNetworkContainerRequest,

Просмотреть файл

@ -7,6 +7,7 @@ import (
"context"
"fmt"
"math/rand"
"net"
"os"
"reflect"
"strconv"
@ -22,6 +23,7 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
)
@ -73,16 +75,20 @@ func TestReconcileNCStatePrimaryIPChangeShouldFail(t *testing.T) {
},
}
// now try to reconcile the state where the NC primary IP has changed
resp := svc.ReconcileNCState(&cns.CreateNetworkContainerRequest{
NetworkContainerid: ncID,
IPConfiguration: cns.IPConfiguration{
IPSubnet: cns.IPSubnet{
IPAddress: "10.0.2.0", // note this IP has changed
PrefixLength: 24,
ncReqs := []*cns.CreateNetworkContainerRequest{
{
NetworkContainerid: ncID,
IPConfiguration: cns.IPConfiguration{
IPSubnet: cns.IPSubnet{
IPAddress: "10.0.2.0", // note this IP has changed
PrefixLength: 24,
},
},
},
}, map[string]cns.PodInfo{}, &v1alpha.NodeNetworkConfig{})
}
// now try to reconcile the state where the NC primary IP has changed
resp := svc.ReconcileIPAMState(ncReqs, map[string]cns.PodInfo{}, &v1alpha.NodeNetworkConfig{})
assert.Equal(t, types.PrimaryCANotSame, resp)
}
@ -117,15 +123,19 @@ func TestReconcileNCStateGatewayChange(t *testing.T) {
},
}
// now try to reconcile the state where the NC gateway has changed
resp := svc.ReconcileNCState(&cns.CreateNetworkContainerRequest{
NetworkContainerid: ncID,
NetworkContainerType: cns.Kubernetes,
IPConfiguration: cns.IPConfiguration{
IPSubnet: ncPrimaryIP,
GatewayIPAddress: newGW, // note this IP has changed
ncReqs := []*cns.CreateNetworkContainerRequest{
{
NetworkContainerid: ncID,
NetworkContainerType: cns.Kubernetes,
IPConfiguration: cns.IPConfiguration{
IPSubnet: ncPrimaryIP,
GatewayIPAddress: newGW, // note this IP has changed
},
},
}, map[string]cns.PodInfo{}, &v1alpha.NodeNetworkConfig{})
}
// now try to reconcile the state where the NC gateway has changed
resp := svc.ReconcileIPAMState(ncReqs, map[string]cns.PodInfo{}, &v1alpha.NodeNetworkConfig{})
assert.Equal(t, types.Success, resp)
// assert the new state reflects the gateway update
@ -322,7 +332,7 @@ func TestReconcileNCWithEmptyState(t *testing.T) {
expectedNcCount := len(svc.state.ContainerStatus)
expectedAssignedPods := make(map[string]cns.PodInfo)
returnCode := svc.ReconcileNCState(nil, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
returnCode := svc.ReconcileIPAMState(nil, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
Status: v1alpha.NodeNetworkConfigStatus{
Scaler: v1alpha.Scaler{
BatchSize: batchSize,
@ -372,7 +382,7 @@ func TestReconcileNCWithEmptyStateAndPendingRelease(t *testing.T) {
return pendingIPs
}()
req := generateNetworkContainerRequest(secondaryIPConfigs, "reconcileNc1", "-1")
returnCode := svc.ReconcileNCState(req, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
returnCode := svc.ReconcileIPAMState([]*cns.CreateNetworkContainerRequest{req}, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
Spec: v1alpha.NodeNetworkConfigSpec{
IPsNotInUse: pending,
},
@ -398,8 +408,8 @@ func TestReconcileNCWithExistingStateAndPendingRelease(t *testing.T) {
secondaryIPConfigs[ipID.String()] = secIPConfig
}
expectedAssignedPods := map[string]cns.PodInfo{
"10.0.0.6": cns.NewPodInfo("", "", "reconcilePod1", "PodNS1"),
"10.0.0.7": cns.NewPodInfo("", "", "reconcilePod2", "PodNS1"),
"10.0.0.6": cns.NewPodInfo("some-guid-1", "58a0b427-eth0", "reconcilePod1", "PodNS1"),
"10.0.0.7": cns.NewPodInfo("some-guid-2", "45b9b555-eth0", "reconcilePod2", "PodNS1"),
}
pendingIPIDs := func() map[string]cns.PodInfo {
numPending := rand.Intn(len(secondaryIPConfigs)) + 1 //nolint:gosec // weak rand is sufficient in test
@ -419,7 +429,7 @@ func TestReconcileNCWithExistingStateAndPendingRelease(t *testing.T) {
req := generateNetworkContainerRequest(secondaryIPConfigs, "reconcileNc1", "-1")
expectedNcCount := len(svc.state.ContainerStatus)
returnCode := svc.ReconcileNCState(req, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
returnCode := svc.ReconcileIPAMState([]*cns.CreateNetworkContainerRequest{req}, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
Spec: v1alpha.NodeNetworkConfigSpec{
IPsNotInUse: maps.Keys(pendingIPIDs),
},
@ -451,12 +461,12 @@ func TestReconcileNCWithExistingState(t *testing.T) {
req := generateNetworkContainerRequest(secondaryIPConfigs, "reconcileNc1", "-1")
expectedAssignedPods := map[string]cns.PodInfo{
"10.0.0.6": cns.NewPodInfo("", "", "reconcilePod1", "PodNS1"),
"10.0.0.7": cns.NewPodInfo("", "", "reconcilePod2", "PodNS1"),
"10.0.0.6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"),
"10.0.0.7": cns.NewPodInfo("some-guid-2", "def-eth0", "reconcilePod2", "PodNS1"),
}
expectedNcCount := len(svc.state.ContainerStatus)
returnCode := svc.ReconcileNCState(req, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
returnCode := svc.ReconcileIPAMState([]*cns.CreateNetworkContainerRequest{req}, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
Status: v1alpha.NodeNetworkConfigStatus{
Scaler: v1alpha.Scaler{
BatchSize: batchSize,
@ -475,6 +485,188 @@ func TestReconcileNCWithExistingState(t *testing.T) {
validateNCStateAfterReconcile(t, req, expectedNcCount+1, expectedAssignedPods, nil)
}
// TestReconcileCNSIPAMWithDualStackPods verifies that reconciliation succeeds
// when pods each have one IPv4 and one IPv6 address coming from two distinct NCs.
func TestReconcileCNSIPAMWithDualStackPods(t *testing.T) {
	restartService()
	setEnv(t)
	setOrchestratorTypeInternal(cns.KubernetesCRD)

	// build four secondary IPs per family, starting at .6 / ::6
	v4Configs := make(map[string]cns.SecondaryIPConfig)
	v6Configs := make(map[string]cns.SecondaryIPConfig)
	for i := 6; i < 10; i++ {
		v4Configs[uuid.New().String()] = newSecondaryIPConfig(fmt.Sprintf("10.0.0.%d", i), -1)
		v6Configs[uuid.New().String()] = newSecondaryIPConfig(fmt.Sprintf("fe80::%d", i), -1)
	}
	ipv4NC := generateNetworkContainerRequest(v4Configs, "reconcileNc1", "-1")
	ipv6NC := generateNetworkContainerRequest(v6Configs, "reconcileNc2", "-1")

	// two dual stack pods, each with one IP from each NC
	podByIP := map[string]cns.PodInfo{
		"10.0.0.6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"),
		"fe80::6":  cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"),
		"10.0.0.7": cns.NewPodInfo("some-guid-2", "def-eth0", "reconcilePod2", "PodNS1"),
		"fe80::7":  cns.NewPodInfo("some-guid-2", "def-eth0", "reconcilePod2", "PodNS1"),
	}

	ncReqs := []*cns.CreateNetworkContainerRequest{ipv4NC, ipv6NC}
	nnc := &v1alpha.NodeNetworkConfig{
		Status: v1alpha.NodeNetworkConfigStatus{
			Scaler: v1alpha.Scaler{
				BatchSize:               batchSize,
				ReleaseThresholdPercent: releasePercent,
				RequestThresholdPercent: requestPercent,
			},
		},
		Spec: v1alpha.NodeNetworkConfigSpec{
			RequestedIPCount: initPoolSize,
		},
	}

	returnCode := svc.ReconcileIPAMState(ncReqs, podByIP, nnc)
	require.Equal(t, types.Success, returnCode)

	validateIPAMStateAfterReconcile(t, ncReqs, podByIP)
}
// TestReconcileCNSIPAMWithMultipleIPsPerFamilyPerPod verifies that reconciliation
// fails with UnexpectedError when the pod state associates more than one IP of the
// same family (here two IPv4 addresses) with a single pod, which is not allowed.
func TestReconcileCNSIPAMWithMultipleIPsPerFamilyPerPod(t *testing.T) {
	restartService()
	setEnv(t)
	setOrchestratorTypeInternal(cns.KubernetesCRD)

	// one NC worth of secondary IPs per family, starting at 10.0.0.6 / fe80::6
	secIPv4Configs := make(map[string]cns.SecondaryIPConfig)
	secIPv6Configs := make(map[string]cns.SecondaryIPConfig)
	offset := 6
	for i := 0; i < 4; i++ {
		ipv4 := fmt.Sprintf("10.0.0.%d", offset)
		secIPv4Configs[uuid.New().String()] = newSecondaryIPConfig(ipv4, -1)
		ipv6 := fmt.Sprintf("fe80::%d", offset)
		secIPv6Configs[uuid.New().String()] = newSecondaryIPConfig(ipv6, -1)
		offset++
	}
	ipv4NC := generateNetworkContainerRequest(secIPv4Configs, "reconcileNc1", "-1")
	ipv6NC := generateNetworkContainerRequest(secIPv6Configs, "reconcileNc2", "-1")

	// reconcilePod1 claims two IPv4 addresses (10.0.0.6 and 10.0.0.7) — invalid
	podByIP := map[string]cns.PodInfo{
		"10.0.0.6": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"),
		"10.0.0.7": cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"),
		"fe80::6":  cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1"),
	}
	ncReqs := []*cns.CreateNetworkContainerRequest{ipv4NC, ipv6NC}

	returnCode := svc.ReconcileIPAMState(ncReqs, podByIP, &v1alpha.NodeNetworkConfig{
		Status: v1alpha.NodeNetworkConfigStatus{
			Scaler: v1alpha.Scaler{
				BatchSize:               batchSize,
				ReleaseThresholdPercent: releasePercent,
				RequestThresholdPercent: requestPercent,
			},
		},
		Spec: v1alpha.NodeNetworkConfigSpec{
			RequestedIPCount: initPoolSize,
		},
	})
	// the duplicate-family validation surfaces as UnexpectedError
	require.Equal(t, types.UnexpectedError, returnCode)
}
// TestPodIPsIndexedByInterface exercises newPodKeyToPodIPsMap: grouping of pod
// IPs by pod key, and rejection of multiple IPs per family or malformed IPs.
func TestPodIPsIndexedByInterface(t *testing.T) {
	// constructors for fresh, equal pod info values used across cases
	pod1 := func() cns.PodInfo { return cns.NewPodInfo("some-guid-1", "abc-eth0", "reconcilePod1", "PodNS1") }
	pod2 := func() cns.PodInfo { return cns.NewPodInfo("some-guid-2", "def-eth0", "reconcilePod2", "PodNS2") }

	cases := []struct {
		name           string
		input          map[string]cns.PodInfo
		expectedErr    error
		expectedOutput map[string]podIPs
	}{
		{
			name: "happy path",
			input: map[string]cns.PodInfo{
				"10.0.0.6": pod1(),
				"fe80::6":  pod1(),
				"10.0.0.7": pod2(),
				"fe80::7":  pod2(),
			},
			expectedOutput: map[string]podIPs{
				"reconcilePod1:PodNS1": {
					PodInfo: pod1(),
					v4IP:    net.IPv4(10, 0, 0, 6),
					v6IP:    []byte{0xfe, 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x06},
				},
				"reconcilePod2:PodNS2": {
					PodInfo: pod2(),
					v4IP:    net.IPv4(10, 0, 0, 7),
					v6IP:    []byte{0xfe, 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x07},
				},
			},
		},
		{
			name: "multiple ipv4 on single stack pod",
			input: map[string]cns.PodInfo{
				"10.0.0.6": pod1(),
				"10.0.0.7": pod1(),
			},
			expectedErr: errMultipleIPPerFamily,
		},
		{
			name: "multiple ipv4 on dual stack pod",
			input: map[string]cns.PodInfo{
				"10.0.0.6": pod1(),
				"10.0.0.7": pod1(),
				"fe80::6":  pod1(),
			},
			expectedErr: errMultipleIPPerFamily,
		},
		{
			name: "multiple ipv6 on single stack pod",
			input: map[string]cns.PodInfo{
				"fe80::6": pod1(),
				"fe80::7": pod1(),
			},
			expectedErr: errMultipleIPPerFamily,
		},
		{
			name: "multiple ipv6 on dual stack pod",
			input: map[string]cns.PodInfo{
				"10.0.0.6": pod1(),
				"fe80::6":  pod1(),
				"fe80::7":  pod1(),
			},
			expectedErr: errMultipleIPPerFamily,
		},
		{
			name: "malformed ip",
			input: map[string]cns.PodInfo{
				"10.0.0.": pod1(),
			},
			expectedErr: errIPParse,
		},
	}

	for _, tc := range cases {
		tc := tc // capture range variable for parallel-safe closure (pre-1.22 semantics)
		t.Run(tc.name, func(t *testing.T) {
			got, err := newPodKeyToPodIPsMap(tc.input)
			if tc.expectedErr != nil {
				require.ErrorIs(t, err, tc.expectedErr)
				return
			}
			require.Equal(t, tc.expectedOutput, got)
		})
	}
}
func TestReconcileNCWithExistingStateFromInterfaceID(t *testing.T) {
restartService()
setEnv(t)
@ -500,7 +692,7 @@ func TestReconcileNCWithExistingStateFromInterfaceID(t *testing.T) {
}
expectedNcCount := len(svc.state.ContainerStatus)
returnCode := svc.ReconcileNCState(req, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
returnCode := svc.ReconcileIPAMState([]*cns.CreateNetworkContainerRequest{req}, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
Status: v1alpha.NodeNetworkConfigStatus{
Scaler: v1alpha.Scaler{
BatchSize: batchSize,
@ -519,7 +711,7 @@ func TestReconcileNCWithExistingStateFromInterfaceID(t *testing.T) {
validateNCStateAfterReconcile(t, req, expectedNcCount+1, expectedAssignedPods, nil)
}
func TestReconcileNCWithSystemPods(t *testing.T) {
func TestReconcileCNSIPAMWithKubePodInfoProvider(t *testing.T) {
restartService()
setEnv(t)
setOrchestratorTypeInternal(cns.KubernetesCRD)
@ -536,14 +728,16 @@ func TestReconcileNCWithSystemPods(t *testing.T) {
}
req := generateNetworkContainerRequest(secondaryIPConfigs, uuid.New().String(), "-1")
// the following pod info constructors leave container id and interface id blank on purpose.
// this is to simulate the information we get from the kube info provider
expectedAssignedPods := make(map[string]cns.PodInfo)
expectedAssignedPods["10.0.0.6"] = cns.NewPodInfo("", "", "customerpod1", "PodNS1")
// Allocate non-vnet IP for system pod
// allocate non-vnet IP for pod in host network
expectedAssignedPods["192.168.0.1"] = cns.NewPodInfo("", "", "systempod", "kube-system")
expectedNcCount := len(svc.state.ContainerStatus)
returnCode := svc.ReconcileNCState(req, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
returnCode := svc.ReconcileIPAMState([]*cns.CreateNetworkContainerRequest{req}, expectedAssignedPods, &v1alpha.NodeNetworkConfig{
Status: v1alpha.NodeNetworkConfigStatus{
Scaler: v1alpha.Scaler{
BatchSize: batchSize,
@ -813,6 +1007,51 @@ func validateNCStateAfterReconcile(t *testing.T, ncRequest *cns.CreateNetworkCon
}
}
// validateIPAMStateAfterReconcile checks CNS IPAM state against the expected
// pod -> IP assignments after a reconcile: every expected IP must be assigned
// to the right pod key and backed by a secondary IP config on one of the given
// NC requests, and every remaining secondary IP must be in the Available state.
func validateIPAMStateAfterReconcile(t *testing.T, ncReqs []*cns.CreateNetworkContainerRequest, expectedAssignedIPs map[string]cns.PodInfo) {
	t.Helper()

	interfaceIdx, err := newPodKeyToPodIPsMap(expectedAssignedIPs)
	require.NoError(t, err, "expected IPs contain incorrect state")

	// one interface entry in CNS per distinct pod key
	assert.Len(t, svc.PodIPIDByPodInterfaceKey, len(interfaceIdx), "unexpected quantity of interfaces in CNS")

	for ipAddress, podInfo := range expectedAssignedIPs {
		podIPUUIDs, ok := svc.PodIPIDByPodInterfaceKey[podInfo.Key()]
		assert.Truef(t, ok, "no pod uuids for pod info key %s", podInfo.Key())

		for _, ipID := range podIPUUIDs {
			ipConfigstate, ok := svc.PodIPConfigState[ipID]
			assert.Truef(t, ok, "ip id %s not found in CNS pod ip id index", ipID)
			assert.Equalf(t, types.Assigned, ipConfigstate.GetState(), "ip address %s not marked as assigned to pod: %+v, ip config state: %+v", ipAddress, podInfo, ipConfigstate)

			// the assigned IP's NC must exist and own the secondary IP config
			nc, ok := svc.state.ContainerStatus[ipConfigstate.NCID]
			assert.Truef(t, ok, "nc id %s in ip config state %+v not found in CNS container status index", ipConfigstate.NCID, ipConfigstate)
			_, ok = nc.CreateNetworkContainerRequest.SecondaryIPConfigs[ipID]
			assert.Truef(t, ok, "secondary ip id %s not found in nc request", ipID)
		}
	}

	// validate rest of secondary IPs are in the available state
	for _, ncReq := range ncReqs {
		for secIPID, secIPConfig := range ncReq.SecondaryIPConfigs {
			secIPConfigState, ok := svc.PodIPConfigState[secIPID]
			assert.True(t, ok)
			if _, isAssigned := expectedAssignedIPs[secIPConfig.IPAddress]; !isAssigned {
				assert.Equal(t, types.Available, secIPConfigState.GetState())
			}
		}
	}
}
func createNCReqInternal(t *testing.T, secondaryIPConfigs map[string]cns.SecondaryIPConfig, ncID, ncVersion string) cns.CreateNetworkContainerRequest {
req := generateNetworkContainerRequest(secondaryIPConfigs, ncID, ncVersion)
returnCode := svc.CreateOrUpdateNetworkContainerInternal(req)

Просмотреть файл

@ -1017,13 +1017,13 @@ type nodeNetworkConfigGetter interface {
Get(context.Context) (*v1alpha.NodeNetworkConfig, error)
}
type ncStateReconciler interface {
ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) cnstypes.ResponseCode
type ipamStateReconciler interface {
ReconcileIPAMState(ncRequests []*cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) cnstypes.ResponseCode
}
// TODO(rbtr) where should this live??
// reconcileInitialCNSState initializes cns by passing pods and a CreateNetworkContainerRequest
func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ncReconciler ncStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error {
func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ipamReconciler ipamStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error {
// Get nnc using direct client
nnc, err := cli.Get(ctx)
if err != nil {
@ -1043,11 +1043,20 @@ func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter,
return errors.New("failed to init CNS state: no NCs found in NNC CRD")
}
// Get previous PodInfo state from podInfoByIPProvider
podInfoByIP, err := podInfoByIPProvider.PodInfoByIP()
if err != nil {
return errors.Wrap(err, "provider failed to provide PodInfoByIP")
}
ncReqs := make([]*cns.CreateNetworkContainerRequest, len(nnc.Status.NetworkContainers))
// For each NC, we need to create a CreateNetworkContainerRequest and use it to rebuild our state.
for i := range nnc.Status.NetworkContainers {
var ncRequest *cns.CreateNetworkContainerRequest
var err error
var (
ncRequest *cns.CreateNetworkContainerRequest
err error
)
switch nnc.Status.NetworkContainers[i].AssignmentMode { //nolint:exhaustive // skipping dynamic case
case v1alpha.Static:
ncRequest, err = nncctrl.CreateNCRequestFromStaticNC(nnc.Status.NetworkContainers[i])
@ -1059,17 +1068,15 @@ func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter,
return errors.Wrapf(err, "failed to convert NNC status to network container request, "+
"assignmentMode: %s", nnc.Status.NetworkContainers[i].AssignmentMode)
}
// Get previous PodInfo state from podInfoByIPProvider
podInfoByIP, err := podInfoByIPProvider.PodInfoByIP()
if err != nil {
return errors.Wrap(err, "provider failed to provide PodInfoByIP")
}
// Call cnsclient init cns passing those two things.
if err := restserver.ResponseCodeToError(ncReconciler.ReconcileNCState(ncRequest, podInfoByIP, nnc)); err != nil {
return errors.Wrap(err, "failed to reconcile NC state")
}
ncReqs[i] = ncRequest
}
// Call cnsclient init cns passing those two things.
if err := restserver.ResponseCodeToError(ipamReconciler.ReconcileIPAMState(ncReqs, podInfoByIP, nnc)); err != nil {
return errors.Wrap(err, "failed to reconcile CNS IPAM state")
}
return nil
}