Use Lockedfile api to acquire lock (#1070)
* added lockedfileapi support for CNI * fixed interface changes * addressed comments fixed ut * addressed comments * fixed copy to buffer part in writer api * fixed copy to buffer part in writer api * keeping old code not changing it.
This commit is contained in:
Родитель
077e11d192
Коммит
cdab7d0241
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
"github.com/Azure/azure-container-networking/common"
|
||||
"github.com/Azure/azure-container-networking/log"
|
||||
"github.com/Azure/azure-container-networking/processlock"
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
"github.com/microsoft/ApplicationInsights-Go/appinsights"
|
||||
)
|
||||
|
@ -101,17 +102,31 @@ func getMetadata(th *telemetryHandle) {
|
|||
th.metadata = metadata
|
||||
th.rwmutex.Unlock()
|
||||
|
||||
lockclient, err := processlock.NewFileLock(metadataFile + store.LockExtension)
|
||||
if err != nil {
|
||||
log.Printf("Error initializing file lock:%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Save metadata retrieved from wireserver to a file
|
||||
kvs, err := store.NewJsonFileStore(metadataFile)
|
||||
kvs, err := store.NewJsonFileStore(metadataFile, lockclient)
|
||||
if err != nil {
|
||||
debugLog("[AppInsights] Error initializing kvs store: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
kvs.Lock(true)
|
||||
err = common.SaveHostMetadata(th.metadata, metadataFile)
|
||||
kvs.Unlock(true)
|
||||
err = kvs.Lock(store.DefaultLockTimeout)
|
||||
if err != nil {
|
||||
log.Errorf("getMetadata: Not able to acquire lock:%v", err)
|
||||
return
|
||||
}
|
||||
metadataErr := common.SaveHostMetadata(th.metadata, metadataFile)
|
||||
err = kvs.Unlock()
|
||||
if err != nil {
|
||||
log.Errorf("getMetadata: Not able to release lock:%v", err)
|
||||
}
|
||||
|
||||
if metadataErr != nil {
|
||||
debugLog("[AppInsights] saving host metadata failed with :%v", err)
|
||||
}
|
||||
}
|
||||
|
@ -284,6 +299,7 @@ func (th *telemetryHandle) TrackMetric(metric Metric) {
|
|||
aimetric.Properties[versionStr] = th.appVersion
|
||||
aimetric.Properties[resourceGroupStr] = th.metadata.ResourceGroupName
|
||||
aimetric.Properties[vmIDStr] = metadata.VMID
|
||||
aimetric.Properties[osStr] = runtime.GOOS
|
||||
aimetric.Tags.Session().SetId(metadata.VMID)
|
||||
}
|
||||
|
||||
|
|
|
@ -43,20 +43,12 @@ func main() {
|
|||
|
||||
if err := ipamPlugin.Plugin.InitializeKeyValueStore(&config); err != nil {
|
||||
fmt.Printf("Failed to initialize key-value store of ipam plugin, err:%v.\n", err)
|
||||
|
||||
if isSafe, _ := ipamPlugin.Plugin.IsSafeToRemoveLock(ipamPlugin.Plugin.Name); isSafe {
|
||||
log.Printf("[IPAM] Removing lock file as process holding lock exited")
|
||||
if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(true); errUninit != nil {
|
||||
log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit)
|
||||
}
|
||||
}
|
||||
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(false); errUninit != nil {
|
||||
fmt.Printf("Failed to uninitialize key-value store of ipam plugin, err:%v.\n", err)
|
||||
if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(); errUninit != nil {
|
||||
fmt.Printf("Failed to uninitialize key-value store of ipam plugin, err:%v.\n", errUninit)
|
||||
}
|
||||
|
||||
if recover() != nil {
|
||||
|
|
|
@ -43,20 +43,12 @@ func main() {
|
|||
|
||||
if err := ipamPlugin.Plugin.InitializeKeyValueStore(&config); err != nil {
|
||||
fmt.Printf("Failed to initialize key-value store of ipam plugin, err:%v.\n", err)
|
||||
|
||||
if isSafe, _ := ipamPlugin.Plugin.IsSafeToRemoveLock(ipamPlugin.Plugin.Name); isSafe {
|
||||
log.Printf("[IPAM] Removing lock file as process holding lock exited")
|
||||
if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(true); errUninit != nil {
|
||||
log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit)
|
||||
}
|
||||
}
|
||||
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(false); errUninit != nil {
|
||||
fmt.Printf("Failed to uninitialize key-value store of ipam plugin, err:%v.\n", err)
|
||||
if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(); errUninit != nil {
|
||||
fmt.Printf("Failed to uninitialize key-value store of ipam plugin, err:%v.\n", errUninit)
|
||||
}
|
||||
|
||||
if recover() != nil {
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-container-networking/aitelemetry"
|
||||
"github.com/Azure/azure-container-networking/cni"
|
||||
"github.com/Azure/azure-container-networking/cni/network"
|
||||
"github.com/Azure/azure-container-networking/common"
|
||||
|
@ -18,8 +19,10 @@ import (
|
|||
acnnetwork "github.com/Azure/azure-container-networking/network"
|
||||
"github.com/Azure/azure-container-networking/nns"
|
||||
"github.com/Azure/azure-container-networking/platform"
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
"github.com/Azure/azure-container-networking/telemetry"
|
||||
"github.com/containernetworking/cni/pkg/skel"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -139,14 +142,13 @@ func main() {
|
|||
|
||||
var (
|
||||
config common.PluginConfig
|
||||
err error
|
||||
logDirectory string // This sets empty string i.e. current location
|
||||
tb *telemetry.TelemetryBuffer
|
||||
)
|
||||
|
||||
log.SetName(name)
|
||||
log.SetLevel(log.LevelInfo)
|
||||
if err = log.SetTargetLogDirectory(log.TargetLogfile, logDirectory); err != nil {
|
||||
if err := log.SetTargetLogDirectory(log.TargetLogfile, logDirectory); err != nil {
|
||||
fmt.Printf("Failed to setup cni logging: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
@ -187,7 +189,8 @@ func main() {
|
|||
|
||||
cniReport.GetReport(pluginName, version, ipamQueryURL)
|
||||
|
||||
upTime, err := platform.GetLastRebootTime()
|
||||
var upTime time.Time
|
||||
upTime, err = platform.GetLastRebootTime()
|
||||
if err == nil {
|
||||
cniReport.VMUptime = upTime.Format("2006-01-02 15:04:05")
|
||||
}
|
||||
|
@ -195,24 +198,33 @@ func main() {
|
|||
// CNI Acquires lock
|
||||
if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil {
|
||||
log.Errorf("Failed to initialize key-value store of network plugin, err:%v.\n", err)
|
||||
tb := telemetry.NewTelemetryBuffer()
|
||||
if tberr := tb.Connect(); tberr == nil {
|
||||
reportPluginError(reportManager, tb, err)
|
||||
tb.Close()
|
||||
tb = telemetry.NewTelemetryBuffer()
|
||||
if tberr := tb.Connect(); tberr != nil {
|
||||
log.Errorf("Cannot connect to telemetry service:%v", tberr)
|
||||
return
|
||||
}
|
||||
|
||||
if isSafe, _ := netPlugin.Plugin.IsSafeToRemoveLock(name); isSafe {
|
||||
log.Printf("[CNI] Removing lock file as process holding lock exited")
|
||||
if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(true); errUninit != nil {
|
||||
log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit)
|
||||
reportPluginError(reportManager, tb, err)
|
||||
|
||||
if errors.Is(err, store.ErrTimeoutLockingStore) {
|
||||
var cniMetric telemetry.AIMetric
|
||||
cniMetric.Metric = aitelemetry.Metric{
|
||||
Name: telemetry.CNILockTimeoutStr,
|
||||
Value: 1.0,
|
||||
CustomDimensions: make(map[string]string),
|
||||
}
|
||||
err = telemetry.SendCNIMetric(&cniMetric, tb)
|
||||
if err != nil {
|
||||
log.Errorf("Couldn't send cnilocktimeout metric: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
tb.Close()
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(false); errUninit != nil {
|
||||
if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(); errUninit != nil {
|
||||
log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit)
|
||||
}
|
||||
|
||||
|
@ -270,7 +282,7 @@ func main() {
|
|||
netPlugin.Stop()
|
||||
|
||||
// release cni lock
|
||||
if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(false); errUninit != nil {
|
||||
if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(); errUninit != nil {
|
||||
log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit)
|
||||
}
|
||||
|
||||
|
|
|
@ -6,15 +6,14 @@ package cni
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"runtime"
|
||||
|
||||
"github.com/Azure/azure-container-networking/common"
|
||||
"github.com/Azure/azure-container-networking/log"
|
||||
"github.com/Azure/azure-container-networking/platform"
|
||||
"github.com/Azure/azure-container-networking/processlock"
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
|
||||
cniInvoke "github.com/containernetworking/cni/pkg/invoke"
|
||||
cniSkel "github.com/containernetworking/cni/pkg/skel"
|
||||
cniTypes "github.com/containernetworking/cni/pkg/types"
|
||||
|
@ -155,8 +154,13 @@ func (plugin *Plugin) Errorf(format string, args ...interface{}) *cniTypes.Error
|
|||
func (plugin *Plugin) InitializeKeyValueStore(config *common.PluginConfig) error {
|
||||
// Create the key value store.
|
||||
if plugin.Store == nil {
|
||||
var err error
|
||||
plugin.Store, err = store.NewJsonFileStore(platform.CNIRuntimePath + plugin.Name + ".json")
|
||||
lockclient, err := processlock.NewFileLock(platform.CNILockPath + plugin.Name + store.LockExtension)
|
||||
if err != nil {
|
||||
log.Printf("[cni] Error initializing file lock:%v", err)
|
||||
return errors.Wrap(err, "error creating new filelock")
|
||||
}
|
||||
|
||||
plugin.Store, err = store.NewJsonFileStore(platform.CNIRuntimePath+plugin.Name+".json", lockclient)
|
||||
if err != nil {
|
||||
log.Printf("[cni] Failed to create store: %v.", err)
|
||||
return err
|
||||
|
@ -164,7 +168,7 @@ func (plugin *Plugin) InitializeKeyValueStore(config *common.PluginConfig) error
|
|||
}
|
||||
|
||||
// Acquire store lock.
|
||||
if err := plugin.Store.Lock(true); err != nil {
|
||||
if err := plugin.Store.Lock(store.DefaultLockTimeout); err != nil {
|
||||
log.Printf("[cni] Failed to lock store: %v.", err)
|
||||
return err
|
||||
}
|
||||
|
@ -175,9 +179,9 @@ func (plugin *Plugin) InitializeKeyValueStore(config *common.PluginConfig) error
|
|||
}
|
||||
|
||||
// Uninitialize key-value store
|
||||
func (plugin *Plugin) UninitializeKeyValueStore(force bool) error {
|
||||
func (plugin *Plugin) UninitializeKeyValueStore() error {
|
||||
if plugin.Store != nil {
|
||||
err := plugin.Store.Unlock(force)
|
||||
err := plugin.Store.Unlock()
|
||||
if err != nil {
|
||||
log.Printf("[cni] Failed to unlock store: %v.", err)
|
||||
return err
|
||||
|
@ -187,67 +191,3 @@ func (plugin *Plugin) UninitializeKeyValueStore(force bool) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// check if safe to remove lockfile
|
||||
func (plugin *Plugin) IsSafeToRemoveLock(processName string) (bool, error) {
|
||||
if plugin != nil && plugin.Store != nil {
|
||||
// check if get process command supported
|
||||
if cmdErr := platform.GetProcessSupport(); cmdErr != nil {
|
||||
log.Errorf("Get process cmd not supported. Error %v", cmdErr)
|
||||
return false, cmdErr
|
||||
}
|
||||
|
||||
lockFileName := plugin.Store.GetLockFileName()
|
||||
// Read pid from lockfile
|
||||
lockFilePid, err := plugin.readLockFile(lockFileName)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "IsSafeToRemoveLock lockfile read failed")
|
||||
}
|
||||
|
||||
log.Printf("Read from lockfile:%s", lockFilePid)
|
||||
// Get the process name if running and
|
||||
// check if that matches with our expected process
|
||||
// if it returns non-nil error then process is not running
|
||||
pName, err := platform.GetProcessNameByID(lockFilePid)
|
||||
if err != nil {
|
||||
var content string
|
||||
content, err = plugin.readLockFile(lockFileName)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "IsSafeToRemoveLock lockfile 2nd read failed")
|
||||
}
|
||||
|
||||
// pid in lockfile changed after getprocessnamebyid call. so some other process acquired lockfile in between.
|
||||
// so its not safe to remove lockfile
|
||||
if string(content) != lockFilePid {
|
||||
log.Printf("Lockfile content changed from %s to %s. So not safe to remove lockfile", lockFilePid, content)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
log.Printf("[CNI] Process name is %s", pName)
|
||||
|
||||
if pName != processName {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
log.Errorf("Plugin store is nil")
|
||||
return false, fmt.Errorf("plugin store nil")
|
||||
}
|
||||
|
||||
func (plugin *Plugin) readLockFile(filename string) (string, error) {
|
||||
content, err := ioutil.ReadFile(filename)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to read lockfile :%v", err)
|
||||
return "", fmt.Errorf("readLockFile error:%w", err)
|
||||
}
|
||||
|
||||
if len(content) == 0 {
|
||||
log.Errorf("Num bytes read from lockfile is 0")
|
||||
return "", errEmptyContent
|
||||
}
|
||||
|
||||
return string(content), nil
|
||||
}
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
package cni
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/Azure/azure-container-networking/common"
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
// Run tests.
|
||||
exitCode := m.Run()
|
||||
os.Exit(exitCode)
|
||||
}
|
||||
|
||||
func TestPluginSafeToRemoveLock(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
plugin Plugin
|
||||
processName string
|
||||
wantIsSafe bool
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Safe to remove lock-true. Process name does not match",
|
||||
plugin: Plugin{
|
||||
Plugin: &common.Plugin{
|
||||
Name: "cni",
|
||||
Version: "0.3.0",
|
||||
Store: store.NewMockStore("testfiles/processfound.lock"),
|
||||
},
|
||||
version: "0.3.0",
|
||||
},
|
||||
processName: "azure-vnet",
|
||||
wantIsSafe: true,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "Safe to remove lock-true. Process not running",
|
||||
plugin: Plugin{
|
||||
Plugin: &common.Plugin{
|
||||
Name: "cni",
|
||||
Version: "0.3.0",
|
||||
Store: store.NewMockStore("testfiles/processnotfound.lock"),
|
||||
},
|
||||
version: "0.3.0",
|
||||
},
|
||||
processName: "azure-vnet",
|
||||
wantIsSafe: true,
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
isSafe, err := tt.plugin.IsSafeToRemoveLock(tt.processName)
|
||||
if tt.wantErr {
|
||||
require.Error(t, err)
|
||||
require.Equal(t, tt.wantIsSafe, isSafe)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.wantIsSafe, isSafe)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/Azure/azure-container-networking/common"
|
||||
"github.com/Azure/azure-container-networking/log"
|
||||
"github.com/Azure/azure-container-networking/platform"
|
||||
"github.com/Azure/azure-container-networking/processlock"
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
)
|
||||
|
||||
|
@ -158,9 +159,15 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
lockclient, err := processlock.NewFileLock(platform.CNILockPath + name + store.LockExtension)
|
||||
if err != nil {
|
||||
log.Printf("Error initializing file lock:%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Create the key value store.
|
||||
storeFileName := storeFileLocation + name + ".json"
|
||||
config.Store, err = store.NewJsonFileStore(storeFileName)
|
||||
config.Store, err = store.NewJsonFileStore(storeFileName, lockclient)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to create store file: %s, due to error %v\n", storeFileName, err)
|
||||
return
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/Azure/azure-container-networking/netlink"
|
||||
"github.com/Azure/azure-container-networking/network"
|
||||
"github.com/Azure/azure-container-networking/platform"
|
||||
"github.com/Azure/azure-container-networking/processlock"
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
"github.com/Azure/azure-container-networking/telemetry"
|
||||
)
|
||||
|
@ -141,8 +142,15 @@ func main() {
|
|||
tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
|
||||
defer tb.Close()
|
||||
|
||||
var lockclient processlock.Interface
|
||||
for {
|
||||
config.Store, err = store.NewJsonFileStore(platform.CNIRuntimePath + pluginName + ".json")
|
||||
lockclient, err = processlock.NewFileLock(platform.CNILockPath + pluginName + store.LockExtension)
|
||||
if err != nil {
|
||||
log.Printf("Error initializing file lock:%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
config.Store, err = store.NewJsonFileStore(platform.CNIRuntimePath+pluginName+".json", lockclient)
|
||||
if err != nil {
|
||||
fmt.Printf("[monitor] Failed to create store: %v\n", err)
|
||||
return
|
||||
|
|
|
@ -17,8 +17,6 @@ import (
|
|||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
|
||||
"github.com/Azure/azure-container-networking/cns"
|
||||
"github.com/Azure/azure-container-networking/cns/common"
|
||||
"github.com/Azure/azure-container-networking/cns/fakes"
|
||||
|
@ -26,6 +24,8 @@ import (
|
|||
"github.com/Azure/azure-container-networking/cns/nmagent"
|
||||
"github.com/Azure/azure-container-networking/cns/types"
|
||||
acncommon "github.com/Azure/azure-container-networking/common"
|
||||
"github.com/Azure/azure-container-networking/processlock"
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -911,7 +911,7 @@ func startService() error {
|
|||
// Create the service.
|
||||
config := common.ServiceConfig{}
|
||||
// Create the key value store.
|
||||
if config.Store, err = store.NewJsonFileStore(cnsJsonFileName); err != nil {
|
||||
if config.Store, err = store.NewJsonFileStore(cnsJsonFileName, processlock.NewMockFileLock(false)); err != nil {
|
||||
logger.Errorf("Failed to create store file: %s, due to error %v\n", cnsJsonFileName, err)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"github.com/Azure/azure-container-networking/cns"
|
||||
cnscli "github.com/Azure/azure-container-networking/cns/cmd/cli"
|
||||
"github.com/Azure/azure-container-networking/cns/cnireconciler"
|
||||
cni "github.com/Azure/azure-container-networking/cns/cnireconciler"
|
||||
"github.com/Azure/azure-container-networking/cns/common"
|
||||
"github.com/Azure/azure-container-networking/cns/configuration"
|
||||
"github.com/Azure/azure-container-networking/cns/hnsclient"
|
||||
|
@ -42,6 +41,7 @@ import (
|
|||
"github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha"
|
||||
"github.com/Azure/azure-container-networking/log"
|
||||
"github.com/Azure/azure-container-networking/platform"
|
||||
"github.com/Azure/azure-container-networking/processlock"
|
||||
localtls "github.com/Azure/azure-container-networking/server/tls"
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
"github.com/avast/retry-go/v3"
|
||||
|
@ -468,9 +468,15 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
lockclient, err := processlock.NewFileLock(platform.CNILockPath + name + store.LockExtension)
|
||||
if err != nil {
|
||||
log.Printf("Error initializing file lock:%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Create the key value store.
|
||||
storeFileName := storeFileLocation + name + ".json"
|
||||
config.Store, err = store.NewJsonFileStore(storeFileName)
|
||||
config.Store, err = store.NewJsonFileStore(storeFileName, lockclient)
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to create store file: %s, due to error %v\n", storeFileName, err)
|
||||
return
|
||||
|
@ -538,7 +544,8 @@ func main() {
|
|||
// If so, we should check that the the CNI is new enough to support the state commands,
|
||||
// otherwise we fall back to the existing behavior.
|
||||
if cnsconfig.InitializeFromCNI {
|
||||
isGoodVer, err := cni.IsDumpStateVer()
|
||||
var isGoodVer bool
|
||||
isGoodVer, err = cnireconciler.IsDumpStateVer()
|
||||
if err != nil {
|
||||
logger.Errorf("error checking CNI ver: %v", err)
|
||||
}
|
||||
|
@ -623,8 +630,11 @@ func main() {
|
|||
}(privateEndpoint, infravnet, nodeID)
|
||||
}
|
||||
|
||||
var netPlugin network.NetPlugin
|
||||
var ipamPlugin ipam.IpamPlugin
|
||||
var (
|
||||
netPlugin network.NetPlugin
|
||||
ipamPlugin ipam.IpamPlugin
|
||||
lockclientCnm processlock.Interface
|
||||
)
|
||||
|
||||
if startCNM {
|
||||
var pluginConfig acn.PluginConfig
|
||||
|
@ -647,9 +657,15 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
lockclientCnm, err = processlock.NewFileLock(platform.CNILockPath + pluginName + store.LockExtension)
|
||||
if err != nil {
|
||||
log.Printf("Error initializing file lock:%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Create the key value store.
|
||||
pluginStoreFile := storeFileLocation + pluginName + ".json"
|
||||
pluginConfig.Store, err = store.NewJsonFileStore(pluginStoreFile)
|
||||
pluginConfig.Store, err = store.NewJsonFileStore(pluginStoreFile, lockclientCnm)
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to create plugin store file %s, due to error : %v\n", pluginStoreFile, err)
|
||||
return
|
||||
|
@ -700,6 +716,14 @@ func main() {
|
|||
logger.Printf("stop ipam plugin")
|
||||
ipamPlugin.Stop()
|
||||
}
|
||||
|
||||
if err = lockclientCnm.Unlock(); err != nil {
|
||||
log.Errorf("lockclient cnm unlock error:%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err = lockclient.Unlock(); err != nil {
|
||||
log.Errorf("lockclient cns unlock error:%v", err)
|
||||
}
|
||||
|
||||
logger.Printf("CNS exited")
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package filelock provides a platform-independent API for advisory file
|
||||
// locking. Calls to functions in this package on platforms that do not support
|
||||
// advisory locks will return errors for which IsNotSupported returns true.
|
||||
|
||||
//nolint
|
||||
package filelock
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io/fs"
|
||||
"os"
|
||||
)
|
||||
|
||||
// A File provides the minimal set of methods required to lock an open file.
|
||||
// File implementations must be usable as map keys.
|
||||
// The usual implementation is *os.File.
|
||||
type File interface {
|
||||
// Name returns the name of the file.
|
||||
Name() string
|
||||
|
||||
// Fd returns a valid file descriptor.
|
||||
// (If the File is an *os.File, it must not be closed.)
|
||||
Fd() uintptr
|
||||
|
||||
// Stat returns the FileInfo structure describing file.
|
||||
Stat() (fs.FileInfo, error)
|
||||
}
|
||||
|
||||
// Lock places an advisory write lock on the file, blocking until it can be
|
||||
// locked.
|
||||
//
|
||||
// If Lock returns nil, no other process will be able to place a read or write
|
||||
// lock on the file until this process exits, closes f, or calls Unlock on it.
|
||||
//
|
||||
// If f's descriptor is already read- or write-locked, the behavior of Lock is
|
||||
// unspecified.
|
||||
//
|
||||
// Closing the file may or may not release the lock promptly. Callers should
|
||||
// ensure that Unlock is always called when Lock succeeds.
|
||||
func Lock(f File) error {
|
||||
return lock(f, writeLock)
|
||||
}
|
||||
|
||||
// RLock places an advisory read lock on the file, blocking until it can be locked.
|
||||
//
|
||||
// If RLock returns nil, no other process will be able to place a write lock on
|
||||
// the file until this process exits, closes f, or calls Unlock on it.
|
||||
//
|
||||
// If f is already read- or write-locked, the behavior of RLock is unspecified.
|
||||
//
|
||||
// Closing the file may or may not release the lock promptly. Callers should
|
||||
// ensure that Unlock is always called if RLock succeeds.
|
||||
func RLock(f File) error {
|
||||
return lock(f, readLock)
|
||||
}
|
||||
|
||||
// Unlock removes an advisory lock placed on f by this process.
|
||||
//
|
||||
// The caller must not attempt to unlock a file that is not locked.
|
||||
func Unlock(f File) error {
|
||||
return unlock(f)
|
||||
}
|
||||
|
||||
// String returns the name of the function corresponding to lt
|
||||
// (Lock, RLock, or Unlock).
|
||||
func (lt lockType) String() string {
|
||||
switch lt {
|
||||
case readLock:
|
||||
return "RLock"
|
||||
case writeLock:
|
||||
return "Lock"
|
||||
default:
|
||||
return "Unlock"
|
||||
}
|
||||
}
|
||||
|
||||
// IsNotSupported returns a boolean indicating whether the error is known to
|
||||
// report that a function is not supported (possibly for a specific input).
|
||||
// It is satisfied by ErrNotSupported as well as some syscall errors.
|
||||
func IsNotSupported(err error) bool {
|
||||
return isNotSupported(underlyingError(err))
|
||||
}
|
||||
|
||||
// ErrNotSupported - operation not supported
|
||||
var ErrNotSupported = errors.New("operation not supported")
|
||||
|
||||
// underlyingError returns the underlying error for known os error types.
|
||||
func underlyingError(err error) error {
|
||||
switch err := err.(type) {
|
||||
case *fs.PathError:
|
||||
return err.Err
|
||||
case *os.LinkError:
|
||||
return err.Err
|
||||
case *os.SyscallError:
|
||||
return err.Err
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,217 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build aix || (solaris && !illumos)
|
||||
// +build aix solaris,!illumos
|
||||
|
||||
// This code implements the filelock API using POSIX 'fcntl' locks, which attach
|
||||
// to an (inode, process) pair rather than a file descriptor. To avoid unlocking
|
||||
// files prematurely when the same file is opened through different descriptors,
|
||||
// we allow only one read-lock at a time.
|
||||
//
|
||||
// Most platforms provide some alternative API, such as an 'flock' system call
|
||||
// or an F_OFD_SETLK command for 'fcntl', that allows for better concurrency and
|
||||
// does not require per-inode bookkeeping in the application.
|
||||
|
||||
package filelock
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"io/fs"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
type lockType int16
|
||||
|
||||
const (
|
||||
readLock lockType = syscall.F_RDLCK
|
||||
writeLock lockType = syscall.F_WRLCK
|
||||
)
|
||||
|
||||
type inode = uint64 // type of syscall.Stat_t.Ino
|
||||
|
||||
type inodeLock struct {
|
||||
owner File
|
||||
queue []<-chan File
|
||||
}
|
||||
|
||||
type token struct{}
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
inodes = map[File]inode{}
|
||||
locks = map[inode]inodeLock{}
|
||||
)
|
||||
|
||||
func lock(f File, lt lockType) (err error) {
|
||||
// POSIX locks apply per inode and process, and the lock for an inode is
|
||||
// released when *any* descriptor for that inode is closed. So we need to
|
||||
// synchronize access to each inode internally, and must serialize lock and
|
||||
// unlock calls that refer to the same inode through different descriptors.
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ino := fi.Sys().(*syscall.Stat_t).Ino
|
||||
|
||||
mu.Lock()
|
||||
if i, dup := inodes[f]; dup && i != ino {
|
||||
mu.Unlock()
|
||||
return &fs.PathError{
|
||||
Op: lt.String(),
|
||||
Path: f.Name(),
|
||||
Err: errors.New("inode for file changed since last Lock or RLock"),
|
||||
}
|
||||
}
|
||||
inodes[f] = ino
|
||||
|
||||
var wait chan File
|
||||
l := locks[ino]
|
||||
if l.owner == f {
|
||||
// This file already owns the lock, but the call may change its lock type.
|
||||
} else if l.owner == nil {
|
||||
// No owner: it's ours now.
|
||||
l.owner = f
|
||||
} else {
|
||||
// Already owned: add a channel to wait on.
|
||||
wait = make(chan File)
|
||||
l.queue = append(l.queue, wait)
|
||||
}
|
||||
locks[ino] = l
|
||||
mu.Unlock()
|
||||
|
||||
if wait != nil {
|
||||
wait <- f
|
||||
}
|
||||
|
||||
// Spurious EDEADLK errors arise on platforms that compute deadlock graphs at
|
||||
// the process, rather than thread, level. Consider processes P and Q, with
|
||||
// threads P.1, P.2, and Q.3. The following trace is NOT a deadlock, but will be
|
||||
// reported as a deadlock on systems that consider only process granularity:
|
||||
//
|
||||
// P.1 locks file A.
|
||||
// Q.3 locks file B.
|
||||
// Q.3 blocks on file A.
|
||||
// P.2 blocks on file B. (This is erroneously reported as a deadlock.)
|
||||
// P.1 unlocks file A.
|
||||
// Q.3 unblocks and locks file A.
|
||||
// Q.3 unlocks files A and B.
|
||||
// P.2 unblocks and locks file B.
|
||||
// P.2 unlocks file B.
|
||||
//
|
||||
// These spurious errors were observed in practice on AIX and Solaris in
|
||||
// cmd/go: see https://golang.org/issue/32817.
|
||||
//
|
||||
// We work around this bug by treating EDEADLK as always spurious. If there
|
||||
// really is a lock-ordering bug between the interacting processes, it will
|
||||
// become a livelock instead, but that's not appreciably worse than if we had
|
||||
// a proper flock implementation (which generally does not even attempt to
|
||||
// diagnose deadlocks).
|
||||
//
|
||||
// In the above example, that changes the trace to:
|
||||
//
|
||||
// P.1 locks file A.
|
||||
// Q.3 locks file B.
|
||||
// Q.3 blocks on file A.
|
||||
// P.2 spuriously fails to lock file B and goes to sleep.
|
||||
// P.1 unlocks file A.
|
||||
// Q.3 unblocks and locks file A.
|
||||
// Q.3 unlocks files A and B.
|
||||
// P.2 wakes up and locks file B.
|
||||
// P.2 unlocks file B.
|
||||
//
|
||||
// We know that the retry loop will not introduce a *spurious* livelock
|
||||
// because, according to the POSIX specification, EDEADLK is only to be
|
||||
// returned when “the lock is blocked by a lock from another process”.
|
||||
// If that process is blocked on some lock that we are holding, then the
|
||||
// resulting livelock is due to a real deadlock (and would manifest as such
|
||||
// when using, for example, the flock implementation of this package).
|
||||
// If the other process is *not* blocked on some other lock that we are
|
||||
// holding, then it will eventually release the requested lock.
|
||||
|
||||
nextSleep := 1 * time.Millisecond
|
||||
const maxSleep = 500 * time.Millisecond
|
||||
for {
|
||||
err = setlkw(f.Fd(), lt)
|
||||
if err != syscall.EDEADLK {
|
||||
break
|
||||
}
|
||||
time.Sleep(nextSleep)
|
||||
|
||||
nextSleep += nextSleep
|
||||
if nextSleep > maxSleep {
|
||||
nextSleep = maxSleep
|
||||
}
|
||||
// Apply 10% jitter to avoid synchronizing collisions when we finally unblock.
|
||||
nextSleep += time.Duration((0.1*rand.Float64() - 0.05) * float64(nextSleep))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
unlock(f)
|
||||
return &fs.PathError{
|
||||
Op: lt.String(),
|
||||
Path: f.Name(),
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func unlock(f File) error {
|
||||
var owner File
|
||||
|
||||
mu.Lock()
|
||||
ino, ok := inodes[f]
|
||||
if ok {
|
||||
owner = locks[ino].owner
|
||||
}
|
||||
mu.Unlock()
|
||||
|
||||
if owner != f {
|
||||
panic("unlock called on a file that is not locked")
|
||||
}
|
||||
|
||||
err := setlkw(f.Fd(), syscall.F_UNLCK)
|
||||
|
||||
mu.Lock()
|
||||
l := locks[ino]
|
||||
if len(l.queue) == 0 {
|
||||
// No waiters: remove the map entry.
|
||||
delete(locks, ino)
|
||||
} else {
|
||||
// The first waiter is sending us their file now.
|
||||
// Receive it and update the queue.
|
||||
l.owner = <-l.queue[0]
|
||||
l.queue = l.queue[1:]
|
||||
locks[ino] = l
|
||||
}
|
||||
delete(inodes, f)
|
||||
mu.Unlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// setlkw calls FcntlFlock with F_SETLKW for the entire file indicated by fd.
|
||||
func setlkw(fd uintptr, lt lockType) error {
|
||||
for {
|
||||
err := syscall.FcntlFlock(fd, syscall.F_SETLKW, &syscall.Flock_t{
|
||||
Type: int16(lt),
|
||||
Whence: io.SeekStart,
|
||||
Start: 0,
|
||||
Len: 0, // All bytes.
|
||||
})
|
||||
if err != syscall.EINTR {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isNotSupported(err error) bool {
|
||||
return err == syscall.ENOSYS || err == syscall.ENOTSUP || err == syscall.EOPNOTSUPP || err == ErrNotSupported
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build !aix && !darwin && !dragonfly && !freebsd && !linux && !netbsd && !openbsd && !plan9 && !solaris && !windows
|
||||
// +build !aix,!darwin,!dragonfly,!freebsd,!linux,!netbsd,!openbsd,!plan9,!solaris,!windows
|
||||
|
||||
package filelock
|
||||
|
||||
import "io/fs"
|
||||
|
||||
type lockType int8
|
||||
|
||||
const (
|
||||
readLock = iota + 1
|
||||
writeLock
|
||||
)
|
||||
|
||||
func lock(f File, lt lockType) error {
|
||||
return &fs.PathError{
|
||||
Op: lt.String(),
|
||||
Path: f.Name(),
|
||||
Err: ErrNotSupported,
|
||||
}
|
||||
}
|
||||
|
||||
func unlock(f File) error {
|
||||
return &fs.PathError{
|
||||
Op: "Unlock",
|
||||
Path: f.Name(),
|
||||
Err: ErrNotSupported,
|
||||
}
|
||||
}
|
||||
|
||||
func isNotSupported(err error) bool {
|
||||
return err == ErrNotSupported
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build plan9
|
||||
// +build plan9
|
||||
|
||||
package filelock
|
||||
|
||||
import "io/fs"
|
||||
|
||||
type lockType int8
|
||||
|
||||
const (
|
||||
readLock = iota + 1
|
||||
writeLock
|
||||
)
|
||||
|
||||
func lock(f File, lt lockType) error {
|
||||
return &fs.PathError{
|
||||
Op: lt.String(),
|
||||
Path: f.Name(),
|
||||
Err: ErrNotSupported,
|
||||
}
|
||||
}
|
||||
|
||||
func unlock(f File) error {
|
||||
return &fs.PathError{
|
||||
Op: "Unlock",
|
||||
Path: f.Name(),
|
||||
Err: ErrNotSupported,
|
||||
}
|
||||
}
|
||||
|
||||
func isNotSupported(err error) bool {
|
||||
return err == ErrNotSupported
|
||||
}
|
|
@ -0,0 +1,212 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build !js && !plan9
|
||||
// +build !js,!plan9
|
||||
|
||||
//nolint
|
||||
|
||||
package filelock_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-container-networking/internal/lockedfile/internal/filelock"
|
||||
)
|
||||
|
||||
func lock(t *testing.T, f *os.File) {
|
||||
t.Helper()
|
||||
err := filelock.Lock(f)
|
||||
t.Logf("Lock(fd %d) = %v", f.Fd(), err)
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func rLock(t *testing.T, f *os.File) {
|
||||
t.Helper()
|
||||
err := filelock.RLock(f)
|
||||
t.Logf("RLock(fd %d) = %v", f.Fd(), err)
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func unlock(t *testing.T, f *os.File) {
|
||||
t.Helper()
|
||||
err := filelock.Unlock(f)
|
||||
t.Logf("Unlock(fd %d) = %v", f.Fd(), err)
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func mustTempFile(t *testing.T) (f *os.File, remove func()) {
|
||||
t.Helper()
|
||||
|
||||
base := filepath.Base(t.Name())
|
||||
f, err := os.CreateTemp("", base)
|
||||
if err != nil {
|
||||
t.Fatalf(`os.CreateTemp("", %q) = %v`, base, err)
|
||||
}
|
||||
t.Logf("fd %d = %s", f.Fd(), f.Name())
|
||||
|
||||
return f, func() {
|
||||
f.Close()
|
||||
os.Remove(f.Name())
|
||||
}
|
||||
}
|
||||
|
||||
func mustOpen(t *testing.T, name string) *os.File {
|
||||
t.Helper()
|
||||
|
||||
f, err := os.OpenFile(name, os.O_RDWR, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("os.Open(%q) = %v", name, err)
|
||||
}
|
||||
|
||||
t.Logf("fd %d = os.Open(%q)", f.Fd(), name)
|
||||
return f
|
||||
}
|
||||
|
||||
const (
|
||||
quiescent = 10 * time.Millisecond
|
||||
probablyStillBlocked = 10 * time.Second
|
||||
)
|
||||
|
||||
func mustBlock(t *testing.T, op string, f *os.File) (wait func(*testing.T)) {
|
||||
t.Helper()
|
||||
|
||||
desc := fmt.Sprintf("%s(fd %d)", op, f.Fd())
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
t.Helper()
|
||||
switch op {
|
||||
case "Lock":
|
||||
lock(t, f)
|
||||
case "RLock":
|
||||
rLock(t, f)
|
||||
default:
|
||||
panic("invalid op: " + op)
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
t.Fatalf("%s unexpectedly did not block", desc)
|
||||
return nil
|
||||
|
||||
case <-time.After(quiescent):
|
||||
t.Logf("%s is blocked (as expected)", desc)
|
||||
return func(t *testing.T) {
|
||||
t.Helper()
|
||||
select {
|
||||
case <-time.After(probablyStillBlocked):
|
||||
t.Fatalf("%s is unexpectedly still blocked", desc)
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLockExcludesLock(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f, remove := mustTempFile(t)
|
||||
defer remove()
|
||||
|
||||
other := mustOpen(t, f.Name())
|
||||
defer other.Close()
|
||||
|
||||
lock(t, f)
|
||||
lockOther := mustBlock(t, "Lock", other)
|
||||
unlock(t, f)
|
||||
lockOther(t)
|
||||
unlock(t, other)
|
||||
}
|
||||
|
||||
func TestLockExcludesRLock(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f, remove := mustTempFile(t)
|
||||
defer remove()
|
||||
|
||||
other := mustOpen(t, f.Name())
|
||||
defer other.Close()
|
||||
|
||||
lock(t, f)
|
||||
rLockOther := mustBlock(t, "RLock", other)
|
||||
unlock(t, f)
|
||||
rLockOther(t)
|
||||
unlock(t, other)
|
||||
}
|
||||
|
||||
func TestRLockExcludesOnlyLock(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f, remove := mustTempFile(t)
|
||||
defer remove()
|
||||
rLock(t, f)
|
||||
|
||||
f2 := mustOpen(t, f.Name())
|
||||
defer f2.Close()
|
||||
|
||||
doUnlockTF := false
|
||||
switch runtime.GOOS {
|
||||
case "aix", "solaris":
|
||||
// When using POSIX locks (as on Solaris), we can't safely read-lock the
|
||||
// same inode through two different descriptors at the same time: when the
|
||||
// first descriptor is closed, the second descriptor would still be open but
|
||||
// silently unlocked. So a second RLock must block instead of proceeding.
|
||||
lockF2 := mustBlock(t, "RLock", f2)
|
||||
unlock(t, f)
|
||||
lockF2(t)
|
||||
default:
|
||||
rLock(t, f2)
|
||||
doUnlockTF = true
|
||||
}
|
||||
|
||||
other := mustOpen(t, f.Name())
|
||||
defer other.Close()
|
||||
lockOther := mustBlock(t, "Lock", other)
|
||||
|
||||
unlock(t, f2)
|
||||
if doUnlockTF {
|
||||
unlock(t, f)
|
||||
}
|
||||
lockOther(t)
|
||||
unlock(t, other)
|
||||
}
|
||||
|
||||
func TestLockNotDroppedByExecCommand(t *testing.T) {
|
||||
f, remove := mustTempFile(t)
|
||||
defer remove()
|
||||
|
||||
lock(t, f)
|
||||
|
||||
other := mustOpen(t, f.Name())
|
||||
defer other.Close()
|
||||
|
||||
// Some kinds of file locks are dropped when a duplicated or forked file
|
||||
// descriptor is unlocked. Double-check that the approach used by os/exec does
|
||||
// not accidentally drop locks.
|
||||
//nolint
|
||||
cmd := exec.Command(os.Args[0], "-test.run=^$")
|
||||
if err := cmd.Run(); err != nil {
|
||||
t.Fatalf("exec failed: %v", err)
|
||||
}
|
||||
|
||||
lockOther := mustBlock(t, "Lock", other)
|
||||
unlock(t, f)
|
||||
lockOther(t)
|
||||
unlock(t, other)
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build darwin || dragonfly || freebsd || illumos || linux || netbsd || openbsd
|
||||
// +build darwin dragonfly freebsd illumos linux netbsd openbsd
|
||||
|
||||
package filelock
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
type lockType int16
|
||||
|
||||
const (
|
||||
readLock lockType = syscall.LOCK_SH
|
||||
writeLock lockType = syscall.LOCK_EX
|
||||
)
|
||||
|
||||
func lock(f File, lt lockType) (err error) {
|
||||
for {
|
||||
err = syscall.Flock(int(f.Fd()), int(lt))
|
||||
//nolint // not changing std golib code
|
||||
if err != syscall.EINTR {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return &fs.PathError{
|
||||
Op: lt.String(),
|
||||
Path: f.Name(),
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func unlock(f File) error {
|
||||
return lock(f, syscall.LOCK_UN)
|
||||
}
|
||||
|
||||
func isNotSupported(err error) bool {
|
||||
//nolint // not changing std golib code
|
||||
return err == syscall.ENOSYS || err == syscall.ENOTSUP || err == syscall.EOPNOTSUPP || err == ErrNotSupported
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build windows
|
||||
// +build windows
|
||||
|
||||
package filelock
|
||||
|
||||
import (
|
||||
"golang.org/x/sys/windows"
|
||||
"io/fs"
|
||||
)
|
||||
|
||||
type lockType uint32
|
||||
|
||||
const (
|
||||
readLock lockType = 0
|
||||
writeLock lockType = windows.LOCKFILE_EXCLUSIVE_LOCK
|
||||
)
|
||||
|
||||
const (
|
||||
reserved = 0
|
||||
allBytes = ^uint32(0)
|
||||
)
|
||||
|
||||
func lock(f File, lt lockType) error {
|
||||
// Per https://golang.org/issue/19098, “Programs currently expect the Fd
|
||||
// method to return a handle that uses ordinary synchronous I/O.”
|
||||
// However, LockFileEx still requires an OVERLAPPED structure,
|
||||
// which contains the file offset of the beginning of the lock range.
|
||||
// We want to lock the entire file, so we leave the offset as zero.
|
||||
ol := new(windows.Overlapped)
|
||||
|
||||
err := windows.LockFileEx(windows.Handle(f.Fd()), uint32(lt), reserved, allBytes, allBytes, ol)
|
||||
if err != nil {
|
||||
return &fs.PathError{
|
||||
Op: lt.String(),
|
||||
Path: f.Name(),
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func unlock(f File) error {
|
||||
ol := new(windows.Overlapped)
|
||||
err := windows.UnlockFileEx(windows.Handle(f.Fd()), reserved, allBytes, allBytes, ol)
|
||||
if err != nil {
|
||||
return &fs.PathError{
|
||||
Op: "Unlock",
|
||||
Path: f.Name(),
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isNotSupported(err error) bool {
|
||||
switch err {
|
||||
case windows.ERROR_NOT_SUPPORTED, windows.ERROR_CALL_NOT_IMPLEMENTED, ErrNotSupported:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
|
@ -0,0 +1,188 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package lockedfile creates and manipulates files whose contents should only
|
||||
// change atomically.
|
||||
//nolint
|
||||
package lockedfile
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
// A File is a locked *os.File.
|
||||
//
|
||||
// Closing the file releases the lock.
|
||||
//
|
||||
// If the program exits while a file is locked, the operating system releases
|
||||
// the lock but may not do so promptly: callers must ensure that all locked
|
||||
// files are closed before exiting.
|
||||
type File struct {
|
||||
osFile
|
||||
closed bool
|
||||
}
|
||||
|
||||
// osFile embeds a *os.File while keeping the pointer itself unexported.
|
||||
// (When we close a File, it must be the same file descriptor that we opened!)
|
||||
type osFile struct {
|
||||
*os.File
|
||||
}
|
||||
|
||||
// OpenFile is like os.OpenFile, but returns a locked file.
|
||||
// If flag includes os.O_WRONLY or os.O_RDWR, the file is write-locked;
|
||||
// otherwise, it is read-locked.
|
||||
func OpenFile(name string, flag int, perm fs.FileMode) (*File, error) {
|
||||
var (
|
||||
f = new(File)
|
||||
err error
|
||||
)
|
||||
f.osFile.File, err = openFile(name, flag, perm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Although the operating system will drop locks for open files when the go
|
||||
// command exits, we want to hold locks for as little time as possible, and we
|
||||
// especially don't want to leave a file locked after we're done with it. Our
|
||||
// Close method is what releases the locks, so use a finalizer to report
|
||||
// missing Close calls on a best-effort basis.
|
||||
runtime.SetFinalizer(f, func(f *File) {
|
||||
panic(fmt.Sprintf("lockedfile.File %s became unreachable without a call to Close", f.Name()))
|
||||
})
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// Open is like os.Open, but returns a read-locked file.
|
||||
func Open(name string) (*File, error) {
|
||||
return OpenFile(name, os.O_RDONLY, 0)
|
||||
}
|
||||
|
||||
// Create is like os.Create, but returns a write-locked file.
|
||||
func Create(name string) (*File, error) {
|
||||
return OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
|
||||
}
|
||||
|
||||
// Edit creates the named file with mode 0666 (before umask),
|
||||
// but does not truncate existing contents.
|
||||
//
|
||||
// If Edit succeeds, methods on the returned File can be used for I/O.
|
||||
// The associated file descriptor has mode O_RDWR and the file is write-locked.
|
||||
func Edit(name string) (*File, error) {
|
||||
return OpenFile(name, os.O_RDWR|os.O_CREATE, 0666)
|
||||
}
|
||||
|
||||
// Close unlocks and closes the underlying file.
|
||||
//
|
||||
// Close may be called multiple times; all calls after the first will return a
|
||||
// non-nil error.
|
||||
func (f *File) Close() error {
|
||||
if f.closed {
|
||||
return &fs.PathError{
|
||||
Op: "close",
|
||||
Path: f.Name(),
|
||||
Err: fs.ErrClosed,
|
||||
}
|
||||
}
|
||||
f.closed = true
|
||||
|
||||
err := closeFile(f.osFile.File)
|
||||
runtime.SetFinalizer(f, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// Read opens the named file with a read-lock and returns its contents.
|
||||
func Read(name string) ([]byte, error) {
|
||||
f, err := Open(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
return io.ReadAll(f)
|
||||
}
|
||||
|
||||
// Write opens the named file (creating it with the given permissions if needed),
|
||||
// then write-locks it and overwrites it with the given content.
|
||||
func Write(name string, content io.Reader, perm fs.FileMode) (err error) {
|
||||
f, err := OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = io.Copy(f, content)
|
||||
if closeErr := f.Close(); err == nil {
|
||||
err = closeErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Transform invokes t with the result of reading the named file, with its lock
|
||||
// still held.
|
||||
//
|
||||
// If t returns a nil error, Transform then writes the returned contents back to
|
||||
// the file, making a best effort to preserve existing contents on error.
|
||||
//
|
||||
// t must not modify the slice passed to it.
|
||||
func Transform(name string, t func([]byte) ([]byte, error)) (err error) {
|
||||
f, err := Edit(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
old, err := io.ReadAll(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
latest, err := t(old)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(latest) > len(old) {
|
||||
// The overall file size is increasing, so write the tail first: if we're
|
||||
// about to run out of space on the disk, we would rather detect that
|
||||
// failure before we have overwritten the original contents.
|
||||
if _, err = f.WriteAt(latest[len(old):], int64(len(old))); err != nil {
|
||||
// Make a best effort to remove the incomplete tail.
|
||||
f.Truncate(int64(len(old)))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// We're about to overwrite the old contents. In case of failure, make a best
|
||||
// effort to roll back before we close the file.
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if _, err := f.WriteAt(old, 0); err == nil {
|
||||
f.Truncate(int64(len(old)))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if len(latest) >= len(old) {
|
||||
if _, err := f.WriteAt(latest[:len(old)], 0); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if _, err := f.WriteAt(latest, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
// The overall file size is decreasing, so shrink the file to its final size
|
||||
// after writing. We do this after writing (instead of before) so that if
|
||||
// the write fails, enough filesystem space will likely still be reserved
|
||||
// to contain the previous contents.
|
||||
if err := f.Truncate(int64(len(latest))); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build !plan9
|
||||
// +build !plan9
|
||||
|
||||
//nolint
|
||||
package lockedfile
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
|
||||
"github.com/Azure/azure-container-networking/internal/lockedfile/internal/filelock"
|
||||
)
|
||||
|
||||
func openFile(name string, flag int, perm fs.FileMode) (*os.File, error) {
|
||||
// On BSD systems, we could add the O_SHLOCK or O_EXLOCK flag to the OpenFile
|
||||
// call instead of locking separately, but we have to support separate locking
|
||||
// calls for Linux and Windows anyway, so it's simpler to use that approach
|
||||
// consistently.
|
||||
|
||||
f, err := os.OpenFile(name, flag&^os.O_TRUNC, perm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch flag & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) {
|
||||
case os.O_WRONLY, os.O_RDWR:
|
||||
err = filelock.Lock(f)
|
||||
default:
|
||||
err = filelock.RLock(f)
|
||||
}
|
||||
if err != nil {
|
||||
f.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if flag&os.O_TRUNC == os.O_TRUNC {
|
||||
if err := f.Truncate(0); err != nil {
|
||||
// The documentation for os.O_TRUNC says “if possible, truncate file when
|
||||
// opened”, but doesn't define “possible” (golang.org/issue/28699).
|
||||
// We'll treat regular files (and symlinks to regular files) as “possible”
|
||||
// and ignore errors for the rest.
|
||||
if fi, statErr := f.Stat(); statErr != nil || fi.Mode().IsRegular() {
|
||||
filelock.Unlock(f)
|
||||
f.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func closeFile(f *os.File) error {
|
||||
// Since locking syscalls operate on file descriptors, we must unlock the file
|
||||
// while the descriptor is still valid — that is, before the file is closed —
|
||||
// and avoid unlocking files that are already closed.
|
||||
err := filelock.Unlock(f)
|
||||
|
||||
if closeErr := f.Close(); err == nil {
|
||||
err = closeErr
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,95 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build plan9
|
||||
// +build plan9
|
||||
|
||||
package lockedfile
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"math/rand"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Opening an exclusive-use file returns an error.
|
||||
// The expected error strings are:
|
||||
//
|
||||
// - "open/create -- file is locked" (cwfs, kfs)
|
||||
// - "exclusive lock" (fossil)
|
||||
// - "exclusive use file already open" (ramfs)
|
||||
var lockedErrStrings = [...]string{
|
||||
"file is locked",
|
||||
"exclusive lock",
|
||||
"exclusive use file already open",
|
||||
}
|
||||
|
||||
// Even though plan9 doesn't support the Lock/RLock/Unlock functions to
|
||||
// manipulate already-open files, IsLocked is still meaningful: os.OpenFile
|
||||
// itself may return errors that indicate that a file with the ModeExclusive bit
|
||||
// set is already open.
|
||||
func isLocked(err error) bool {
|
||||
s := err.Error()
|
||||
|
||||
for _, frag := range lockedErrStrings {
|
||||
if strings.Contains(s, frag) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func openFile(name string, flag int, perm fs.FileMode) (*os.File, error) {
|
||||
// Plan 9 uses a mode bit instead of explicit lock/unlock syscalls.
|
||||
//
|
||||
// Per http://man.cat-v.org/plan_9/5/stat: “Exclusive use files may be open
|
||||
// for I/O by only one fid at a time across all clients of the server. If a
|
||||
// second open is attempted, it draws an error.”
|
||||
//
|
||||
// So we can try to open a locked file, but if it fails we're on our own to
|
||||
// figure out when it becomes available. We'll use exponential backoff with
|
||||
// some jitter and an arbitrary limit of 500ms.
|
||||
|
||||
// If the file was unpacked or created by some other program, it might not
|
||||
// have the ModeExclusive bit set. Set it before we call OpenFile, so that we
|
||||
// can be confident that a successful OpenFile implies exclusive use.
|
||||
if fi, err := os.Stat(name); err == nil {
|
||||
if fi.Mode()&fs.ModeExclusive == 0 {
|
||||
if err := os.Chmod(name, fi.Mode()|fs.ModeExclusive); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
} else if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nextSleep := 1 * time.Millisecond
|
||||
const maxSleep = 500 * time.Millisecond
|
||||
for {
|
||||
f, err := os.OpenFile(name, flag, perm|fs.ModeExclusive)
|
||||
if err == nil {
|
||||
return f, nil
|
||||
}
|
||||
|
||||
if !isLocked(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
time.Sleep(nextSleep)
|
||||
|
||||
nextSleep += nextSleep
|
||||
if nextSleep > maxSleep {
|
||||
nextSleep = maxSleep
|
||||
}
|
||||
// Apply 10% jitter to avoid synchronizing collisions.
|
||||
nextSleep += time.Duration((0.1*rand.Float64() - 0.05) * float64(nextSleep))
|
||||
}
|
||||
}
|
||||
|
||||
func closeFile(f *os.File) error {
|
||||
return f.Close()
|
||||
}
|
|
@ -0,0 +1,271 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build !js
|
||||
// +build !js
|
||||
|
||||
// js does not support inter-process file locking.
|
||||
//nolint
|
||||
package lockedfile_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-container-networking/internal/lockedfile"
|
||||
)
|
||||
|
||||
func mustTempDir(t *testing.T) (dir string, remove func()) {
|
||||
t.Helper()
|
||||
|
||||
dir, err := os.MkdirTemp("", filepath.Base(t.Name()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return dir, func() { os.RemoveAll(dir) }
|
||||
}
|
||||
|
||||
const (
|
||||
quiescent = 10 * time.Millisecond
|
||||
probablyStillBlocked = 10 * time.Second
|
||||
)
|
||||
|
||||
func mustBlock(t *testing.T, desc string, f func()) (wait func(*testing.T)) {
|
||||
t.Helper()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
f()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
t.Fatalf("%s unexpectedly did not block", desc)
|
||||
return nil
|
||||
|
||||
case <-time.After(quiescent):
|
||||
return func(t *testing.T) {
|
||||
t.Helper()
|
||||
select {
|
||||
case <-time.After(probablyStillBlocked):
|
||||
t.Fatalf("%s is unexpectedly still blocked after %v", desc, probablyStillBlocked)
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMutexExcludes(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dir, remove := mustTempDir(t)
|
||||
defer remove()
|
||||
|
||||
path := filepath.Join(dir, "lock")
|
||||
|
||||
mu := lockedfile.MutexAt(path)
|
||||
t.Logf("mu := MutexAt(_)")
|
||||
|
||||
unlock, err := mu.Lock()
|
||||
if err != nil {
|
||||
t.Fatalf("mu.Lock: %v", err)
|
||||
}
|
||||
t.Logf("unlock, _ := mu.Lock()")
|
||||
|
||||
mu2 := lockedfile.MutexAt(mu.Path)
|
||||
t.Logf("mu2 := MutexAt(mu.Path)")
|
||||
|
||||
wait := mustBlock(t, "mu2.Lock()", func() {
|
||||
unlock2, err := mu2.Lock()
|
||||
if err != nil {
|
||||
t.Errorf("mu2.Lock: %v", err)
|
||||
return
|
||||
}
|
||||
t.Logf("unlock2, _ := mu2.Lock()")
|
||||
t.Logf("unlock2()")
|
||||
unlock2()
|
||||
})
|
||||
|
||||
t.Logf("unlock()")
|
||||
unlock()
|
||||
wait(t)
|
||||
}
|
||||
|
||||
func TestReadWaitsForLock(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dir, remove := mustTempDir(t)
|
||||
defer remove()
|
||||
|
||||
path := filepath.Join(dir, "timestamp.txt")
|
||||
|
||||
f, err := lockedfile.Create(path)
|
||||
if err != nil {
|
||||
t.Fatalf("Create: %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
const (
|
||||
part1 = "part 1\n"
|
||||
part2 = "part 2\n"
|
||||
)
|
||||
_, err = f.WriteString(part1)
|
||||
if err != nil {
|
||||
t.Fatalf("WriteString: %v", err)
|
||||
}
|
||||
t.Logf("WriteString(%q) = <nil>", part1)
|
||||
|
||||
wait := mustBlock(t, "Read", func() {
|
||||
b, err := lockedfile.Read(path)
|
||||
if err != nil {
|
||||
t.Errorf("Read: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
const want = part1 + part2
|
||||
got := string(b)
|
||||
if got == want {
|
||||
t.Logf("Read(_) = %q", got)
|
||||
} else {
|
||||
t.Errorf("Read(_) = %q, _; want %q", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
_, err = f.WriteString(part2)
|
||||
if err != nil {
|
||||
t.Errorf("WriteString: %v", err)
|
||||
} else {
|
||||
t.Logf("WriteString(%q) = <nil>", part2)
|
||||
}
|
||||
f.Close()
|
||||
|
||||
wait(t)
|
||||
}
|
||||
|
||||
func TestCanLockExistingFile(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dir, remove := mustTempDir(t)
|
||||
defer remove()
|
||||
path := filepath.Join(dir, "existing.txt")
|
||||
|
||||
if err := os.WriteFile(path, []byte("ok"), 0777); err != nil {
|
||||
t.Fatalf("os.WriteFile: %v", err)
|
||||
}
|
||||
|
||||
f, err := lockedfile.Edit(path)
|
||||
if err != nil {
|
||||
t.Fatalf("first Edit: %v", err)
|
||||
}
|
||||
|
||||
wait := mustBlock(t, "Edit", func() {
|
||||
other, err := lockedfile.Edit(path)
|
||||
if err != nil {
|
||||
t.Errorf("second Edit: %v", err)
|
||||
}
|
||||
other.Close()
|
||||
})
|
||||
|
||||
f.Close()
|
||||
wait(t)
|
||||
}
|
||||
|
||||
// TestSpuriousEDEADLK verifies that the spurious EDEADLK reported in
|
||||
// https://golang.org/issue/32817 no longer occurs.
|
||||
func TestSpuriousEDEADLK(t *testing.T) {
|
||||
// P.1 locks file A.
|
||||
// Q.3 locks file B.
|
||||
// Q.3 blocks on file A.
|
||||
// P.2 blocks on file B. (Spurious EDEADLK occurs here.)
|
||||
// P.1 unlocks file A.
|
||||
// Q.3 unblocks and locks file A.
|
||||
// Q.3 unlocks files A and B.
|
||||
// P.2 unblocks and locks file B.
|
||||
// P.2 unlocks file B.
|
||||
|
||||
dirVar := t.Name() + "DIR"
|
||||
|
||||
if dir := os.Getenv(dirVar); dir != "" {
|
||||
// Q.3 locks file B.
|
||||
b, err := lockedfile.Edit(filepath.Join(dir, "B"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer b.Close()
|
||||
|
||||
if err := os.WriteFile(filepath.Join(dir, "locked"), []byte("ok"), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Q.3 blocks on file A.
|
||||
a, err := lockedfile.Edit(filepath.Join(dir, "A"))
|
||||
// Q.3 unblocks and locks file A.
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
//nolint
|
||||
defer a.Close()
|
||||
|
||||
// Q.3 unlocks files A and B.
|
||||
return
|
||||
}
|
||||
|
||||
dir, remove := mustTempDir(t)
|
||||
defer remove()
|
||||
|
||||
// P.1 locks file A.
|
||||
a, err := lockedfile.Edit(filepath.Join(dir, "A"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cmd := exec.Command(os.Args[0], "-test.run="+t.Name())
|
||||
cmd.Env = append(os.Environ(), fmt.Sprintf("%s=%s", dirVar, dir))
|
||||
|
||||
qDone := make(chan struct{})
|
||||
waitQ := mustBlock(t, "Edit A and B in subprocess", func() {
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
t.Errorf("%v:\n%s", err, out)
|
||||
}
|
||||
close(qDone)
|
||||
})
|
||||
|
||||
// Wait until process Q has either failed or locked file B.
|
||||
// Otherwise, P.2 might not block on file B as intended.
|
||||
locked:
|
||||
for {
|
||||
if _, err := os.Stat(filepath.Join(dir, "locked")); !os.IsNotExist(err) {
|
||||
break locked
|
||||
}
|
||||
select {
|
||||
case <-qDone:
|
||||
break locked
|
||||
case <-time.After(1 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
|
||||
waitP2 := mustBlock(t, "Edit B", func() {
|
||||
// P.2 blocks on file B. (Spurious EDEADLK occurs here.)
|
||||
b, err := lockedfile.Edit(filepath.Join(dir, "B"))
|
||||
// P.2 unblocks and locks file B.
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
// P.2 unlocks file B.
|
||||
b.Close()
|
||||
})
|
||||
|
||||
// P.1 unlocks file A.
|
||||
a.Close()
|
||||
|
||||
waitQ(t)
|
||||
waitP2(t)
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package lockedfile
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// A Mutex provides mutual exclusion within and across processes by locking a
|
||||
// well-known file. Such a file generally guards some other part of the
|
||||
// filesystem: for example, a Mutex file in a directory might guard access to
|
||||
// the entire tree rooted in that directory.
|
||||
//
|
||||
// Mutex does not implement sync.Locker: unlike a sync.Mutex, a lockedfile.Mutex
|
||||
// can fail to lock (e.g. if there is a permission error in the filesystem).
|
||||
//
|
||||
// Like a sync.Mutex, a Mutex may be included as a field of a larger struct but
|
||||
// must not be copied after first use. The Path field must be set before first
|
||||
// use and must not be change thereafter.
|
||||
type Mutex struct {
|
||||
Path string // The path to the well-known lock file. Must be non-empty.
|
||||
mu sync.Mutex // A redundant mutex. The race detector doesn't know about file locking, so in tests we may need to lock something that it understands.
|
||||
}
|
||||
|
||||
// MutexAt returns a new Mutex with Path set to the given non-empty path.
|
||||
func MutexAt(path string) *Mutex {
|
||||
if path == "" {
|
||||
panic("lockedfile.MutexAt: path must be non-empty")
|
||||
}
|
||||
return &Mutex{Path: path}
|
||||
}
|
||||
|
||||
func (mu *Mutex) String() string {
|
||||
return fmt.Sprintf("lockedfile.Mutex(%s)", mu.Path)
|
||||
}
|
||||
|
||||
// Lock attempts to lock the Mutex.
|
||||
//
|
||||
// If successful, Lock returns a non-nil unlock function: it is provided as a
|
||||
// return-value instead of a separate method to remind the caller to check the
|
||||
// accompanying error. (See https://golang.org/issue/20803.)
|
||||
func (mu *Mutex) Lock() (unlock func(), err error) {
|
||||
if mu.Path == "" {
|
||||
panic("lockedfile.Mutex: missing Path during Lock")
|
||||
}
|
||||
|
||||
// We could use either O_RDWR or O_WRONLY here. If we choose O_RDWR and the
|
||||
// file at mu.Path is write-only, the call to OpenFile will fail with a
|
||||
// permission error. That's actually what we want: if we add an RLock method
|
||||
// in the future, it should call OpenFile with O_RDONLY and will require the
|
||||
// files must be readable, so we should not let the caller make any
|
||||
// assumptions about Mutex working with write-only files.
|
||||
//nolint
|
||||
f, err := OpenFile(mu.Path, os.O_RDWR|os.O_CREATE, 0666)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mu.mu.Lock()
|
||||
|
||||
return func() {
|
||||
mu.mu.Unlock()
|
||||
f.Close()
|
||||
}, nil
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
// Copyright 2019 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build !js
|
||||
// +build !js
|
||||
|
||||
// js does not support inter-process file locking.
|
||||
//nolint
|
||||
package lockedfile_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"math/rand"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-container-networking/internal/lockedfile"
|
||||
)
|
||||
|
||||
func isPowerOf2(x int) bool {
|
||||
return x > 0 && x&(x-1) == 0
|
||||
}
|
||||
|
||||
func roundDownToPowerOf2(x int) int {
|
||||
if x <= 0 {
|
||||
panic("nonpositive x")
|
||||
}
|
||||
bit := 1
|
||||
for x != bit {
|
||||
x = x &^ bit
|
||||
bit <<= 1
|
||||
}
|
||||
return x
|
||||
}
|
||||
|
||||
func TestTransform(t *testing.T) {
|
||||
dir, remove := mustTempDir(t)
|
||||
defer remove()
|
||||
path := filepath.Join(dir, "blob.bin")
|
||||
|
||||
const maxChunkWords = 8 << 10
|
||||
buf := make([]byte, 2*maxChunkWords*8)
|
||||
for i := uint64(0); i < 2*maxChunkWords; i++ {
|
||||
binary.LittleEndian.PutUint64(buf[i*8:], i)
|
||||
}
|
||||
|
||||
if err := lockedfile.Write(path, bytes.NewReader(buf[:8]), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var attempts int64 = 128
|
||||
if !testing.Short() {
|
||||
attempts *= 16
|
||||
}
|
||||
const parallel = 32
|
||||
|
||||
var sem = make(chan bool, parallel)
|
||||
|
||||
for n := attempts; n > 0; n-- {
|
||||
sem <- true
|
||||
go func() {
|
||||
defer func() { <-sem }()
|
||||
|
||||
time.Sleep(time.Duration(rand.Intn(100)) * time.Microsecond)
|
||||
chunkWords := roundDownToPowerOf2(rand.Intn(maxChunkWords) + 1)
|
||||
offset := rand.Intn(chunkWords)
|
||||
|
||||
err := lockedfile.Transform(path, func(data []byte) (chunk []byte, err error) {
|
||||
chunk = buf[offset*8 : (offset+chunkWords)*8]
|
||||
|
||||
if len(data)&^7 != len(data) {
|
||||
t.Errorf("read %d bytes, but each write is an integer multiple of 8 bytes", len(data))
|
||||
return chunk, nil
|
||||
}
|
||||
|
||||
words := len(data) / 8
|
||||
if !isPowerOf2(words) {
|
||||
t.Errorf("read %d 8-byte words, but each write is a power-of-2 number of words", words)
|
||||
return chunk, nil
|
||||
}
|
||||
|
||||
u := binary.LittleEndian.Uint64(data)
|
||||
for i := 1; i < words; i++ {
|
||||
next := binary.LittleEndian.Uint64(data[i*8:])
|
||||
if next != u+1 {
|
||||
t.Errorf("wrote sequential integers, but read integer out of sequence at offset %d", i)
|
||||
return chunk, nil
|
||||
}
|
||||
u = next
|
||||
}
|
||||
|
||||
return chunk, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error from Transform: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for n := parallel; n > 0; n-- {
|
||||
sem <- true
|
||||
}
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
package processlock
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
lockDir, _ = os.Getwd()
|
||||
existingLockFile string
|
||||
newLockFile string
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
existingLockFile = filepath.Join(lockDir, "existing.lock")
|
||||
newLockFile = filepath.Join(lockDir, "new.lock")
|
||||
os.Remove(existingLockFile)
|
||||
os.Remove(newLockFile)
|
||||
f, _ := os.Create(existingLockFile)
|
||||
exitCode := m.Run()
|
||||
f.Close()
|
||||
os.Remove(existingLockFile)
|
||||
os.Remove(newLockFile)
|
||||
os.Exit(exitCode)
|
||||
}
|
||||
|
||||
func TestFileLock(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
flock Interface
|
||||
wantErr bool
|
||||
deleteLockfile bool
|
||||
wantErrMsg string
|
||||
lockfileName string
|
||||
}{
|
||||
{
|
||||
name: "Create new file and acquire Lock",
|
||||
flock: &fileLock{filePath: newLockFile},
|
||||
wantErr: false,
|
||||
deleteLockfile: true,
|
||||
lockfileName: newLockFile,
|
||||
},
|
||||
{
|
||||
name: "acquire Lock on existing file",
|
||||
flock: &fileLock{filePath: existingLockFile},
|
||||
lockfileName: existingLockFile,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "acquire Lock on existing file after releasing",
|
||||
flock: &fileLock{filePath: existingLockFile},
|
||||
lockfileName: existingLockFile,
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.flock.Lock()
|
||||
if tt.wantErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
err = tt.flock.Unlock()
|
||||
require.NoError(t, err)
|
||||
err = tt.flock.Unlock()
|
||||
require.NoError(t, err, "Calling Release lock again should not throw error for already released lock:%v", err)
|
||||
|
||||
// read lockfile contents to check if contents match with pid of current process
|
||||
b, errRead := ioutil.ReadFile(tt.lockfileName)
|
||||
require.NoError(t, errRead, "Got error reading lockfile:%v", errRead)
|
||||
pidStr := string(b)
|
||||
pid, _ := strconv.Atoi(pidStr)
|
||||
require.Equal(t, os.Getpid(), pid, "Expected pid %d but got %d", os.Getpid(), pid)
|
||||
}
|
||||
if tt.deleteLockfile {
|
||||
os.Remove(tt.lockfileName)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestReleaseFileLockError(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
flock Interface
|
||||
wantErr bool
|
||||
wantErrMsg string
|
||||
}{
|
||||
{
|
||||
name: "Release file lock without acquring it",
|
||||
flock: &fileLock{filePath: newLockFile},
|
||||
wantErr: true,
|
||||
wantErrMsg: ErrInvalidFile.Error(),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.flock.Unlock()
|
||||
if tt.wantErr {
|
||||
require.Error(t, err)
|
||||
require.Equal(t, tt.wantErrMsg, err.Error(), "Expected:%s but got:%s", tt.wantErrMsg, err.Error())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package processlock
|
||||
|
||||
import (
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// ErrMockFileLock - mock filelock error
|
||||
var ErrMockFileLock = errors.New("mock filelock error")
|
||||
|
||||
type mockFileLock struct {
|
||||
fail bool
|
||||
}
|
||||
|
||||
func NewMockFileLock(fail bool) Interface {
|
||||
return &mockFileLock{
|
||||
fail: fail,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *mockFileLock) Lock() error {
|
||||
if l.fail {
|
||||
return ErrMockFileLock
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *mockFileLock) Unlock() error {
|
||||
if l.fail {
|
||||
return ErrMockFileLock
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
package processlock
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
"github.com/Azure/azure-container-networking/internal/lockedfile"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// ErrInvalidFile represents invalid file pointer
|
||||
var (
|
||||
ErrEmptyFilePath = errors.New("empty file path")
|
||||
ErrInvalidFile = errors.New("invalid File pointer")
|
||||
)
|
||||
|
||||
//nolint:revive // this naming makes sense
|
||||
type Interface interface {
|
||||
Lock() error
|
||||
Unlock() error
|
||||
}
|
||||
|
||||
type fileLock struct {
|
||||
filePath string
|
||||
file *lockedfile.File
|
||||
}
|
||||
|
||||
func NewFileLock(fileAbsPath string) (Interface, error) {
|
||||
if fileAbsPath == "" {
|
||||
return nil, ErrEmptyFilePath
|
||||
}
|
||||
|
||||
//nolint:gomnd //0o664 - permission to create directory in octal
|
||||
err := os.MkdirAll(filepath.Dir(fileAbsPath), os.FileMode(0o664))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "mkdir lock dir returned error")
|
||||
}
|
||||
|
||||
return &fileLock{
|
||||
filePath: fileAbsPath,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (l *fileLock) Lock() error {
|
||||
var err error
|
||||
|
||||
l.file, err = lockedfile.Create(l.filePath)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "lockedfile create error in lock")
|
||||
}
|
||||
|
||||
_, err = l.file.WriteString(strconv.Itoa(os.Getpid()))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "write to lockfile failed")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *fileLock) Unlock() error {
|
||||
if l.file == nil {
|
||||
return ErrInvalidFile
|
||||
}
|
||||
|
||||
err := l.file.Close()
|
||||
if err != nil && !errors.Is(err, fs.ErrClosed) {
|
||||
return errors.Wrap(err, "file close error in unlock")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
135
store/json.go
135
store/json.go
|
@ -9,55 +9,46 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-container-networking/log"
|
||||
"github.com/Azure/azure-container-networking/platform"
|
||||
"github.com/Azure/azure-container-networking/processlock"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
// Default file name for backing persistent store.
|
||||
defaultFileName = "azure-container-networking.json"
|
||||
|
||||
// Extension added to the file name for lock.
|
||||
lockExtension = ".lock"
|
||||
// LockExtension - Extension added to the file name for lock.
|
||||
LockExtension = ".lock"
|
||||
|
||||
// Maximum number of retries before failing a lock call.
|
||||
lockMaxRetries = 100
|
||||
|
||||
// Delay between lock retries.
|
||||
lockRetryDelay = 100 * time.Millisecond
|
||||
// DefaultLockTimeout - lock timeout in milliseconds
|
||||
DefaultLockTimeout = 10000 * time.Millisecond
|
||||
)
|
||||
|
||||
// jsonFileStore is an implementation of KeyValueStore using a local JSON file.
|
||||
type jsonFileStore struct {
|
||||
fileName string
|
||||
lockFileName string
|
||||
data map[string]*json.RawMessage
|
||||
inSync bool
|
||||
locked bool
|
||||
fileName string
|
||||
data map[string]*json.RawMessage
|
||||
inSync bool
|
||||
processLock processlock.Interface
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
//nolint:revive // ignoring name change
|
||||
// NewJsonFileStore creates a new jsonFileStore object, accessed as a KeyValueStore.
|
||||
func NewJsonFileStore(fileName string) (KeyValueStore, error) {
|
||||
func NewJsonFileStore(fileName string, lockclient processlock.Interface) (KeyValueStore, error) {
|
||||
if fileName == "" {
|
||||
fileName = defaultFileName
|
||||
}
|
||||
|
||||
if platform.CNILockPath != "" {
|
||||
err := os.MkdirAll(platform.CNILockPath, os.FileMode(0o664))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
kvs := &jsonFileStore{
|
||||
fileName: fileName,
|
||||
lockFileName: platform.CNILockPath + filepath.Base(fileName) + lockExtension,
|
||||
data: make(map[string]*json.RawMessage),
|
||||
fileName: fileName,
|
||||
processLock: lockclient,
|
||||
data: make(map[string]*json.RawMessage),
|
||||
}
|
||||
|
||||
return kvs, nil
|
||||
|
@ -174,84 +165,48 @@ func (kvs *jsonFileStore) flush() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Lock locks the store for exclusive access.
|
||||
func (kvs *jsonFileStore) Lock(block bool) error {
|
||||
var (
|
||||
lockFile *os.File
|
||||
err error
|
||||
)
|
||||
func (kvs *jsonFileStore) lockUtil(status chan error) {
|
||||
err := kvs.processLock.Lock()
|
||||
status <- err
|
||||
}
|
||||
|
||||
// Lock locks the store for exclusive access.
|
||||
func (kvs *jsonFileStore) Lock(timeout time.Duration) error {
|
||||
kvs.Mutex.Lock()
|
||||
defer kvs.Mutex.Unlock()
|
||||
|
||||
if kvs.locked {
|
||||
return ErrStoreLocked
|
||||
}
|
||||
afterTime := time.After(timeout)
|
||||
status := make(chan error)
|
||||
|
||||
//nolint:gomnd // 0o664 - read write mode constant
|
||||
lockPerm := os.FileMode(0o644) + os.FileMode(os.ModeExclusive)
|
||||
log.Printf("Acquiring process lock")
|
||||
go kvs.lockUtil(status)
|
||||
|
||||
// Try to acquire the lock file.
|
||||
var lockRetryCount uint
|
||||
var modTimeCur time.Time
|
||||
var modTimePrev time.Time
|
||||
for lockRetryCount < lockMaxRetries {
|
||||
lockFile, err = os.OpenFile(kvs.lockFileName, os.O_CREATE|os.O_EXCL|os.O_RDWR, lockPerm)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
if !block {
|
||||
return ErrNonBlockingLockIsAlreadyLocked
|
||||
}
|
||||
|
||||
// Reset the lock retry count if the timestamp for the lock file changes.
|
||||
if fileInfo, err := os.Stat(kvs.lockFileName); err == nil {
|
||||
modTimeCur = fileInfo.ModTime()
|
||||
if !modTimeCur.Equal(modTimePrev) {
|
||||
lockRetryCount = 0
|
||||
}
|
||||
modTimePrev = modTimeCur
|
||||
}
|
||||
|
||||
time.Sleep(lockRetryDelay)
|
||||
|
||||
lockRetryCount++
|
||||
}
|
||||
|
||||
if lockRetryCount == lockMaxRetries {
|
||||
var err error
|
||||
select {
|
||||
case <-afterTime:
|
||||
return ErrTimeoutLockingStore
|
||||
case err = <-status:
|
||||
}
|
||||
|
||||
defer lockFile.Close()
|
||||
|
||||
// Write the process ID for easy identification.
|
||||
if _, err = lockFile.WriteString(strconv.Itoa(os.Getpid())); err != nil {
|
||||
return err
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "processLock acquire error")
|
||||
}
|
||||
|
||||
kvs.locked = true
|
||||
|
||||
log.Printf("Acquired process lock")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unlock unlocks the store.
|
||||
func (kvs *jsonFileStore) Unlock(forceUnlock bool) error {
|
||||
func (kvs *jsonFileStore) Unlock() error {
|
||||
kvs.Mutex.Lock()
|
||||
defer kvs.Mutex.Unlock()
|
||||
|
||||
if !forceUnlock && !kvs.locked {
|
||||
return ErrStoreNotLocked
|
||||
}
|
||||
|
||||
err := os.Remove(kvs.lockFileName)
|
||||
err := kvs.processLock.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "unlock error")
|
||||
}
|
||||
|
||||
kvs.inSync = false
|
||||
kvs.locked = false
|
||||
|
||||
log.Printf("Released process lock")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -269,24 +224,6 @@ func (kvs *jsonFileStore) GetModificationTime() (time.Time, error) {
|
|||
return info.ModTime().UTC(), nil
|
||||
}
|
||||
|
||||
// GetLockFileModificationTime returns the modification time of the lock file of the persistent store.
|
||||
func (kvs *jsonFileStore) GetLockFileModificationTime() (time.Time, error) {
|
||||
kvs.Mutex.Lock()
|
||||
defer kvs.Mutex.Unlock()
|
||||
|
||||
info, err := os.Stat(kvs.lockFileName)
|
||||
if err != nil {
|
||||
log.Printf("os.stat() for file %v failed: %v", kvs.lockFileName, err)
|
||||
return time.Time{}.UTC(), err
|
||||
}
|
||||
|
||||
return info.ModTime().UTC(), nil
|
||||
}
|
||||
|
||||
func (kvs *jsonFileStore) GetLockFileName() string {
|
||||
return kvs.lockFileName
|
||||
}
|
||||
|
||||
func (kvs *jsonFileStore) Remove() {
|
||||
kvs.Mutex.Lock()
|
||||
if err := os.Remove(kvs.fileName); err != nil {
|
||||
|
|
|
@ -5,9 +5,12 @@ package store
|
|||
|
||||
import (
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-container-networking/processlock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -46,7 +49,7 @@ func TestKeyValuePairsAreReinstantiatedFromJSONFile(t *testing.T) {
|
|||
defer os.Remove(testFileName)
|
||||
|
||||
// Create the store, initialized using the JSON file.
|
||||
kvs, err := NewJsonFileStore(testFileName)
|
||||
kvs, err := NewJsonFileStore(testFileName, processlock.NewMockFileLock(false))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create KeyValueStore %v\n", err)
|
||||
}
|
||||
|
@ -71,7 +74,7 @@ func TestKeyValuePairsArePersistedToJSONFile(t *testing.T) {
|
|||
var actualPair string
|
||||
|
||||
// Create the store.
|
||||
kvs, err := NewJsonFileStore(testFileName)
|
||||
kvs, err := NewJsonFileStore(testFileName, processlock.NewMockFileLock(false))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create KeyValueStore %v\n", err)
|
||||
}
|
||||
|
@ -117,7 +120,7 @@ func TestKeyValuePairsAreWrittenAndReadCorrectly(t *testing.T) {
|
|||
var readValue testType1
|
||||
|
||||
// Create the store.
|
||||
kvs, err := NewJsonFileStore(testFileName)
|
||||
kvs, err := NewJsonFileStore(testFileName, processlock.NewMockFileLock(false))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create KeyValueStore %v\n", err)
|
||||
}
|
||||
|
@ -150,93 +153,70 @@ func TestKeyValuePairsAreWrittenAndReadCorrectly(t *testing.T) {
|
|||
os.Remove(testFileName)
|
||||
}
|
||||
|
||||
// Tests that locking a store gives the caller exclusive access.
|
||||
func TestLockingStoreGivesExclusiveAccess(t *testing.T) {
|
||||
anyValue := testType1{"test", 42}
|
||||
|
||||
// Create the store.
|
||||
kvs, err := NewJsonFileStore(testFileName)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create first store: %v", err)
|
||||
}
|
||||
|
||||
// Lock for exclusive access.
|
||||
err = kvs.Lock(false)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to lock store: %v", err)
|
||||
}
|
||||
|
||||
// Write a key value pair.
|
||||
err = kvs.Write(testKey1, &anyValue)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to write to store: %v", err)
|
||||
}
|
||||
|
||||
// Create a second store pointing to the same file.
|
||||
kvs2, err := NewJsonFileStore(testFileName)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create second store: %v", err)
|
||||
}
|
||||
|
||||
// Try locking the second store.
|
||||
// This should fail because the first store has exclusive access.
|
||||
err = kvs2.Lock(false)
|
||||
if err == nil {
|
||||
t.Errorf("Locking an already-locked store succeeded: %v", err)
|
||||
}
|
||||
|
||||
// Unlock the first store.
|
||||
err = kvs.Unlock(false)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to unlock first store: %v", err)
|
||||
}
|
||||
|
||||
// Try locking the second store again.
|
||||
// This should succeed because the first store revoked exclusive access.
|
||||
err = kvs2.Lock(false)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to re-lock an unlocked store: %v", err)
|
||||
}
|
||||
|
||||
// Unlock the second store.
|
||||
err = kvs2.Unlock(false)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to unlock second store: %v", err)
|
||||
}
|
||||
|
||||
// Cleanup.
|
||||
os.Remove(testFileName)
|
||||
}
|
||||
|
||||
// test case for testing newjsonfilestore idempotent
|
||||
func TestNewJsonFileStoreIdempotent(t *testing.T) {
|
||||
_, err := NewJsonFileStore(testLockFileName)
|
||||
_, err := NewJsonFileStore(testLockFileName, processlock.NewMockFileLock(false))
|
||||
if err != nil {
|
||||
t.Errorf("Failed to initialize store: %v", err)
|
||||
}
|
||||
|
||||
_, err = NewJsonFileStore(testLockFileName)
|
||||
_, err = NewJsonFileStore(testLockFileName, processlock.NewMockFileLock(false))
|
||||
if err != nil {
|
||||
t.Errorf("Failed to initialize same store second time: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// test case for checking if lockfilepath is expected
|
||||
func TestLockFilePath(t *testing.T) {
|
||||
store, err := NewJsonFileStore(testLockFileName)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to initialize store: %v", err)
|
||||
func TestLock(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
store KeyValueStore
|
||||
timeoutms int
|
||||
wantErr bool
|
||||
wantErrMsg string
|
||||
}{
|
||||
{
|
||||
name: "Acquire Lock happy path",
|
||||
store: func() KeyValueStore {
|
||||
st, _ := NewJsonFileStore(testFileName, processlock.NewMockFileLock(false))
|
||||
return st
|
||||
}(),
|
||||
timeoutms: 10000,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "Acquire Lock Fail",
|
||||
store: func() KeyValueStore {
|
||||
st, _ := NewJsonFileStore(testFileName, processlock.NewMockFileLock(true))
|
||||
return st
|
||||
}(),
|
||||
timeoutms: 10000,
|
||||
wantErr: true,
|
||||
wantErrMsg: processlock.ErrMockFileLock.Error(),
|
||||
},
|
||||
{
|
||||
name: "Acquire Lock timeout error",
|
||||
store: func() KeyValueStore {
|
||||
st, _ := NewJsonFileStore(testFileName, processlock.NewMockFileLock(false))
|
||||
return st
|
||||
}(),
|
||||
timeoutms: 0,
|
||||
wantErr: true,
|
||||
wantErrMsg: ErrTimeoutLockingStore.Error(),
|
||||
},
|
||||
}
|
||||
|
||||
lockFileName := store.GetLockFileName()
|
||||
|
||||
if runtime.GOOS == "linux" {
|
||||
if lockFileName != "/var/run/azure-vnet/"+testLockFileName+".lock" {
|
||||
t.Errorf("Not expected file lock name: %v", lockFileName)
|
||||
}
|
||||
} else {
|
||||
if lockFileName != testLockFileName+".lock" {
|
||||
t.Errorf("Not expected lockfilename: %v", lockFileName)
|
||||
}
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.store.Lock(time.Duration(tt.timeoutms) * time.Millisecond)
|
||||
if tt.wantErr {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), tt.wantErrMsg, "Expected:%v but got:%v", tt.wantErrMsg, err.Error())
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
err = tt.store.Unlock()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,11 +28,11 @@ func (ms *mockStore) Flush() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ms *mockStore) Lock(block bool) error {
|
||||
func (ms *mockStore) Lock(duration time.Duration) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *mockStore) Unlock(forceUnlock bool) error {
|
||||
func (ms *mockStore) Unlock() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -13,11 +13,9 @@ type KeyValueStore interface {
|
|||
Read(key string, value interface{}) error
|
||||
Write(key string, value interface{}) error
|
||||
Flush() error
|
||||
Lock(block bool) error
|
||||
Unlock(forceUnlock bool) error
|
||||
Lock(timeout time.Duration) error
|
||||
Unlock() error
|
||||
GetModificationTime() (time.Time, error)
|
||||
GetLockFileModificationTime() (time.Time, error)
|
||||
GetLockFileName() string
|
||||
Remove()
|
||||
}
|
||||
|
||||
|
|
|
@ -3,8 +3,6 @@ package telemetry
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
|
||||
"github.com/Azure/azure-container-networking/aitelemetry"
|
||||
"github.com/Azure/azure-container-networking/log"
|
||||
)
|
||||
|
@ -73,7 +71,6 @@ func SendAIMetric(aiMetric AIMetric) {
|
|||
return
|
||||
}
|
||||
|
||||
aiMetric.Metric.CustomDimensions[OSTypeStr] = runtime.GOOS
|
||||
th.TrackMetric(aiMetric.Metric)
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ const (
|
|||
CNIAddTimeMetricStr = "CNIAddTimeMs"
|
||||
CNIDelTimeMetricStr = "CNIDelTimeMs"
|
||||
CNIUpdateTimeMetricStr = "CNIUpdateTimeMs"
|
||||
CNILockTimeoutStr = "CNILockTimeoutError"
|
||||
|
||||
// Dimension Names
|
||||
ContextStr = "Context"
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"github.com/Azure/azure-container-networking/aitelemetry"
|
||||
"github.com/Azure/azure-container-networking/common"
|
||||
"github.com/Azure/azure-container-networking/log"
|
||||
"github.com/Azure/azure-container-networking/platform"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
@ -112,6 +113,7 @@ func (reportMgr *ReportManager) SendReport(tb *TelemetryBuffer) error {
|
|||
if err == nil {
|
||||
// If write fails, try to re-establish connections as server/client
|
||||
if _, err = tb.Write(report); err != nil {
|
||||
log.Printf("telemetry write failed:%v", err)
|
||||
tb.Cancel()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/Azure/azure-container-networking/common"
|
||||
"github.com/Azure/azure-container-networking/log"
|
||||
"github.com/Azure/azure-container-networking/platform"
|
||||
"github.com/Azure/azure-container-networking/processlock"
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
)
|
||||
|
||||
|
@ -109,7 +110,11 @@ func (tb *TelemetryBuffer) StartServer() error {
|
|||
reportStr, err := read(conn)
|
||||
if err == nil {
|
||||
var tmp map[string]interface{}
|
||||
json.Unmarshal(reportStr, &tmp)
|
||||
err = json.Unmarshal(reportStr, &tmp)
|
||||
if err != nil {
|
||||
log.Logf("StartServer: unmarshal error:%v", err)
|
||||
return
|
||||
}
|
||||
if _, ok := tmp["CniSucceeded"]; ok {
|
||||
var cniReport CNIReport
|
||||
json.Unmarshal([]byte(reportStr), &cniReport)
|
||||
|
@ -118,6 +123,8 @@ func (tb *TelemetryBuffer) StartServer() error {
|
|||
var aiMetric AIMetric
|
||||
json.Unmarshal([]byte(reportStr), &aiMetric)
|
||||
tb.data <- aiMetric
|
||||
} else {
|
||||
log.Logf("StartServer: default case:%+v...", tmp)
|
||||
}
|
||||
} else {
|
||||
var index int
|
||||
|
@ -192,13 +199,14 @@ func read(conn net.Conn) (b []byte, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// Write - write to the file descriptor
|
||||
// Write - write to the file descriptor.
|
||||
func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) {
|
||||
buf := make([]byte, len(b))
|
||||
copy(b, buf)
|
||||
b = append(buf, Delimiter)
|
||||
copy(buf, b)
|
||||
//nolint:makezero //keeping old code
|
||||
buf = append(buf, Delimiter)
|
||||
w := bufio.NewWriter(tb.client)
|
||||
c, err = w.Write(b)
|
||||
c, err = w.Write(buf)
|
||||
if err == nil {
|
||||
err = w.Flush()
|
||||
}
|
||||
|
@ -241,18 +249,35 @@ func push(x interface{}) {
|
|||
metadata, err := common.GetHostMetadata(metadataFile)
|
||||
if err != nil {
|
||||
log.Logf("Error getting metadata %v", err)
|
||||
} else {
|
||||
kvs, err := store.NewJsonFileStore(metadataFile)
|
||||
|
||||
var lockclient processlock.Interface
|
||||
lockclient, err = processlock.NewFileLock(metadataFile + store.LockExtension)
|
||||
if err != nil {
|
||||
log.Printf("Error initializing file lock:%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
var kvs store.KeyValueStore
|
||||
kvs, err = store.NewJsonFileStore(metadataFile, lockclient)
|
||||
if err != nil {
|
||||
log.Printf("Error acuiring lock for writing metadata file: %v", err)
|
||||
}
|
||||
|
||||
kvs.Lock(true)
|
||||
err = kvs.Lock(store.DefaultLockTimeout)
|
||||
if err != nil {
|
||||
log.Errorf("push: Not able to acquire lock:%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = common.SaveHostMetadata(metadata, metadataFile)
|
||||
if err != nil {
|
||||
log.Logf("saving host metadata failed with :%v", err)
|
||||
}
|
||||
kvs.Unlock(true)
|
||||
|
||||
err = kvs.Unlock()
|
||||
if err != nil {
|
||||
log.Errorf("push: Not able to release lock:%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
switch y := x.(type) {
|
||||
|
@ -262,6 +287,8 @@ func push(x interface{}) {
|
|||
|
||||
case AIMetric:
|
||||
SendAIMetric(y)
|
||||
default:
|
||||
log.Printf("Push fn: Default case:%+v", y)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -105,7 +105,7 @@ func TestWrite(t *testing.T) {
|
|||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.want, got)
|
||||
require.Equal(t, tt.want, got, "Expected:%d but got:%d", tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,39 +21,31 @@ type KeyValueStoreMock struct {
|
|||
GetModificationTimeError error
|
||||
}
|
||||
|
||||
func (store *KeyValueStoreMock) Read(key string, value interface{}) error {
|
||||
return store.ReadError
|
||||
func (mockst *KeyValueStoreMock) Read(key string, value interface{}) error {
|
||||
return mockst.ReadError
|
||||
}
|
||||
|
||||
func (store *KeyValueStoreMock) Write(key string, value interface{}) error {
|
||||
return store.WriteError
|
||||
func (mockst *KeyValueStoreMock) Write(key string, value interface{}) error {
|
||||
return mockst.WriteError
|
||||
}
|
||||
|
||||
func (store *KeyValueStoreMock) Flush() error {
|
||||
return store.FlushError
|
||||
func (mockst *KeyValueStoreMock) Flush() error {
|
||||
return mockst.FlushError
|
||||
}
|
||||
|
||||
func (store *KeyValueStoreMock) Lock(block bool) error {
|
||||
return store.LockError
|
||||
func (mockst *KeyValueStoreMock) Lock(time.Duration) error {
|
||||
return mockst.LockError
|
||||
}
|
||||
|
||||
func (store *KeyValueStoreMock) Unlock(forceUnlock bool) error {
|
||||
return store.UnlockError
|
||||
func (mockst *KeyValueStoreMock) Unlock() error {
|
||||
return mockst.UnlockError
|
||||
}
|
||||
|
||||
func (store *KeyValueStoreMock) GetModificationTime() (time.Time, error) {
|
||||
if store.GetModificationTimeError != nil {
|
||||
return time.Time{}, store.GetModificationTimeError
|
||||
func (mockst *KeyValueStoreMock) GetModificationTime() (time.Time, error) {
|
||||
if mockst.GetModificationTimeError != nil {
|
||||
return time.Time{}, mockst.GetModificationTimeError
|
||||
}
|
||||
return store.ModificationTime, nil
|
||||
return mockst.ModificationTime, nil
|
||||
}
|
||||
|
||||
func (store *KeyValueStoreMock) GetLockFileModificationTime() (time.Time, error) {
|
||||
return time.Now(), nil
|
||||
}
|
||||
|
||||
func (store *KeyValueStoreMock) GetLockFileName() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (store *KeyValueStoreMock) Remove() {}
|
||||
func (mockst *KeyValueStoreMock) Remove() {}
|
||||
|
|
Загрузка…
Ссылка в новой задаче