This commit is contained in:
Tamer Sherif 2022-02-24 15:15:11 -08:00
Родитель f7fa00fdcd
Коммит 948ba17ccb
8 изменённых файлов: 77 добавлений и 59 удалений

Просмотреть файл

@ -178,23 +178,30 @@ type Block struct {
}
// list that holds blocks containing ids and corresponding offsets
type BlockOffsetList []*Block
type BlockOffsetList struct {
BlockOffsetList []*Block //blockId to offset mapping
SmallBlob bool // does it consist of blocks
Cached bool // is it cached?
}
func (bol BlockOffsetList) FindBlocksToModify(offset, length int64) (BlockOffsetList, int64, bool) {
func (bol BlockOffsetList) FindBlocksToModify(offset, length int64) (*BlockOffsetList, int64, bool) {
// size of mod block list
size := int64(0)
currentBlockOffset := offset
var modBlockList BlockOffsetList
modBlockList := BlockOffsetList{}
// TODO: change this to binary search (logn) for better perf
for _, blk := range bol {
for _, blk := range bol.BlockOffsetList {
if blk.StartIndex > offset+length {
break
}
if currentBlockOffset >= blk.StartIndex && currentBlockOffset <= blk.EndIndex && currentBlockOffset <= offset+length {
modBlockList = append(modBlockList, blk)
size += blk.Size
modBlockList.BlockOffsetList = append(modBlockList.BlockOffsetList, blk)
currentBlockOffset = blk.EndIndex
size += blk.Size
}
}
// return: block list subset affected, size of mod data, does the new data exceed current size?
return modBlockList, size, offset+length >= bol[len(bol)-1].EndIndex
return &modBlockList, size, offset+length >= bol.BlockOffsetList[len(bol.BlockOffsetList)-1].EndIndex
}
// NewUUID returns a new uuid using RFC 4122 algorithm with the given length.

Просмотреть файл

@ -352,11 +352,11 @@ func (az *AzStorage) ReadInBuffer(options internal.ReadInBufferOptions) (length
}
func (az *AzStorage) WriteFile(options internal.WriteFileOptions) (int, error) {
err := az.storage.Write(options.Handle.Path, options.Offset, int64(len(options.Data)), options.Data, options.FileOffsets)
err := az.storage.Write(options.Handle.Path, options.Offset, int64(len(options.Data)), options.Data, options.FileOffsets, options.ModBlockList)
return len(options.Data), err
}
func (az *AzStorage) GetFileBlockOffsets(options internal.GetFileBlockOffsetsOptions) (common.BlockOffsetList, bool, error) {
func (az *AzStorage) GetFileBlockOffsets(options internal.GetFileBlockOffsetsOptions) (*common.BlockOffsetList, error) {
return az.storage.GetFileBlockOffsets(options.Name)
}

Просмотреть файл

@ -42,6 +42,7 @@ import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
@ -657,15 +658,15 @@ func (bb *BlockBlob) WriteFromBuffer(name string, metadata map[string]string, da
}
// GetFileBlockOffsets: store blocks ids and corresponding offsets
func (bb *BlockBlob) GetFileBlockOffsets(name string) (common.BlockOffsetList, bool, error) {
func (bb *BlockBlob) GetFileBlockOffsets(name string) (*common.BlockOffsetList, error) {
var blockOffset int64 = 0
var blockList common.BlockOffsetList
blockList := common.BlockOffsetList{}
blobURL := bb.Container.NewBlockBlobURL(filepath.Join(bb.Config.prefixPath, name))
storageBlockList, err := blobURL.GetBlockList(
context.Background(), azblob.BlockListCommitted, bb.blobAccCond.LeaseAccessConditions)
if err != nil {
log.Err("BlockBlob::GetFileBlockOffsets : Failed to get block list %s ", name, err.Error())
return nil, false, err
return &common.BlockOffsetList{}, err
}
for _, block := range *&storageBlockList.CommittedBlocks {
blk := &common.Block{
@ -675,10 +676,18 @@ func (bb *BlockBlob) GetFileBlockOffsets(name string) (common.BlockOffsetList, b
Size: block.Size,
}
blockOffset += block.Size
blockList = append(blockList, blk)
blockList.BlockOffsetList = append(blockList.BlockOffsetList, blk)
}
if blockOffset == 0 {
blockList.SmallBlob = true
}
// return block list of the blob, does the blob consist of blocks?, err
return blockList, len(blockList) > 0, nil
for _, v := range blockList.BlockOffsetList {
fmt.Println(v.StartIndex, v.EndIndex)
}
return &blockList, nil
}
// create our definition of block
@ -693,7 +702,8 @@ func (bb *BlockBlob) createBlock(blockIdLength, startIndex, size int64) *common.
return newBlock
}
func (bb *BlockBlob) createNewBlocks(modBlockList common.BlockOffsetList, blockList common.BlockOffsetList, offset int64, length int64, blockIdLength int64) (bool, int64, common.BlockOffsetList, common.BlockOffsetList) {
func (bb *BlockBlob) createNewBlocks(modBlockList, blockList *common.BlockOffsetList, offset, length, blockIdLength int64) (bool, int64) {
prevIndex := blockList.BlockOffsetList[len(blockList.BlockOffsetList)-1].EndIndex
// BufferSize is the size of the buffer that will go beyond our current blob (appended)
var bufferSize int64
// counter will help us keep track of how many blocks we've created
@ -701,23 +711,23 @@ func (bb *BlockBlob) createNewBlocks(modBlockList common.BlockOffsetList, blockL
// appendOnly means there is no overlap at all with the blob - data only being appended
appendOnly := false
// if modBlockList is not of size 0 then we know that we're overwriting part of our current data - with new data also being appended to the end
if len(modBlockList) != 0 {
bufferSize = length - (blockList[len(blockList)-1].EndIndex - offset)
if len(modBlockList.BlockOffsetList) != 0 {
bufferSize = length - (prevIndex - offset)
// only new data at the end basically offset => lastBlock.EndIndex
} else {
bufferSize = (offset + length) - blockList[len(blockList)-1].EndIndex
bufferSize = (offset + length) - prevIndex
appendOnly = true
}
// reset the offset to be the last index in the block list since we are adding new blocks
offset = blockList[len(blockList)-1].EndIndex
offset = prevIndex
startIndex := offset
for i := 0; i < int(bufferSize); i++ {
counter += 1
// create a new block if we hit our block size
if int64(i)%bb.Config.blockSize == 0 && i != 0 {
newBlock := bb.createBlock(blockIdLength, startIndex, bb.Config.blockSize)
modBlockList = append(modBlockList, newBlock)
blockList = append(blockList, newBlock)
modBlockList.BlockOffsetList = append(modBlockList.BlockOffsetList, newBlock)
blockList.BlockOffsetList = append(blockList.BlockOffsetList, newBlock)
startIndex = newBlock.EndIndex
// reset the counter since it will help us to determine if there is leftovers at the end
counter = 0
@ -725,39 +735,36 @@ func (bb *BlockBlob) createNewBlocks(modBlockList common.BlockOffsetList, blockL
}
if counter != 0 {
newBlock := bb.createBlock(blockIdLength, startIndex, counter)
modBlockList = append(modBlockList, newBlock)
blockList = append(blockList, newBlock)
modBlockList.BlockOffsetList = append(modBlockList.BlockOffsetList, newBlock)
blockList.BlockOffsetList = append(blockList.BlockOffsetList, newBlock)
}
return appendOnly, bufferSize, modBlockList, blockList
return appendOnly, bufferSize
}
// Write : write data at given offset to a blob
func (bb *BlockBlob) Write(name string, offset int64, length int64, data []byte, FileOffsets common.BlockOffsetList) error {
log.Trace("BlockBlob::Write : name %s offset %v", name, offset)
func (bb *BlockBlob) Write(name string, offset, length int64, data []byte, fileOffsets, modFileOffsets *common.BlockOffsetList) error {
defer log.TimeTrack(time.Now(), "BlockBlob::Write", name)
log.Trace("BlockBlob::Write : name %s offset %v", name, offset)
blobURL := bb.Container.NewBlockBlobURL(filepath.Join(bb.Config.prefixPath, name))
// does the file consist of blocks?
multipleBlocks := true
// tracks the case where our offset is great than our current file size (appending only - not modifying pre-existing data)
appendOnly := false
var blockList common.BlockOffsetList
var dataBuffer *[]byte
// if this is not 0 then we passed a cached block ID list - from stream for example
if len(FileOffsets) == 0 {
// TODO: the case where our file offsets is of len 0 because blob doesn't have blocks
if !fileOffsets.Cached {
var err error
blockList, multipleBlocks, err = bb.GetFileBlockOffsets(name)
fileOffsets, err = bb.GetFileBlockOffsets(name)
if err != nil {
return err
}
} else {
blockList = FileOffsets
}
// case 1: file consists of no blocks (small file)
if !multipleBlocks {
if fileOffsets.SmallBlob {
// get all the data
// TODO: create another method we already read in buffer and we just need to
// find related blocks, stage and commit
oldData, _ := bb.ReadBuffer(name, 0, 0)
// update the data with the new data
// if we're only overwriting existing data
@ -767,6 +774,7 @@ func (bb *BlockBlob) Write(name string, offset int64, length int64, data []byte,
// else appending and/or overwriting
} else {
// new data buffer with the size of old and new data
// TODO: stream might be passing the data buffer so we don't want to create this here
newDataBuffer := make([]byte, offset+length)
// copy the old data into it
// TODO: better way to do this?
@ -785,36 +793,36 @@ func (bb *BlockBlob) Write(name string, offset int64, length int64, data []byte,
// case 2: given offset is within the size of the blob - and the blob consists of multiple blocks
// case 3: new blocks need to be added
} else {
modifiedBlockList, oldDataSize, exceedsFileBlocks := blockList.FindBlocksToModify(offset, length)
modifiedBlockList, oldDataSize, exceedsFileBlocks := fileOffsets.FindBlocksToModify(offset, length)
// keeps track of how much new data will be appended to the end of the file (applicable only to case 3)
newBufferSize := int64(0)
// case 3?
if exceedsFileBlocks {
// get length of blockID in order to generate a consistent size block ID so storage does not throw
existingBlockId, _ := base64.StdEncoding.DecodeString(blockList[0].Id)
existingBlockId, _ := base64.StdEncoding.DecodeString(fileOffsets.BlockOffsetList[0].Id)
blockIdLength := len(existingBlockId)
appendOnly, newBufferSize, modifiedBlockList, blockList = bb.createNewBlocks(modifiedBlockList, blockList, offset, length, int64(blockIdLength))
appendOnly, newBufferSize = bb.createNewBlocks(modifiedBlockList, fileOffsets, offset, length, int64(blockIdLength))
}
if len(modifiedBlockList) > 0 {
if len(modifiedBlockList.BlockOffsetList) > 0 {
// buffer that holds that pre-existing data in those blocks we're interested in
oldDataBuffer := make([]byte, oldDataSize+newBufferSize)
if !appendOnly {
// fetch the blocks that will be impacted by the new changes so we can overwrite them
bb.ReadInBuffer(name, modifiedBlockList[0].StartIndex, oldDataSize, oldDataBuffer)
bb.ReadInBuffer(name, modifiedBlockList.BlockOffsetList[0].StartIndex, oldDataSize, oldDataBuffer)
}
// this gives us where the offset with respect to the buffer that holds our old data - so we can start writing the new data
blockOffset := offset - modifiedBlockList[0].StartIndex
blockOffset := offset - modifiedBlockList.BlockOffsetList[0].StartIndex
copy(oldDataBuffer[blockOffset:], data)
err := bb.stageAndCommitModifiedBlocks(name, blobURL, oldDataBuffer, modifiedBlockList, blockList)
err := bb.stageAndCommitModifiedBlocks(name, blobURL, oldDataBuffer, modifiedBlockList, fileOffsets)
return err
}
}
return nil
}
func (bb *BlockBlob) stageAndCommitModifiedBlocks(name string, blobURL azblob.BlockBlobURL, data []byte, modifiedBlockList, blockList common.BlockOffsetList) error {
func (bb *BlockBlob) stageAndCommitModifiedBlocks(name string, blobURL azblob.BlockBlobURL, data []byte, modifiedBlockList, blockList *common.BlockOffsetList) error {
blockOffset := int64(0)
for _, blk := range modifiedBlockList {
for _, blk := range modifiedBlockList.BlockOffsetList {
_, err := blobURL.StageBlock(context.Background(),
blk.Id,
bytes.NewReader(data[blockOffset:blk.Size+blockOffset]),
@ -829,7 +837,7 @@ func (bb *BlockBlob) stageAndCommitModifiedBlocks(name string, blobURL azblob.Bl
}
var blockIDList []string
// TODO: we can probably clean up this for loop - move it to types method?
for _, blk := range blockList {
for _, blk := range blockList.BlockOffsetList {
blockIDList = append(blockIDList, blk.Id)
}
_, err := blobURL.CommitBlockList(context.Background(),

Просмотреть файл

@ -112,8 +112,8 @@ type AzConnection interface {
WriteFromFile(name string, metadata map[string]string, fi *os.File) error
WriteFromBuffer(name string, metadata map[string]string, data []byte) error
Write(name string, offset int64, len int64, data []byte, FileOffsets common.BlockOffsetList) error
GetFileBlockOffsets(name string) (common.BlockOffsetList, bool, error)
Write(name string, offset int64, len int64, data []byte, fileOffsets, modBlockList *common.BlockOffsetList) error
GetFileBlockOffsets(name string) (*common.BlockOffsetList, error)
ChangeMod(string, os.FileMode) error
ChangeOwner(string, int, int) error

Просмотреть файл

@ -501,11 +501,11 @@ func (dl *Datalake) WriteFromBuffer(name string, metadata map[string]string, dat
}
// Write : Write to a file at given offset
func (dl *Datalake) Write(name string, offset int64, len int64, data []byte, FileOffsets common.BlockOffsetList) error {
return dl.BlockBlob.Write(name, offset, len, data, FileOffsets)
func (dl *Datalake) Write(name string, offset int64, len int64, data []byte, fileOffsets, modBlockList *common.BlockOffsetList) error {
return dl.BlockBlob.Write(name, offset, len, data, fileOffsets, modBlockList)
}
func (dl *Datalake) GetFileBlockOffsets(name string) (common.BlockOffsetList, bool, error) {
func (dl *Datalake) GetFileBlockOffsets(name string) (*common.BlockOffsetList, error) {
return dl.BlockBlob.GetFileBlockOffsets(name)
}

Просмотреть файл

@ -540,9 +540,11 @@ func libfuse_write(path *C.char, buf *C.char, size C.size_t, off C.off_t, fi *C.
data := (*[1 << 30]byte)(unsafe.Pointer(buf))
bytesWritten, err := fuseFS.NextComponent().WriteFile(
internal.WriteFileOptions{
Handle: handle,
Offset: int64(offset),
Data: data[:size],
Handle: handle,
Offset: int64(offset),
Data: data[:size],
FileOffsets: &common.BlockOffsetList{},
ModBlockList: &common.BlockOffsetList{},
})
if err != nil {

Просмотреть файл

@ -309,9 +309,9 @@ func (base *BaseComponent) InvalidateObject(name string) {
}
}
func (base *BaseComponent) GetFileBlockOffsets(options GetFileBlockOffsetsOptions) (common.BlockOffsetList, bool, error) {
func (base *BaseComponent) GetFileBlockOffsets(options GetFileBlockOffsetsOptions) (*common.BlockOffsetList, error) {
if base.next != nil {
base.next.InvalidateObject(name)
}
return common.BlockOffsetList{}, false, nil
return &common.BlockOffsetList{}, nil
}

Просмотреть файл

@ -111,10 +111,11 @@ type ReadInBufferOptions struct {
}
type WriteFileOptions struct {
Handle *handlemap.Handle
Offset int64
Data []byte
FileOffsets common.BlockOffsetList
Handle *handlemap.Handle
Offset int64
Data []byte
FileOffsets *common.BlockOffsetList
ModBlockList *common.BlockOffsetList
}
type GetFileBlockOffsetsOptions struct {