[NPM Lite] Querying L1VH + Non-L1VH Endpoints (#3086)
* Added logic to make 2 HNS calls for 2 different endpoint states
* Added querying of L1VH HNS endpoints only if NPM Lite is enabled
* Added a logging line for debugging
* Updated config
* Removed logging lines
* Fixed Go lint error
* Refactored based on PR comments
* Replaced with errors.Wrap and fixed a logging statement
* Added if condition with logic
* Changed errl1vh to err
* Added comments
* Added logging lines for debugging
* Added NPM Lite enabled log for debugging
* Spacing
* Syntax
* Added logs for debugging
* Optimized API load
* Added function to remove common endpoints
* Added logging for debugging
* Removed NPM Lite check
* Removed all the debugging comments
* Added extra unit test cases
* Added additional unit tests
* Removed protobuf code
* Fixed comment
* Fixed a spelling error
* Resolved PR comments
* Updated a comment
* Revised comment
* Resolved further PR comments
* Changed back to for loop from range
Parent: 3c4641c1c8
Commit: b7190c88c0
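In short, the change issues two HNS endpoint queries on Windows, one filtered on the Attached state (pods on an L1VH node) and one on the AttachedSharing state (all other running pods), then merges the two result lists while dropping duplicate endpoint IDs. A minimal, standalone sketch of that filter-and-merge idea follows; the endpoint type, the stateFilterJSON helper, and the example state value 2 are illustrative stand-ins, not code from this commit (the real implementation is in the diff below):

package main

import (
	"encoding/json"
	"fmt"
)

// endpoint stands in for hcn.HostComputeEndpoint; only the Id field matters here.
type endpoint struct {
	Id string
}

// stateFilterJSON (hypothetical helper) shows the shape of the JSON filter the commit
// marshals into HostComputeQuery.Filter; the numeric state value is whatever the
// hcnEndpointStateAttached / hcnEndpointStateAttachedSharing constants hold.
func stateFilterJSON(state uint16) string {
	filter, err := json.Marshal(map[string]uint16{"State": state})
	if err != nil {
		panic(err)
	}
	return string(filter)
}

// mergeUnique mirrors the intent of GetUniqueEndpoints in the diff: keep every endpoint
// from the first (AttachedSharing) list and append endpoints from the second
// (Attached) list whose ID has not been seen yet.
func mergeUnique(endpoints, endpointsAttached []endpoint) []endpoint {
	seen := make(map[string]struct{}, len(endpoints))
	for _, ep := range endpoints {
		seen[ep.Id] = struct{}{}
	}
	for _, ep := range endpointsAttached {
		if _, ok := seen[ep.Id]; !ok {
			endpoints = append(endpoints, ep)
		}
	}
	return endpoints
}

func main() {
	fmt.Println(stateFilterJSON(2)) // prints e.g. {"State":2}

	merged := mergeUnique(
		[]endpoint{{Id: "a"}, {Id: "b"}},
		[]endpoint{{Id: "b"}, {Id: "c"}},
	)
	fmt.Println(merged) // [{a} {b} {c}]
}

Keeping the AttachedSharing query results first preserves the previous behavior for non-L1VH pods; the Attached query only contributes endpoints that were not already returned.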
@@ -137,6 +137,8 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error {
 	stopChannel := wait.NeverStop
 	if config.Toggles.EnableV2NPM {
 		// update the dataplane config
+		npmV2DataplaneCfg.EnableNPMLite = config.Toggles.EnableNPMLite
+
 		npmV2DataplaneCfg.MaxBatchedACLsPerPod = config.MaxBatchedACLsPerPod
 
 		npmV2DataplaneCfg.NetPolInBackground = config.Toggles.NetPolInBackground
@@ -192,7 +192,7 @@ func (npMgr *NetworkPolicyManager) Start(config npmconfig.Config, stopCh <-chan
 	// Starts all informers manufactured by npMgr's informerFactory.
 	npMgr.InformerFactory.Start(stopCh)
 
-	// npn lite
+	// npm lite
 	if npMgr.NpmLiteToggle {
 		npMgr.PodInformerFactory.Start(stopCh)
 	}
@@ -45,6 +45,7 @@ type Config struct {
 	NetPolInBackground bool
 	MaxPendingNetPols  int
 	NetPolInterval     time.Duration
+	EnableNPMLite      bool
 	*ipsets.IPSetManagerCfg
 	*policies.PolicyManagerCfg
 }
@@ -64,12 +65,13 @@ type DataPlane struct {
 	nodeName string
 	// endpointCache stores all endpoints of the network (including off-node)
 	// Key is PodIP
-	endpointCache  *endpointCache
-	ioShim         *common.IOShim
-	updatePodCache *updatePodCache
-	endpointQuery  *endpointQuery
-	applyInfo      *applyInfo
-	netPolQueue    *netPolQueue
+	endpointCache              *endpointCache
+	ioShim                     *common.IOShim
+	updatePodCache             *updatePodCache
+	endpointQuery              *endpointQuery
+	endpointQueryAttachedState *endpointQuery // windows -> filter for state 2 (attached) endpoints in l1vh
+	applyInfo                  *applyInfo
+	netPolQueue                *netPolQueue
 	// removePolicyInfo tracks when a policy was removed yet had ApplyIPSet failures.
 	// This field is only relevant for Linux.
 	removePolicyInfo removePolicyInfo
@@ -88,11 +90,12 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann
 		policyMgr: policies.NewPolicyManager(ioShim, cfg.PolicyManagerCfg),
 		ipsetMgr:  ipsets.NewIPSetManager(cfg.IPSetManagerCfg, ioShim),
 		// networkID is set when initializing Windows dataplane
-		networkID:     "",
-		endpointCache: newEndpointCache(),
-		nodeName:      nodeName,
-		ioShim:        ioShim,
-		endpointQuery: new(endpointQuery),
+		networkID:                  "",
+		endpointCache:              newEndpointCache(),
+		nodeName:                   nodeName,
+		ioShim:                     ioShim,
+		endpointQuery:              new(endpointQuery),
+		endpointQueryAttachedState: new(endpointQuery),
 		applyInfo: &applyInfo{
 			inBootupPhase: true,
 		},
@@ -128,7 +131,6 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann
 	} else {
 		metrics.SendLog(util.DaemonDataplaneID, "[DataPlane] dataplane configured to NOT add netpols in background", true)
 	}
-
 	return dp, nil
 }
 
@@ -2,7 +2,6 @@ package dataplane
 
 import (
 	"encoding/json"
-	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -12,6 +11,7 @@ import (
 	"github.com/Azure/azure-container-networking/npm/util"
 	npmerrors "github.com/Azure/azure-container-networking/npm/util/errors"
 	"github.com/Microsoft/hcsshim/hcn"
+	"github.com/pkg/errors"
 	"k8s.io/klog"
 )
 
@@ -50,14 +50,31 @@ func (dp *DataPlane) initializeDataPlane() error {
 		},
 		Flags: hcn.HostComputeQueryFlagsNone,
 	}
+	// Initialize Endpoint query used to filter healthy endpoints (vNIC) of Windows pods on L1VH Node
+	dp.endpointQueryAttachedState.query = hcn.HostComputeQuery{
+		SchemaVersion: hcn.SchemaVersion{
+			Major: hcnSchemaMajorVersion,
+			Minor: hcnSchemaMinorVersion,
+		},
+		Flags: hcn.HostComputeQueryFlagsNone,
+	}
+
 	// Filter out any endpoints that are not in "AttachedShared" State. All running Windows pods with networking must be in this state.
 	filterMap := map[string]uint16{"State": hcnEndpointStateAttachedSharing}
 	filter, err := json.Marshal(filterMap)
 	if err != nil {
-		return npmerrors.SimpleErrorWrapper("failed to marshal endpoint filter map", err)
+		return errors.Wrap(err, "failed to marshal endpoint filter map for attachedsharing state")
 	}
 	dp.endpointQuery.query.Filter = string(filter)
+
+	// Filter out any endpoints that are not in "Attached" State. All running Windows pods on L1VH with networking must be in this state.
+	filterMapAttached := map[string]uint16{"State": hcnEndpointStateAttached}
+	filterAttached, err := json.Marshal(filterMapAttached)
+	if err != nil {
+		return errors.Wrap(err, "failed to marshal endpoint filter map for attched state")
+	}
+	dp.endpointQueryAttachedState.query.Filter = string(filterAttached)
 
 	// reset endpoint cache so that netpol references are removed for all endpoints while refreshing pod endpoints
 	// no need to lock endpointCache at boot up
 	dp.endpointCache.cache = make(map[string]*npmEndpoint)
@@ -329,14 +346,28 @@ func (dp *DataPlane) getEndpointsToApplyPolicies(netPols []*policies.NPMNetworkP
 
 func (dp *DataPlane) getLocalPodEndpoints() ([]*hcn.HostComputeEndpoint, error) {
 	klog.Info("getting local endpoints")
+
+	// Gets endpoints in state: Attached
 	timer := metrics.StartNewTimer()
-	endpoints, err := dp.ioShim.Hns.ListEndpointsQuery(dp.endpointQuery.query)
+	endpointsAttached, err := dp.ioShim.Hns.ListEndpointsQuery(dp.endpointQueryAttachedState.query)
 	metrics.RecordListEndpointsLatency(timer)
 	if err != nil {
 		metrics.IncListEndpointsFailures()
-		return nil, npmerrors.SimpleErrorWrapper("failed to get local pod endpoints", err)
+		return nil, errors.Wrap(err, "failed to get local pod endpoints in state:attached")
+	}
+
+	// Gets endpoints in state: AttachedSharing
+	timer = metrics.StartNewTimer()
+	endpoints, err := dp.ioShim.Hns.ListEndpointsQuery(dp.endpointQuery.query)
+	metrics.RecordListEndpointsLatency(timer)
+	if err != nil {
+		metrics.IncListEndpointsFailures()
+		return nil, errors.Wrap(err, "failed to get local pod endpoints in state: attachedSharing")
 	}
+
+	// Get endpoints unique to endpoints and endpointsAttached
+	endpoints = GetUniqueEndpoints(endpoints, endpointsAttached)
 
 	epPointers := make([]*hcn.HostComputeEndpoint, 0, len(endpoints))
 	for k := range endpoints {
 		epPointers = append(epPointers, &endpoints[k])
@@ -344,6 +375,24 @@ func (dp *DataPlane) getLocalPodEndpoints() ([]*hcn.HostComputeEndpoint, error)
 	return epPointers, nil
 }
 
+func GetUniqueEndpoints(endpoints, endpointsAttached []hcn.HostComputeEndpoint) []hcn.HostComputeEndpoint {
+	// Store IDs of endpoints list in a map for quick lookup
+	idMap := make(map[string]struct{}, len(endpoints))
+	for i := 0; i < len(endpoints); i++ {
+		ep := endpoints[i]
+		idMap[ep.Id] = struct{}{}
+	}
+
+	// Add endpointsAttached list endpoints in endpoints list if the endpoint is not in the map
+	for i := 0; i < len(endpointsAttached); i++ {
+		ep := endpointsAttached[i]
+		if _, ok := idMap[ep.Id]; !ok {
+			endpoints = append(endpoints, ep)
+		}
+	}
+	return endpoints
+}
+
 // refreshPodEndpoints will refresh all the pod endpoints and create empty netpol references for new endpoints
 /*
 	Key Assumption: a new pod event (w/ IP) cannot come before HNS knows (and can tell us) about the endpoint.
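For reference, a hypothetical usage example (not part of this commit) of the new helper, assuming it sits in the same dataplane package with fmt and the hcn import shown above; the endpoint IDs are made up:

// ExampleGetUniqueEndpoints is an illustrative sketch, not code from the PR.
func ExampleGetUniqueEndpoints() {
	endpoints := []hcn.HostComputeEndpoint{{Id: "ep-shared-1"}, {Id: "ep-both"}}          // AttachedSharing results
	endpointsAttached := []hcn.HostComputeEndpoint{{Id: "ep-both"}, {Id: "ep-l1vh-1"}}    // Attached (L1VH) results

	merged := GetUniqueEndpoints(endpoints, endpointsAttached)
	for _, ep := range merged {
		fmt.Println(ep.Id)
	}
	// Output:
	// ep-shared-1
	// ep-both
	// ep-l1vh-1
}

The helper runs in O(n+m), preserves the order of the first list, and appends only those Attached endpoints whose ID has not been seen yet.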
@@ -10,6 +10,8 @@ import (
 	"github.com/Azure/azure-container-networking/npm/metrics"
 	"github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets"
 	dptestutils "github.com/Azure/azure-container-networking/npm/pkg/dataplane/testutils"
+	"github.com/Microsoft/hcsshim/hcn"
+	"github.com/google/go-cmp/cmp"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/require"
 )
@@ -86,6 +88,68 @@ func TestMultiJobApplyInBackground(t *testing.T) {
 	testMultiJobCases(t, multiJobApplyInBackgroundTests(), time.Duration(1*time.Second))
 }
 
+func TestRemoveCommonEndpoints(t *testing.T) {
+	tests := []struct {
+		name              string
+		endpoints         []hcn.HostComputeEndpoint
+		endpointsAttached []hcn.HostComputeEndpoint
+		expected          []hcn.HostComputeEndpoint
+	}{
+		{
+			name:              "1 value same",
+			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "123456"}, {Id: "560971"}},
+			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "123456"}, {Id: "789012"}},
+			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "123456"}, {Id: "560971"}, {Id: "567890"}, {Id: "789012"}},
+		},
+		{
+			name:              "no values same",
+			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "560971"}},
+			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "789012"}},
+			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "560971"}, {Id: "567890"}, {Id: "789012"}},
+		},
+		{
+			name:              "1 value same",
+			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "123456"}, {Id: "560971"}},
+			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "123456"}, {Id: "789012"}},
+			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "123456"}, {Id: "560971"}, {Id: "567890"}, {Id: "789012"}},
+		},
+		{
+			name:              "two values same",
+			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "560971"}, {Id: "123456"}, {Id: "789012"}},
+			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "789012"}, {Id: "123456"}},
+			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "560971"}, {Id: "123456"}, {Id: "789012"}, {Id: "567890"}},
+		},
+		{
+			name:              "no values",
+			endpoints:         []hcn.HostComputeEndpoint{},
+			endpointsAttached: []hcn.HostComputeEndpoint{},
+			expected:          []hcn.HostComputeEndpoint{},
+		},
+		{
+			name:              "1 value - same",
+			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}},
+			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "456901"}},
+			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}},
+		},
+		{
+			name:              "1 value - different",
+			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}},
+			endpointsAttached: []hcn.HostComputeEndpoint{},
+			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}},
+		},
+	}
+	for _, tt := range tests {
+		tt := tt
+
+		t.Run(tt.name, func(t *testing.T) {
+			result := GetUniqueEndpoints(tt.endpoints, tt.endpointsAttached)
+			if !cmp.Equal(tt.expected, result) {
+				t.Errorf("Test %s failed: expected %v, got %v", tt.name, tt.expected, result)
+			}
+		})
+	}
+}
+
 func testSerialCases(t *testing.T, tests []*SerialTestCase, finalSleep time.Duration) {
 	for i, tt := range tests {
 		i := i