azure-container-networking/ipam/ipv6Ipam.go

300 строки
7.8 KiB
Go

package ipam
import (
"context"
"errors"
"net"
"os"
"regexp"
"runtime"
"strings"
"github.com/Azure/azure-container-networking/common"
"github.com/Masterminds/semver"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
const (
defaultWindowsKubePath = `c:\k\`
defaultWindowsKubeConfigFilePath = defaultWindowsKubePath + `config`
defaultLinuxKubeConfigFilePath = "/var/lib/kubelet/kubeconfig"
k8sMajorVerForNewPolicyDef = "1"
k8sMinorVerForNewPolicyDef = "16"
// by default, Kubernetes node is allocated a /64 for each node. That's too much state to preserve,
// so instead we take the first /120 of that /64 as usable IP's.
defaultIPv6SubnetMaskSizeLimit = "/120"
lessThan = -1
equalTo = 0
greaterThan = 1
comparisonError = -2
)
// regex to get minor version
var re = regexp.MustCompile("[0-9]+")
type ipv6IpamSource struct {
name string
nodeHostname string
subnetMaskSizeLimit string
kubeConfigPath string
kubeClient kubernetes.Interface
kubeNode *v1.Node
isLoaded bool
sink addressConfigSink
}
// creates a new IPv6 Ipam source
func newIPv6IpamSource(options map[string]interface{}, isLoaded bool) (*ipv6IpamSource, error) {
var kubeConfigPath string
name := options[common.OptEnvironment].(string)
if runtime.GOOS == windows {
kubeConfigPath = defaultWindowsKubeConfigFilePath
} else {
kubeConfigPath = defaultLinuxKubeConfigFilePath
}
nodeName, err := os.Hostname()
if err != nil {
return nil, err
}
return &ipv6IpamSource{
name: name,
subnetMaskSizeLimit: defaultIPv6SubnetMaskSizeLimit,
nodeHostname: strings.ToLower(nodeName),
kubeConfigPath: kubeConfigPath,
isLoaded: isLoaded,
}, nil
}
// Starts the MAS source.
func (source *ipv6IpamSource) start(sink addressConfigSink) error {
source.sink = sink
return nil
}
// Stops the MAS source.
func (source *ipv6IpamSource) stop() {
source.sink = nil
}
// creates a KubernetesClientset using the Kubeconfig stored on each agent node
func (source *ipv6IpamSource) loadKubernetesConfig() (kubernetes.Interface, error) {
config, err := clientcmd.BuildConfigFromFlags("", source.kubeConfigPath)
if err != nil {
logger.Error("Failed to load Kubernetes config from disk", zap.Error(err))
return nil, err
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
logger.Error("Failed to create Kubernetes config", zap.Error(err))
return nil, err
}
minimumVersion := &version.Info{
Major: k8sMajorVerForNewPolicyDef,
Minor: k8sMinorVerForNewPolicyDef,
}
serverVersion, err := client.ServerVersion()
if err != nil {
logger.Error("Failed to retrieve Kubernetes version", zap.Error(err))
return nil, err
}
comparison := CompareK8sVer(serverVersion, minimumVersion)
if comparison == equalTo || comparison == lessThan {
return nil, errors.New("Incompatible Kubernetes version for dual stack")
} else if comparison == comparisonError {
return nil, errors.New("Error comparing Kubernetes API versions")
}
return client, err
}
// CompareK8sVer compares two k8s versions.
// returns -1, 0, 1 if firstVer smaller, equals, bigger than secondVer respectively.
// returns -2 for error.
func CompareK8sVer(firstVer *version.Info, secondVer *version.Info) int {
v1Minor := re.FindAllString(firstVer.Minor, -1)
if len(v1Minor) < 1 {
return comparisonError
}
v1, err := semver.NewVersion(firstVer.Major + "." + v1Minor[0])
if err != nil {
return comparisonError
}
v2Minor := re.FindAllString(secondVer.Minor, -1)
if len(v2Minor) < 1 {
return comparisonError
}
v2, err := semver.NewVersion(secondVer.Major + "." + v2Minor[0])
if err != nil {
return comparisonError
}
return v1.Compare(v2)
}
func (source *ipv6IpamSource) refresh() error {
if source == nil {
return errors.New("ipv6ipam is nil")
}
if source.isLoaded {
logger.Info("ipv6 source already loaded")
return nil
}
if source.kubeClient == nil {
kubeClient, err := source.loadKubernetesConfig()
if err != nil {
logger.Error("Failed to load Kubernetes config", zap.Error(err))
return err
}
source.kubeClient = kubeClient
}
kubeNode, err := source.kubeClient.CoreV1().Nodes().Get(context.TODO(), source.nodeHostname, metav1.GetOptions{})
if err != nil {
logger.Error("Failed to retrieve node using", zap.String("hostName", source.nodeHostname), zap.Error(err))
return err
}
source.kubeNode = kubeNode
logger.Info("Discovered", zap.Any("CIDR's", source.kubeNode.Spec.PodCIDRs))
// Query the list of Kubernetes Pod IPs
interfaceIPs, err := retrieveKubernetesPodIPs(source.kubeNode, source.subnetMaskSizeLimit)
if err != nil {
logger.Error("Failed retrieve Kubernetes IP's from subnet", zap.Error(err))
return err
}
// Configure the local default address space.
local, err := source.sink.newAddressSpace(LocalDefaultAddressSpaceId, LocalScope)
if err != nil {
logger.Error("Failed to configure local default address space", zap.Error(err))
return err
}
for _, i := range interfaceIPs.Interfaces {
for _, s := range i.IPSubnets {
_, subnet, err := net.ParseCIDR(s.Prefix)
ifaceName := ""
priority := 0
ap, err := local.newAddressPool(ifaceName, priority, subnet)
for _, a := range s.IPAddresses {
address := net.ParseIP(a.Address)
_, err = ap.newAddressRecord(&address)
if err != nil {
logger.Error("Failed to create", zap.Any("address", address), zap.Error(err))
continue
}
}
}
}
// Set the local address space as active.
if err = source.sink.setAddressSpace(local); err != nil {
return err
}
source.isLoaded = true
logger.Info("Address space successfully populated from Kubernetes API Server")
return err
}
// retrieves the allocated pod IP's, and populates the NetworkInterfaces struture
func retrieveKubernetesPodIPs(node *v1.Node, subnetMaskBitSize string) (*NetworkInterfaces, error) {
var nodeCidr net.IP
var ipnetv6 *net.IPNet
// get IPv6 subnet allocated to node
for _, cidr := range node.Spec.PodCIDRs {
ipv6cidr, _, _ := net.ParseCIDR(cidr)
if ipv6cidr.To4() == nil {
nodeCidr = ipv6cidr
break
}
}
if nodeCidr == nil {
return nil, errors.New("[ipam] Failed to retrieve subnet, node does an IPv6 subnet allocated from Kubernetes")
}
subnet := nodeCidr.String() + subnetMaskBitSize
nodeCidr, ipnetv6, err := net.ParseCIDR(subnet)
if err != nil {
return nil, err
}
addresses := getIPsFromAddresses(nodeCidr, ipnetv6)
networkSubnet := IPSubnet{
Prefix: subnet,
}
// skip the first address, explicitly save all IP's from the given subnet
for i := 2; i < len(addresses); i++ {
ipaddress := IPAddress{
IsPrimary: false,
Address: addresses[i].String(),
}
networkSubnet.IPAddresses = append(networkSubnet.IPAddresses, ipaddress)
}
return &NetworkInterfaces{
Interfaces: []Interface{
{
IsPrimary: true,
IPSubnets: []IPSubnet{
networkSubnet,
},
},
},
}, nil
}
// retrieves all IP's from a given subnet
func getIPsFromAddresses(ip net.IP, ipnet *net.IPNet) []net.IP {
ips := make([]net.IP, 0)
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); incrementIP(ip) {
ips = append(ips, duplicateIP(ip))
}
return ips
}
// increment the IP by one
func incrementIP(ip net.IP) {
for j := len(ip) - 1; j >= 0; j-- {
ip[j]++
if ip[j] > 0 {
break
}
}
}
// Create a copy of a net.IP, used to populate an slice of IP's in a subnet
// net.IP is slice of bytes, slices are reference types, when calculating every
// ip in a subnet, need to save current net.IP slice in it's own memory.
func duplicateIP(ip net.IP) net.IP {
dup := make(net.IP, len(ip))
copy(dup, ip)
return dup
}