Block Cache Read correction (#1483)
Fixed: the entire block buffer was copied into the user buffer regardless of how much of it was actually valid, so garbage data beyond the used portion could be returned.
Fixed: read error when the disk cache was enabled.
Parent: ee19eff072
Commit: da78133520
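In short: each cached block is a fixed-size buffer, but the last block of a file is usually only partially filled, and the old read path copied the whole buffer into the caller's slice. The following is a minimal, illustrative sketch of that difference, not the blobfuse2 code; the buffer layout and the `validEnd` name are assumptions for illustration only.

```go
package main

import "fmt"

func main() {
	// A reused 8-byte "block" buffer; only the first 5 bytes are valid
	// for this file, the rest is leftover garbage from a previous use.
	block := []byte{'h', 'e', 'l', 'l', 'o', 'X', 'X', 'X'}
	validEnd := 5

	userBuf := make([]byte, 8)

	// Old behaviour: copy the whole block, garbage included.
	n := copy(userBuf, block)
	fmt.Printf("old: %d bytes -> %q\n", n, userBuf[:n]) // "helloXXX"

	// Fixed behaviour: bound the copy by the block's valid length.
	userBuf = make([]byte, 8)
	n = copy(userBuf, block[:validEnd])
	fmt.Printf("new: %d bytes -> %q\n", n, userBuf[:n]) // "hello"
}
```

In the actual change below, the copy is bounded by `block.endIndex - block.offset` instead of a hypothetical `validEnd`.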
@@ -6,6 +6,7 @@
 - In flush operation, the blocks will be committed only if the handle is dirty.
 - Reset block data to null before reuse.
 - Sparse file data integrity issues fixed.
+- Fixed block-cache read of small files where file size is not multiple of kernel buffer size.
 
 **Features**
@@ -519,11 +519,16 @@ func (bc *BlockCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, e
 		// Copy data from this block to user buffer
 		readOffset := uint64(options.Offset) - block.offset
-		bytesRead := copy(options.Data[dataRead:], block.data[readOffset:])
+		bytesRead := copy(options.Data[dataRead:], block.data[readOffset:(block.endIndex-block.offset)])
 
 		// Move offset forward in case we need to copy more data
 		options.Offset += int64(bytesRead)
 		dataRead += bytesRead
+
+		if options.Offset >= options.Handle.Size {
+			// EOF reached so early exit
+			return dataRead, io.EOF
+		}
 	}
 
 	return dataRead, nil
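With the early io.EOF return above, a reader that walks a file in fixed-size chunks stops cleanly at the true file size even when it is not a multiple of the buffer size, which is exactly the loop the new tests use. A standalone sketch of that consumption pattern follows; it uses a plain io.Reader as a stand-in for the component API, so the names here are illustrative assumptions.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// readAll drains r in 1000-byte chunks, treating io.EOF as normal
// termination, and returns the total number of bytes read.
func readAll(r io.Reader) (int, error) {
	buf := make([]byte, 1000)
	total := 0
	for {
		n, err := r.Read(buf)
		total += n
		if errors.Is(err, io.EOF) {
			return total, nil // clean end of file
		}
		if err != nil {
			return total, err
		}
	}
}

func main() {
	// 1 MB + 7 bytes: a size that is not a multiple of the chunk size.
	data := make([]byte, 1048583)
	total, err := readAll(bytes.NewReader(data))
	fmt.Println(total, err) // 1048583 <nil>
}
```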
@@ -903,7 +908,7 @@ func (bc *BlockCache) download(item *workItem) {
 		// Dump this block to local disk cache
 		f, err := os.Create(localPath)
 		if err == nil {
-			_, err := f.Write(item.block.data)
+			_, err := f.Write(item.block.data[:n])
 			if err != nil {
 				log.Err("BlockCache::download : Failed to write %s to disk [%v]", localPath, err.Error())
 				_ = os.Remove(localPath)
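The download fix applies the same principle on the disk-cache path: presumably `n` is the number of bytes actually downloaded into the block, so only that prefix is persisted and a partially filled last block no longer writes trailing garbage into the cache file. A generic sketch of the pattern, assuming a helper of this shape (writeValidPrefix and the sizes are illustrative, not the blobfuse2 code):

```go
package main

import (
	"fmt"
	"os"
)

// writeValidPrefix persists only the first n bytes of a reused buffer,
// mirroring f.Write(item.block.data[:n]) in the fix above.
func writeValidPrefix(path string, buf []byte, n int) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	if _, err := f.Write(buf[:n]); err != nil {
		_ = os.Remove(path) // drop a half-written cache file
		return err
	}
	return nil
}

func main() {
	buf := make([]byte, 1024) // fixed-size block buffer
	copy(buf, "payload")      // only 7 bytes are valid data

	if err := writeValidPrefix("block.cache", buf, 7); err != nil {
		fmt.Println("write failed:", err)
	}
}
```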
@@ -435,51 +435,142 @@ func (suite *blockCacheTestSuite) TestValidateBlockList() {
 
 }
 
-func (suite *blockCacheTestSuite) TestFileRead() {
+func (suite *blockCacheTestSuite) TestFileReadTotalBytes() {
 	tobj, err := setupPipeline("")
 	defer tobj.cleanupPipeline()
 
 	suite.assert.Nil(err)
 	suite.assert.NotNil(tobj.blockCache)
 
-	fileName := "bc.tst"
-	stroagePath := filepath.Join(tobj.fake_storage_path, fileName)
-	data := make([]byte, 50*_1MB)
-	_, _ = rand.Read(data)
-	ioutil.WriteFile(stroagePath, data, 0777)
-
-	options := internal.OpenFileOptions{Name: fileName}
-	h, err := tobj.blockCache.OpenFile(options)
+	path := "testWriteSimple"
+	options := internal.CreateFileOptions{Name: path, Mode: 0777}
+	h, err := tobj.blockCache.CreateFile(options)
 	suite.assert.Nil(err)
 	suite.assert.NotNil(h)
-	suite.assert.Equal(h.Size, int64(50*_1MB))
+	suite.assert.Equal(h.Size, int64(0))
+	suite.assert.False(h.Dirty())
+
+	storagePath := filepath.Join(tobj.fake_storage_path, path)
+	fs, err := os.Stat(storagePath)
+	suite.assert.Nil(err)
+	suite.assert.Equal(fs.Size(), int64(0))
+	//Generate random size of file in bytes less than 2MB
+	size := rand.Intn(2097152)
+	data := make([]byte, size)
+
+	n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data}) // Write data to file
+	suite.assert.Nil(err)
+	suite.assert.Equal(n, size)
+	suite.assert.Equal(h.Size, int64(size))
+
+	data = make([]byte, 1000)
+
+	totaldata := uint64(0)
+	for {
+		n, err := tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: int64(totaldata), Data: data})
+		totaldata += uint64(n)
+		if err != nil {
+			suite.assert.Contains(err.Error(), "EOF")
+			break
+		}
+		suite.assert.LessOrEqual(n, 1000)
+	}
+	suite.assert.Equal(totaldata, uint64(size))
+
+	err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
+	suite.assert.Nil(err)
+	suite.assert.Nil(h.Buffers.Cooked)
+	suite.assert.Nil(h.Buffers.Cooking)
+}
+
+func (suite *blockCacheTestSuite) TestFileReadBlockCacheTmpPath() {
+	tobj, err := setupPipeline("")
+	defer tobj.cleanupPipeline()
+
+	suite.assert.Nil(err)
+	suite.assert.NotNil(tobj.blockCache)
+
+	path := "testWriteSimple"
+	options := internal.CreateFileOptions{Name: path, Mode: 0777}
+	h, err := tobj.blockCache.CreateFile(options)
+	suite.assert.Nil(err)
+	suite.assert.NotNil(h)
+	suite.assert.Equal(h.Size, int64(0))
+	suite.assert.False(h.Dirty())
+
+	storagePath := filepath.Join(tobj.fake_storage_path, path)
+	fs, err := os.Stat(storagePath)
+	suite.assert.Nil(err)
+	suite.assert.Equal(fs.Size(), int64(0))
+	//Size is 1MB + 7 bytes
+	size := 1048583
+	data := make([]byte, size)
+
+	n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data}) // Write data to file
+	suite.assert.Nil(err)
+	suite.assert.Equal(n, size)
+	suite.assert.Equal(h.Size, int64(size))
+
+	err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
+	suite.assert.Nil(err)
+
+	options2 := internal.OpenFileOptions{Name: path}
+	h, err = tobj.blockCache.OpenFile(options2)
+	suite.assert.Nil(err)
+	suite.assert.NotNil(h)
+	suite.assert.Equal(h.Size, int64(size))
 	suite.assert.NotNil(h.Buffers.Cooked)
 	suite.assert.NotNil(h.Buffers.Cooking)
 
 	data = make([]byte, 1000)
 
-	// Read beyond end of file
-	n, err := tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: int64((50 * _1MB) + 1), Data: data})
-	suite.assert.NotNil(err)
-	suite.assert.Equal(n, 0)
-	suite.assert.Contains(err.Error(), "EOF")
+	totaldata := uint64(0)
+	for {
+		n, err := tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: int64(totaldata), Data: data})
+		totaldata += uint64(n)
+		if err != nil {
+			suite.assert.Contains(err.Error(), "EOF")
+			break
+		}
+		suite.assert.LessOrEqual(n, 1000)
+	}
+	suite.assert.Equal(totaldata, uint64(size))
 
-	// Read exactly at last offset
-	n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: int64(50 * _1MB), Data: data})
-	suite.assert.NotNil(err)
-	suite.assert.Equal(n, 0)
-	suite.assert.Contains(err.Error(), "EOF")
+	data = make([]byte, 1000)
 
-	n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: data})
+	totaldata = uint64(0)
+	for {
+		n, err := tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: int64(totaldata), Data: data})
+		totaldata += uint64(n)
+		if err != nil {
+			suite.assert.Contains(err.Error(), "EOF")
+			break
+		}
+		suite.assert.LessOrEqual(n, 1000)
+	}
+	suite.assert.Equal(totaldata, uint64(size))
+
+	tmpPath := tobj.blockCache.tmpPath
+
+	files, err := ioutil.ReadDir(tmpPath)
 	suite.assert.Nil(err)
-	suite.assert.Equal(n, 1000)
 
-	cnt := h.Buffers.Cooked.Len() + h.Buffers.Cooking.Len()
-	suite.assert.Equal(cnt, MIN_PREFETCH*2)
+	var size1048576, size7 bool
+	for _, file := range files {
+		if file.Size() == 1048576 {
+			size1048576 = true
+		}
+		if file.Size() == 7 {
+			size7 = true
+		}
+	}
 
-	tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
-	suite.assert.Nil(h.Buffers.Cooked)
-	suite.assert.Nil(h.Buffers.Cooking)
+	suite.assert.True(size1048576)
+	suite.assert.True(size7)
+	suite.assert.Equal(len(files), 2)
+
+	err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
+	suite.assert.Nil(err)
 }
 
 func (suite *blockCacheTestSuite) TestFileReadSerial() {
@@ -505,6 +505,164 @@ func (suite *dataValidationTestSuite) TestSparseFileRandomWriteBlockOverlap() {
 	suite.dataValidationTestCleanup([]string{localFilePath, remoteFilePath, suite.testCachePath})
 }
 
+func (suite *dataValidationTestSuite) TestFileReadBytesMultipleBlocks() {
+	fileName := "bytesReadMultipleBlock"
+	localFilePath := suite.testLocalPath + "/" + fileName
+	remoteFilePath := suite.testMntPath + "/" + fileName
+
+	// create local file
+	lfh, err := os.Create(localFilePath)
+	suite.Nil(err)
+
+	defer func(fh *os.File) {
+		_ = fh.Close()
+	}(lfh)
+
+	// create remote file
+	rfh, err := os.Create(remoteFilePath)
+	suite.Nil(err)
+
+	defer func(fh *os.File) {
+		_ = fh.Close()
+	}(rfh)
+
+	// write 65MB data
+	n, err := lfh.WriteAt(largeBuff[0:65*_1MB], 0)
+	suite.Nil(err)
+	suite.Equal(n, int(65*_1MB))
+
+	// write 7 bytes at offset 65MB
+	n, err = lfh.WriteAt(largeBuff[0:7], int64(65*_1MB))
+	suite.Nil(err)
+	suite.Equal(n, 7)
+
+	err = lfh.Close()
+	suite.Nil(err)
+
+	// write 65MB data
+	n, err = rfh.WriteAt(largeBuff[0:65*_1MB], 0)
+	suite.Nil(err)
+	suite.Equal(n, int(65*_1MB))
+
+	// write 7 bytes at offset 65MB
+	n, err = rfh.WriteAt(largeBuff[0:7], int64(65*_1MB))
+	suite.Nil(err)
+	suite.Equal(n, 7)
+
+	err = rfh.Close()
+	suite.Nil(err)
+
+	// check size of blob uploaded using os.Stat()
+	fi, err := os.Stat(remoteFilePath)
+	suite.Nil(err)
+	suite.Equal(fi.Size(), 65*int64(_1MB)+7)
+
+	// count the total bytes uploaded
+	fh, err := os.Open(remoteFilePath)
+	suite.Nil(err)
+
+	totalBytesread := int64(0)
+	dataBuff := make([]byte, int(_1MB))
+	for {
+		bytesRead, err := fh.Read(dataBuff)
+		totalBytesread += int64(bytesRead)
+		if err != nil {
+			suite.Contains(err.Error(), "EOF")
+			break
+		}
+	}
+	suite.Equal(totalBytesread, 65*int64(_1MB)+7)
+
+	err = fh.Close()
+	suite.Nil(err)
+
+	localMD5, err := computeMD5(localFilePath)
+	suite.Nil(err)
+	suite.NotNil(localMD5)
+
+	remoteMD5, err := computeMD5(remoteFilePath)
+	suite.Nil(err)
+	suite.NotNil(remoteMD5)
+
+	suite.Equal(localMD5, remoteMD5)
+
+	suite.dataValidationTestCleanup([]string{localFilePath, remoteFilePath, suite.testCachePath})
+}
+
+func (suite *dataValidationTestSuite) TestFileReadBytesOneBlock() {
+	fileName := "bytesReadOneBlock"
+	localFilePath := suite.testLocalPath + "/" + fileName
+	remoteFilePath := suite.testMntPath + "/" + fileName
+
+	// create local file
+	lfh, err := os.Create(localFilePath)
+	suite.Nil(err)
+
+	defer func(fh *os.File) {
+		_ = fh.Close()
+	}(lfh)
+
+	// create remote file
+	rfh, err := os.Create(remoteFilePath)
+	suite.Nil(err)
+
+	defer func(fh *os.File) {
+		_ = fh.Close()
+	}(rfh)
+
+	// write 13 bytes data to local file
+	n, err := lfh.WriteAt(largeBuff[0:13], 0)
+	suite.Nil(err)
+	suite.Equal(n, 13)
+
+	err = lfh.Close()
+	suite.Nil(err)
+
+	// write 13 bytes data to remote file
+	n, err = rfh.WriteAt(largeBuff[0:13], 0)
+	suite.Nil(err)
+	suite.Equal(n, 13)
+
+	err = rfh.Close()
+	suite.Nil(err)
+
+	// check size of blob uploaded using os.Stat()
+	fi, err := os.Stat(remoteFilePath)
+	suite.Nil(err)
+	suite.Equal(fi.Size(), int64(13))
+
+	// count the total bytes uploaded
+	fh, err := os.Open(remoteFilePath)
+	suite.Nil(err)
+
+	totalBytesread := int64(0)
+	dataBuff := make([]byte, 1000)
+	for {
+		bytesRead, err := fh.Read(dataBuff)
+		totalBytesread += int64(bytesRead)
+		if err != nil {
+			suite.Contains(err.Error(), "EOF")
+			break
+		}
+	}
+	suite.Equal(totalBytesread, int64(13))
+
+	err = fh.Close()
+	suite.Nil(err)
+
+	localMD5, err := computeMD5(localFilePath)
+	suite.Nil(err)
+	suite.NotNil(localMD5)
+
+	remoteMD5, err := computeMD5(remoteFilePath)
+	suite.Nil(err)
+	suite.NotNil(remoteMD5)
+
+	suite.Equal(localMD5, remoteMD5)
+
+	suite.dataValidationTestCleanup([]string{localFilePath, remoteFilePath, suite.testCachePath})
+}
+
 // -------------- Main Method -------------------
 func TestDataValidationTestSuite(t *testing.T) {
 	initDataValidationFlags()