azure-container-networking/cns/ipampool/monitor.go

package ipampool

import (
	"context"
	"fmt"
	"net/netip"
	"strconv"
	"sync"
	"time"

	"github.com/Azure/azure-container-networking/cns"
	"github.com/Azure/azure-container-networking/cns/ipampool/metrics"
	"github.com/Azure/azure-container-networking/cns/logger"
	"github.com/Azure/azure-container-networking/cns/metric"
	"github.com/Azure/azure-container-networking/cns/types"
	"github.com/Azure/azure-container-networking/crd/clustersubnetstate/api/v1alpha1"
	"github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha"
	"github.com/avast/retry-go/v4"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
)

const (
	// DefaultRefreshDelay pool monitor poll delay default in seconds.
	DefaultRefreshDelay = 1 * time.Second
	// DefaultMaxIPs default maximum allocatable IPs
	DefaultMaxIPs = 250
	// fieldManager is the field manager used when patching the NodeNetworkConfig.
	fieldManager = "azure-cns"
	// Subnet ARM ID /subscriptions/$(SUB)/resourceGroups/$(GROUP)/providers/Microsoft.Network/virtualNetworks/$(VNET)/subnets/$(SUBNET)
	subnetARMIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/virtualNetworks/%s/subnets/%s"
)

type nodeNetworkConfigSpecUpdater interface {
	PatchSpec(context.Context, *v1alpha.NodeNetworkConfigSpec, string) (*v1alpha.NodeNetworkConfig, error)
}

// metaState is the Monitor's configuration state for the IP pool.
type metaState struct {
	batch              int64
	exhausted          bool
	max                int64
	maxFreeCount       int64
	minFreeCount       int64
	notInUseCount      int64
	primaryIPAddresses map[string]struct{}
	subnet             string
	subnetARMID        string
	subnetCIDR         string
}

type Options struct {
	RefreshDelay time.Duration
	MaxIPs       int64
}

type Monitor struct {
	opts        *Options
	spec        v1alpha.NodeNetworkConfigSpec
	metastate   metaState
	nnccli      nodeNetworkConfigSpecUpdater
	httpService cns.HTTPService
	cssSource   <-chan v1alpha1.ClusterSubnetState
	nncSource   chan v1alpha.NodeNetworkConfig
	started     chan interface{}
	once        sync.Once
}

func NewMonitor(httpService cns.HTTPService, nnccli nodeNetworkConfigSpecUpdater, cssSource <-chan v1alpha1.ClusterSubnetState, opts *Options) *Monitor {
	if opts.RefreshDelay < 1 {
		opts.RefreshDelay = DefaultRefreshDelay
	}
	if opts.MaxIPs < 1 {
		opts.MaxIPs = DefaultMaxIPs
	}
	return &Monitor{
		opts:        opts,
		httpService: httpService,
		nnccli:      nnccli,
		cssSource:   cssSource,
		nncSource:   make(chan v1alpha.NodeNetworkConfig),
		started:     make(chan interface{}),
	}
}
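
// A minimal usage sketch (hypothetical caller; httpSvc, nncCli, cssCh, and nnc
// below are assumed names, not part of this package):
//
//	pm := NewMonitor(httpSvc, nncCli, cssCh, &Options{})
//	go func() { _ = pm.Start(ctx) }() // blocks internally until the first NNC arrives
//	_ = pm.Update(&nnc)               // delivers an NNC, unblocking Start's reconcile loop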

// Start begins the Monitor's pool reconcile loop.
// On first run, it will block until a NodeNetworkConfig is received (through a call to Update()).
// Subsequently, it will run once per RefreshDelay and attempt to re-reconcile the pool.
func (pm *Monitor) Start(ctx context.Context) error {
	logger.Printf("[ipam-pool-monitor] Starting CNS IPAM Pool Monitor")
	ticker := time.NewTicker(pm.opts.RefreshDelay)
	defer ticker.Stop()
	for {
		// proceed when things happen:
		select {
		case <-ctx.Done(): // calling context has closed, we'll exit.
			return errors.Wrap(ctx.Err(), "pool monitor context closed")
		case <-ticker.C: // attempt to reconcile every tick.
			select {
			default:
				// if we have NOT initialized and enter this case, we continue out of this iteration and let the for loop begin again.
				continue
			case <-pm.started: // this blocks until we have initialized
				// if we have initialized and enter this case, we proceed out of the select and continue to reconcile.
			}
		case css := <-pm.cssSource: // received an updated ClusterSubnetState
			pm.metastate.exhausted = css.Status.Exhausted
			logger.Printf("subnet exhausted status = %t", pm.metastate.exhausted)
			metrics.IpamSubnetExhaustionCount.With(prometheus.Labels{
				metrics.SubnetLabel: pm.metastate.subnet, metrics.SubnetCIDRLabel: pm.metastate.subnetCIDR,
				metrics.PodnetARMIDLabel: pm.metastate.subnetARMID, metrics.SubnetExhaustionStateLabel: strconv.FormatBool(pm.metastate.exhausted),
			}).Inc()
			select {
			default:
				// if we have NOT initialized and enter this case, we continue out of this iteration and let the for loop begin again.
				continue
			case <-pm.started: // this blocks until we have initialized
				// if we have initialized and enter this case, we proceed out of the select and continue to reconcile.
			}
		case nnc := <-pm.nncSource: // received a new NodeNetworkConfig, extract the data from it and re-reconcile.
			if len(nnc.Status.NetworkContainers) > 0 {
				// set the subnet, subnetCIDR and subnetARMID meta values from the first NetworkContainer.
				pm.metastate.subnet = nnc.Status.NetworkContainers[0].SubnetName
				pm.metastate.subnetCIDR = nnc.Status.NetworkContainers[0].SubnetAddressSpace
				pm.metastate.subnetARMID = GenerateARMID(&nnc.Status.NetworkContainers[0])
			}
			pm.metastate.primaryIPAddresses = make(map[string]struct{})
			// Add the Primary IP of each NC to the map, if not present.
			// This is only for Swift, i.e. if the NC Type is vnet.
			for i := 0; i < len(nnc.Status.NetworkContainers); i++ {
				nc := nnc.Status.NetworkContainers[i]
				if nc.Type == "" || nc.Type == v1alpha.VNET {
					pm.metastate.primaryIPAddresses[nc.PrimaryIP] = struct{}{}
				}
				if nc.Type == v1alpha.VNETBlock {
					primaryPrefix, err := netip.ParsePrefix(nc.PrimaryIP)
					if err != nil {
						return errors.Wrapf(err, "unable to parse ip prefix: %s", nc.PrimaryIP)
					}
					pm.metastate.primaryIPAddresses[primaryPrefix.Addr().String()] = struct{}{}
				}
			}
			scaler := nnc.Status.Scaler
			pm.metastate.batch = scaler.BatchSize
			pm.metastate.max = scaler.MaxIPCount
			pm.metastate.minFreeCount, pm.metastate.maxFreeCount = CalculateMinFreeIPs(scaler), CalculateMaxFreeIPs(scaler)
			pm.once.Do(func() {
				pm.spec = nnc.Spec // set the spec from the NNC initially (afterwards we write the Spec so we know target state).
				logger.Printf("[ipam-pool-monitor] set initial pool spec %+v", pm.spec)
				close(pm.started) // close the init channel the first time we fully receive a NodeNetworkConfig.
			})
		}
		// if control has flowed through the select(s) to this point, we can now reconcile.
		err := pm.reconcile(ctx)
		if err != nil {
			logger.Printf("[ipam-pool-monitor] Reconcile failed with err %v", err)
		}
	}
}

// ipPoolState is the current actual state of the CNS IP pool.
type ipPoolState struct {
	// allocatedToPods are the IPs CNS has given to Pods (state "Assigned").
	allocatedToPods int64
	// available are the IPs in state "Available".
	available int64
	// currentAvailableIPs are the currently available IPs: secondary - allocatedToPods - pendingRelease.
	currentAvailableIPs int64
	// expectedAvailableIPs are the "future" available IPs, if the requested IP count is honored: requested - allocatedToPods.
	expectedAvailableIPs int64
	// pendingProgramming are the IPs in state "PendingProgramming".
	pendingProgramming int64
	// pendingRelease are the IPs in state "PendingRelease".
	pendingRelease int64
	// requestedIPs are the IPs CNS has requested to be allocated by DNC.
	requestedIPs int64
	// secondaryIPs are all the IPs given to CNS by DNC, not including the primary IP of the NC.
	secondaryIPs int64
}

func buildIPPoolState(ips map[string]cns.IPConfigurationStatus, spec v1alpha.NodeNetworkConfigSpec) ipPoolState {
	state := ipPoolState{
		secondaryIPs: int64(len(ips)),
		requestedIPs: spec.RequestedIPCount,
	}
	for i := range ips {
		ip := ips[i]
		switch ip.GetState() {
		case types.Assigned:
			state.allocatedToPods++
		case types.Available:
			state.available++
		case types.PendingProgramming:
			state.pendingProgramming++
		case types.PendingRelease:
			state.pendingRelease++
		}
	}
	state.currentAvailableIPs = state.secondaryIPs - state.allocatedToPods - state.pendingRelease
	state.expectedAvailableIPs = state.requestedIPs - state.allocatedToPods
	return state
}
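
// Worked example (hypothetical numbers, derived from the formulas above): with
// 16 secondary IPs, 10 assigned to Pods, 2 pending release, and a requested
// count of 16, buildIPPoolState yields:
//
//	currentAvailableIPs  = 16 - 10 - 2 = 4
//	expectedAvailableIPs = 16 - 10     = 6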

var statelogDownsample int

func (pm *Monitor) reconcile(ctx context.Context) error {
	allocatedIPs := pm.httpService.GetPodIPConfigState()
	meta := pm.metastate
	state := buildIPPoolState(allocatedIPs, pm.spec)
	observeIPPoolState(state, meta)
	// log every 30th reconcile to reduce the AI load. we will always log when the monitor
	// changes the pool, below.
	if statelogDownsample = (statelogDownsample + 1) % 30; statelogDownsample == 0 { //nolint:gomnd //downsample by 30
		logger.Printf("ipam-pool-monitor state: %+v, meta: %+v", state, meta)
	}
	// if the subnet is exhausted, overwrite the batch/minfree/maxfree in the meta copy for this iteration
	if meta.exhausted {
		meta.batch = 1
		meta.minFreeCount = 1
		meta.maxFreeCount = 2
	}
	switch {
	// pod count is increasing
	case state.expectedAvailableIPs < meta.minFreeCount:
		if state.requestedIPs == meta.max {
			// if we're already at the maxIPCount, don't try to increase
			return nil
		}
		logger.Printf("ipam-pool-monitor state %+v", state)
		logger.Printf("[ipam-pool-monitor] Increasing pool size...")
		return pm.increasePoolSize(ctx, meta, state)
	// pod count is decreasing
	case state.currentAvailableIPs >= meta.maxFreeCount:
		logger.Printf("ipam-pool-monitor state %+v", state)
		logger.Printf("[ipam-pool-monitor] Decreasing pool size...")
		return pm.decreasePoolSize(ctx, meta, state)
	// the CRD has reconciled CNS state and the spec's IPsNotInUse no longer matches the
	// pending-release count, so we are free to remove the released IPs from the CRD
	case int64(len(pm.spec.IPsNotInUse)) != state.pendingRelease:
		logger.Printf("ipam-pool-monitor state %+v", state)
		logger.Printf("[ipam-pool-monitor] Removing Pending Release IPs from CRD...")
		return pm.cleanPendingRelease(ctx)
	// no pods scheduled
	case state.allocatedToPods == 0:
		logger.Printf("ipam-pool-monitor state %+v", state)
		logger.Printf("[ipam-pool-monitor] No pods scheduled")
		return nil
	}
	return nil
}
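
// Scaling decision example (hypothetical numbers, assuming batch = 16,
// RequestThresholdPercent = 50 and ReleaseThresholdPercent = 150, so
// minFreeCount = 8 and maxFreeCount = 24): the pool is grown when
// expectedAvailableIPs falls below 8, and shrunk when currentAvailableIPs
// reaches 24 or more; in between, only pending-release cleanup happens.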

func (pm *Monitor) increasePoolSize(ctx context.Context, meta metaState, state ipPoolState) error {
	tempNNCSpec := pm.createNNCSpecForCRD()
	// capture the previously requested IP count
	previouslyRequestedIPCount := tempNNCSpec.RequestedIPCount
	batchSize := meta.batch
	modResult := previouslyRequestedIPCount % batchSize
	logger.Printf("[ipam-pool-monitor] Previously RequestedIP Count %d", previouslyRequestedIPCount)
	logger.Printf("[ipam-pool-monitor] Batch size : %d", batchSize)
	logger.Printf("[ipam-pool-monitor] modResult of (previously requested IP count mod batch size) = %d", modResult)
	tempNNCSpec.RequestedIPCount += batchSize - modResult
	if tempNNCSpec.RequestedIPCount > meta.max {
		// we don't want to ask for more IPs than the max
		logger.Printf("[ipam-pool-monitor] Requested IP count (%d) is over max limit (%d), requesting max limit instead.", tempNNCSpec.RequestedIPCount, meta.max)
		tempNNCSpec.RequestedIPCount = meta.max
	}
	// if the requested IP count is the same as before, then don't do anything
	if tempNNCSpec.RequestedIPCount == previouslyRequestedIPCount {
		logger.Printf("[ipam-pool-monitor] Previously requested IP count %d is same as updated IP count %d, doing nothing", previouslyRequestedIPCount, tempNNCSpec.RequestedIPCount)
		return nil
	}
	logger.Printf("[ipam-pool-monitor] Increasing pool size, pool %+v, spec %+v", state, tempNNCSpec)
	if _, err := pm.nnccli.PatchSpec(ctx, &tempNNCSpec, fieldManager); err != nil {
		// caller will retry to update the CRD again
		return errors.Wrap(err, "executing UpdateSpec with NNC client")
	}
	logger.Printf("[ipam-pool-monitor] Increasing pool size: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec)
	// start an alloc timer
	metric.StartPoolIncreaseTimer(batchSize)
	// save the updated state to cachedSpec
	pm.spec = tempNNCSpec
	return nil
}
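
// Increase example (hypothetical numbers, following the rounding above): with a
// previously requested count of 25 and a batch size of 10, modResult is 5, so the
// new request becomes 25 + (10 - 5) = 30; if the max were 28, the request would be
// clamped down to 28 instead.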

func (pm *Monitor) decreasePoolSize(ctx context.Context, meta metaState, state ipPoolState) error {
	// mark n number of IPs as pending
	var newIpsMarkedAsPending bool
	var pendingIPAddresses map[string]cns.IPConfigurationStatus
	var updatedRequestedIPCount int64
	// ensure the updated requested IP count is a multiple of the batch size
	previouslyRequestedIPCount := pm.spec.RequestedIPCount
	batchSize := meta.batch
	modResult := previouslyRequestedIPCount % batchSize
	logger.Printf("[ipam-pool-monitor] Previously RequestedIP Count %d", previouslyRequestedIPCount)
	logger.Printf("[ipam-pool-monitor] Batch size : %d", batchSize)
	logger.Printf("[ipam-pool-monitor] modResult of (previously requested IP count mod batch size) = %d", modResult)
	if modResult != 0 {
		// example: previouscount = 25, batchsize = 10, 25 - 10 = 15, NOT a multiple of batchsize (10).
		// we don't want that, so make requestedIPCount 20 (25 - (25 % 10)) so that it is a multiple of the batchsize (10).
		updatedRequestedIPCount = previouslyRequestedIPCount - modResult
	} else {
		// example: previouscount = 30, batchsize = 10, 30 - 10 = 20, which is a multiple of the batchsize (10), so all good.
		updatedRequestedIPCount = previouslyRequestedIPCount - batchSize
	}
	decreaseIPCountBy := previouslyRequestedIPCount - updatedRequestedIPCount
	logger.Printf("[ipam-pool-monitor] updatedRequestedIPCount %d", updatedRequestedIPCount)
	if meta.notInUseCount == 0 || meta.notInUseCount < state.pendingRelease {
		logger.Printf("[ipam-pool-monitor] Marking IPs as PendingRelease, ipsToBeReleasedCount %d", decreaseIPCountBy)
		var err error
		if pendingIPAddresses, err = pm.httpService.MarkIPAsPendingRelease(int(decreaseIPCountBy)); err != nil {
			return errors.Wrap(err, "marking IPs that are pending release")
		}
		newIpsMarkedAsPending = true
	}
	tempNNCSpec := pm.createNNCSpecForCRD()
	if newIpsMarkedAsPending {
		// cache the updatingPendingRelease so that we don't re-set new IPs to PendingRelease in case the UpdateCRD call fails
		pm.metastate.notInUseCount = int64(len(tempNNCSpec.IPsNotInUse))
	}
	logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d, updatingPendingIpsNotInUse count %d",
		len(pendingIPAddresses), pm.metastate.notInUseCount)
	tempNNCSpec.RequestedIPCount -= int64(len(pendingIPAddresses))
	logger.Printf("[ipam-pool-monitor] Decreasing pool size, pool %+v, spec %+v", state, tempNNCSpec)
	attempts := 0
	if err := retry.Do(func() error {
		attempts++
		_, err := pm.nnccli.PatchSpec(ctx, &tempNNCSpec, fieldManager)
		if err != nil {
			// caller will retry to update the CRD again
			logger.Printf("failed to update NNC spec attempt #%d, err: %v", attempts, err)
			return errors.Wrap(err, "executing UpdateSpec with NNC client")
		}
		logger.Printf("successfully updated NNC spec attempt #%d", attempts)
		return nil
	}, retry.Attempts(5), retry.DelayType(retry.BackOffDelay)); err != nil { //nolint:gomnd // ignore retry magic number
		logger.Errorf("all attempts failed to update NNC during scale-down, state is corrupt: %v", err)
		panic(err)
	}
	logger.Printf("[ipam-pool-monitor] Decreasing pool size: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec)
	// start a dealloc timer
	metric.StartPoolDecreaseTimer(batchSize)
	// save the updated state to cachedSpec
	pm.spec = tempNNCSpec
	// clear the updatingPendingIpsNotInUse, as we have updated the CRD
	logger.Printf("[ipam-pool-monitor] cleaning the updatingPendingIpsNotInUse, existing length %d", pm.metastate.notInUseCount)
	pm.metastate.notInUseCount = 0
	return nil
}

// cleanPendingRelease removes IPs from the cache and CRD if the request controller has reconciled
// CNS state and the pending IP release map is empty.
func (pm *Monitor) cleanPendingRelease(ctx context.Context) error {
	tempNNCSpec := pm.createNNCSpecForCRD()
	_, err := pm.nnccli.PatchSpec(ctx, &tempNNCSpec, fieldManager)
	if err != nil {
		// caller will retry to update the CRD again
		return errors.Wrap(err, "executing UpdateSpec with NNC client")
	}
	logger.Printf("[ipam-pool-monitor] cleanPendingRelease: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec)
	// save the updated state to cachedSpec
	pm.spec = tempNNCSpec
	return nil
}

// createNNCSpecForCRD translates CNS's map of IPs to be released and requested IP count into an NNC Spec.
func (pm *Monitor) createNNCSpecForCRD() v1alpha.NodeNetworkConfigSpec {
	var spec v1alpha.NodeNetworkConfigSpec
	// update the count from the cached spec
	spec.RequestedIPCount = pm.spec.RequestedIPCount
	// get all pending-release IPs from CNS and populate the spec with them again
	pendingIPs := pm.httpService.GetPendingReleaseIPConfigs()
	for i := range pendingIPs {
		pendingIP := pendingIPs[i]
		spec.IPsNotInUse = append(spec.IPsNotInUse, pendingIP.ID)
	}
	return spec
}

// GetStateSnapshot gets a snapshot of the IPAMPoolMonitor struct.
func (pm *Monitor) GetStateSnapshot() cns.IpamPoolMonitorStateSnapshot {
	spec, state := pm.spec, pm.metastate
	return cns.IpamPoolMonitorStateSnapshot{
		MinimumFreeIps:           state.minFreeCount,
		MaximumFreeIps:           state.maxFreeCount,
		UpdatingIpsNotInUseCount: state.notInUseCount,
		CachedNNC: v1alpha.NodeNetworkConfig{
			Spec: spec,
		},
	}
}

// GenerateARMID uses the Subnet ARM ID format to populate the ARM ID with the metadata.
// If any of the metadata attributes is empty, the ARM ID will be an empty string.
func GenerateARMID(nc *v1alpha.NetworkContainer) string {
	subscription := nc.SubscriptionID
	resourceGroup := nc.ResourceGroupID
	vnetID := nc.VNETID
	subnetID := nc.SubnetID
	if subscription == "" || resourceGroup == "" || vnetID == "" || subnetID == "" {
		return ""
	}
	return fmt.Sprintf(subnetARMIDTemplate, subscription, resourceGroup, vnetID, subnetID)
}
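
// For illustration (hypothetical values, not taken from any real deployment), a
// NetworkContainer with SubscriptionID "sub-0", ResourceGroupID "rg-0",
// VNETID "vnet-0" and SubnetID "subnet-0" would yield:
//
//	/subscriptions/sub-0/resourceGroups/rg-0/providers/Microsoft.Network/virtualNetworks/vnet-0/subnets/subnet-0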

// Update ingests a NodeNetworkConfig, clamping some values to ensure they are legal and then
// pushing it to the Monitor's source channel.
// If the Monitor has been Started but is blocking until it receives an NNC, this will start
// the pool reconcile loop.
// If the Monitor has not been Started, this will block until Start() is called, which will
// immediately read this passed NNC and start the pool reconcile loop.
func (pm *Monitor) Update(nnc *v1alpha.NodeNetworkConfig) error {
	pm.clampScaler(&nnc.Status.Scaler)
	// if the nnc has converged, observe the pool scaling latency (if any).
	allocatedIPs := len(pm.httpService.GetPodIPConfigState()) - len(pm.httpService.GetPendingReleaseIPConfigs())
	if int(nnc.Spec.RequestedIPCount) == allocatedIPs {
		// observe elapsed duration for IP pool scaling
		metric.ObserverPoolScaleLatency()
	}
	logger.Printf("[ipam-pool-monitor] pushing NodeNetworkConfig update, allocatedIPs = %d", allocatedIPs)
	pm.nncSource <- *nnc
	return nil
}

// clampScaler makes sure that the values stored in the scaler are sane.
// we usually expect these to be correctly set for us, but we could crash
// without these checks. if they are incorrectly set, there will be some weird
// IP pool behavior for a while until the nnc reconciler corrects the state.
func (pm *Monitor) clampScaler(scaler *v1alpha.Scaler) {
	if scaler.MaxIPCount < 1 {
		scaler.MaxIPCount = pm.opts.MaxIPs
	}
	if scaler.BatchSize < 1 {
		scaler.BatchSize = 1
	}
	if scaler.BatchSize > scaler.MaxIPCount {
		scaler.BatchSize = scaler.MaxIPCount
	}
	if scaler.RequestThresholdPercent < 1 {
		scaler.RequestThresholdPercent = 1
	}
	if scaler.RequestThresholdPercent > 100 { //nolint:gomnd // it's a percent
		scaler.RequestThresholdPercent = 100
	}
	if scaler.ReleaseThresholdPercent < scaler.RequestThresholdPercent+100 {
		scaler.ReleaseThresholdPercent = scaler.RequestThresholdPercent + 100 //nolint:gomnd // it's a percent
	}
}
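
// Clamping example (hypothetical zero-valued Scaler, with the default MaxIPs of
// 250): MaxIPCount 0 -> 250, BatchSize 0 -> 1, RequestThresholdPercent 0 -> 1,
// and ReleaseThresholdPercent is then raised to at least 1 + 100 = 101.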

func observeIPPoolState(state ipPoolState, meta metaState) {
	labels := []string{meta.subnet, meta.subnetCIDR, meta.subnetARMID}
	metrics.IpamAllocatedIPCount.WithLabelValues(labels...).Set(float64(state.allocatedToPods))
	metrics.IpamAvailableIPCount.WithLabelValues(labels...).Set(float64(state.available))
	metrics.IpamBatchSize.WithLabelValues(labels...).Set(float64(meta.batch))
	metrics.IpamCurrentAvailableIPcount.WithLabelValues(labels...).Set(float64(state.currentAvailableIPs))
	metrics.IpamExpectedAvailableIPCount.WithLabelValues(labels...).Set(float64(state.expectedAvailableIPs))
	metrics.IpamMaxIPCount.WithLabelValues(labels...).Set(float64(meta.max))
	metrics.IpamPendingProgramIPCount.WithLabelValues(labels...).Set(float64(state.pendingProgramming))
	metrics.IpamPendingReleaseIPCount.WithLabelValues(labels...).Set(float64(state.pendingRelease))
	metrics.IpamPrimaryIPCount.WithLabelValues(labels...).Set(float64(len(meta.primaryIPAddresses)))
	metrics.IpamRequestedIPConfigCount.WithLabelValues(labels...).Set(float64(state.requestedIPs))
	metrics.IpamSecondaryIPCount.WithLabelValues(labels...).Set(float64(state.secondaryIPs))
	metrics.IpamTotalIPCount.WithLabelValues(labels...).Set(float64(state.secondaryIPs + int64(len(meta.primaryIPAddresses))))
	if meta.exhausted {
		metrics.IpamSubnetExhaustionState.WithLabelValues(labels...).Set(float64(metrics.SubnetIPExhausted))
	} else {
		metrics.IpamSubnetExhaustionState.WithLabelValues(labels...).Set(float64(metrics.SubnetIPNotExhausted))
	}
}

// CalculateMinFreeIPs calculates the minimum free IP quantity based on the Scaler
// in the passed NodeNetworkConfig.
// Half of odd batches are rounded up!
//
//nolint:gocritic // ignore hugeparam
func CalculateMinFreeIPs(scaler v1alpha.Scaler) int64 {
	return int64(float64(scaler.BatchSize)*(float64(scaler.RequestThresholdPercent)/100) + .5) //nolint:gomnd // it's a percent
}

// CalculateMaxFreeIPs calculates the maximum free IP quantity based on the Scaler
// in the passed NodeNetworkConfig.
// Half of odd batches are rounded up!
//
//nolint:gocritic // ignore hugeparam
func CalculateMaxFreeIPs(scaler v1alpha.Scaler) int64 {
	return int64(float64(scaler.BatchSize)*(float64(scaler.ReleaseThresholdPercent)/100) + .5) //nolint:gomnd // it's a percent
}