Merge pull request #10 from Azure/development

Implemented IPAM plugin for multiCA integration
This commit is contained in:
Onur Filiz 2016-05-10 17:58:42 -07:00
Родитель ee668e9873 950366417e
Коммит 8bfba82f6f
6 изменённых файлов: 766 добавлений и 25 удалений

Просмотреть файл

@ -3,6 +3,10 @@
package ipam
import (
"fmt"
)
const (
// Libnetwork IPAM plugin endpoint type
endpointType = "IpamDriver"
@ -16,6 +20,23 @@ const (
releaseAddressPath = "/IpamDriver.ReleaseAddress"
)
var (
// Error response messages returned by plugin.
errInvalidAddressSpace = fmt.Errorf("Invalid address space")
errInvalidPoolId = fmt.Errorf("Invalid address pool")
errInvalidAddress = fmt.Errorf("Invalid address")
errInvalidScope = fmt.Errorf("Invalid scope")
errInvalidConfiguration = fmt.Errorf("Invalid configuration")
errAddressPoolExists = fmt.Errorf("Address pool already exists")
errAddressPoolNotFound = fmt.Errorf("Address pool not found")
errNoAvailableAddressPools = fmt.Errorf("No available address pools")
errAddressExists = fmt.Errorf("Address already exists")
errAddressNotFound = fmt.Errorf("Address not found")
errAddressInUse = fmt.Errorf("Address already in use")
errAddressNotInUse = fmt.Errorf("Address not in use")
errNoAvailableAddresses = fmt.Errorf("No available addresses")
)
// Request sent by libnetwork when querying plugin capabilities.
type getCapabilitiesRequest struct {
}

61
ipam/config.go Normal file
Просмотреть файл

@ -0,0 +1,61 @@
// Copyright Microsoft Corp.
// All rights reserved.
package ipam
import (
"github.com/Azure/Aqua/log"
)
// IPAM configuration source.
type configSource interface {
start() error
stop()
refresh() error
}
type configSink interface {
setAddressSpace(*addressSpace) error
}
// Starts configuration source.
func (plugin *ipamPlugin) startSource() error {
var err error
switch plugin.GetOption("source") {
case "azure", "":
case "mas":
plugin.source, err = newMasSource(configSink(plugin))
case "null":
plugin.source, err = newNullSource(configSink(plugin))
default:
return errInvalidConfiguration
}
if plugin.source != nil {
err = plugin.source.start()
}
return err
}
// Stops configuration source.
func (plugin *ipamPlugin) stopSource() {
if plugin.source != nil {
plugin.source.stop()
}
}
// Signals configuration source to refresh.
func (plugin *ipamPlugin) refreshSource() {
if plugin.source != nil {
err := plugin.source.refresh()
if err != nil {
log.Printf("%s: Source refresh returned err=%v.\n", plugin.Name, err)
}
}
}

114
ipam/mas.go Normal file
Просмотреть файл

@ -0,0 +1,114 @@
// Copyright Microsoft Corp.
// All rights reserved.
package ipam
import (
"encoding/json"
"net"
"net/http"
"time"
)
const (
// Host URL to query.
masQueryUrl = "http://169.254.169.254:6642/ListNetwork"
// Minimum delay between consecutive polls.
masDefaultMinPollPeriod = 30 * time.Second
)
// Microsoft Azure Stack IPAM configuration source.
type masSource struct {
name string
sink configSink
lastRefresh time.Time
minPollPeriod time.Duration
}
// MAS host agent JSON object format.
type jsonObject struct {
Isolation string
IPs []struct {
IP string
IsolationId string
Mask string
DefaultGateways []string
DnsServers []string
}
}
// Creates the MAS source.
func newMasSource(sink configSink) (*masSource, error) {
return &masSource{
name: "MAS",
sink: sink,
minPollPeriod: masDefaultMinPollPeriod,
}, nil
}
// Starts the MAS source.
func (s *masSource) start() error {
return nil
}
// Stops the MAS source.
func (s *masSource) stop() {
return
}
// Refreshes configuration.
func (s *masSource) refresh() error {
// Refresh only if enough time has passed since the last poll.
if time.Since(s.lastRefresh) < s.minPollPeriod {
return nil
}
s.lastRefresh = time.Now()
// Configure the local default address space.
local, err := newAddressSpace(localDefaultAddressSpaceId, localScope)
if err != nil {
return err
}
// Fetch configuration.
resp, err := http.Get(masQueryUrl)
if err != nil {
return err
}
defer resp.Body.Close()
// Decode JSON object.
var obj jsonObject
decoder := json.NewDecoder(resp.Body)
err = decoder.Decode(&obj)
if err != nil {
return err
}
// Add the IP addresses to the local address space.
for _, v := range obj.IPs {
address := net.ParseIP(v.IP)
subnet := net.IPNet{
IP: net.ParseIP(v.IP),
Mask: net.IPMask(net.ParseIP(v.Mask)),
}
ap, err := local.newAddressPool(&subnet)
if err != nil && err != errAddressExists {
return err
}
_, err = ap.newAddressRecord(&address)
if err != nil {
return err
}
}
// Set the local address space as active.
s.sink.setAddressSpace(local)
return nil
}

58
ipam/null.go Normal file
Просмотреть файл

@ -0,0 +1,58 @@
// Copyright Microsoft Corp.
// All rights reserved.
package ipam
import (
"net"
)
// Null IPAM configuration source.
type nullSource struct {
name string
sink configSink
initialized bool
}
// Creates the null source.
func newNullSource(sink configSink) (*nullSource, error) {
return &nullSource{
name: "Null",
sink: sink,
}, nil
}
// Starts the null source.
func (s *nullSource) start() error {
return nil
}
// Stops the null source.
func (s *nullSource) stop() {
return
}
// Refreshes configuration.
func (s *nullSource) refresh() error {
// Configure the local default address space.
local, err := newAddressSpace(localDefaultAddressSpaceId, localScope)
if err != nil {
return err
}
subnet := net.IPNet{
IP: net.IPv4(0, 0, 0, 0),
Mask: net.IPv4Mask(0, 0, 0, 0),
}
_, err = local.newAddressPool(&subnet)
if err != nil {
return err
}
// Set the local address space as active.
s.sink.setAddressSpace(local)
return nil
}

Просмотреть файл

@ -11,9 +11,16 @@ import (
"github.com/Azure/Aqua/log"
)
// Plugin capabilities.
const (
requiresMACAddress = false
)
// IpamPlugin object and interface
type ipamPlugin struct {
*common.Plugin
addrSpaces map[string]*addressSpace
source configSource
sync.Mutex
}
@ -22,6 +29,8 @@ type IpamPlugin interface {
Stop()
SetOption(string, string)
setAddressSpace(*addressSpace) error
}
// Creates a new IpamPlugin object.
@ -33,7 +42,8 @@ func NewPlugin(name string, version string) (IpamPlugin, error) {
}
return &ipamPlugin{
Plugin: plugin,
Plugin: plugin,
addrSpaces: make(map[string]*addressSpace),
}, nil
}
@ -54,6 +64,13 @@ func (plugin *ipamPlugin) Start(errChan chan error) error {
listener.AddHandler(requestAddressPath, plugin.requestAddress)
listener.AddHandler(releaseAddressPath, plugin.releaseAddress)
// Start configuration source.
err = plugin.startSource()
if err != nil {
log.Printf("%s: Failed to start: %v", plugin.Name, err)
return err
}
log.Printf("%s: Plugin started.", plugin.Name)
return nil
@ -61,34 +78,66 @@ func (plugin *ipamPlugin) Start(errChan chan error) error {
// Stops the plugin.
func (plugin *ipamPlugin) Stop() {
plugin.stopSource()
plugin.Uninitialize()
log.Printf("%s: Plugin stopped.\n", plugin.Name)
}
// Sets a new address space for the plugin to serve to clients.
func (plugin *ipamPlugin) setAddressSpace(as *addressSpace) error {
plugin.Lock()
as1, ok := plugin.addrSpaces[as.id]
if !ok {
plugin.addrSpaces[as.id] = as
plugin.Unlock()
} else {
plugin.Unlock()
as1.merge(as)
}
return nil
}
// Parses the given pool ID string and returns the address space and pool objects.
func (plugin *ipamPlugin) parsePoolId(poolId string) (*addressSpace, *addressPool, error) {
apId, err := newAddressPoolIdFromString(poolId)
if err != nil {
return nil, nil, err
}
plugin.Lock()
as := plugin.addrSpaces[apId.asId]
plugin.Unlock()
if as == nil {
return nil, nil, errInvalidAddressSpace
}
var ap *addressPool
if apId.subnet != "" {
ap, err = as.getAddressPool(poolId)
if err != nil {
return nil, nil, err
}
}
return as, ap, nil
}
//
// Libnetwork remote IPAM API implementation
// https://github.com/docker/libnetwork/blob/master/docs/ipam.md
//
// Handles GetCapabilities requests.
func (plugin *ipamPlugin) getCapabilities(w http.ResponseWriter, r *http.Request) {
var req getCapabilitiesRequest
log.Request(plugin.Name, &req, nil)
resp := getCapabilitiesResponse{}
err := plugin.Listener.Encode(w, &resp)
log.Response(plugin.Name, &resp, err)
}
func (plugin *ipamPlugin) getDefaultAddressSpaces(w http.ResponseWriter, r *http.Request) {
var req getDefaultAddressSpacesRequest
log.Request(plugin.Name, &req, nil)
resp := getDefaultAddressSpacesResponse{
LocalDefaultAddressSpace: "",
GlobalDefaultAddressSpace: "",
resp := getCapabilitiesResponse{
RequiresMACAddress: requiresMACAddress,
}
err := plugin.Listener.Encode(w, &resp)
@ -96,36 +145,94 @@ func (plugin *ipamPlugin) getDefaultAddressSpaces(w http.ResponseWriter, r *http
log.Response(plugin.Name, &resp, err)
}
// Handles GetDefaultAddressSpaces requests.
func (plugin *ipamPlugin) getDefaultAddressSpaces(w http.ResponseWriter, r *http.Request) {
var req getDefaultAddressSpacesRequest
var resp getDefaultAddressSpacesResponse
log.Request(plugin.Name, &req, nil)
plugin.refreshSource()
plugin.Lock()
local := plugin.addrSpaces[localDefaultAddressSpaceId]
if local != nil {
resp.LocalDefaultAddressSpace = local.id
}
global := plugin.addrSpaces[globalDefaultAddressSpaceId]
if global != nil {
resp.GlobalDefaultAddressSpace = global.id
}
plugin.Unlock()
err := plugin.Listener.Encode(w, &resp)
log.Response(plugin.Name, &resp, err)
}
// Handles RequestPool requests.
func (plugin *ipamPlugin) requestPool(w http.ResponseWriter, r *http.Request) {
var req requestPoolRequest
// Decode request.
err := plugin.Listener.Decode(w, r, &req)
log.Request(plugin.Name, &req, err)
if err != nil {
return
}
plugin.refreshSource()
// Process request.
as, _, err := plugin.parsePoolId(req.AddressSpace)
if err != nil {
plugin.SendErrorResponse(w, err)
return
}
poolId, err := as.requestPool(req.Pool, req.SubPool, req.Options, req.V6)
if err != nil {
plugin.SendErrorResponse(w, err)
return
}
// Encode response.
data := make(map[string]string)
resp := requestPoolResponse{"", "0.0.0.0/8", data}
resp := requestPoolResponse{PoolID: poolId.String(), Pool: poolId.subnet, Data: data}
err = plugin.Listener.Encode(w, &resp)
log.Response(plugin.Name, &resp, err)
}
// Handles ReleasePool requests.
func (plugin *ipamPlugin) releasePool(w http.ResponseWriter, r *http.Request) {
var req releasePoolRequest
// Decode request.
err := plugin.Listener.Decode(w, r, &req)
log.Request(plugin.Name, &req, err)
if err != nil {
return
}
// Process request.
as, _, err := plugin.parsePoolId(req.PoolID)
if err != nil {
plugin.SendErrorResponse(w, err)
return
}
err = as.releasePool(req.PoolID)
if err != nil {
plugin.SendErrorResponse(w, err)
return
}
// Encode response.
resp := releasePoolResponse{}
err = plugin.Listener.Encode(w, &resp)
@ -133,35 +240,66 @@ func (plugin *ipamPlugin) releasePool(w http.ResponseWriter, r *http.Request) {
log.Response(plugin.Name, &resp, err)
}
// Handles RequestAddress requests.
func (plugin *ipamPlugin) requestAddress(w http.ResponseWriter, r *http.Request) {
var req requestAddressRequest
// Decode request.
err := plugin.Listener.Decode(w, r, &req)
log.Request(plugin.Name, &req, err)
if err != nil {
return
}
resp := requestAddressResponse{"", make(map[string]string)}
plugin.refreshSource()
// Process request.
_, ap, err := plugin.parsePoolId(req.PoolID)
if err != nil {
plugin.SendErrorResponse(w, err)
return
}
addr, err := ap.requestAddress(req.Address, req.Options)
if err != nil {
plugin.SendErrorResponse(w, err)
return
}
// Encode response.
data := make(map[string]string)
resp := requestAddressResponse{Address: addr, Data: data}
err = plugin.Listener.Encode(w, &resp)
log.Response(plugin.Name, &resp, err)
}
// Handles ReleaseAddress requests.
func (plugin *ipamPlugin) releaseAddress(w http.ResponseWriter, r *http.Request) {
var req releaseAddressRequest
// Decode request.
err := plugin.Listener.Decode(w, r, &req)
log.Request(plugin.Name, &req, err)
if err != nil {
return
}
// Process request.
_, ap, err := plugin.parsePoolId(req.PoolID)
if err != nil {
plugin.SendErrorResponse(w, err)
return
}
err = ap.releaseAddress(req.Address)
if err != nil {
plugin.SendErrorResponse(w, err)
return
}
// Encode response.
resp := releaseAddressResponse{}
err = plugin.Listener.Encode(w, &resp)

349
ipam/pool.go Normal file
Просмотреть файл

@ -0,0 +1,349 @@
// Copyright Microsoft Corp.
// All rights reserved.
package ipam
import (
"fmt"
"net"
"strings"
"sync"
)
const (
// Default address space IDs.
localDefaultAddressSpaceId = "LocalDefaultAddressSpace"
globalDefaultAddressSpaceId = "GlobalDefaultAddressSpace"
// Address space scopes.
localScope = "local"
globalScope = "global"
)
// Represents the key to an address pool.
type addressPoolId struct {
asId string
subnet string
childSubnet string
}
// Represents a set of non-overlapping address pools.
type addressSpace struct {
id string
scope string
pools map[string]*addressPool
epoch int
sync.Mutex
}
// Represents a subnet and the set of addresses in it.
type addressPool struct {
id *addressPoolId
as *addressSpace
subnet net.IPNet
addresses map[string]*addressRecord
v6 bool
epoch int
ref int
}
// Represents an IP address in a pool.
type addressRecord struct {
addr net.IP
inUse bool
epoch int
}
//
// AddressPoolId
//
// Creates a new address pool ID object.
func newAddressPoolId(asId string, subnet string, childSubnet string) *addressPoolId {
return &addressPoolId{
asId: asId,
subnet: subnet,
childSubnet: childSubnet,
}
}
// Creates a new pool ID from a string representation.
func newAddressPoolIdFromString(s string) (*addressPoolId, error) {
var pid addressPoolId
p := strings.Split(s, "|")
if len(p) > 3 {
return nil, errInvalidPoolId
}
pid.asId = p[0]
if len(p) >= 2 {
pid.subnet = p[1]
}
if len(p) == 3 {
pid.childSubnet = p[2]
}
return &pid, nil
}
// Returns the string representation of a pool ID.
func (pid *addressPoolId) String() string {
s := fmt.Sprintf("%s|%s", pid.asId, pid.subnet)
if pid.childSubnet != "" {
s = fmt.Sprintf("%s|%s", s, pid.childSubnet)
}
return s
}
//
// AddressSpace
//
// Creates a new addressSpace object.
func newAddressSpace(id string, scope string) (*addressSpace, error) {
if scope != localScope && scope != globalScope {
return nil, errInvalidScope
}
return &addressSpace{
id: id,
scope: scope,
pools: make(map[string]*addressPool),
}, nil
}
// Merges a new address space to an existing one.
func (as *addressSpace) merge(newas *addressSpace) {
as.Lock()
defer as.Unlock()
// The new epoch after the merge.
as.epoch++
// Add new pools and addresses.
for pk, pv := range newas.pools {
ap := as.pools[pk]
if ap == nil {
// This is a new address pool.
// Merge it to the existing address space.
as.pools[pk] = pv
delete(newas.pools, pk)
pv.epoch = as.epoch
} else {
// This pool already exists.
// Compare address records one by one.
for ak, av := range pv.addresses {
ar := ap.addresses[ak]
if ar == nil {
// This is a new address record.
// Merge it to the existing address pool.
ap.addresses[ak] = av
delete(ap.addresses, ak)
av.epoch = as.epoch
} else {
// This address record already exists.
ar.epoch = as.epoch
}
}
ap.epoch = as.epoch
}
}
// Cleanup stale pools and addresses from the old epoch.
for pk, pv := range as.pools {
if pv.epoch < as.epoch {
for ak, av := range pv.addresses {
if !av.inUse {
delete(pv.addresses, ak)
}
}
if pv.ref == 0 {
delete(as.pools, pk)
}
}
}
return
}
// Creates a new addressPool object.
func (as *addressSpace) newAddressPool(subnet *net.IPNet) (*addressPool, error) {
id := newAddressPoolId(as.id, subnet.String(), "")
as.Lock()
defer as.Unlock()
pool, ok := as.pools[id.String()]
if ok {
return pool, errAddressPoolExists
}
pool = &addressPool{
id: id,
as: as,
subnet: *subnet,
addresses: make(map[string]*addressRecord),
v6: subnet.IP.To16() != nil,
epoch: as.epoch,
}
as.pools[id.String()] = pool
return pool, nil
}
// Returns the address pool with the given pool ID.
func (as *addressSpace) getAddressPool(poolId string) (*addressPool, error) {
as.Lock()
defer as.Unlock()
ap := as.pools[poolId]
if ap == nil {
return nil, errInvalidPoolId
}
return ap, nil
}
// Requests a new address pool from the address space.
func (as *addressSpace) requestPool(pool string, subPool string, options map[string]string, v6 bool) (*addressPoolId, error) {
var ap *addressPool
as.Lock()
defer as.Unlock()
if pool != "" {
// Return the specific address pool requested.
ap = as.pools[pool]
if ap == nil {
return nil, errAddressPoolNotFound
}
} else {
// Return any available address pool.
for _, ap = range as.pools {
if ap.v6 == v6 {
break
}
}
if ap == nil {
return nil, errNoAvailableAddressPools
}
}
ap.ref++
return ap.id, nil
}
// Releases a previously requested address pool back to its address space.
func (as *addressSpace) releasePool(poolId string) error {
as.Lock()
defer as.Unlock()
ap, ok := as.pools[poolId]
if !ok {
return errAddressPoolNotFound
}
ap.ref--
// Delete address pool if it is no longer available.
if ap.ref == 0 && ap.epoch < as.epoch {
delete(as.pools, poolId)
}
return nil
}
//
// AddressPool
//
// Creates a new addressRecord object.
func (ap *addressPool) newAddressRecord(addr *net.IP) (*addressRecord, error) {
id := addr.String()
if !ap.subnet.Contains(*addr) {
return nil, errInvalidAddress
}
ap.as.Lock()
defer ap.as.Unlock()
ar, ok := ap.addresses[id]
if ok {
return ar, errAddressExists
}
ar = &addressRecord{
addr: *addr,
epoch: ap.epoch,
}
ap.addresses[id] = ar
return ar, nil
}
// Requests a new address from the address pool.
func (ap *addressPool) requestAddress(address string, options map[string]string) (string, error) {
var ar *addressRecord
ap.as.Lock()
defer ap.as.Unlock()
if address != "" {
// Return the specific address requested.
ar = ap.addresses[address]
if ar == nil {
return "", errAddressNotFound
}
if ar.inUse {
return "", errAddressInUse
}
} else {
// Return any available address.
for _, ar = range ap.addresses {
if !ar.inUse {
break
}
}
if ar == nil {
return "", errNoAvailableAddresses
}
}
ar.inUse = true
return ar.addr.String(), nil
}
// Releases a previously requested address back to its address pool.
func (ap *addressPool) releaseAddress(address string) error {
ap.as.Lock()
defer ap.as.Unlock()
ar := ap.addresses[address]
if ar == nil {
return errAddressNotFound
}
if !ar.inUse {
return errAddressNotInUse
}
ar.inUse = false
// Delete address record if it is no longer available.
if ar.epoch < ap.as.epoch {
delete(ap.addresses, address)
}
return nil
}