// azure-container-networking/ipam/manager.go
//
// 405 lines
// 8.8 KiB
// Go

// Copyright 2017 Microsoft. All rights reserved.
// MIT License
package ipam
import (
"sync"
"time"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/platform"
"github.com/Azure/azure-container-networking/store"
"go.uber.org/zap"
)
// storeKey is the key under which the address manager reads and writes its
// state in the persistent store.
const storeKey = "IPAM"
// addressManager manages the set of address spaces and pools allocated to containers.
// It is the concrete type returned by NewAddressManager. The whole struct is handed
// to the persistent store on save and restore; presumably only the exported fields
// are serialized (AddrSpaces carries a json tag) — confirm against the store implementation.
type addressManager struct {
// Version is copied from the plugin configuration in Initialize.
Version string
// TimeStamp is set to the current time by save just before each write.
TimeStamp time.Time
// AddrSpaces holds the known address spaces, indexed by address space ID.
AddrSpaces map[string]*addressSpace `json:"AddressSpaces"`
// store is the persistent key-value store; when nil, save and restore are no-ops.
store store.KeyValueStore
// source is the active address configuration source; nil when none is running.
source addressConfigSource
// netApi is copied from the plugin configuration in Initialize.
netApi common.NetApi
// Embedded mutex, locked by the pool and address API methods for their whole duration.
sync.Mutex
}
// AddressManager is the public address manager API.
// The per-method notes below describe the addressManager implementation in this file.
type AddressManager interface {
// Initialize records the plugin configuration, restores persisted state and then starts the configuration source.
Initialize(config *common.PluginConfig, rehydrateIpamInfoOnReboot bool, options map[string]interface{}) error
// Uninitialize stops the configuration source.
Uninitialize()
// StartSource creates and starts the configuration source selected by the environment option.
StartSource(options map[string]interface{}) error
// StopSource stops and discards the running configuration source, if any.
StopSource()
// GetDefaultAddressSpaces returns the IDs of the default local and global address spaces, in that order; an ID is empty when that space is not known.
GetDefaultAddressSpaces() (string, string)
// RequestPool reserves an address pool and returns its ID and its subnet in string form.
RequestPool(asId, poolId, subPoolId string, options map[string]string, v6 bool) (string, string, error)
// ReleasePool releases a previously reserved address pool.
ReleasePool(asId, poolId string) error
// GetPoolInfo returns information about the given address pool.
GetPoolInfo(asId, poolId string) (*AddressPoolInfo, error)
// RequestAddress reserves an address from the given pool and returns it.
RequestAddress(asId, poolId, address string, options map[string]string) (string, error)
// ReleaseAddress releases a previously reserved address.
ReleaseAddress(asId, poolId, address string, options map[string]string) error
}
// addressConfigSource configures the address pools managed by the address manager.
type addressConfigSource interface {
// start is called by StartSource with the address manager itself as the sink.
start(sink addressConfigSink) error
// stop is called by StopSource before the source is discarded.
stop()
// refresh is called by refreshSource; a returned error is logged by the caller and otherwise ignored.
refresh() error
}
// addressConfigSink is the interface address configuration sources use to configure address pools.
// The address manager passes itself as the sink when it starts a source.
type addressConfigSink interface {
// newAddressSpace presumably creates an address space with the given ID and scope — implementation is not in this part of the file; confirm.
newAddressSpace(id string, scope int) (*addressSpace, error)
// setAddressSpace presumably installs or updates the given address space in the sink — implementation is not in this part of the file; confirm.
setAddressSpace(*addressSpace) error
}
// NewAddressManager returns a new address manager whose set of address spaces
// is empty but non-nil. The returned error is always nil.
func NewAddressManager() (AddressManager, error) {
	manager := new(addressManager)
	manager.AddrSpaces = map[string]*addressSpace{}
	return manager, nil
}
// Initialize configures the address manager from the plugin configuration,
// restores any persisted state and then starts the configuration source.
//
// It returns the restore error if restoring fails (the source is then not
// started), otherwise the result of StartSource.
func (am *addressManager) Initialize(config *common.PluginConfig, rehydrateIpamInfoOnReboot bool, options map[string]interface{}) error {
	// Take over the pieces of the plugin configuration the manager needs.
	am.Version, am.store, am.netApi = config.Version, config.Store, config.NetApi

	// Persisted state must be loaded before the source starts.
	if restoreErr := am.restore(rehydrateIpamInfoOnReboot); restoreErr != nil {
		return restoreErr
	}

	return am.StartSource(options)
}
// Uninitialize cleans up the address manager by stopping its configuration
// source, if one is running. It does not touch the persistent store.
func (am *addressManager) Uninitialize() {
am.StopSource()
}
// restore reads the address manager's state from the persistent store and
// rebuilds the in-memory links that are not part of the stored data.
//
// When rehydrateIpamInfoOnReboot is true and the machine was rebooted after the
// store was last modified, every pool's reference count is reset to zero and
// every address is marked as not in use. The reboot check is best-effort: if
// either timestamp cannot be obtained the failure is logged and the restored
// state is kept as is.
//
// It returns nil when no store is configured, when the key is missing or the
// store is empty, and on success; any other read error is logged and returned.
func (am *addressManager) restore(rehydrateIpamInfoOnReboot bool) error {
	// Skip if a store is not provided.
	if am.store == nil {
		logger.Info("ipam store is nil")
		return nil
	}

	// Read any persisted state. A missing key or an empty store simply means
	// there is nothing to restore yet.
	if err := am.store.Read(storeKey, am); err != nil {
		if err == store.ErrKeyNotFound {
			logger.Info("store key not found")
			return nil
		}
		if err == store.ErrStoreEmpty {
			logger.Info("store empty")
			return nil
		}
		logger.Error("Failed to restore state", zap.Error(err))
		return err
	}

	// NOTE(review): the read may leave AddrSpaces nil (e.g. a stored null, if the
	// store decodes JSON). Re-establish the non-nil map NewAddressManager provides.
	if am.AddrSpaces == nil {
		am.AddrSpaces = make(map[string]*addressSpace)
	}

	// Populate the back-pointers and the by-ID index, which are not persisted.
	for _, as := range am.AddrSpaces {
		for _, ap := range as.Pools {
			ap.as = as
			ap.addrsByID = make(map[string]*addressRecord)

			for _, ar := range ap.Addresses {
				if ar.ID != "" {
					ap.addrsByID[ar.ID] = ar
				}
			}
		}
	}

	// If the machine rebooted since the last write, mark every address as not in use.
	if rehydrateIpamInfoOnReboot {
		modTime, modErr := am.store.GetModificationTime()
		if modErr != nil {
			logger.Error("Failed to get store modification time, skipping reboot check", zap.Error(modErr))
		} else {
			p := platform.NewExecClient(nil)
			rebootTime, rebootErr := p.GetLastRebootTime()
			if rebootErr != nil {
				logger.Error("Failed to get last reboot time, skipping reboot check", zap.Error(rebootErr))
			} else {
				logger.Info("reboot store mod", zap.Any("rebootTime", rebootTime), zap.Any("modTime", modTime))

				if rebootTime.After(modTime) {
					logger.Info("Rehydrating ipam state from persistent store")

					for _, as := range am.AddrSpaces {
						for _, ap := range as.Pools {
							// ap.as was already set in the pointer pass above.
							ap.RefCount = 0

							for _, ar := range ap.Addresses {
								ar.InUse = false
							}
						}
					}
				}
			}
		}
	}

	logger.Info("Restored state", zap.Any("am", am))

	return nil
}
// save writes the address manager's state to the persistent store.
//
// It is a no-op returning nil when no store is configured. Otherwise it sets
// TimeStamp to the current time, writes the state under storeKey, logs the
// outcome and returns the write error, if any.
func (am *addressManager) save() error {
	if am.store == nil {
		// Persistence is disabled; there is nothing to write.
		logger.Info("ipam store is nil")
		return nil
	}

	// Stamp the state before it is written so the stored copy carries the write time.
	am.TimeStamp = time.Now()

	logger.Info("saving ipam state")

	if writeErr := am.store.Write(storeKey, am); writeErr != nil {
		logger.Error("Save failed", zap.Error(writeErr))
		return writeErr
	}

	logger.Info("Save succeeded")
	return nil
}
// StartSource creates the configuration source selected by the
// common.OptEnvironment option and starts it with the address manager as sink.
//
// An empty (or missing, or non-string) environment means "no source": the
// manager's source is cleared and nil is returned. An unrecognized environment
// returns errInvalidConfiguration without touching the current source.
//
// If constructing the source fails, that error is logged and returned and no
// source is kept or started. If starting the source fails, the error is logged
// and returned.
func (am *addressManager) StartSource(options map[string]interface{}) error {
	environment, _ := options[common.OptEnvironment].(string)

	// isLoaded reports whether the default local address space already has pools
	// (e.g. from restored state). Indexing a nil map is safe, so no separate
	// nil or length check on AddrSpaces is needed.
	local := am.AddrSpaces[LocalDefaultAddressSpaceId]
	isLoaded := local != nil && len(local.Pools) > 0

	// Build into a local first: assigning a failed constructor's result straight
	// to the interface-typed field could store a typed nil that compares non-nil.
	var (
		source addressConfigSource
		err    error
	)

	switch environment {
	case common.OptEnvironmentAzure:
		source, err = newAzureSource(options)

	case common.OptEnvironmentMAS, common.OptEnvironmentFileIpam:
		source, err = newFileIpamSource(options)

	case common.OptEnvironmentIPv6NodeIpam:
		source, err = newIPv6IpamSource(options, isLoaded)

	case "null":
		source, err = newNullSource()

	case "":
		// No environment configured: run without a source.

	default:
		return errInvalidConfiguration
	}

	if err != nil {
		// Report the construction failure itself instead of letting a later
		// start() result overwrite it, and never start a half-built source.
		am.source = nil
		logger.Error("Failed to start source", zap.String("environment", environment), zap.Error(err))
		return err
	}

	am.source = source
	if am.source == nil {
		return nil
	}

	logger.Info("Starting source", zap.String("environment", environment))
	if err = am.source.start(am); err != nil {
		logger.Error("Failed to start source", zap.String("environment", environment), zap.Error(err))
	}

	return err
}
// StopSource stops the configuration source and forgets it. It does nothing
// when no source is set.
func (am *addressManager) StopSource() {
	if am.source == nil {
		return
	}

	am.source.stop()
	am.source = nil
}
// refreshSource asks the configuration source, if one is set, to refresh.
// A refresh failure is logged and not propagated to the caller.
func (am *addressManager) refreshSource() {
	if am.source == nil {
		return
	}

	logger.Info("Refreshing address source.")

	if refreshErr := am.source.refresh(); refreshErr != nil {
		logger.Error("Source refresh failed", zap.Error(refreshErr))
	}
}
//
// AddressManager API
//
// Provides atomic stateful wrappers around core IPAM functionality.
//
// GetDefaultAddressSpaces returns the IDs of the default local and global
// address spaces, in that order. An ID is the empty string when the
// corresponding address space is not known. The source is refreshed first, and
// the manager's lock is held for the duration of the call.
func (am *addressManager) GetDefaultAddressSpaces() (string, string) {
	am.Lock()
	defer am.Unlock()

	am.refreshSource()

	localID, globalID := "", ""

	if as := am.AddrSpaces[LocalDefaultAddressSpaceId]; as != nil {
		localID = as.Id
	}

	if as := am.AddrSpaces[GlobalDefaultAddressSpaceId]; as != nil {
		globalID = as.Id
	}

	return localID, globalID
}
// RequestPool reserves an address pool in the given address space and persists
// the new state.
//
// On success it returns the pool's ID and its subnet in string form. If the
// address space lookup, the reservation or the save fails, it returns two empty
// strings and that error. The manager's lock is held for the duration of the
// call and the source is refreshed first.
func (am *addressManager) RequestPool(asId, poolId, subPoolId string, options map[string]string, v6 bool) (string, string, error) {
	am.Lock()
	defer am.Unlock()

	am.refreshSource()

	space, err := am.getAddressSpace(asId)
	if err != nil {
		return "", "", err
	}

	reserved, err := space.requestPool(poolId, subPoolId, options, v6)
	if err != nil {
		return "", "", err
	}

	// NOTE(review): unlike RequestAddress, a failed save does not undo the
	// in-memory reservation here — confirm whether that is intended.
	if saveErr := am.save(); saveErr != nil {
		return "", "", saveErr
	}

	return reserved.Id, reserved.Subnet.String(), nil
}
// ReleasePool releases a previously reserved address pool and persists the new
// state. It returns the first error from the address space lookup, the release
// or the save. The manager's lock is held for the duration of the call and the
// source is refreshed first.
func (am *addressManager) ReleasePool(asId string, poolId string) error {
	am.Lock()
	defer am.Unlock()

	am.refreshSource()

	space, err := am.getAddressSpace(asId)
	if err != nil {
		return err
	}

	if releaseErr := space.releasePool(poolId); releaseErr != nil {
		return releaseErr
	}

	return am.save()
}
// GetPoolInfo returns information about the given address pool, or nil and an
// error when the address space or the pool cannot be found. It holds the
// manager's lock but, unlike the request/release methods, neither refreshes the
// source nor saves state.
func (am *addressManager) GetPoolInfo(asId string, poolId string) (*AddressPoolInfo, error) {
	am.Lock()
	defer am.Unlock()

	space, err := am.getAddressSpace(asId)
	if err != nil {
		return nil, err
	}

	pool, err := space.getAddressPool(poolId)
	if err != nil {
		return nil, err
	}

	info := pool.getInfo()
	return info, nil
}
// RequestAddress reserves an address from the given pool and persists the new
// state.
//
// On success it returns the reserved address. If the address space or pool
// lookup, the reservation or the save fails, it returns an empty string and
// that error. When the save fails the in-memory reservation is rolled back; a
// failure of the rollback itself is logged, and the save error is still the
// one returned. The manager's lock is held for the duration of the call and
// the source is refreshed first.
func (am *addressManager) RequestAddress(asId, poolId, address string, options map[string]string) (string, error) {
	am.Lock()
	defer am.Unlock()

	am.refreshSource()

	as, err := am.getAddressSpace(asId)
	if err != nil {
		return "", err
	}

	ap, err := as.getAddressPool(poolId)
	if err != nil {
		return "", err
	}

	addr, err := ap.requestAddress(address, options)
	if err != nil {
		return "", err
	}

	if err = am.save(); err != nil {
		// Undo the reservation so memory does not claim an address the store
		// never recorded. Do not drop a rollback failure silently: it means the
		// in-memory state and the store now disagree.
		if rollbackErr := ap.releaseAddress(addr, options); rollbackErr != nil {
			logger.Error("Failed to release address after save failure",
				zap.String("address", addr), zap.Error(rollbackErr))
		}
		return "", err
	}

	return addr, nil
}
// ReleaseAddress releases a previously reserved address and persists the new
// state. It returns the first error from the address space lookup, the pool
// lookup, the release or the save. The manager's lock is held for the duration
// of the call and the source is refreshed first.
func (am *addressManager) ReleaseAddress(asId string, poolId string, address string, options map[string]string) error {
	am.Lock()
	defer am.Unlock()

	am.refreshSource()

	space, err := am.getAddressSpace(asId)
	if err != nil {
		return err
	}

	pool, err := space.getAddressPool(poolId)
	if err != nil {
		return err
	}

	if releaseErr := pool.releaseAddress(address, options); releaseErr != nil {
		return releaseErr
	}

	return am.save()
}