Merge pull request #29 from Azure/development

Code quality and testing improvements, bug fixes
This commit is contained in:
Onur Filiz 2016-09-27 15:32:59 -07:00 коммит произвёл GitHub
Родитель 765357e712 0cde819327
Коммит f815c8001c
12 изменённых файлов: 171 добавлений и 72 удалений

Просмотреть файл

@ -10,7 +10,7 @@ import (
"github.com/Azure/Aqua/store"
)
// Plugin object and interface
// Plugin base object.
type Plugin struct {
Name string
Version string
@ -20,8 +20,17 @@ type Plugin struct {
Listener *Listener
}
// Plugin base interface.
type PluginApi interface {
Start(*PluginConfig) error
Stop()
SetOption(string, string)
}
// Plugin common configuration.
type PluginConfig struct {
Name string
Version string
NetApi interface{}
ErrChan chan error
Store store.KeyValueStore
@ -41,7 +50,7 @@ func NewPlugin(name, version, endpointType string) (*Plugin, error) {
func (plugin *Plugin) Initialize(config *PluginConfig) error {
var socketName string
if plugin.Name != "test" {
socketName = plugin.Name
socketName = config.Name + plugin.Name
}
// Create the listener.

Просмотреть файл

@ -29,6 +29,8 @@ var (
errInvalidConfiguration = fmt.Errorf("Invalid configuration")
errAddressPoolExists = fmt.Errorf("Address pool already exists")
errAddressPoolNotFound = fmt.Errorf("Address pool not found")
errAddressPoolInUse = fmt.Errorf("Address pool already in use")
errAddressPoolNotInUse = fmt.Errorf("Address pool not in use")
errNoAvailableAddressPools = fmt.Errorf("No available address pools")
errAddressExists = fmt.Errorf("Address already exists")
errAddressNotFound = fmt.Errorf("Address not found")
@ -43,7 +45,8 @@ type getCapabilitiesRequest struct {
// Response sent by plugin when registering its capabilities with libnetwork.
type getCapabilitiesResponse struct {
RequiresMACAddress bool
RequiresMACAddress bool
RequiresRequestReplay bool
}
// Request sent by libnetwork when querying the default address space names.

Просмотреть файл

@ -13,7 +13,7 @@ import (
const (
// Host URL to query.
azureQueryUrl = "http://168.63.129.16/machine/plugins?comp=nmagent&type=getinterfaceinfov1"
azureQueryUrl = "http://169.254.169.254/machine/plugins?comp=nmagent&type=getinterfaceinfov1"
// Minimum delay between consecutive polls.
azureDefaultMinPollPeriod = 30 * time.Second

Просмотреть файл

@ -51,7 +51,7 @@ func newAddressManager() (*addressManager, error) {
// Initialize configures address manager.
func (am *addressManager) Initialize(config *common.PluginConfig, sourceType string) error {
am.store = config.Store
am.netApi = config.NetApi.(network.NetApi)
am.netApi, _ = config.NetApi.(network.NetApi)
// Restore persisted state.
err := am.restore()
@ -72,6 +72,11 @@ func (am *addressManager) Uninitialize() {
// Restore reads address manager state from persistent store.
func (am *addressManager) restore() error {
// Skip if a store is not provided.
if am.store == nil {
return nil
}
// Read any persisted state.
err := am.store.Read(storeKey, am)
if err != nil {
@ -97,6 +102,11 @@ func (am *addressManager) restore() error {
// Save writes address manager state to persistent store.
func (am *addressManager) save() error {
// Skip if a store is not provided.
if am.store == nil {
return nil
}
err := am.store.Write(storeKey, am)
if err == nil {
log.Printf("[ipam] Save succeeded.\n")

Просмотреть файл

@ -10,9 +10,13 @@ import (
"github.com/Azure/Aqua/log"
)
// Plugin capabilities.
const (
requiresMACAddress = false
// Plugin name.
name = "ipam"
// Libnetwork IPAM plugin capabilities.
requiresMACAddress = false
requiresRequestReplay = false
)
// IpamPlugin object and interface
@ -22,16 +26,13 @@ type ipamPlugin struct {
}
type IpamPlugin interface {
Start(*common.PluginConfig) error
Stop()
SetOption(string, string)
common.PluginApi
}
// Creates a new IpamPlugin object.
func NewPlugin(name string, version string) (IpamPlugin, error) {
func NewPlugin(config *common.PluginConfig) (IpamPlugin, error) {
// Setup base plugin.
plugin, err := common.NewPlugin(name, version, endpointType)
plugin, err := common.NewPlugin(name, config.Version, endpointType)
if err != nil {
return nil, err
}
@ -53,14 +54,14 @@ func (plugin *ipamPlugin) Start(config *common.PluginConfig) error {
// Initialize base plugin.
err := plugin.Initialize(config)
if err != nil {
log.Printf("%s: Failed to initialize base plugin: %v", plugin.Name, err)
log.Printf("[ipam] Failed to initialize base plugin, err:%v.", err)
return err
}
// Initialize address manager.
err = plugin.am.Initialize(config, plugin.GetOption("source"))
if err != nil {
log.Printf("%s: Failed to initialize address manager: %v", plugin.Name, err)
log.Printf("[ipam] Failed to initialize address manager, err:%v.", err)
return err
}
@ -73,7 +74,7 @@ func (plugin *ipamPlugin) Start(config *common.PluginConfig) error {
listener.AddHandler(requestAddressPath, plugin.requestAddress)
listener.AddHandler(releaseAddressPath, plugin.releaseAddress)
log.Printf("%s: Plugin started.", plugin.Name)
log.Printf("[ipam] Plugin started.")
return nil
}
@ -82,7 +83,7 @@ func (plugin *ipamPlugin) Start(config *common.PluginConfig) error {
func (plugin *ipamPlugin) Stop() {
plugin.am.Uninitialize()
plugin.Uninitialize()
log.Printf("%s: Plugin stopped.\n", plugin.Name)
log.Printf("[ipam] Plugin stopped.")
}
//
@ -97,7 +98,8 @@ func (plugin *ipamPlugin) getCapabilities(w http.ResponseWriter, r *http.Request
log.Request(plugin.Name, &req, nil)
resp := getCapabilitiesResponse{
RequiresMACAddress: requiresMACAddress,
RequiresMACAddress: requiresMACAddress,
RequiresRequestReplay: requiresRequestReplay,
}
err := plugin.Listener.Encode(w, &resp)

Просмотреть файл

@ -12,10 +12,13 @@ import (
"net/http/httptest"
"os"
"testing"
"github.com/Azure/Aqua/common"
)
var plugin IpamPlugin
var mux *http.ServeMux
var sink addressConfigSink
var local *addressSpace
var global *addressSpace
@ -25,18 +28,22 @@ var address1 string
// Wraps the test run with plugin setup and teardown.
func TestMain(m *testing.M) {
var config common.PluginConfig
var err error
// Create the plugin.
plugin, err = NewPlugin("test", "")
plugin, err = NewPlugin(&config)
if err != nil {
fmt.Printf("Failed to create IPAM plugin %v\n", err)
return
}
plugin.SetOption("source", "")
// Configure test mode and null config source.
plugin.(*ipamPlugin).Name = "test"
plugin.SetOption("source", "null")
err = plugin.Start(nil)
// Start the plugin.
err = plugin.Start(&config)
if err != nil {
fmt.Printf("Failed to start IPAM plugin %v\n", err)
return
@ -45,6 +52,10 @@ func TestMain(m *testing.M) {
// Get the internal http mux as test hook.
mux = plugin.(*ipamPlugin).Listener.GetMux()
// Get the internal config sink interface.
sink = plugin.(*ipamPlugin).am
plugin.(*ipamPlugin).am.source.refresh()
// Run tests.
exitCode := m.Run()
@ -122,10 +133,12 @@ func TestGetCapabilities(t *testing.T) {
func TestAddAddressSpace(t *testing.T) {
fmt.Println("Test: AddAddressSpace")
var anyInterface = "any"
var anyPriority = 42
var err error
// Configure the local default address space.
local, err = newAddressSpace(localDefaultAddressSpaceId, localScope)
local, err = sink.newAddressSpace(localDefaultAddressSpaceId, localScope)
if err != nil {
t.Errorf("newAddressSpace failed %+v", err)
return
@ -137,7 +150,7 @@ func TestAddAddressSpace(t *testing.T) {
IP: net.IPv4(192, 168, 1, 0),
Mask: net.IPv4Mask(255, 255, 255, 0),
}
ap, err := local.newAddressPool(&subnet)
ap, err := local.newAddressPool(anyInterface, anyPriority, &subnet)
ap.newAddressRecord(&addr1)
ap.newAddressRecord(&addr2)
@ -146,19 +159,19 @@ func TestAddAddressSpace(t *testing.T) {
IP: net.IPv4(192, 168, 2, 0),
Mask: net.IPv4Mask(255, 255, 255, 0),
}
ap, err = local.newAddressPool(&subnet)
ap, err = local.newAddressPool(anyInterface, anyPriority, &subnet)
ap.newAddressRecord(&addr1)
plugin.setAddressSpace(local)
sink.setAddressSpace(local)
// Configure the global default address space.
global, err = newAddressSpace(globalDefaultAddressSpaceId, globalScope)
global, err = sink.newAddressSpace(globalDefaultAddressSpaceId, globalScope)
if err != nil {
t.Errorf("newAddressSpace failed %+v", err)
return
}
plugin.setAddressSpace(global)
sink.setAddressSpace(global)
}
// Tests IpamDriver.GetDefaultAddressSpaces functionality.
@ -243,7 +256,8 @@ func TestRequestAddress(t *testing.T) {
t.Errorf("RequestAddress response is invalid %+v", resp)
}
address1 = resp.Address
address, _, _ := net.ParseCIDR(resp.Address)
address1 = address.String()
}
// Tests IpamDriver.ReleaseAddress functionality.

Просмотреть файл

@ -43,8 +43,8 @@ type addressPool struct {
Addresses map[string]*addressRecord
IsIPv6 bool
Priority int
InUse bool
epoch int
ref int
}
// Represents an IP address in a pool.
@ -133,8 +133,10 @@ func (am *addressManager) setAddressSpace(as *addressSpace) error {
}
// Notify NetPlugin of external interfaces.
for _, ap := range as.Pools {
am.netApi.AddExternalInterface(ap.IfName, ap.Subnet.String())
if am.netApi != nil {
for _, ap := range as.Pools {
am.netApi.AddExternalInterface(ap.IfName, ap.Subnet.String())
}
}
am.save()
@ -180,6 +182,7 @@ func (as *addressSpace) merge(newas *addressSpace) {
}
// Cleanup stale pools and addresses from the old epoch.
// Those currently in use will be deleted after they are released.
for pk, pv := range as.Pools {
if pv.epoch < as.epoch {
for ak, av := range pv.Addresses {
@ -188,7 +191,7 @@ func (as *addressSpace) merge(newas *addressSpace) {
}
}
if pv.ref == 0 {
if !pv.InUse {
delete(as.Pools, pk)
}
}
@ -206,7 +209,7 @@ func (as *addressSpace) newAddressPool(ifName string, priority int, subnet *net.
return pool, errAddressPoolExists
}
v6 := (len(subnet.IP) > net.IPv4len)
v6 := (subnet.IP.To4() == nil)
pool = &addressPool{
as: as,
@ -244,11 +247,21 @@ func (as *addressSpace) requestPool(poolId string, subPoolId string, options map
if ap == nil {
return nil, errAddressPoolNotFound
}
// Fail if requested pool is already in use.
if ap.InUse {
return nil, errAddressPoolInUse
}
} else {
// Return any available address pool.
highestPriority := -1
for _, pool := range as.Pools {
// Skip if pool is already in use.
if pool.InUse {
continue
}
// Pick a pool from the same address family.
if pool.IsIPv6 != v6 {
continue
@ -266,7 +279,7 @@ func (as *addressSpace) requestPool(poolId string, subPoolId string, options map
}
}
ap.ref++
ap.InUse = true
return ap, nil
}
@ -278,10 +291,14 @@ func (as *addressSpace) releasePool(poolId string) error {
return errAddressPoolNotFound
}
ap.ref--
if !ap.InUse {
return errAddressPoolNotInUse
}
ap.InUse = false
// Delete address pool if it is no longer available.
if ap.ref == 0 && ap.epoch < as.epoch {
if ap.epoch < as.epoch {
delete(as.Pools, poolId)
}

Просмотреть файл

@ -76,18 +76,18 @@ func (logger *Logger) SetLevel(level int) {
// Logs a structured request.
func (logger *Logger) Request(tag string, request interface{}, err error) {
if err == nil {
logger.Printf("%s: Received %T %+v.", tag, request, request)
logger.Printf("[%s] Received %T %+v.", tag, request, request)
} else {
logger.Printf("%s: Failed to decode %T %+v %s.", tag, request, request, err.Error())
logger.Printf("[%s] Failed to decode %T %+v %s.", tag, request, request, err.Error())
}
}
// Logs a structured response.
func (logger *Logger) Response(tag string, response interface{}, err error) {
if err == nil {
logger.Printf("%s: Sent %T %+v.", tag, response, response)
logger.Printf("[%s] Sent %T %+v.", tag, response, response)
} else {
logger.Printf("%s: Failed to encode %T %+v %s.", tag, response, response, err.Error())
logger.Printf("[%s] Failed to encode %T %+v %s.", tag, response, response, err.Error())
}
}

26
main.go
Просмотреть файл

@ -17,12 +17,13 @@ import (
"github.com/Azure/Aqua/store"
)
// Binary version
const version = "v0.1"
const (
// Plugin name.
name = "azure"
// Libnetwork plugin names
const netPluginName = "aquanet"
const ipamPluginName = "aquaipam"
// Plugin version.
version = "v0.4"
)
// Prints description and usage information.
func printHelp() {
@ -38,20 +39,19 @@ func main() {
// Set defaults.
logTarget := log.TargetStderr
// Log platform information.
common.LogPlatformInfo()
common.LogNetworkInterfaces()
// Initialize plugin common configuration.
config.Name = name
config.Version = version
// Create network plugin.
netPlugin, err = network.NewPlugin(netPluginName, version)
netPlugin, err = network.NewPlugin(&config)
if err != nil {
fmt.Printf("Failed to create network plugin %v\n", err)
return
}
config.NetApi = netPlugin
// Create IPAM plugin.
ipamPlugin, err = ipam.NewPlugin(ipamPluginName, version)
ipamPlugin, err = ipam.NewPlugin(&config)
if err != nil {
fmt.Printf("Failed to create IPAM plugin %v\n", err)
return
@ -109,6 +109,10 @@ func main() {
return
}
// Log platform information.
common.LogPlatformInfo()
common.LogNetworkInterfaces()
// Start plugins.
if netPlugin != nil {
err = netPlugin.Start(&config)

Просмотреть файл

@ -23,6 +23,11 @@ type networkManager struct {
sync.Mutex
}
// NetworkManager API.
type NetApi interface {
AddExternalInterface(ifName string, subnet string) error
}
// Creates a new network manager.
func newNetworkManager() (*networkManager, error) {
nm := &networkManager{
@ -47,6 +52,11 @@ func (nm *networkManager) Uninitialize() {
// Restore reads network manager state from persistent store.
func (nm *networkManager) restore() error {
// Skip if a store is not provided.
if nm.store == nil {
return nil
}
// Read any persisted state.
err := nm.store.Read(storeKey, nm)
if err != nil {
@ -73,6 +83,11 @@ func (nm *networkManager) restore() error {
// Save writes network manager state to persistent store.
func (nm *networkManager) save() error {
// Skip if a store is not provided.
if nm.store == nil {
return nil
}
err := nm.store.Write(storeKey, nm)
if err == nil {
log.Printf("[net] Save succeeded.\n")

Просмотреть файл

@ -10,8 +10,11 @@ import (
"github.com/Azure/Aqua/log"
)
// Plugin capabilities.
const (
// Plugin name.
name = "net"
// Plugin capabilities.
scope = "local"
)
@ -23,18 +26,13 @@ type netPlugin struct {
}
type NetPlugin interface {
Start(*common.PluginConfig) error
Stop()
}
type NetApi interface {
AddExternalInterface(ifName string, subnet string) error
common.PluginApi
}
// Creates a new NetPlugin object.
func NewPlugin(name string, version string) (NetPlugin, error) {
func NewPlugin(config *common.PluginConfig) (NetPlugin, error) {
// Setup base plugin.
plugin, err := common.NewPlugin(name, version, endpointType)
plugin, err := common.NewPlugin(name, config.Version, endpointType)
if err != nil {
return nil, err
}
@ -45,6 +43,8 @@ func NewPlugin(name string, version string) (NetPlugin, error) {
return nil, err
}
config.NetApi = nm
return &netPlugin{
Plugin: plugin,
scope: scope,
@ -91,14 +91,6 @@ func (plugin *netPlugin) Stop() {
log.Printf("[net] Plugin stopped.")
}
//
// NetPlugin internal API
//
func (plugin *netPlugin) AddExternalInterface(ifName string, subnet string) error {
return plugin.nm.AddExternalInterface(ifName, subnet)
}
//
// Libnetwork remote network API implementation
// https://github.com/docker/libnetwork/blob/master/docs/remote.md

Просмотреть файл

@ -7,32 +7,57 @@ import (
"bytes"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"os"
"testing"
"github.com/Azure/Aqua/common"
"github.com/Azure/Aqua/netlink"
driverApi "github.com/docker/libnetwork/driverapi"
remoteApi "github.com/docker/libnetwork/drivers/remote/api"
)
var plugin NetPlugin
var mux *http.ServeMux
var anyInterface = "test0"
var anySubnet = "192.168.1.0/24"
// Wraps the test run with plugin setup and teardown.
func TestMain(m *testing.M) {
var config common.PluginConfig
var err error
// Create the plugin.
plugin, err = NewPlugin("test", "")
plugin, err = NewPlugin(&config)
if err != nil {
fmt.Printf("Failed to create network plugin %v\n", err)
return
os.Exit(1)
}
err = plugin.Start(nil)
// Configure test mode.
plugin.(*netPlugin).Name = "test"
// Start the plugin.
err = plugin.Start(&config)
if err != nil {
fmt.Printf("Failed to start network plugin %v\n", err)
return
os.Exit(2)
}
// Create a dummy test network interface.
err = netlink.AddLink(anyInterface, "dummy")
if err != nil {
fmt.Printf("Failed to create test network interface, err:%v.\n", err)
os.Exit(3)
}
err = plugin.(*netPlugin).nm.AddExternalInterface(anyInterface, anySubnet)
if err != nil {
fmt.Printf("Failed to add test network interface, err:%v.\n", err)
os.Exit(4)
}
// Get the internal http mux as test hook.
@ -42,6 +67,7 @@ func TestMain(m *testing.M) {
exitCode := m.Run()
// Cleanup.
netlink.DeleteLink(anyInterface)
plugin.Stop()
os.Exit(exitCode)
@ -116,8 +142,15 @@ func TestCreateNetwork(t *testing.T) {
var body bytes.Buffer
var resp remoteApi.CreateNetworkResponse
_, pool, _ := net.ParseCIDR(anySubnet)
info := &remoteApi.CreateNetworkRequest{
NetworkID: "N1",
IPv4Data: []driverApi.IPAMData{
{
Pool: pool,
},
},
}
json.NewEncoder(&body).Encode(info)