feat: support for cilium + nodesubnet (#3073)

* feat: support for cilium + nodesubnet

* fix: make linter happy

* fix: make linter happy

* fix: make linter happy

* test: add test for nodesubnet

* chore: add missing files

* nicer comment

* chore: fix comment typo

* fix: update cns/restserver/nodesubnet.go

Co-authored-by: Timothy J. Raymond <timraymond@users.noreply.github.com>
Signed-off-by: Santhosh  Prabhu  <6684582+santhoshmprabhu@users.noreply.github.com>

* fix: update cns/restserver/restserver.go

Co-authored-by: Timothy J. Raymond <timraymond@users.noreply.github.com>
Signed-off-by: Santhosh  Prabhu  <6684582+santhoshmprabhu@users.noreply.github.com>

* refactor: address comments

* fix: address comments

* chore:comment cleanup

* fix: do not use bash in ip config update

* fix: address comments

* fix: make linter happy

* chore: move pipeline changes out

* test: more elaborate test including checks on IP pool state

* fix: use comments suitable for documentation

Co-authored-by: Timothy J. Raymond <timraymond@users.noreply.github.com>
Signed-off-by: Santhosh  Prabhu  <6684582+santhoshmprabhu@users.noreply.github.com>

* chore: address comments

* chore:make linter happy

* fix: address comments

* chore: typo

* chore: address comments

* fix: update comments

---------

Signed-off-by: Santhosh  Prabhu  <6684582+santhoshmprabhu@users.noreply.github.com>
Co-authored-by: Timothy J. Raymond <timraymond@users.noreply.github.com>
This commit is contained in:
Santhosh Prabhu 2024-11-04 16:24:57 -08:00 коммит произвёл GitHub
Родитель 7f0339a294
Коммит 1c1bbaa924
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
6 изменённых файлов: 391 добавлений и 37 удалений

Просмотреть файл

@ -14,9 +14,10 @@ import (
// NMAgentClientFake is a test double for the NMAgent client. Each method's
// behavior is supplied by the caller through the matching function field, so a
// test only needs to populate the hooks for the calls it expects.
//
// A hook that is left nil will panic if its method is invoked.
type NMAgentClientFake struct {
	// SupportedAPIsF is the hook intended to back SupportedAPIs.
	SupportedAPIsF func(context.Context) ([]string, error)
	// GetNCVersionListF is the hook intended to back GetNCVersionList.
	GetNCVersionListF func(context.Context) (nmagent.NCVersionList, error)
	// GetHomeAzF is the hook invoked by GetHomeAz.
	GetHomeAzF func(context.Context) (nmagent.AzResponse, error)
	// GetInterfaceIPInfoF is the hook invoked by GetInterfaceIPInfo.
	GetInterfaceIPInfoF func(context.Context) (nmagent.Interfaces, error)
}
func (n *NMAgentClientFake) SupportedAPIs(ctx context.Context) ([]string, error) {
@ -30,3 +31,7 @@ func (n *NMAgentClientFake) GetNCVersionList(ctx context.Context) (nmagent.NCVer
// GetHomeAz forwards the call to the GetHomeAzF hook and returns its result
// unchanged.
func (n *NMAgentClientFake) GetHomeAz(ctx context.Context) (nmagent.AzResponse, error) {
	resp, err := n.GetHomeAzF(ctx)
	return resp, err
}
// GetInterfaceIPInfo forwards the call to the GetInterfaceIPInfoF hook and
// returns its result unchanged.
func (n *NMAgentClientFake) GetInterfaceIPInfo(ctx context.Context) (nmagent.Interfaces, error) {
	interfaces, err := n.GetInterfaceIPInfoF(ctx)
	return interfaces, err
}

Просмотреть файл

@ -0,0 +1,89 @@
package restserver
import (
"context"
"net/netip"
"testing"
"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/common"
"github.com/Azure/azure-container-networking/cns/fakes"
"github.com/Azure/azure-container-networking/cns/nodesubnet"
acn "github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/nmagent"
"github.com/Azure/azure-container-networking/store"
)
// GetRestServiceObjectForNodeSubnetTest creates a new HTTPRestService object for use in nodesubnet unit tests.
//
// The returned service wraps a cns service built on a mock store in AzureHost
// channel mode, and uses a fake NMAgent client whose GetInterfaceIPInfo reports
// one primary interface on 10.0.0.0/24 with primary IP 10.0.0.4 and secondary
// IPs 10.0.0.52, 10.0.0.63 and 10.0.0.45. The given generator is installed as
// the service's CNI conflist generator.
//
// The underlying cns service is uninitialized automatically through t.Cleanup.
// Any setup failure aborts the calling test with t.Fatalf, so the returned
// pointer is never nil.
func GetRestServiceObjectForNodeSubnetTest(t *testing.T, generator CNIConflistGenerator) *HTTPRestService {
	t.Helper()

	config := &common.ServiceConfig{
		Name:        "test",
		Version:     "1.0",
		ChannelMode: "AzureHost",
		Store:       store.NewMockStore("test"),
	}

	interfaces := nmagent.Interfaces{
		Entries: []nmagent.Interface{
			{
				MacAddress: nmagent.MACAddress{0x00, 0x0D, 0x3A, 0xF9, 0xDC, 0xA6},
				IsPrimary:  true,
				InterfaceSubnets: []nmagent.InterfaceSubnet{
					{
						Prefix: "10.0.0.0/24",
						IPAddress: []nmagent.NodeIP{
							{
								Address:   nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 0, 0, 4})),
								IsPrimary: true,
							},
							{
								Address:   nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 0, 0, 52})),
								IsPrimary: false,
							},
							{
								Address:   nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 0, 0, 63})),
								IsPrimary: false,
							},
							{
								Address:   nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 0, 0, 45})),
								IsPrimary: false,
							},
						},
					},
				},
			},
		},
	}

	svc, err := cns.NewService(config.Name, config.Version, config.ChannelMode, config.Store)
	if err != nil {
		// Fail loudly instead of returning nil, which would only surface later
		// as a nil-pointer dereference in the caller.
		t.Fatalf("creating cns service: %v", err)
	}

	svc.SetOption(acn.OptCnsURL, "")
	svc.SetOption(acn.OptCnsPort, "")

	if err = svc.Initialize(config); err != nil {
		t.Fatalf("initializing cns service: %v", err)
	}
	t.Cleanup(func() { svc.Uninitialize() })

	return &HTTPRestService{
		Service:                  svc,
		cniConflistGenerator:     generator,
		state:                    &httpRestServiceState{},
		PodIPConfigState:         make(map[string]cns.IPConfigurationStatus),
		PodIPIDByPodInterfaceKey: make(map[string][]string),
		nma: &fakes.NMAgentClientFake{
			GetInterfaceIPInfoF: func(_ context.Context) (nmagent.Interfaces, error) {
				return interfaces, nil
			},
		},
		wscli: &fakes.WireserverClientFake{},
	}
}
// GetNodesubnetIPFetcher exposes the service's nodesubnetIPFetcher field. The
// result is nil until the field has been assigned.
func (service *HTTPRestService) GetNodesubnetIPFetcher() *nodesubnet.IPFetcher {
	fetcher := service.nodesubnetIPFetcher
	return fetcher
}

Просмотреть файл

@ -0,0 +1,64 @@
package restserver
import (
"context"
"net/netip"
"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/logger"
nodesubnet "github.com/Azure/azure-container-networking/cns/nodesubnet"
"github.com/Azure/azure-container-networking/cns/types"
"github.com/pkg/errors"
)
// Compile-time check that *HTTPRestService satisfies nodesubnet.IPConsumer.
var _ nodesubnet.IPConsumer = (*HTTPRestService)(nil)
// UpdateIPsForNodeSubnet hands a freshly fetched set of secondary IPs to the
// HTTPRestService. The addresses are converted to strings, wrapped in a
// NodeSubnet network container request and saved as NC goal state. Once the
// save succeeds, the CNI conflist is generated (at most once) to signal that
// CNS can serve IPAM requests.
//
// It returns an error when saving the goal state does not report
// types.Success.
func (service *HTTPRestService) UpdateIPsForNodeSubnet(secondaryIPs []netip.Addr) error {
	ipStrings := make([]string, 0, len(secondaryIPs))
	for _, addr := range secondaryIPs {
		ipStrings = append(ipStrings, addr.String())
	}

	ncRequest := nodesubnet.CreateNodeSubnetNCRequest(ipStrings)
	if code, msg := service.saveNetworkContainerGoalState(*ncRequest); code != types.Success {
		return errors.Errorf("failed to save fetched ips. code: %d, message %s", code, msg)
	}
	logger.Debugf("IP change processed successfully")

	// This method only runs after IPs were fetched from NMAgent, so a
	// successful save means there are IPs to hand out. Publishing the conflist
	// is the signal that CNS is ready.
	service.MustGenerateCNIConflistOnce()
	return nil
}
// InitializeNodeSubnet prepares CNS for serving NodeSubnet requests.
//
// It sets the node orchestrator type to KubernetesCRD, reconciles the initial
// CNS state from podInfoByIPProvider (skipped, with a log line, when the
// provider is nil), and finally creates the IP fetcher. The fetcher is only
// created here, not started; see StartNodeSubnet.
//
// It returns a wrapped error when reconciliation fails.
func (service *HTTPRestService) InitializeNodeSubnet(ctx context.Context, podInfoByIPProvider cns.PodInfoByIPProvider) error {
	service.SetNodeOrchestrator(&cns.SetOrchestratorTypeRequest{
		OrchestratorType: cns.KubernetesCRD,
	})

	switch {
	case podInfoByIPProvider == nil:
		logger.Printf("PodInfoByIPProvider is nil, this usually means no saved endpoint state. Skipping reconciliation")
	default:
		if _, err := nodesubnet.ReconcileInitialCNSState(ctx, service, podInfoByIPProvider); err != nil {
			return errors.Wrap(err, "reconcile initial CNS state")
		}
	}

	// Any saved state is reconciled at this point. Only construct the fetcher
	// here: starting it generates the conflist, which must wait until the
	// service is able to answer IPAM requests.
	// NOTE(review): the two zero arguments presumably select the fetcher's
	// default intervals — confirm against nodesubnet.NewIPFetcher.
	service.nodesubnetIPFetcher = nodesubnet.NewIPFetcher(service.nma, service, 0, 0, logger.Log)
	return nil
}
// StartNodeSubnet starts the NodeSubnet IP fetcher with the given context, so
// that secondary IPs are fetched periodically. The first successful fetch
// leads to the conflist being generated, which marks CNS as ready.
//
// The fetcher must already have been created by InitializeNodeSubnet.
func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) {
	fetcher := service.nodesubnetIPFetcher
	fetcher.Start(ctx)
}

Просмотреть файл

@ -0,0 +1,144 @@
package restserver_test
import (
"context"
"net"
"testing"
"github.com/Azure/azure-container-networking/cns/cnireconciler"
"github.com/Azure/azure-container-networking/cns/logger"
"github.com/Azure/azure-container-networking/cns/restserver"
"github.com/Azure/azure-container-networking/cns/types"
"github.com/Azure/azure-container-networking/store"
)
// getMockStore returns a mock KeyValueStore pre-populated, under
// restserver.EndpointStoreKey, with endpoint state for two containers:
// a coredns pod holding 10.0.0.52/24 and a load-test pod holding 10.0.0.63/24,
// each on interface eth0. It returns nil if writing the state fails.
func getMockStore() store.KeyValueStore {
	// singleIPv4Endpoint builds endpoint info for a pod that owns exactly one
	// /24 IPv4 address on eth0.
	singleIPv4Endpoint := func(podName, podNamespace string, ip net.IP) *restserver.EndpointInfo {
		return &restserver.EndpointInfo{
			PodName:      podName,
			PodNamespace: podNamespace,
			IfnameToIPMap: map[string]*restserver.IPInfo{
				"eth0": {
					IPv4: []net.IPNet{
						{IP: ip, Mask: net.CIDRMask(24, 32)},
					},
				},
			},
		}
	}

	endpoints := map[string]*restserver.EndpointInfo{
		"12e65d89e58cb23c784e97840cf76866bfc9902089bdc8e87e9f64032e312b0b": singleIPv4Endpoint(
			"coredns-54b69f46b8-ldmwr", "kube-system", net.IPv4(10, 0, 0, 52)),
		"1fc5176913a3a1a7facfb823dde3b4ded404041134fef4f4a0c8bba140fc0413": singleIPv4Endpoint(
			"load-test-7f7d49687d-wxc9p", "load-test", net.IPv4(10, 0, 0, 63)),
	}

	kvStore := store.NewMockStore("")
	if err := kvStore.Write(restserver.EndpointStoreKey, endpoints); err != nil {
		return nil
	}
	return kvStore
}
// MockCNIConflistGenerator is a mock implementation of CNIConflistGenerator
// that lets a test observe when the conflist has been generated.
type MockCNIConflistGenerator struct {
	// GenerateCalled is closed by Generate. It must be initialized (non-nil)
	// before Generate is called.
	GenerateCalled chan struct{}
}

// Generate signals that conflist generation was requested by closing
// GenerateCalled. It always returns nil.
//
// Repeated sequential calls are safe: the channel is closed only the first
// time, so a second call does not panic with "close of closed channel". The
// check is not synchronized, so concurrent callers must serialize themselves.
func (m *MockCNIConflistGenerator) Generate() error {
	select {
	case <-m.GenerateCalled:
		// Already closed by an earlier call; nothing left to signal.
	default:
		close(m.GenerateCalled)
	}
	return nil
}

// Close is a no-op; the mock holds no resources. It always returns nil.
func (m *MockCNIConflistGenerator) Close() error {
	return nil
}
// TestNodeSubnet tests initialization of NodeSubnet with endpoint info, and verfies that
// the conflist is generated after fetching secondary IPs
func TestNodeSubnet(t *testing.T) {
podInfoByIPProvider, err := cnireconciler.NewCNSPodInfoProvider(getMockStore())
if err != nil {
t.Fatalf("NewCNSPodInfoProvider returned an error: %v", err)
}
// create a real HTTPRestService object
mockCNIConflistGenerator := &MockCNIConflistGenerator{
GenerateCalled: make(chan struct{}),
}
service := restserver.GetRestServiceObjectForNodeSubnetTest(t, mockCNIConflistGenerator)
ctx, cancel := testContext(t)
defer cancel()
err = service.InitializeNodeSubnet(ctx, podInfoByIPProvider)
if err != nil {
t.Fatalf("InitializeNodeSubnet returned an error: %v", err)
}
expectedIPs := map[string]types.IPState{
"10.0.0.52": types.Assigned,
"10.0.0.63": types.Assigned,
}
checkIPassignment(t, service, expectedIPs)
service.StartNodeSubnet(ctx)
if service.GetNodesubnetIPFetcher() == nil {
t.Fatal("NodeSubnetIPFetcher is not initialized")
}
select {
case <-ctx.Done():
t.Errorf("test context done - %s", ctx.Err())
return
case <-mockCNIConflistGenerator.GenerateCalled:
break
}
expectedIPs["10.0.0.45"] = types.Available
checkIPassignment(t, service, expectedIPs)
}
// checkIPassignment checks whether the IP assignment state in the HTTPRestService object matches expectation.
//
// expectedIPs maps each IP address that must be present in the service's pod
// IP config state to the state it must be in. The test fails immediately when
// the number of entries differs, when an unexpected IP is present, or when an
// IP is in a different state than expected.
func checkIPassignment(t *testing.T, service *restserver.HTTPRestService, expectedIPs map[string]types.IPState) {
	t.Helper()

	// Read the state once through the accessor so the length check and the
	// per-IP checks look at the same data.
	podIPConfigState := service.GetPodIPConfigState()
	if len(podIPConfigState) != len(expectedIPs) {
		t.Fatalf("expected %d entries in PodIPConfigState, got %d", len(expectedIPs), len(podIPConfigState))
	}

	for ip, config := range podIPConfigState {
		expectedState, exists := expectedIPs[ip]
		if !exists {
			t.Fatalf("unexpected IP %s in PodIPConfigState", ip)
		}
		if actualState := config.GetState(); actualState != expectedState {
			t.Fatalf("expected state %s for IP %s, got %s", expectedState, ip, actualState)
		}
	}
}
// testContext returns a context for use in a test, together with its cancel
// function. When the test binary reports a deadline through t.Deadline, the
// context expires at that deadline; otherwise it is a plain cancelable
// context. The caller is responsible for calling the returned cancel function.
func testContext(t *testing.T) (context.Context, context.CancelFunc) {
	parent := context.Background()

	deadline, hasDeadline := t.Deadline()
	if !hasDeadline {
		return context.WithCancel(parent)
	}
	return context.WithDeadline(parent, deadline)
}
// init sets up the package-level CNS logger before any test in this package
// runs.
// NOTE(review): the arguments are presumably log name, level, target and
// directory — confirm against logger.InitLogger.
func init() {
	const (
		name = "testlogs"
		dir  = "./"
	)
	logger.InitLogger(name, 0, 0, dir)
}

Просмотреть файл

@ -13,6 +13,7 @@ import (
"github.com/Azure/azure-container-networking/cns/dockerclient"
"github.com/Azure/azure-container-networking/cns/logger"
"github.com/Azure/azure-container-networking/cns/networkcontainers"
"github.com/Azure/azure-container-networking/cns/nodesubnet"
"github.com/Azure/azure-container-networking/cns/routes"
"github.com/Azure/azure-container-networking/cns/types"
"github.com/Azure/azure-container-networking/cns/types/bounded"
@ -40,6 +41,7 @@ type nmagentClient interface {
SupportedAPIs(context.Context) ([]string, error)
GetNCVersionList(context.Context) (nma.NCVersionList, error)
GetHomeAz(context.Context) (nma.AzResponse, error)
GetInterfaceIPInfo(ctx context.Context) (nma.Interfaces, error)
}
type wireserverProxy interface {
@ -76,6 +78,7 @@ type HTTPRestService struct {
IPConfigsHandlerMiddleware cns.IPConfigsHandlerMiddleware
PnpIDByMacAddress map[string]string
imdsClient imdsClient
nodesubnetIPFetcher *nodesubnet.IPFetcher
}
type CNIConflistGenerator interface {

Просмотреть файл

@ -736,7 +736,6 @@ func main() {
}
imdsClient := imds.NewClient()
httpRemoteRestService, err := restserver.NewHTTPRestService(&config, wsclient, &wsProxy, nmaClient,
endpointStateStore, conflistGenerator, homeAzMonitor, imdsClient)
if err != nil {
@ -871,6 +870,32 @@ func main() {
}
}
// AzureHost channel mode indicates NodeSubnet. IPs are to be fetched from NMAgent.
if config.ChannelMode == cns.AzureHost {
if !cnsconfig.ManageEndpointState {
logger.Errorf("ManageEndpointState must be set to true for AzureHost mode")
return
}
// If cns manageendpointstate is true, then cns maintains its own state and reconciles from it.
// in this case, cns maintains state with containerid as key and so in-memory cache can lookup
// and update based on container id.
cns.GlobalPodInfoScheme = cns.InfraIDPodInfoScheme
var podInfoByIPProvider cns.PodInfoByIPProvider
podInfoByIPProvider, err = getPodInfoByIPProvider(rootCtx, cnsconfig, httpRemoteRestService, nil, "")
if err != nil {
logger.Errorf("[Azure CNS] Failed to get PodInfoByIPProvider: %v", err)
return
}
err = httpRemoteRestService.InitializeNodeSubnet(rootCtx, podInfoByIPProvider)
if err != nil {
logger.Errorf("[Azure CNS] Failed to initialize node subnet: %v", err)
return
}
}
// Initialize multi-tenant controller if the CNS is running in MultiTenantCRD mode.
// It must be started before we start HTTPRemoteRestService.
if config.ChannelMode == cns.MultiTenantCRD {
@ -909,6 +934,7 @@ func main() {
}
// if user provides cns url by -c option, then only start HTTP remote server using this url
logger.Printf("[Azure CNS] Start HTTP Remote server")
if httpRemoteRestService != nil {
if cnsconfig.EnablePprof {
@ -1019,6 +1045,13 @@ func main() {
}(privateEndpoint, infravnet, nodeID)
}
if config.ChannelMode == cns.AzureHost {
// at this point, rest service is running. We can now start serving new requests. So call StartNodeSubnet, which
// will fetch secondary IPs and generate conflist. Do not move this call before rest service start - this will cause
// CNI to start sending requests, and if the service doesn't start successfully, the requests will fail.
httpRemoteRestService.StartNodeSubnet(rootCtx)
}
// mark the service as "ready"
close(readyCh)
// block until process exiting
@ -1249,40 +1282,11 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
}
}
var podInfoByIPProvider cns.PodInfoByIPProvider
switch {
case cnsconfig.ManageEndpointState:
logger.Printf("Initializing from self managed endpoint store")
podInfoByIPProvider, err = cnireconciler.NewCNSPodInfoProvider(httpRestServiceImplementation.EndpointStateStore) // get reference to endpoint state store from rest server
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
logger.Printf("[Azure CNS] No endpoint state found, skipping initializing CNS state")
} else {
return errors.Wrap(err, "failed to create CNS PodInfoProvider")
}
}
case cnsconfig.InitializeFromCNI:
logger.Printf("Initializing from CNI")
podInfoByIPProvider, err = cnireconciler.NewCNIPodInfoProvider()
if err != nil {
return errors.Wrap(err, "failed to create CNI PodInfoProvider")
}
default:
logger.Printf("Initializing from Kubernetes")
podInfoByIPProvider = cns.PodInfoByIPProviderFunc(func() (map[string]cns.PodInfo, error) {
pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{ //nolint:govet // ignore err shadow
FieldSelector: "spec.nodeName=" + nodeName,
})
if err != nil {
return nil, errors.Wrap(err, "failed to list Pods for PodInfoProvider")
}
podInfo, err := cns.KubePodsToPodInfoByIP(pods.Items)
if err != nil {
return nil, errors.Wrap(err, "failed to convert Pods to PodInfoByIP")
}
return podInfo, nil
})
podInfoByIPProvider, err := getPodInfoByIPProvider(ctx, cnsconfig, httpRestServiceImplementation, clientset, nodeName)
if err != nil {
return errors.Wrap(err, "failed to initialize ip state")
}
// create scoped kube clients.
directcli, err := client.New(kubeConfig, client.Options{Scheme: nodenetworkconfig.Scheme})
if err != nil {
@ -1505,6 +1509,51 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
return nil
}
// getPodInfoByIPProvider returns a PodInfoByIPProvider that reads endpoint
// state from the source selected by cnsconfig, in this order of precedence:
//
//   - ManageEndpointState: the CNS-managed endpoint state store of
//     httpRestServiceImplementation. If the store has no endpoint state
//     (store.ErrKeyNotFound), this is logged and whatever provider the
//     constructor returned is passed back with a nil error.
//   - InitializeFromCNI: the CNI pod info provider.
//   - otherwise: a provider that lists the pods scheduled on nodeName through
//     clientset each time it is invoked, using ctx for the request.
//
// Failures to construct a provider are returned as wrapped errors.
func getPodInfoByIPProvider(
	ctx context.Context,
	cnsconfig *configuration.CNSConfig,
	httpRestServiceImplementation *restserver.HTTPRestService,
	clientset *kubernetes.Clientset,
	nodeName string,
) (podInfoByIPProvider cns.PodInfoByIPProvider, err error) {
	if cnsconfig.ManageEndpointState {
		logger.Printf("Initializing from self managed endpoint store")
		// get reference to endpoint state store from rest server
		podInfoByIPProvider, err = cnireconciler.NewCNSPodInfoProvider(httpRestServiceImplementation.EndpointStateStore)
		if err == nil {
			return podInfoByIPProvider, nil
		}
		if !errors.Is(err, store.ErrKeyNotFound) {
			return podInfoByIPProvider, errors.Wrap(err, "failed to create CNS PodInfoProvider")
		}
		// Missing endpoint state is not an error: there is simply nothing to
		// reconcile from.
		logger.Printf("[Azure CNS] No endpoint state found, skipping initializing CNS state")
		return podInfoByIPProvider, nil
	}

	if cnsconfig.InitializeFromCNI {
		logger.Printf("Initializing from CNI")
		podInfoByIPProvider, err = cnireconciler.NewCNIPodInfoProvider()
		if err != nil {
			return podInfoByIPProvider, errors.Wrap(err, "failed to create CNI PodInfoProvider")
		}
		return podInfoByIPProvider, nil
	}

	logger.Printf("Initializing from Kubernetes")
	listPodsOnNode := func() (map[string]cns.PodInfo, error) {
		pods, listErr := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{
			FieldSelector: "spec.nodeName=" + nodeName,
		})
		if listErr != nil {
			return nil, errors.Wrap(listErr, "failed to list Pods for PodInfoProvider")
		}
		podInfo, convErr := cns.KubePodsToPodInfoByIP(pods.Items)
		if convErr != nil {
			return nil, errors.Wrap(convErr, "failed to convert Pods to PodInfoByIP")
		}
		return podInfo, nil
	}
	podInfoByIPProvider = cns.PodInfoByIPProviderFunc(listPodsOnNode)
	return podInfoByIPProvider, nil
}
// createOrUpdateNodeInfoCRD polls imds to learn the VM Unique ID and then creates or updates the NodeInfo CRD
// with that vm unique ID
func createOrUpdateNodeInfoCRD(ctx context.Context, restConfig *rest.Config, node *corev1.Node) error {