fix: [WIN-NPM] race during bootup where we may not add one NetPol to a Pod (#2028)

* fix: lock while adding policy in bootup phase

* test: fix UT to model true pod controller behavior

* fix: prevent deadlock

* style: lint
This commit is contained in:
Hunter Gregory 2023-06-23 10:03:54 -07:00 коммит произвёл GitHub
Родитель c48c3e17a4
Коммит 36b67b4cb2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 64 добавлений и 46 удалений

Просмотреть файл

@ -2224,15 +2224,13 @@ func getAllMultiJobTests() []*MultiJobTestCase {
{ {
Description: "create namespaces, pods, and a policy which applies to a pod", Description: "create namespaces, pods, and a policy which applies to a pod",
Jobs: map[string][]*Action{ Jobs: map[string][]*Action{
"finish_bootup_phase": {
FinishBootupPhase(),
},
"namespace_controller": { "namespace_controller": {
CreateNamespace("x", map[string]string{"k1": "v1"}), CreateNamespace("x", map[string]string{"k1": "v1"}),
CreateNamespace("y", map[string]string{"k2": "v2"}), CreateNamespace("y", map[string]string{"k2": "v2"}),
ApplyDP(), ApplyDP(),
}, },
"pod_controller": { "pod_controller": {
FinishBootupPhase(),
CreatePod("x", "a", ip1, thisNode, map[string]string{"k1": "v1"}), CreatePod("x", "a", ip1, thisNode, map[string]string{"k1": "v1"}),
CreatePod("y", "a", ip2, otherNode, map[string]string{"k1": "v1"}), CreatePod("y", "a", ip2, otherNode, map[string]string{"k1": "v1"}),
ApplyDP(), ApplyDP(),

Просмотреть файл

@ -18,10 +18,11 @@ import (
const ( const (
reconcileDuration = time.Duration(5 * time.Minute) reconcileDuration = time.Duration(5 * time.Minute)
contextBackground = "BACKGROUND" contextBackground = "BACKGROUND"
contextApplyDP = "APPLY-DP" contextApplyDP = "APPLY-DP"
contextAddNetPol = "ADD-NETPOL" contextAddNetPol = "ADD-NETPOL"
contextDelNetPol = "DEL-NETPOL" contextAddNetPolBootup = "BOOTUP-ADD-NETPOL"
contextDelNetPol = "DEL-NETPOL"
) )
var ErrInvalidApplyConfig = errors.New("invalid apply config") var ErrInvalidApplyConfig = errors.New("invalid apply config")
@ -276,24 +277,21 @@ func (dp *DataPlane) RemoveFromList(listName *ipsets.IPSetMetadata, setNames []*
// and accordingly makes changes in dataplane. This function helps emulate a single call to // and accordingly makes changes in dataplane. This function helps emulate a single call to
// dataplane instead of multiple ipset operations calls ipset operations calls to dataplane // dataplane instead of multiple ipset operations calls ipset operations calls to dataplane
func (dp *DataPlane) ApplyDataPlane() error { func (dp *DataPlane) ApplyDataPlane() error {
if dp.applyInBackground { if !dp.applyInBackground {
return dp.incrementBatchAndApplyIfNeeded(contextApplyDP) return dp.applyDataPlaneNow(contextApplyDP)
} }
return dp.applyDataPlaneNow(contextApplyDP) // increment batch and apply dataplane if needed
}
func (dp *DataPlane) incrementBatchAndApplyIfNeeded(context string) error {
dp.applyInfo.Lock() dp.applyInfo.Lock()
dp.applyInfo.numBatches++ dp.applyInfo.numBatches++
newCount := dp.applyInfo.numBatches newCount := dp.applyInfo.numBatches
dp.applyInfo.Unlock() dp.applyInfo.Unlock()
klog.Infof("[DataPlane] [%s] new batch count: %d", context, newCount) klog.Infof("[DataPlane] [%s] new batch count: %d", contextApplyDP, newCount)
if newCount >= dp.ApplyMaxBatches { if newCount >= dp.ApplyMaxBatches {
klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", context, newCount) klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextApplyDP, newCount)
return dp.applyDataPlaneNow(context) return dp.applyDataPlaneNow(contextApplyDP)
} }
return nil return nil
@ -380,32 +378,65 @@ func (dp *DataPlane) AddPolicy(policy *policies.NPMNetworkPolicy) error {
return fmt.Errorf("[DataPlane] error while adding Rule IPSet references: %w", err) return fmt.Errorf("[DataPlane] error while adding Rule IPSet references: %w", err)
} }
var endpointList map[string]string inBootupPhase := false
if dp.inBootupPhase() { if dp.applyInBackground {
// During bootup phase, the Pod controller will not be running. dp.applyInfo.Lock()
// We don't need to worry about adding Policies to Endpoints, so we don't need IPSets in the kernel yet. inBootupPhase = dp.applyInfo.inBootupPhase
// Ideally, we get all NetworkPolicies in the cache before the Pod controller starts if inBootupPhase {
err = dp.incrementBatchAndApplyIfNeeded(contextAddNetPol) // keep holding the lock to block FinishBootupPhase() and prevent PodController from
if err != nil { // coming back online and causing race issues from updatePod() within applyDataPlaneNow()
return err defer dp.applyInfo.Unlock()
} } else {
} else { dp.applyInfo.Unlock()
err = dp.applyDataPlaneNow(contextAddNetPol)
if err != nil {
return err
}
endpointList, err = dp.getEndpointsToApplyPolicy(policy)
if err != nil {
return fmt.Errorf("[DataPlane] error while getting endpoints to apply policy after applying dataplane: %w", err)
} }
} }
// endpointList will be empty if in bootup phase if inBootupPhase {
// During bootup phase, the Pod controller will not be running.
// We don't need to worry about adding Policies to Endpoints, so we don't need IPSets in the kernel yet.
// Ideally, we get all NetworkPolicies in the cache before the Pod controller starts
// increment batch and apply IPSets if needed
dp.applyInfo.numBatches++
newCount := dp.applyInfo.numBatches
klog.Infof("[DataPlane] [%s] new batch count: %d", contextAddNetPolBootup, newCount)
if newCount >= dp.ApplyMaxBatches {
klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextAddNetPolBootup, newCount)
klog.Infof("[DataPlane] [%s] starting to apply ipsets", contextAddNetPolBootup)
err = dp.ipsetMgr.ApplyIPSets()
if err != nil {
return fmt.Errorf("[DataPlane] [%s] error while applying IPSets: %w", contextAddNetPolBootup, err)
}
klog.Infof("[DataPlane] [%s] finished applying ipsets", contextAddNetPolBootup)
dp.applyInfo.numBatches = 0
}
err = dp.policyMgr.AddPolicy(policy, nil)
if err != nil {
return fmt.Errorf("[DataPlane] [%s] error while adding policy: %w", contextAddNetPolBootup, err)
}
return nil
}
// standard, non-bootup phase
err = dp.applyDataPlaneNow(contextAddNetPol)
if err != nil {
return err
}
var endpointList map[string]string
endpointList, err = dp.getEndpointsToApplyPolicy(policy)
if err != nil {
return fmt.Errorf("[DataPlane] error while getting endpoints to apply policy after applying dataplane: %w", err)
}
err = dp.policyMgr.AddPolicy(policy, endpointList) err = dp.policyMgr.AddPolicy(policy, endpointList)
if err != nil { if err != nil {
return fmt.Errorf("[DataPlane] error while adding policy: %w", err) return fmt.Errorf("[DataPlane] error while adding policy: %w", err)
} }
return nil return nil
} }
@ -582,14 +613,3 @@ func (dp *DataPlane) deleteIPSetsAndReferences(sets []*ipsets.TranslatedIPSet, n
} }
return nil return nil
} }
func (dp *DataPlane) inBootupPhase() bool {
if !dp.applyInBackground {
return false
}
dp.applyInfo.Lock()
defer dp.applyInfo.Unlock()
return dp.applyInfo.inBootupPhase
}