azure-container-networking/ipam/manager.go

289 строки
5.5 KiB
Go
Исходник Обычный вид История

2016-09-16 06:27:36 +03:00
// Copyright Microsoft Corp.
// All rights reserved.
package ipam
import (
"sync"
"github.com/Azure/Aqua/common"
"github.com/Azure/Aqua/log"
2016-09-22 01:37:35 +03:00
"github.com/Azure/Aqua/network"
2016-09-16 06:27:36 +03:00
"github.com/Azure/Aqua/store"
)
// storeKey is the key under which the address manager's state is read from
// and written to the persistent store.
const storeKey = "IPAM"
// AddressManager manages the set of address spaces and pools allocated to containers.
type addressManager struct {
AddrSpaces map[string]*addressSpace `json:"AddressSpaces"`
store store.KeyValueStore
source addressConfigSource
2016-09-22 01:37:35 +03:00
netApi network.NetApi
2016-09-16 06:27:36 +03:00
sync.Mutex
}
// addressConfigSource is implemented by the providers that configure the
// address pools managed by addressManager.
type addressConfigSource interface {
	// start is called once when the manager starts the source; sink is
	// the object the source uses to publish address spaces.
	start(sink addressConfigSink) error

	// stop is called when the manager shuts the source down.
	stop()

	// refresh is called by the manager before it serves an API request.
	refresh() error
}
// addressConfigSink is the interface that address configuration sources use
// to hand address spaces to their consumer. The address manager passes itself
// as the sink when it starts a source.
type addressConfigSink interface {
	// newAddressSpace returns an address space for the given id and scope.
	newAddressSpace(id, scope string) (*addressSpace, error)

	// setAddressSpace hands an address space over to the sink.
	setAddressSpace(as *addressSpace) error
}
// newAddressManager returns an address manager with an empty, non-nil
// address space map. The store, source and network API are left unset until
// Initialize is called. The returned error is always nil.
func newAddressManager() (*addressManager, error) {
	return &addressManager{
		AddrSpaces: map[string]*addressSpace{},
	}, nil
}
// Initialize configures address manager.
func (am *addressManager) Initialize(config *common.PluginConfig, sourceType string) error {
am.store = config.Store
2016-09-22 01:37:35 +03:00
am.netApi = config.NetApi.(network.NetApi)
2016-09-16 06:27:36 +03:00
// Restore persisted state.
err := am.restore()
if err != nil {
return err
}
// Start source.
err = am.startSource(sourceType)
return err
}
// Uninitialize cleans up the address manager by stopping its configuration
// source.
func (am *addressManager) Uninitialize() {
// stopSource does nothing when no source is set.
am.stopSource()
}
// restore loads the address manager's state from the persistent store.
//
// A missing key is not an error: the manager simply keeps its current
// (empty) state. Any other read failure is logged and returned.
func (am *addressManager) restore() error {
	// Read any persisted state.
	err := am.store.Read(storeKey, am)
	if err == store.ErrKeyNotFound {
		// Nothing was saved yet.
		return nil
	}
	if err != nil {
		log.Printf("[ipam] Failed to restore state, err:%v\n", err)
		return err
	}

	// Re-link every pool to the address space that owns it.
	for _, space := range am.AddrSpaces {
		for _, pool := range space.Pools {
			pool.as = space
		}
	}

	log.Printf("[ipam] Restored state, %+v\n", am)
	return nil
}
// save writes the address manager's state to the persistent store under
// storeKey. The outcome is logged either way, and the store's error (nil on
// success) is returned.
func (am *addressManager) save() error {
	if err := am.store.Write(storeKey, am); err != nil {
		log.Printf("[ipam] Save failed, err:%v\n", err)
		return err
	}

	log.Printf("[ipam] Save succeeded.\n")
	return nil
}
// startSource creates the configuration source named by sourceType and
// starts it with the address manager as its sink.
//
// Recognized values are "azure" (also selected by the empty string), "mas"
// and "null"; anything else yields errInvalidConfiguration. If the source
// cannot be constructed, the constructor's error is returned and the
// manager's current source is left untouched. Otherwise the result of the
// source's start call is returned.
func (am *addressManager) startSource(sourceType string) error {
	var (
		src addressConfigSource
		err error
	)

	switch sourceType {
	case "azure", "":
		src, err = newAzureSource()
	case "mas":
		src, err = newMasSource()
	case "null":
		src, err = newNullSource()
	default:
		return errInvalidConfiguration
	}

	// Report a construction failure before installing or starting anything.
	// Checking only "source != nil" is not enough: the error would be
	// overwritten by the result of start, and if the constructors return
	// concrete pointer types (not visible here), a nil pointer stored in the
	// interface would still compare non-nil.
	if err != nil {
		return err
	}

	am.source = src
	if am.source == nil {
		return nil
	}
	return am.source.start(am)
}
// stopSource stops the configuration source and clears it from the manager.
// It does nothing when no source is set.
func (am *addressManager) stopSource() {
	if am.source == nil {
		return
	}

	am.source.stop()
	am.source = nil
}
// refreshSource asks the configuration source, if any, to refresh. A refresh
// failure is only logged; it is not reported to the caller.
func (am *addressManager) refreshSource() {
	if am.source == nil {
		return
	}

	if err := am.source.refresh(); err != nil {
		log.Printf("[ipam] Source refresh failed, err:%v.\n", err)
	}
}
2016-09-22 01:37:35 +03:00
//
// AddressManager API
//
// Provides atomic stateful wrappers around core IPAM functionality.
//
2016-09-16 06:27:36 +03:00
// GetDefaultAddressSpaces returns the IDs of the default local and default
// global address spaces, in that order. An empty string is returned for a
// default space that is not currently known.
//
// The manager is locked for the duration of the call and the configuration
// source is refreshed before the lookup.
func (am *addressManager) GetDefaultAddressSpaces() (string, string) {
	am.Lock()
	defer am.Unlock()

	am.refreshSource()

	var localID, globalID string
	if space := am.AddrSpaces[localDefaultAddressSpaceId]; space != nil {
		localID = space.Id
	}
	if space := am.AddrSpaces[globalDefaultAddressSpaceId]; space != nil {
		globalID = space.Id
	}

	return localID, globalID
}
// RequestPool reserves an address pool in the address space identified by
// asId and persists the updated state.
//
// On success it returns the pool's ID and its subnet in string form. On any
// failure (unknown address space, rejected request, or failed save) it
// returns two empty strings and the error. A failed save does not undo the
// reservation made in memory by this call.
//
// The manager is locked for the duration of the call and the configuration
// source is refreshed first.
func (am *addressManager) RequestPool(asId, poolId, subPoolId string, options map[string]string, v6 bool) (string, string, error) {
	am.Lock()
	defer am.Unlock()

	am.refreshSource()

	space, err := am.getAddressSpace(asId)
	if err != nil {
		return "", "", err
	}

	pool, err := space.requestPool(poolId, subPoolId, options, v6)
	if err != nil {
		return "", "", err
	}

	if err := am.save(); err != nil {
		return "", "", err
	}

	return pool.Id, pool.Subnet.String(), nil
}
// ReleasePool releases a previously reserved address pool from the address
// space identified by asId and persists the updated state.
//
// It returns the first error encountered while looking up the address space,
// releasing the pool, or saving; nil on success.
//
// The manager is locked for the duration of the call and the configuration
// source is refreshed first.
func (am *addressManager) ReleasePool(asId string, poolId string) error {
	am.Lock()
	defer am.Unlock()

	am.refreshSource()

	space, err := am.getAddressSpace(asId)
	if err != nil {
		return err
	}

	if err := space.releasePool(poolId); err != nil {
		return err
	}

	return am.save()
}
// RequestAddress reserves an address from the pool poolId in the address
// space asId and persists the updated state. The address and options
// arguments are passed through to the pool's own request logic.
//
// On success it returns the reserved address. On any failure (unknown
// address space or pool, rejected request, or failed save) it returns an
// empty string and the error.
//
// The manager is locked for the duration of the call and the configuration
// source is refreshed first.
func (am *addressManager) RequestAddress(asId, poolId, address string, options map[string]string) (string, error) {
	am.Lock()
	defer am.Unlock()

	am.refreshSource()

	space, err := am.getAddressSpace(asId)
	if err != nil {
		return "", err
	}

	pool, err := space.getAddressPool(poolId)
	if err != nil {
		return "", err
	}

	reserved, err := pool.requestAddress(address, options)
	if err != nil {
		return "", err
	}

	if err := am.save(); err != nil {
		return "", err
	}

	return reserved, nil
}
// ReleaseAddress releases a previously reserved address back to the pool
// poolId in the address space asId and persists the updated state.
//
// It returns the first error encountered while looking up the address space
// or pool, releasing the address, or saving; nil on success.
//
// The manager is locked for the duration of the call and the configuration
// source is refreshed first.
func (am *addressManager) ReleaseAddress(asId string, poolId string, address string) error {
	am.Lock()
	defer am.Unlock()

	am.refreshSource()

	space, err := am.getAddressSpace(asId)
	if err != nil {
		return err
	}

	pool, err := space.getAddressPool(poolId)
	if err != nil {
		return err
	}

	if err := pool.releaseAddress(address); err != nil {
		return err
	}

	return am.save()
}