Block cache random write in sparse files (#1475)

This commit is contained in:
Sourav Gupta 2024-07-30 16:27:20 +05:30 committed by GitHub
Parent c0afb6025c
Commit ee19eff072
No known key found for this signature
GPG key ID: B5690EEEBB952194
10 changed files with 805 additions and 75 deletions

View file

@@ -3,7 +3,9 @@
- Fixed the case where file creation using SAS on HNS accounts was returning the wrong error code.
- [#1402](https://github.com/Azure/azure-storage-fuse/issues/1402) Fixed proxy URL parsing.
- If an earlier instance of Blobfuse2 crashed and the mount is unstable, the next mount to the same path will automatically clean up the system.
- Reset block data to null before reuse to avoid corruption.
- In the flush operation, blocks will be committed only if the handle is dirty.
- Reset block data to null before reuse.
- Fixed data integrity issues in sparse files.
**Features**

View file

@@ -158,7 +158,9 @@ steps:
displayName: 'E2E Test: Mount with Key Credential Configuration'
timeoutInMinutes: 3
continueOnError: false
#--------------------------------------- Tests: End to end tests with Block Cache configurations ------------------------------------------
- template: e2e-tests-spcl.yml
parameters:
conf_template: azure_block_perf.yaml
@@ -173,7 +175,7 @@ steps:
account_endpoint: ${{ parameters.account_endpoint }}
idstring: "${{ parameters.service }} with Block-cache"
distro_name: ${{ parameters.distro_name }}
quick_test: ${{ parameters.quick_test }}
quick_test: false
verbose_log: ${{ parameters.verbose_log }}
clone: false

View file

@@ -62,6 +62,12 @@ type Block struct {
node *list.Element // node representation of this block in the list inside handle
}
type blockInfo struct {
id string // blockID of the block
committed bool // flag to determine if the block has been committed or not
size uint64 // length of data in block
}
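// Illustrative sketch (assuming the handle API used elsewhere in this diff):
// the per-handle "blockList" value maps a block index to its *blockInfo,
// which is how the cache distinguishes staged-but-uncommitted blocks from
// committed ones:
//
//	lst, _ := handle.GetValue("blockList")
//	listMap := lst.(map[int64]*blockInfo)
//	if info, ok := listMap[idx]; ok && info.committed {
//		// block is part of the committed blob and can be re-downloaded
//	}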
// AllocateBlock creates a new memory mapped buffer for the given size
func AllocateBlock(size uint64) (*Block, error) {
if size == 0 {

View file

@@ -376,7 +376,7 @@ func (bc *BlockCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Han
// returns true if the blockList is valid
func (bc *BlockCache) validateBlockList(handle *handlemap.Handle, options internal.OpenFileOptions, blockList *internal.CommittedBlockList) bool {
lst, _ := handle.GetValue("blockList")
listMap := lst.(map[int64]string)
listMap := lst.(map[int64]*blockInfo)
listLen := len(*blockList)
for idx, block := range *blockList {
@@ -384,7 +384,11 @@ func (bc *BlockCache) validateBlockList(handle *handlemap.Handle, options intern
log.Err("BlockCache::validateBlockList : Block size mismatch for %s [block: %v, size: %v]", options.Name, block.Id, block.Size)
return false
}
listMap[int64(idx)] = block.Id
listMap[int64(idx)] = &blockInfo{
id: block.Id,
committed: true,
size: block.Size,
}
}
return true
}
@@ -398,7 +402,7 @@ func (bc *BlockCache) prepareHandleForBlockCache(handle *handlemap.Handle) {
}
// Create map to hold the block-ids for this file
listMap := make(map[int64]string, 0)
listMap := make(map[int64]*blockInfo, 0)
handle.SetValue("blockList", listMap)
// Set next offset to download as 0
@@ -770,14 +774,14 @@ func (bc *BlockCache) refreshBlock(handle *handlemap.Handle, index uint64, prefe
handle.SetValue(fmt.Sprintf("%v", index), block)
handle.SetValue("#", (index + 1))
bc.lineupDownload(handle, block, prefetch)
bc.lineupDownload(handle, block, prefetch, true)
}
return nil
}
// lineupDownload : Create a work item and schedule the download
func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, prefetch bool) {
func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, prefetch bool, pushFront bool) {
item := &workItem{
handle: handle,
block: block,
@@ -791,7 +795,12 @@ func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, pre
_ = handle.Buffers.Cooked.Remove(block.node)
}
block.node = handle.Buffers.Cooking.PushFront(block)
if pushFront {
block.node = handle.Buffers.Cooking.PushFront(block)
} else {
// push to the back of the cooking list in the write scenario, where a block is downloaded before it is updated
block.node = handle.Buffers.Cooking.PushBack(block)
}
block.flags.Set(BlockFlagDownloading)
// Send the work item to worker pool to schedule download
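// Illustrative usage of the new pushFront parameter: the read path keeps the
// existing PushFront behavior, while the write path (as in getOrCreateBlock
// below) schedules the download with pushFront == false so the block lands at
// the back of the Cooking list until the pending write updates it:
//
//	bc.lineupDownload(handle, block, false /* prefetch */, false /* pushFront */)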
@@ -916,6 +925,8 @@ func (bc *BlockCache) WriteFile(options internal.WriteFileOptions) (int, error)
options.Handle.Lock()
defer options.Handle.Unlock()
// log.Debug("BlockCache::WriteFile : Writing handle %v=>%v: offset %v, %v bytes", options.Handle.ID, options.Handle.Path, options.Offset, len(options.Data))
// Keep getting the next blocks until the requested amount of data is written
dataWritten := int(0)
for dataWritten < len(options.Data) {
@@ -926,6 +937,8 @@ func (bc *BlockCache) WriteFile(options internal.WriteFileOptions) (int, error)
return dataWritten, err
}
// log.Debug("BlockCache::WriteFile : Writing to block %v, offset %v for handle %v=>%v", block.id, options.Offset, options.Handle.ID, options.Handle.Path)
// Copy the incoming data to block
writeOffset := uint64(options.Offset) - block.offset
bytesWritten := copy(block.data[writeOffset:], options.Data[dataWritten:])
@@ -940,7 +953,10 @@ func (bc *BlockCache) WriteFile(options internal.WriteFileOptions) (int, error)
if block.endIndex < uint64(options.Offset) {
block.endIndex = uint64(options.Offset)
options.Handle.Size = int64(block.endIndex)
}
if options.Handle.Size < options.Offset {
options.Handle.Size = options.Offset
}
}
@@ -955,7 +971,7 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)
return nil, fmt.Errorf("block index out of range. Increase your block size")
}
//log.Debug("FilBlockCacheCache::getOrCreateBlock : Get block for %s, index %v", handle.Path, index)
// log.Debug("FilBlockCacheCache::getOrCreateBlock : Get block for %s, index %v", handle.Path, index)
var block *Block
var err error
@@ -979,11 +995,22 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)
block.offset = index * bc.blockSize
if block.offset < uint64(handle.Size) {
// We are writing somewhere in between so just fetch this block
bc.lineupDownload(handle, block, false)
// TODO: add a case for committing the dirty blocks and downloading the given block
_, shouldDownload := shouldCommitAndDownload(block.id, handle)
// Now wait for download to complete
<-block.state
// download the block only if it was already committed
if shouldDownload {
// We are writing somewhere in between so just fetch this block
log.Debug("BlockCache::getOrCreateBlock : Downloading block %v for %v=>%v", block.id, handle.ID, handle.Path)
bc.lineupDownload(handle, block, false, false)
// Now wait for download to complete
<-block.state
} else {
log.Debug("BlockCache::getOrCreateBlock : push block %v to the cooking list for %v=>%v", block.id, handle.ID, handle.Path)
block.node = handle.Buffers.Cooking.PushBack(block)
}
} else {
block.node = handle.Buffers.Cooking.PushBack(block)
}
@@ -1030,7 +1057,7 @@ func (bc *BlockCache) stageBlocks(handle *handlemap.Handle, cnt int) error {
node := nodeList.Front()
lst, _ := handle.GetValue("blockList")
listMap := lst.(map[int64]string)
listMap := lst.(map[int64]*blockInfo)
for node != nil && cnt > 0 {
nextNode := node.Next()
@@ -1047,12 +1074,50 @@ func (bc *BlockCache) stageBlocks(handle *handlemap.Handle, cnt int) error {
return nil
}
// shouldCommitAndDownload checks whether we should commit the existing blocks and download the given block.
// There can be a case where a block has been partially written, staged and cleared from the buffer list.
// If a write call comes for that block, we cannot get the previously staged data
// since the block is not yet committed. So, we have to commit it first.
// If the block is staged and cleared from the buffer list, return true for commit and false for download.
// If the block is already committed, return false for commit and true for download.
func shouldCommitAndDownload(blockID int64, handle *handlemap.Handle) (bool, bool) {
lst, ok := handle.GetValue("blockList")
if !ok {
return false, false
}
listMap := lst.(map[int64]*blockInfo)
val, ok := listMap[blockID]
if ok {
// block id exists
// If block is staged, return true for commit and false for downloading
// If block is committed, return false for commit and true for downloading
return !val.committed, val.committed
} else {
return false, false
}
}
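// Summary of the (commit, download) pairs returned above, for reference:
//
//	index not present in blockList -> (false, false)  brand-new block
//	staged but not committed       -> (true,  false)  commit first; staged
//	                                                  data cannot be re-read yet
//	already committed              -> (false, true)   safe to download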
// lineupUpload : Create a work item and schedule the upload
func (bc *BlockCache) lineupUpload(handle *handlemap.Handle, block *Block, listMap map[int64]string) {
func (bc *BlockCache) lineupUpload(handle *handlemap.Handle, block *Block, listMap map[int64]*blockInfo) {
// if a block holds less data than the block size and is not the last block,
// pad it with nulls at the end and upload the full block
if block.endIndex < uint64(handle.Size) {
block.endIndex = block.offset + bc.blockSize
log.Debug("BlockCache::lineupUpload : Appending null for block %v", block.id)
} else if block.endIndex == uint64(handle.Size) {
// TODO: random write scenario where this block is not the last block
log.Debug("BlockCache::lineupUpload : Last block %v", block.id)
}
// id := listMap[block.id]
// if id == "" {
id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
listMap[block.id] = id
listMap[block.id] = &blockInfo{
id: id,
committed: false,
size: block.endIndex - block.offset,
}
//}
item := &workItem{
@@ -1066,10 +1131,6 @@ func (bc *BlockCache) lineupUpload(handle *handlemap.Handle, block *Block, listM
log.Debug("BlockCache::lineupUpload : Upload block %v=>%s (index %v, offset %v, data %v)", handle.ID, handle.Path, block.id, block.offset, (block.endIndex - block.offset))
if (block.endIndex - block.offset) == 0 {
log.Err("BlockCache::lineupUpload : Upload block %v=>%s (index %v, offset %v, data %v) 0 byte block formed", handle.ID, handle.Path, block.id, block.offset, (block.endIndex - block.offset))
}
// Remove this block from free block list and add to in-process list
if block.node != nil {
_ = handle.Buffers.Cooking.Remove(block.node)
@@ -1131,10 +1192,9 @@ func (bc *BlockCache) upload(item *workItem) {
// This block is updated so we need to stage it now
err := bc.NextComponent().StageData(internal.StageDataOptions{
Name: item.handle.Path,
Data: item.block.data[0 : item.block.endIndex-item.block.offset],
Offset: uint64(item.block.id),
Id: item.blockId})
Name: item.handle.Path,
Data: item.block.data[0 : item.block.endIndex-item.block.offset],
Id: item.blockId})
if err != nil {
// Fail to write the data so just reschedule this request
log.Err("BlockCache::upload : Failed to write %v=>%s from offset %v [%s]", item.handle.ID, item.handle.Path, item.block.id, err.Error())
@@ -1222,34 +1282,107 @@ func (bc *BlockCache) commitBlocks(handle *handlemap.Handle) error {
}
}
// Generate the block id list order now
blockIDList, err := bc.getBlockIDList(handle)
if err != nil {
log.Err("BlockCache::commitBlocks : Failed to get block id list for %v [%v]", handle.Path, err.Error())
return err
}
log.Debug("BlockCache::commitBlocks : Committing blocks for %s", handle.Path)
// Commit the block list now
err = bc.NextComponent().CommitData(internal.CommitDataOptions{Name: handle.Path, List: blockIDList, BlockSize: bc.blockSize})
if err != nil {
log.Err("BlockCache::commitBlocks : Failed to commit blocks for %s [%s]", handle.Path, err.Error())
return err
}
// set all the blocks as committed
list, _ := handle.GetValue("blockList")
listMap := list.(map[int64]string)
listMap := list.(map[int64]*blockInfo)
for k := range listMap {
listMap[k].committed = true
}
handle.Flags.Clear(handlemap.HandleFlagDirty)
return nil
}
func (bc *BlockCache) getBlockIDList(handle *handlemap.Handle) ([]string, error) {
// generate the block id list order
list, _ := handle.GetValue("blockList")
listMap := list.(map[int64]*blockInfo)
offsets := make([]int64, 0)
blockIdList := make([]string, 0)
blockIDList := make([]string, 0)
for k := range listMap {
offsets = append(offsets, k)
}
sort.Slice(offsets, func(i, j int) bool { return offsets[i] < offsets[j] })
for i := 0; i < len(offsets); i++ {
blockIdList = append(blockIdList, listMap[offsets[i]])
log.Debug("BlockCache::commitBlocks : Preparing blocklist for %v=>%s (%v : %v)", handle.ID, handle.Path, i, listMap[offsets[i]])
zeroBlockStaged := false
zeroBlockID := ""
index := int64(0)
i := 0
for i < len(offsets) {
if index == offsets[i] {
blockIDList = append(blockIDList, listMap[offsets[i]].id)
log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, offsets[i], listMap[offsets[i]].id, listMap[offsets[i]].size)
index++
i++
} else {
for index < offsets[i] {
if !zeroBlockStaged {
id, err := bc.stageZeroBlock(handle, 1)
if err != nil {
return nil, err
}
zeroBlockStaged = true
zeroBlockID = id
}
blockIDList = append(blockIDList, zeroBlockID)
listMap[index] = &blockInfo{
id: zeroBlockID,
committed: false,
size: bc.blockPool.blockSize,
}
log.Debug("BlockCache::getBlockIDList : Adding zero block for %v=>%s, index %v", handle.ID, handle.Path, index)
log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, zero block size %v)", handle.ID, handle.Path, index, zeroBlockID, bc.blockPool.blockSize)
index++
}
}
}
log.Debug("BlockCache::commitBlocks : Committing blocks for %s", handle.Path)
return blockIDList, nil
}
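// Worked example (illustrative, assuming a 1MB block size): if a handle has
// staged blocks only at indices 0 and 9, getBlockIDList stages one zero block
// and reuses its ID for the hole at indices 1 through 8:
//
//	[id0, zID, zID, zID, zID, zID, zID, zID, zID, id9]
//
// so the committed blob is 10 blocks long, with the gap backed by a single
// staged zero block rather than eight separate uploads.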
func (bc *BlockCache) stageZeroBlock(handle *handlemap.Handle, tryCnt int) (string, error) {
if tryCnt > MAX_FAIL_CNT {
// If we failed to write the data 3 times then just give up
log.Err("BlockCache::stageZeroBlock : 3 attempts to upload zero block have failed %v=>%v", handle.ID, handle.Path)
return "", fmt.Errorf("3 attempts to upload zero block have failed for %v=>%v", handle.ID, handle.Path)
}
id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
log.Debug("BlockCache::stageZeroBlock : Staging zero block for %v=>%v, try = %v", handle.ID, handle.Path, tryCnt)
err := bc.NextComponent().StageData(internal.StageDataOptions{
Name: handle.Path,
Data: bc.blockPool.zeroBlock.data[:],
Id: id,
})
// Commit the block list now
err := bc.NextComponent().CommitData(internal.CommitDataOptions{Name: handle.Path, List: blockIdList, BlockSize: bc.blockSize})
if err != nil {
log.Err("BlockCache::commitBlocks : Failed to commit blocks for %s [%s]", handle.Path, err.Error())
return err
}
handle.Flags.Clear(handlemap.HandleFlagDirty)
return nil
if err != nil {
log.Err("BlockCache::stageZeroBlock : Failed to write zero block for %v=>%v, try %v [%v]", handle.ID, handle.Path, tryCnt, err.Error())
return bc.stageZeroBlock(handle, tryCnt+1)
}
log.Debug("BlockCache::stageZeroBlock : Zero block id for %v=>%v = %v", handle.ID, handle.Path, id)
return id, nil
}
// diskEvict : Callback when a node from disk expires

View file

@@ -36,8 +36,10 @@ package block_cache
import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
@@ -68,6 +70,9 @@ type blockCacheTestSuite struct {
func (suite *blockCacheTestSuite) SetupTest() {
suite.assert = assert.New(suite.T())
err := log.SetDefaultLogger("silent", common.LogConfig{Level: common.ELogLevel.LOG_DEBUG()})
suite.assert.Nil(err)
}
type testObj struct {
@@ -1065,14 +1070,474 @@ func (suite *blockCacheTestSuite) TestZZZZLazyWrite() {
suite.assert.False(handle.Dirty())
}
func computeMD5(fh *os.File) ([]byte, error) {
hash := md5.New()
if _, err := io.Copy(hash, fh); err != nil {
return nil, err
}
return hash.Sum(nil), nil
}
func (suite *blockCacheTestSuite) TestRandomWriteSparseFile() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()
suite.assert.Nil(err)
suite.assert.NotNil(tobj.blockCache)
path := "testSparseWrite"
storagePath := filepath.Join(tobj.fake_storage_path, path)
localPath := filepath.Join(tobj.disk_cache_path, path)
data := make([]byte, 5*_1MB)
_, _ = rand.Read(data)
// ------------------------------------------------------------------
// write to local file
fh, err := os.Create(localPath)
suite.assert.Nil(err)
defer func(fh *os.File) {
err := fh.Close()
suite.assert.Nil(err)
}(fh)
// write 1MB data at offset 0
n, err := fh.WriteAt(data[0:_1MB], 0)
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 9*_1MB
n, err = fh.WriteAt(data[4*_1MB:], int64(9*_1MB))
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 5*_1MB
n, err = fh.WriteAt(data[2*_1MB:3*_1MB], int64(5*_1MB))
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
l, err := computeMD5(fh)
suite.assert.Nil(err)
// ------------------------------------------------------------------
// write using block cache
options := internal.CreateFileOptions{Name: path, Mode: 0777}
h, err := tobj.blockCache.CreateFile(options)
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.False(h.Dirty())
// write 1MB data at offset 0
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data[0:_1MB]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
suite.assert.True(h.Dirty())
// write 1MB data at offset 9*_1MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(9 * _1MB), Data: data[4*_1MB:]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 5*_1MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(5 * _1MB), Data: data[2*_1MB : 3*_1MB]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)
fs, err := os.Stat(storagePath)
suite.assert.Nil(err)
suite.assert.Equal(fs.Size(), int64(10*_1MB))
rfh, err := os.Open(storagePath)
suite.assert.Nil(err)
defer func(fh *os.File) {
err := fh.Close()
suite.assert.Nil(err)
}(rfh)
r, err := computeMD5(rfh)
suite.assert.Nil(err)
// validate md5sum
suite.assert.Equal(l, r)
}
func (suite *blockCacheTestSuite) TestRandomWriteSparseFileWithPartialBlock() {
cfg := "block_cache:\n block-size-mb: 4\n mem-size-mb: 100\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()
suite.assert.Nil(err)
suite.assert.NotNil(tobj.blockCache)
path := "testSparseWrite"
storagePath := filepath.Join(tobj.fake_storage_path, path)
localPath := filepath.Join(tobj.disk_cache_path, path)
data := make([]byte, 5*_1MB)
_, _ = rand.Read(data)
// ------------------------------------------------------------------
// write to local file
fh, err := os.Create(localPath)
suite.assert.Nil(err)
defer func(fh *os.File) {
err := fh.Close()
suite.assert.Nil(err)
}(fh)
// write 1MB data at offset 0
n, err := fh.WriteAt(data[0:_1MB], 0)
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 18*_1MB
n, err = fh.WriteAt(data[4*_1MB:], int64(18*_1MB))
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 9*_1MB
n, err = fh.WriteAt(data[2*_1MB:3*_1MB], int64(9*_1MB))
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
l, err := computeMD5(fh)
suite.assert.Nil(err)
// ------------------------------------------------------------------
// write using block cache
options := internal.CreateFileOptions{Name: path, Mode: 0777}
h, err := tobj.blockCache.CreateFile(options)
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.False(h.Dirty())
// write 1MB data at offset 0
// partial block where it has data only from 0 to 1MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data[0:_1MB]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
suite.assert.True(h.Dirty())
// write 1MB data at offset 9*_1MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(9 * _1MB), Data: data[2*_1MB : 3*_1MB]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 18*_1MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(18 * _1MB), Data: data[4*_1MB:]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)
fs, err := os.Stat(storagePath)
suite.assert.Nil(err)
suite.assert.Equal(fs.Size(), int64(19*_1MB))
rfh, err := os.Open(storagePath)
suite.assert.Nil(err)
defer func(fh *os.File) {
err := fh.Close()
suite.assert.Nil(err)
}(rfh)
r, err := computeMD5(rfh)
suite.assert.Nil(err)
// validate md5sum
suite.assert.Equal(l, r)
}
func (suite *blockCacheTestSuite) TestRandomWriteSparseFileWithBlockOverlap() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()
suite.assert.Nil(err)
suite.assert.NotNil(tobj.blockCache)
path := "testSparseWrite"
storagePath := filepath.Join(tobj.fake_storage_path, path)
localPath := filepath.Join(tobj.disk_cache_path, path)
data := make([]byte, 5*_1MB)
_, _ = rand.Read(data)
// ------------------------------------------------------------------
// write to local file
fh, err := os.Create(localPath)
suite.assert.Nil(err)
defer func(fh *os.File) {
err := fh.Close()
suite.assert.Nil(err)
}(fh)
// write 1MB data at offset 0
n, err := fh.WriteAt(data[0:_1MB], 0)
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 9*_1MB
n, err = fh.WriteAt(data[4*_1MB:], int64(9*_1MB))
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 5.5*_1MB
n, err = fh.WriteAt(data[2*_1MB:3*_1MB], int64(5*_1MB+1024*512))
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
l, err := computeMD5(fh)
suite.assert.Nil(err)
// ------------------------------------------------------------------
// write using block cache
options := internal.CreateFileOptions{Name: path, Mode: 0777}
h, err := tobj.blockCache.CreateFile(options)
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.False(h.Dirty())
// write 1MB data at offset 0
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data[0:_1MB]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
suite.assert.True(h.Dirty())
// write 1MB data at offset 9*_1MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(9 * _1MB), Data: data[4*_1MB:]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 5*_1MB
// data is written to last 0.5MB of block 5 and first 0.5MB of block 6
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(5*_1MB + 1024*512), Data: data[2*_1MB : 3*_1MB]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)
fs, err := os.Stat(storagePath)
suite.assert.Nil(err)
suite.assert.Equal(fs.Size(), int64(10*_1MB))
rfh, err := os.Open(storagePath)
suite.assert.Nil(err)
defer func(fh *os.File) {
err := fh.Close()
suite.assert.Nil(err)
}(rfh)
r, err := computeMD5(rfh)
suite.assert.Nil(err)
// validate md5sum
suite.assert.Equal(l, r)
}
func (suite *blockCacheTestSuite) TestRandomWriteFileOneBlock() {
cfg := "block_cache:\n block-size-mb: 8\n mem-size-mb: 100\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()
suite.assert.Nil(err)
suite.assert.NotNil(tobj.blockCache)
path := "testWriteOneBlock"
storagePath := filepath.Join(tobj.fake_storage_path, path)
localPath := filepath.Join(tobj.disk_cache_path, path)
data := make([]byte, 5*_1MB)
_, _ = rand.Read(data)
// ------------------------------------------------------------------
// write to local file
fh, err := os.Create(localPath)
suite.assert.Nil(err)
defer func(fh *os.File) {
err := fh.Close()
suite.assert.Nil(err)
}(fh)
// write 2MB data at offset 4*_1MB
n, err := fh.WriteAt(data[3*_1MB:], int64(4*_1MB))
suite.assert.Nil(err)
suite.assert.Equal(n, int(2*_1MB))
// write 1MB data at offset 2*_1MB
n, err = fh.WriteAt(data[2*_1MB:3*_1MB], int64(2*_1MB))
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
l, err := computeMD5(fh)
suite.assert.Nil(err)
// ------------------------------------------------------------------
// write using block cache
options := internal.CreateFileOptions{Name: path, Mode: 0777}
h, err := tobj.blockCache.CreateFile(options)
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.False(h.Dirty())
// write 2MB data at offset 4*_1MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(4 * _1MB), Data: data[3*_1MB:]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(2*_1MB))
suite.assert.True(h.Dirty())
// write 1MB data at offset 2*_1MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(2 * _1MB), Data: data[2*_1MB : 3*_1MB]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)
fs, err := os.Stat(storagePath)
suite.assert.Nil(err)
suite.assert.Equal(fs.Size(), int64(6*_1MB))
rfh, err := os.Open(storagePath)
suite.assert.Nil(err)
defer func(fh *os.File) {
err := fh.Close()
suite.assert.Nil(err)
}(rfh)
r, err := computeMD5(rfh)
suite.assert.Nil(err)
// validate md5sum
suite.assert.Equal(l, r)
}
func (suite *blockCacheTestSuite) TestRandomWriteFlushAndOverwrite() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()
suite.assert.Nil(err)
suite.assert.NotNil(tobj.blockCache)
path := "testSparseWrite"
storagePath := filepath.Join(tobj.fake_storage_path, path)
localPath := filepath.Join(tobj.disk_cache_path, path)
data := make([]byte, 5*_1MB)
_, _ = rand.Read(data)
// ------------------------------------------------------------------
// write to local file
fh, err := os.Create(localPath)
suite.assert.Nil(err)
defer func(fh *os.File) {
err := fh.Close()
suite.assert.Nil(err)
}(fh)
// write 1MB data at offset 0
n, err := fh.WriteAt(data[0:_1MB], 0)
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 9*_1MB
n, err = fh.WriteAt(data[4*_1MB:], int64(9*_1MB))
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 5.5*_1MB
n, err = fh.WriteAt(data[2*_1MB:3*_1MB], int64(5*_1MB+1024*512))
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 18*_1MB
n, err = fh.WriteAt(data[4*_1MB:], int64(18*_1MB))
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
l, err := computeMD5(fh)
suite.assert.Nil(err)
// ------------------------------------------------------------------
// write using block cache
options := internal.CreateFileOptions{Name: path, Mode: 0777}
h, err := tobj.blockCache.CreateFile(options)
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.False(h.Dirty())
// write 1MB data at offset 0
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data[0:_1MB]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
suite.assert.True(h.Dirty())
// write 1MB data at offset 9*_1MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(9 * _1MB), Data: data[4*_1MB:]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// flush the file
err = tobj.blockCache.FlushFile(internal.FlushFileOptions{Handle: h})
suite.assert.Nil(err)
// write 1MB data at offset 5.5*_1MB
// overwriting last 0.5MB of block 5 and first 0.5MB of block 6 after flush
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(5*_1MB + 1024*512), Data: data[2*_1MB : 3*_1MB]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
// write 1MB data at offset 18*_1MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(18 * _1MB), Data: data[4*_1MB:]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)
fs, err := os.Stat(storagePath)
suite.assert.Nil(err)
suite.assert.Equal(fs.Size(), int64(19*_1MB))
rfh, err := os.Open(storagePath)
suite.assert.Nil(err)
defer func(fh *os.File) {
err := fh.Close()
suite.assert.Nil(err)
}(rfh)
r, err := computeMD5(rfh)
suite.assert.Nil(err)
// validate md5sum
suite.assert.Equal(l, r)
}
// In order for 'go test' to run this suite, we need to create
// a normal test function and pass our suite to suite.Run
func TestBlockCacheTestSuite(t *testing.T) {
bcsuite := new(blockCacheTestSuite)
err := log.SetDefaultLogger("silent", common.LogConfig{Level: common.ELogLevel.LOG_DEBUG()})
if err != nil {
panic("Unable to set silent logger as default.")
}
suite.Run(t, bcsuite)
suite.Run(t, new(blockCacheTestSuite))
}

View file

@@ -779,6 +779,7 @@ func libfuse_write(path *C.char, buf *C.char, size C.size_t, off C.off_t, fi *C.
offset := uint64(off)
data := (*[1 << 30]byte)(unsafe.Pointer(buf))
// log.Debug("Libfuse::libfuse_write : Offset %v, Data %v", offset, size)
bytesWritten, err := fuseFS.NextComponent().WriteFile(
internal.WriteFileOptions{
Handle: handle,

View file

@@ -466,7 +466,7 @@ func (lfs *LoopbackFS) Chown(options internal.ChownOptions) error {
func (lfs *LoopbackFS) StageData(options internal.StageDataOptions) error {
log.Trace("LoopbackFS::StageData : name=%s, id=%s", options.Name, options.Id)
path := fmt.Sprintf("%s_%d_%s", filepath.Join(lfs.path, options.Name), options.Offset, strings.ReplaceAll(options.Id, "/", "_"))
path := fmt.Sprintf("%s_%s", filepath.Join(lfs.path, options.Name), strings.ReplaceAll(options.Id, "/", "_"))
return os.WriteFile(path, options.Data, 0777)
}
@@ -482,7 +482,7 @@ func (lfs *LoopbackFS) CommitData(options internal.CommitDataOptions) error {
}
for idx, id := range options.List {
path := fmt.Sprintf("%s_%d_%s", filepath.Join(lfs.path, options.Name), idx, strings.ReplaceAll(id, "/", "_"))
path := fmt.Sprintf("%s_%s", filepath.Join(lfs.path, options.Name), strings.ReplaceAll(id, "/", "_"))
info, err := os.Lstat(path)
if err == nil {
block, err := os.OpenFile(path, os.O_RDONLY, os.FileMode(0666))
@@ -510,13 +510,17 @@ func (lfs *LoopbackFS) CommitData(options internal.CommitDataOptions) error {
if err != nil {
return err
}
_ = os.Remove(path)
} else if !os.IsNotExist(err) {
return err
}
}
// delete the staged files
for _, id := range options.List {
path := fmt.Sprintf("%s_%s", filepath.Join(lfs.path, options.Name), strings.ReplaceAll(id, "/", "_"))
_ = os.Remove(path)
}
err = blob.Close()
return err
}
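// Illustrative effect of the naming change (paths hypothetical): a block with
// Id "abc" staged for file "f" now lives at "<lfs.path>/f_abc" instead of
// "<lfs.path>/f_0_abc", so CommitData can locate it by ID alone, independent
// of where the block finally lands in the committed file.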

View file

@@ -295,13 +295,13 @@ func (suite *LoopbackFSTestSuite) TestStageAndCommitData() {
assert.Nil(err)
defer os.RemoveAll(lfs.path)
err = lfs.StageData(internal.StageDataOptions{Name: "testBlock", Data: []byte(loremText), Id: "123", Offset: 0})
err = lfs.StageData(internal.StageDataOptions{Name: "testBlock", Data: []byte(loremText), Id: "123"})
assert.Nil(err)
err = lfs.StageData(internal.StageDataOptions{Name: "testBlock", Data: []byte(loremText), Id: "456", Offset: 2})
err = lfs.StageData(internal.StageDataOptions{Name: "testBlock", Data: []byte(loremText), Id: "456"})
assert.Nil(err)
err = lfs.StageData(internal.StageDataOptions{Name: "testBlock", Data: []byte(loremText), Id: "789", Offset: 1})
err = lfs.StageData(internal.StageDataOptions{Name: "testBlock", Data: []byte(loremText), Id: "789"})
assert.Nil(err)
blockList := []string{"123", "789", "456"}

View file

@@ -191,10 +191,9 @@ type ChownOptions struct {
}
type StageDataOptions struct {
Name string
Id string
Offset uint64
Data []byte
Name string
Id string
Data []byte
}
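// Illustrative call with the trimmed options (field values hypothetical):
// staged blocks are now addressed purely by ID, and their position in the
// file is decided only by the ordered list passed to CommitData.
//
//	err := next.StageData(internal.StageDataOptions{
//		Name: "dir/file.bin",
//		Id:   blockID,
//		Data: buf,
//	})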
type CommitDataOptions struct {

View file

@@ -37,8 +37,10 @@
package e2e_tests
import (
"crypto/md5"
"flag"
"fmt"
"io"
"math/rand"
"os"
"os/exec"
@@ -60,6 +62,8 @@ var distro string
var minBuff, medBuff, largeBuff, hugeBuff []byte
const _1MB uint64 = (1024 * 1024)
type dataValidationTestSuite struct {
suite.Suite
testMntPath string
@@ -126,12 +130,6 @@ func (suite *dataValidationTestSuite) validateData(localFilePath string, remoteF
// Test correct overwrite of file using echo command
func (suite *dataValidationTestSuite) TestFileOverwriteWithEchoCommand() {
if strings.Contains(strings.ToUpper(distro), "UBUNTU-20.04") {
fmt.Println("Skipping this test case for UBUNTU-20.04")
return
}
remoteFilePath := filepath.Join(suite.testMntPath, "TESTFORECHO.txt")
text := "Hello, this is a test."
command := "echo \"" + text + "\" > " + remoteFilePath
@@ -347,10 +345,6 @@ func (suite *dataValidationTestSuite) TestMultipleMediumFiles() {
fmt.Println("Skipping this test case for stream direct")
return
}
if strings.Contains(strings.ToUpper(distro), "RHEL") {
fmt.Println("Skipping this test case for RHEL")
return
}
noOfFiles := 8
noOfWorkers := 4
@@ -362,10 +356,6 @@ func (suite *dataValidationTestSuite) TestMultipleLargeFiles() {
fmt.Println("Skipping this test case for stream direct")
return
}
if strings.Contains(strings.ToUpper(distro), "RHEL") {
fmt.Println("Skipping this test case for RHEL")
return
}
noOfFiles := 4
noOfWorkers := 2
@@ -387,13 +377,141 @@ func (suite *dataValidationTestSuite) TestMultipleHugeFiles() {
createThreadPool(noOfFiles, noOfWorkers, "huge", suite)
}
func computeMD5(filePath string) ([]byte, error) {
fh, err := os.Open(filePath)
if err != nil {
return nil, err
}
hash := md5.New()
if _, err := io.Copy(hash, fh); err != nil {
return nil, err
}
err = fh.Close()
if err != nil {
return nil, err
}
return hash.Sum(nil), nil
}
func writeSparseData(suite *dataValidationTestSuite, fh *os.File, offsets []int64) {
ind := uint64(0)
for _, o := range offsets {
// write 1MB data at offset o
n, err := fh.WriteAt(medBuff[ind*_1MB:(ind+1)*_1MB], o)
suite.Nil(err)
suite.Equal(n, int(_1MB))
ind = (ind + 1) % 10
}
// close the file handle
err := fh.Close()
suite.Nil(err)
}
func (suite *dataValidationTestSuite) TestSparseFileRandomWrite() {
fileName := "sparseFile"
localFilePath := suite.testLocalPath + "/" + fileName
remoteFilePath := suite.testMntPath + "/" + fileName
// create local file
lfh, err := os.Create(localFilePath)
suite.Nil(err)
defer func(fh *os.File) {
_ = fh.Close()
}(lfh)
// create remote file
rfh, err := os.Create(remoteFilePath)
suite.Nil(err)
defer func(fh *os.File) {
_ = fh.Close()
}(rfh)
// write to local file
writeSparseData(suite, lfh, []int64{0, 164 * int64(_1MB), 100 * int64(_1MB), 65 * int64(_1MB), 129 * int64(_1MB)})
// write to remote file
writeSparseData(suite, rfh, []int64{0, 164 * int64(_1MB), 100 * int64(_1MB), 65 * int64(_1MB), 129 * int64(_1MB)})
// check size of blob uploaded
fi, err := os.Stat(remoteFilePath)
suite.Nil(err)
suite.Equal(fi.Size(), 165*int64(_1MB))
localMD5, err := computeMD5(localFilePath)
suite.Nil(err)
suite.NotNil(localMD5)
remoteMD5, err := computeMD5(remoteFilePath)
suite.Nil(err)
suite.NotNil(remoteMD5)
suite.Equal(localMD5, remoteMD5)
suite.dataValidationTestCleanup([]string{localFilePath, remoteFilePath, suite.testCachePath})
}
func (suite *dataValidationTestSuite) TestSparseFileRandomWriteBlockOverlap() {
fileName := "sparseFileBlockOverlap"
localFilePath := suite.testLocalPath + "/" + fileName
remoteFilePath := suite.testMntPath + "/" + fileName
// create local file
lfh, err := os.Create(localFilePath)
suite.Nil(err)
defer func(fh *os.File) {
_ = fh.Close()
}(lfh)
// create remote file
rfh, err := os.Create(remoteFilePath)
suite.Nil(err)
defer func(fh *os.File) {
_ = fh.Close()
}(rfh)
// write to local file
writeSparseData(suite, lfh, []int64{0, 170 * int64(_1MB), 63*int64(_1MB) + 1024*512, 129 * int64(_1MB), 100 * int64(_1MB)})
// write to remote file
writeSparseData(suite, rfh, []int64{0, 170 * int64(_1MB), 63*int64(_1MB) + 1024*512, 129 * int64(_1MB), 100 * int64(_1MB)})
// check size of blob uploaded
fi, err := os.Stat(remoteFilePath)
suite.Nil(err)
suite.Equal(fi.Size(), 171*int64(_1MB))
localMD5, err := computeMD5(localFilePath)
suite.Nil(err)
suite.NotNil(localMD5)
remoteMD5, err := computeMD5(remoteFilePath)
suite.Nil(err)
suite.NotNil(remoteMD5)
suite.Equal(localMD5, remoteMD5)
suite.dataValidationTestCleanup([]string{localFilePath, remoteFilePath, suite.testCachePath})
}
// -------------- Main Method -------------------
func TestDataValidationTestSuite(t *testing.T) {
initDataValidationFlags()
fmt.Println("Distro Name: " + distro)
// Ignore data validation test on all distros other than UBN
if strings.ToLower(quickTest) == "true" {
if strings.ToLower(quickTest) == "true" || !(strings.Contains(strings.ToUpper(distro), "UBUNTU") || strings.Contains(strings.ToUpper(distro), "UBN")) {
fmt.Println("Skipping Data Validation test suite...")
return
}
@@ -401,12 +519,12 @@ func TestDataValidationTestSuite(t *testing.T) {
dataValidationTest := dataValidationTestSuite{}
minBuff = make([]byte, 1024)
medBuff = make([]byte, (10 * 1024 * 1024))
largeBuff = make([]byte, (500 * 1024 * 1024))
medBuff = make([]byte, (10 * _1MB))
largeBuff = make([]byte, (500 * _1MB))
if strings.ToLower(quickTest) == "true" {
hugeBuff = make([]byte, (100 * 1024 * 1024))
hugeBuff = make([]byte, (100 * _1MB))
} else {
hugeBuff = make([]byte, (750 * 1024 * 1024))
hugeBuff = make([]byte, (750 * _1MB))
}
// Generate random test dir name where our End to End test run is contained