Merge branch 'main' into support-wi-token
This commit is contained in:
Коммит
3f5cff871e
10
CHANGELOG.md
10
CHANGELOG.md
|
@ -1,18 +1,20 @@
|
|||
## 2.4.0 (Unreleased)
|
||||
**Features**
|
||||
- Entry cache to hold directory listing results in cache for a given timeout. This will reduce REST calls going to storage while listing the blobs in parallel.
|
||||
|
||||
**Bug Fixes**
|
||||
- [#1426](https://github.com/Azure/azure-storage-fuse/issues/1426) Read panic in block-cache due to boundary conditions.
|
||||
- Do not allow mount path and temp-cache path to be same when using block-cache.
|
||||
- Do not allow to mount with non-empty directory provided for disk persistence in block-cache.
|
||||
- Rename file was calling an additional getProperties call.
|
||||
- Delete empty directories from local cache on rmdir operation.
|
||||
|
||||
**Features**
|
||||
- Added support for custom component via go plugin.
|
||||
- Added 'gen-config' command to auto generate blobfuse2 config file.
|
||||
|
||||
**Other Changes**
|
||||
- Stream config will be converted to block-cache config implicitly and 'stream' component is no longer used from this release onwards.
|
||||
|
||||
**Other Changes**
|
||||
- MSI login with object-id will not rely on azcli anymore.
|
||||
- MSI login with object-id will not rely on azcli anymore, rather it will be supported by 'azidentity' SDK.
|
||||
|
||||
## 2.3.2 (2024-09-03)
|
||||
**Bug Fixes**
|
||||
|
|
|
@ -99,7 +99,7 @@ Note: Blobfuse2 accepts all CLI parameters that Blobfuse does, but may ignore pa
|
|||
| --log-level=LOG_WARNING | --log-level=LOG_WARNING | logging.level | |
|
||||
| --use-attr-cache=true | --use-attr-cache=true | attr_cache | Add attr_cache to the components list |
|
||||
| --use-adls=false | --use-adls=false | azstorage.type | Specify either 'block' or 'adls' |
|
||||
| --no-symlinks=false | --no-symlinks=false | attr_cache.no-symlinks | |
|
||||
| --no-symlinks=false | --no-symlinks=true | attr_cache.no-symlinks | |
|
||||
| --cache-on-list=true | --cache-on-list=true | attr_cache.no-cache-on-list | This parameter has the opposite boolean semantics |
|
||||
| --upload-modified-only=true | --upload-modified-only=true | | Always on in blobfuse2 |
|
||||
| --max-concurrency=12 | --max-concurrency=12 | azstorage.max-concurrency | |
|
||||
|
|
|
@ -592,7 +592,7 @@ stages:
|
|||
- script: |
|
||||
echo 'mode: count' > ./blobfuse2_coverage_raw.rpt
|
||||
tail -q -n +2 ./*.cov >> ./blobfuse2_coverage_raw.rpt
|
||||
cat ./blobfuse2_coverage_raw.rpt | grep -v mock_component | grep -v base_component | grep -v loopback | grep -v tools | grep -v "common/log" | grep -v "common/exectime" | grep -v "common/types.go" | grep -v "internal/stats_manager" | grep -v "main.go" | grep -v "component/azstorage/azauthmsi.go" | grep -v "component/azstorage/azauthspn.go" | grep -v "component/stream" | grep -v "component/custom" | grep -v "component/azstorage/azauthcli.go" > ./blobfuse2_coverage.rpt
|
||||
cat ./blobfuse2_coverage_raw.rpt | grep -v mock_component | grep -v base_component | grep -v loopback | grep -v tools | grep -v "common/log" | grep -v "common/exectime" | grep -v "common/types.go" | grep -v "internal/stats_manager" | grep -v "main.go" | grep -v "component/azstorage/azauthmsi.go" | grep -v "component/azstorage/azauthspn.go" | grep -v "component/stream" | grep -v "component/custom" | grep -v "component/azstorage/azauthcli.go" | grep -v "exported/exported.go" | grep -v "component/block_cache/stream.go" > ./blobfuse2_coverage.rpt
|
||||
go tool cover -func blobfuse2_coverage.rpt > ./blobfuse2_func_cover.rpt
|
||||
go tool cover -html=./blobfuse2_coverage.rpt -o ./blobfuse2_coverage.html
|
||||
go tool cover -html=./blobfuse2_ut.cov -o ./blobfuse2_ut.html
|
||||
|
|
|
@ -1697,6 +1697,13 @@ stages:
|
|||
echo "##vso[task.setvariable variable=is_preview]$is_preview"
|
||||
fi
|
||||
|
||||
is_preview="false"
|
||||
echo "##vso[task.setvariable variable=is_preview]$is_preview"
|
||||
if [[ $marinerFuse3AmdRpm == *"preview"* ]]; then
|
||||
is_preview="true"
|
||||
echo "##vso[task.setvariable variable=is_preview]$is_preview"
|
||||
fi
|
||||
|
||||
while IFS=, read -r distro fuseArchType repoName releaseName; do
|
||||
|
||||
# If the package is preview, publish to mariner preview package
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
/*
|
||||
_____ _____ _____ ____ ______ _____ ------
|
||||
| | | | | | | | | | | | |
|
||||
| | | | | | | | | | | | |
|
||||
| --- | | | | |-----| |---- | | |-----| |----- ------
|
||||
| | | | | | | | | | | | |
|
||||
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____
|
||||
|
||||
|
||||
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
|
||||
|
||||
Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
|
||||
Author : <blobfusedev@microsoft.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/Azure/azure-storage-fuse/v2/common"
|
||||
"github.com/Azure/azure-storage-fuse/v2/common/config"
|
||||
"github.com/Azure/azure-storage-fuse/v2/internal"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
type genConfigParams struct {
|
||||
blockCache bool `config:"block-cache" yaml:"block-cache,omitempty"`
|
||||
directIO bool `config:"direct-io" yaml:"direct-io,omitempty"`
|
||||
readOnly bool `config:"ro" yaml:"ro,omitempty"`
|
||||
tmpPath string `config:"tmp-path" yaml:"tmp-path,omitempty"`
|
||||
outputFile string `config:"o" yaml:"o,omitempty"`
|
||||
}
|
||||
|
||||
var optsGenCfg genConfigParams
|
||||
|
||||
var generatedConfig = &cobra.Command{
|
||||
Use: "gen-config",
|
||||
Short: "Generate default config file.",
|
||||
Long: "Generate default config file with the values pre-caculated by blobfuse2.",
|
||||
SuggestFor: []string{"generate default config", "generate config"},
|
||||
Hidden: true,
|
||||
Args: cobra.ExactArgs(0),
|
||||
FlagErrorHandling: cobra.ExitOnError,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
|
||||
// Check if configTmp is not provided when component is fc
|
||||
if (!optsGenCfg.blockCache) && optsGenCfg.tmpPath == "" {
|
||||
return fmt.Errorf("temp path is required for file cache mode. Use flag --tmp-path to provide the path")
|
||||
}
|
||||
|
||||
// Set the configs
|
||||
if optsGenCfg.readOnly {
|
||||
config.Set("read-only", "true")
|
||||
}
|
||||
|
||||
if optsGenCfg.directIO {
|
||||
config.Set("direct-io", "true")
|
||||
}
|
||||
|
||||
config.Set("tmp-path", optsGenCfg.tmpPath)
|
||||
|
||||
// Create the pipeline
|
||||
pipeline := []string{"libfuse"}
|
||||
if optsGenCfg.blockCache {
|
||||
pipeline = append(pipeline, "block_cache")
|
||||
} else {
|
||||
pipeline = append(pipeline, "file_cache")
|
||||
}
|
||||
|
||||
if !optsGenCfg.directIO {
|
||||
pipeline = append(pipeline, "attr_cache")
|
||||
}
|
||||
pipeline = append(pipeline, "azstorage")
|
||||
|
||||
var sb strings.Builder
|
||||
|
||||
if optsGenCfg.directIO {
|
||||
sb.WriteString("direct-io: true\n")
|
||||
}
|
||||
|
||||
if optsGenCfg.readOnly {
|
||||
sb.WriteString("read-only: true\n\n")
|
||||
}
|
||||
|
||||
sb.WriteString("# Logger configuration\n#logging:\n # type: syslog|silent|base\n # level: log_off|log_crit|log_err|log_warning|log_info|log_trace|log_debug\n")
|
||||
sb.WriteString(" # file-path: <path where log files shall be stored. Default - '$HOME/.blobfuse2/blobfuse2.log'>\n")
|
||||
|
||||
sb.WriteString("\ncomponents:\n")
|
||||
for _, component := range pipeline {
|
||||
sb.WriteString(fmt.Sprintf(" - %s\n", component))
|
||||
}
|
||||
|
||||
for _, component := range pipeline {
|
||||
c := internal.GetComponent(component)
|
||||
if c == nil {
|
||||
return fmt.Errorf("generatedConfig:: error getting component [%s]", component)
|
||||
}
|
||||
sb.WriteString("\n")
|
||||
sb.WriteString(c.GenConfig())
|
||||
}
|
||||
|
||||
sb.WriteString("\n#Required\n#azstorage:\n # type: block|adls \n # account-name: <name of the storage account>\n # container: <name of the storage container to be mounted>\n # endpoint: <example - https://account-name.blob.core.windows.net>\n ")
|
||||
sb.WriteString("# mode: key|sas|spn|msi|azcli \n # account-key: <storage account key>\n # OR\n # sas: <storage account sas>\n # OR\n # appid: <storage account app id / client id for MSI>\n # OR\n # tenantid: <storage account tenant id for SPN")
|
||||
|
||||
filePath := ""
|
||||
if optsGenCfg.outputFile == "" {
|
||||
filePath = "./blobfuse2.yaml"
|
||||
} else {
|
||||
filePath = optsGenCfg.outputFile
|
||||
}
|
||||
|
||||
var err error = nil
|
||||
if optsGenCfg.outputFile == "console" {
|
||||
fmt.Println(sb.String())
|
||||
} else {
|
||||
err = common.WriteToFile(filePath, sb.String(), common.WriteToFileOptions{Flags: os.O_TRUNC, Permission: 0644})
|
||||
}
|
||||
|
||||
return err
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(generatedConfig)
|
||||
|
||||
generatedConfig.Flags().BoolVar(&optsGenCfg.blockCache, "block-cache", false, "Block-Cache shall be used as caching strategy")
|
||||
generatedConfig.Flags().BoolVar(&optsGenCfg.directIO, "direct-io", false, "Direct-io mode shall be used")
|
||||
generatedConfig.Flags().BoolVar(&optsGenCfg.readOnly, "ro", false, "Mount in read-only mode")
|
||||
generatedConfig.Flags().StringVar(&optsGenCfg.tmpPath, "tmp-path", "", "Temp cache path to be used")
|
||||
generatedConfig.Flags().StringVar(&optsGenCfg.outputFile, "o", "", "Output file location")
|
||||
}
|
|
@ -0,0 +1,202 @@
|
|||
/*
|
||||
_____ _____ _____ ____ ______ _____ ------
|
||||
| | | | | | | | | | | | |
|
||||
| | | | | | | | | | | | |
|
||||
| --- | | | | |-----| |---- | | |-----| |----- ------
|
||||
| | | | | | | | | | | | |
|
||||
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____
|
||||
|
||||
|
||||
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
|
||||
|
||||
Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
|
||||
Author : <blobfusedev@microsoft.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
type genConfig struct {
|
||||
suite.Suite
|
||||
assert *assert.Assertions
|
||||
}
|
||||
|
||||
func (suite *genConfig) SetupTest() {
|
||||
suite.assert = assert.New(suite.T())
|
||||
}
|
||||
|
||||
func (suite *genConfig) cleanupTest() {
|
||||
os.Remove(suite.getDefaultLogLocation())
|
||||
optsGenCfg = genConfigParams{}
|
||||
}
|
||||
|
||||
func (suite *genConfig) getDefaultLogLocation() string {
|
||||
return "./blobfuse2.yaml"
|
||||
}
|
||||
|
||||
func (suite *genConfig) TestNoTempPath() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
_, err := executeCommandC(rootCmd, "gen-config")
|
||||
suite.assert.NotNil(err)
|
||||
}
|
||||
|
||||
func (suite *genConfig) TestFileCacheConfigGen() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
tempDir, _ := os.MkdirTemp("", "TestTempDir")
|
||||
os.MkdirAll(tempDir, 0777)
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
_, err := executeCommandC(rootCmd, "gen-config", fmt.Sprintf("--tmp-path=%s", tempDir))
|
||||
suite.assert.Nil(err)
|
||||
|
||||
logFilePath := suite.getDefaultLogLocation()
|
||||
|
||||
//Check if a file is generated named generatedConfig.yaml
|
||||
suite.assert.FileExists(logFilePath)
|
||||
|
||||
//check if the generated file is not empty
|
||||
file, err := os.ReadFile(logFilePath)
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotEmpty(file)
|
||||
|
||||
//check if the generated file has the correct component
|
||||
suite.assert.Contains(string(file), "file_cache")
|
||||
|
||||
//check if the generated file has the correct temp path
|
||||
suite.assert.Contains(string(file), tempDir)
|
||||
}
|
||||
|
||||
func (suite *genConfig) TestBlockCacheConfigGen() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
tempDir, _ := os.MkdirTemp("", "TestTempDir")
|
||||
os.MkdirAll(tempDir, 0777)
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
_, err := executeCommandC(rootCmd, "gen-config", "--block-cache", fmt.Sprintf("--tmp-path=%s", tempDir))
|
||||
suite.assert.Nil(err)
|
||||
|
||||
logFilePath := suite.getDefaultLogLocation()
|
||||
|
||||
//Check if a file is generated named generatedConfig.yaml
|
||||
suite.assert.FileExists(logFilePath)
|
||||
|
||||
//check if the generated file is not empty
|
||||
file, err := os.ReadFile(logFilePath)
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotEmpty(file)
|
||||
|
||||
//check if the generated file has the correct component
|
||||
suite.assert.Contains(string(file), "block_cache")
|
||||
suite.assert.NotContains(string(file), "file_cache")
|
||||
|
||||
//check if the generated file has the correct temp path
|
||||
suite.assert.Contains(string(file), tempDir)
|
||||
}
|
||||
|
||||
func (suite *genConfig) TestBlockCacheConfigGen1() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
tempDir, _ := os.MkdirTemp("", "TestTempDir")
|
||||
os.MkdirAll(tempDir, 0777)
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
_, err := executeCommandC(rootCmd, "gen-config", "--block-cache")
|
||||
suite.assert.Nil(err)
|
||||
|
||||
logFilePath := suite.getDefaultLogLocation()
|
||||
|
||||
//Check if a file is generated named generatedConfig.yaml
|
||||
suite.assert.FileExists(logFilePath)
|
||||
|
||||
//check if the generated file is not empty
|
||||
file, err := os.ReadFile(logFilePath)
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotEmpty(file)
|
||||
|
||||
//check if the generated file has the correct component
|
||||
suite.assert.Contains(string(file), "block_cache")
|
||||
suite.assert.NotContains(string(file), "file_cache")
|
||||
|
||||
//check if the generated file has the correct temp path
|
||||
suite.assert.NotContains(string(file), tempDir)
|
||||
}
|
||||
|
||||
// test direct io flag
|
||||
func (suite *genConfig) TestDirectIOConfigGen() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
_, err := executeCommandC(rootCmd, "gen-config", "--block-cache", "--direct-io")
|
||||
suite.assert.Nil(err)
|
||||
|
||||
logFilePath := suite.getDefaultLogLocation()
|
||||
suite.assert.FileExists(logFilePath)
|
||||
|
||||
//check if the generated file is not empty
|
||||
file, err := os.ReadFile(logFilePath)
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotEmpty(file)
|
||||
|
||||
//check if the generated file has the correct direct io flag
|
||||
suite.assert.Contains(string(file), "direct-io: true")
|
||||
suite.assert.NotContains(string(file), " path: ")
|
||||
}
|
||||
|
||||
func (suite *genConfig) TestOutputFile() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
_, err := executeCommandC(rootCmd, "gen-config", "--block-cache", "--direct-io", "--o", "1.yaml")
|
||||
suite.assert.Nil(err)
|
||||
|
||||
//check if the generated file is not empty
|
||||
file, err := os.ReadFile("1.yaml")
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotEmpty(file)
|
||||
|
||||
//check if the generated file has the correct direct io flag
|
||||
suite.assert.Contains(string(file), "direct-io: true")
|
||||
suite.assert.NotContains(string(file), " path: ")
|
||||
_ = os.Remove("1.yaml")
|
||||
}
|
||||
|
||||
func (suite *genConfig) TestConsoleOutput() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
op, err := executeCommandC(rootCmd, "gen-config", "--block-cache", "--direct-io", "--o", "console")
|
||||
suite.assert.Nil(err)
|
||||
|
||||
//check if the generated file has the correct direct io flag
|
||||
suite.assert.Empty(op)
|
||||
}
|
||||
|
||||
func TestGenConfig(t *testing.T) {
|
||||
suite.Run(t, new(genConfig))
|
||||
}
|
|
@ -72,10 +72,10 @@ type hmonTestSuite struct {
|
|||
}
|
||||
|
||||
func generateRandomPID() string {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
var randpid int
|
||||
for i := 0; i <= 5; i++ {
|
||||
randpid = rand.Intn(90000) + 10000
|
||||
randpid = r.Intn(90000) + 10000
|
||||
_, err := os.FindProcess(randpid)
|
||||
if err != nil {
|
||||
break
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
_ "github.com/Azure/azure-storage-fuse/v2/component/azstorage"
|
||||
_ "github.com/Azure/azure-storage-fuse/v2/component/block_cache"
|
||||
_ "github.com/Azure/azure-storage-fuse/v2/component/custom"
|
||||
_ "github.com/Azure/azure-storage-fuse/v2/component/entry_cache"
|
||||
_ "github.com/Azure/azure-storage-fuse/v2/component/file_cache"
|
||||
_ "github.com/Azure/azure-storage-fuse/v2/component/libfuse"
|
||||
_ "github.com/Azure/azure-storage-fuse/v2/component/loopback"
|
||||
|
|
28
cmd/mount.go
28
cmd/mount.go
|
@ -90,10 +90,11 @@ type mountOptions struct {
|
|||
LazyWrite bool `config:"lazy-write"`
|
||||
|
||||
// v1 support
|
||||
Streaming bool `config:"streaming"`
|
||||
AttrCache bool `config:"use-attr-cache"`
|
||||
LibfuseOptions []string `config:"libfuse-options"`
|
||||
BlockCache bool `config:"block-cache"`
|
||||
Streaming bool `config:"streaming"`
|
||||
AttrCache bool `config:"use-attr-cache"`
|
||||
LibfuseOptions []string `config:"libfuse-options"`
|
||||
BlockCache bool `config:"block-cache"`
|
||||
EntryCacheTimeout int `config:"list-cache-timeout"`
|
||||
}
|
||||
|
||||
var options mountOptions
|
||||
|
@ -313,6 +314,10 @@ var mountCmd = &cobra.Command{
|
|||
options.Components = pipeline
|
||||
}
|
||||
|
||||
if config.IsSet("entry_cache.timeout-sec") || options.EntryCacheTimeout > 0 {
|
||||
options.Components = append(options.Components[:1], append([]string{"entry_cache"}, options.Components[1:]...)...)
|
||||
}
|
||||
|
||||
if config.IsSet("libfuse-options") {
|
||||
for _, v := range options.LibfuseOptions {
|
||||
parameter := strings.Split(v, "=")
|
||||
|
@ -361,6 +366,7 @@ var mountCmd = &cobra.Command{
|
|||
config.Set("lfuse.gid", fmt.Sprint(val))
|
||||
} else if v == "direct_io" || v == "direct_io=true" {
|
||||
config.Set("lfuse.direct-io", "true")
|
||||
config.Set("direct-io", "true")
|
||||
} else {
|
||||
return errors.New(common.FuseAllowedFlags)
|
||||
}
|
||||
|
@ -433,6 +439,20 @@ var mountCmd = &cobra.Command{
|
|||
log.Info("Mount Command: %s", os.Args)
|
||||
log.Crit("Logging level set to : %s", logLevel.String())
|
||||
log.Debug("Mount allowed on nonempty path : %v", options.NonEmpty)
|
||||
|
||||
directIO := false
|
||||
_ = config.UnmarshalKey("direct-io", &directIO)
|
||||
if directIO {
|
||||
// Directio is enabled, so remove the attr-cache from the pipeline
|
||||
for i, name := range options.Components {
|
||||
if name == "attr_cache" {
|
||||
options.Components = append(options.Components[:i], options.Components[i+1:]...)
|
||||
log.Crit("Mount::runPipeline : Direct IO enabled, removing attr_cache from pipeline")
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pipeline, err = internal.NewPipeline(options.Components, !daemon.WasReborn())
|
||||
if err != nil {
|
||||
if err.Error() == "Azure CLI not found on path" {
|
||||
|
|
|
@ -101,9 +101,9 @@ func TestGenerateConfig(t *testing.T) {
|
|||
}
|
||||
|
||||
func randomString(length int) string {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
b := make([]byte, length)
|
||||
rand.Read(b)
|
||||
r.Read(b)
|
||||
return fmt.Sprintf("%x", b)[:length]
|
||||
}
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ import (
|
|||
|
||||
// Standard config default values
|
||||
const (
|
||||
blobfuse2Version_ = "2.3.3"
|
||||
blobfuse2Version_ = "2.4.0"
|
||||
|
||||
DefaultMaxLogFileSize = 512
|
||||
DefaultLogFileCount = 10
|
||||
|
|
|
@ -91,7 +91,6 @@ func IsMountActive(path string) (bool, error) {
|
|||
var out bytes.Buffer
|
||||
cmd := exec.Command("pidof", "blobfuse2")
|
||||
cmd.Stdout = &out
|
||||
|
||||
err := cmd.Run()
|
||||
if err != nil {
|
||||
if err.Error() == "exit status 1" {
|
||||
|
@ -476,3 +475,28 @@ func GetFuseMinorVersion() int {
|
|||
|
||||
return val
|
||||
}
|
||||
|
||||
type WriteToFileOptions struct {
|
||||
Flags int
|
||||
Permission os.FileMode
|
||||
}
|
||||
|
||||
func WriteToFile(filename string, data string, options WriteToFileOptions) error {
|
||||
// Open the file with the provided flags, create it if it doesn't exist
|
||||
//check if options.Permission is 0 if so then assign 0777
|
||||
if options.Permission == 0 {
|
||||
options.Permission = 0777
|
||||
}
|
||||
file, err := os.OpenFile(filename, options.Flags|os.O_CREATE|os.O_WRONLY, options.Permission)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening file: [%s]", err.Error())
|
||||
}
|
||||
defer file.Close() // Ensure the file is closed when we're done
|
||||
|
||||
// Write the data content to the file
|
||||
if _, err := file.WriteString(data); err != nil {
|
||||
return fmt.Errorf("error writing to file [%s]", err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -35,13 +35,12 @@ package common
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
@ -50,7 +49,6 @@ import (
|
|||
var home_dir, _ = os.UserHomeDir()
|
||||
|
||||
func randomString(length int) string {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
b := make([]byte, length)
|
||||
rand.Read(b)
|
||||
return fmt.Sprintf("%x", b)[:length]
|
||||
|
@ -69,18 +67,22 @@ func TestUtil(t *testing.T) {
|
|||
suite.Run(t, new(utilTestSuite))
|
||||
}
|
||||
|
||||
func (suite *typesTestSuite) TestIsMountActiveNoMount() {
|
||||
func (suite *utilTestSuite) TestIsMountActiveNoMount() {
|
||||
var out bytes.Buffer
|
||||
cmd := exec.Command("pidof", "blobfuse2")
|
||||
cmd := exec.Command("../blobfuse2", "unmount", "all")
|
||||
cmd.Stdout = &out
|
||||
err := cmd.Run()
|
||||
suite.assert.Nil(err)
|
||||
cmd = exec.Command("pidof", "blobfuse2")
|
||||
cmd.Stdout = &out
|
||||
err = cmd.Run()
|
||||
suite.assert.Equal("exit status 1", err.Error())
|
||||
res, err := IsMountActive("/mnt/blobfuse")
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.False(res)
|
||||
}
|
||||
|
||||
func (suite *typesTestSuite) TestIsMountActiveTwoMounts() {
|
||||
func (suite *utilTestSuite) TestIsMountActiveTwoMounts() {
|
||||
var out bytes.Buffer
|
||||
|
||||
// Define the file name and the content you want to write
|
||||
|
@ -336,6 +338,31 @@ func (suite *utilTestSuite) TestDirectoryCleanup() {
|
|||
|
||||
}
|
||||
|
||||
func (suite *utilTestSuite) TestWriteToFile() {
|
||||
homeDir, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
fmt.Println("Error getting home directory:", err)
|
||||
return
|
||||
}
|
||||
filePath := fmt.Sprintf(".blobfuse2/test_%s.txt", randomString(8))
|
||||
content := "Hello World"
|
||||
filePath = homeDir + "/" + filePath
|
||||
|
||||
defer os.Remove(filePath)
|
||||
|
||||
err = WriteToFile(filePath, content, WriteToFileOptions{})
|
||||
suite.assert.Nil(err)
|
||||
|
||||
// Check if file exists
|
||||
suite.assert.FileExists(filePath)
|
||||
|
||||
// Check the content of the file
|
||||
data, err := os.ReadFile(filePath)
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.Equal(content, string(data))
|
||||
|
||||
}
|
||||
|
||||
func (suite *utilTestSuite) TestGetFuseMinorVersion() {
|
||||
i := GetFuseMinorVersion()
|
||||
suite.assert.GreaterOrEqual(i, 0)
|
||||
|
|
|
@ -55,7 +55,6 @@ const defaultAttrCacheTimeout uint32 = (120)
|
|||
type AttrCache struct {
|
||||
internal.BaseComponent
|
||||
cacheTimeout uint32
|
||||
cacheOnList bool
|
||||
noSymlinks bool
|
||||
maxFiles int
|
||||
cacheMap map[string]*attrCacheItem
|
||||
|
@ -119,6 +118,17 @@ func (ac *AttrCache) Stop() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GenConfig : Generate the default config for the component
|
||||
func (ac *AttrCache) GenConfig() string {
|
||||
log.Info("AttrCache::Configure : config generation started")
|
||||
|
||||
var sb strings.Builder
|
||||
sb.WriteString(fmt.Sprintf("\n%s:", ac.Name()))
|
||||
sb.WriteString(fmt.Sprintf("\n timeout-sec: %v", defaultAttrCacheTimeout))
|
||||
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
// Configure : Pipeline will call this method after constructor so that you can read config and initialize yourself
|
||||
//
|
||||
// Return failure if any config is not valid to exit the process
|
||||
|
@ -139,22 +149,18 @@ func (ac *AttrCache) Configure(_ bool) error {
|
|||
ac.cacheTimeout = defaultAttrCacheTimeout
|
||||
}
|
||||
|
||||
if config.IsSet(compName + ".cache-on-list") {
|
||||
ac.cacheOnList = conf.CacheOnList
|
||||
} else {
|
||||
ac.cacheOnList = !conf.NoCacheOnList
|
||||
}
|
||||
|
||||
if config.IsSet(compName + ".max-files") {
|
||||
ac.maxFiles = conf.MaxFiles
|
||||
} else {
|
||||
ac.maxFiles = defaultMaxFiles
|
||||
}
|
||||
|
||||
ac.noSymlinks = conf.NoSymlinks
|
||||
if config.IsSet(compName + ".no-symlinks") {
|
||||
ac.noSymlinks = conf.NoSymlinks
|
||||
}
|
||||
|
||||
log.Info("AttrCache::Configure : cache-timeout %d, symlink %t, cache-on-list %t, max-files %d",
|
||||
ac.cacheTimeout, ac.noSymlinks, ac.cacheOnList, ac.maxFiles)
|
||||
log.Crit("AttrCache::Configure : cache-timeout %d, symlink %t, max-files %d",
|
||||
ac.cacheTimeout, ac.noSymlinks, ac.maxFiles)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -283,7 +289,7 @@ func (ac *AttrCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O
|
|||
// cacheAttributes : On dir listing cache the attributes for all files
|
||||
func (ac *AttrCache) cacheAttributes(pathList []*internal.ObjAttr) {
|
||||
// Check whether or not we are supposed to cache on list
|
||||
if ac.cacheOnList && len(pathList) > 0 {
|
||||
if len(pathList) > 0 {
|
||||
// Putting this inside loop is heavy as for each item we will do a kernel call to get current time
|
||||
// If there are millions of blobs then cost of this is very high.
|
||||
currTime := time.Now()
|
||||
|
@ -477,14 +483,8 @@ func (ac *AttrCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr
|
|||
// no entry if path does not exist
|
||||
return &internal.ObjAttr{}, syscall.ENOENT
|
||||
} else {
|
||||
// IsMetadataRetrieved is false in the case of ADLS List since the API does not support metadata.
|
||||
// Once migration of ADLS list to blob endpoint is done (in future service versions), we can remove this.
|
||||
// options.RetrieveMetadata is set by CopyFromFile and WriteFile which need metadata to ensure it is preserved.
|
||||
if value.getAttr().IsMetadataRetrieved() || (ac.noSymlinks && !options.RetrieveMetadata) {
|
||||
// path exists and we have all the metadata required or we do not care about metadata
|
||||
log.Debug("AttrCache::GetAttr : %s served from cache", options.Name)
|
||||
return value.getAttr(), nil
|
||||
}
|
||||
log.Debug("AttrCache::GetAttr : %s served from cache", options.Name)
|
||||
return value.getAttr(), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -80,9 +80,6 @@ func newTestAttrCache(next internal.Component, configuration string) *AttrCache
|
|||
|
||||
func getPathAttr(path string, size int64, mode os.FileMode, metadata bool) *internal.ObjAttr {
|
||||
flags := internal.NewFileBitMap()
|
||||
if metadata {
|
||||
flags.Set(internal.PropFlagMetadataRetrieved)
|
||||
}
|
||||
return &internal.ObjAttr{
|
||||
Path: path,
|
||||
Name: filepath.Base(path),
|
||||
|
@ -210,8 +207,7 @@ func (suite *attrCacheTestSuite) TestDefault() {
|
|||
defer suite.cleanupTest()
|
||||
suite.assert.Equal(suite.attrCache.Name(), "attr_cache")
|
||||
suite.assert.EqualValues(suite.attrCache.cacheTimeout, 120)
|
||||
suite.assert.Equal(suite.attrCache.cacheOnList, true)
|
||||
suite.assert.Equal(suite.attrCache.noSymlinks, false)
|
||||
// suite.assert.Equal(suite.attrCache.noSymlinks, false)
|
||||
}
|
||||
|
||||
// Tests configuration
|
||||
|
@ -223,7 +219,6 @@ func (suite *attrCacheTestSuite) TestConfig() {
|
|||
|
||||
suite.assert.Equal(suite.attrCache.Name(), "attr_cache")
|
||||
suite.assert.EqualValues(suite.attrCache.cacheTimeout, 60)
|
||||
suite.assert.Equal(suite.attrCache.cacheOnList, false)
|
||||
suite.assert.Equal(suite.attrCache.noSymlinks, true)
|
||||
}
|
||||
|
||||
|
@ -246,7 +241,6 @@ func (suite *attrCacheTestSuite) TestConfigZero() {
|
|||
|
||||
suite.assert.Equal(suite.attrCache.Name(), "attr_cache")
|
||||
suite.assert.EqualValues(suite.attrCache.cacheTimeout, 0)
|
||||
suite.assert.Equal(suite.attrCache.cacheOnList, false)
|
||||
suite.assert.Equal(suite.attrCache.noSymlinks, true)
|
||||
}
|
||||
|
||||
|
@ -426,29 +420,6 @@ func (suite *attrCacheTestSuite) TestReadDirExists() {
|
|||
}
|
||||
}
|
||||
|
||||
func (suite *attrCacheTestSuite) TestReadDirNoCacheOnList() {
|
||||
defer suite.cleanupTest()
|
||||
suite.cleanupTest() // clean up the default attr cache generated
|
||||
cacheOnList := false
|
||||
config := fmt.Sprintf("attr_cache:\n no-cache-on-list: %t", !cacheOnList)
|
||||
suite.setupTestHelper(config) // setup a new attr cache with a custom config (clean up will occur after the test as usual)
|
||||
suite.assert.EqualValues(suite.attrCache.cacheOnList, cacheOnList)
|
||||
path := "a"
|
||||
size := int64(1024)
|
||||
mode := os.FileMode(0)
|
||||
aAttr := generateNestedPathAttr(path, size, mode)
|
||||
|
||||
options := internal.ReadDirOptions{Name: path}
|
||||
suite.mock.EXPECT().ReadDir(options).Return(aAttr, nil)
|
||||
|
||||
suite.assert.Empty(suite.attrCache.cacheMap) // cacheMap should be empty before call
|
||||
returnedAttr, err := suite.attrCache.ReadDir(options)
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.Equal(aAttr, returnedAttr)
|
||||
|
||||
suite.assert.Empty(suite.attrCache.cacheMap) // cacheMap should be empty after call
|
||||
}
|
||||
|
||||
func (suite *attrCacheTestSuite) TestReadDirError() {
|
||||
defer suite.cleanupTest()
|
||||
var paths = []string{"a", "a/", "ab", "ab/"}
|
||||
|
@ -912,7 +883,6 @@ func (suite *attrCacheTestSuite) TestGetAttrExistsWithoutMetadataNoSymlinks() {
|
|||
// This is a little janky but required since testify suite does not support running setup or clean up for subtests.
|
||||
suite.cleanupTest()
|
||||
suite.setupTestHelper(config) // setup a new attr cache with a custom config (clean up will occur after the test as usual)
|
||||
suite.assert.EqualValues(suite.attrCache.cacheOnList, noSymlinks)
|
||||
suite.Run(path, func() {
|
||||
truncatedPath := internal.TruncateDirName(path)
|
||||
addDirectoryToCache(suite.assert, suite.attrCache, "a", true) // add the paths to the cache with IsMetadataRetrived=true
|
||||
|
@ -941,7 +911,7 @@ func (suite *attrCacheTestSuite) TestGetAttrExistsWithoutMetadata() {
|
|||
|
||||
options := internal.GetAttrOptions{Name: path}
|
||||
// attributes should not be accessible so call the mock
|
||||
suite.mock.EXPECT().GetAttr(options).Return(getPathAttr(path, defaultSize, fs.FileMode(defaultMode), false), nil)
|
||||
//suite.mock.EXPECT().GetAttr(options).Return(getPathAttr(path, defaultSize, fs.FileMode(defaultMode), false), nil)
|
||||
|
||||
_, err := suite.attrCache.GetAttr(options)
|
||||
suite.assert.Nil(err)
|
||||
|
|
|
@ -508,7 +508,7 @@ func (az *AzStorage) CreateLink(options internal.CreateLinkOptions) error {
|
|||
|
||||
func (az *AzStorage) ReadLink(options internal.ReadLinkOptions) (string, error) {
|
||||
log.Trace("AzStorage::ReadLink : Read symlink %s", options.Name)
|
||||
data, err := az.storage.ReadBuffer(options.Name, 0, 0)
|
||||
data, err := az.storage.ReadBuffer(options.Name, 0, options.Size)
|
||||
|
||||
if err != nil {
|
||||
azStatsCollector.PushEvents(readLink, options.Name, nil)
|
||||
|
|
|
@ -457,7 +457,6 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err
|
|||
|
||||
parseMetadata(attr, prop.Metadata)
|
||||
|
||||
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
|
||||
attr.Flags.Set(internal.PropFlagModeDefault)
|
||||
|
||||
return attr, nil
|
||||
|
@ -602,7 +601,6 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
|
|||
MD5: blobInfo.Properties.ContentMD5,
|
||||
}
|
||||
parseMetadata(attr, blobInfo.Metadata)
|
||||
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
|
||||
attr.Flags.Set(internal.PropFlagModeDefault)
|
||||
}
|
||||
blobList = append(blobList, attr)
|
||||
|
@ -641,7 +639,6 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
|
|||
attr.Atime = attr.Mtime
|
||||
attr.Crtime = attr.Mtime
|
||||
attr.Ctime = attr.Mtime
|
||||
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
|
||||
attr.Flags.Set(internal.PropFlagModeDefault)
|
||||
blobList = append(blobList, attr)
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ import (
|
|||
"bytes"
|
||||
"container/list"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
|
@ -47,7 +48,6 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
@ -285,9 +285,8 @@ func (s *blockBlobTestSuite) TestDefault() {
|
|||
}
|
||||
|
||||
func randomString(length int) string {
|
||||
random := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
b := make([]byte, length)
|
||||
random.Read(b)
|
||||
rand.Read(b)
|
||||
return fmt.Sprintf("%x", b)[:length]
|
||||
}
|
||||
|
||||
|
@ -644,7 +643,6 @@ func (s *blockBlobTestSuite) TestReadDirNoVirtualDirectory() {
|
|||
s.assert.EqualValues(name, entries[0].Path)
|
||||
s.assert.EqualValues(name, entries[0].Name)
|
||||
s.assert.True(entries[0].IsDir())
|
||||
s.assert.True(entries[0].IsMetadataRetrieved())
|
||||
s.assert.True(entries[0].IsModeDefault())
|
||||
})
|
||||
}
|
||||
|
@ -664,13 +662,11 @@ func (s *blockBlobTestSuite) TestReadDirHierarchy() {
|
|||
s.assert.EqualValues(base+"/c1", entries[0].Path)
|
||||
s.assert.EqualValues("c1", entries[0].Name)
|
||||
s.assert.True(entries[0].IsDir())
|
||||
s.assert.True(entries[0].IsMetadataRetrieved())
|
||||
s.assert.True(entries[0].IsModeDefault())
|
||||
// Check the file
|
||||
s.assert.EqualValues(base+"/c2", entries[1].Path)
|
||||
s.assert.EqualValues("c2", entries[1].Name)
|
||||
s.assert.False(entries[1].IsDir())
|
||||
s.assert.True(entries[1].IsMetadataRetrieved())
|
||||
s.assert.True(entries[1].IsModeDefault())
|
||||
}
|
||||
|
||||
|
@ -693,19 +689,16 @@ func (s *blockBlobTestSuite) TestReadDirRoot() {
|
|||
s.assert.EqualValues(base, entries[0].Path)
|
||||
s.assert.EqualValues(base, entries[0].Name)
|
||||
s.assert.True(entries[0].IsDir())
|
||||
s.assert.True(entries[0].IsMetadataRetrieved())
|
||||
s.assert.True(entries[0].IsModeDefault())
|
||||
// Check the baseb dir
|
||||
s.assert.EqualValues(base+"b", entries[1].Path)
|
||||
s.assert.EqualValues(base+"b", entries[1].Name)
|
||||
s.assert.True(entries[1].IsDir())
|
||||
s.assert.True(entries[1].IsMetadataRetrieved())
|
||||
s.assert.True(entries[1].IsModeDefault())
|
||||
// Check the basec file
|
||||
s.assert.EqualValues(base+"c", entries[2].Path)
|
||||
s.assert.EqualValues(base+"c", entries[2].Name)
|
||||
s.assert.False(entries[2].IsDir())
|
||||
s.assert.True(entries[2].IsMetadataRetrieved())
|
||||
s.assert.True(entries[2].IsModeDefault())
|
||||
})
|
||||
}
|
||||
|
@ -725,7 +718,6 @@ func (s *blockBlobTestSuite) TestReadDirSubDir() {
|
|||
s.assert.EqualValues(base+"/c1"+"/gc1", entries[0].Path)
|
||||
s.assert.EqualValues("gc1", entries[0].Name)
|
||||
s.assert.False(entries[0].IsDir())
|
||||
s.assert.True(entries[0].IsMetadataRetrieved())
|
||||
s.assert.True(entries[0].IsModeDefault())
|
||||
}
|
||||
|
||||
|
@ -745,7 +737,6 @@ func (s *blockBlobTestSuite) TestReadDirSubDirPrefixPath() {
|
|||
s.assert.EqualValues("c1"+"/gc1", entries[0].Path)
|
||||
s.assert.EqualValues("gc1", entries[0].Name)
|
||||
s.assert.False(entries[0].IsDir())
|
||||
s.assert.True(entries[0].IsMetadataRetrieved())
|
||||
s.assert.True(entries[0].IsModeDefault())
|
||||
}
|
||||
|
||||
|
|
|
@ -502,14 +502,14 @@ func ParseAndValidateConfig(az *AzStorage, opt AzStorageOptions) error {
|
|||
log.Warn("unsupported v1 CLI parameter: debug-libcurl is not applicable in blobfuse2.")
|
||||
}
|
||||
|
||||
log.Info("ParseAndValidateConfig : account %s, container %s, account-type %s, auth %s, prefix %s, endpoint %s, MD5 %v %v, virtual-directory %v, disable-compression %v, CPK %v",
|
||||
log.Crit("ParseAndValidateConfig : account %s, container %s, account-type %s, auth %s, prefix %s, endpoint %s, MD5 %v %v, virtual-directory %v, disable-compression %v, CPK %v",
|
||||
az.stConfig.authConfig.AccountName, az.stConfig.container, az.stConfig.authConfig.AccountType, az.stConfig.authConfig.AuthMode,
|
||||
az.stConfig.prefixPath, az.stConfig.authConfig.Endpoint, az.stConfig.validateMD5, az.stConfig.updateMD5, az.stConfig.virtualDirectory, az.stConfig.disableCompression, az.stConfig.cpkEnabled)
|
||||
log.Info("ParseAndValidateConfig : use-HTTP %t, block-size %d, max-concurrency %d, default-tier %s, fail-unsupported-op %t, mount-all-containers %t", az.stConfig.authConfig.UseHTTP, az.stConfig.blockSize, az.stConfig.maxConcurrency, az.stConfig.defaultTier, az.stConfig.ignoreAccessModifiers, az.stConfig.mountAllContainers)
|
||||
log.Info("ParseAndValidateConfig : Retry Config: retry-count %d, max-timeout %d, backoff-time %d, max-delay %d",
|
||||
log.Crit("ParseAndValidateConfig : use-HTTP %t, block-size %d, max-concurrency %d, default-tier %s, fail-unsupported-op %t, mount-all-containers %t", az.stConfig.authConfig.UseHTTP, az.stConfig.blockSize, az.stConfig.maxConcurrency, az.stConfig.defaultTier, az.stConfig.ignoreAccessModifiers, az.stConfig.mountAllContainers)
|
||||
log.Crit("ParseAndValidateConfig : Retry Config: retry-count %d, max-timeout %d, backoff-time %d, max-delay %d",
|
||||
az.stConfig.maxRetries, az.stConfig.maxTimeout, az.stConfig.backoffTime, az.stConfig.maxRetryDelay)
|
||||
|
||||
log.Info("ParseAndValidateConfig : Telemetry : %s, honour-ACL %v, disable-symlink %v", az.stConfig.telemetry, az.stConfig.honourACL, az.stConfig.disableSymlink)
|
||||
log.Crit("ParseAndValidateConfig : Telemetry : %s, honour-ACL %v, disable-symlink %v", az.stConfig.telemetry, az.stConfig.honourACL, az.stConfig.disableSymlink)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -561,15 +561,14 @@ func ParseAndReadDynamicConfig(az *AzStorage, opt AzStorageOptions, reload bool)
|
|||
az.stConfig.honourACL = false
|
||||
}
|
||||
|
||||
// by default symlink will be disabled
|
||||
az.stConfig.disableSymlink = true
|
||||
|
||||
if config.IsSet("attr_cache.no-symlinks") {
|
||||
err := config.UnmarshalKey("attr_cache.no-symlinks", &az.stConfig.disableSymlink)
|
||||
if err != nil {
|
||||
az.stConfig.disableSymlink = true
|
||||
log.Err("ParseAndReadDynamicConfig : Failed to unmarshal attr_cache.no-symlinks")
|
||||
}
|
||||
} else {
|
||||
// by default symlink will be disabled
|
||||
az.stConfig.disableSymlink = true
|
||||
}
|
||||
|
||||
// Auth related reconfig
|
||||
|
|
|
@ -402,8 +402,6 @@ func (dl *Datalake) GetAttr(name string) (attr *internal.ObjAttr, err error) {
|
|||
attr.Mode = attr.Mode | os.ModeDir
|
||||
}
|
||||
|
||||
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
|
||||
|
||||
if dl.Config.honourACL && dl.Config.authConfig.ObjectID != "" {
|
||||
acl, err := fileClient.GetAccessControl(context.Background(), nil)
|
||||
if err != nil {
|
||||
|
|
|
@ -39,11 +39,11 @@ package azstorage
|
|||
import (
|
||||
"bytes"
|
||||
"container/list"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
@ -498,13 +498,11 @@ func (s *datalakeTestSuite) TestReadDirHierarchy() {
|
|||
s.assert.EqualValues(base+"/c1", entries[0].Path)
|
||||
s.assert.EqualValues("c1", entries[0].Name)
|
||||
s.assert.True(entries[0].IsDir())
|
||||
s.assert.False(entries[0].IsMetadataRetrieved())
|
||||
s.assert.False(entries[0].IsModeDefault())
|
||||
// Check the file
|
||||
s.assert.EqualValues(base+"/c2", entries[1].Path)
|
||||
s.assert.EqualValues("c2", entries[1].Name)
|
||||
s.assert.False(entries[1].IsDir())
|
||||
s.assert.False(entries[1].IsMetadataRetrieved())
|
||||
s.assert.False(entries[1].IsModeDefault())
|
||||
}
|
||||
|
||||
|
@ -527,19 +525,16 @@ func (s *datalakeTestSuite) TestReadDirRoot() {
|
|||
s.assert.EqualValues(base, entries[0].Path)
|
||||
s.assert.EqualValues(base, entries[0].Name)
|
||||
s.assert.True(entries[0].IsDir())
|
||||
s.assert.False(entries[0].IsMetadataRetrieved())
|
||||
s.assert.False(entries[0].IsModeDefault())
|
||||
// Check the baseb dir
|
||||
s.assert.EqualValues(base+"b", entries[1].Path)
|
||||
s.assert.EqualValues(base+"b", entries[1].Name)
|
||||
s.assert.True(entries[1].IsDir())
|
||||
s.assert.False(entries[1].IsMetadataRetrieved())
|
||||
s.assert.False(entries[1].IsModeDefault())
|
||||
// Check the basec file
|
||||
s.assert.EqualValues(base+"c", entries[2].Path)
|
||||
s.assert.EqualValues(base+"c", entries[2].Name)
|
||||
s.assert.False(entries[2].IsDir())
|
||||
s.assert.False(entries[2].IsMetadataRetrieved())
|
||||
s.assert.False(entries[2].IsModeDefault())
|
||||
})
|
||||
}
|
||||
|
@ -559,7 +554,6 @@ func (s *datalakeTestSuite) TestReadDirSubDir() {
|
|||
s.assert.EqualValues(base+"/c1"+"/gc1", entries[0].Path)
|
||||
s.assert.EqualValues("gc1", entries[0].Name)
|
||||
s.assert.False(entries[0].IsDir())
|
||||
s.assert.False(entries[0].IsMetadataRetrieved())
|
||||
s.assert.False(entries[0].IsModeDefault())
|
||||
}
|
||||
|
||||
|
@ -579,7 +573,6 @@ func (s *datalakeTestSuite) TestReadDirSubDirPrefixPath() {
|
|||
s.assert.EqualValues(base+"/c1"+"/gc1", entries[0].Path)
|
||||
s.assert.EqualValues("gc1", entries[0].Name)
|
||||
s.assert.False(entries[0].IsDir())
|
||||
s.assert.False(entries[0].IsMetadataRetrieved())
|
||||
s.assert.False(entries[0].IsModeDefault())
|
||||
}
|
||||
|
||||
|
|
|
@ -102,15 +102,16 @@ type BlockCacheOptions struct {
|
|||
}
|
||||
|
||||
const (
|
||||
compName = "block_cache"
|
||||
defaultTimeout = 120
|
||||
MAX_POOL_USAGE uint32 = 80
|
||||
MIN_POOL_USAGE uint32 = 50
|
||||
MIN_PREFETCH = 5
|
||||
MIN_WRITE_BLOCK = 3
|
||||
MIN_RANDREAD = 10
|
||||
MAX_FAIL_CNT = 3
|
||||
MAX_BLOCKS = 50000
|
||||
compName = "block_cache"
|
||||
defaultTimeout = 120
|
||||
defaultBlockSize = 16
|
||||
MAX_POOL_USAGE uint32 = 80
|
||||
MIN_POOL_USAGE uint32 = 50
|
||||
MIN_PREFETCH = 5
|
||||
MIN_WRITE_BLOCK = 3
|
||||
MIN_RANDREAD = 10
|
||||
MAX_FAIL_CNT = 3
|
||||
MAX_BLOCKS = 50000
|
||||
)
|
||||
|
||||
// Verification to check satisfaction criteria with Component Interface
|
||||
|
@ -171,6 +172,35 @@ func (bc *BlockCache) Stop() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GenConfig : Generate the default config for the component
|
||||
func (bc *BlockCache) GenConfig() string {
|
||||
log.Info("BlockCache::Configure : config generation started")
|
||||
|
||||
prefetch := uint32(math.Max((MIN_PREFETCH*2)+1, (float64)(2*runtime.NumCPU())))
|
||||
memSize := uint32(bc.getDefaultMemSize() / _1MB)
|
||||
if (prefetch * defaultBlockSize) > memSize {
|
||||
prefetch = (MIN_PREFETCH * 2) + 1
|
||||
}
|
||||
|
||||
var sb strings.Builder
|
||||
sb.WriteString(fmt.Sprintf("\n%s:", bc.Name()))
|
||||
|
||||
sb.WriteString(fmt.Sprintf("\n block-size-mb: %v", defaultBlockSize))
|
||||
sb.WriteString(fmt.Sprintf("\n mem-size-mb: %v", memSize))
|
||||
sb.WriteString(fmt.Sprintf("\n prefetch: %v", prefetch))
|
||||
sb.WriteString(fmt.Sprintf("\n parallelism: %v", uint32(3*runtime.NumCPU())))
|
||||
|
||||
var tmpPath string = ""
|
||||
_ = config.UnmarshalKey("tmp-path", &tmpPath)
|
||||
if tmpPath != "" {
|
||||
sb.WriteString(fmt.Sprintf("\n path: %v", tmpPath))
|
||||
sb.WriteString(fmt.Sprintf("\n disk-size-mb: %v", bc.getDefaultDiskSize(tmpPath)))
|
||||
sb.WriteString(fmt.Sprintf("\n disk-timeout-sec: %v", defaultTimeout))
|
||||
}
|
||||
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
// Configure : Pipeline will call this method after constructor so that you can read config and initialize yourself
|
||||
//
|
||||
// Return failure if any config is not valid to exit the process
|
||||
|
@ -183,7 +213,7 @@ func (bc *BlockCache) Configure(_ bool) error {
|
|||
return fmt.Errorf("config error in %s [%s]", bc.Name(), err.Error())
|
||||
}
|
||||
}
|
||||
defaultMemSize := false
|
||||
|
||||
conf := BlockCacheOptions{}
|
||||
err := config.UnmarshalKey(bc.Name(), &conf)
|
||||
if err != nil {
|
||||
|
@ -191,7 +221,7 @@ func (bc *BlockCache) Configure(_ bool) error {
|
|||
return fmt.Errorf("config error in %s [%s]", bc.Name(), err.Error())
|
||||
}
|
||||
|
||||
bc.blockSize = uint64(16) * _1MB
|
||||
bc.blockSize = uint64(defaultBlockSize) * _1MB
|
||||
if config.IsSet(compName + ".block-size-mb") {
|
||||
bc.blockSize = uint64(conf.BlockSize * float64(_1MB))
|
||||
}
|
||||
|
@ -199,15 +229,7 @@ func (bc *BlockCache) Configure(_ bool) error {
|
|||
if config.IsSet(compName + ".mem-size-mb") {
|
||||
bc.memSize = conf.MemSize * _1MB
|
||||
} else {
|
||||
var sysinfo syscall.Sysinfo_t
|
||||
err = syscall.Sysinfo(&sysinfo)
|
||||
if err != nil {
|
||||
log.Err("BlockCache::Configure : config error %s [%s]. Assigning a pre-defined value of 4GB.", bc.Name(), err.Error())
|
||||
bc.memSize = uint64(4192) * _1MB
|
||||
} else {
|
||||
bc.memSize = uint64(0.8 * (float64)(sysinfo.Freeram) * float64(sysinfo.Unit))
|
||||
defaultMemSize = true
|
||||
}
|
||||
bc.memSize = bc.getDefaultMemSize()
|
||||
}
|
||||
|
||||
bc.diskTimeout = defaultTimeout
|
||||
|
@ -219,7 +241,7 @@ func (bc *BlockCache) Configure(_ bool) error {
|
|||
bc.prefetch = uint32(math.Max((MIN_PREFETCH*2)+1, (float64)(2*runtime.NumCPU())))
|
||||
bc.noPrefetch = false
|
||||
|
||||
if defaultMemSize && (uint64(bc.prefetch)*uint64(bc.blockSize)) > bc.memSize {
|
||||
if (!config.IsSet(compName + ".mem-size-mb")) && (uint64(bc.prefetch)*uint64(bc.blockSize)) > bc.memSize {
|
||||
bc.prefetch = (MIN_PREFETCH * 2) + 1
|
||||
}
|
||||
|
||||
|
@ -246,16 +268,16 @@ func (bc *BlockCache) Configure(_ bool) error {
|
|||
bc.workers = conf.Workers
|
||||
}
|
||||
|
||||
bc.tmpPath = ""
|
||||
if conf.TmpPath != "" {
|
||||
bc.tmpPath = common.ExpandPath(conf.TmpPath)
|
||||
//check mnt path is not same as temp path
|
||||
bc.tmpPath = common.ExpandPath(conf.TmpPath)
|
||||
|
||||
if bc.tmpPath != "" {
|
||||
//check mnt path is not same as temp path
|
||||
err = config.UnmarshalKey("mount-path", &bc.mntPath)
|
||||
if err != nil {
|
||||
log.Err("BlockCache: config error [unable to obtain Mount Path]")
|
||||
return fmt.Errorf("config error in %s [%s]", bc.Name(), err.Error())
|
||||
}
|
||||
|
||||
if bc.mntPath == bc.tmpPath {
|
||||
log.Err("BlockCache: config error [tmp-path is same as mount path]")
|
||||
return fmt.Errorf("config error in %s error [tmp-path is same as mount path]", bc.Name())
|
||||
|
@ -277,28 +299,17 @@ func (bc *BlockCache) Configure(_ bool) error {
|
|||
return fmt.Errorf("config error in %s [%s]", bc.Name(), "temp directory not empty")
|
||||
}
|
||||
|
||||
var stat syscall.Statfs_t
|
||||
err = syscall.Statfs(bc.tmpPath, &stat)
|
||||
if err != nil {
|
||||
log.Err("BlockCache::Configure : config error %s [%s]. Assigning a default value of 4GB or if any value is assigned to .disk-size-mb in config.", bc.Name(), err.Error())
|
||||
bc.diskSize = uint64(4192) * _1MB
|
||||
} else {
|
||||
bc.diskSize = uint64(0.8 * float64(stat.Bavail) * float64(stat.Bsize))
|
||||
bc.diskSize = bc.getDefaultDiskSize(bc.tmpPath)
|
||||
if config.IsSet(compName + ".disk-size-mb") {
|
||||
bc.diskSize = conf.DiskSize * _1MB
|
||||
}
|
||||
}
|
||||
|
||||
if config.IsSet(compName + ".disk-size-mb") {
|
||||
bc.diskSize = conf.DiskSize * _1MB
|
||||
}
|
||||
|
||||
if (uint64(bc.prefetch) * uint64(bc.blockSize)) > bc.memSize {
|
||||
log.Err("BlockCache::Configure : config error [memory limit too low for configured prefetch]")
|
||||
return fmt.Errorf("config error in %s [memory limit too low for configured prefetch]", bc.Name())
|
||||
}
|
||||
|
||||
log.Info("BlockCache::Configure : block size %v, mem size %v, worker %v, prefetch %v, disk path %v, max size %v, disk timeout %v, prefetch-on-open %t, maxDiskUsageHit %v, noPrefetch %v",
|
||||
bc.blockSize, bc.memSize, bc.workers, bc.prefetch, bc.tmpPath, bc.diskSize, bc.diskTimeout, bc.prefetchOnOpen, bc.maxDiskUsageHit, bc.noPrefetch)
|
||||
|
||||
bc.blockPool = NewBlockPool(bc.blockSize, bc.memSize)
|
||||
if bc.blockPool == nil {
|
||||
log.Err("BlockCache::Configure : fail to init Block pool")
|
||||
|
@ -319,9 +330,35 @@ func (bc *BlockCache) Configure(_ bool) error {
|
|||
}
|
||||
}
|
||||
|
||||
log.Crit("BlockCache::Configure : block size %v, mem size %v, worker %v, prefetch %v, disk path %v, max size %v, disk timeout %v, prefetch-on-open %t, maxDiskUsageHit %v, noPrefetch %v",
|
||||
bc.blockSize, bc.memSize, bc.workers, bc.prefetch, bc.tmpPath, bc.diskSize, bc.diskTimeout, bc.prefetchOnOpen, bc.maxDiskUsageHit, bc.noPrefetch)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bc *BlockCache) getDefaultDiskSize(path string) uint64 {
|
||||
var stat syscall.Statfs_t
|
||||
err := syscall.Statfs(path, &stat)
|
||||
if err != nil {
|
||||
log.Info("BlockCache::getDefaultDiskSize : config error %s [%s]. Assigning a default value of 4GB or if any value is assigned to .disk-size-mb in config.", bc.Name(), err.Error())
|
||||
return uint64(4192) * _1MB
|
||||
}
|
||||
|
||||
return uint64(0.8 * float64(stat.Bavail) * float64(stat.Bsize))
|
||||
}
|
||||
|
||||
func (bc *BlockCache) getDefaultMemSize() uint64 {
|
||||
var sysinfo syscall.Sysinfo_t
|
||||
err := syscall.Sysinfo(&sysinfo)
|
||||
|
||||
if err != nil {
|
||||
log.Info("BlockCache::getDefaultMemSize : config error %s [%s]. Assigning a pre-defined value of 4GB.", bc.Name(), err.Error())
|
||||
return uint64(4192) * _1MB
|
||||
}
|
||||
|
||||
return uint64(0.8 * (float64)(sysinfo.Freeram) * float64(sysinfo.Unit))
|
||||
}
|
||||
|
||||
// CreateFile: Create a new file
|
||||
func (bc *BlockCache) CreateFile(options internal.CreateFileOptions) (*handlemap.Handle, error) {
|
||||
log.Trace("BlockCache::CreateFile : name=%s, mode=%d", options.Name, options.Mode)
|
||||
|
|
|
@ -40,7 +40,6 @@ import (
|
|||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
|
@ -64,6 +63,7 @@ import (
|
|||
var home_dir, _ = os.UserHomeDir()
|
||||
var mountpoint = home_dir + "mountpoint"
|
||||
var dataBuff []byte
|
||||
var r *rand.Rand = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
type blockCacheTestSuite struct {
|
||||
suite.Suite
|
||||
|
@ -85,9 +85,8 @@ type testObj struct {
|
|||
}
|
||||
|
||||
func randomString(length int) string {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
b := make([]byte, length)
|
||||
rand.Read(b)
|
||||
r.Read(b)
|
||||
return fmt.Sprintf("%x", b)[:length]
|
||||
}
|
||||
|
||||
|
@ -344,8 +343,8 @@ func (suite *blockCacheTestSuite) TestFileOpenClose() {
|
|||
fileName := getTestFileName(suite.T().Name())
|
||||
storagePath := filepath.Join(tobj.fake_storage_path, fileName)
|
||||
data := make([]byte, 5*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
ioutil.WriteFile(storagePath, data, 0777)
|
||||
_, _ = r.Read(data)
|
||||
os.WriteFile(storagePath, data, 0777)
|
||||
|
||||
options := internal.OpenFileOptions{Name: fileName}
|
||||
h, err := tobj.blockCache.OpenFile(options)
|
||||
|
@ -570,22 +569,25 @@ func (suite *blockCacheTestSuite) TestFileReadBlockCacheTmpPath() {
|
|||
|
||||
tmpPath := tobj.blockCache.tmpPath
|
||||
|
||||
files, err := ioutil.ReadDir(tmpPath)
|
||||
entries, err := os.ReadDir(tmpPath)
|
||||
suite.assert.Nil(err)
|
||||
|
||||
var size1048576, size7 bool
|
||||
for _, file := range files {
|
||||
if file.Size() == 1048576 {
|
||||
for _, entry := range entries {
|
||||
f, err := entry.Info()
|
||||
suite.assert.Nil(err)
|
||||
|
||||
if f.Size() == 1048576 {
|
||||
size1048576 = true
|
||||
}
|
||||
if file.Size() == 7 {
|
||||
if f.Size() == 7 {
|
||||
size7 = true
|
||||
}
|
||||
}
|
||||
|
||||
suite.assert.True(size1048576)
|
||||
suite.assert.True(size7)
|
||||
suite.assert.Equal(len(files), 2)
|
||||
suite.assert.Equal(len(entries), 2)
|
||||
|
||||
err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
|
||||
suite.assert.Nil(err)
|
||||
|
@ -601,8 +603,8 @@ func (suite *blockCacheTestSuite) TestFileReadSerial() {
|
|||
fileName := getTestFileName(suite.T().Name())
|
||||
storagePath := filepath.Join(tobj.fake_storage_path, fileName)
|
||||
data := make([]byte, 50*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
ioutil.WriteFile(storagePath, data, 0777)
|
||||
_, _ = r.Read(data)
|
||||
os.WriteFile(storagePath, data, 0777)
|
||||
|
||||
options := internal.OpenFileOptions{Name: fileName}
|
||||
h, err := tobj.blockCache.OpenFile(options)
|
||||
|
@ -643,8 +645,8 @@ func (suite *blockCacheTestSuite) TestFileReadRandom() {
|
|||
fileName := getTestFileName(suite.T().Name())
|
||||
storagePath := filepath.Join(tobj.fake_storage_path, fileName)
|
||||
data := make([]byte, 100*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
ioutil.WriteFile(storagePath, data, 0777)
|
||||
_, _ = r.Read(data)
|
||||
os.WriteFile(storagePath, data, 0777)
|
||||
|
||||
options := internal.OpenFileOptions{Name: fileName}
|
||||
h, err := tobj.blockCache.OpenFile(options)
|
||||
|
@ -684,8 +686,8 @@ func (suite *blockCacheTestSuite) TestFileReadRandomNoPrefetch() {
|
|||
fileName := getTestFileName(suite.T().Name())
|
||||
storagePath := filepath.Join(tobj.fake_storage_path, fileName)
|
||||
data := make([]byte, 100*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
ioutil.WriteFile(storagePath, data, 0777)
|
||||
_, _ = r.Read(data)
|
||||
os.WriteFile(storagePath, data, 0777)
|
||||
|
||||
options := internal.OpenFileOptions{Name: fileName}
|
||||
h, err := tobj.blockCache.OpenFile(options)
|
||||
|
@ -727,7 +729,7 @@ func (suite *blockCacheTestSuite) TestDiskUsageCheck() {
|
|||
|
||||
// Default disk size is 50MB
|
||||
data := make([]byte, 5*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
_, _ = r.Read(data)
|
||||
|
||||
type diskusagedata struct {
|
||||
name string
|
||||
|
@ -742,7 +744,7 @@ func (suite *blockCacheTestSuite) TestDiskUsageCheck() {
|
|||
}
|
||||
|
||||
for i := 0; i < 13; i++ {
|
||||
ioutil.WriteFile(localfiles[i].name, data, 0777)
|
||||
os.WriteFile(localfiles[i].name, data, 0777)
|
||||
usage, err := common.GetUsage(tobj.disk_cache_path)
|
||||
suite.assert.Nil(err)
|
||||
fmt.Printf("%d : %v (%v : %v) Usage %v\n", i, localfiles[i].name, localfiles[i].diskflag, tobj.blockCache.checkDiskUsage(), usage)
|
||||
|
@ -801,8 +803,8 @@ func (suite *blockCacheTestSuite) TestOpenWithTruncate() {
|
|||
fileName := getTestFileName(suite.T().Name())
|
||||
storagePath := filepath.Join(tobj.fake_storage_path, fileName)
|
||||
data := make([]byte, 5*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
ioutil.WriteFile(storagePath, data, 0777)
|
||||
_, _ = r.Read(data)
|
||||
os.WriteFile(storagePath, data, 0777)
|
||||
|
||||
options := internal.OpenFileOptions{Name: fileName}
|
||||
h, err := tobj.blockCache.OpenFile(options)
|
||||
|
@ -898,7 +900,7 @@ func (suite *blockCacheTestSuite) TestWriteFileMultiBlock() {
|
|||
storagePath := filepath.Join(tobj.fake_storage_path, path)
|
||||
|
||||
data := make([]byte, 5*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
_, _ = r.Read(data)
|
||||
|
||||
options := internal.CreateFileOptions{Name: path, Mode: 0777}
|
||||
h, err := tobj.blockCache.CreateFile(options)
|
||||
|
@ -937,7 +939,7 @@ func (suite *blockCacheTestSuite) TestWriteFileMultiBlockWithOverwrite() {
|
|||
storagePath := filepath.Join(tobj.fake_storage_path, path)
|
||||
|
||||
data := make([]byte, 5*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
_, _ = r.Read(data)
|
||||
|
||||
options := internal.CreateFileOptions{Name: path, Mode: 0777}
|
||||
h, err := tobj.blockCache.CreateFile(options)
|
||||
|
@ -988,7 +990,7 @@ func (suite *blockCacheTestSuite) TestWritefileWithAppend() {
|
|||
|
||||
path := getTestFileName(suite.T().Name())
|
||||
data := make([]byte, 13*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
_, _ = r.Read(data)
|
||||
|
||||
options := internal.CreateFileOptions{Name: path, Mode: 0777}
|
||||
h, err := tobj.blockCache.CreateFile(options)
|
||||
|
@ -1022,7 +1024,7 @@ func (suite *blockCacheTestSuite) TestWritefileWithAppend() {
|
|||
h, err = tobj.blockCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR, Mode: 0777})
|
||||
suite.assert.Nil(err)
|
||||
dataNew := make([]byte, 10*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
_, _ = r.Read(data)
|
||||
|
||||
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: h.Size, Data: dataNew}) // 5 bytes
|
||||
suite.assert.Nil(err)
|
||||
|
@ -1054,14 +1056,14 @@ func (suite *blockCacheTestSuite) TestWriteBlockOutOfRange() {
|
|||
|
||||
path := getTestFileName(suite.T().Name())
|
||||
data := make([]byte, 20*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
_, _ = r.Read(data)
|
||||
|
||||
options := internal.CreateFileOptions{Name: path, Mode: 0777}
|
||||
h, err := tobj.blockCache.CreateFile(options)
|
||||
suite.assert.Nil(err)
|
||||
|
||||
dataNew := make([]byte, 1*_1MB)
|
||||
_, _ = rand.Read(data)
|
||||
_, _ = r.Read(data)
|
||||
|
||||
n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 10 * 50001, Data: dataNew}) // 5 bytes
|
||||
suite.assert.NotNil(err)
|
||||
|
@ -1790,7 +1792,7 @@ func (suite *blockCacheTestSuite) TestPreventRaceCondition() {
|
|||
storagePath := filepath.Join(tobj.fake_storage_path, path)
|
||||
|
||||
data := make([]byte, _1MB)
|
||||
_, _ = rand.Read(data)
|
||||
_, _ = r.Read(data)
|
||||
|
||||
// write using block cache
|
||||
options := internal.CreateFileOptions{Name: path, Mode: 0777}
|
||||
|
@ -1862,7 +1864,7 @@ func (suite *blockCacheTestSuite) TestBlockParallelUploadAndWrite() {
|
|||
storagePath := filepath.Join(tobj.fake_storage_path, path)
|
||||
|
||||
data := make([]byte, _1MB)
|
||||
_, _ = rand.Read(data)
|
||||
_, _ = r.Read(data)
|
||||
|
||||
options := internal.CreateFileOptions{Name: path, Mode: 0777}
|
||||
h, err := tobj.blockCache.CreateFile(options)
|
||||
|
@ -1924,7 +1926,7 @@ func (suite *blockCacheTestSuite) TestBlockParallelUploadAndWriteValidation() {
|
|||
localPath := filepath.Join(tobj.disk_cache_path, path)
|
||||
|
||||
data := make([]byte, _1MB)
|
||||
_, _ = rand.Read(data)
|
||||
_, _ = r.Read(data)
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// write to local file
|
||||
|
@ -2625,7 +2627,7 @@ func (suite *blockCacheTestSuite) TestZZZZZStreamToBlockCacheConfig() {
|
|||
// a normal test function and pass our suite to suite.Run
|
||||
func TestBlockCacheTestSuite(t *testing.T) {
|
||||
dataBuff = make([]byte, 5*_1MB)
|
||||
_, _ = rand.Read(dataBuff)
|
||||
_, _ = r.Read(dataBuff)
|
||||
|
||||
suite.Run(t, new(blockCacheTestSuite))
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ func initializePlugins() error {
|
|||
// Example BLOBFUSE_PLUGIN_PATH="/path/to/plugin1.so:/path/to/plugin2.so"
|
||||
pluginFilesPath := os.Getenv("BLOBFUSE_PLUGIN_PATH")
|
||||
if pluginFilesPath == "" {
|
||||
log.Info("No plugins to load, BLOBFUSE_PLUGIN_PATH is empty")
|
||||
log.Debug("initializePlugins: No plugins to load.")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -59,32 +59,32 @@ func initializePlugins() error {
|
|||
|
||||
for _, file := range pluginFiles {
|
||||
if !strings.HasSuffix(file, ".so") {
|
||||
log.Err("Invalid plugin file extension: %s", file)
|
||||
log.Err("initializePlugins: Invalid plugin file extension: %s", file)
|
||||
continue
|
||||
}
|
||||
log.Info("loading plugin %s", file)
|
||||
log.Info("initializePlugins: loading plugin %s", file)
|
||||
startTime := time.Now()
|
||||
p, err := plugin.Open(file)
|
||||
if err != nil {
|
||||
log.Err("Error opening plugin %s: %s", file, err.Error())
|
||||
log.Err("initializePlugins: Error opening plugin %s: %s", file, err.Error())
|
||||
return fmt.Errorf("error opening plugin %s: %s", file, err.Error())
|
||||
}
|
||||
|
||||
getExternalComponentFunc, err := p.Lookup("GetExternalComponent")
|
||||
if err != nil {
|
||||
log.Err("GetExternalComponent function lookup error in plugin %s: %s", file, err.Error())
|
||||
log.Err("initializePlugins: GetExternalComponent function lookup error in plugin %s: %s", file, err.Error())
|
||||
return fmt.Errorf("GetExternalComponent function lookup error in plugin %s: %s", file, err.Error())
|
||||
}
|
||||
|
||||
getExternalComponent, ok := getExternalComponentFunc.(func() (string, func() exported.Component))
|
||||
if !ok {
|
||||
log.Err("GetExternalComponent function in %s has some incorrect definition", file)
|
||||
log.Err("initializePlugins: GetExternalComponent function in %s has some incorrect definition", file)
|
||||
return fmt.Errorf("GetExternalComponent function in %s has some incorrect definition", file)
|
||||
}
|
||||
|
||||
compName, initExternalComponent := getExternalComponent()
|
||||
internal.AddComponent(compName, initExternalComponent)
|
||||
log.Info("Plugin %s loaded in %s", file, time.Since(startTime))
|
||||
log.Info("initializePlugins: Plugin %s loaded in %s", file, time.Since(startTime))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ func initializePlugins() error {
|
|||
func init() {
|
||||
err := initializePlugins()
|
||||
if err != nil {
|
||||
log.Err("custom::Error initializing plugins: %s", err.Error())
|
||||
log.Err("custom::init : Error initializing plugins: %s", err.Error())
|
||||
fmt.Printf("failed to initialize plugin: %s\n", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,214 @@
|
|||
/*
|
||||
_____ _____ _____ ____ ______ _____ ------
|
||||
| | | | | | | | | | | | |
|
||||
| | | | | | | | | | | | |
|
||||
| --- | | | | |-----| |---- | | |-----| |----- ------
|
||||
| | | | | | | | | | | | |
|
||||
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____
|
||||
|
||||
|
||||
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
|
||||
|
||||
Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
|
||||
Author : <blobfusedev@microsoft.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE
|
||||
*/
|
||||
|
||||
package entry_cache
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/Azure/azure-storage-fuse/v2/common"
|
||||
"github.com/Azure/azure-storage-fuse/v2/common/config"
|
||||
"github.com/Azure/azure-storage-fuse/v2/common/log"
|
||||
"github.com/Azure/azure-storage-fuse/v2/internal"
|
||||
"github.com/vibhansa-msft/tlru"
|
||||
)
|
||||
|
||||
// Common structure for Component
|
||||
type EntryCache struct {
|
||||
internal.BaseComponent
|
||||
cacheTimeout uint32
|
||||
pathLocks *common.LockMap
|
||||
pathLRU *tlru.TLRU
|
||||
pathMap sync.Map
|
||||
}
|
||||
|
||||
type pathCacheItem struct {
|
||||
children []*internal.ObjAttr
|
||||
nextToken string
|
||||
}
|
||||
|
||||
// By default entry cache is valid for 30 seconds
|
||||
const defaultEntryCacheTimeout uint32 = (30)
|
||||
|
||||
// Structure defining your config parameters
|
||||
type EntryCacheOptions struct {
|
||||
Timeout uint32 `config:"timeout-sec" yaml:"timeout-sec,omitempty"`
|
||||
}
|
||||
|
||||
const compName = "entry_cache"
|
||||
|
||||
// Verification to check satisfaction criteria with Component Interface
|
||||
var _ internal.Component = &EntryCache{}
|
||||
|
||||
func (c *EntryCache) Name() string {
|
||||
return compName
|
||||
}
|
||||
|
||||
func (c *EntryCache) SetName(name string) {
|
||||
c.BaseComponent.SetName(name)
|
||||
}
|
||||
|
||||
func (c *EntryCache) SetNextComponent(nc internal.Component) {
|
||||
c.BaseComponent.SetNextComponent(nc)
|
||||
}
|
||||
|
||||
// Start : Pipeline calls this method to start the component functionality
|
||||
//
|
||||
// this shall not block the call otherwise pipeline will not start
|
||||
func (c *EntryCache) Start(ctx context.Context) error {
|
||||
log.Trace("EntryCache::Start : Starting component %s", c.Name())
|
||||
|
||||
err := c.pathLRU.Start()
|
||||
if err != nil {
|
||||
log.Err("EntryCache::Start : fail to start LRU for path caching [%s]", err.Error())
|
||||
return fmt.Errorf("failed to start LRU for path caching [%s]", err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop : Stop the component functionality and kill all threads started
|
||||
func (c *EntryCache) Stop() error {
|
||||
log.Trace("EntryCache::Stop : Stopping component %s", c.Name())
|
||||
|
||||
err := c.pathLRU.Stop()
|
||||
if err != nil {
|
||||
log.Err("EntryCache::Stop : fail to stop LRU for path caching [%s]", err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Configure : Pipeline will call this method after constructor so that you can read config and initialize yourself
|
||||
//
|
||||
// Return failure if any config is not valid to exit the process
|
||||
func (c *EntryCache) Configure(_ bool) error {
|
||||
log.Trace("EntryCache::Configure : %s", c.Name())
|
||||
|
||||
var readonly bool
|
||||
err := config.UnmarshalKey("read-only", &readonly)
|
||||
if err != nil {
|
||||
log.Err("EntryCache::Configure : config error [unable to obtain read-only]")
|
||||
return fmt.Errorf("config error in %s [%s]", c.Name(), err.Error())
|
||||
}
|
||||
|
||||
if !readonly {
|
||||
log.Err("EntryCache::Configure : EntryCache component should be used only in read-only mode")
|
||||
return fmt.Errorf("EntryCache component should be used in only in read-only mode")
|
||||
}
|
||||
|
||||
// >> If you do not need any config parameters remove below code and return nil
|
||||
conf := EntryCacheOptions{}
|
||||
err = config.UnmarshalKey(c.Name(), &conf)
|
||||
if err != nil {
|
||||
log.Err("EntryCache::Configure : config error [invalid config attributes]")
|
||||
return fmt.Errorf("EntryCache: config error [invalid config attributes]")
|
||||
}
|
||||
|
||||
c.cacheTimeout = defaultEntryCacheTimeout
|
||||
if config.IsSet(compName + ".timeout-sec") {
|
||||
c.cacheTimeout = conf.Timeout
|
||||
}
|
||||
|
||||
c.pathLRU, err = tlru.New(1000, c.cacheTimeout, c.pathEvict, 0, nil)
|
||||
if err != nil {
|
||||
log.Err("EntryCache::Start : fail to create LRU for path caching [%s]", err.Error())
|
||||
return fmt.Errorf("config error in %s [%s]", c.Name(), err.Error())
|
||||
}
|
||||
|
||||
c.pathLocks = common.NewLockMap()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StreamDir : Optionally cache entries of the list
|
||||
func (c *EntryCache) StreamDir(options internal.StreamDirOptions) ([]*internal.ObjAttr, string, error) {
|
||||
log.Trace("AttrCache::StreamDir : %s", options.Name)
|
||||
|
||||
pathKey := fmt.Sprintf("%s##%s", options.Name, options.Token)
|
||||
flock := c.pathLocks.Get(pathKey)
|
||||
flock.Lock()
|
||||
defer flock.Unlock()
|
||||
|
||||
pathEntry, found := c.pathMap.Load(pathKey)
|
||||
if !found {
|
||||
log.Debug("EntryCache::StreamDir : Cache not valid, fetch new list for path: %s, token %s", options.Name, options.Token)
|
||||
pathList, token, err := c.NextComponent().StreamDir(options)
|
||||
if err == nil && len(pathList) > 0 {
|
||||
item := pathCacheItem{
|
||||
children: pathList,
|
||||
nextToken: token,
|
||||
}
|
||||
c.pathMap.Store(pathKey, item)
|
||||
c.pathLRU.Add(pathKey)
|
||||
}
|
||||
return pathList, token, err
|
||||
} else {
|
||||
log.Debug("EntryCache::StreamDir : Serving list from cache for path: %s, token %s", options.Name, options.Token)
|
||||
item := pathEntry.(pathCacheItem)
|
||||
return item.children, item.nextToken, nil
|
||||
}
|
||||
}
|
||||
|
||||
// pathEvict : Callback when a node from cache expires
|
||||
func (c *EntryCache) pathEvict(node *list.Element) {
|
||||
pathKey := node.Value.(string)
|
||||
|
||||
flock := c.pathLocks.Get(pathKey)
|
||||
flock.Lock()
|
||||
defer flock.Unlock()
|
||||
|
||||
log.Debug("EntryCache::pathEvict : Expiry for path %s", pathKey)
|
||||
c.pathMap.Delete(pathKey)
|
||||
}
|
||||
|
||||
// ------------------------- Factory -------------------------------------------
|
||||
|
||||
// Pipeline will call this method to create your object, initialize your variables here
|
||||
// << DO NOT DELETE ANY AUTO GENERATED CODE HERE >>
|
||||
func NewEntryCacheComponent() internal.Component {
|
||||
comp := &EntryCache{}
|
||||
comp.SetName(compName)
|
||||
return comp
|
||||
}
|
||||
|
||||
// On init register this component to pipeline and supply your constructor
|
||||
func init() {
|
||||
internal.AddComponent(compName, NewEntryCacheComponent)
|
||||
|
||||
entryTimeout := config.AddUint32Flag("list-cache-timeout", defaultEntryCacheTimeout, "list entry timeout")
|
||||
config.BindPFlag(compName+".timeout-sec", entryTimeout)
|
||||
}
|
|
@ -0,0 +1,219 @@
|
|||
/*
|
||||
_____ _____ _____ ____ ______ _____ ------
|
||||
| | | | | | | | | | | | |
|
||||
| | | | | | | | | | | | |
|
||||
| --- | | | | |-----| |---- | | |-----| |----- ------
|
||||
| | | | | | | | | | | | |
|
||||
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____
|
||||
|
||||
|
||||
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
|
||||
|
||||
Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
|
||||
Author : <blobfusedev@microsoft.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE
|
||||
*/
|
||||
|
||||
package entry_cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-storage-fuse/v2/common"
|
||||
"github.com/Azure/azure-storage-fuse/v2/common/config"
|
||||
"github.com/Azure/azure-storage-fuse/v2/common/log"
|
||||
"github.com/Azure/azure-storage-fuse/v2/component/loopback"
|
||||
"github.com/Azure/azure-storage-fuse/v2/internal"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
var home_dir, _ = os.UserHomeDir()
|
||||
|
||||
type entryCacheTestSuite struct {
|
||||
suite.Suite
|
||||
assert *assert.Assertions
|
||||
entryCache *EntryCache
|
||||
loopback internal.Component
|
||||
fake_storage_path string
|
||||
}
|
||||
|
||||
func newLoopbackFS() internal.Component {
|
||||
loopback := loopback.NewLoopbackFSComponent()
|
||||
loopback.Configure(true)
|
||||
|
||||
return loopback
|
||||
}
|
||||
|
||||
func newEntryCache(next internal.Component) *EntryCache {
|
||||
entryCache := NewEntryCacheComponent()
|
||||
entryCache.SetNextComponent(next)
|
||||
err := entryCache.Configure(true)
|
||||
if err != nil {
|
||||
panic("Unable to configure entry cache.")
|
||||
}
|
||||
|
||||
return entryCache.(*EntryCache)
|
||||
}
|
||||
|
||||
func randomString(length int) string {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
b := make([]byte, length)
|
||||
r.Read(b)
|
||||
return fmt.Sprintf("%x", b)[:length]
|
||||
}
|
||||
|
||||
func (suite *entryCacheTestSuite) SetupTest() {
|
||||
err := log.SetDefaultLogger("silent", common.LogConfig{Level: common.ELogLevel.LOG_DEBUG()})
|
||||
if err != nil {
|
||||
panic("Unable to set silent logger as default.")
|
||||
}
|
||||
rand := randomString(8)
|
||||
suite.fake_storage_path = filepath.Join(home_dir, "fake_storage"+rand)
|
||||
defaultConfig := fmt.Sprintf("read-only: true\n\nentry_cache:\n timeout-sec: 7\n\nloopbackfs:\n path: %s", suite.fake_storage_path)
|
||||
log.Debug(defaultConfig)
|
||||
|
||||
// Delete the temp directories created
|
||||
os.RemoveAll(suite.fake_storage_path)
|
||||
suite.setupTestHelper(defaultConfig)
|
||||
}
|
||||
|
||||
func (suite *entryCacheTestSuite) setupTestHelper(configuration string) {
|
||||
suite.assert = assert.New(suite.T())
|
||||
|
||||
config.ReadConfigFromReader(strings.NewReader(configuration))
|
||||
suite.loopback = newLoopbackFS()
|
||||
suite.entryCache = newEntryCache(suite.loopback)
|
||||
suite.loopback.Start(context.Background())
|
||||
err := suite.entryCache.Start(context.Background())
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to start file cache [%s]", err.Error()))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (suite *entryCacheTestSuite) cleanupTest() {
|
||||
suite.loopback.Stop()
|
||||
err := suite.entryCache.Stop()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to stop file cache [%s]", err.Error()))
|
||||
}
|
||||
|
||||
// Delete the temp directories created
|
||||
os.RemoveAll(suite.fake_storage_path)
|
||||
}
|
||||
|
||||
func (suite *entryCacheTestSuite) TestEmpty() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
objs, token, err := suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "", Token: ""})
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotNil(objs)
|
||||
suite.assert.Equal(token, "")
|
||||
|
||||
_, found := suite.entryCache.pathMap.Load("##")
|
||||
suite.assert.False(found)
|
||||
|
||||
objs, token, err = suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "ABCD", Token: ""})
|
||||
suite.assert.NotNil(err)
|
||||
suite.assert.Nil(objs)
|
||||
suite.assert.Equal(token, "")
|
||||
}
|
||||
|
||||
func (suite *entryCacheTestSuite) TestWithEntry() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
// Create a file
|
||||
filePath := filepath.Join(suite.fake_storage_path, "testfile1")
|
||||
h, err := os.Create(filePath)
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotNil(h)
|
||||
h.Close()
|
||||
|
||||
objs, token, err := suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "", Token: ""})
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotNil(objs)
|
||||
suite.assert.Equal(token, "")
|
||||
|
||||
cachedObjs, found := suite.entryCache.pathMap.Load("##")
|
||||
suite.assert.True(found)
|
||||
suite.assert.Equal(len(objs), 1)
|
||||
|
||||
suite.assert.Equal(objs, cachedObjs.(pathCacheItem).children)
|
||||
}
|
||||
|
||||
func (suite *entryCacheTestSuite) TestCachedEntry() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
// Create a file
|
||||
filePath := filepath.Join(suite.fake_storage_path, "testfile1")
|
||||
h, err := os.Create(filePath)
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotNil(h)
|
||||
h.Close()
|
||||
|
||||
objs, token, err := suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "", Token: ""})
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotNil(objs)
|
||||
suite.assert.Equal(token, "")
|
||||
|
||||
cachedObjs, found := suite.entryCache.pathMap.Load("##")
|
||||
suite.assert.True(found)
|
||||
suite.assert.Equal(len(objs), 1)
|
||||
|
||||
suite.assert.Equal(objs, cachedObjs.(pathCacheItem).children)
|
||||
|
||||
filePath = filepath.Join(suite.fake_storage_path, "testfile2")
|
||||
h, err = os.Create(filePath)
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotNil(h)
|
||||
h.Close()
|
||||
|
||||
objs, token, err = suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "", Token: ""})
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotNil(objs)
|
||||
suite.assert.Equal(token, "")
|
||||
suite.assert.Equal(len(objs), 1)
|
||||
|
||||
time.Sleep(40 * time.Second)
|
||||
_, found = suite.entryCache.pathMap.Load("##")
|
||||
suite.assert.False(found)
|
||||
|
||||
objs, token, err = suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "", Token: ""})
|
||||
suite.assert.Nil(err)
|
||||
suite.assert.NotNil(objs)
|
||||
suite.assert.Equal(token, "")
|
||||
suite.assert.Equal(len(objs), 2)
|
||||
|
||||
}
|
||||
|
||||
// In order for 'go test' to run this suite, we need to create
|
||||
// a normal test function and pass our suite to suite.Run
|
||||
func TestEntryCacheTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(entryCacheTestSuite))
|
||||
}
|
|
@ -124,7 +124,7 @@ func deleteFile(name string) error {
|
|||
}
|
||||
|
||||
if err != nil {
|
||||
log.Err("lruPolicy::DeleteItem : Failed to delete local file %s", name)
|
||||
log.Err("cachePolicy::DeleteItem : Failed to delete local file %s [%v]", name, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -194,6 +194,30 @@ func (c *FileCache) Stop() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GenConfig : Generate default config for the component
|
||||
func (c *FileCache) GenConfig() string {
|
||||
log.Info("FileCache::Configure : config generation started")
|
||||
|
||||
var sb strings.Builder
|
||||
sb.WriteString(fmt.Sprintf("\n%s:", c.Name()))
|
||||
|
||||
tmpPath := ""
|
||||
_ = config.UnmarshalKey("tmp-path", &tmpPath)
|
||||
|
||||
directIO := false
|
||||
_ = config.UnmarshalKey("direct-io", &directIO)
|
||||
|
||||
timeout := defaultFileCacheTimeout
|
||||
if directIO {
|
||||
timeout = 0
|
||||
}
|
||||
|
||||
sb.WriteString(fmt.Sprintf("\n path: %v", common.ExpandPath(tmpPath)))
|
||||
sb.WriteString(fmt.Sprintf("\n timeout-sec: %v", timeout))
|
||||
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
// Configure : Pipeline will call this method after constructor so that you can read config and initialize yourself
|
||||
//
|
||||
// Return failure if any config is not valid to exit the process
|
||||
|
@ -215,6 +239,15 @@ func (c *FileCache) Configure(_ bool) error {
|
|||
} else {
|
||||
c.cacheTimeout = float64(defaultFileCacheTimeout)
|
||||
}
|
||||
|
||||
directIO := false
|
||||
_ = config.UnmarshalKey("direct-io", &directIO)
|
||||
|
||||
if directIO {
|
||||
c.cacheTimeout = 0
|
||||
log.Crit("FileCache::Configure : Direct IO mode enabled, cache timeout is set to 0")
|
||||
}
|
||||
|
||||
if config.IsSet(compName + ".empty-dir-check") {
|
||||
c.allowNonEmpty = !conf.EmptyDirCheck
|
||||
} else {
|
||||
|
@ -317,7 +350,7 @@ func (c *FileCache) Configure(_ bool) error {
|
|||
c.diskHighWaterMark = (((conf.MaxSizeMB * MB) * float64(cacheConfig.highThreshold)) / 100)
|
||||
}
|
||||
|
||||
log.Info("FileCache::Configure : create-empty %t, cache-timeout %d, tmp-path %s, max-size-mb %d, high-mark %d, low-mark %d, refresh-sec %v, max-eviction %v, hard-limit %v, policy %s, allow-non-empty-temp %t, cleanup-on-start %t, policy-trace %t, offload-io %t, sync-to-flush %t, ignore-sync %t, defaultPermission %v, diskHighWaterMark %v, maxCacheSize %v, mountPath %v",
|
||||
log.Crit("FileCache::Configure : create-empty %t, cache-timeout %d, tmp-path %s, max-size-mb %d, high-mark %d, low-mark %d, refresh-sec %v, max-eviction %v, hard-limit %v, policy %s, allow-non-empty-temp %t, cleanup-on-start %t, policy-trace %t, offload-io %t, sync-to-flush %t, ignore-sync %t, defaultPermission %v, diskHighWaterMark %v, maxCacheSize %v, mountPath %v",
|
||||
c.createEmptyFile, int(c.cacheTimeout), c.tmpPath, int(cacheConfig.maxSizeMB), int(cacheConfig.highThreshold), int(cacheConfig.lowThreshold), c.refreshSec, cacheConfig.maxEviction, c.hardLimit, conf.Policy, c.allowNonEmpty, c.cleanupOnStart, c.policyTrace, c.offloadIO, c.syncToFlush, c.syncToDelete, c.defaultPermission, c.diskHighWaterMark, c.maxCacheSize, c.mountPath)
|
||||
|
||||
return nil
|
||||
|
@ -618,6 +651,46 @@ func (fc *FileCache) IsDirEmpty(options internal.IsDirEmptyOptions) bool {
|
|||
return fc.NextComponent().IsDirEmpty(options)
|
||||
}
|
||||
|
||||
// DeleteEmptyDirs: delete empty directories in local cache, return error if directory is not empty
|
||||
func (fc *FileCache) DeleteEmptyDirs(options internal.DeleteDirOptions) (bool, error) {
|
||||
localPath := options.Name
|
||||
if !strings.Contains(options.Name, fc.tmpPath) {
|
||||
localPath = filepath.Join(fc.tmpPath, options.Name)
|
||||
}
|
||||
|
||||
log.Trace("FileCache::DeleteEmptyDirs : %s", localPath)
|
||||
|
||||
entries, err := os.ReadDir(localPath)
|
||||
if err != nil {
|
||||
log.Debug("FileCache::DeleteEmptyDirs : Unable to read directory %s [%s]", localPath, err.Error())
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
val, err := fc.DeleteEmptyDirs(internal.DeleteDirOptions{
|
||||
Name: filepath.Join(localPath, entry.Name()),
|
||||
})
|
||||
if err != nil {
|
||||
log.Err("FileCache::DeleteEmptyDirs : Unable to delete directory %s [%s]", localPath, err.Error())
|
||||
return val, err
|
||||
}
|
||||
} else {
|
||||
log.Err("FileCache::DeleteEmptyDirs : Directory %s is not empty, contains file %s", localPath, entry.Name())
|
||||
return false, fmt.Errorf("unable to delete directory %s, contains file %s", localPath, entry.Name())
|
||||
}
|
||||
}
|
||||
|
||||
if !strings.EqualFold(fc.tmpPath, localPath) {
|
||||
err = os.Remove(localPath)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// RenameDir: Recursively invalidate the source directory and its children
|
||||
func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error {
|
||||
log.Trace("FileCache::RenameDir : src=%s, dst=%s", options.Src, options.Dst)
|
||||
|
|
|
@ -91,9 +91,9 @@ func newTestFileCache(next internal.Component) *FileCache {
|
|||
}
|
||||
|
||||
func randomString(length int) string {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
b := make([]byte, length)
|
||||
rand.Read(b)
|
||||
r.Read(b)
|
||||
return fmt.Sprintf("%x", b)[:length]
|
||||
}
|
||||
|
||||
|
@ -1911,6 +1911,57 @@ func (suite *fileCacheTestSuite) TestHardLimitOnSize() {
|
|||
suite.assert.NotNil(err)
|
||||
}
|
||||
|
||||
func (suite *fileCacheTestSuite) createDirectoryStructure() {
|
||||
err := os.MkdirAll(filepath.Join(suite.cache_path, "a", "b", "c", "d"), 0777)
|
||||
suite.assert.NoError(err)
|
||||
|
||||
err = os.MkdirAll(filepath.Join(suite.cache_path, "a", "b", "e", "f"), 0777)
|
||||
suite.assert.NoError(err)
|
||||
|
||||
err = os.MkdirAll(filepath.Join(suite.cache_path, "a", "b", "e", "g"), 0777)
|
||||
suite.assert.NoError(err)
|
||||
|
||||
err = os.MkdirAll(filepath.Join(suite.cache_path, "h", "i", "j", "k"), 0777)
|
||||
suite.assert.NoError(err)
|
||||
|
||||
err = os.MkdirAll(filepath.Join(suite.cache_path, "h", "l", "m", "n"), 0777)
|
||||
suite.assert.NoError(err)
|
||||
}
|
||||
|
||||
func (suite *fileCacheTestSuite) TestDeleteEmptyDirsRoot() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
suite.createDirectoryStructure()
|
||||
val, err := suite.fileCache.DeleteEmptyDirs(internal.DeleteDirOptions{Name: suite.cache_path})
|
||||
suite.assert.NoError(err)
|
||||
suite.assert.True(val)
|
||||
}
|
||||
|
||||
func (suite *fileCacheTestSuite) TestDeleteEmptyDirsNonRoot() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
suite.createDirectoryStructure()
|
||||
val, err := suite.fileCache.DeleteEmptyDirs(internal.DeleteDirOptions{Name: "a"})
|
||||
suite.assert.NoError(err)
|
||||
suite.assert.True(val)
|
||||
|
||||
val, err = suite.fileCache.DeleteEmptyDirs(internal.DeleteDirOptions{Name: filepath.Join(suite.cache_path, "h")})
|
||||
suite.assert.NoError(err)
|
||||
suite.assert.True(val)
|
||||
}
|
||||
|
||||
func (suite *fileCacheTestSuite) TestDeleteEmptyDirsNegative() {
|
||||
defer suite.cleanupTest()
|
||||
|
||||
suite.createDirectoryStructure()
|
||||
_, err := os.Create(filepath.Join(suite.cache_path, "h", "l", "m", "n", "file.txt"))
|
||||
suite.assert.NoError(err)
|
||||
|
||||
val, err := suite.fileCache.DeleteEmptyDirs(internal.DeleteDirOptions{Name: suite.cache_path})
|
||||
suite.assert.Error(err)
|
||||
suite.assert.False(val)
|
||||
}
|
||||
|
||||
// In order for 'go test' to run this suite, we need to create
|
||||
// a normal test function and pass our suite to suite.Run
|
||||
func TestFileCacheTestSuite(t *testing.T) {
|
||||
|
|
|
@ -36,6 +36,7 @@ package libfuse
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/Azure/azure-storage-fuse/v2/common"
|
||||
"github.com/Azure/azure-storage-fuse/v2/common/config"
|
||||
|
@ -225,6 +226,13 @@ func (lf *Libfuse) Validate(opt *LibfuseOptions) error {
|
|||
lf.negativeTimeout = defaultNegativeEntryExpiration
|
||||
}
|
||||
|
||||
if lf.directIO {
|
||||
lf.negativeTimeout = 0
|
||||
lf.attributeExpiration = 0
|
||||
lf.entryExpiration = 0
|
||||
log.Crit("Libfuse::Validate : DirectIO enabled, setting fuse timeouts to 0")
|
||||
}
|
||||
|
||||
if !(config.IsSet(compName+".uid") || config.IsSet(compName+".gid") ||
|
||||
config.IsSet("lfuse.uid") || config.IsSet("lfuse.gid")) {
|
||||
var err error
|
||||
|
@ -246,6 +254,29 @@ func (lf *Libfuse) Validate(opt *LibfuseOptions) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (lf *Libfuse) GenConfig() string {
|
||||
log.Info("Libfuse::Configure : config generation started")
|
||||
|
||||
// If DirectIO is enabled, override expiration values
|
||||
directIO := false
|
||||
_ = config.UnmarshalKey("direct-io", &directIO)
|
||||
|
||||
var sb strings.Builder
|
||||
sb.WriteString(fmt.Sprintf("\n%s:", lf.Name()))
|
||||
|
||||
timeout := defaultEntryExpiration
|
||||
if directIO {
|
||||
timeout = 0
|
||||
sb.WriteString("\n direct-io: true")
|
||||
}
|
||||
|
||||
sb.WriteString(fmt.Sprintf("\n attribute-expiration-sec: %v", timeout))
|
||||
sb.WriteString(fmt.Sprintf("\n entry-expiration-sec: %v", timeout))
|
||||
sb.WriteString(fmt.Sprintf("\n negative-entry-expiration-sec: %v", timeout))
|
||||
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
// Configure : Pipeline will call this method after constructor so that you can read config and initialize yourself
|
||||
//
|
||||
// Return failure if any config is not valid to exit the process
|
||||
|
@ -301,7 +332,7 @@ func (lf *Libfuse) Configure(_ bool) error {
|
|||
return fmt.Errorf("%s config error %s", lf.Name(), err.Error())
|
||||
}
|
||||
|
||||
log.Info("Libfuse::Configure : read-only %t, allow-other %t, allow-root %t, default-perm %d, entry-timeout %d, attr-time %d, negative-timeout %d, ignore-open-flags %t, nonempty %t, direct_io %t, max-fuse-threads %d, fuse-trace %t, extension %s, disable-writeback-cache %t, dirPermission %v, mountPath %v, umask %v",
|
||||
log.Crit("Libfuse::Configure : read-only %t, allow-other %t, allow-root %t, default-perm %d, entry-timeout %d, attr-time %d, negative-timeout %d, ignore-open-flags %t, nonempty %t, direct_io %t, max-fuse-threads %d, fuse-trace %t, extension %s, disable-writeback-cache %t, dirPermission %v, mountPath %v, umask %v",
|
||||
lf.readOnly, lf.allowOther, lf.allowRoot, lf.filePermission, lf.entryExpiration, lf.attributeExpiration, lf.negativeTimeout, lf.ignoreOpenFlags, lf.nonEmptyMount, lf.directIO, lf.maxFuseThreads, lf.traceEnable, lf.extensionPath, lf.disableWritebackCache, lf.dirPermission, lf.mountPath, lf.umask)
|
||||
|
||||
return nil
|
||||
|
|
|
@ -567,7 +567,16 @@ func libfuse_rmdir(path *C.char) C.int {
|
|||
|
||||
empty := fuseFS.NextComponent().IsDirEmpty(internal.IsDirEmptyOptions{Name: name})
|
||||
if !empty {
|
||||
return -C.ENOTEMPTY
|
||||
// delete empty directories from local cache directory
|
||||
val, err := fuseFS.NextComponent().DeleteEmptyDirs(internal.DeleteDirOptions{Name: name})
|
||||
if !val {
|
||||
// either file cache has failed or not present in the pipeline
|
||||
if err != nil {
|
||||
// if error is not nil, file cache has failed
|
||||
log.Err("Libfuse::libfuse_rmdir : Failed to delete %s [%s]", name, err.Error())
|
||||
}
|
||||
return -C.ENOTEMPTY
|
||||
}
|
||||
}
|
||||
|
||||
err := fuseFS.NextComponent().DeleteDir(internal.DeleteDirOptions{Name: name})
|
||||
|
@ -957,7 +966,13 @@ func libfuse_readlink(path *C.char, buf *C.char, size C.size_t) C.int {
|
|||
name = common.NormalizeObjectName(name)
|
||||
//log.Trace("Libfuse::libfuse_readlink : Received for %s", name)
|
||||
|
||||
targetPath, err := fuseFS.NextComponent().ReadLink(internal.ReadLinkOptions{Name: name})
|
||||
linkSize := int64(0)
|
||||
attr, err := fuseFS.NextComponent().GetAttr(internal.GetAttrOptions{Name: name})
|
||||
if err == nil && attr != nil {
|
||||
linkSize = attr.Size
|
||||
}
|
||||
|
||||
targetPath, err := fuseFS.NextComponent().ReadLink(internal.ReadLinkOptions{Name: name, Size: linkSize})
|
||||
if err != nil {
|
||||
log.Err("Libfuse::libfuse2_readlink : error reading link file %s [%s]", name, err.Error())
|
||||
if os.IsNotExist(err) {
|
||||
|
|
|
@ -172,6 +172,7 @@ func testRmDirNotEmpty(suite *libfuseTestSuite) {
|
|||
defer C.free(unsafe.Pointer(path))
|
||||
isDirEmptyOptions := internal.IsDirEmptyOptions{Name: name}
|
||||
suite.mock.EXPECT().IsDirEmpty(isDirEmptyOptions).Return(false)
|
||||
suite.mock.EXPECT().DeleteEmptyDirs(internal.DeleteDirOptions{Name: name}).Return(false, errors.New("unable to delete directory"))
|
||||
|
||||
err := libfuse_rmdir(path)
|
||||
suite.assert.Equal(C.int(-C.ENOTEMPTY), err)
|
||||
|
@ -490,6 +491,9 @@ func testReadLink(suite *libfuseTestSuite) {
|
|||
defer C.free(unsafe.Pointer(path))
|
||||
options := internal.ReadLinkOptions{Name: name}
|
||||
suite.mock.EXPECT().ReadLink(options).Return("target", nil)
|
||||
attr := &internal.ObjAttr{}
|
||||
getAttrOpt := internal.GetAttrOptions{Name: name}
|
||||
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(attr, nil)
|
||||
|
||||
// https://stackoverflow.com/questions/41953619/how-to-initialise-empty-c-cstring-in-cgo
|
||||
buf := C.CString("")
|
||||
|
@ -505,6 +509,9 @@ func testReadLinkNotExists(suite *libfuseTestSuite) {
|
|||
defer C.free(unsafe.Pointer(path))
|
||||
options := internal.ReadLinkOptions{Name: name}
|
||||
suite.mock.EXPECT().ReadLink(options).Return("", syscall.ENOENT)
|
||||
attr := &internal.ObjAttr{}
|
||||
getAttrOpt := internal.GetAttrOptions{Name: name}
|
||||
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(attr, nil)
|
||||
|
||||
buf := C.CString("")
|
||||
err := libfuse_readlink(path, buf, 7)
|
||||
|
@ -519,6 +526,8 @@ func testReadLinkError(suite *libfuseTestSuite) {
|
|||
defer C.free(unsafe.Pointer(path))
|
||||
options := internal.ReadLinkOptions{Name: name}
|
||||
suite.mock.EXPECT().ReadLink(options).Return("", errors.New("failed to read link"))
|
||||
getAttrOpt := internal.GetAttrOptions{Name: name}
|
||||
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(nil, nil)
|
||||
|
||||
buf := C.CString("")
|
||||
err := libfuse_readlink(path, buf, 7)
|
||||
|
|
|
@ -581,7 +581,16 @@ func libfuse_rmdir(path *C.char) C.int {
|
|||
|
||||
empty := fuseFS.NextComponent().IsDirEmpty(internal.IsDirEmptyOptions{Name: name})
|
||||
if !empty {
|
||||
return -C.ENOTEMPTY
|
||||
// delete empty directories from local cache directory
|
||||
val, err := fuseFS.NextComponent().DeleteEmptyDirs(internal.DeleteDirOptions{Name: name})
|
||||
if !val {
|
||||
// either file cache has failed or not present in the pipeline
|
||||
if err != nil {
|
||||
// if error is not nil, file cache has failed
|
||||
log.Err("Libfuse::libfuse_rmdir : Failed to delete %s [%s]", name, err.Error())
|
||||
}
|
||||
return -C.ENOTEMPTY
|
||||
}
|
||||
}
|
||||
|
||||
err := fuseFS.NextComponent().DeleteDir(internal.DeleteDirOptions{Name: name})
|
||||
|
@ -1026,7 +1035,13 @@ func libfuse_readlink(path *C.char, buf *C.char, size C.size_t) C.int {
|
|||
name = common.NormalizeObjectName(name)
|
||||
//log.Trace("Libfuse::libfuse_readlink : Received for %s", name)
|
||||
|
||||
targetPath, err := fuseFS.NextComponent().ReadLink(internal.ReadLinkOptions{Name: name})
|
||||
linkSize := int64(0)
|
||||
attr, err := fuseFS.NextComponent().GetAttr(internal.GetAttrOptions{Name: name})
|
||||
if err == nil && attr != nil {
|
||||
linkSize = attr.Size
|
||||
}
|
||||
|
||||
targetPath, err := fuseFS.NextComponent().ReadLink(internal.ReadLinkOptions{Name: name, Size: linkSize})
|
||||
if err != nil {
|
||||
log.Err("Libfuse::libfuse_readlink : error reading link file %s [%s]", name, err.Error())
|
||||
if os.IsNotExist(err) {
|
||||
|
|
|
@ -77,9 +77,9 @@ func (suite *libfuseTestSuite) TestConfig() {
|
|||
suite.assert.False(suite.libfuse.allowRoot)
|
||||
suite.assert.Equal(suite.libfuse.dirPermission, uint(fs.FileMode(0777)))
|
||||
suite.assert.Equal(suite.libfuse.filePermission, uint(fs.FileMode(0777)))
|
||||
suite.assert.Equal(suite.libfuse.entryExpiration, uint32(60))
|
||||
suite.assert.Equal(suite.libfuse.attributeExpiration, uint32(60))
|
||||
suite.assert.Equal(suite.libfuse.negativeTimeout, uint32(60))
|
||||
suite.assert.Equal(suite.libfuse.entryExpiration, uint32(0))
|
||||
suite.assert.Equal(suite.libfuse.attributeExpiration, uint32(0))
|
||||
suite.assert.Equal(suite.libfuse.negativeTimeout, uint32(0))
|
||||
suite.assert.True(suite.libfuse.directIO)
|
||||
}
|
||||
|
||||
|
|
|
@ -156,6 +156,7 @@ func testRmDirNotEmpty(suite *libfuseTestSuite) {
|
|||
defer C.free(unsafe.Pointer(path))
|
||||
isDirEmptyOptions := internal.IsDirEmptyOptions{Name: name}
|
||||
suite.mock.EXPECT().IsDirEmpty(isDirEmptyOptions).Return(false)
|
||||
suite.mock.EXPECT().DeleteEmptyDirs(internal.DeleteDirOptions{Name: name}).Return(false, errors.New("unable to delete directory"))
|
||||
|
||||
err := libfuse_rmdir(path)
|
||||
suite.assert.Equal(C.int(-C.ENOTEMPTY), err)
|
||||
|
@ -468,6 +469,9 @@ func testReadLink(suite *libfuseTestSuite) {
|
|||
defer C.free(unsafe.Pointer(path))
|
||||
options := internal.ReadLinkOptions{Name: name}
|
||||
suite.mock.EXPECT().ReadLink(options).Return("target", nil)
|
||||
attr := &internal.ObjAttr{}
|
||||
getAttrOpt := internal.GetAttrOptions{Name: name}
|
||||
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(attr, nil)
|
||||
|
||||
// https://stackoverflow.com/questions/41953619/how-to-initialise-empty-c-cstring-in-cgo
|
||||
buf := C.CString("")
|
||||
|
@ -483,6 +487,9 @@ func testReadLinkNotExists(suite *libfuseTestSuite) {
|
|||
defer C.free(unsafe.Pointer(path))
|
||||
options := internal.ReadLinkOptions{Name: name}
|
||||
suite.mock.EXPECT().ReadLink(options).Return("", syscall.ENOENT)
|
||||
attr := &internal.ObjAttr{}
|
||||
getAttrOpt := internal.GetAttrOptions{Name: name}
|
||||
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(attr, nil)
|
||||
|
||||
buf := C.CString("")
|
||||
err := libfuse_readlink(path, buf, 7)
|
||||
|
@ -497,6 +504,8 @@ func testReadLinkError(suite *libfuseTestSuite) {
|
|||
defer C.free(unsafe.Pointer(path))
|
||||
options := internal.ReadLinkOptions{Name: name}
|
||||
suite.mock.EXPECT().ReadLink(options).Return("", errors.New("failed to read link"))
|
||||
getAttrOpt := internal.GetAttrOptions{Name: name}
|
||||
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(nil, nil)
|
||||
|
||||
buf := C.CString("")
|
||||
err := libfuse_readlink(path, buf, 7)
|
||||
|
|
|
@ -148,7 +148,6 @@ func (lfs *LoopbackFS) ReadDir(options internal.ReadDirOptions) ([]*internal.Obj
|
|||
Mode: info.Mode(),
|
||||
Mtime: info.ModTime(),
|
||||
}
|
||||
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
|
||||
attr.Flags.Set(internal.PropFlagModeDefault)
|
||||
|
||||
if file.IsDir() {
|
||||
|
@ -186,7 +185,6 @@ func (lfs *LoopbackFS) StreamDir(options internal.StreamDirOptions) ([]*internal
|
|||
Mode: info.Mode(),
|
||||
Mtime: info.ModTime(),
|
||||
}
|
||||
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
|
||||
attr.Flags.Set(internal.PropFlagModeDefault)
|
||||
|
||||
if file.IsDir() {
|
||||
|
@ -436,7 +434,6 @@ func (lfs *LoopbackFS) GetAttr(options internal.GetAttrOptions) (*internal.ObjAt
|
|||
Mode: info.Mode(),
|
||||
Mtime: info.ModTime(),
|
||||
}
|
||||
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
|
||||
attr.Flags.Set(internal.PropFlagModeDefault)
|
||||
|
||||
if info.Mode()&os.ModeSymlink != 0 {
|
||||
|
|
|
@ -64,7 +64,6 @@ const (
|
|||
PropFlagIsDir
|
||||
PropFlagEmptyDir
|
||||
PropFlagSymlink
|
||||
PropFlagMetadataRetrieved
|
||||
PropFlagModeDefault // TODO: Does this sound better as ModeDefault or DefaultMode? The getter would be IsModeDefault or IsDefaultMode
|
||||
)
|
||||
|
||||
|
@ -93,12 +92,6 @@ func (attr *ObjAttr) IsSymlink() bool {
|
|||
return attr.Flags.IsSet(PropFlagSymlink)
|
||||
}
|
||||
|
||||
// IsMetadataRetrieved : Whether or not metadata has been retrieved for this path.
|
||||
// Datalake list paths does not support returning x-ms-properties (metadata), so we cannot be sure if the path is a symlink or not.
|
||||
func (attr *ObjAttr) IsMetadataRetrieved() bool {
|
||||
return attr.Flags.IsSet(PropFlagMetadataRetrieved)
|
||||
}
|
||||
|
||||
// IsModeDefault : Whether or not to use the default mode.
|
||||
// This is set in any storage service that does not support chmod/chown.
|
||||
func (attr *ObjAttr) IsModeDefault() bool {
|
||||
|
|
|
@ -65,6 +65,10 @@ func (base *BaseComponent) Configure(isParent bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (base *BaseComponent) GenConfig() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (base *BaseComponent) Priority() ComponentPriority {
|
||||
return EComponentPriority.LevelMid()
|
||||
}
|
||||
|
@ -111,6 +115,13 @@ func (base *BaseComponent) IsDirEmpty(options IsDirEmptyOptions) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (base *BaseComponent) DeleteEmptyDirs(options DeleteDirOptions) (bool, error) {
|
||||
if base.next != nil {
|
||||
return base.next.DeleteEmptyDirs(options)
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (base *BaseComponent) OpenDir(options OpenDirOptions) error {
|
||||
if base.next != nil {
|
||||
return base.next.OpenDir(options)
|
||||
|
|
|
@ -71,6 +71,7 @@ type Component interface {
|
|||
Name() string
|
||||
SetName(string)
|
||||
Configure(bool) error
|
||||
GenConfig() string
|
||||
Priority() ComponentPriority
|
||||
|
||||
SetNextComponent(c Component)
|
||||
|
@ -83,6 +84,7 @@ type Component interface {
|
|||
CreateDir(CreateDirOptions) error
|
||||
DeleteDir(DeleteDirOptions) error
|
||||
IsDirEmpty(IsDirEmptyOptions) bool
|
||||
DeleteEmptyDirs(DeleteDirOptions) (bool, error)
|
||||
|
||||
OpenDir(OpenDirOptions) error
|
||||
//ReadDir: implementation expectations
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
|
||||
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
|
||||
|
||||
Copyright © 2020-2023 Microsoft Corporation. All rights reserved.
|
||||
Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
|
||||
Author : <blobfusedev@microsoft.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
|
|
|
@ -167,6 +167,7 @@ type CreateLinkOptions struct {
|
|||
|
||||
type ReadLinkOptions struct {
|
||||
Name string
|
||||
Size int64
|
||||
}
|
||||
|
||||
type GetAttrOptions struct {
|
||||
|
|
|
@ -136,6 +136,10 @@ func (m *MockComponent) Configure(arg0 bool) error {
|
|||
return ret0
|
||||
}
|
||||
|
||||
func (m *MockComponent) GenConfig() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Configure indicates an expected call of Configure.
|
||||
func (mr *MockComponentMockRecorder) Configure(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
|
@ -327,6 +331,21 @@ func (mr *MockComponentMockRecorder) IsDirEmpty(arg0 interface{}) *gomock.Call {
|
|||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsDirEmpty", reflect.TypeOf((*MockComponent)(nil).IsDirEmpty), arg0)
|
||||
}
|
||||
|
||||
// DeleteEmptyDirs mocks base method.
|
||||
func (m *MockComponent) DeleteEmptyDirs(arg0 DeleteDirOptions) (bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "DeleteEmptyDirs", arg0)
|
||||
ret0, _ := ret[0].(bool)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// DeleteDir indicates an expected call of DeleteEmptyDirs.
|
||||
func (mr *MockComponentMockRecorder) DeleteEmptyDirs(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteEmptyDirs", reflect.TypeOf((*MockComponent)(nil).DeleteEmptyDirs), arg0)
|
||||
}
|
||||
|
||||
// Name mocks base method.
|
||||
func (m *MockComponent) Name() string {
|
||||
m.ctrl.T.Helper()
|
||||
|
|
|
@ -53,6 +53,14 @@ type NewComponent func() Component
|
|||
// Map holding all possible components along with their respective constructors
|
||||
var registeredComponents map[string]NewComponent
|
||||
|
||||
func GetComponent(name string) Component {
|
||||
compInit, ok := registeredComponents[name]
|
||||
if ok {
|
||||
return compInit()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewPipeline : Using a list of strings holding name of components, create and configure the component objects
|
||||
func NewPipeline(components []string, isParent bool) (*Pipeline, error) {
|
||||
comps := make([]Component, 0)
|
||||
|
|
|
@ -51,6 +51,7 @@ logging:
|
|||
# Pipeline configuration. Choose components to be engaged. The order below is the priority order that needs to be followed.
|
||||
components:
|
||||
- libfuse
|
||||
- entry_cache
|
||||
- block_cache
|
||||
- file_cache
|
||||
- attr_cache
|
||||
|
@ -67,6 +68,10 @@ libfuse:
|
|||
extension: <physical path to extension library>
|
||||
direct-io: true|false <enable to bypass the kernel cache>
|
||||
|
||||
# Entry Cache configuration
|
||||
entry_cache:
|
||||
timeout-sec: <cache eviction timeout (in sec). Default - 30 sec>
|
||||
|
||||
# Block cache related configuration
|
||||
block_cache:
|
||||
block-size-mb: <size of each block to be cached in memory (in MB). Default - 16 MB>
|
||||
|
|
|
@ -38,10 +38,11 @@ package e2e_tests
|
|||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"crypto/rand"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
mrand "math/rand"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
|
@ -49,7 +50,6 @@ import (
|
|||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
@ -100,7 +100,6 @@ func initDataValidationFlags() {
|
|||
}
|
||||
|
||||
func getDataValidationTestDirName(n int) string {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
b := make([]byte, n)
|
||||
rand.Read(b)
|
||||
return fmt.Sprintf("%x", b)[:n]
|
||||
|
@ -238,7 +237,7 @@ func compareReadOperInLocalAndRemote(suite *dataValidationTestSuite, lfh, rfh *o
|
|||
}
|
||||
|
||||
func compareWriteOperInLocalAndRemote(suite *dataValidationTestSuite, lfh, rfh *os.File, offset int64) {
|
||||
sizeofbuffer := (rand.Int() % 4) + 1
|
||||
sizeofbuffer := (mrand.Int() % 4) + 1
|
||||
buffer := make([]byte, sizeofbuffer*int(_1MB))
|
||||
rand.Read(buffer)
|
||||
|
||||
|
@ -752,51 +751,51 @@ func (suite *dataValidationTestSuite) TestPanicOnReadingFileInRandReadMode() {
|
|||
closeFileHandles(suite, rfh)
|
||||
}
|
||||
|
||||
func (suite *dataValidationTestSuite) TestReadDataAtBlockBoundaries() {
|
||||
fileName := "testReadDataAtBlockBoundaries"
|
||||
localFilePath, remoteFilePath := convertFileNameToFilePath(fileName)
|
||||
fileSize := 35 * int(_1MB)
|
||||
generateFileWithRandomData(suite, localFilePath, fileSize)
|
||||
suite.copyToMountDir(localFilePath, remoteFilePath)
|
||||
suite.validateData(localFilePath, remoteFilePath)
|
||||
// func (suite *dataValidationTestSuite) TestReadDataAtBlockBoundaries() {
|
||||
// fileName := "testReadDataAtBlockBoundaries"
|
||||
// localFilePath, remoteFilePath := convertFileNameToFilePath(fileName)
|
||||
// fileSize := 35 * int(_1MB)
|
||||
// generateFileWithRandomData(suite, localFilePath, fileSize)
|
||||
// suite.copyToMountDir(localFilePath, remoteFilePath)
|
||||
// suite.validateData(localFilePath, remoteFilePath)
|
||||
|
||||
lfh, rfh := openFileHandleInLocalAndRemote(suite, os.O_RDWR, localFilePath, remoteFilePath)
|
||||
var offset int64 = 0
|
||||
//tests run in 16MB block size config.
|
||||
//Data in File 35MB(3blocks)
|
||||
//block1->16MB, block2->16MB, block3->3MB
|
||||
// lfh, rfh := openFileHandleInLocalAndRemote(suite, os.O_RDWR, localFilePath, remoteFilePath)
|
||||
// var offset int64 = 0
|
||||
// //tests run in 16MB block size config.
|
||||
// //Data in File 35MB(3blocks)
|
||||
// //block1->16MB, block2->16MB, block3->3MB
|
||||
|
||||
//getting 4MB data from 1st block
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//getting 4MB data from overlapping blocks
|
||||
offset = int64(15 * int(_1MB))
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//getting 4MB data from last block
|
||||
offset = int64(32 * int(_1MB))
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//getting 4MB data from overlapping block with last block
|
||||
offset = int64(30 * int(_1MB))
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//Read at some random offset
|
||||
for i := 0; i < 10; i++ {
|
||||
offset = rand.Int63() % int64(fileSize)
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
}
|
||||
// //getting 4MB data from 1st block
|
||||
// compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
// //getting 4MB data from overlapping blocks
|
||||
// offset = int64(15 * int(_1MB))
|
||||
// compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
// //getting 4MB data from last block
|
||||
// offset = int64(32 * int(_1MB))
|
||||
// compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
// //getting 4MB data from overlapping block with last block
|
||||
// offset = int64(30 * int(_1MB))
|
||||
// compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
// //Read at some random offset
|
||||
// for i := 0; i < 10; i++ {
|
||||
// offset = mrand.Int63() % int64(fileSize)
|
||||
// compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
// }
|
||||
|
||||
//write at end of file
|
||||
offset = int64(fileSize)
|
||||
compareWriteOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//Check the previous write with read
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
// //write at end of file
|
||||
// offset = int64(fileSize)
|
||||
// compareWriteOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
// //Check the previous write with read
|
||||
// compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
|
||||
//Write at Random offset in the file
|
||||
offset = rand.Int63() % int64(fileSize)
|
||||
compareWriteOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//Check the previous write with read
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
// //Write at Random offset in the file
|
||||
// offset = mrand.Int63() % int64(fileSize)
|
||||
// compareWriteOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
// //Check the previous write with read
|
||||
// compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
|
||||
closeFileHandles(suite, lfh, rfh)
|
||||
}
|
||||
// closeFileHandles(suite, lfh, rfh)
|
||||
// }
|
||||
|
||||
// -------------- Main Method -------------------
|
||||
func TestDataValidationTestSuite(t *testing.T) {
|
||||
|
@ -845,16 +844,16 @@ func TestDataValidationTestSuite(t *testing.T) {
|
|||
// Sanity check in the off chance the same random name was generated twice and was still around somehow
|
||||
err := os.RemoveAll(tObj.testMntPath)
|
||||
if err != nil {
|
||||
fmt.Println("Could not cleanup feature dir before testing")
|
||||
fmt.Printf("Could not cleanup feature dir before testing [%s]\n", err.Error())
|
||||
}
|
||||
err = os.RemoveAll(tObj.testCachePath)
|
||||
if err != nil {
|
||||
fmt.Println("Could not cleanup cache dir before testing")
|
||||
fmt.Printf("Could not cleanup cache dir before testing [%s]\n", err.Error())
|
||||
}
|
||||
|
||||
err = os.Mkdir(tObj.testMntPath, 0777)
|
||||
if err != nil {
|
||||
t.Error("Failed to create test directory")
|
||||
t.Errorf("Failed to create test directory [%s]\n", err.Error())
|
||||
}
|
||||
rand.Read(minBuff)
|
||||
rand.Read(medBuff)
|
||||
|
|
|
@ -37,9 +37,9 @@
|
|||
package e2e_tests
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"flag"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
|
@ -88,7 +88,6 @@ func initDirFlags() {
|
|||
}
|
||||
|
||||
func getTestDirName(n int) string {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
b := make([]byte, n)
|
||||
rand.Read(b)
|
||||
return fmt.Sprintf("%x", b)[:n]
|
||||
|
@ -585,12 +584,12 @@ func TestDirTestSuite(t *testing.T) {
|
|||
// Sanity check in the off chance the same random name was generated twice and was still around somehow
|
||||
err := os.RemoveAll(dirTest.testPath)
|
||||
if err != nil {
|
||||
fmt.Println("Could not cleanup feature dir before testing")
|
||||
fmt.Printf("Could not cleanup feature dir before testing [%s]\n", err.Error())
|
||||
}
|
||||
|
||||
err = os.Mkdir(dirTest.testPath, 0777)
|
||||
if err != nil {
|
||||
t.Error("Failed to create test directory")
|
||||
t.Errorf("Failed to create test directory [%s]\n", err.Error())
|
||||
}
|
||||
rand.Read(dirTest.minBuff)
|
||||
rand.Read(dirTest.medBuff)
|
||||
|
|
|
@ -37,10 +37,10 @@
|
|||
package e2e_tests
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -89,7 +89,6 @@ func initFileFlags() {
|
|||
}
|
||||
|
||||
func getFileTestDirName(n int) string {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
b := make([]byte, n)
|
||||
rand.Read(b)
|
||||
return fmt.Sprintf("%x", b)[:n]
|
||||
|
@ -651,12 +650,12 @@ func TestFileTestSuite(t *testing.T) {
|
|||
// Sanity check in the off chance the same random name was generated twice and was still around somehow
|
||||
err := os.RemoveAll(fileTest.testPath)
|
||||
if err != nil {
|
||||
fmt.Println("Could not cleanup feature dir before testing")
|
||||
fmt.Printf("Could not cleanup feature dir before testing [%s]\n", err.Error())
|
||||
}
|
||||
|
||||
err = os.Mkdir(fileTest.testPath, 0777)
|
||||
if err != nil {
|
||||
t.Error("Failed to create test directory")
|
||||
t.Errorf("Failed to create test directory [%s]\n", err.Error())
|
||||
}
|
||||
rand.Read(fileTest.minBuff)
|
||||
rand.Read(fileTest.medBuff)
|
||||
|
|
|
@ -37,9 +37,9 @@
|
|||
package stress_test
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"flag"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
|
Загрузка…
Ссылка в новой задаче