chore: cleanups in ipampoolmonitor (#969)

This commit is contained in:
Evan Baker 2021-08-16 09:14:51 -07:00 коммит произвёл GitHub
Родитель 36c40bb873
Коммит 059b7b5d5c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 139 добавлений и 219 удалений

Просмотреть файл

@ -52,7 +52,7 @@ type HTTPService interface {
// This struct captures the state for SecondaryIPs associated to a given NC
type IPConfigurationStatus struct {
NCID string
ID string //uuid
ID string // uuid
IPAddress string
State string
PodInfo PodInfo
@ -212,13 +212,14 @@ type NodeConfiguration struct {
NodeID string
NodeSubnet Subnet
}
type IPAMPoolMonitor interface {
Start(ctx context.Context, poolMonitorRefreshMilliseconds int) error
Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error
Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec)
GetStateSnapshot() IpamPoolMonitorStateSnapshot
}
//struct to expose state values for IPAMPoolMonitor struct
// IpamPoolMonitorStateSnapshot struct to expose state values for IPAMPoolMonitor struct
type IpamPoolMonitorStateSnapshot struct {
MinimumFreeIps int64
MaximumFreeIps int64

Просмотреть файл

@ -9,7 +9,7 @@ import (
type APIClient interface {
ReconcileNCState(nc *cns.CreateNetworkContainerRequest, pods map[string]cns.PodInfo, scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error
CreateOrUpdateNC(nc cns.CreateNetworkContainerRequest) error
UpdateIPAMPoolMonitor(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error
UpdateIPAMPoolMonitor(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec)
GetNC(nc cns.GetNetworkContainerRequest) (cns.GetNetworkContainerResponse, error)
DeleteNC(nc cns.DeleteNetworkContainerRequest) error
}

Просмотреть файл

@ -25,9 +25,7 @@ import (
"github.com/google/uuid"
)
var (
svc *restserver.HTTPRestService
)
var svc *restserver.HTTPRestService
const (
primaryIp = "10.0.0.5"
@ -40,9 +38,7 @@ const (
initPoolSize = 10
)
var (
dnsservers = []string{"8.8.8.8", "8.8.4.4"}
)
var dnsservers = []string{"8.8.8.8", "8.8.4.4"}
func addTestStateToRestServer(t *testing.T, secondaryIps []string) {
var ipConfig cns.IPConfiguration
@ -78,10 +74,9 @@ func addTestStateToRestServer(t *testing.T, secondaryIps []string) {
t.Fatalf("Failed to createNetworkContainerRequest, req: %+v, err: %d", req, returnCode)
}
returnCode = svc.UpdateIPAMPoolMonitorInternal(fakes.NewFakeScalar(releasePercent, requestPercent, batchSize), fakes.NewFakeNodeNetworkConfigSpec(initPoolSize))
if returnCode != 0 {
t.Fatalf("Failed to UpdateIPAMPoolMonitorInternal, err: %d", returnCode)
}
svc.IPAMPoolMonitor.Update(
fakes.NewFakeScalar(releasePercent, requestPercent, batchSize),
fakes.NewFakeNodeNetworkConfigSpec(initPoolSize))
}
func getIPNetFromResponse(resp *cns.IPConfigResponse) (net.IPNet, error) {
@ -108,7 +103,8 @@ func getIPNetFromResponse(resp *cns.IPConfigResponse) (net.IPNet, error) {
func TestMain(m *testing.M) {
var (
info = &cns.SetOrchestratorTypeRequest{
OrchestratorType: cns.KubernetesCRD}
OrchestratorType: cns.KubernetesCRD,
}
body bytes.Buffer
res *http.Response
)
@ -266,7 +262,7 @@ func TestCNSClientRequestAndRelease(t *testing.T) {
if reflect.DeepEqual(desired, resultIPnet) != true {
t.Fatalf("Desired result not matching actual result, expected: %+v, actual: %+v", desired, resultIPnet)
}
//checking for allocated IP address and pod context printing before ReleaseIPAddress is called
// checking for allocated IP address and pod context printing before ReleaseIPAddress is called
ipaddresses, err := cnsClient.GetIPAddressesMatchingStates(cns.Allocated)
if err != nil {
t.Fatalf("Get allocated IP addresses failed %+v", err)
@ -311,7 +307,7 @@ func TestCNSClientPodContextApi(t *testing.T) {
t.Fatalf("get IP from CNS failed with %+v", err)
}
//test for pod ip by orch context map
// test for pod ip by orch context map
podcontext, err := cnsClient.GetPodOrchestratorContext()
if err != nil {
t.Errorf("Get pod ip by orchestrator context failed: %+v", err)
@ -351,7 +347,7 @@ func TestCNSClientDebugAPI(t *testing.T) {
t.Fatalf("get IP from CNS failed with %+v", err1)
}
//test for debug api/cmd to get inmemory data from HTTPRestService
// test for debug api/cmd to get inmemory data from HTTPRestService
inmemory, err := cnsClient.GetHTTPServiceData()
if err != nil {
t.Errorf("Get in-memory http REST Struct failed %+v", err)
@ -361,7 +357,7 @@ func TestCNSClientDebugAPI(t *testing.T) {
t.Errorf("OrchestratorContext map is expected but not returned")
}
//testing Pod IP Configuration Status values set for test
// testing Pod IP Configuration Status values set for test
podConfig := inmemory.HttpRestServiceData.PodIPConfigState
for _, v := range podConfig {
if v.IPAddress != "10.0.0.5" || v.State != "Allocated" || v.NCID != "testNcId1" {
@ -377,12 +373,12 @@ func TestCNSClientDebugAPI(t *testing.T) {
t.Errorf("IPAMPoolMonitor state is not reflecting the initial set values, %+v", testIpamPoolMonitor)
}
//check for cached NNC Spec struct values
// check for cached NNC Spec struct values
if testIpamPoolMonitor.CachedNNC.Spec.RequestedIPCount != 16 || len(testIpamPoolMonitor.CachedNNC.Spec.IPsNotInUse) != 1 {
t.Errorf("IPAMPoolMonitor cached NNC Spec is not reflecting the initial set values, %+v", testIpamPoolMonitor.CachedNNC.Spec)
}
//check for cached NNC Status struct values
// check for cached NNC Status struct values
if testIpamPoolMonitor.CachedNNC.Status.Scaler.BatchSize != 10 || testIpamPoolMonitor.CachedNNC.Status.Scaler.ReleaseThresholdPercent != 50 || testIpamPoolMonitor.CachedNNC.Status.Scaler.RequestThresholdPercent != 40 {
t.Errorf("IPAMPoolMonitor cached NNC Status is not reflecting the initial set values, %+v", testIpamPoolMonitor.CachedNNC.Status.Scaler)
}
@ -395,5 +391,4 @@ func TestCNSClientDebugAPI(t *testing.T) {
t.Logf("PodIPIDByOrchestratorContext: %+v", inmemory.HttpRestServiceData.PodIPIDByPodInterfaceKey)
t.Logf("PodIPConfigState: %+v", inmemory.HttpRestServiceData.PodIPConfigState)
t.Logf("IPAMPoolMonitor: %+v", inmemory.HttpRestServiceData.IPAMPoolMonitor)
}

Просмотреть файл

@ -27,14 +27,8 @@ func (client *Client) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerReque
}
// UpdateIPAMPoolMonitor updates IPAM pool monitor.
func (client *Client) UpdateIPAMPoolMonitor(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error {
returnCode := client.RestService.UpdateIPAMPoolMonitorInternal(scalar, spec)
if returnCode != 0 {
return fmt.Errorf("Failed to update IPAM pool monitor scalar: %+v, spec: %+v, errorCode: %d", scalar, spec, returnCode)
}
return nil
func (client *Client) UpdateIPAMPoolMonitor(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) {
client.RestService.IPAMPoolMonitor.Update(scalar, spec)
}
// ReconcileNCState initializes cns state

Просмотреть файл

@ -13,9 +13,6 @@ import (
nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha"
)
// available IP's stack
// all IP's map
type StringStack struct {
lock sync.Mutex // you don't have to do this if you don't want thread safety
items []string
@ -82,19 +79,17 @@ func NewIPStateManager() IPStateManager {
func (ipm *IPStateManager) AddIPConfigs(ipconfigs []cns.IPConfigurationStatus) {
ipm.Lock()
defer ipm.Unlock()
for i := 0; i < len(ipconfigs); i++ {
switch {
case ipconfigs[i].State == cns.PendingProgramming:
ipm.PendingProgramIPConfigState[ipconfigs[i].ID] = ipconfigs[i]
case ipconfigs[i].State == cns.Available:
ipm.AvailableIPConfigState[ipconfigs[i].ID] = ipconfigs[i]
ipm.AvailableIPIDStack.Push(ipconfigs[i].ID)
case ipconfigs[i].State == cns.Allocated:
ipm.AllocatedIPConfigState[ipconfigs[i].ID] = ipconfigs[i]
case ipconfigs[i].State == cns.PendingRelease:
ipm.PendingReleaseIPConfigState[ipconfigs[i].ID] = ipconfigs[i]
for _, ipconfig := range ipconfigs {
switch ipconfig.State {
case cns.PendingProgramming:
ipm.PendingProgramIPConfigState[ipconfig.ID] = ipconfig
case cns.Available:
ipm.AvailableIPConfigState[ipconfig.ID] = ipconfig
ipm.AvailableIPIDStack.Push(ipconfig.ID)
case cns.Allocated:
ipm.AllocatedIPConfigState[ipconfig.ID] = ipconfig
case cns.PendingRelease:
ipm.PendingReleaseIPConfigState[ipconfig.ID] = ipconfig
}
}
}
@ -102,9 +97,8 @@ func (ipm *IPStateManager) AddIPConfigs(ipconfigs []cns.IPConfigurationStatus) {
func (ipm *IPStateManager) RemovePendingReleaseIPConfigs(ipconfigNames []string) {
ipm.Lock()
defer ipm.Unlock()
for i := 0; i < len(ipconfigNames); i++ {
delete(ipm.PendingReleaseIPConfigState, ipconfigNames[i])
for _, name := range ipconfigNames {
delete(ipm.PendingReleaseIPConfigState, name)
}
}
@ -133,20 +127,18 @@ func (ipm *IPStateManager) MarkIPAsPendingRelease(numberOfIPsToMark int) (map[st
ipm.Lock()
defer ipm.Unlock()
var (
err error
)
var err error
pendingRelease := make(map[string]cns.IPConfigurationStatus)
pendingReleaseIPs := make(map[string]cns.IPConfigurationStatus)
defer func() {
// if there was an error, and not all ip's have been freed, restore state
if err != nil && len(pendingRelease) != numberOfIPsToMark {
for uuid, ipState := range pendingRelease {
if err != nil && len(pendingReleaseIPs) != numberOfIPsToMark {
for uuid, ipState := range pendingReleaseIPs {
ipState.State = cns.Available
ipm.AvailableIPIDStack.Push(pendingRelease[uuid].ID)
ipm.AvailableIPConfigState[pendingRelease[uuid].ID] = ipState
delete(ipm.PendingReleaseIPConfigState, pendingRelease[uuid].ID)
ipm.AvailableIPIDStack.Push(pendingReleaseIPs[uuid].ID)
ipm.AvailableIPConfigState[pendingReleaseIPs[uuid].ID] = ipState
delete(ipm.PendingReleaseIPConfigState, pendingReleaseIPs[uuid].ID)
}
}
}()
@ -160,17 +152,17 @@ func (ipm *IPStateManager) MarkIPAsPendingRelease(numberOfIPsToMark int) (map[st
// add all pending release to a slice
ipConfig := ipm.AvailableIPConfigState[id]
ipConfig.State = cns.PendingRelease
pendingRelease[id] = ipConfig
pendingReleaseIPs[id] = ipConfig
delete(ipm.AvailableIPConfigState, id)
}
// if no errors at this point, add the pending ips to the Pending state
for i := range pendingRelease {
ipm.PendingReleaseIPConfigState[pendingRelease[i].ID] = pendingRelease[i]
for _, pendingReleaseIP := range pendingReleaseIPs {
ipm.PendingReleaseIPConfigState[pendingReleaseIP.ID] = pendingReleaseIP
}
return pendingRelease, nil
return pendingReleaseIPs, nil
}
var _ cns.HTTPService = (*HTTPServiceFake)(nil)
@ -181,11 +173,9 @@ type HTTPServiceFake struct {
}
func NewHTTPServiceFake() *HTTPServiceFake {
svc := &HTTPServiceFake{
return &HTTPServiceFake{
IPStateManager: NewIPStateManager(),
}
return svc
}
func (fake *HTTPServiceFake) SetNumberOfAllocatedIPs(desiredAllocatedIPCount int) error {
@ -193,29 +183,26 @@ func (fake *HTTPServiceFake) SetNumberOfAllocatedIPs(desiredAllocatedIPCount int
delta := (desiredAllocatedIPCount - currentAllocatedIPCount)
if delta > 0 {
// allocated IPs
for i := 0; i < delta; i++ {
if _, err := fake.IPStateManager.ReserveIPConfig(); err != nil {
return err
}
}
} else if delta < 0 {
// deallocate IP's
delta *= -1
i := 0
for id := range fake.IPStateManager.AllocatedIPConfigState {
if i < delta {
if _, err := fake.IPStateManager.ReleaseIPConfig(id); err != nil {
return err
}
} else {
break
}
i++
}
return nil
}
// deallocate IPs
delta *= -1
i := 0
for id := range fake.IPStateManager.AllocatedIPConfigState {
if i >= delta {
break
}
if _, err := fake.IPStateManager.ReleaseIPConfig(id); err != nil {
return err
}
i++
}
return nil
}
@ -235,7 +222,6 @@ func (fake *HTTPServiceFake) GetPendingProgramIPConfigs() []cns.IPConfigurationS
for _, ipconfig := range fake.IPStateManager.PendingProgramIPConfigState {
ipconfigs = append(ipconfigs, ipconfig)
}
return ipconfigs
}
@ -244,7 +230,6 @@ func (fake *HTTPServiceFake) GetAvailableIPConfigs() []cns.IPConfigurationStatus
for _, ipconfig := range fake.IPStateManager.AvailableIPConfigState {
ipconfigs = append(ipconfigs, ipconfig)
}
return ipconfigs
}
@ -253,7 +238,6 @@ func (fake *HTTPServiceFake) GetAllocatedIPConfigs() []cns.IPConfigurationStatus
for _, ipconfig := range fake.IPStateManager.AllocatedIPConfigState {
ipconfigs = append(ipconfigs, ipconfig)
}
return ipconfigs
}
@ -262,7 +246,6 @@ func (fake *HTTPServiceFake) GetPendingReleaseIPConfigs() []cns.IPConfigurationS
for _, ipconfig := range fake.IPStateManager.PendingReleaseIPConfigState {
ipconfigs = append(ipconfigs, ipconfig)
}
return ipconfigs
}
@ -272,15 +255,12 @@ func (fake *HTTPServiceFake) GetPodIPConfigState() map[string]cns.IPConfiguratio
for key, val := range fake.IPStateManager.AllocatedIPConfigState {
ipconfigs[key] = val
}
for key, val := range fake.IPStateManager.AvailableIPConfigState {
ipconfigs[key] = val
}
for key, val := range fake.IPStateManager.PendingReleaseIPConfigState {
ipconfigs[key] = val
}
return ipconfigs
}

Просмотреть файл

@ -4,7 +4,6 @@ import (
"context"
"github.com/Azure/azure-container-networking/cns"
nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha"
)
@ -15,17 +14,11 @@ type IPAMPoolMonitorFake struct {
FakecachedNNC nnc.NodeNetworkConfig
}
func NewIPAMPoolMonitorFake() *IPAMPoolMonitorFake {
return &IPAMPoolMonitorFake{}
}
func (ipm *IPAMPoolMonitorFake) Start(ctx context.Context, poolMonitorRefreshMilliseconds int) error {
return nil
}
func (ipm *IPAMPoolMonitorFake) Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error {
return nil
}
func (ipm *IPAMPoolMonitorFake) Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) {}
func (ipm *IPAMPoolMonitorFake) Reconcile() error {
return nil

Просмотреть файл

@ -12,9 +12,7 @@ import (
nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha"
)
const (
defaultMaxIPCount = int64(250)
)
const defaultMaxIPCount = int64(250)
type CNSIPAMPoolMonitor struct {
MaximumFreeIps int64
@ -60,7 +58,7 @@ func (pm *CNSIPAMPoolMonitor) Reconcile(ctx context.Context) error {
pendingReleaseIPCount := len(pm.httpService.GetPendingReleaseIPConfigs())
availableIPConfigCount := len(pm.httpService.GetAvailableIPConfigs()) // TODO: add pending allocation count to real cns
freeIPConfigCount := pm.cachedNNC.Spec.RequestedIPCount - int64(allocatedPodIPCount)
batchSize := pm.getBatchSize() //Use getters in case customer changes batchsize manually
batchSize := pm.getBatchSize() // Use getters in case customer changes batchsize manually
maxIPCount := pm.getMaxIPCount()
msg := fmt.Sprintf("[ipam-pool-monitor] Pool Size: %v, Goal Size: %v, BatchSize: %v, MaxIPCount: %v, MinFree: %v, MaxFree:%v, Allocated: %v, Available: %v, Pending Release: %v, Free: %v, Pending Program: %v",
@ -98,12 +96,10 @@ func (pm *CNSIPAMPoolMonitor) Reconcile(ctx context.Context) error {
}
func (pm *CNSIPAMPoolMonitor) increasePoolSize(ctx context.Context) error {
defer pm.mu.Unlock()
pm.mu.Lock()
defer pm.mu.Unlock()
var err error
var tempNNCSpec nnc.NodeNetworkConfigSpec
tempNNCSpec = pm.createNNCSpecForCRD()
tempNNCSpec := pm.createNNCSpecForCRD()
// Query the max IP count
maxIPCount := pm.getMaxIPCount()
@ -125,7 +121,7 @@ func (pm *CNSIPAMPoolMonitor) increasePoolSize(ctx context.Context) error {
logger.Printf("[ipam-pool-monitor] Increasing pool size, Current Pool Size: %v, Updated Requested IP Count: %v, Pods with IP's:%v, ToBeDeleted Count: %v", len(pm.httpService.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.httpService.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))
err = pm.rc.UpdateCRDSpec(ctx, tempNNCSpec)
err := pm.rc.UpdateCRDSpec(ctx, tempNNCSpec)
if err != nil {
// caller will retry to update the CRD again
return err
@ -138,15 +134,13 @@ func (pm *CNSIPAMPoolMonitor) increasePoolSize(ctx context.Context) error {
}
func (pm *CNSIPAMPoolMonitor) decreasePoolSize(ctx context.Context, existingPendingReleaseIPCount int) error {
defer pm.mu.Unlock()
pm.mu.Lock()
defer pm.mu.Unlock()
// mark n number of IP's as pending
var err error
var newIpsMarkedAsPending bool
var pendingIpAddresses map[string]cns.IPConfigurationStatus
var pendingIPAddresses map[string]cns.IPConfigurationStatus
var updatedRequestedIPCount int64
var decreaseIPCountBy int64
// Ensure the updated requested IP count is a multiple of the batch size
previouslyRequestedIPCount := pm.cachedNNC.Spec.RequestedIPCount
@ -166,14 +160,15 @@ func (pm *CNSIPAMPoolMonitor) decreasePoolSize(ctx context.Context, existingPend
updatedRequestedIPCount = previouslyRequestedIPCount - batchSize
}
decreaseIPCountBy = previouslyRequestedIPCount - updatedRequestedIPCount
decreaseIPCountBy := previouslyRequestedIPCount - updatedRequestedIPCount
logger.Printf("[ipam-pool-monitor] updatedRequestedIPCount %v", updatedRequestedIPCount)
if pm.updatingIpsNotInUseCount == 0 ||
pm.updatingIpsNotInUseCount < existingPendingReleaseIPCount {
logger.Printf("[ipam-pool-monitor] Marking IPs as PendingRelease, ipsToBeReleasedCount %d", int(decreaseIPCountBy))
pendingIpAddresses, err = pm.httpService.MarkIPAsPendingRelease(int(decreaseIPCountBy))
var err error
pendingIPAddresses, err = pm.httpService.MarkIPAsPendingRelease(int(decreaseIPCountBy))
if err != nil {
return err
}
@ -181,20 +176,20 @@ func (pm *CNSIPAMPoolMonitor) decreasePoolSize(ctx context.Context, existingPend
newIpsMarkedAsPending = true
}
var tempNNCSpec nnc.NodeNetworkConfigSpec
tempNNCSpec = pm.createNNCSpecForCRD()
tempNNCSpec := pm.createNNCSpecForCRD()
if newIpsMarkedAsPending {
// cache the updatingPendingRelease so that we dont re-set new IPs to PendingRelease in case UpdateCRD call fails
pm.updatingIpsNotInUseCount = len(tempNNCSpec.IPsNotInUse)
}
logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d, updatingPendingIpsNotInUse count %d", len(pendingIpAddresses), pm.updatingIpsNotInUseCount)
logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d, updatingPendingIpsNotInUse count %d",
len(pendingIPAddresses), pm.updatingIpsNotInUseCount)
tempNNCSpec.RequestedIPCount -= int64(len(pendingIpAddresses))
tempNNCSpec.RequestedIPCount -= int64(len(pendingIPAddresses))
logger.Printf("[ipam-pool-monitor] Decreasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's: %v, ToBeDeleted Count: %v", len(pm.httpService.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.httpService.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))
err = pm.rc.UpdateCRDSpec(ctx, tempNNCSpec)
err := pm.rc.UpdateCRDSpec(ctx, tempNNCSpec)
if err != nil {
// caller will retry to update the CRD again
return err
@ -212,17 +207,15 @@ func (pm *CNSIPAMPoolMonitor) decreasePoolSize(ctx context.Context, existingPend
return nil
}
// if cns pending ip release map is empty, request controller has already reconciled the CNS state,
// so we can remove it from our cache and remove the IP's from the CRD
// cleanPendingRelease removes IPs from the cache and CRD if the request controller has reconciled
// CNS state and the pending IP release map is empty.
func (pm *CNSIPAMPoolMonitor) cleanPendingRelease(ctx context.Context) error {
defer pm.mu.Unlock()
pm.mu.Lock()
defer pm.mu.Unlock()
var err error
var tempNNCSpec nnc.NodeNetworkConfigSpec
tempNNCSpec = pm.createNNCSpecForCRD()
tempNNCSpec := pm.createNNCSpecForCRD()
err = pm.rc.UpdateCRDSpec(ctx, tempNNCSpec)
err := pm.rc.UpdateCRDSpec(ctx, tempNNCSpec)
if err != nil {
// caller will retry to update the CRD again
return err
@ -235,13 +228,11 @@ func (pm *CNSIPAMPoolMonitor) cleanPendingRelease(ctx context.Context) error {
return nil
}
// CNSToCRDSpec translates CNS's map of Ips to be released and requested ip count into a CRD Spec
// createNNCSpecForCRD translates CNS's map of IPs to be released and requested IP count into an NNC Spec.
func (pm *CNSIPAMPoolMonitor) createNNCSpecForCRD() nnc.NodeNetworkConfigSpec {
var (
spec nnc.NodeNetworkConfigSpec
)
var spec nnc.NodeNetworkConfigSpec
// DUpdate the count from cached spec
// Update the count from cached spec
spec.RequestedIPCount = pm.cachedNNC.Spec.RequestedIPCount
// Get All Pending IPs from CNS and populate it again.
@ -254,9 +245,9 @@ func (pm *CNSIPAMPoolMonitor) createNNCSpecForCRD() nnc.NodeNetworkConfigSpec {
}
// UpdatePoolLimitsTransacted called by request controller on reconcile to set the batch size limits
func (pm *CNSIPAMPoolMonitor) Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error {
defer pm.mu.Unlock()
func (pm *CNSIPAMPoolMonitor) Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) {
pm.mu.Lock()
defer pm.mu.Unlock()
pm.scalarUnits = scalar
@ -267,8 +258,6 @@ func (pm *CNSIPAMPoolMonitor) Update(scalar nnc.Scaler, spec nnc.NodeNetworkConf
logger.Printf("[ipam-pool-monitor] Update spec %+v, pm.MinimumFreeIps %d, pm.MaximumFreeIps %d",
pm.cachedNNC.Spec, pm.MinimumFreeIps, pm.MaximumFreeIps)
return nil
}
func (pm *CNSIPAMPoolMonitor) getMaxIPCount() int64 {
@ -286,7 +275,7 @@ func (pm *CNSIPAMPoolMonitor) getBatchSize() int64 {
return pm.scalarUnits.BatchSize
}
//this function sets the values for state in IPAMPoolMonitor Struct
// GetStateSnapshot gets a snapshot of the IPAMPoolMonitor struct.
func (pm *CNSIPAMPoolMonitor) GetStateSnapshot() cns.IpamPoolMonitorStateSnapshot {
pm.mu.Lock()
defer pm.mu.Unlock()

Просмотреть файл

@ -8,6 +8,7 @@ MOCKGEN = $(TOOLS_BIN_DIR)/mockgen
generate: $(MOCKGEN) ## Generate mock clients
$(MOCKGEN) -source=$(REPO_ROOT)/cns/cnsclient/apiclient.go -package=mockclients APIClient > cnsclient.go
$(MOCKGEN) -source=$(REPO_ROOT)/vendor/sigs.k8s.io/controller-runtime/pkg/client/interfaces.go -package=mockclients Client > kubeclient.go
@sed -i s,$(REPO_ROOT)/,,g cnsclient.go kubeclient.go
$(MOCKGEN):
@make -C $(REPO_ROOT) $(MOCKGEN)

Просмотреть файл

@ -93,11 +93,9 @@ func (mr *MockAPIClientMockRecorder) ReconcileNCState(nc, pods, scalar, spec int
}
// UpdateIPAMPoolMonitor mocks base method.
func (m *MockAPIClient) UpdateIPAMPoolMonitor(scalar v1alpha.Scaler, spec v1alpha.NodeNetworkConfigSpec) error {
func (m *MockAPIClient) UpdateIPAMPoolMonitor(scalar v1alpha.Scaler, spec v1alpha.NodeNetworkConfigSpec) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateIPAMPoolMonitor", scalar, spec)
ret0, _ := ret[0].(error)
return ret0
m.ctrl.Call(m, "UpdateIPAMPoolMonitor", scalar, spec)
}
// UpdateIPAMPoolMonitor indicates an expected call of UpdateIPAMPoolMonitor.

Просмотреть файл

@ -37,6 +37,7 @@ type IPAddress struct {
Address string `xml:"Address,attr"`
IsPrimary bool `xml:"IsPrimary,attr"`
}
type IPSubnet struct {
XMLName xml.Name `xml:"IPSubnet"`
Prefix string `xml:"Prefix,attr"`
@ -61,19 +62,22 @@ var (
mux *http.ServeMux
hostQueryResponse = xmlDocument{
XMLName: xml.Name{Local: "Interfaces"},
Interface: []Interface{Interface{
Interface: []Interface{{
XMLName: xml.Name{Local: "Interface"},
MacAddress: "*",
IsPrimary: true,
IPSubnet: []IPSubnet{
IPSubnet{XMLName: xml.Name{Local: "IPSubnet"},
Prefix: "10.0.0.0/16",
{
XMLName: xml.Name{Local: "IPSubnet"},
Prefix: "10.0.0.0/16",
IPAddress: []IPAddress{
IPAddress{
{
XMLName: xml.Name{Local: "IPAddress"},
Address: "10.0.0.4",
IsPrimary: true},
}},
IsPrimary: true,
},
},
},
},
}},
}
@ -287,7 +291,6 @@ func TestCreateNetworkContainer(t *testing.T) {
t.Errorf("Failed to delete the saved goal state due to error: %+v", err)
t.Fatal(err)
}
}
func TestGetNetworkContainerByOrchestratorContext(t *testing.T) {
@ -519,7 +522,6 @@ func createNC(
func TestPublishNCViaCNS(t *testing.T) {
fmt.Println("Test: publishNetworkContainer")
publishNCViaCNS(t, "vnet1", "ethWebApp")
}
func publishNCViaCNS(t *testing.T,
@ -637,7 +639,6 @@ func TestNmAgentSupportedApisHandler(t *testing.T) {
// Since we are testing the NMAgent API in internalapi_test, we will skip POST call
// and test other paths
fmt.Printf("nmAgentSupportedApisHandler Responded with %+v\n", nmAgentSupportedApisResponse)
}
func TestCreateHostNCApipaEndpoint(t *testing.T) {
@ -667,7 +668,6 @@ func TestCreateHostNCApipaEndpoint(t *testing.T) {
}
fmt.Printf("createHostNCApipaEndpoint Responded with %+v\n", createHostNCApipaEndpointResponse)
}
func setOrchestratorType(t *testing.T, orchestratorType string) error {
@ -930,7 +930,7 @@ func startService() error {
return err
}
svc.IPAMPoolMonitor = fakes.NewIPAMPoolMonitorFake()
svc.IPAMPoolMonitor = &fakes.IPAMPoolMonitorFake{}
if service != nil {
// Create empty azure-cns.json. CNS should start successfully by deleting this file

Просмотреть файл

@ -225,10 +225,7 @@ func (service *HTTPRestService) ReconcileNCState(
if returnCode != types.Success {
return returnCode
}
returnCode = service.UpdateIPAMPoolMonitorInternal(scalar, spec)
if returnCode != types.Success {
return returnCode
}
service.IPAMPoolMonitor.Update(scalar, spec)
// now parse the secondaryIP list, if it exists in PodInfo list, then allocate that ip
for _, secIpConfig := range ncRequest.SecondaryIPConfigs {
@ -358,15 +355,3 @@ func (service *HTTPRestService) CreateOrUpdateNetworkContainerInternal(
return returnCode
}
func (service *HTTPRestService) UpdateIPAMPoolMonitorInternal(
scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec,
) types.ResponseCode {
if err := service.IPAMPoolMonitor.Update(scalar, spec); err != nil {
logger.Errorf("[cns-rc] Error creating or updating IPAM Pool Monitor: %v", err)
// requeue
return types.UnexpectedError
}
return 0
}

Просмотреть файл

@ -203,6 +203,7 @@ func createNCReqeustForSyncHostNCVersion(t *testing.T) cns.CreateNetworkContaine
req := createNCReqInternal(t, secondaryIPConfigs, ncID, strconv.Itoa(ncVersion))
return req
}
func TestReconcileNCWithEmptyState(t *testing.T) {
restartService()
setEnv(t)
@ -225,7 +226,7 @@ func TestReconcileNCWithExistingState(t *testing.T) {
secondaryIPConfigs := make(map[string]cns.SecondaryIPConfig)
var startingIndex = 6
startingIndex := 6
for i := 0; i < 4; i++ {
ipaddress := "10.0.0." + strconv.Itoa(startingIndex)
secIpConfig := newSecondaryIPConfig(ipaddress, -1)
@ -258,7 +259,7 @@ func TestReconcileNCWithExistingStateFromInterfaceID(t *testing.T) {
secondaryIPConfigs := make(map[string]cns.SecondaryIPConfig)
var startingIndex = 6
startingIndex := 6
for i := 0; i < 4; i++ {
ipaddress := "10.0.0." + strconv.Itoa(startingIndex)
secIpConfig := newSecondaryIPConfig(ipaddress, -1)
@ -289,7 +290,7 @@ func TestReconcileNCWithSystemPods(t *testing.T) {
secondaryIPConfigs := make(map[string]cns.SecondaryIPConfig)
var startingIndex = 6
startingIndex := 6
for i := 0; i < 4; i++ {
ipaddress := "10.0.0." + strconv.Itoa(startingIndex)
secIpConfig := newSecondaryIPConfig(ipaddress, -1)
@ -324,7 +325,7 @@ func validateCreateNCInternal(t *testing.T, secondaryIpCount int, ncVersion stri
secondaryIPConfigs := make(map[string]cns.SecondaryIPConfig)
ncId := "testNc1"
ncVersionInInt, _ := strconv.Atoi(ncVersion)
var startingIndex = 6
startingIndex := 6
for i := 0; i < secondaryIpCount; i++ {
ipaddress := "10.0.0." + strconv.Itoa(startingIndex)
secIpConfig := newSecondaryIPConfig(ipaddress, ncVersionInInt)
@ -340,7 +341,7 @@ func validateCreateOrUpdateNCInternal(t *testing.T, secondaryIpCount int, ncVers
secondaryIPConfigs := make(map[string]cns.SecondaryIPConfig)
ncId := "testNc1"
ncVersionInInt, _ := strconv.Atoi(ncVersion)
var startingIndex = 6
startingIndex := 6
for i := 0; i < secondaryIpCount; i++ {
ipaddress := "10.0.0." + strconv.Itoa(startingIndex)
secIpConfig := newSecondaryIPConfig(ipaddress, ncVersionInInt)
@ -365,7 +366,7 @@ func validateCreateOrUpdateNCInternal(t *testing.T, secondaryIpCount int, ncVers
// now Scale down, delete 3 ipaddresses from secondaryIpConfig req
fmt.Println("Validate Scale down")
var count = 0
count := 0
for ipid := range secondaryIPConfigs {
delete(secondaryIPConfigs, ipid)
count++
@ -392,10 +393,9 @@ func createAndValidateNCRequest(t *testing.T, secondaryIPConfigs map[string]cns.
if returnCode != 0 {
t.Fatalf("Failed to createNetworkContainerRequest, req: %+v, err: %d", req, returnCode)
}
returnCode = svc.UpdateIPAMPoolMonitorInternal(fakes.NewFakeScalar(releasePercent, requestPercent, batchSize), fakes.NewFakeNodeNetworkConfigSpec(initPoolSize))
if returnCode != 0 {
t.Fatalf("Failed to UpdateIPAMPoolMonitorInternal, err: %d", returnCode)
}
svc.IPAMPoolMonitor.Update(
fakes.NewFakeScalar(releasePercent, requestPercent, batchSize),
fakes.NewFakeNodeNetworkConfigSpec(initPoolSize))
validateNetworkRequest(t, req)
}
@ -429,7 +429,7 @@ func validateNetworkRequest(t *testing.T, req cns.CreateNetworkContainerRequest)
expectedIPStatus = cns.Available
}
t.Logf("NC version in container status is %s, HostVersion is %s", containerStatus.CreateNetworkContainerRequest.Version, containerStatus.HostVersion)
var alreadyValidated = make(map[string]string)
alreadyValidated := make(map[string]string)
for ipid, ipStatus := range svc.PodIPConfigState {
if ipaddress, found := alreadyValidated[ipid]; !found {
if secondaryIpConfig, ok := req.SecondaryIPConfigs[ipid]; !ok {
@ -557,10 +557,9 @@ func createNCReqInternal(t *testing.T, secondaryIPConfigs map[string]cns.Seconda
if returnCode != 0 {
t.Fatalf("Failed to createNetworkContainerRequest, req: %+v, err: %d", req, returnCode)
}
returnCode = svc.UpdateIPAMPoolMonitorInternal(fakes.NewFakeScalar(releasePercent, requestPercent, batchSize), fakes.NewFakeNodeNetworkConfigSpec(initPoolSize))
if returnCode != 0 {
t.Fatalf("Failed to UpdateIPAMPoolMonitorInternal, err: %d", returnCode)
}
svc.IPAMPoolMonitor.Update(
fakes.NewFakeScalar(releasePercent, requestPercent, batchSize),
fakes.NewFakeNodeNetworkConfigSpec(initPoolSize))
return req
}

Просмотреть файл

@ -36,7 +36,7 @@ func getTestService() *HTTPRestService {
var config common.ServiceConfig
httpsvc, _ := NewHTTPRestService(&config, fakes.NewFakeImdsClient(), fakes.NewFakeNMAgentClient())
svc = httpsvc.(*HTTPRestService)
svc.IPAMPoolMonitor = fakes.NewIPAMPoolMonitorFake()
svc.IPAMPoolMonitor = &fakes.IPAMPoolMonitorFake{}
setOrchestratorTypeInternal(cns.KubernetesCRD)
return svc
@ -525,7 +525,6 @@ func TestAvailableIPConfigs(t *testing.T) {
desiredAllocatedIpConfigs[desiredState.ID] = desiredState
allocatedIps = svc.GetAllocatedIPConfigs()
validateIpState(t, allocatedIps, desiredAllocatedIpConfigs)
}
func validateIpState(t *testing.T, actualIps []cns.IPConfigurationStatus, expectedList map[string]cns.IPConfigurationStatus) {
@ -613,10 +612,9 @@ func TestIPAMMarkIPAsPendingWithPendingProgrammingIPs(t *testing.T) {
if returnCode != 0 {
t.Fatalf("Failed to createNetworkContainerRequest, req: %+v, err: %d", req, returnCode)
}
returnCode = svc.UpdateIPAMPoolMonitorInternal(fakes.NewFakeScalar(releasePercent, requestPercent, batchSize), fakes.NewFakeNodeNetworkConfigSpec(initPoolSize))
if returnCode != 0 {
t.Fatalf("Failed to UpdateIPAMPoolMonitorInternal, req: %+v, err: %d", req, returnCode)
}
svc.IPAMPoolMonitor.Update(
fakes.NewFakeScalar(releasePercent, requestPercent, batchSize),
fakes.NewFakeNodeNetworkConfigSpec(initPoolSize))
// Release pending programming IPs
ips, err := svc.MarkIPAsPendingRelease(2)

Просмотреть файл

@ -29,7 +29,7 @@ func (r *CrdReconciler) Reconcile(ctx context.Context, request reconcile.Request
err error
)
//Get the CRD object
// Get the CRD object
if err = r.KubeClient.Get(ctx, request.NamespacedName, &nodeNetConfig); err != nil {
if apierrors.IsNotFound(err) {
logger.Printf("[cns-rc] CRD not found, ignoring %v", err)
@ -63,7 +63,7 @@ func (r *CrdReconciler) Reconcile(ctx context.Context, request reconcile.Request
ncRequest, err = CRDStatusToNCRequest(nodeNetConfig.Status)
if err != nil {
logger.Errorf("[cns-rc] Error translating crd status to nc request %v", err)
//requeue
// requeue
return reconcile.Result{}, err
}
@ -73,12 +73,7 @@ func (r *CrdReconciler) Reconcile(ctx context.Context, request reconcile.Request
return reconcile.Result{}, err
}
if err = r.CNSClient.UpdateIPAMPoolMonitor(nodeNetConfig.Status.Scaler, nodeNetConfig.Spec); err != nil {
logger.Errorf("[cns-rc] Error update IPAM pool monitor in reconcile: %v", err)
// requeue
return reconcile.Result{}, err
}
r.CNSClient.UpdateIPAMPoolMonitor(nodeNetConfig.Status.Scaler, nodeNetConfig.Spec)
return reconcile.Result{}, err
}

Просмотреть файл

@ -39,7 +39,7 @@ type MockAPI struct {
pods map[MockKey]*corev1.Pod
}
//MockKey is the key to the mockAPI, namespace+"/"+name like in API server
// MockKey is the key to the mockAPI, namespace+"/"+name like in API server
type MockKey struct {
Namespace string
Name string
@ -67,8 +67,8 @@ func (mc MockKubeClient) Get(ctx context.Context, key client.ObjectKey, obj clie
return nil
}
//Mock implementation of the KubeClient interface Update method
//Mimics that of controller-runtime's client.Client
// Mock implementation of the KubeClient interface Update method
// Mimics that of controller-runtime's client.Client
func (mc MockKubeClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
nodeNetConfig := obj.(*nnc.NodeNetworkConfig)
@ -102,9 +102,7 @@ func (mi *MockCNSClient) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerRe
return nil
}
func (mi *MockCNSClient) UpdateIPAMPoolMonitor(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error {
return nil
}
func (mi *MockCNSClient) UpdateIPAMPoolMonitor(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) {}
func (mi *MockCNSClient) DeleteNC(nc cns.DeleteNetworkContainerRequest) error {
return nil
@ -145,7 +143,6 @@ func (mc *MockDirectCRDClient) Get(ctx context.Context, name, namespace, typeNam
}
return nodeNetConfig, nil
}
// MockDirectAPIClient implements the DirectAPIClient interface
@ -175,8 +172,9 @@ func (mc *MockDirectAPIClient) ListPods(ctx context.Context, namespace, node str
return &pods, nil
}
func TestNewCrdRequestController(t *testing.T) {
//Test making request controller without logger initialized, should fail
// Test making request controller without logger initialized, should fail
_, err := New(Config{})
if err == nil {
t.Fatalf("Expected error when making NewCrdRequestController without initializing logger, got nil error")
@ -184,11 +182,11 @@ func TestNewCrdRequestController(t *testing.T) {
t.Fatalf("Expected logger error when making NewCrdRequestController without initializing logger, got: %+v", err)
}
//Initialize logger
// Initialize logger
logger.InitLogger("Azure CRD Request Controller", 3, 3, "")
//Test making request controller without NODENAME env var set, should fail
//Save old value though
// Test making request controller without NODENAME env var set, should fail
// Save old value though
nodeName, found := os.LookupEnv(nodeNameEnvVar)
os.Unsetenv(nodeNameEnvVar)
defer func() {
@ -204,7 +202,7 @@ func TestNewCrdRequestController(t *testing.T) {
t.Fatalf("Expected error when making NewCrdRequestController without setting "+nodeNameEnvVar+" env var, got: %+v", err)
}
//TODO: Create integration tests with minikube
// TODO: Create integration tests with minikube
}
func TestGetNonExistingNodeNetConfig(t *testing.T) {
@ -231,12 +229,11 @@ func TestGetNonExistingNodeNetConfig(t *testing.T) {
}
logger.InitLogger("Azure CNS RequestController", 0, 0, "")
//Test getting nonexisting NodeNetconfig obj
// Test getting nonexisting NodeNetconfig obj
_, err := rc.getNodeNetConfig(context.Background(), nonexistingNNCName, nonexistingNamespace)
if err == nil {
t.Fatalf("Expected error when getting nonexisting nodenetconfig obj. Got nil error.")
}
}
func TestGetExistingNodeNetConfig(t *testing.T) {
@ -263,7 +260,7 @@ func TestGetExistingNodeNetConfig(t *testing.T) {
}
logger.InitLogger("Azure CNS RequestController", 0, 0, "")
//Test getting existing NodeNetConfig obj
// Test getting existing NodeNetConfig obj
nodeNetConfig, err := rc.getNodeNetConfig(context.Background(), existingNNCName, existingNamespace)
if err != nil {
t.Fatalf("Expected no error when getting existing NodeNetworkConfig: %+v", err)
@ -298,7 +295,7 @@ func TestUpdateNonExistingNodeNetConfig(t *testing.T) {
}
logger.InitLogger("Azure CNS RequestController", 0, 0, "")
//Test updating non existing NodeNetworkConfig obj
// Test updating non existing NodeNetworkConfig obj
nodeNetConfigNonExisting := &nnc.NodeNetworkConfig{ObjectMeta: metav1.ObjectMeta{
Name: nonexistingNNCName,
Namespace: nonexistingNamespace,
@ -336,7 +333,7 @@ func TestUpdateExistingNodeNetConfig(t *testing.T) {
}
logger.InitLogger("Azure CNS RequestController", 0, 0, "")
//Update an existing NodeNetworkConfig obj from the mock API
// Update an existing NodeNetworkConfig obj from the mock API
nodeNetConfigUpdated := mockAPI.nodeNetConfigs[mockNNCKey].DeepCopy()
nodeNetConfigUpdated.ObjectMeta.ClusterName = "New cluster name"
@ -345,7 +342,7 @@ func TestUpdateExistingNodeNetConfig(t *testing.T) {
t.Fatalf("Expected no error when updating existing NodeNetworkConfig, got :%v", err)
}
//See that NodeNetworkConfig in mock store was updated
// See that NodeNetworkConfig in mock store was updated
if !reflect.DeepEqual(nodeNetConfigUpdated, mockAPI.nodeNetConfigs[mockNNCKey]) {
t.Fatal("Update of existing NodeNetworkConfig did not get passed along")
}
@ -384,7 +381,7 @@ func TestUpdateSpecOnNonExistingNodeNetConfig(t *testing.T) {
},
}
//Test updating spec for existing NodeNetworkConfig
// Test updating spec for existing NodeNetworkConfig
err := rc.UpdateCRDSpec(context.Background(), spec)
if err == nil {
@ -425,9 +422,8 @@ func TestUpdateSpecOnExistingNodeNetConfig(t *testing.T) {
},
}
//Test update spec for existing NodeNetworkConfig
// Test update spec for existing NodeNetworkConfig
err := rc.UpdateCRDSpec(context.Background(), spec)
if err != nil {
t.Fatalf("Expected no error when updating spec on existing crd, got :%v", err)
}
@ -462,7 +458,6 @@ func TestGetExistingNNCDirectClient(t *testing.T) {
}
nodeNetConfigFetched, err := rc.getNodeNetConfigDirect(context.Background(), existingNNCName, existingNamespace)
if err != nil {
t.Fatalf("Expected to be able to get existing nodenetconfig with directCRD client: %v", err)
}
@ -470,7 +465,6 @@ func TestGetExistingNNCDirectClient(t *testing.T) {
if !reflect.DeepEqual(nodeNetConfigFill, nodeNetConfigFetched) {
t.Fatalf("Expected fetched nodenetconfig to be equal to one we loaded into store")
}
}
// test get nnc directly non existing
@ -536,7 +530,6 @@ func TestGetPodsExistingNodeDirectClient(t *testing.T) {
}
pods, err := rc.getAllPods(context.Background(), existingNNCName)
if err != nil {
t.Fatalf("Expected to be able to get all pods given correct node name")
}
@ -687,5 +680,4 @@ func TestInitRequestController(t *testing.T) {
if _, ok := mockCNSClient.NCRequest.SecondaryIPConfigs[allocatedUUID]; !ok {
t.Fatalf("Expected secondary ip config to be in ncrequest")
}
}