From ab33569b24ab310bc58ec5c2c87c18509a500d2a Mon Sep 17 00:00:00 2001 From: Onur Filiz Date: Mon, 9 May 2016 04:31:08 -0700 Subject: [PATCH 1/3] Implemented IPAM plugin and address pools --- ipam/api.go | 21 +++ ipam/config.go | 61 +++++++++ ipam/plugin.go | 188 ++++++++++++++++++++++---- ipam/pool.go | 349 +++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 594 insertions(+), 25 deletions(-) create mode 100644 ipam/config.go create mode 100644 ipam/pool.go diff --git a/ipam/api.go b/ipam/api.go index 7ef187c59..43086f50f 100644 --- a/ipam/api.go +++ b/ipam/api.go @@ -3,6 +3,10 @@ package ipam +import ( + "fmt" +) + const ( // Libnetwork IPAM plugin endpoint type endpointType = "IpamDriver" @@ -16,6 +20,23 @@ const ( releaseAddressPath = "/IpamDriver.ReleaseAddress" ) +var ( + // Error response messages returned by plugin. + errInvalidAddressSpace = fmt.Errorf("Invalid address space") + errInvalidPoolId = fmt.Errorf("Invalid address pool") + errInvalidAddress = fmt.Errorf("Invalid address") + errInvalidScope = fmt.Errorf("Invalid scope") + errInvalidConfiguration = fmt.Errorf("Invalid configuration") + errAddressPoolExists = fmt.Errorf("Address pool already exists") + errAddressPoolNotFound = fmt.Errorf("Address pool not found") + errNoAvailableAddressPools = fmt.Errorf("No available address pools") + errAddressExists = fmt.Errorf("Address already exists") + errAddressNotFound = fmt.Errorf("Address not found") + errAddressInUse = fmt.Errorf("Address already in use") + errAddressNotInUse = fmt.Errorf("Address not in use") + errNoAvailableAddresses = fmt.Errorf("No available addresses") +) + // Request sent by libnetwork when querying plugin capabilities. type getCapabilitiesRequest struct { } diff --git a/ipam/config.go b/ipam/config.go new file mode 100644 index 000000000..338567e16 --- /dev/null +++ b/ipam/config.go @@ -0,0 +1,61 @@ +// Copyright Microsoft Corp. +// All rights reserved. + +package ipam + +import ( + "github.com/Azure/Aqua/log" +) + +// IPAM configuration source. +type configSource interface { + start() error + stop() + refresh() error +} + +type configSink interface { + setAddressSpace(*addressSpace) error +} + +// Starts configuration source. +func (plugin *ipamPlugin) startSource() error { + var err error + + switch plugin.GetOption("source") { + + case "azure", "": + + case "mas": + plugin.source, err = newMasSource(configSink(plugin)) + + case "null": + plugin.source = nil + + default: + return errInvalidConfiguration + } + + if plugin.source != nil { + err = plugin.source.start() + } + + return err +} + +// Stops configuration source. +func (plugin *ipamPlugin) stopSource() { + if plugin.source != nil { + plugin.source.stop() + } +} + +// Signals configuration source to refresh. +func (plugin *ipamPlugin) refreshSource() { + if plugin.source != nil { + err := plugin.source.refresh() + if err != nil { + log.Printf("%s: Source refresh returned err=%v.\n", plugin.Name, err) + } + } +} diff --git a/ipam/plugin.go b/ipam/plugin.go index 03a930d80..1cceb92eb 100644 --- a/ipam/plugin.go +++ b/ipam/plugin.go @@ -11,9 +11,16 @@ import ( "github.com/Azure/Aqua/log" ) +// Plugin capabilities. +const ( + requiresMACAddress = false +) + // IpamPlugin object and interface type ipamPlugin struct { *common.Plugin + addrSpaces map[string]*addressSpace + source configSource sync.Mutex } @@ -22,6 +29,8 @@ type IpamPlugin interface { Stop() SetOption(string, string) + + setAddressSpace(*addressSpace) error } // Creates a new IpamPlugin object. @@ -33,7 +42,8 @@ func NewPlugin(name string, version string) (IpamPlugin, error) { } return &ipamPlugin{ - Plugin: plugin, + Plugin: plugin, + addrSpaces: make(map[string]*addressSpace), }, nil } @@ -54,6 +64,13 @@ func (plugin *ipamPlugin) Start(errChan chan error) error { listener.AddHandler(requestAddressPath, plugin.requestAddress) listener.AddHandler(releaseAddressPath, plugin.releaseAddress) + // Start configuration source. + err = plugin.startSource() + if err != nil { + log.Printf("%s: Failed to start: %v", plugin.Name, err) + return err + } + log.Printf("%s: Plugin started.", plugin.Name) return nil @@ -61,34 +78,66 @@ func (plugin *ipamPlugin) Start(errChan chan error) error { // Stops the plugin. func (plugin *ipamPlugin) Stop() { + plugin.stopSource() plugin.Uninitialize() log.Printf("%s: Plugin stopped.\n", plugin.Name) } +// Sets a new address space for the plugin to serve to clients. +func (plugin *ipamPlugin) setAddressSpace(as *addressSpace) error { + plugin.Lock() + + as1, ok := plugin.addrSpaces[as.id] + if !ok { + plugin.addrSpaces[as.id] = as + plugin.Unlock() + } else { + plugin.Unlock() + as1.merge(as) + } + + return nil +} + +// Parses the given pool ID string and returns the address space and pool objects. +func (plugin *ipamPlugin) parsePoolId(poolId string) (*addressSpace, *addressPool, error) { + apId, err := newAddressPoolIdFromString(poolId) + if err != nil { + return nil, nil, err + } + + plugin.Lock() + as := plugin.addrSpaces[apId.asId] + plugin.Unlock() + + if as == nil { + return nil, nil, errInvalidAddressSpace + } + + var ap *addressPool + if apId.subnet != "" { + ap, err = as.getAddressPool(poolId) + if err != nil { + return nil, nil, err + } + } + + return as, ap, nil +} + // // Libnetwork remote IPAM API implementation // https://github.com/docker/libnetwork/blob/master/docs/ipam.md // +// Handles GetCapabilities requests. func (plugin *ipamPlugin) getCapabilities(w http.ResponseWriter, r *http.Request) { var req getCapabilitiesRequest log.Request(plugin.Name, &req, nil) - resp := getCapabilitiesResponse{} - err := plugin.Listener.Encode(w, &resp) - - log.Response(plugin.Name, &resp, err) -} - -func (plugin *ipamPlugin) getDefaultAddressSpaces(w http.ResponseWriter, r *http.Request) { - var req getDefaultAddressSpacesRequest - - log.Request(plugin.Name, &req, nil) - - resp := getDefaultAddressSpacesResponse{ - LocalDefaultAddressSpace: "", - GlobalDefaultAddressSpace: "", + resp := getCapabilitiesResponse{ + RequiresMACAddress: requiresMACAddress, } err := plugin.Listener.Encode(w, &resp) @@ -96,36 +145,94 @@ func (plugin *ipamPlugin) getDefaultAddressSpaces(w http.ResponseWriter, r *http log.Response(plugin.Name, &resp, err) } +// Handles GetDefaultAddressSpaces requests. +func (plugin *ipamPlugin) getDefaultAddressSpaces(w http.ResponseWriter, r *http.Request) { + var req getDefaultAddressSpacesRequest + var resp getDefaultAddressSpacesResponse + + log.Request(plugin.Name, &req, nil) + + plugin.refreshSource() + + plugin.Lock() + + local := plugin.addrSpaces[localDefaultAddressSpaceId] + if local != nil { + resp.LocalDefaultAddressSpace = local.id + } + + global := plugin.addrSpaces[globalDefaultAddressSpaceId] + if global != nil { + resp.GlobalDefaultAddressSpace = global.id + } + + plugin.Unlock() + + err := plugin.Listener.Encode(w, &resp) + + log.Response(plugin.Name, &resp, err) +} + +// Handles RequestPool requests. func (plugin *ipamPlugin) requestPool(w http.ResponseWriter, r *http.Request) { var req requestPoolRequest + // Decode request. err := plugin.Listener.Decode(w, r, &req) - log.Request(plugin.Name, &req, err) - if err != nil { return } + plugin.refreshSource() + + // Process request. + as, _, err := plugin.parsePoolId(req.AddressSpace) + if err != nil { + plugin.SendErrorResponse(w, err) + return + } + + poolId, err := as.requestPool(req.Pool, req.SubPool, req.Options, req.V6) + if err != nil { + plugin.SendErrorResponse(w, err) + return + } + + // Encode response. data := make(map[string]string) - resp := requestPoolResponse{"", "0.0.0.0/8", data} + resp := requestPoolResponse{PoolID: poolId.String(), Pool: poolId.subnet, Data: data} err = plugin.Listener.Encode(w, &resp) log.Response(plugin.Name, &resp, err) } +// Handles ReleasePool requests. func (plugin *ipamPlugin) releasePool(w http.ResponseWriter, r *http.Request) { var req releasePoolRequest + // Decode request. err := plugin.Listener.Decode(w, r, &req) - log.Request(plugin.Name, &req, err) - if err != nil { return } + // Process request. + as, _, err := plugin.parsePoolId(req.PoolID) + if err != nil { + plugin.SendErrorResponse(w, err) + return + } + + err = as.releasePool(req.PoolID) + if err != nil { + plugin.SendErrorResponse(w, err) + return + } + + // Encode response. resp := releasePoolResponse{} err = plugin.Listener.Encode(w, &resp) @@ -133,35 +240,66 @@ func (plugin *ipamPlugin) releasePool(w http.ResponseWriter, r *http.Request) { log.Response(plugin.Name, &resp, err) } +// Handles RequestAddress requests. func (plugin *ipamPlugin) requestAddress(w http.ResponseWriter, r *http.Request) { var req requestAddressRequest + // Decode request. err := plugin.Listener.Decode(w, r, &req) - log.Request(plugin.Name, &req, err) - if err != nil { return } - resp := requestAddressResponse{"", make(map[string]string)} + plugin.refreshSource() + + // Process request. + _, ap, err := plugin.parsePoolId(req.PoolID) + if err != nil { + plugin.SendErrorResponse(w, err) + return + } + + addr, err := ap.requestAddress(req.Address, req.Options) + if err != nil { + plugin.SendErrorResponse(w, err) + return + } + + // Encode response. + data := make(map[string]string) + resp := requestAddressResponse{Address: addr, Data: data} err = plugin.Listener.Encode(w, &resp) log.Response(plugin.Name, &resp, err) } +// Handles ReleaseAddress requests. func (plugin *ipamPlugin) releaseAddress(w http.ResponseWriter, r *http.Request) { var req releaseAddressRequest + // Decode request. err := plugin.Listener.Decode(w, r, &req) - log.Request(plugin.Name, &req, err) - if err != nil { return } + // Process request. + _, ap, err := plugin.parsePoolId(req.PoolID) + if err != nil { + plugin.SendErrorResponse(w, err) + return + } + + err = ap.releaseAddress(req.Address) + if err != nil { + plugin.SendErrorResponse(w, err) + return + } + + // Encode response. resp := releaseAddressResponse{} err = plugin.Listener.Encode(w, &resp) diff --git a/ipam/pool.go b/ipam/pool.go new file mode 100644 index 000000000..fcb74976a --- /dev/null +++ b/ipam/pool.go @@ -0,0 +1,349 @@ +// Copyright Microsoft Corp. +// All rights reserved. + +package ipam + +import ( + "fmt" + "net" + "strings" + "sync" +) + +const ( + // Default address space IDs. + localDefaultAddressSpaceId = "LocalDefaultAddressSpace" + globalDefaultAddressSpaceId = "GlobalDefaultAddressSpace" + + // Address space scopes. + localScope = "local" + globalScope = "global" +) + +// Represents the key to an address pool. +type addressPoolId struct { + asId string + subnet string + childSubnet string +} + +// Represents a set of non-overlapping address pools. +type addressSpace struct { + id string + scope string + pools map[string]*addressPool + epoch int + sync.Mutex +} + +// Represents a subnet and the set of addresses in it. +type addressPool struct { + id *addressPoolId + as *addressSpace + subnet net.IPNet + addresses map[string]*addressRecord + v6 bool + epoch int + ref int +} + +// Represents an IP address in a pool. +type addressRecord struct { + addr net.IP + inUse bool + epoch int +} + +// +// AddressPoolId +// + +// Creates a new address pool ID object. +func newAddressPoolId(asId string, subnet string, childSubnet string) *addressPoolId { + return &addressPoolId{ + asId: asId, + subnet: subnet, + childSubnet: childSubnet, + } +} + +// Creates a new pool ID from a string representation. +func newAddressPoolIdFromString(s string) (*addressPoolId, error) { + var pid addressPoolId + + p := strings.Split(s, "|") + if len(p) > 3 { + return nil, errInvalidPoolId + } + + pid.asId = p[0] + if len(p) >= 2 { + pid.subnet = p[1] + } + if len(p) == 3 { + pid.childSubnet = p[2] + } + + return &pid, nil +} + +// Returns the string representation of a pool ID. +func (pid *addressPoolId) String() string { + s := fmt.Sprintf("%s|%s", pid.asId, pid.subnet) + if pid.childSubnet != "" { + s = fmt.Sprintf("%s|%s", s, pid.childSubnet) + } + return s +} + +// +// AddressSpace +// + +// Creates a new addressSpace object. +func newAddressSpace(id string, scope string) (*addressSpace, error) { + if scope != localScope && scope != globalScope { + return nil, errInvalidScope + } + + return &addressSpace{ + id: id, + scope: scope, + pools: make(map[string]*addressPool), + }, nil +} + +// Merges a new address space to an existing one. +func (as *addressSpace) merge(newas *addressSpace) { + as.Lock() + defer as.Unlock() + + // The new epoch after the merge. + as.epoch++ + + // Add new pools and addresses. + for pk, pv := range newas.pools { + ap := as.pools[pk] + + if ap == nil { + // This is a new address pool. + // Merge it to the existing address space. + as.pools[pk] = pv + delete(newas.pools, pk) + pv.epoch = as.epoch + } else { + // This pool already exists. + // Compare address records one by one. + for ak, av := range pv.addresses { + ar := ap.addresses[ak] + + if ar == nil { + // This is a new address record. + // Merge it to the existing address pool. + ap.addresses[ak] = av + delete(ap.addresses, ak) + av.epoch = as.epoch + } else { + // This address record already exists. + ar.epoch = as.epoch + } + } + + ap.epoch = as.epoch + } + } + + // Cleanup stale pools and addresses from the old epoch. + for pk, pv := range as.pools { + if pv.epoch < as.epoch { + for ak, av := range pv.addresses { + if !av.inUse { + delete(pv.addresses, ak) + } + } + + if pv.ref == 0 { + delete(as.pools, pk) + } + } + } + + return +} + +// Creates a new addressPool object. +func (as *addressSpace) newAddressPool(subnet *net.IPNet) (*addressPool, error) { + id := newAddressPoolId(as.id, subnet.String(), "") + + as.Lock() + defer as.Unlock() + + pool, ok := as.pools[id.String()] + if ok { + return pool, errAddressPoolExists + } + + pool = &addressPool{ + id: id, + as: as, + subnet: *subnet, + addresses: make(map[string]*addressRecord), + v6: subnet.IP.To16() != nil, + epoch: as.epoch, + } + + as.pools[id.String()] = pool + + return pool, nil +} + +// Returns the address pool with the given pool ID. +func (as *addressSpace) getAddressPool(poolId string) (*addressPool, error) { + as.Lock() + defer as.Unlock() + + ap := as.pools[poolId] + if ap == nil { + return nil, errInvalidPoolId + } + + return ap, nil +} + +// Requests a new address pool from the address space. +func (as *addressSpace) requestPool(pool string, subPool string, options map[string]string, v6 bool) (*addressPoolId, error) { + var ap *addressPool + + as.Lock() + defer as.Unlock() + + if pool != "" { + // Return the specific address pool requested. + ap = as.pools[pool] + if ap == nil { + return nil, errAddressPoolNotFound + } + } else { + // Return any available address pool. + for _, ap = range as.pools { + if ap.v6 == v6 { + break + } + } + + if ap == nil { + return nil, errNoAvailableAddressPools + } + } + + ap.ref++ + + return ap.id, nil +} + +// Releases a previously requested address pool back to its address space. +func (as *addressSpace) releasePool(poolId string) error { + as.Lock() + defer as.Unlock() + + ap, ok := as.pools[poolId] + if !ok { + return errAddressPoolNotFound + } + + ap.ref-- + + // Delete address pool if it is no longer available. + if ap.ref == 0 && ap.epoch < as.epoch { + delete(as.pools, poolId) + } + + return nil +} + +// +// AddressPool +// + +// Creates a new addressRecord object. +func (ap *addressPool) newAddressRecord(addr *net.IP) (*addressRecord, error) { + id := addr.String() + + if !ap.subnet.Contains(*addr) { + return nil, errInvalidAddress + } + + ap.as.Lock() + defer ap.as.Unlock() + + ar, ok := ap.addresses[id] + if ok { + return ar, errAddressExists + } + + ar = &addressRecord{ + addr: *addr, + epoch: ap.epoch, + } + + ap.addresses[id] = ar + + return ar, nil +} + +// Requests a new address from the address pool. +func (ap *addressPool) requestAddress(address string, options map[string]string) (string, error) { + var ar *addressRecord + + ap.as.Lock() + defer ap.as.Unlock() + + if address != "" { + // Return the specific address requested. + ar = ap.addresses[address] + if ar == nil { + return "", errAddressNotFound + } + if ar.inUse { + return "", errAddressInUse + } + } else { + // Return any available address. + for _, ar = range ap.addresses { + if !ar.inUse { + break + } + } + + if ar == nil { + return "", errNoAvailableAddresses + } + } + + ar.inUse = true + + return ar.addr.String(), nil +} + +// Releases a previously requested address back to its address pool. +func (ap *addressPool) releaseAddress(address string) error { + ap.as.Lock() + defer ap.as.Unlock() + + ar := ap.addresses[address] + if ar == nil { + return errAddressNotFound + } + if !ar.inUse { + return errAddressNotInUse + } + + ar.inUse = false + + // Delete address record if it is no longer available. + if ar.epoch < ap.as.epoch { + delete(ap.addresses, address) + } + + return nil +} From 200527c2346cb2352cdb1bd4b18db8c1e8c2ed9e Mon Sep 17 00:00:00 2001 From: Onur Filiz Date: Mon, 9 May 2016 04:36:59 -0700 Subject: [PATCH 2/3] Implemented Microsoft Azure Stack IPAM config --- ipam/mas.go | 114 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 ipam/mas.go diff --git a/ipam/mas.go b/ipam/mas.go new file mode 100644 index 000000000..c00ec8222 --- /dev/null +++ b/ipam/mas.go @@ -0,0 +1,114 @@ +// Copyright Microsoft Corp. +// All rights reserved. + +package ipam + +import ( + "encoding/json" + "net" + "net/http" + "time" +) + +const ( + // Host URL to query. + masQueryUrl = "http://169.254.169.254:6642/ListNetwork" + + // Minimum delay between consecutive polls. + masDefaultMinPollPeriod = 30 * time.Second +) + +// Microsoft Azure Stack IPAM configuration source. +type masSource struct { + name string + sink configSink + lastRefresh time.Time + minPollPeriod time.Duration +} + +// MAS host agent JSON object format. +type jsonObject struct { + Isolation string + IPs []struct { + IP string + IsolationId string + Mask string + DefaultGateways []string + DnsServers []string + } +} + +// Creates the MAS source. +func newMasSource(sink configSink) (*masSource, error) { + return &masSource{ + name: "MAS", + sink: sink, + minPollPeriod: masDefaultMinPollPeriod, + }, nil +} + +// Starts the MAS source. +func (s *masSource) start() error { + return nil +} + +// Stops the MAS source. +func (s *masSource) stop() { + return +} + +// Refreshes configuration. +func (s *masSource) refresh() error { + + // Refresh only if enough time has passed since the last poll. + if time.Since(s.lastRefresh) < s.minPollPeriod { + return nil + } + s.lastRefresh = time.Now() + + // Configure the local default address space. + local, err := newAddressSpace(localDefaultAddressSpaceId, localScope) + if err != nil { + return err + } + + // Fetch configuration. + resp, err := http.Get(masQueryUrl) + if err != nil { + return err + } + + defer resp.Body.Close() + + // Decode JSON object. + var obj jsonObject + decoder := json.NewDecoder(resp.Body) + err = decoder.Decode(&obj) + if err != nil { + return err + } + + // Add the IP addresses to the local address space. + for _, v := range obj.IPs { + address := net.ParseIP(v.IP) + subnet := net.IPNet{ + IP: net.ParseIP(v.IP), + Mask: net.IPMask(net.ParseIP(v.Mask)), + } + + ap, err := local.newAddressPool(&subnet) + if err != nil && err != errAddressExists { + return err + } + + _, err = ap.newAddressRecord(&address) + if err != nil { + return err + } + } + + // Set the local address space as active. + s.sink.setAddressSpace(local) + + return nil +} From 950366417e1fa070019da6815f3d0bc7e2aa4c45 Mon Sep 17 00:00:00 2001 From: Onur Filiz Date: Tue, 10 May 2016 17:47:51 -0700 Subject: [PATCH 3/3] Added null IPAM support --- ipam/config.go | 2 +- ipam/null.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 ipam/null.go diff --git a/ipam/config.go b/ipam/config.go index 338567e16..c9e302fc7 100644 --- a/ipam/config.go +++ b/ipam/config.go @@ -30,7 +30,7 @@ func (plugin *ipamPlugin) startSource() error { plugin.source, err = newMasSource(configSink(plugin)) case "null": - plugin.source = nil + plugin.source, err = newNullSource(configSink(plugin)) default: return errInvalidConfiguration diff --git a/ipam/null.go b/ipam/null.go new file mode 100644 index 000000000..59ee2815c --- /dev/null +++ b/ipam/null.go @@ -0,0 +1,58 @@ +// Copyright Microsoft Corp. +// All rights reserved. + +package ipam + +import ( + "net" +) + +// Null IPAM configuration source. +type nullSource struct { + name string + sink configSink + initialized bool +} + +// Creates the null source. +func newNullSource(sink configSink) (*nullSource, error) { + return &nullSource{ + name: "Null", + sink: sink, + }, nil +} + +// Starts the null source. +func (s *nullSource) start() error { + return nil +} + +// Stops the null source. +func (s *nullSource) stop() { + return +} + +// Refreshes configuration. +func (s *nullSource) refresh() error { + + // Configure the local default address space. + local, err := newAddressSpace(localDefaultAddressSpaceId, localScope) + if err != nil { + return err + } + + subnet := net.IPNet{ + IP: net.IPv4(0, 0, 0, 0), + Mask: net.IPv4Mask(0, 0, 0, 0), + } + + _, err = local.newAddressPool(&subnet) + if err != nil { + return err + } + + // Set the local address space as active. + s.sink.setAddressSpace(local) + + return nil +}