* Adding listing caching option
This commit is contained in:
Vikas Bhansali 2024-11-12 15:08:14 +05:30 коммит произвёл GitHub
Родитель b26436c81c
Коммит e5f6bd7fc2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
24 изменённых файлов: 505 добавлений и 94 удалений

Просмотреть файл

@ -1,4 +1,7 @@
## 2.4.0 (Unreleased)
**Features**
- Entry cache to hold directory listing results in cache for a given timeout. This will reduce REST calls going to storage while listing the blobs in parallel.
**Bug Fixes**
- [#1426](https://github.com/Azure/azure-storage-fuse/issues/1426) Read panic in block-cache due to boundary conditions.
- Do not allow mount path and temp-cache path to be same when using block-cache.

Просмотреть файл

@ -99,7 +99,7 @@ Note: Blobfuse2 accepts all CLI parameters that Blobfuse does, but may ignore pa
| --log-level=LOG_WARNING | --log-level=LOG_WARNING | logging.level | |
| --use-attr-cache=true | --use-attr-cache=true | attr_cache | Add attr_cache to the components list |
| --use-adls=false | --use-adls=false | azstorage.type | Specify either 'block' or 'adls' |
| --no-symlinks=false | --no-symlinks=false | attr_cache.no-symlinks | |
| --no-symlinks=false | --no-symlinks=true | attr_cache.no-symlinks | |
| --cache-on-list=true | --cache-on-list=true | attr_cache.no-cache-on-list | This parameter has the opposite boolean semantics |
| --upload-modified-only=true | --upload-modified-only=true | | Always on in blobfuse2 |
| --max-concurrency=12 | --max-concurrency=12 | azstorage.max-concurrency | |

Просмотреть файл

@ -1697,6 +1697,13 @@ stages:
echo "##vso[task.setvariable variable=is_preview]$is_preview"
fi
is_preview="false"
echo "##vso[task.setvariable variable=is_preview]$is_preview"
if [[ $marinerFuse3AmdRpm == *"preview"* ]]; then
is_preview="true"
echo "##vso[task.setvariable variable=is_preview]$is_preview"
fi
while IFS=, read -r distro fuseArchType repoName releaseName; do
# If the package is preview, publish to mariner preview package

Просмотреть файл

@ -38,6 +38,7 @@ import (
_ "github.com/Azure/azure-storage-fuse/v2/component/azstorage"
_ "github.com/Azure/azure-storage-fuse/v2/component/block_cache"
_ "github.com/Azure/azure-storage-fuse/v2/component/custom"
_ "github.com/Azure/azure-storage-fuse/v2/component/entry_cache"
_ "github.com/Azure/azure-storage-fuse/v2/component/file_cache"
_ "github.com/Azure/azure-storage-fuse/v2/component/libfuse"
_ "github.com/Azure/azure-storage-fuse/v2/component/loopback"

Просмотреть файл

@ -90,10 +90,11 @@ type mountOptions struct {
LazyWrite bool `config:"lazy-write"`
// v1 support
Streaming bool `config:"streaming"`
AttrCache bool `config:"use-attr-cache"`
LibfuseOptions []string `config:"libfuse-options"`
BlockCache bool `config:"block-cache"`
Streaming bool `config:"streaming"`
AttrCache bool `config:"use-attr-cache"`
LibfuseOptions []string `config:"libfuse-options"`
BlockCache bool `config:"block-cache"`
EntryCacheTimeout int `config:"list-cache-timeout"`
}
var options mountOptions
@ -313,6 +314,10 @@ var mountCmd = &cobra.Command{
options.Components = pipeline
}
if config.IsSet("entry_cache.timeout-sec") || options.EntryCacheTimeout > 0 {
options.Components = append(options.Components[:1], append([]string{"entry_cache"}, options.Components[1:]...)...)
}
if config.IsSet("libfuse-options") {
for _, v := range options.LibfuseOptions {
parameter := strings.Split(v, "=")

Просмотреть файл

@ -55,7 +55,6 @@ const defaultAttrCacheTimeout uint32 = (120)
type AttrCache struct {
internal.BaseComponent
cacheTimeout uint32
cacheOnList bool
noSymlinks bool
maxFiles int
cacheMap map[string]*attrCacheItem
@ -150,22 +149,18 @@ func (ac *AttrCache) Configure(_ bool) error {
ac.cacheTimeout = defaultAttrCacheTimeout
}
if config.IsSet(compName + ".cache-on-list") {
ac.cacheOnList = conf.CacheOnList
} else {
ac.cacheOnList = !conf.NoCacheOnList
}
if config.IsSet(compName + ".max-files") {
ac.maxFiles = conf.MaxFiles
} else {
ac.maxFiles = defaultMaxFiles
}
ac.noSymlinks = conf.NoSymlinks
if config.IsSet(compName + ".no-symlinks") {
ac.noSymlinks = conf.NoSymlinks
}
log.Crit("AttrCache::Configure : cache-timeout %d, symlink %t, cache-on-list %t, max-files %d",
ac.cacheTimeout, ac.noSymlinks, ac.cacheOnList, ac.maxFiles)
log.Crit("AttrCache::Configure : cache-timeout %d, symlink %t, max-files %d",
ac.cacheTimeout, ac.noSymlinks, ac.maxFiles)
return nil
}
@ -294,7 +289,7 @@ func (ac *AttrCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O
// cacheAttributes : On dir listing cache the attributes for all files
func (ac *AttrCache) cacheAttributes(pathList []*internal.ObjAttr) {
// Check whether or not we are supposed to cache on list
if ac.cacheOnList && len(pathList) > 0 {
if len(pathList) > 0 {
// Putting this inside loop is heavy as for each item we will do a kernel call to get current time
// If there are millions of blobs then cost of this is very high.
currTime := time.Now()
@ -488,14 +483,8 @@ func (ac *AttrCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr
// no entry if path does not exist
return &internal.ObjAttr{}, syscall.ENOENT
} else {
// IsMetadataRetrieved is false in the case of ADLS List since the API does not support metadata.
// Once migration of ADLS list to blob endpoint is done (in future service versions), we can remove this.
// options.RetrieveMetadata is set by CopyFromFile and WriteFile which need metadata to ensure it is preserved.
if value.getAttr().IsMetadataRetrieved() || (ac.noSymlinks && !options.RetrieveMetadata) {
// path exists and we have all the metadata required or we do not care about metadata
log.Debug("AttrCache::GetAttr : %s served from cache", options.Name)
return value.getAttr(), nil
}
log.Debug("AttrCache::GetAttr : %s served from cache", options.Name)
return value.getAttr(), nil
}
}

Просмотреть файл

@ -80,9 +80,6 @@ func newTestAttrCache(next internal.Component, configuration string) *AttrCache
func getPathAttr(path string, size int64, mode os.FileMode, metadata bool) *internal.ObjAttr {
flags := internal.NewFileBitMap()
if metadata {
flags.Set(internal.PropFlagMetadataRetrieved)
}
return &internal.ObjAttr{
Path: path,
Name: filepath.Base(path),
@ -210,8 +207,7 @@ func (suite *attrCacheTestSuite) TestDefault() {
defer suite.cleanupTest()
suite.assert.Equal(suite.attrCache.Name(), "attr_cache")
suite.assert.EqualValues(suite.attrCache.cacheTimeout, 120)
suite.assert.Equal(suite.attrCache.cacheOnList, true)
suite.assert.Equal(suite.attrCache.noSymlinks, false)
// suite.assert.Equal(suite.attrCache.noSymlinks, false)
}
// Tests configuration
@ -223,7 +219,6 @@ func (suite *attrCacheTestSuite) TestConfig() {
suite.assert.Equal(suite.attrCache.Name(), "attr_cache")
suite.assert.EqualValues(suite.attrCache.cacheTimeout, 60)
suite.assert.Equal(suite.attrCache.cacheOnList, false)
suite.assert.Equal(suite.attrCache.noSymlinks, true)
}
@ -246,7 +241,6 @@ func (suite *attrCacheTestSuite) TestConfigZero() {
suite.assert.Equal(suite.attrCache.Name(), "attr_cache")
suite.assert.EqualValues(suite.attrCache.cacheTimeout, 0)
suite.assert.Equal(suite.attrCache.cacheOnList, false)
suite.assert.Equal(suite.attrCache.noSymlinks, true)
}
@ -426,29 +420,6 @@ func (suite *attrCacheTestSuite) TestReadDirExists() {
}
}
func (suite *attrCacheTestSuite) TestReadDirNoCacheOnList() {
defer suite.cleanupTest()
suite.cleanupTest() // clean up the default attr cache generated
cacheOnList := false
config := fmt.Sprintf("attr_cache:\n no-cache-on-list: %t", !cacheOnList)
suite.setupTestHelper(config) // setup a new attr cache with a custom config (clean up will occur after the test as usual)
suite.assert.EqualValues(suite.attrCache.cacheOnList, cacheOnList)
path := "a"
size := int64(1024)
mode := os.FileMode(0)
aAttr := generateNestedPathAttr(path, size, mode)
options := internal.ReadDirOptions{Name: path}
suite.mock.EXPECT().ReadDir(options).Return(aAttr, nil)
suite.assert.Empty(suite.attrCache.cacheMap) // cacheMap should be empty before call
returnedAttr, err := suite.attrCache.ReadDir(options)
suite.assert.Nil(err)
suite.assert.Equal(aAttr, returnedAttr)
suite.assert.Empty(suite.attrCache.cacheMap) // cacheMap should be empty after call
}
func (suite *attrCacheTestSuite) TestReadDirError() {
defer suite.cleanupTest()
var paths = []string{"a", "a/", "ab", "ab/"}
@ -912,7 +883,6 @@ func (suite *attrCacheTestSuite) TestGetAttrExistsWithoutMetadataNoSymlinks() {
// This is a little janky but required since testify suite does not support running setup or clean up for subtests.
suite.cleanupTest()
suite.setupTestHelper(config) // setup a new attr cache with a custom config (clean up will occur after the test as usual)
suite.assert.EqualValues(suite.attrCache.cacheOnList, noSymlinks)
suite.Run(path, func() {
truncatedPath := internal.TruncateDirName(path)
addDirectoryToCache(suite.assert, suite.attrCache, "a", true) // add the paths to the cache with IsMetadataRetrived=true
@ -941,7 +911,7 @@ func (suite *attrCacheTestSuite) TestGetAttrExistsWithoutMetadata() {
options := internal.GetAttrOptions{Name: path}
// attributes should not be accessible so call the mock
suite.mock.EXPECT().GetAttr(options).Return(getPathAttr(path, defaultSize, fs.FileMode(defaultMode), false), nil)
//suite.mock.EXPECT().GetAttr(options).Return(getPathAttr(path, defaultSize, fs.FileMode(defaultMode), false), nil)
_, err := suite.attrCache.GetAttr(options)
suite.assert.Nil(err)

Просмотреть файл

@ -508,7 +508,7 @@ func (az *AzStorage) CreateLink(options internal.CreateLinkOptions) error {
func (az *AzStorage) ReadLink(options internal.ReadLinkOptions) (string, error) {
log.Trace("AzStorage::ReadLink : Read symlink %s", options.Name)
data, err := az.storage.ReadBuffer(options.Name, 0, 0)
data, err := az.storage.ReadBuffer(options.Name, 0, options.Size)
if err != nil {
azStatsCollector.PushEvents(readLink, options.Name, nil)

Просмотреть файл

@ -457,7 +457,6 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err
parseMetadata(attr, prop.Metadata)
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
attr.Flags.Set(internal.PropFlagModeDefault)
return attr, nil
@ -602,7 +601,6 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
MD5: blobInfo.Properties.ContentMD5,
}
parseMetadata(attr, blobInfo.Metadata)
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
attr.Flags.Set(internal.PropFlagModeDefault)
}
blobList = append(blobList, attr)
@ -641,7 +639,6 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
attr.Atime = attr.Mtime
attr.Crtime = attr.Mtime
attr.Ctime = attr.Mtime
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
attr.Flags.Set(internal.PropFlagModeDefault)
blobList = append(blobList, attr)
}

Просмотреть файл

@ -644,7 +644,6 @@ func (s *blockBlobTestSuite) TestReadDirNoVirtualDirectory() {
s.assert.EqualValues(name, entries[0].Path)
s.assert.EqualValues(name, entries[0].Name)
s.assert.True(entries[0].IsDir())
s.assert.True(entries[0].IsMetadataRetrieved())
s.assert.True(entries[0].IsModeDefault())
})
}
@ -664,13 +663,11 @@ func (s *blockBlobTestSuite) TestReadDirHierarchy() {
s.assert.EqualValues(base+"/c1", entries[0].Path)
s.assert.EqualValues("c1", entries[0].Name)
s.assert.True(entries[0].IsDir())
s.assert.True(entries[0].IsMetadataRetrieved())
s.assert.True(entries[0].IsModeDefault())
// Check the file
s.assert.EqualValues(base+"/c2", entries[1].Path)
s.assert.EqualValues("c2", entries[1].Name)
s.assert.False(entries[1].IsDir())
s.assert.True(entries[1].IsMetadataRetrieved())
s.assert.True(entries[1].IsModeDefault())
}
@ -693,19 +690,16 @@ func (s *blockBlobTestSuite) TestReadDirRoot() {
s.assert.EqualValues(base, entries[0].Path)
s.assert.EqualValues(base, entries[0].Name)
s.assert.True(entries[0].IsDir())
s.assert.True(entries[0].IsMetadataRetrieved())
s.assert.True(entries[0].IsModeDefault())
// Check the baseb dir
s.assert.EqualValues(base+"b", entries[1].Path)
s.assert.EqualValues(base+"b", entries[1].Name)
s.assert.True(entries[1].IsDir())
s.assert.True(entries[1].IsMetadataRetrieved())
s.assert.True(entries[1].IsModeDefault())
// Check the basec file
s.assert.EqualValues(base+"c", entries[2].Path)
s.assert.EqualValues(base+"c", entries[2].Name)
s.assert.False(entries[2].IsDir())
s.assert.True(entries[2].IsMetadataRetrieved())
s.assert.True(entries[2].IsModeDefault())
})
}
@ -725,7 +719,6 @@ func (s *blockBlobTestSuite) TestReadDirSubDir() {
s.assert.EqualValues(base+"/c1"+"/gc1", entries[0].Path)
s.assert.EqualValues("gc1", entries[0].Name)
s.assert.False(entries[0].IsDir())
s.assert.True(entries[0].IsMetadataRetrieved())
s.assert.True(entries[0].IsModeDefault())
}
@ -745,7 +738,6 @@ func (s *blockBlobTestSuite) TestReadDirSubDirPrefixPath() {
s.assert.EqualValues("c1"+"/gc1", entries[0].Path)
s.assert.EqualValues("gc1", entries[0].Name)
s.assert.False(entries[0].IsDir())
s.assert.True(entries[0].IsMetadataRetrieved())
s.assert.True(entries[0].IsModeDefault())
}

Просмотреть файл

@ -557,15 +557,14 @@ func ParseAndReadDynamicConfig(az *AzStorage, opt AzStorageOptions, reload bool)
az.stConfig.honourACL = false
}
// by default symlink will be disabled
az.stConfig.disableSymlink = true
if config.IsSet("attr_cache.no-symlinks") {
err := config.UnmarshalKey("attr_cache.no-symlinks", &az.stConfig.disableSymlink)
if err != nil {
az.stConfig.disableSymlink = true
log.Err("ParseAndReadDynamicConfig : Failed to unmarshal attr_cache.no-symlinks")
}
} else {
// by default symlink will be disabled
az.stConfig.disableSymlink = true
}
// Auth related reconfig

Просмотреть файл

@ -402,8 +402,6 @@ func (dl *Datalake) GetAttr(name string) (attr *internal.ObjAttr, err error) {
attr.Mode = attr.Mode | os.ModeDir
}
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
if dl.Config.honourACL && dl.Config.authConfig.ObjectID != "" {
acl, err := fileClient.GetAccessControl(context.Background(), nil)
if err != nil {

Просмотреть файл

@ -498,13 +498,11 @@ func (s *datalakeTestSuite) TestReadDirHierarchy() {
s.assert.EqualValues(base+"/c1", entries[0].Path)
s.assert.EqualValues("c1", entries[0].Name)
s.assert.True(entries[0].IsDir())
s.assert.False(entries[0].IsMetadataRetrieved())
s.assert.False(entries[0].IsModeDefault())
// Check the file
s.assert.EqualValues(base+"/c2", entries[1].Path)
s.assert.EqualValues("c2", entries[1].Name)
s.assert.False(entries[1].IsDir())
s.assert.False(entries[1].IsMetadataRetrieved())
s.assert.False(entries[1].IsModeDefault())
}
@ -527,19 +525,16 @@ func (s *datalakeTestSuite) TestReadDirRoot() {
s.assert.EqualValues(base, entries[0].Path)
s.assert.EqualValues(base, entries[0].Name)
s.assert.True(entries[0].IsDir())
s.assert.False(entries[0].IsMetadataRetrieved())
s.assert.False(entries[0].IsModeDefault())
// Check the baseb dir
s.assert.EqualValues(base+"b", entries[1].Path)
s.assert.EqualValues(base+"b", entries[1].Name)
s.assert.True(entries[1].IsDir())
s.assert.False(entries[1].IsMetadataRetrieved())
s.assert.False(entries[1].IsModeDefault())
// Check the basec file
s.assert.EqualValues(base+"c", entries[2].Path)
s.assert.EqualValues(base+"c", entries[2].Name)
s.assert.False(entries[2].IsDir())
s.assert.False(entries[2].IsMetadataRetrieved())
s.assert.False(entries[2].IsModeDefault())
})
}
@ -559,7 +554,6 @@ func (s *datalakeTestSuite) TestReadDirSubDir() {
s.assert.EqualValues(base+"/c1"+"/gc1", entries[0].Path)
s.assert.EqualValues("gc1", entries[0].Name)
s.assert.False(entries[0].IsDir())
s.assert.False(entries[0].IsMetadataRetrieved())
s.assert.False(entries[0].IsModeDefault())
}
@ -579,7 +573,6 @@ func (s *datalakeTestSuite) TestReadDirSubDirPrefixPath() {
s.assert.EqualValues(base+"/c1"+"/gc1", entries[0].Path)
s.assert.EqualValues("gc1", entries[0].Name)
s.assert.False(entries[0].IsDir())
s.assert.False(entries[0].IsMetadataRetrieved())
s.assert.False(entries[0].IsModeDefault())
}

Просмотреть файл

@ -0,0 +1,214 @@
/*
_____ _____ _____ ____ ______ _____ ------
| | | | | | | | | | | | |
| | | | | | | | | | | | |
| --- | | | | |-----| |---- | | |-----| |----- ------
| | | | | | | | | | | | |
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
Author : <blobfusedev@microsoft.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
*/
package entry_cache
import (
"container/list"
"context"
"fmt"
"sync"
"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/config"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/vibhansa-msft/tlru"
)
// Common structure for Component
type EntryCache struct {
internal.BaseComponent
cacheTimeout uint32
pathLocks *common.LockMap
pathLRU *tlru.TLRU
pathMap sync.Map
}
type pathCacheItem struct {
children []*internal.ObjAttr
nextToken string
}
// By default entry cache is valid for 30 seconds
const defaultEntryCacheTimeout uint32 = (30)
// Structure defining your config parameters
type EntryCacheOptions struct {
Timeout uint32 `config:"timeout-sec" yaml:"timeout-sec,omitempty"`
}
const compName = "entry_cache"
// Verification to check satisfaction criteria with Component Interface
var _ internal.Component = &EntryCache{}
func (c *EntryCache) Name() string {
return compName
}
func (c *EntryCache) SetName(name string) {
c.BaseComponent.SetName(name)
}
func (c *EntryCache) SetNextComponent(nc internal.Component) {
c.BaseComponent.SetNextComponent(nc)
}
// Start : Pipeline calls this method to start the component functionality
//
// this shall not block the call otherwise pipeline will not start
func (c *EntryCache) Start(ctx context.Context) error {
log.Trace("EntryCache::Start : Starting component %s", c.Name())
err := c.pathLRU.Start()
if err != nil {
log.Err("EntryCache::Start : fail to start LRU for path caching [%s]", err.Error())
return fmt.Errorf("failed to start LRU for path caching [%s]", err.Error())
}
return nil
}
// Stop : Stop the component functionality and kill all threads started
func (c *EntryCache) Stop() error {
log.Trace("EntryCache::Stop : Stopping component %s", c.Name())
err := c.pathLRU.Stop()
if err != nil {
log.Err("EntryCache::Stop : fail to stop LRU for path caching [%s]", err.Error())
}
return nil
}
// Configure : Pipeline will call this method after constructor so that you can read config and initialize yourself
//
// Return failure if any config is not valid to exit the process
func (c *EntryCache) Configure(_ bool) error {
log.Trace("EntryCache::Configure : %s", c.Name())
var readonly bool
err := config.UnmarshalKey("read-only", &readonly)
if err != nil {
log.Err("EntryCache::Configure : config error [unable to obtain read-only]")
return fmt.Errorf("config error in %s [%s]", c.Name(), err.Error())
}
if !readonly {
log.Err("EntryCache::Configure : EntryCache component should be used only in read-only mode")
return fmt.Errorf("EntryCache component should be used in only in read-only mode")
}
// >> If you do not need any config parameters remove below code and return nil
conf := EntryCacheOptions{}
err = config.UnmarshalKey(c.Name(), &conf)
if err != nil {
log.Err("EntryCache::Configure : config error [invalid config attributes]")
return fmt.Errorf("EntryCache: config error [invalid config attributes]")
}
c.cacheTimeout = defaultEntryCacheTimeout
if config.IsSet(compName + ".timeout-sec") {
c.cacheTimeout = conf.Timeout
}
c.pathLRU, err = tlru.New(1000, c.cacheTimeout, c.pathEvict, 0, nil)
if err != nil {
log.Err("EntryCache::Start : fail to create LRU for path caching [%s]", err.Error())
return fmt.Errorf("config error in %s [%s]", c.Name(), err.Error())
}
c.pathLocks = common.NewLockMap()
return nil
}
// StreamDir : Optionally cache entries of the list
func (c *EntryCache) StreamDir(options internal.StreamDirOptions) ([]*internal.ObjAttr, string, error) {
log.Trace("AttrCache::StreamDir : %s", options.Name)
pathKey := fmt.Sprintf("%s##%s", options.Name, options.Token)
flock := c.pathLocks.Get(pathKey)
flock.Lock()
defer flock.Unlock()
pathEntry, found := c.pathMap.Load(pathKey)
if !found {
log.Debug("EntryCache::StreamDir : Cache not valid, fetch new list for path: %s, token %s", options.Name, options.Token)
pathList, token, err := c.NextComponent().StreamDir(options)
if err == nil && len(pathList) > 0 {
item := pathCacheItem{
children: pathList,
nextToken: token,
}
c.pathMap.Store(pathKey, item)
c.pathLRU.Add(pathKey)
}
return pathList, token, err
} else {
log.Debug("EntryCache::StreamDir : Serving list from cache for path: %s, token %s", options.Name, options.Token)
item := pathEntry.(pathCacheItem)
return item.children, item.nextToken, nil
}
}
// pathEvict : Callback when a node from cache expires
func (c *EntryCache) pathEvict(node *list.Element) {
pathKey := node.Value.(string)
flock := c.pathLocks.Get(pathKey)
flock.Lock()
defer flock.Unlock()
log.Debug("EntryCache::pathEvict : Expiry for path %s", pathKey)
c.pathMap.Delete(pathKey)
}
// ------------------------- Factory -------------------------------------------
// Pipeline will call this method to create your object, initialize your variables here
// << DO NOT DELETE ANY AUTO GENERATED CODE HERE >>
func NewEntryCacheComponent() internal.Component {
comp := &EntryCache{}
comp.SetName(compName)
return comp
}
// On init register this component to pipeline and supply your constructor
func init() {
internal.AddComponent(compName, NewEntryCacheComponent)
entryTimeout := config.AddUint32Flag("list-cache-timeout", defaultEntryCacheTimeout, "list entry timeout")
config.BindPFlag(compName+".timeout-sec", entryTimeout)
}

Просмотреть файл

@ -0,0 +1,219 @@
/*
_____ _____ _____ ____ ______ _____ ------
| | | | | | | | | | | | |
| | | | | | | | | | | | |
| --- | | | | |-----| |---- | | |-----| |----- ------
| | | | | | | | | | | | |
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
Author : <blobfusedev@microsoft.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
*/
package entry_cache
import (
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/config"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/component/loopback"
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
var home_dir, _ = os.UserHomeDir()
type entryCacheTestSuite struct {
suite.Suite
assert *assert.Assertions
entryCache *EntryCache
loopback internal.Component
fake_storage_path string
}
func newLoopbackFS() internal.Component {
loopback := loopback.NewLoopbackFSComponent()
loopback.Configure(true)
return loopback
}
func newEntryCache(next internal.Component) *EntryCache {
entryCache := NewEntryCacheComponent()
entryCache.SetNextComponent(next)
err := entryCache.Configure(true)
if err != nil {
panic("Unable to configure entry cache.")
}
return entryCache.(*EntryCache)
}
func randomString(length int) string {
rand.Seed(time.Now().UnixNano())
b := make([]byte, length)
rand.Read(b)
return fmt.Sprintf("%x", b)[:length]
}
func (suite *entryCacheTestSuite) SetupTest() {
err := log.SetDefaultLogger("silent", common.LogConfig{Level: common.ELogLevel.LOG_DEBUG()})
if err != nil {
panic("Unable to set silent logger as default.")
}
rand := randomString(8)
suite.fake_storage_path = filepath.Join(home_dir, "fake_storage"+rand)
defaultConfig := fmt.Sprintf("read-only: true\n\nentry_cache:\n timeout-sec: 7\n\nloopbackfs:\n path: %s", suite.fake_storage_path)
log.Debug(defaultConfig)
// Delete the temp directories created
os.RemoveAll(suite.fake_storage_path)
suite.setupTestHelper(defaultConfig)
}
func (suite *entryCacheTestSuite) setupTestHelper(configuration string) {
suite.assert = assert.New(suite.T())
config.ReadConfigFromReader(strings.NewReader(configuration))
suite.loopback = newLoopbackFS()
suite.entryCache = newEntryCache(suite.loopback)
suite.loopback.Start(context.Background())
err := suite.entryCache.Start(context.Background())
if err != nil {
panic(fmt.Sprintf("Unable to start file cache [%s]", err.Error()))
}
}
func (suite *entryCacheTestSuite) cleanupTest() {
suite.loopback.Stop()
err := suite.entryCache.Stop()
if err != nil {
panic(fmt.Sprintf("Unable to stop file cache [%s]", err.Error()))
}
// Delete the temp directories created
os.RemoveAll(suite.fake_storage_path)
}
func (suite *entryCacheTestSuite) TestEmpty() {
defer suite.cleanupTest()
objs, token, err := suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "", Token: ""})
suite.assert.Nil(err)
suite.assert.NotNil(objs)
suite.assert.Equal(token, "")
_, found := suite.entryCache.pathMap.Load("##")
suite.assert.False(found)
objs, token, err = suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "ABCD", Token: ""})
suite.assert.NotNil(err)
suite.assert.Nil(objs)
suite.assert.Equal(token, "")
}
func (suite *entryCacheTestSuite) TestWithEntry() {
defer suite.cleanupTest()
// Create a file
filePath := filepath.Join(suite.fake_storage_path, "testfile1")
h, err := os.Create(filePath)
suite.assert.Nil(err)
suite.assert.NotNil(h)
h.Close()
objs, token, err := suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "", Token: ""})
suite.assert.Nil(err)
suite.assert.NotNil(objs)
suite.assert.Equal(token, "")
cachedObjs, found := suite.entryCache.pathMap.Load("##")
suite.assert.True(found)
suite.assert.Equal(len(objs), 1)
suite.assert.Equal(objs, cachedObjs.(pathCacheItem).children)
}
func (suite *entryCacheTestSuite) TestCachedEntry() {
defer suite.cleanupTest()
// Create a file
filePath := filepath.Join(suite.fake_storage_path, "testfile1")
h, err := os.Create(filePath)
suite.assert.Nil(err)
suite.assert.NotNil(h)
h.Close()
objs, token, err := suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "", Token: ""})
suite.assert.Nil(err)
suite.assert.NotNil(objs)
suite.assert.Equal(token, "")
cachedObjs, found := suite.entryCache.pathMap.Load("##")
suite.assert.True(found)
suite.assert.Equal(len(objs), 1)
suite.assert.Equal(objs, cachedObjs.(pathCacheItem).children)
filePath = filepath.Join(suite.fake_storage_path, "testfile2")
h, err = os.Create(filePath)
suite.assert.Nil(err)
suite.assert.NotNil(h)
h.Close()
objs, token, err = suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "", Token: ""})
suite.assert.Nil(err)
suite.assert.NotNil(objs)
suite.assert.Equal(token, "")
suite.assert.Equal(len(objs), 1)
time.Sleep(40 * time.Second)
_, found = suite.entryCache.pathMap.Load("##")
suite.assert.False(found)
objs, token, err = suite.entryCache.StreamDir(internal.StreamDirOptions{Name: "", Token: ""})
suite.assert.Nil(err)
suite.assert.NotNil(objs)
suite.assert.Equal(token, "")
suite.assert.Equal(len(objs), 2)
}
// In order for 'go test' to run this suite, we need to create
// a normal test function and pass our suite to suite.Run
func TestEntryCacheTestSuite(t *testing.T) {
suite.Run(t, new(entryCacheTestSuite))
}

Просмотреть файл

@ -962,7 +962,13 @@ func libfuse_readlink(path *C.char, buf *C.char, size C.size_t) C.int {
name = common.NormalizeObjectName(name)
//log.Trace("Libfuse::libfuse_readlink : Received for %s", name)
targetPath, err := fuseFS.NextComponent().ReadLink(internal.ReadLinkOptions{Name: name})
linkSize := int64(0)
attr, err := fuseFS.NextComponent().GetAttr(internal.GetAttrOptions{Name: name})
if err == nil && attr != nil {
linkSize = attr.Size
}
targetPath, err := fuseFS.NextComponent().ReadLink(internal.ReadLinkOptions{Name: name, Size: linkSize})
if err != nil {
log.Err("Libfuse::libfuse2_readlink : error reading link file %s [%s]", name, err.Error())
if os.IsNotExist(err) {

Просмотреть файл

@ -491,6 +491,9 @@ func testReadLink(suite *libfuseTestSuite) {
defer C.free(unsafe.Pointer(path))
options := internal.ReadLinkOptions{Name: name}
suite.mock.EXPECT().ReadLink(options).Return("target", nil)
attr := &internal.ObjAttr{}
getAttrOpt := internal.GetAttrOptions{Name: name}
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(attr, nil)
// https://stackoverflow.com/questions/41953619/how-to-initialise-empty-c-cstring-in-cgo
buf := C.CString("")
@ -506,6 +509,9 @@ func testReadLinkNotExists(suite *libfuseTestSuite) {
defer C.free(unsafe.Pointer(path))
options := internal.ReadLinkOptions{Name: name}
suite.mock.EXPECT().ReadLink(options).Return("", syscall.ENOENT)
attr := &internal.ObjAttr{}
getAttrOpt := internal.GetAttrOptions{Name: name}
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(attr, nil)
buf := C.CString("")
err := libfuse_readlink(path, buf, 7)
@ -520,6 +526,8 @@ func testReadLinkError(suite *libfuseTestSuite) {
defer C.free(unsafe.Pointer(path))
options := internal.ReadLinkOptions{Name: name}
suite.mock.EXPECT().ReadLink(options).Return("", errors.New("failed to read link"))
getAttrOpt := internal.GetAttrOptions{Name: name}
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(nil, nil)
buf := C.CString("")
err := libfuse_readlink(path, buf, 7)

Просмотреть файл

@ -1031,7 +1031,13 @@ func libfuse_readlink(path *C.char, buf *C.char, size C.size_t) C.int {
name = common.NormalizeObjectName(name)
//log.Trace("Libfuse::libfuse_readlink : Received for %s", name)
targetPath, err := fuseFS.NextComponent().ReadLink(internal.ReadLinkOptions{Name: name})
linkSize := int64(0)
attr, err := fuseFS.NextComponent().GetAttr(internal.GetAttrOptions{Name: name})
if err == nil && attr != nil {
linkSize = attr.Size
}
targetPath, err := fuseFS.NextComponent().ReadLink(internal.ReadLinkOptions{Name: name, Size: linkSize})
if err != nil {
log.Err("Libfuse::libfuse_readlink : error reading link file %s [%s]", name, err.Error())
if os.IsNotExist(err) {

Просмотреть файл

@ -469,6 +469,9 @@ func testReadLink(suite *libfuseTestSuite) {
defer C.free(unsafe.Pointer(path))
options := internal.ReadLinkOptions{Name: name}
suite.mock.EXPECT().ReadLink(options).Return("target", nil)
attr := &internal.ObjAttr{}
getAttrOpt := internal.GetAttrOptions{Name: name}
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(attr, nil)
// https://stackoverflow.com/questions/41953619/how-to-initialise-empty-c-cstring-in-cgo
buf := C.CString("")
@ -484,6 +487,9 @@ func testReadLinkNotExists(suite *libfuseTestSuite) {
defer C.free(unsafe.Pointer(path))
options := internal.ReadLinkOptions{Name: name}
suite.mock.EXPECT().ReadLink(options).Return("", syscall.ENOENT)
attr := &internal.ObjAttr{}
getAttrOpt := internal.GetAttrOptions{Name: name}
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(attr, nil)
buf := C.CString("")
err := libfuse_readlink(path, buf, 7)
@ -498,6 +504,8 @@ func testReadLinkError(suite *libfuseTestSuite) {
defer C.free(unsafe.Pointer(path))
options := internal.ReadLinkOptions{Name: name}
suite.mock.EXPECT().ReadLink(options).Return("", errors.New("failed to read link"))
getAttrOpt := internal.GetAttrOptions{Name: name}
suite.mock.EXPECT().GetAttr(getAttrOpt).Return(nil, nil)
buf := C.CString("")
err := libfuse_readlink(path, buf, 7)

Просмотреть файл

@ -148,7 +148,6 @@ func (lfs *LoopbackFS) ReadDir(options internal.ReadDirOptions) ([]*internal.Obj
Mode: info.Mode(),
Mtime: info.ModTime(),
}
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
attr.Flags.Set(internal.PropFlagModeDefault)
if file.IsDir() {
@ -186,7 +185,6 @@ func (lfs *LoopbackFS) StreamDir(options internal.StreamDirOptions) ([]*internal
Mode: info.Mode(),
Mtime: info.ModTime(),
}
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
attr.Flags.Set(internal.PropFlagModeDefault)
if file.IsDir() {
@ -436,7 +434,6 @@ func (lfs *LoopbackFS) GetAttr(options internal.GetAttrOptions) (*internal.ObjAt
Mode: info.Mode(),
Mtime: info.ModTime(),
}
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
attr.Flags.Set(internal.PropFlagModeDefault)
if info.Mode()&os.ModeSymlink != 0 {

Просмотреть файл

@ -64,7 +64,6 @@ const (
PropFlagIsDir
PropFlagEmptyDir
PropFlagSymlink
PropFlagMetadataRetrieved
PropFlagModeDefault // TODO: Does this sound better as ModeDefault or DefaultMode? The getter would be IsModeDefault or IsDefaultMode
)
@ -93,12 +92,6 @@ func (attr *ObjAttr) IsSymlink() bool {
return attr.Flags.IsSet(PropFlagSymlink)
}
// IsMetadataRetrieved : Whether or not metadata has been retrieved for this path.
// Datalake list paths does not support returning x-ms-properties (metadata), so we cannot be sure if the path is a symlink or not.
func (attr *ObjAttr) IsMetadataRetrieved() bool {
return attr.Flags.IsSet(PropFlagMetadataRetrieved)
}
// IsModeDefault : Whether or not to use the default mode.
// This is set in any storage service that does not support chmod/chown.
func (attr *ObjAttr) IsModeDefault() bool {

Просмотреть файл

@ -9,7 +9,7 @@
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
Copyright © 2020-2023 Microsoft Corporation. All rights reserved.
Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
Author : <blobfusedev@microsoft.com>
Permission is hereby granted, free of charge, to any person obtaining a copy

Просмотреть файл

@ -167,6 +167,7 @@ type CreateLinkOptions struct {
type ReadLinkOptions struct {
Name string
Size int64
}
type GetAttrOptions struct {

Просмотреть файл

@ -51,6 +51,7 @@ logging:
# Pipeline configuration. Choose components to be engaged. The order below is the priority order that needs to be followed.
components:
- libfuse
- entry_cache
- block_cache
- file_cache
- attr_cache
@ -67,6 +68,10 @@ libfuse:
extension: <physical path to extension library>
direct-io: true|false <enable to bypass the kernel cache>
# Entry Cache configuration
entry_cache:
timeout-sec: <cache eviction timeout (in sec). Default - 30 sec>
# Block cache related configuration
block_cache:
block-size-mb: <size of each block to be cached in memory (in MB). Default - 16 MB>