Merge pull request #711 from Azure/write-stream-block

[FEATURE] Support Write Stream on AzStorage Component
Tamer Sherif 2022-03-14 13:52:05 -07:00 committed by GitHub
Parents d9ffe5cba9 36292f0342
Commit 1d05fcc431
No key found matching this signature
GPG Key ID: 4AEE18F83AFDEB23
15 changed files: 1173 additions and 36 deletions

View File

@ -34,6 +34,7 @@
package common
import (
"crypto/rand"
"os"
"path/filepath"
"reflect"
@ -163,8 +164,81 @@ func (ep *EvictionPolicy) Parse(s string) error {
type LogConfig struct {
Level LogLevel
MaxFileSize uint64
FileCount uint64
FilePath string
TimeTracker bool
}
type Block struct {
StartIndex int64
EndIndex int64
Size int64
Id string
Modified bool
}
// list that holds blocks containing ids and corresponding offsets
type BlockOffsetList struct {
BlockList []*Block //blockId to offset mapping
Cached bool // is it cached?
}
// returns whether a block containing the offset exists and, if so, its index; otherwise false and len(BlockList)
func (bol BlockOffsetList) binarySearch(offset int64) (bool, int) {
lowerBound := 0
size := len(bol.BlockList)
higherBound := size - 1
for lowerBound <= higherBound {
middleIndex := (lowerBound + higherBound) / 2
// we found the starting block that changes are being applied to
if bol.BlockList[middleIndex].EndIndex > offset && bol.BlockList[middleIndex].StartIndex <= offset {
return true, middleIndex
// if the end index is smaller or equal then we need to increase our lower bound
} else if bol.BlockList[middleIndex].EndIndex <= offset {
lowerBound = middleIndex + 1
// if the start index is larger than the offset we need to decrease our upper bound
} else if bol.BlockList[middleIndex].StartIndex > offset {
higherBound = middleIndex - 1
}
}
// return size as this would be where the new blocks start
return false, size
}
// returns the index of the first modified block, the combined size of the blocks to be modified, whether the new data exceeds the current blob size, and whether the write is append-only
func (bol BlockOffsetList) FindBlocksToModify(offset, length int64) (int, int64, bool, bool) {
// size of mod block list
size := int64(0)
appendOnly := true
currentBlockOffset := offset
found, index := bol.binarySearch(offset)
if !found {
return index, 0, true, appendOnly
}
// after the binary search just iterate to find the remaining blocks
for _, blk := range bol.BlockList[index:] {
if blk.StartIndex > offset+length {
break
}
if currentBlockOffset >= blk.StartIndex && currentBlockOffset < blk.EndIndex && currentBlockOffset <= offset+length {
appendOnly = false
blk.Modified = true
currentBlockOffset = blk.EndIndex
size += blk.Size
}
}
return index, size, offset+length >= bol.BlockList[len(bol.BlockList)-1].EndIndex, appendOnly
}
// NewUUID returns a random byte slice of the given length with the RFC 4122 version-4 and variant bits set (used to build block IDs of a consistent length).
func NewUUID(length int64) []byte {
u := make([]byte, length)
// Set all bits to randomly (or pseudo-randomly) chosen values.
rand.Read(u[:])
u[8] = (u[8] | 0x40) & 0x7F // u.setVariant(ReservedRFC4122)
var version byte = 4
u[6] = (u[6] & 0xF) | (version << 4) // u.setVersion(4)
return u[:]
}
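
NewUUID produces raw random bytes; createBlock later in this commit base64-encodes them into a block ID. Azure block blobs require every block ID within a blob to have the same encoded length, which is why Write first decodes an existing ID to measure it. A minimal standalone sketch of that scheme, with the hypothetical helper name newBlockID (not part of this commit):

package main

import (
	"crypto/rand"
	"encoding/base64"
	"fmt"
)

// newBlockID mirrors common.NewUUID plus createBlock: random bytes of a fixed
// length, base64-encoded, so every block ID in the blob has the same size.
func newBlockID(idLength int64) string {
	u := make([]byte, idLength)
	rand.Read(u) // version/variant bits omitted in this sketch
	return base64.StdEncoding.EncodeToString(u)
}

func main() {
	// Measure an existing ID first, as Write does before creating new blocks.
	existing := base64.StdEncoding.EncodeToString(make([]byte, 16))
	raw, _ := base64.StdEncoding.DecodeString(existing)
	fmt.Println(newBlockID(int64(len(raw)))) // same encoded length as existing
}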

common/types_test.go (new file, 97 lines)
View File

@ -0,0 +1,97 @@
/*
_____ _____ _____ ____ ______ _____ ------
| | | | | | | | | | | | |
| | | | | | | | | | | | |
| --- | | | | |-----| |---- | | |-----| |----- ------
| | | | | | | | | | | | |
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
Copyright © 2020-2021 Microsoft Corporation. All rights reserved.
Author : <blobfusedev@microsoft.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
*/
package common
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
type typesTestSuite struct {
suite.Suite
assert *assert.Assertions
}
func (suite *typesTestSuite) SetupTest() {
suite.assert = assert.New(suite.T())
}
func TestGenerateConfig(t *testing.T) {
suite.Run(t, new(typesTestSuite))
}
func (suite *typesTestSuite) TestBinarySearch() {
blocksList := []*Block{
{StartIndex: 0, EndIndex: 4, Size: 4},
{StartIndex: 4, EndIndex: 7, Size: 3},
{StartIndex: 7, EndIndex: 12, Size: 5},
}
bol := BlockOffsetList{
BlockList: blocksList,
}
found, startingIndex := bol.binarySearch(5)
suite.assert.Equal(found, true)
suite.assert.Equal(startingIndex, 1)
found, startingIndex = bol.binarySearch(20)
suite.assert.Equal(found, false)
suite.assert.Equal(startingIndex, 3)
}
func (suite *typesTestSuite) TestFindBlocksToModify() {
blocksList := []*Block{
{StartIndex: 0, EndIndex: 4, Size: 4},
{StartIndex: 4, EndIndex: 7, Size: 3},
{StartIndex: 7, EndIndex: 12, Size: 5},
}
bol := BlockOffsetList{
BlockList: blocksList,
}
index, size, largerThanFile, _ := bol.FindBlocksToModify(3, 7)
suite.assert.Equal(index, 0)
suite.assert.Equal(size, int64(12))
suite.assert.Equal(largerThanFile, false)
index, size, largerThanFile, _ = bol.FindBlocksToModify(8, 10)
suite.assert.Equal(index, 2)
suite.assert.Equal(size, int64(5))
suite.assert.Equal(largerThanFile, true)
index, size, largerThanFile, appendOnly := bol.FindBlocksToModify(20, 20)
suite.assert.Equal(size, int64(0))
suite.assert.Equal(largerThanFile, true)
suite.assert.Equal(appendOnly, true)
}

View File

@ -352,10 +352,15 @@ func (az *AzStorage) ReadInBuffer(options internal.ReadInBufferOptions) (length
}
func (az *AzStorage) WriteFile(options internal.WriteFileOptions) (int, error) {
err := az.storage.Write(options.Handle.Path, options.Offset, int64(len(options.Data)), options.Data, options.FileOffsets, options.ModBlockList)
return len(options.Data), err
}
func (az *AzStorage) GetFileBlockOffsets(options internal.GetFileBlockOffsetsOptions) (*common.BlockOffsetList, error) {
return az.storage.GetFileBlockOffsets(options.Name)
}
func (az *AzStorage) TruncateFile(options internal.TruncateFileOptions) error {
log.Trace("AzStorage::TruncateFile : %s to %d bytes", options.Name, options.Size)

View File

@ -37,8 +37,11 @@ import (
"blobfuse2/common"
"blobfuse2/common/log"
"blobfuse2/internal"
"bytes"
"context"
"encoding/base64"
"errors"
"math"
"net/url"
"os"
"path/filepath"
@ -653,6 +656,172 @@ func (bb *BlockBlob) WriteFromBuffer(name string, metadata map[string]string, da
return nil
}
// GetFileBlockOffsets : returns the blob's block IDs and their corresponding offsets
func (bb *BlockBlob) GetFileBlockOffsets(name string) (*common.BlockOffsetList, error) {
var blockOffset int64 = 0
blockList := common.BlockOffsetList{}
blobURL := bb.Container.NewBlockBlobURL(filepath.Join(bb.Config.prefixPath, name))
storageBlockList, err := blobURL.GetBlockList(
context.Background(), azblob.BlockListCommitted, bb.blobAccCond.LeaseAccessConditions)
if err != nil {
log.Err("BlockBlob::GetFileBlockOffsets : Failed to get block list %s ", name, err.Error())
return &common.BlockOffsetList{}, err
}
for _, block := range *&storageBlockList.CommittedBlocks {
blk := &common.Block{
Id: block.Name,
StartIndex: int64(blockOffset),
EndIndex: int64(blockOffset) + block.Size,
Size: block.Size,
}
blockOffset += block.Size
blockList.BlockList = append(blockList.BlockList, blk)
}
return &blockList, nil
}
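
The loop above turns the service's committed-block sizes into cumulative (StartIndex, EndIndex) ranges. A self-contained sketch of just that mapping, using the same sizes as the unit tests above (the local block struct stands in for common.Block):

package main

import "fmt"

type block struct{ start, end, size int64 } // stand-in for common.Block

func main() {
	sizes := []int64{4, 3, 5} // hypothetical committed block sizes
	var offset int64
	var list []block
	for _, s := range sizes {
		list = append(list, block{start: offset, end: offset + s, size: s})
		offset += s
	}
	fmt.Println(list) // [{0 4 4} {4 7 3} {7 12 5}]
}
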
// createBlock builds our in-memory representation of a block with a freshly generated ID
func (bb *BlockBlob) createBlock(blockIdLength, startIndex, size int64) *common.Block {
newBlockId := base64.StdEncoding.EncodeToString(common.NewUUID(blockIdLength))
newBlock := &common.Block{
Id: newBlockId,
StartIndex: startIndex,
EndIndex: startIndex + size,
Size: size,
Modified: true,
}
return newBlock
}
func (bb *BlockBlob) createNewBlocks(blockList *common.BlockOffsetList, offset, length, blockIdLength int64) int64 {
prevIndex := blockList.BlockList[len(blockList.BlockList)-1].EndIndex
// bufferSize tracks how much of the data extends beyond the current blob (i.e., will be appended)
var bufferSize int64
for i := prevIndex; i < offset+length; i += bb.Config.blockSize {
// create a new block if we hit our block size
blkSize := int64(math.Min(float64(bb.Config.blockSize), float64((offset+length)-i)))
newBlock := bb.createBlock(blockIdLength, i, blkSize)
blockList.BlockList = append(blockList.BlockList, newBlock)
// accumulate the appended size so we can tell how much new data lies past the old end of the blob
bufferSize += blkSize
}
return bufferSize
}
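
To make the loop above concrete: with the last committed block ending at index 12, a block size of 8, and offset+length = 30, createNewBlocks appends blocks covering [12,20), [20,28) and [28,30), and returns bufferSize = 18. A standalone sketch of just that arithmetic (the constants are made up; block IDs and contents are omitted):

package main

import (
	"fmt"
	"math"
)

func main() {
	const blockSize = int64(8)             // hypothetical bb.Config.blockSize
	prevIndex, end := int64(12), int64(30) // last EndIndex; offset+length
	var bufferSize int64
	for i := prevIndex; i < end; i += blockSize {
		blkSize := int64(math.Min(float64(blockSize), float64(end-i)))
		fmt.Printf("new block [%d,%d)\n", i, i+blkSize)
		bufferSize += blkSize
	}
	fmt.Println("bufferSize:", bufferSize) // 18
}
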
// Write : write data at given offset to a blob
func (bb *BlockBlob) Write(name string, offset, length int64, data []byte, fileOffsets, modFileOffsets *common.BlockOffsetList) error {
defer log.TimeTrack(time.Now(), "BlockBlob::Write", name)
log.Trace("BlockBlob::Write : name %s offset %v", name, offset)
// tracks the case where our offset is greater than our current file size (appending only - not modifying pre-existing data)
var dataBuffer *[]byte
// when the file offset mapping is cached we don't need to make a get block list call
if fileOffsets != nil && !fileOffsets.Cached {
var err error
fileOffsets, err = bb.GetFileBlockOffsets(name)
if err != nil {
return err
}
}
// case 1: file consists of no blocks (small file)
if fileOffsets != nil && len(fileOffsets.BlockList) == 0 {
// get all the data
oldData, _ := bb.ReadBuffer(name, 0, 0)
// update the data with the new data
// if we're only overwriting existing data
if int64(len(oldData)) >= offset+length {
copy(oldData[offset:], data)
dataBuffer = &oldData
// else appending and/or overwriting
} else {
// if the file is not empty then we need to combine the data
if len(oldData) > 0 {
// new data buffer with the size of old and new data
newDataBuffer := make([]byte, offset+length)
// copy the old data into it
// TODO: better way to do this?
if offset != 0 {
copy(newDataBuffer, oldData)
oldData = nil
}
// overwrite with the new data we want to add
copy(newDataBuffer[offset:], data)
dataBuffer = &newDataBuffer
} else {
dataBuffer = &data
}
}
// WriteFromBuffer handles the case where the combined data is now too big and must be split into multiple blocks
err := bb.WriteFromBuffer(name, nil, *dataBuffer)
if err != nil {
log.Err("BlockBlob::Write : Failed to upload to blob %s ", name, err.Error())
return err
}
// case 2: given offset is within the size of the blob - and the blob consists of multiple blocks
// case 3: new blocks need to be added
} else {
index, oldDataSize, exceedsFileBlocks, appendOnly := fileOffsets.FindBlocksToModify(offset, length)
// keeps track of how much new data will be appended to the end of the file (applicable only to case 3)
newBufferSize := int64(0)
// case 3?
if exceedsFileBlocks {
// get length of blockID in order to generate a consistent size block ID so storage does not throw
existingBlockId, _ := base64.StdEncoding.DecodeString(fileOffsets.BlockList[0].Id)
blockIdLength := len(existingBlockId)
newBufferSize = bb.createNewBlocks(fileOffsets, offset, length, int64(blockIdLength))
}
// buffer that holds the pre-existing data in the blocks we're interested in
oldDataBuffer := make([]byte, oldDataSize+newBufferSize)
if !appendOnly {
// fetch the blocks that will be impacted by the new changes so we can overwrite them
bb.ReadInBuffer(name, fileOffsets.BlockList[index].StartIndex, oldDataSize, oldDataBuffer)
}
// this gives us the offset of the write within the buffer that holds our old data - so we know where to start writing the new data
blockOffset := offset - fileOffsets.BlockList[index].StartIndex
copy(oldDataBuffer[blockOffset:], data)
err := bb.stageAndCommitModifiedBlocks(name, oldDataBuffer, index, fileOffsets)
return err
}
return nil
}
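
Tying the cases together with the 4-byte-block layout used by the unit tests below: a 4-byte write at offset 16 takes the case-2 path. FindBlocksToModify marks the block spanning [16,20) as Modified, and also the boundary block [20,24), because the scan's currentBlockOffset <= offset+length check is inclusive; Write then reads those blocks back, overlays the new bytes, and stageAndCommitModifiedBlocks re-stages only them before committing the full ID list. A sketch of the block-selection step, runnable inside the blobfuse2 module (no storage account needed):

package main

import (
	"fmt"

	"blobfuse2/common"
)

func main() {
	// Ten 4-byte blocks, mirroring TestOverwriteBlocks below.
	bol := common.BlockOffsetList{}
	for i := int64(0); i < 40; i += 4 {
		bol.BlockList = append(bol.BlockList,
			&common.Block{StartIndex: i, EndIndex: i + 4, Size: 4})
	}
	index, size, exceeds, appendOnly := bol.FindBlocksToModify(16, 4)
	fmt.Println(index, size, exceeds, appendOnly) // 4 8 false false
}
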
// TODO: make a similar method facing stream that would enable us to write to cached blocks then stage and commit
func (bb *BlockBlob) stageAndCommitModifiedBlocks(name string, data []byte, index int, offsetList *common.BlockOffsetList) error {
blobURL := bb.Container.NewBlockBlobURL(filepath.Join(bb.Config.prefixPath, name))
blockOffset := int64(0)
var blockIDList []string
for _, blk := range offsetList.BlockList {
blockIDList = append(blockIDList, blk.Id)
if blk.Modified {
_, err := blobURL.StageBlock(context.Background(),
blk.Id,
bytes.NewReader(data[blockOffset:blk.Size+blockOffset]),
bb.blobAccCond.LeaseAccessConditions,
nil,
bb.downloadOptions.ClientProvidedKeyOptions)
if err != nil {
log.Err("BlockBlob::stageAndCommitModifiedBlocks : Failed to stage to blob %s at block %v (%s)", name, blockOffset, err.Error())
return err
}
blockOffset = blk.Size + blockOffset
}
}
_, err := blobURL.CommitBlockList(context.Background(),
blockIDList,
azblob.BlobHTTPHeaders{ContentType: getContentType(name)},
nil,
bb.blobAccCond,
bb.Config.defaultTier,
nil, // datalake doesn't support tags here
bb.downloadOptions.ClientProvidedKeyOptions)
if err != nil {
log.Err("BlockBlob::stageAndCommitModifiedBlocks : Failed to commit block list to blob %s (%s)", name, err.Error())
return err
}
return nil
}
// ChangeMod : Change mode of a blob
func (bb *BlockBlob) ChangeMod(name string, _ os.FileMode) error {
log.Trace("BlockBlob::ChangeMod : name %s", name)

View File

@ -40,19 +40,25 @@ import (
"blobfuse2/common/log"
"blobfuse2/internal"
"blobfuse2/internal/handlemap"
"bytes"
"container/list"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"testing"
"time"
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
@ -60,6 +66,100 @@ import (
var ctx = context.Background()
// A UUID representation compliant with specification in RFC 4122 document.
type uuid [16]byte
const reservedRFC4122 byte = 0x40
func (u uuid) bytes() []byte {
return u[:]
}
// NewUUID returns a new uuid using RFC 4122 algorithm.
func newUUID() (u uuid) {
u = uuid{}
// Set all bits to randomly (or pseudo-randomly) chosen values.
rand.Read(u[:])
u[8] = (u[8] | reservedRFC4122) & 0x7F // u.setVariant(ReservedRFC4122)
var version byte = 4
u[6] = (u[6] & 0xF) | (version << 4) // u.setVersion(4)
return
}
// uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob.
func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize, singleUploadSize int64,
blockBlobURL azblob.BlockBlobURL, o azblob.UploadToBlockBlobOptions) (azblob.CommonResponse, error) {
if o.BlockSize == 0 {
// If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
if readerSize > azblob.BlockBlobMaxStageBlockBytes*azblob.BlockBlobMaxBlocks {
return nil, errors.New("buffer is too large to upload to a block blob")
}
// If bufferSize <= singleUploadSize, then Upload should be used with just 1 I/O request
if readerSize <= singleUploadSize {
o.BlockSize = singleUploadSize // Default if unspecified
} else {
o.BlockSize = readerSize / azblob.BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
if o.BlockSize < azblob.BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
o.BlockSize = azblob.BlobDefaultDownloadBlockSize
}
// StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize).
}
}
if readerSize <= singleUploadSize {
// If the size can fit in 1 Upload call, do it this way
var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize)
if o.Progress != nil {
body = pipeline.NewRequestBodyProgress(body, o.Progress)
}
return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
}
var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1)
blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
progress := int64(0)
progressLock := &sync.Mutex{}
err := azblob.DoBatchTransfer(ctx, azblob.BatchTransferOptions{
OperationName: "uploadReaderAtToBlockBlob",
TransferSize: readerSize,
ChunkSize: o.BlockSize,
Parallelism: o.Parallelism,
Operation: func(offset int64, count int64, ctx context.Context) error {
// This function is called once per block.
// It is passed this block's offset within the buffer and its count of bytes
// Prepare to read the proper block/section of the buffer
var body io.ReadSeeker = io.NewSectionReader(reader, offset, count)
blockNum := offset / o.BlockSize
if o.Progress != nil {
blockProgress := int64(0)
body = pipeline.NewRequestBodyProgress(body,
func(bytesTransferred int64) {
diff := bytesTransferred - blockProgress
blockProgress = bytesTransferred
progressLock.Lock() // 1 goroutine at a time gets a progress report
progress += diff
o.Progress(progress)
progressLock.Unlock()
})
}
// Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
// at the same time causing PutBlockList to get a mix of blocks from all the clients.
blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
_, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil, o.ClientProvidedKeyOptions)
return err
},
})
if err != nil {
return nil, err
}
// All put blocks were successful, call Put Block List to finalize the blob
return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
}
type blockBlobTestSuite struct {
suite.Suite
assert *assert.Assertions
@ -908,7 +1008,7 @@ func (s *blockBlobTestSuite) TestReadFile() {
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
@ -934,7 +1034,7 @@ func (s *blockBlobTestSuite) TestReadInBuffer() {
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})
output := make([]byte, 5)
@ -951,7 +1051,7 @@ func (s *blockBlobTestSuite) TestReadInBufferLargeBuffer() {
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})
output := make([]byte, 1000) // Testing that passing in a super large buffer will still work
@ -1005,7 +1105,7 @@ func (s *blockBlobTestSuite) TestWriteFile() {
testData := "test data"
data := []byte(testData)
count, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
s.assert.EqualValues(len(data), count)
@ -1025,7 +1125,7 @@ func (s *blockBlobTestSuite) TestTruncateFileSmaller() {
testData := "test data"
data := []byte(testData)
truncatedLength := 5
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
err := s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
@ -1047,7 +1147,7 @@ func (s *blockBlobTestSuite) TestTruncateFileEqual() {
testData := "test data"
data := []byte(testData)
truncatedLength := 9
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
err := s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
@ -1069,7 +1169,7 @@ func (s *blockBlobTestSuite) TestTruncateFileBigger() {
testData := "test data"
data := []byte(testData)
truncatedLength := 15
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
err := s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
@ -1093,7 +1193,7 @@ func (s *blockBlobTestSuite) TestTruncateFileError() {
s.assert.EqualValues(syscall.ENOENT, err)
}
func (s *blockBlobTestSuite) TestWriteSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
@ -1101,11 +1201,12 @@ func (s *blockBlobTestSuite) TestCopyToFile() {
testData := "test data"
data := []byte(testData)
dataLen := len(data)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
output := make([]byte, len(data))
@ -1117,6 +1218,297 @@ func (s *blockBlobTestSuite) TestCopyToFile() {
f.Close()
}
func (s *blockBlobTestSuite) TestOverwriteSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-replace-data"
data := []byte(testData)
dataLen := len(data)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 5, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("test-newdata-data")
output := make([]byte, len(currentData))
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *blockBlobTestSuite) TestOverwriteAndAppendToSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 5, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("test-newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *blockBlobTestSuite) TestAppendToSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("-newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 9, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("test-data-newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *blockBlobTestSuite) TestAppendOffsetLargerThanSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 12, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("test-data\x00\x00\x00newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
// This test starts with a regular blob (no committed blocks) and writes enough data past its end to force block creation
func (s *blockBlobTestSuite) TestAppendBlocksToSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
// use our helper to set the max single-upload size (the size above which a blob is split into blocks) to 9 bytes
_, err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 9, s.containerUrl.NewBlockBlobURL(name), azblob.UploadToBlockBlobOptions{
BlockSize: 8,
})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("-newdata-newdata-newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 9, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("test-data-newdata-newdata-newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *blockBlobTestSuite) TestOverwriteBlocks() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to set the max single-upload size (the size above which a blob is split into blocks) to 4 bytes
_, err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4, s.containerUrl.NewBlockBlobURL(name), azblob.UploadToBlockBlobOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 16, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("testdatates1dat1cakedat2tes3dat3tes4dat4")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *blockBlobTestSuite) TestOverwriteAndAppendBlocks() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to set the max single-upload size (the size above which a blob is split into blocks) to 4 bytes
_, err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4, s.containerUrl.NewBlockBlobURL(name), azblob.UploadToBlockBlobOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("43211234cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 32, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("testdatates1dat1tes2dat2tes3dat343211234cake")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *blockBlobTestSuite) TestAppendBlocks() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to set the max single-upload size (the size above which a blob is split into blocks) to 4 bytes
_, err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4, s.containerUrl.NewBlockBlobURL(name), azblob.UploadToBlockBlobOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("43211234cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("43211234cakedat1tes2dat2tes3dat3tes4dat4")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *blockBlobTestSuite) TestAppendOffsetLargerThanSize() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to set the max single-upload size (the size above which a blob is split into blocks) to 4 bytes
_, err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4, s.containerUrl.NewBlockBlobURL(name), azblob.UploadToBlockBlobOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("43211234cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 45, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("testdatates1dat1tes2dat2tes3dat3tes4dat4\x00\x00\x00\x00\x0043211234cake")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *blockBlobTestSuite) TestCopyToFileError() {
defer s.cleanupTest()
// Setup
@ -1251,7 +1643,7 @@ func (s *blockBlobTestSuite) TestGetAttrFileSize() {
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
props, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
@ -1268,7 +1660,7 @@ func (s *blockBlobTestSuite) TestGetAttrFileTime() {
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
before, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
@ -1276,7 +1668,7 @@ func (s *blockBlobTestSuite) TestGetAttrFileTime() {
time.Sleep(time.Second * 3) // Wait 3 seconds and then modify the file again
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
after, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)

View File

@ -34,6 +34,7 @@
package azstorage
import (
"blobfuse2/common"
"blobfuse2/common/log"
"blobfuse2/internal"
"net/url"
@ -111,6 +112,8 @@ type AzConnection interface {
WriteFromFile(name string, metadata map[string]string, fi *os.File) error
WriteFromBuffer(name string, metadata map[string]string, data []byte) error
Write(name string, offset int64, len int64, data []byte, fileOffsets, modBlockList *common.BlockOffsetList) error
GetFileBlockOffsets(name string) (*common.BlockOffsetList, error)
ChangeMod(string, os.FileMode) error
ChangeOwner(string, int, int) error

View File

@ -500,6 +500,15 @@ func (dl *Datalake) WriteFromBuffer(name string, metadata map[string]string, dat
return dl.BlockBlob.WriteFromBuffer(name, metadata, data)
}
// Write : Write to a file at given offset
func (dl *Datalake) Write(name string, offset int64, len int64, data []byte, fileOffsets, modBlockList *common.BlockOffsetList) error {
return dl.BlockBlob.Write(name, offset, len, data, fileOffsets, modBlockList)
}
func (dl *Datalake) GetFileBlockOffsets(name string) (*common.BlockOffsetList, error) {
return dl.BlockBlob.GetFileBlockOffsets(name)
}
// ChangeMod : Change mode of a path
func (dl *Datalake) ChangeMod(name string, mode os.FileMode) error {
log.Trace("Datalake::ChangeMod : Change mode of file %s to %s", name, mode)

View File

@ -39,6 +39,7 @@ import (
"blobfuse2/common/log"
"blobfuse2/internal"
"blobfuse2/internal/handlemap"
"bytes"
"container/list"
"encoding/json"
"fmt"
@ -691,6 +692,349 @@ func (s *datalakeTestSuite) TestCreateFile() {
s.assert.Empty(props.XMsProperties())
}
func (s *datalakeTestSuite) TestWriteSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
dataLen := len(data)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
output := make([]byte, len(data))
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(testData, output)
f.Close()
}
func (s *datalakeTestSuite) TestOverwriteSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-replace-data"
data := []byte(testData)
dataLen := len(data)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 5, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("test-newdata-data")
output := make([]byte, len(currentData))
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestOverwriteAndAppendToSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 5, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("test-newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestAppendOffsetLargerThanSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 12, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("test-data\x00\x00\x00newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestAppendToSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("-newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 9, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("test-data-newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
// This test starts with a regular blob (no committed blocks) and writes enough data past its end to force block creation
func (s *datalakeTestSuite) TestAppendBlocksToSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
// use our helper to set the max single-upload size (the size above which a blob is split into blocks) to 9 bytes
_, err := uploadReaderAtToBlockBlob(
ctx, bytes.NewReader(data),
int64(len(data)),
9,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobURL(name),
azblob.UploadToBlockBlobOptions{
BlockSize: 8,
})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("-newdata-newdata-newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 9, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("test-data-newdata-newdata-newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestOverwriteBlocks() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to set the max single-upload size (the size above which a blob is split into blocks) to 4 bytes
_, err := uploadReaderAtToBlockBlob(
ctx,
bytes.NewReader(data),
int64(len(data)),
4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobURL(name),
azblob.UploadToBlockBlobOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 16, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("testdatates1dat1cakedat2tes3dat3tes4dat4")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestOverwriteAndAppendBlocks() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to set the max single-upload size (the size above which a blob is split into blocks) to 4 bytes
_, err := uploadReaderAtToBlockBlob(
ctx,
bytes.NewReader(data),
int64(len(data)),
4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobURL(name),
azblob.UploadToBlockBlobOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("43211234cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 32, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("testdatates1dat1tes2dat2tes3dat343211234cake")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestAppendBlocks() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to set the max single-upload size (the size above which a blob is split into blocks) to 4 bytes
_, err := uploadReaderAtToBlockBlob(ctx,
bytes.NewReader(data),
int64(len(data)),
4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobURL(name),
azblob.UploadToBlockBlobOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("43211234cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("43211234cakedat1tes2dat2tes3dat3tes4dat4")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestAppendOffsetLargerThanSize() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to set the max single-upload size (the size above which a blob is split into blocks) to 4 bytes
_, err := uploadReaderAtToBlockBlob(ctx,
bytes.NewReader(data),
int64(len(data)),
4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobURL(name),
azblob.UploadToBlockBlobOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("43211234cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 45, Data: newTestData, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
currentData := []byte("testdatates1dat1tes2dat2tes3dat3tes4dat4\x00\x00\x00\x00\x0043211234cake")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestOpenFile() {
defer s.cleanupTest()
// Setup
@ -856,7 +1200,7 @@ func (s *datalakeTestSuite) TestReadFile() {
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
@ -882,7 +1226,7 @@ func (s *datalakeTestSuite) TestReadInBuffer() {
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})
output := make([]byte, 5)
@ -899,7 +1243,7 @@ func (s *datalakeTestSuite) TestReadInBufferLargeBuffer() {
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})
output := make([]byte, 1000) // Testing that passing in a super large buffer will still work
@ -953,7 +1297,7 @@ func (s *datalakeTestSuite) TestWriteFile() {
testData := "test data"
data := []byte(testData)
count, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
s.assert.Nil(err)
s.assert.EqualValues(len(data), count)
@ -973,7 +1317,7 @@ func (s *datalakeTestSuite) TestTruncateFileSmaller() {
testData := "test data"
data := []byte(testData)
truncatedLength := 5
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
err := s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
@ -995,7 +1339,7 @@ func (s *datalakeTestSuite) TestTruncateFileEqual() {
testData := "test data"
data := []byte(testData)
truncatedLength := 9
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
err := s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
@ -1017,7 +1361,7 @@ func (s *datalakeTestSuite) TestTruncateFileBigger() {
testData := "test data"
data := []byte(testData)
truncatedLength := 15
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
err := s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
@ -1049,7 +1393,7 @@ func (s *datalakeTestSuite) TestCopyToFile() {
testData := "test data"
data := []byte(testData)
dataLen := len(data)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
f, _ := ioutil.TempFile("", name+".tmp")
defer os.Remove(f.Name())
@ -1197,7 +1541,7 @@ func (s *datalakeTestSuite) TestGetAttrFileSize() {
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
props, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
@ -1214,7 +1558,7 @@ func (s *datalakeTestSuite) TestGetAttrFileTime() {
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
before, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
@ -1222,7 +1566,7 @@ func (s *datalakeTestSuite) TestGetAttrFileTime() {
time.Sleep(time.Second * 3) // Wait 3 seconds and then modify the file again
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data, FileOffsets: &common.BlockOffsetList{}})
time.Sleep(time.Second * 1)
after, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})

View File

@ -601,9 +601,11 @@ func libfuse_write(path *C.char, buf *C.char, size C.size_t, off C.off_t, fi *C.
data := (*[1 << 30]byte)(unsafe.Pointer(buf))
bytesWritten, err := fuseFS.NextComponent().WriteFile(
internal.WriteFileOptions{
Handle: handle,
Offset: int64(offset),
Data: data[:size],
FileOffsets: &common.BlockOffsetList{},
ModBlockList: &common.BlockOffsetList{},
})
if err != nil {

View File

@ -625,9 +625,11 @@ func libfuse_write(path *C.char, buf *C.char, size C.size_t, off C.off_t, fi *C.
data := (*[1 << 30]byte)(unsafe.Pointer(buf))
bytesWritten, err := fuseFS.NextComponent().WriteFile(
internal.WriteFileOptions{
Handle: handle,
Offset: int64(offset),
Data: data[:size],
FileOffsets: &common.BlockOffsetList{},
ModBlockList: &common.BlockOffsetList{},
})
if err != nil {

View File

@ -303,7 +303,6 @@ func (st *Stream) copyCachedBlock(fileKey string, handle *handlemap.Handle, offs
}
func (st *Stream) ReadInBuffer(options internal.ReadInBufferOptions) (int, error) {
// later on this fileKey can either be the path or handle
fileKey := st.getFileKey(options.Handle.Path, options.Handle.ID)
// if we're only streaming then avoid using the cache
if st.streamOnly {
@ -317,6 +316,13 @@ func (st *Stream) ReadInBuffer(options internal.ReadInBufferOptions) (int, error
return st.copyCachedBlock(fileKey, options.Handle, options.Offset, options.Data)
}
func (st *Stream) WriteFile(options internal.WriteFileOptions) (int, error) {
// if len(options.FileOffsets) == 0 {
// }
return st.NextComponent().WriteFile(options)
}
func (st *Stream) CloseFile(options internal.CloseFileOptions) error {
log.Trace("Stream::CloseFile : name=%s, handle=%d", options.Handle.Path, options.Handle.ID)
st.NextComponent().CloseFile(options)

View File

@ -34,6 +34,7 @@
package internal
import (
"blobfuse2/common"
"blobfuse2/internal/handlemap"
"context"
)
@ -307,3 +308,10 @@ func (base *BaseComponent) InvalidateObject(name string) {
base.next.InvalidateObject(name)
}
}
func (base *BaseComponent) GetFileBlockOffsets(options GetFileBlockOffsetsOptions) (*common.BlockOffsetList, error) {
if base.next != nil {
return base.next.GetFileBlockOffsets(options)
}
return &common.BlockOffsetList{}, nil
}

View File

@ -34,6 +34,7 @@
package internal
import (
"blobfuse2/common"
"blobfuse2/internal/handlemap"
"context"
)
@ -132,4 +133,5 @@ type Component interface {
Chown(ChownOptions) error
//InvalidateObject: function used to clear any inode information relating to a particular fs object
InvalidateObject(string) // TODO: What does this do? Why do we need it if its a noop?
GetFileBlockOffsets(options GetFileBlockOffsetsOptions) (*common.BlockOffsetList, error)
}

View File

@ -34,6 +34,7 @@
package internal
import (
"blobfuse2/common"
"blobfuse2/internal/handlemap"
"os"
)
@ -110,9 +111,15 @@ type ReadInBufferOptions struct {
}
type WriteFileOptions struct {
Handle *handlemap.Handle
Offset int64
Data []byte
FileOffsets *common.BlockOffsetList
ModBlockList *common.BlockOffsetList
}
type GetFileBlockOffsetsOptions struct {
Name string
}
type TruncateFileOptions struct {

View File

@ -41,6 +41,7 @@ import (
handlemap "blobfuse2/internal/handlemap"
context "context"
reflect "reflect"
common "blobfuse2/common"
gomock "github.com/golang/mock/gomock"
)
@ -308,6 +309,22 @@ func (mr *MockComponentMockRecorder) InvalidateObject(arg0 interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvalidateObject", reflect.TypeOf((*MockComponent)(nil).InvalidateObject), arg0)
}
// GetFileBlockOffsets mocks base method.
func (m *MockComponent) GetFileBlockOffsets(arg0 GetFileBlockOffsetsOptions) (*common.BlockOffsetList, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetFileBlockOffsets", arg0)
ret0, _ := ret[0].(*common.BlockOffsetList)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetFileBlockOffsets indicates an expected call of GetFileBlockOffsets.
func (mr *MockComponentMockRecorder) GetFileBlockOffsets(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFileBlockOffsets", reflect.TypeOf((*MockComponent)(nil).GetFileBlockOffsets), arg0)
}
// IsDirEmpty mocks base method.
func (m *MockComponent) IsDirEmpty(arg0 IsDirEmptyOptions) bool {
m.ctrl.T.Helper()