Родитель
5728405152
Коммит
ba739b6206
|
@ -1,5 +1,6 @@
|
|||
## 2.3.3 (Unreleased)
|
||||
## 2.4.0 (Unreleased)
|
||||
**Bug Fixes**
|
||||
- [#1426](https://github.com/Azure/azure-storage-fuse/issues/1426) Read panic in block-cache due to boundary conditions.
|
||||
|
||||
## 2.3.2 (2024-09-03)
|
||||
**Bug Fixes**
|
||||
|
|
|
@ -149,6 +149,10 @@ steps:
|
|||
VERBOSE_LOG: ${{ parameters.verbose_log }}
|
||||
continueOnError: false
|
||||
|
||||
- script: |
|
||||
if [ -d "block_cache" ]; then ls -l block_cache; rm -rf block_cache; mkdir block_cache ; fi
|
||||
displayName: 'Clear Temp Cache for Block Cache before mounting'
|
||||
|
||||
- template: 'mount.yml'
|
||||
parameters:
|
||||
working_dir: $(WORK_DIR)
|
||||
|
|
|
@ -61,13 +61,12 @@ const (
|
|||
|
||||
// Block is a memory mapped buffer with its state to hold data
|
||||
type Block struct {
|
||||
offset uint64 // Start offset of the data this block holds
|
||||
id int64 // Id of the block i.e. (offset / block size)
|
||||
endIndex uint64 // Length of the data this block holds
|
||||
state chan int // Channel depicting data has been read for this block or not
|
||||
flags common.BitMap16 // Various states of the block
|
||||
data []byte // Data read from blob
|
||||
node *list.Element // node representation of this block in the list inside handle
|
||||
offset uint64 // Start offset of the data this block holds
|
||||
id int64 // Id of the block i.e. (offset / block size)
|
||||
state chan int // Channel depicting data has been read for this block or not
|
||||
flags common.BitMap16 // Various states of the block
|
||||
data []byte // Data read from blob
|
||||
node *list.Element // node representation of this block in the list inside handle
|
||||
}
|
||||
|
||||
type blockInfo struct {
|
||||
|
@ -123,7 +122,6 @@ func (b *Block) Delete() error {
|
|||
func (b *Block) ReUse() {
|
||||
b.id = -1
|
||||
b.offset = 0
|
||||
b.endIndex = 0
|
||||
b.flags.Reset()
|
||||
b.flags.Set(BlockFlagFresh)
|
||||
b.state = make(chan int, 1)
|
||||
|
|
|
@ -333,6 +333,7 @@ func (bc *BlockCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Han
|
|||
handle.Mtime = attr.Mtime
|
||||
handle.Size = attr.Size
|
||||
|
||||
log.Debug("BlockCache::OpenFile : Size of file handle.Size %v", handle.Size)
|
||||
bc.prepareHandleForBlockCache(handle)
|
||||
|
||||
if options.Flags&os.O_TRUNC != 0 || (options.Flags&os.O_WRONLY != 0 && options.Flags&os.O_APPEND == 0) {
|
||||
|
@ -496,6 +497,10 @@ func (bc *BlockCache) closeFileInternal(options internal.CloseFileOptions) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func (bc *BlockCache) getBlockSize(fileSize uint64, block *Block) uint64 {
|
||||
return min(bc.blockSize, fileSize-block.offset)
|
||||
}
|
||||
|
||||
// ReadInBuffer: Read the file into a buffer
|
||||
func (bc *BlockCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, error) {
|
||||
if options.Offset >= options.Handle.Size {
|
||||
|
@ -522,7 +527,9 @@ func (bc *BlockCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, e
|
|||
|
||||
// Copy data from this block to user buffer
|
||||
readOffset := uint64(options.Offset) - block.offset
|
||||
bytesRead := copy(options.Data[dataRead:], block.data[readOffset:(block.endIndex-block.offset)])
|
||||
blockSize := bc.getBlockSize(uint64(options.Handle.Size), block)
|
||||
|
||||
bytesRead := copy(options.Data[dataRead:], block.data[readOffset:blockSize])
|
||||
|
||||
// Move offset forward in case we need to copy more data
|
||||
options.Offset += int64(bytesRead)
|
||||
|
@ -913,25 +920,31 @@ func (bc *BlockCache) download(item *workItem) {
|
|||
log.Err("BlockCache::download : Failed to open file %s [%s]", fileName, err.Error())
|
||||
_ = os.Remove(localPath)
|
||||
} else {
|
||||
var successfulRead bool = true
|
||||
n, err := f.Read(item.block.data)
|
||||
if err != nil {
|
||||
log.Err("BlockCache::download : Failed to read data from disk cache %s [%s]", fileName, err.Error())
|
||||
f.Close()
|
||||
successfulRead = false
|
||||
_ = os.Remove(localPath)
|
||||
}
|
||||
|
||||
if n != int(bc.blockSize) && item.block.offset+uint64(n) != uint64(item.handle.Size) {
|
||||
log.Err("BlockCache::download : Local data retrieved from disk size mismatch, Expected %v, OnDisk %v, fileSize %v", bc.getBlockSize(uint64(item.handle.Size), item.block), n, item.handle.Size)
|
||||
successfulRead = false
|
||||
_ = os.Remove(localPath)
|
||||
}
|
||||
|
||||
f.Close()
|
||||
// We have read the data from disk so there is no need to go over network
|
||||
// Just mark the block that download is complete
|
||||
|
||||
item.block.endIndex = item.block.offset + uint64(n)
|
||||
item.block.Ready(BlockStatusDownloaded)
|
||||
return
|
||||
if successfulRead {
|
||||
item.block.Ready(BlockStatusDownloaded)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
item.block.endIndex = item.block.offset
|
||||
// If file does not exists then download the block from the container
|
||||
n, err := bc.NextComponent().ReadInBuffer(internal.ReadInBufferOptions{
|
||||
Handle: item.handle,
|
||||
|
@ -961,8 +974,6 @@ func (bc *BlockCache) download(item *workItem) {
|
|||
return
|
||||
}
|
||||
|
||||
item.block.endIndex = item.block.offset + uint64(n)
|
||||
|
||||
if bc.tmpPath != "" {
|
||||
err := os.MkdirAll(filepath.Dir(localPath), 0777)
|
||||
if err != nil {
|
||||
|
@ -1021,10 +1032,6 @@ func (bc *BlockCache) WriteFile(options internal.WriteFileOptions) (int, error)
|
|||
options.Offset += int64(bytesWritten)
|
||||
dataWritten += bytesWritten
|
||||
|
||||
if block.endIndex < uint64(options.Offset) {
|
||||
block.endIndex = uint64(options.Offset)
|
||||
}
|
||||
|
||||
if options.Handle.Size < options.Offset {
|
||||
options.Handle.Size = options.Offset
|
||||
}
|
||||
|
@ -1247,28 +1254,15 @@ func shouldCommitAndDownload(blockID int64, handle *handlemap.Handle) (bool, boo
|
|||
|
||||
// lineupUpload : Create a work item and schedule the upload
|
||||
func (bc *BlockCache) lineupUpload(handle *handlemap.Handle, block *Block, listMap map[int64]*blockInfo) {
|
||||
// if a block has data less than block size and is not the last block,
|
||||
// add null at the end and upload the full block
|
||||
// bc.printCooking(handle)
|
||||
if block.endIndex < uint64(handle.Size) {
|
||||
log.Debug("BlockCache::lineupUpload : Appending null for block %v, size %v for %v=>%s", block.id, (block.endIndex - block.offset), handle.ID, handle.Path)
|
||||
block.endIndex = block.offset + bc.blockSize
|
||||
} else if block.endIndex == uint64(handle.Size) {
|
||||
// TODO: random write scenario where this block is not the last block
|
||||
log.Debug("BlockCache::lineupUpload : Last block %v, size %v for %v=>%s", block.id, (block.endIndex - block.offset), handle.ID, handle.Path)
|
||||
}
|
||||
|
||||
// id := listMap[block.id]
|
||||
// if id == "" {
|
||||
id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
|
||||
listMap[block.id] = &blockInfo{
|
||||
id: id,
|
||||
committed: false,
|
||||
size: block.endIndex - block.offset,
|
||||
size: bc.getBlockSize(uint64(handle.Size), block),
|
||||
}
|
||||
//}
|
||||
|
||||
log.Debug("BlockCache::lineupUpload : block %v, size %v for %v=>%s, blockId %v", block.id, (block.endIndex - block.offset), handle.ID, handle.Path, id)
|
||||
log.Debug("BlockCache::lineupUpload : block %v, size %v for %v=>%s, blockId %v", block.id, bc.getBlockSize(uint64(handle.Size), block), handle.ID, handle.Path, id)
|
||||
item := &workItem{
|
||||
handle: handle,
|
||||
block: block,
|
||||
|
@ -1278,8 +1272,6 @@ func (bc *BlockCache) lineupUpload(handle *handlemap.Handle, block *Block, listM
|
|||
blockId: id,
|
||||
}
|
||||
|
||||
// log.Debug("BlockCache::lineupUpload : Upload block %v=>%s (index %v, offset %v, data %v)", handle.ID, handle.Path, block.id, block.offset, (block.endIndex - block.offset))
|
||||
|
||||
block.Uploading()
|
||||
block.flags.Clear(BlockFlagFailed)
|
||||
block.flags.Set(BlockFlagUploading)
|
||||
|
@ -1346,12 +1338,11 @@ func (bc *BlockCache) upload(item *workItem) {
|
|||
flock := bc.fileLocks.Get(fileName)
|
||||
flock.Lock()
|
||||
defer flock.Unlock()
|
||||
// log.Debug("BlockCache::Upload : block %v, size %v for %v=>%s, blockId %v", item.block.id, (item.block.endIndex - item.block.offset), item.handle.ID, item.handle.Path, item.blockId)
|
||||
|
||||
blockSize := bc.getBlockSize(uint64(item.handle.Size), item.block)
|
||||
// This block is updated so we need to stage it now
|
||||
err := bc.NextComponent().StageData(internal.StageDataOptions{
|
||||
Name: item.handle.Path,
|
||||
Data: item.block.data[0 : item.block.endIndex-item.block.offset],
|
||||
Data: item.block.data[0:blockSize],
|
||||
Offset: uint64(item.block.offset),
|
||||
Id: item.blockId})
|
||||
if err != nil {
|
||||
|
@ -1383,7 +1374,7 @@ func (bc *BlockCache) upload(item *workItem) {
|
|||
// Dump this block to local disk cache
|
||||
f, err := os.Create(localPath)
|
||||
if err == nil {
|
||||
_, err := f.Write(item.block.data[0 : item.block.endIndex-item.block.offset])
|
||||
_, err := f.Write(item.block.data[0:blockSize])
|
||||
if err != nil {
|
||||
log.Err("BlockCache::upload : Failed to write %s to disk [%v]", localPath, err.Error())
|
||||
_ = os.Remove(localPath)
|
||||
|
|
|
@ -197,7 +197,6 @@ func (suite *blockTestSuite) TestWriter() {
|
|||
suite.assert.NotNil(b.state)
|
||||
suite.assert.Nil(b.node)
|
||||
suite.assert.Zero(b.offset)
|
||||
suite.assert.Zero(b.endIndex)
|
||||
suite.assert.Equal(b.id, int64(-1))
|
||||
suite.assert.False(b.IsDirty())
|
||||
|
||||
|
|
|
@ -169,6 +169,17 @@ func createFileHandleInLocalAndRemote(suite *dataValidationTestSuite, localFileP
|
|||
return lfh, rfh
|
||||
}
|
||||
|
||||
// Open File in Local and Mounted Directories and returns there file handles the associated fd has O_RDONLY Mode
|
||||
func openFileHandleInLocalAndRemote(suite *dataValidationTestSuite, flags int, localFilePath, remoteFilePath string) (lfh *os.File, rfh *os.File) {
|
||||
lfh, err := os.OpenFile(localFilePath, flags, 0666)
|
||||
suite.Nil(err)
|
||||
|
||||
rfh, err = os.OpenFile(remoteFilePath, flags, 0666)
|
||||
suite.Nil(err)
|
||||
|
||||
return lfh, rfh
|
||||
}
|
||||
|
||||
// closes the file handles, This ensures that data is flushed to disk/Azure Storage from the cache
|
||||
func closeFileHandles(suite *dataValidationTestSuite, handles ...*os.File) {
|
||||
for _, h := range handles {
|
||||
|
@ -215,6 +226,28 @@ func generateFileWithRandomData(suite *dataValidationTestSuite, filePath string,
|
|||
closeFileHandles(suite, fh)
|
||||
}
|
||||
|
||||
func compareReadOperInLocalAndRemote(suite *dataValidationTestSuite, lfh, rfh *os.File, offset int64) {
|
||||
buffer1 := make([]byte, 4*int(_1MB))
|
||||
buffer2 := make([]byte, 4*int(_1MB))
|
||||
|
||||
bytes_read_local, err1 := lfh.ReadAt(buffer1, offset)
|
||||
bytes_read_remote, err2 := rfh.ReadAt(buffer2, offset)
|
||||
suite.Equal(err1, err2)
|
||||
suite.Equal(bytes_read_local, bytes_read_remote)
|
||||
suite.Equal(buffer1[:bytes_read_local], buffer2[:bytes_read_remote])
|
||||
}
|
||||
|
||||
func compareWriteOperInLocalAndRemote(suite *dataValidationTestSuite, lfh, rfh *os.File, offset int64) {
|
||||
sizeofbuffer := (rand.Int() % 4) + 1
|
||||
buffer := make([]byte, sizeofbuffer*int(_1MB))
|
||||
rand.Read(buffer)
|
||||
|
||||
bytes_written_local, err1 := lfh.WriteAt(buffer, offset)
|
||||
bytes_written_remote, err2 := rfh.WriteAt(buffer, offset)
|
||||
suite.Equal(err1, err2)
|
||||
suite.Equal(bytes_written_local, bytes_written_remote)
|
||||
}
|
||||
|
||||
// -------------- Data Validation Tests -------------------
|
||||
|
||||
// Test correct overwrite of file using echo command
|
||||
|
@ -719,6 +752,52 @@ func (suite *dataValidationTestSuite) TestPanicOnReadingFileInRandReadMode() {
|
|||
closeFileHandles(suite, rfh)
|
||||
}
|
||||
|
||||
func (suite *dataValidationTestSuite) TestReadDataAtBlockBoundaries() {
|
||||
fileName := "testReadDataAtBlockBoundaries"
|
||||
localFilePath, remoteFilePath := convertFileNameToFilePath(fileName)
|
||||
fileSize := 35 * int(_1MB)
|
||||
generateFileWithRandomData(suite, localFilePath, fileSize)
|
||||
suite.copyToMountDir(localFilePath, remoteFilePath)
|
||||
suite.validateData(localFilePath, remoteFilePath)
|
||||
|
||||
lfh, rfh := openFileHandleInLocalAndRemote(suite, os.O_RDWR, localFilePath, remoteFilePath)
|
||||
var offset int64 = 0
|
||||
//tests run in 16MB block size config.
|
||||
//Data in File 35MB(3blocks)
|
||||
//block1->16MB, block2->16MB, block3->3MB
|
||||
|
||||
//getting 4MB data from 1st block
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//getting 4MB data from overlapping blocks
|
||||
offset = int64(15 * int(_1MB))
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//getting 4MB data from last block
|
||||
offset = int64(32 * int(_1MB))
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//getting 4MB data from overlapping block with last block
|
||||
offset = int64(30 * int(_1MB))
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//Read at some random offset
|
||||
for i := 0; i < 10; i++ {
|
||||
offset = rand.Int63() % int64(fileSize)
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
}
|
||||
|
||||
//write at end of file
|
||||
offset = int64(fileSize)
|
||||
compareWriteOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//Check the previous write with read
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
|
||||
//Write at Random offset in the file
|
||||
offset = rand.Int63() % int64(fileSize)
|
||||
compareWriteOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
//Check the previous write with read
|
||||
compareReadOperInLocalAndRemote(suite, lfh, rfh, offset)
|
||||
|
||||
closeFileHandles(suite, lfh, rfh)
|
||||
}
|
||||
|
||||
// -------------- Main Method -------------------
|
||||
func TestDataValidationTestSuite(t *testing.T) {
|
||||
initDataValidationFlags()
|
||||
|
|
Загрузка…
Ссылка в новой задаче