Preserve metadata when reuploading files (#739)
* Added metadata to handle to attempt to preserve metadata. WIP
* Added metadata preservation feature to attribute cache
* Removed test from file test
* Fixed unit tests
* Added code to ensure metadata is retrieved
* Metadata preservation

Co-authored-by: Gauri Prasad <gapra@microsoft.com>
This commit is contained in:
Parent: 15b68d598a
Commit: ece81e7c8d
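At a glance, the change threads blob metadata through the write path: before the attribute cache forwards a write or re-upload, it looks up the existing attributes with metadata and copies them into the outgoing options, so the storage layer can re-apply them when the blob is rewritten. The sketch below is a hypothetical helper illustrating that pattern; the commit itself inlines this logic into AttrCache.WriteFile and AttrCache.CopyFromFile, as the diff shows.

package attr_cache

import (
	"os"
	"syscall"

	"blobfuse2/internal"
)

// writeWithPreservedMetadata is a hypothetical name for the pattern the commit adds
// inside AttrCache.WriteFile and AttrCache.CopyFromFile.
func (ac *AttrCache) writeWithPreservedMetadata(options internal.WriteFileOptions) (int, error) {
	// Look up the existing attributes, explicitly asking for metadata.
	attr, err := ac.GetAttr(internal.GetAttrOptions{Name: options.Handle.Path, RetrieveMetadata: true})
	if err != nil && !(os.IsNotExist(err) || err == syscall.ENOENT) {
		// "Not found" is tolerated (createEmptyFile may be false); any other error aborts the write.
		return 0, err
	}
	if attr != nil {
		// Carry the existing blob metadata forward so the re-upload does not drop it.
		options.Metadata = attr.Metadata
	}
	// The storage component re-applies options.Metadata when it rewrites the blob.
	return ac.NextComponent().WriteFile(options)
}

The GetAttr tweak in the same file supports this: when RetrieveMetadata is set, a cached entry that lacks metadata is no longer served from the cache, so these callers always see the real metadata.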
@@ -40,6 +40,7 @@ import (
 	"blobfuse2/internal/handlemap"
 	"context"
 	"fmt"
+	"os"
 	"strings"
 	"sync"
 	"syscall"
@@ -349,6 +350,19 @@ func (ac *AttrCache) RenameFile(options internal.RenameFileOptions) error {

 // WriteFile : Mark the file invalid
 func (ac *AttrCache) WriteFile(options internal.WriteFileOptions) (int, error) {
+
+	// GetAttr on cache hit will serve from cache, on cache miss will serve from next component.
+	attr, err := ac.GetAttr(internal.GetAttrOptions{Name: options.Handle.Path, RetrieveMetadata: true})
+	if err != nil {
+		// Ignore not exists errors - this can happen if createEmptyFile is set to false
+		if !(os.IsNotExist(err) || err == syscall.ENOENT) {
+			return 0, err
+		}
+	}
+	if attr != nil {
+		options.Metadata = attr.Metadata
+	}
+
 	size, err := ac.NextComponent().WriteFile(options)
 	if err == nil {
 		ac.cacheLock.RLock()
@@ -381,7 +395,19 @@ func (ac *AttrCache) TruncateFile(options internal.TruncateFileOptions) error {
 func (ac *AttrCache) CopyFromFile(options internal.CopyFromFileOptions) error {
 	log.Trace("AttrCache::CopyFromFile : %s", options.Name)

-	err := ac.NextComponent().CopyFromFile(options)
+	// GetAttr on cache hit will serve from cache, on cache miss will serve from next component.
+	attr, err := ac.GetAttr(internal.GetAttrOptions{Name: options.Name, RetrieveMetadata: true})
+	if err != nil {
+		// Ignore not exists errors - this can happen if createEmptyFile is set to false
+		if !(os.IsNotExist(err) || err == syscall.ENOENT) {
+			return err
+		}
+	}
+	if attr != nil {
+		options.Metadata = attr.Metadata
+	}
+
+	err = ac.NextComponent().CopyFromFile(options)
 	if err == nil {
 		ac.cacheLock.RLock()
 		defer ac.cacheLock.RUnlock()
@@ -434,7 +460,8 @@ func (ac *AttrCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr
 	} else {
 		// IsMetadataRetrieved is false in the case of ADLS List since the API does not support metadata.
 		// Once migration of ADLS list to blob endpoint is done (in future service versions), we can remove this.
-		if value.getAttr().IsMetadataRetrieved() || ac.noSymlinks {
+		// options.RetrieveMetadata is set by CopyFromFile and WriteFile which need metadata to ensure it is preserved.
+		if value.getAttr().IsMetadataRetrieved() || (ac.noSymlinks && !options.RetrieveMetadata) {
 			// path exists and we have all the metadata required or we do not care about metadata
 			log.Debug("AttrCache::GetAttr : %s served from cache", options.Name)
 			return value.getAttr(), nil
@@ -92,7 +92,7 @@ func getPathAttr(path string, size int64, mode os.FileMode, metadata bool) *inte
 		Ctime: time.Now(),
 		Crtime: time.Now(),
 		Flags: flags,
-		Metadata: make(map[string]string),
+		Metadata: nil,
 	}
 }

@@ -701,35 +701,55 @@ func (suite *attrCacheTestSuite) TestRenameFile() {
 }

 // Tests Write File
-func (suite *attrCacheTestSuite) TestWriteFile() {
+func (suite *attrCacheTestSuite) TestWriteFileError() {
 	defer suite.cleanupTest()
 	path := "a"
 	handle := handlemap.Handle{
 		Path: path,
 	}

-	options := internal.WriteFileOptions{Handle: &handle}
+	options := internal.WriteFileOptions{Handle: &handle, Metadata: nil}

 	// Error
+	suite.mock.EXPECT().GetAttr(internal.GetAttrOptions{Name: path, RetrieveMetadata: true}).Return(nil, nil)
 	suite.mock.EXPECT().WriteFile(options).Return(0, errors.New("Failed to write a file"))

 	_, err := suite.attrCache.WriteFile(options)
 	suite.assert.NotNil(err)
-	suite.assert.NotContains(suite.attrCache.cacheMap, path)
+	suite.assert.Contains(suite.attrCache.cacheMap, path) // GetAttr call will add this to the cache
+}

+func (suite *attrCacheTestSuite) TestWriteFileDoesNotExist() {
+	defer suite.cleanupTest()
+	path := "a"
+	handle := handlemap.Handle{
+		Path: path,
+	}
+
+	options := internal.WriteFileOptions{Handle: &handle, Metadata: nil}
 	// Success
 	// Entry Does Not Already Exist
+	suite.mock.EXPECT().GetAttr(internal.GetAttrOptions{Name: path, RetrieveMetadata: true}).Return(nil, nil)
 	suite.mock.EXPECT().WriteFile(options).Return(0, nil)

-	_, err = suite.attrCache.WriteFile(options)
+	_, err := suite.attrCache.WriteFile(options)
 	suite.assert.Nil(err)
-	suite.assert.NotContains(suite.attrCache.cacheMap, path)
+	suite.assert.Contains(suite.attrCache.cacheMap, path) // GetAttr call will add this to the cache
+}

+func (suite *attrCacheTestSuite) TestWriteFileExists() {
+	defer suite.cleanupTest()
+	path := "a"
+	handle := handlemap.Handle{
+		Path: path,
+	}
+
+	options := internal.WriteFileOptions{Handle: &handle, Metadata: nil}
 	// Entry Already Exists
-	addPathToCache(suite.assert, suite.attrCache, path, false)
+	addPathToCache(suite.assert, suite.attrCache, path, true)
 	suite.mock.EXPECT().WriteFile(options).Return(0, nil)

-	_, err = suite.attrCache.WriteFile(options)
+	_, err := suite.attrCache.WriteFile(options)
 	suite.assert.Nil(err)
 	assertInvalid(suite, path)
 }
@@ -772,32 +792,45 @@ func (suite *attrCacheTestSuite) TestTruncateFile() {
 }

 // Tests CopyFromFile
-func (suite *attrCacheTestSuite) TestCopyFromFile() {
+func (suite *attrCacheTestSuite) TestCopyFromFileError() {
 	defer suite.cleanupTest()
 	path := "a"

-	options := internal.CopyFromFileOptions{Name: path, File: nil}
-
+	options := internal.CopyFromFileOptions{Name: path, File: nil, Metadata: nil}
+	suite.mock.EXPECT().GetAttr(internal.GetAttrOptions{Name: path, RetrieveMetadata: true}).Return(nil, nil)
 	// Error
 	suite.mock.EXPECT().CopyFromFile(options).Return(errors.New("Failed to copy from file"))

 	err := suite.attrCache.CopyFromFile(options)
 	suite.assert.NotNil(err)
-	suite.assert.NotContains(suite.attrCache.cacheMap, path)
+	suite.assert.Contains(suite.attrCache.cacheMap, path) // GetAttr call will add this to the cache
+}

+func (suite *attrCacheTestSuite) TestCopyFromFileDoesNotExist() {
+	defer suite.cleanupTest()
+	path := "a"
+
+	options := internal.CopyFromFileOptions{Name: path, File: nil, Metadata: nil}
 	// Success
 	// Entry Does Not Already Exist
+	suite.mock.EXPECT().GetAttr(internal.GetAttrOptions{Name: path, RetrieveMetadata: true}).Return(nil, nil)
 	suite.mock.EXPECT().CopyFromFile(options).Return(nil)

-	err = suite.attrCache.CopyFromFile(options)
+	err := suite.attrCache.CopyFromFile(options)
 	suite.assert.Nil(err)
-	suite.assert.NotContains(suite.attrCache.cacheMap, path)
+	suite.assert.Contains(suite.attrCache.cacheMap, path) // GetAttr call will add this to the cache
+}

+func (suite *attrCacheTestSuite) TestCopyFromFileExists() {
+	defer suite.cleanupTest()
+	path := "a"
+
+	options := internal.CopyFromFileOptions{Name: path, File: nil, Metadata: nil}
 	// Entry Already Exists
-	addPathToCache(suite.assert, suite.attrCache, path, false)
+	addPathToCache(suite.assert, suite.attrCache, path, true)
 	suite.mock.EXPECT().CopyFromFile(options).Return(nil)

-	err = suite.attrCache.CopyFromFile(options)
+	err := suite.attrCache.CopyFromFile(options)
 	suite.assert.Nil(err)
 	assertInvalid(suite, path)
 }
@@ -352,7 +352,7 @@ func (az *AzStorage) ReadInBuffer(options internal.ReadInBufferOptions) (length
 }

 func (az *AzStorage) WriteFile(options internal.WriteFileOptions) (int, error) {
-	err := az.storage.Write(options.Handle.Path, options.Offset, int64(len(options.Data)), options.Data, options.FileOffsets, options.ModBlockList)
+	err := az.storage.Write(options)
 	return len(options.Data), err
 }

@@ -387,7 +387,7 @@ func (az *AzStorage) TruncateFile(options internal.TruncateFileOptions) error {
 		}
 	}

-	err = az.storage.WriteFromBuffer(options.Name, nil, data)
+	err = az.storage.WriteFromBuffer(options.Name, attr.Metadata, data)
 	if err != nil {
 		log.Err("AzStorage::TruncateFile : Failed to update file")
 		return err
@@ -403,7 +403,7 @@ func (az *AzStorage) CopyToFile(options internal.CopyToFileOptions) error {

 func (az *AzStorage) CopyFromFile(options internal.CopyFromFileOptions) error {
 	log.Trace("AzStorage::CopyFromFile : Upload file %s", options.Name)
-	return az.storage.WriteFromFile(options.Name, nil, options.File)
+	return az.storage.WriteFromFile(options.Name, options.Metadata, options.File)
 }

 // Symlink operations
@@ -667,7 +667,7 @@ func (bb *BlockBlob) GetFileBlockOffsets(name string) (*common.BlockOffsetList,
 		log.Err("BlockBlob::GetFileBlockOffsets : Failed to get block list %s ", name, err.Error())
 		return &common.BlockOffsetList{}, err
 	}
-	for _, block := range *&storageBlockList.CommittedBlocks {
+	for _, block := range storageBlockList.CommittedBlocks {
 		blk := &common.Block{
 			Id: block.Name,
 			StartIndex: int64(blockOffset),
@@ -709,12 +709,15 @@ func (bb *BlockBlob) createNewBlocks(blockList *common.BlockOffsetList, offset,
 }

 // Write : write data at given offset to a blob
-func (bb *BlockBlob) Write(name string, offset, length int64, data []byte, fileOffsets, modFileOffsets *common.BlockOffsetList) error {
-	defer log.TimeTrack(time.Now(), "BlockBlob::Write", name)
+func (bb *BlockBlob) Write(options internal.WriteFileOptions) error {
+	name := options.Handle.Path
+	offset := options.Offset
+	defer log.TimeTrack(time.Now(), "BlockBlob::Write", options.Handle.Path)
 	log.Trace("BlockBlob::Write : name %s offset %v", name, offset)
 	// tracks the case where our offset is great than our current file size (appending only - not modifying pre-existing data)
 	var dataBuffer *[]byte

+	fileOffsets := options.FileOffsets
 	// when the file offset mapping is cached we don't need to make a get block list call
 	if fileOffsets != nil && !fileOffsets.Cached {
 		var err error
@@ -724,6 +727,8 @@ func (bb *BlockBlob) Write(name string, offset, length int64, data []byte, fileO
 		}
 	}

+	length := int64(len(options.Data))
+	data := options.Data
 	// case 1: file consists of no blocks (small file)
 	if fileOffsets != nil && len(fileOffsets.BlockList) == 0 {
 		// get all the data
@@ -753,7 +758,7 @@ func (bb *BlockBlob) Write(name string, offset, length int64, data []byte, fileO
 		}
 	}
 	// WriteFromBuffer should be able to handle the case where now the block is too big and gets split into multiple blocks
-	err := bb.WriteFromBuffer(name, nil, *dataBuffer)
+	err := bb.WriteFromBuffer(name, options.Metadata, *dataBuffer)
 	if err != nil {
 		log.Err("BlockBlob::Write : Failed to upload to blob %s ", name, err.Error())
 		return err
@@ -112,7 +112,7 @@ type AzConnection interface {

 	WriteFromFile(name string, metadata map[string]string, fi *os.File) error
 	WriteFromBuffer(name string, metadata map[string]string, data []byte) error
-	Write(name string, offset int64, len int64, data []byte, fileOffsets, modBlockList *common.BlockOffsetList) error
+	Write(options internal.WriteFileOptions) error
 	GetFileBlockOffsets(name string) (*common.BlockOffsetList, error)

 	ChangeMod(string, os.FileMode) error
@@ -501,8 +501,8 @@ func (dl *Datalake) WriteFromBuffer(name string, metadata map[string]string, dat
 }

 // Write : Write to a file at given offset
-func (dl *Datalake) Write(name string, offset int64, len int64, data []byte, fileOffsets, modBlockList *common.BlockOffsetList) error {
-	return dl.BlockBlob.Write(name, offset, len, data, fileOffsets, modBlockList)
+func (dl *Datalake) Write(options internal.WriteFileOptions) error {
+	return dl.BlockBlob.Write(options)
 }

 func (dl *Datalake) GetFileBlockOffsets(name string) (*common.BlockOffsetList, error) {
@@ -606,6 +606,7 @@ func libfuse_write(path *C.char, buf *C.char, size C.size_t, off C.off_t, fi *C.
 			Data: data[:size],
 			FileOffsets: &common.BlockOffsetList{},
 			ModBlockList: &common.BlockOffsetList{},
+			Metadata: nil,
 		})

 	if err != nil {
@@ -630,6 +630,7 @@ func libfuse_write(path *C.char, buf *C.char, size C.size_t, off C.off_t, fi *C.
 			Data: data[:size],
 			FileOffsets: &common.BlockOffsetList{},
 			ModBlockList: &common.BlockOffsetList{},
+			Metadata: nil,
 		})

 	if err != nil {
@@ -317,9 +317,6 @@ func (st *Stream) ReadInBuffer(options internal.ReadInBufferOptions) (int, error
 }

 func (st *Stream) WriteFile(options internal.WriteFileOptions) (int, error) {
-	// if len(options.FileOffsets) == 0 {
-
-	// }
 	return st.NextComponent().WriteFile(options)
 }

@@ -116,6 +116,7 @@ type WriteFileOptions struct {
 	Data []byte
 	FileOffsets *common.BlockOffsetList
 	ModBlockList *common.BlockOffsetList
+	Metadata map[string]string
 }

 type GetFileBlockOffsetsOptions struct {
@@ -137,6 +138,7 @@ type CopyToFileOptions struct {
 type CopyFromFileOptions struct {
 	Name string
 	File *os.File
+	Metadata map[string]string
 }

 type FlushFileOptions struct {
@@ -170,6 +172,7 @@ type ReadLinkOptions struct {

 type GetAttrOptions struct {
 	Name string
+	RetrieveMetadata bool
 }

 type SetAttrOptions struct {
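For callers above the storage layer, the practical effect of the interface change is that a write now carries its data and metadata in a single internal.WriteFileOptions value; Datalake.Write simply forwards it to BlockBlob.Write, which hands options.Metadata to WriteFromBuffer when the blob must be re-uploaded. A small illustrative sketch (the helper and package name are hypothetical; the fields are the ones shown in the diff):

package example

import (
	"blobfuse2/internal"
	"blobfuse2/internal/handlemap"
)

// newWriteRequest bundles a write into the options struct that now travels through
// every component, so the metadata chosen by the attribute cache reaches storage.
func newWriteRequest(handle *handlemap.Handle, offset int64, data []byte, meta map[string]string) internal.WriteFileOptions {
	return internal.WriteFileOptions{
		Handle:   handle,
		Offset:   offset,
		Data:     data,
		Metadata: meta, // nil means "nothing to preserve", as the libfuse handlers pass
	}
}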