Sync with main
This commit is contained in:
Commit 191b2105d2

CHANGELOG.md (24 changes)
@@ -1,27 +1,21 @@
## 2.3.2 (Unreleased)
## 2.3.3 (Unreleased)

**Bug Fixes**
- Flush shall only sync the blocks to storage and not delete them from local cache.
- Random write has been re-enabled in block cache.
- Writing to an uncommitted block which has been deleted from the in-memory cache.
- Check download status of a block before updating and return error if it failed to download.

## 2.3.1 (Unreleased)

**NOTICE**
- Due to data integrity issues, random write has been disabled in block-cache. Refer [#1484](https://github.com/Azure/azure-storage-fuse/pull/1484) for blocked scenarios.

## 2.3.2 (2024-09-03)

**Bug Fixes**
- Fixed the case where file creation using SAS on HNS accounts was returning the wrong error code.
- [#1402](https://github.com/Azure/azure-storage-fuse/issues/1402) Fixed proxy URL parsing.
- If an earlier instance of Blobfuse2 crashed and the mount is unstable, the next mount to the same path will automatically clean up the system.
- In flush operation, the blocks will be committed only if the handle is dirty.
- Reset block data to null before reuse.
- Fixed an issue in File-Cache that caused upload to fail due to insufficient permissions.

**Data Integrity Fixes**
- Fixed block-cache read of small files in direct-io mode, where file size is not a multiple of kernel buffer size.
- Fixed race condition in block-cache random write flow where a block is being uploaded and written to in parallel.
- Fixed issue in block-cache random read/write flow where an uncommitted block, which is deleted from local cache, is reused.
- Sparse file data integrity issues fixed.
- Fixed block-cache read of small files where file size is not a multiple of kernel buffer size.
- Fixed race condition in random write where a block is being uploaded and written to in parallel.

**Other Changes**
- LFU policy in file cache has been deprecated.
- LFU policy in file cache has been removed.
- Default values, if not assigned in config, for the following parameters in block-cache are calculated as follows:
  - Memory preallocated for Block-Cache is 80% of free memory
  - Disk Cache Size is 80% of free disk space
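
Aside: the 80% sizing rule above is straightforward, but a concrete sketch makes the arithmetic explicit. This is a minimal illustration, not blobfuse2's actual code; `freeMemoryBytes` and `freeDiskBytes` are hypothetical stand-ins for the platform probes the binary really uses.

```go
package main

import "fmt"

// Hypothetical probes; only the 80% rule comes from the changelog entry above.
func freeMemoryBytes() uint64 { return 8 << 30 }   // pretend 8 GiB of free RAM
func freeDiskBytes() uint64   { return 100 << 30 } // pretend 100 GiB of free disk

func main() {
	memSize := freeMemoryBytes() * 80 / 100 // default block-cache memory pool
	diskSize := freeDiskBytes() * 80 / 100  // default disk cache size
	fmt.Printf("mem-size: %d MiB, disk-size: %d MiB\n", memSize>>20, diskSize>>20)
}
```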

@@ -8,9 +8,16 @@ Blobfuse2 is stable, and is ***supported by Microsoft*** provided that it is use
[This](https://github.com/Azure/azure-storage-fuse/tree/main?tab=readme-ov-file#config-guide) section will help you choose the correct config for Blobfuse2.

## NOTICE
- If you are using versions 2.2.0, 2.2.1 or 2.3.0, refrain from using block-cache mode and switch to `file-cache` mode until the [known issues](https://github.com/Azure/azure-storage-fuse/wiki/Blobfuse2-Known-issues) are fixed.
- If you are using versions 2.2.0, 2.2.1 or 2.3.0, refrain from using block-cache mode and switch to `file-cache` mode. [Known issues](https://github.com/Azure/azure-storage-fuse/wiki/Blobfuse2-Known-issues) in these versions are fixed in version **`2.3.2`**.
- As of version 2.3.0, blobfuse has updated its authentication methods. For Managed Identity, Object-ID based OAuth is solely accessible via CLI-based login, requiring Azure CLI on the system. For a dependency-free option, users may utilize Application/Client-ID or Resource ID based authentication.
- `streaming` mode is being deprecated.

## Limitations in Block Cache
- Parallel write operations using multiple handles on the same file are not supported and might lead to data inconsistency.
- A read operation on a file which is being written via another handle will not return updated data.
- When using the `cp` utility on a mounted path, always pass the `--sparse=never` parameter. For example, `cp --sparse=never src dest`
- In write operations, data is persisted to storage only on close, sync or flush calls.
- User applications must check the return code of write, close and flush operations, as shown in the sketch below.
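
Because data reaches storage only on close/sync/flush, a write that appeared to succeed can still fail at close time. A minimal sketch of the required error handling follows; the mount path is hypothetical.

```go
package main

import (
	"log"
	"os"
)

func main() {
	// Hypothetical path inside a blobfuse2 block-cache mount.
	f, err := os.Create("/mnt/blobfuse/data.bin")
	if err != nil {
		log.Fatal(err)
	}
	if _, err := f.Write(make([]byte, 1<<20)); err != nil {
		log.Fatal(err) // write into the local cache failed
	}
	// In block-cache mode the upload happens on close/sync/flush, so this is
	// where an upload failure surfaces. Ignoring it can silently lose data.
	if err := f.Close(); err != nil {
		log.Fatalf("close failed, data may not be in storage: %v", err)
	}
}
```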

## Blobfuse2 Benchmarks
[This](https://azure.github.io/azure-storage-fuse/) page lists various benchmarking results for HNS and FNS Storage accounts.

@@ -385,4 +385,4 @@ steps:
    ${{ parameters.working_dir }}/blobfuse2 mount ${{ parameters.mount_dir }} --config-file=${{ parameters.config }}
    --default-working-dir=${{ parameters.working_dir }}
  displayName: 'HugeList: Mount'
  continueOnError: false
  continueOnError: false
@@ -1679,14 +1679,33 @@ stages:
      fuse2AmdRpm=`pmc --msal-cert-path $(pmcCertificate.secureFilePath) --config $(settings.secureFilePath) --id-only package upload blobfuse2*fuse2.x86_64.rpm`
      echo "Fuse2 AMD RPM ID: $fuse2AmdRpm"

      marinerAmdRpmFile=$(ls blobfuse2* | grep 'cm2\.x86_64\.rpm')
      marinerFuse3AmdRpm=`pmc --msal-cert-path $(pmcCertificate.secureFilePath) --config $(settings.secureFilePath) --id-only package upload blobfuse2*cm2.x86_64.rpm`
      echo "Mariner fuse3 AMD RPM ID: $marinerFuse3AmdRpm"
      echo "Mariner fuse3 AMD RPM: $marinerAmdRpmFile"

      marinerAarchRpmFile=$(ls blobfuse2* | grep 'cm2\.aarch64\.rpm')
      marinerFuse3AarchRpm=`pmc --msal-cert-path $(pmcCertificate.secureFilePath) --config $(settings.secureFilePath) --id-only package upload blobfuse2*cm2.aarch64.rpm`
      echo "Mariner fuse3 ARM RPM ID: $marinerFuse3AarchRpm"
      echo "Mariner fuse3 ARM RPM: $marinerAarchRpmFile"

      is_preview="false"
      echo "##vso[task.setvariable variable=is_preview]$is_preview"
      if [[ $marinerAmdRpmFile == *"preview"* ]]; then
        is_preview="true"
        echo "##vso[task.setvariable variable=is_preview]$is_preview"
      fi

      while IFS=, read -r distro fuseArchType repoName releaseName; do
        # If the package is preview, publish to mariner preview package
        if [[ $distro == *"Mariner-"* ]]; then
          if [ $is_preview = "true" ]; then
            repoName=$(echo $repoName | sed 's/prod/preview/')
          fi
        fi

        echo "Uploading packages for $distro"
        pmc --msal-cert-path $(pmcCertificate.secureFilePath) --config $(settings.secureFilePath) repo package update --add-packages ${!fuseArchType} $repoName $releaseName
      done < <(tail -n +3 ../packages.csv)
@@ -1699,6 +1718,13 @@ stages:
      then
        echo "Skipping for ARM type on $distro"
      else
        if [[ $distro == *"Mariner-"* ]]; then
          if [ "$(is_preview)" = "true" ]; then
            repoName=$(echo $repoName | sed 's/prod/preview/')
          fi
        fi
        echo "Repository Name: $repoName"

        echo "Publishing for $distro"
        pmc --msal-cert-path $(pmcCertificate.secureFilePath) --config $(settings.secureFilePath) repo publish $repoName
      fi
@@ -132,7 +132,7 @@ func validateHMonOptions() error {
	}

	if len(errMsg) != 0 {
		return fmt.Errorf(errMsg)
		return fmt.Errorf("%s", errMsg)
	}

	return nil
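
Aside: this same `fmt.Errorf(msg)` → `fmt.Errorf("%s", msg)` fix recurs below in `Destroy` and in `azAuthMSI::getTokenCredentialUsingCLI`. The reason is that a dynamic string used directly as a format string is interpreted by `fmt`: any `%` in the message is parsed as a verb, and `go vet` flags the non-constant format string. A small standalone demonstration:

```go
package main

import "fmt"

func main() {
	msg := "disk usage at 90% on /mnt/cache" // dynamic text, may contain '%'
	// Unsafe: msg is the format string, so "% o" is parsed as a verb and the
	// output is mangled with %!o(MISSING)-style noise.
	fmt.Println(fmt.Errorf(msg))
	// Safe: the dynamic text is passed as an argument, never as a format.
	fmt.Println(fmt.Errorf("%s", msg))
}
```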

@@ -754,5 +754,5 @@ func init() {

func Destroy(message string) error {
	_ = log.Destroy()
	return fmt.Errorf(message)
	return fmt.Errorf("%s", message)
}
@@ -47,7 +47,7 @@ import (

// Standard config default values
const (
	blobfuse2Version_ = "2.3.2"
	blobfuse2Version_ = "2.3.3"

	DefaultMaxLogFileSize = 512
	DefaultLogFileCount   = 10
@@ -97,7 +97,7 @@ func (azmsi *azAuthMSI) getTokenCredentialUsingCLI() (azcore.TokenCredential, er
		if msg == "" {
			msg = err.Error()
		}
		return nil, fmt.Errorf(msg)
		return nil, fmt.Errorf("%s", msg)
	}

	log.Info("azAuthMSI::getTokenCredentialUsingCLI : Successfully logged in using Azure CLI")
@@ -51,6 +51,14 @@ const (
	BlockFlagFailed // Block upload/download has failed
)

// Flags to denote the status of upload/download of a block
const (
	BlockStatusDownloaded     int = iota + 1 // Download of this block is complete
	BlockStatusUploaded                      // Upload of this block is complete
	BlockStatusDownloadFailed                // Download of this block has failed
	BlockStatusUploadFailed                  // Upload of this block has failed
)

// Block is a memory mapped buffer with its state to hold data
type Block struct {
	offset uint64 // Start offset of the data this block holds
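
The hunk above is the heart of this commit: the block's `state` channel stops being a bare "done" signal and starts carrying one of the `BlockStatus*` codes, so the first reader can tell a successful download from a failed upload. A minimal sketch of the pattern, using illustrative names rather than blobfuse2's actual types:

```go
package main

import "fmt"

// Status codes mirroring the BlockStatus* constants added in this commit
// (values are illustrative).
const (
	statusDownloaded = iota + 1
	statusUploaded
	statusDownloadFailed
	statusUploadFailed
)

type block struct {
	state chan int // buffered: a worker posts one status, the first reader consumes it
}

// ready mimics Block.Ready(val): a non-blocking send of the completion status.
func (b *block) ready(val int) {
	select {
	case b.state <- val:
	default: // already signalled; drop
	}
}

func main() {
	b := &block{state: make(chan int, 1)}
	go b.ready(statusDownloadFailed) // worker reports the outcome

	// The first reader learns not just *that* the block finished, but *how*.
	switch t := <-b.state; t {
	case statusDownloaded, statusUploaded:
		fmt.Println("block usable:", t)
	case statusDownloadFailed:
		fmt.Println("retry download")
	case statusUploadFailed:
		fmt.Println("local data still valid; re-stage")
	}
}
```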

@@ -127,9 +135,9 @@ func (b *Block) Uploading() {
}

// Ready marks this Block is now ready for reading by its first reader (data download completed)
func (b *Block) Ready() {
func (b *Block) Ready(val int) {
	select {
	case b.state <- 1:
	case b.state <- val:
		break
	default:
		break
@@ -335,12 +335,12 @@ func (bc *BlockCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Han

	bc.prepareHandleForBlockCache(handle)

	if options.Flags&os.O_TRUNC != 0 || options.Flags&os.O_WRONLY != 0 {
	if options.Flags&os.O_TRUNC != 0 || (options.Flags&os.O_WRONLY != 0 && options.Flags&os.O_APPEND == 0) {
		// If file is opened in truncate or wronly mode then we need to wipe out the data and consider the current file size as 0
		log.Debug("BlockCache::OpenFile : Truncate %v to 0", options.Name)
		handle.Size = 0
		handle.Flags.Set(handlemap.HandleFlagDirty)
	} else if options.Flags&os.O_RDWR != 0 && handle.Size != 0 {
	} else if handle.Size != 0 && (options.Flags&os.O_RDWR != 0 || options.Flags&os.O_APPEND != 0) {
		// File is not opened in read-only mode so we need to get the list of blocks and validate the size
		// As there can be a potential write on this file, currently configured block size and block size of the file in container
		// has to match otherwise it will corrupt the file. Fail the open call if this is not the case.
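
The revised condition stops `O_WRONLY|O_APPEND` opens from wiping the file: an appending writer expects existing contents to survive. A small standalone sketch of the same bit tests:

```go
package main

import (
	"fmt"
	"os"
)

// truncateOnOpen reproduces the revised condition from the diff: truncate on
// O_TRUNC, or on write-only opens, but never when O_APPEND is set.
func truncateOnOpen(flags int) bool {
	return flags&os.O_TRUNC != 0 || (flags&os.O_WRONLY != 0 && flags&os.O_APPEND == 0)
}

func main() {
	fmt.Println(truncateOnOpen(os.O_WRONLY))               // true: plain write-only rewrite
	fmt.Println(truncateOnOpen(os.O_WRONLY | os.O_APPEND)) // false: append must keep data
	fmt.Println(truncateOnOpen(os.O_RDWR))                 // false: read-write validates blocks instead
}
```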

@@ -537,6 +537,22 @@ func (bc *BlockCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, e
	return dataRead, nil
}

func (bc *BlockCache) addToCooked(handle *handlemap.Handle, block *Block) {
	if block.node != nil {
		_ = handle.Buffers.Cooking.Remove(block.node)
		_ = handle.Buffers.Cooked.Remove(block.node)
	}
	block.node = handle.Buffers.Cooked.PushBack(block)
}

func (bc *BlockCache) addToCooking(handle *handlemap.Handle, block *Block) {
	if block.node != nil {
		_ = handle.Buffers.Cooked.Remove(block.node)
		_ = handle.Buffers.Cooking.Remove(block.node)
	}
	block.node = handle.Buffers.Cooking.PushBack(block)
}
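
These two helpers replace scattered ad-hoc queue moves later in the file. Removing the node from *both* lists looks redundant, but it is safe and deliberate: `container/list.Remove` is a no-op when the element does not belong to that list, so the caller never needs to track which queue currently owns the node. A short demonstration:

```go
package main

import (
	"container/list"
	"fmt"
)

func main() {
	cooking, cooked := list.New(), list.New()
	e := cooking.PushBack("block-7") // node currently lives in the cooking queue

	// list.Remove only removes e if e is an element of that list, so
	// detaching from both queues is safe without ownership bookkeeping.
	cooked.Remove(e)  // no-op: e is not in cooked
	cooking.Remove(e) // actually removes

	e = cooked.PushBack("block-7")                    // re-home the node
	fmt.Println(cooking.Len(), cooked.Len(), e.Value) // 0 1 block-7
}
```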

// getBlock: From offset generate the Block index and get the Block corresponding to it
/* Base logic of getBlock:
   Check if the given block is already available or not
@@ -563,6 +579,20 @@ func (bc *BlockCache) getBlock(handle *handlemap.Handle, readoffset uint64) (*Bl
	index := bc.getBlockIndex(readoffset)
	node, found := handle.GetValue(fmt.Sprintf("%v", index))
	if !found {
		// block is not present in the buffer list, check if it is uncommitted
		// If yes, commit all the uncommitted blocks first and then download this block
		shouldCommit, _ := shouldCommitAndDownload(int64(index), handle)
		if shouldCommit {
			// commit all the uncommitted blocks to storage
			log.Debug("BlockCache::getBlock : Downloading an uncommitted block %v, so committing all the staged blocks for %v=>%s", index, handle.ID, handle.Path)
			err := bc.commitBlocks(handle)
			if err != nil {
				log.Err("BlockCache::getBlock : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
				return nil, err
			}
		}

		// If this is the first read request then prefetch all required nodes
		val, _ := handle.GetValue("#")
		if !bc.noPrefetch && val.(uint64) == 0 {
@@ -600,40 +630,52 @@ func (bc *BlockCache) getBlock(handle *handlemap.Handle, readoffset uint64) (*Bl
	block := node.(*Block)

	// Wait for this block to complete the download
	t := int(0)
	t = <-block.state
	t, ok := <-block.state
	if ok {
		// this block is now open to read and process
		block.Unblock()

	if t == 1 {
		block.flags.Clear(BlockFlagDownloading)
		switch t {
		case BlockStatusDownloaded:
			log.Debug("BlockCache::getBlock : Downloaded block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)

		if block.IsFailed() {
			log.Err("BlockCache::getBlock : Failed to download block %v=>%s (offset %v, index %v)", handle.ID, handle.Path, readoffset, index)
			block.flags.Clear(BlockFlagDownloading)

			// Download complete and you are first reader of this block
			if !bc.noPrefetch && handle.OptCnt <= MIN_RANDREAD {
				// So far this file has been read sequentially so prefetch more
				val, _ := handle.GetValue("#")
				if int64(val.(uint64)*bc.blockSize) < handle.Size {
					_ = bc.startPrefetch(handle, val.(uint64), true)
				}
			}

			// This block was moved to in-process queue as download is complete lets move it back to normal queue
			bc.addToCooked(handle, block)

			// mark this block as synced so that it can be used for write later
			// which will move it back to cooking list as per the synced flag
			block.flags.Set(BlockFlagSynced)

		case BlockStatusUploaded:
			log.Debug("BlockCache::getBlock : Staged block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
			block.flags.Clear(BlockFlagUploading)

		case BlockStatusDownloadFailed:
			log.Err("BlockCache::getBlock : Failed to download block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)

			// Remove this node from handle so that next read retries to download the block again
			bc.releaseFailedBlock(handle, block)
			bc.releaseDownloadFailedBlock(handle, block)
			return nil, fmt.Errorf("failed to download block")

		case BlockStatusUploadFailed:
			// Local data is still valid so continue using this buffer
			log.Err("BlockCache::getBlock : Failed to upload block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
			block.flags.Clear(BlockFlagUploading)

			// Move this block to end of queue as this is still modified and un-staged
			bc.addToCooking(handle, block)
		}

		// Download complete and you are first reader of this block
		if handle.OptCnt <= MIN_RANDREAD {
			// So far this file has been read sequentially so prefetch more
			val, _ := handle.GetValue("#")
			if int64(val.(uint64)*bc.blockSize) < handle.Size {
				_ = bc.startPrefetch(handle, val.(uint64), true)
			}
		}

		// This block was moved to in-process queue as download is complete lets move it back to normal queue
		_ = handle.Buffers.Cooking.Remove(block.node)
		block.node = handle.Buffers.Cooked.PushBack(block)

		// mark this block as synced so that it can be used for write later
		// which will move it back to cooking list as per the synced flag
		block.flags.Set(BlockFlagSynced)

		// Mark this block is now open for everyone to read and process
		// Once unblocked and moved to original queue, any instance can delete this block to reuse as well
		block.Unblock()
	}

	return block, nil
@@ -667,15 +709,18 @@ func (bc *BlockCache) startPrefetch(handle *handlemap.Handle, index uint64, pref
	block := handle.Buffers.Cooking.Remove(node).(*Block)
	block.node = nil
	i++

	//This list may contain dirty blocks which are yet to be committed.
	select {
	case <-block.state:
	case _, ok := <-block.state:
		// As we are first reader of this block here its important to unblock any future readers on this block
		block.flags.Clear(BlockFlagDownloading)
		block.Unblock()

		// Block is downloaded so it's safe to ready it for reuse
		block.node = handle.Buffers.Cooked.PushBack(block)
		if ok {
			block.flags.Clear(BlockFlagDownloading)
			block.Unblock()
			// Block is downloaded so it's safe to ready it for reuse
			block.node = handle.Buffers.Cooked.PushBack(block)
		} else {
			block.node = handle.Buffers.Cooking.PushBack(block)
		}

	default:
		// Block is still under download so can not reuse this
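
The reclaim path above probes the state channel without blocking: `select` with a `default` arm takes a posted status if one exists and otherwise moves on. A minimal standalone sketch of that idiom:

```go
package main

import "fmt"

func main() {
	state := make(chan int, 1)

	// Non-blocking probe: reclaim the block only if its worker has already
	// posted a status; otherwise leave it alone and inspect the next one.
	reclaim := func() bool {
		select {
		case v, ok := <-state:
			// ok == false would mean the channel was closed without a value.
			return ok && v != 0
		default:
			return false // still in flight; cannot reuse yet
		}
	}

	fmt.Println(reclaim()) // false: nothing posted yet
	state <- 1             // worker signals completion
	fmt.Println(reclaim()) // true: safe to reuse
}
```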

@@ -725,10 +770,24 @@ func (bc *BlockCache) startPrefetch(handle *handlemap.Handle, index uint64, pref
	}

	for i := uint32(0); i < cnt; i++ {
		// Revalidate this node does not exists in the block map
		// Check if the block exists in the local cache or not
		// If not, download the block from storage
		_, found := handle.GetValue(fmt.Sprintf("%v", index))
		if !found {
			// Block not found so lets push it for download
			// Check if the block is an uncommitted block or not
			// For uncommitted block we need to commit the block first
			shouldCommit, _ := shouldCommitAndDownload(int64(index), handle)
			if shouldCommit {
				// This shall happen only for the first uncommitted block and shall flush all the uncommitted blocks to storage
				log.Debug("BlockCache::startPrefetch : Fetching an uncommitted block %v, so committing all the staged blocks for %v=>%s", index, handle.ID, handle.Path)
				err := bc.commitBlocks(handle)
				if err != nil {
					log.Err("BlockCache::startPrefetch : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
					return err
				}
			}

			// push the block for download
			err := bc.refreshBlock(handle, index, prefetch || i > 0)
			if err != nil {
				return err
@@ -770,6 +829,16 @@ func (bc *BlockCache) refreshBlock(handle *handlemap.Handle, index uint64, prefe
	block := node.Value.(*Block)

	if block.id != -1 {
		// If the block is being staged, then wait till it is uploaded
		// and then use it for read
		if block.flags.IsSet(BlockFlagUploading) {
			log.Debug("BlockCache::refreshBlock : Waiting for the block %v to upload before using it for block %v read for %v=>%s", block.id, index, handle.ID, handle.Path)
			_, ok := <-block.state
			if ok {
				block.Unblock()
			}
		}

		// This is a reuse of a block case so we need to remove old entry from the map
		handle.RemoveValue(fmt.Sprintf("%v", block.id))
	}
@@ -783,14 +852,14 @@ func (bc *BlockCache) refreshBlock(handle *handlemap.Handle, index uint64, prefe
		handle.SetValue(fmt.Sprintf("%v", index), block)
		handle.SetValue("#", (index + 1))

		bc.lineupDownload(handle, block, prefetch, true)
		bc.lineupDownload(handle, block, prefetch)
	}

	return nil
}

// lineupDownload : Create a work item and schedule the download
func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, prefetch bool, pushFront bool) {
func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, prefetch bool) {
	item := &workItem{
		handle: handle,
		block:  block,
@@ -800,16 +869,8 @@ func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, pre
	}

	// Remove this block from free block list and add to in-process list
	if block.node != nil {
		_ = handle.Buffers.Cooked.Remove(block.node)
	}
	bc.addToCooking(handle, block)

	if pushFront {
		block.node = handle.Buffers.Cooking.PushFront(block)
	} else {
		// push back to cooking list in case of write scenario where a block is downloaded before it is updated
		block.node = handle.Buffers.Cooking.PushBack(block)
	}
	block.flags.Set(BlockFlagDownloading)

	// Send the work item to worker pool to schedule download
@@ -864,7 +925,7 @@ func (bc *BlockCache) download(item *workItem) {
			// Just mark the block that download is complete

			item.block.endIndex = item.block.offset + uint64(n)
			item.block.Ready()
			item.block.Ready(BlockStatusDownloaded)
			return
		}
	}
@@ -882,7 +943,7 @@ func (bc *BlockCache) download(item *workItem) {
			// If we failed to read the data 3 times then just give up
			log.Err("BlockCache::download : 3 attempts to download a block have failed %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
			item.block.Failed()
			item.block.Ready()
			item.block.Ready(BlockStatusDownloadFailed)
			return
		}
@@ -924,7 +985,7 @@ func (bc *BlockCache) download(item *workItem) {
	}

	// Just mark the block that download is complete
	item.block.Ready()
	item.block.Ready(BlockStatusDownloaded)
}

// WriteFile: Write to the local file
@@ -1023,7 +1084,7 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)
		if shouldDownload || shouldCommit {
			// We are writing somewhere in between so just fetch this block
			log.Debug("BlockCache::getOrCreateBlock : Downloading block %v for %v=>%v", block.id, handle.ID, handle.Path)
			bc.lineupDownload(handle, block, false, false)
			bc.lineupDownload(handle, block, false)

			// Now wait for download to complete
			<-block.state
@@ -1033,7 +1094,7 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)
			log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path)

			// Remove this node from handle so that next read retries to download the block again
			bc.releaseFailedBlock(handle, block)
			bc.releaseDownloadFailedBlock(handle, block)
			return nil, fmt.Errorf("failed to download block")
		}
	} else {
@@ -1063,37 +1124,34 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)
	// If the block was staged earlier then we are overwriting it here so move it back to cooking queue
	if block.flags.IsSet(BlockFlagSynced) {
		log.Debug("BlockCache::getOrCreateBlock : Overwriting back to staged block %v for %v=>%s", block.id, handle.ID, handle.Path)
		if block.node != nil {
			_ = handle.Buffers.Cooked.Remove(block.node)
		}

		block.node = handle.Buffers.Cooking.PushBack(block)
	} else if block.flags.IsSet(BlockFlagDownloading) {
		log.Debug("BlockCache::getOrCreateBlock : Waiting for download to finish for committed block %v for %v=>%s", block.id, handle.ID, handle.Path)
		<-block.state
		block.Unblock()
		_, ok := <-block.state
		if ok {
			block.Unblock()
		}

		// if the block failed to download, it can't be used for overwriting
		if block.IsFailed() {
			log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path)

			// Remove this node from handle so that next read retries to download the block again
			bc.releaseFailedBlock(handle, block)
			bc.releaseDownloadFailedBlock(handle, block)
			return nil, fmt.Errorf("failed to download block")
		}
	} else if block.flags.IsSet(BlockFlagUploading) {
		// If the block is being staged, then wait till it is uploaded,
		// and then write to the same block and move it back to cooking queue
		log.Debug("BlockCache::getOrCreateBlock : Waiting for the block %v to upload for %v=>%s", block.id, handle.ID, handle.Path)
		<-block.state
		block.Unblock()

		if block.node != nil {
			_ = handle.Buffers.Cooked.Remove(block.node)
		_, ok := <-block.state
		if ok {
			block.Unblock()
		}
		block.node = handle.Buffers.Cooking.PushBack(block)
	}

	bc.addToCooking(handle, block)

	block.flags.Clear(BlockFlagUploading)
	block.flags.Clear(BlockFlagDownloading)
	block.flags.Clear(BlockFlagSynced)
@@ -1128,9 +1186,14 @@ func (bc *BlockCache) stageBlocks(handle *handlemap.Handle, cnt int) error {
}

// remove the block which failed to download so that it can be used again
func (bc *BlockCache) releaseFailedBlock(handle *handlemap.Handle, block *Block) {
	_ = handle.Buffers.Cooking.Remove(block.node)
func (bc *BlockCache) releaseDownloadFailedBlock(handle *handlemap.Handle, block *Block) {
	if block.node != nil {
		_ = handle.Buffers.Cooking.Remove(block.node)
		_ = handle.Buffers.Cooked.Remove(block.node)
	}

	handle.RemoveValue(fmt.Sprintf("%v", block.id))
	block.node = nil
	block.ReUse()
	bc.blockPool.Release(block)
}
@@ -1217,15 +1280,12 @@ func (bc *BlockCache) lineupUpload(handle *handlemap.Handle, block *Block, listM

	// log.Debug("BlockCache::lineupUpload : Upload block %v=>%s (index %v, offset %v, data %v)", handle.ID, handle.Path, block.id, block.offset, (block.endIndex - block.offset))

	// Remove this block from free block list and add to in-process list
	if block.node != nil {
		_ = handle.Buffers.Cooking.Remove(block.node)
	}

	block.Uploading()
	block.flags.Clear(BlockFlagFailed)
	block.flags.Set(BlockFlagUploading)
	block.node = handle.Buffers.Cooked.PushBack(block)

	// Remove this block from free block list and add to in-process list
	bc.addToCooked(handle, block)

	// Send the work item to worker pool to schedule download
	bc.threadPool.Schedule(false, item)
@@ -1249,7 +1309,9 @@ func (bc *BlockCache) waitAndFreeUploadedBlocks(handle *handlemap.Handle, cnt in
		if block.id != -1 {
			// Wait for upload of this block to complete
			_, ok := <-block.state
			block.flags.Clear(BlockFlagDownloading)
			block.flags.Clear(BlockFlagUploading)

			if ok {
				block.Unblock()
			}
@@ -1259,8 +1321,7 @@ func (bc *BlockCache) waitAndFreeUploadedBlocks(handle *handlemap.Handle, cnt in

		if block.IsFailed() {
			log.Err("BlockCache::waitAndFreeUploadedBlocks : Failed to upload block, posting back to cooking list %v=>%s (index %v, offset %v)", handle.ID, handle.Path, block.id, block.offset)
			_ = handle.Buffers.Cooked.Remove(block.node)
			block.node = handle.Buffers.Cooking.PushFront(block)
			bc.addToCooking(handle, block)
			continue
		}
		cnt--
@@ -1301,7 +1362,7 @@ func (bc *BlockCache) upload(item *workItem) {
			// If we failed to write the data 3 times then just give up
			log.Err("BlockCache::upload : 3 attempts to upload a block have failed %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
			item.block.Failed()
			item.block.Ready()
			item.block.Ready(BlockStatusUploadFailed)
			return
		}
@@ -1342,7 +1403,7 @@ func (bc *BlockCache) upload(item *workItem) {
return_safe:
	item.block.flags.Set(BlockFlagSynced)
	item.block.NoMoreDirty()
	item.block.Ready()
	item.block.Ready(BlockStatusUploaded)
}

// Stage the given number of blocks from this handle
@@ -2322,6 +2322,281 @@ func (suite *blockCacheTestSuite) TestBlockDownloadFailed() {
	suite.assert.Equal(fs.Size(), int64(0))
}

func (suite *blockCacheTestSuite) TestReadStagedBlock() {
	cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
	tobj, err := setupPipeline(cfg)
	defer tobj.cleanupPipeline()

	suite.assert.Nil(err)
	suite.assert.NotNil(tobj.blockCache)

	path := getTestFileName(suite.T().Name())
	storagePath := filepath.Join(tobj.fake_storage_path, path)

	// write using block cache
	options := internal.CreateFileOptions{Name: path, Mode: 0777}
	h, err := tobj.blockCache.CreateFile(options)
	suite.assert.Nil(err)
	suite.assert.NotNil(h)
	suite.assert.Equal(h.Size, int64(0))
	suite.assert.False(h.Dirty())

	// write 4MB at offset 0
	n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: dataBuff[:4*_1MB]})
	suite.assert.Nil(err)
	suite.assert.Equal(n, int(4*_1MB))
	suite.assert.True(h.Dirty())
	suite.assert.Equal(3, h.Buffers.Cooking.Len())
	suite.assert.Equal(1, h.Buffers.Cooked.Len())

	data := make([]byte, _1MB)
	n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: data})
	suite.assert.Nil(err)
	suite.assert.Equal(n, int(_1MB))

	err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
	suite.assert.Nil(err)
	suite.assert.Nil(h.Buffers.Cooking)
	suite.assert.Nil(h.Buffers.Cooked)

	fs, err := os.Stat(storagePath)
	suite.assert.Nil(err)
	suite.assert.Equal(fs.Size(), int64(4*_1MB))
}

func (suite *blockCacheTestSuite) TestReadUncommittedBlockValidation() {
	prefetch := 12
	cfg := fmt.Sprintf("block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: %v\n parallelism: 10", prefetch)
	tobj, err := setupPipeline(cfg)
	defer tobj.cleanupPipeline()

	suite.assert.Nil(err)
	suite.assert.NotNil(tobj.blockCache)

	path := getTestFileName(suite.T().Name())
	storagePath := filepath.Join(tobj.fake_storage_path, path)
	localPath := filepath.Join(tobj.disk_cache_path, path)

	// ------------------------------------------------------------------
	// write to local file
	fh, err := os.Create(localPath)
	suite.assert.Nil(err)

	defer func(fh *os.File) {
		err := fh.Close()
		suite.assert.Nil(err)
	}(fh)

	// write 62MB data
	ind := uint64(0)
	for i := 0; i < prefetch+50; i++ {
		n, err := fh.WriteAt(dataBuff[ind*_1MB:(ind+1)*_1MB], int64(i*int(_1MB)))
		suite.assert.Nil(err)
		suite.assert.Equal(n, int(_1MB))
		ind = (ind + 1) % 5
	}

	l, err := computeMD5(fh)
	suite.assert.Nil(err)

	// ------------------------------------------------------------------
	// write using block cache
	options := internal.CreateFileOptions{Name: path, Mode: 0777}
	h, err := tobj.blockCache.CreateFile(options)
	suite.assert.Nil(err)
	suite.assert.NotNil(h)
	suite.assert.Equal(h.Size, int64(0))
	suite.assert.False(h.Dirty())

	ind = 0
	for i := 0; i < prefetch+50; i++ {
		n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(i * int(_1MB)), Data: dataBuff[ind*_1MB : (ind+1)*_1MB]})
		suite.assert.Nil(err)
		suite.assert.Equal(n, int(_1MB))
		suite.assert.True(h.Dirty())
		ind = (ind + 1) % 5
	}

	suite.assert.Equal(h.Buffers.Cooking.Len()+h.Buffers.Cooked.Len(), prefetch)

	// read blocks 0, 1 and 2 which are uncommitted
	data := make([]byte, 2*_1MB)
	n, err := tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 512, Data: data})
	suite.assert.Nil(err)
	suite.assert.Equal(n, int(2*_1MB))
	suite.assert.Equal(data[:], dataBuff[512:2*_1MB+512])
	suite.assert.False(h.Dirty())

	// read block 4 which has been committed by the previous read
	data = make([]byte, _1MB)
	n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: int64(4 * _1MB), Data: data})
	suite.assert.Nil(err)
	suite.assert.Equal(n, int(_1MB))
	suite.assert.Equal(data[:], dataBuff[4*_1MB:5*_1MB])
	suite.assert.False(h.Dirty())

	err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
	suite.assert.Nil(err)

	fs, err := os.Stat(storagePath)
	suite.assert.Nil(err)
	suite.assert.Equal(fs.Size(), int64(62*_1MB))

	rfh, err := os.Open(storagePath)
	suite.assert.Nil(err)

	defer func(fh *os.File) {
		err := fh.Close()
		suite.assert.Nil(err)
	}(rfh)

	r, err := computeMD5(rfh)
	suite.assert.Nil(err)

	// validate md5sum
	suite.assert.Equal(l, r)
}

func (suite *blockCacheTestSuite) TestReadUncommittedPrefetchedBlock() {
	prefetch := 12
	cfg := fmt.Sprintf("block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: %v\n parallelism: 10", prefetch)
	tobj, err := setupPipeline(cfg)
	defer tobj.cleanupPipeline()

	suite.assert.Nil(err)
	suite.assert.NotNil(tobj.blockCache)

	path := getTestFileName(suite.T().Name())
	storagePath := filepath.Join(tobj.fake_storage_path, path)

	// write using block cache
	options := internal.CreateFileOptions{Name: path, Mode: 0777}
	h, err := tobj.blockCache.CreateFile(options)
	suite.assert.Nil(err)
	suite.assert.NotNil(h)
	suite.assert.Equal(h.Size, int64(0))
	suite.assert.False(h.Dirty())

	n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: dataBuff[:_1MB]})
	suite.assert.Nil(err)
	suite.assert.Equal(n, int(_1MB))
	suite.assert.True(h.Dirty())

	err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
	suite.assert.Nil(err)
	suite.assert.False(h.Dirty())

	h, err = tobj.blockCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR})
	suite.assert.Nil(err)
	suite.assert.NotNil(h)
	suite.assert.Equal(h.Size, int64(_1MB))
	suite.assert.False(h.Dirty())

	ind := uint64(1)
	for i := 1; i < prefetch+50; i++ {
		n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(i * int(_1MB)), Data: dataBuff[ind*_1MB : (ind+1)*_1MB]})
		suite.assert.Nil(err)
		suite.assert.Equal(n, int(_1MB))
		suite.assert.True(h.Dirty())
		ind = (ind + 1) % 5
	}

	suite.assert.Equal(h.Buffers.Cooking.Len()+h.Buffers.Cooked.Len(), prefetch)

	// read blocks 0, 1 and 2 where prefetched blocks 1 and 2 are uncommitted
	data := make([]byte, 2*_1MB)
	n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 512, Data: data})
	suite.assert.Nil(err)
	suite.assert.Equal(n, int(2*_1MB))
	suite.assert.Equal(data[:], dataBuff[512:2*_1MB+512])
	suite.assert.False(h.Dirty())

	// read block 4 which has been committed by the previous read
	data = make([]byte, _1MB)
	n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: int64(4 * _1MB), Data: data})
	suite.assert.Nil(err)
	suite.assert.Equal(n, int(_1MB))
	suite.assert.Equal(data[:], dataBuff[4*_1MB:5*_1MB])
	suite.assert.False(h.Dirty())

	err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
	suite.assert.Nil(err)

	fs, err := os.Stat(storagePath)
	suite.assert.Nil(err)
	suite.assert.Equal(fs.Size(), int64(62*_1MB))
}

func (suite *blockCacheTestSuite) TestReadWriteBlockInParallel() {
	prefetch := 12
	cfg := fmt.Sprintf("block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: %v\n parallelism: 1", prefetch)
	tobj, err := setupPipeline(cfg)
	defer tobj.cleanupPipeline()

	suite.assert.Nil(err)
	suite.assert.NotNil(tobj.blockCache)

	path := getTestFileName(suite.T().Name())
	storagePath := filepath.Join(tobj.fake_storage_path, path)

	// write using block cache
	options := internal.CreateFileOptions{Name: path, Mode: 0777}
	h, err := tobj.blockCache.CreateFile(options)
	suite.assert.Nil(err)
	suite.assert.NotNil(h)
	suite.assert.Equal(h.Size, int64(0))
	suite.assert.False(h.Dirty())

	n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: dataBuff[:]})
	suite.assert.Nil(err)
	suite.assert.Equal(n, int(5*_1MB))
	suite.assert.True(h.Dirty())

	err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
	suite.assert.Nil(err)
	suite.assert.False(h.Dirty())

	h, err = tobj.blockCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR})
	suite.assert.Nil(err)
	suite.assert.NotNil(h)
	suite.assert.Equal(h.Size, int64(5*_1MB))
	suite.assert.False(h.Dirty())

	ind := uint64(0)
	for i := 5; i < prefetch+50; i++ {
		n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(i * int(_1MB)), Data: dataBuff[ind*_1MB : (ind+1)*_1MB]})
		suite.assert.Nil(err)
		suite.assert.Equal(n, int(_1MB))
		suite.assert.True(h.Dirty())
		ind = (ind + 1) % 5
	}

	suite.assert.Equal(h.Buffers.Cooking.Len()+h.Buffers.Cooked.Len(), prefetch)

	// read blocks 0, 1 and 2
	data := make([]byte, 2*_1MB)
	n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 512, Data: data})
	suite.assert.Nil(err)
	suite.assert.Equal(n, int(2*_1MB))
	suite.assert.Equal(data[:], dataBuff[512:2*_1MB+512])
	suite.assert.True(h.Dirty())

	// read blocks 4 and 5
	n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: int64(4 * _1MB), Data: data})
	suite.assert.Nil(err)
	suite.assert.Equal(n, int(2*_1MB))
	suite.assert.Equal(data[:_1MB], dataBuff[4*_1MB:])
	suite.assert.Equal(data[_1MB:], dataBuff[:_1MB])
	suite.assert.False(h.Dirty())

	err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
	suite.assert.Nil(err)

	fs, err := os.Stat(storagePath)
	suite.assert.Nil(err)
	suite.assert.Equal(fs.Size(), int64(62*_1MB))
}

// In order for 'go test' to run this suite, we need to create
// a normal test function and pass our suite to suite.Run
func TestBlockCacheTestSuite(t *testing.T) {
@@ -143,7 +143,7 @@ func (suite *blockTestSuite) TestReady() {
	b.ReUse()
	suite.assert.NotNil(b.state)

	b.Ready()
	b.Ready(BlockStatusDownloaded)
	suite.assert.Equal(len(b.state), 1)

	<-b.state

@@ -167,7 +167,7 @@ func (suite *blockTestSuite) TestUnBlock() {
	suite.assert.NotNil(b.state)
	suite.assert.Nil(b.node)

	b.Ready()
	b.Ready(BlockStatusDownloaded)
	suite.assert.Equal(len(b.state), 1)

	<-b.state

@@ -201,7 +201,7 @@ func (suite *blockTestSuite) TestWriter() {
	suite.assert.Equal(b.id, int64(-1))
	suite.assert.False(b.IsDirty())

	b.Ready()
	b.Ready(BlockStatusDownloaded)
	suite.assert.Equal(len(b.state), 1)

	<-b.state

@@ -223,7 +223,7 @@ func (suite *blockTestSuite) TestWriter() {
	b.NoMoreDirty()
	suite.assert.False(b.IsDirty())

	b.Ready()
	b.Ready(BlockStatusUploaded)
	suite.assert.Equal(len(b.state), 1)

	<-b.state
@@ -1226,12 +1226,28 @@ func (fc *FileCache) FlushFile(options internal.FlushFileOptions) error {
		// Write to storage
		// Create a new handle for the SDK to use to upload (read local file)
		// The local handle can still be used for read and write.
		var orgMode fs.FileMode
		modeChanged := false

		uploadHandle, err := os.Open(localPath)
		if err != nil {
			log.Err("FileCache::FlushFile : error [unable to open upload handle] %s [%s]", options.Handle.Path, err.Error())
			return nil
		}
			if os.IsPermission(err) {
				info, _ := os.Stat(localPath)
				orgMode = info.Mode()
				newMode := orgMode | 0444
				err = os.Chmod(localPath, newMode)
				if err == nil {
					modeChanged = true
					uploadHandle, err = os.Open(localPath)
					log.Info("FileCache::FlushFile : read mode added to file %s", options.Handle.Path)
				}
			}

			if err != nil {
				log.Err("FileCache::FlushFile : error [unable to open upload handle] %s [%s]", options.Handle.Path, err.Error())
				return err
			}
		}
		err = fc.NextComponent().CopyFromFile(
			internal.CopyFromFileOptions{
				Name: options.Handle.Path,
@@ -1239,6 +1255,14 @@ func (fc *FileCache) FlushFile(options internal.FlushFileOptions) error {
			})

		uploadHandle.Close()

		if modeChanged {
			err1 := os.Chmod(localPath, orgMode)
			if err1 != nil {
				log.Err("FileCache::FlushFile : Failed to remove read mode from file %s [%s]", options.Handle.Path, err1.Error())
			}
		}

		if err != nil {
			log.Err("FileCache::FlushFile : %s upload failed [%s]", options.Handle.Path, err.Error())
			return err
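
The two FlushFile hunks above implement a permission fallback: if the cached file cannot be opened for the upload (e.g. it was created write-only), temporarily add a read bit, upload, then restore the original mode. A self-contained sketch of that pattern, with a hypothetical file name (note: running as root would mask the permission error):

```go
package main

import (
	"fmt"
	"os"
)

// openForUpload sketches the fallback: on a permission error, add read bits,
// reopen, and report the original mode so the caller can restore it later.
func openForUpload(path string) (f *os.File, orgMode os.FileMode, modeChanged bool, err error) {
	f, err = os.Open(path)
	if err == nil || !os.IsPermission(err) {
		return f, 0, false, err
	}
	info, statErr := os.Stat(path)
	if statErr != nil {
		return nil, 0, false, statErr
	}
	orgMode = info.Mode()
	if chErr := os.Chmod(path, orgMode|0444); chErr != nil {
		return nil, 0, false, err // keep the original permission error
	}
	f, err = os.Open(path)
	return f, orgMode, true, err
}

func main() {
	path := "no-read.bin" // hypothetical cache file
	_ = os.WriteFile(path, []byte("data"), 0222)
	defer os.Remove(path)

	f, orgMode, modeChanged, err := openForUpload(path)
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}
	f.Close() // the upload would happen here
	if modeChanged {
		_ = os.Chmod(path, orgMode) // restore the write-only mode afterwards
	}
	fmt.Println("uploaded with restored mode", orgMode)
}
```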

@@ -37,6 +37,7 @@ import (
	"bytes"
	"context"
	"fmt"
	"io/fs"
	"math"
	"math/rand"
	"os"
@@ -637,6 +638,50 @@ func (suite *fileCacheTestSuite) TestCreateFile() {
	suite.assert.True(os.IsNotExist(err))
}

func (suite *fileCacheTestSuite) TestCreateFileWithNoPerm() {
	defer suite.cleanupTest()
	// Default is to not create empty files on create file to support immutable storage.
	path := "file1"
	options := internal.CreateFileOptions{Name: path, Mode: 0000}
	f, err := suite.fileCache.CreateFile(options)
	suite.assert.Nil(err)
	suite.assert.True(f.Dirty()) // Handle should be dirty since it was not created in storage

	// Path should be added to the file cache
	_, err = os.Stat(suite.cache_path + "/" + path)
	suite.assert.True(err == nil || os.IsExist(err))
	// Path should not be in fake storage
	_, err = os.Stat(suite.fake_storage_path + "/" + path)
	suite.assert.True(os.IsNotExist(err))
	err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: f})
	suite.assert.Nil(err)
	info, _ := os.Stat(suite.cache_path + "/" + path)
	suite.assert.Equal(info.Mode(), os.FileMode(0000))
}

func (suite *fileCacheTestSuite) TestCreateFileWithWritePerm() {
	defer suite.cleanupTest()
	// Default is to not create empty files on create file to support immutable storage.
	path := "file1"
	options := internal.CreateFileOptions{Name: path, Mode: 0222}
	f, err := suite.fileCache.CreateFile(options)
	suite.assert.Nil(err)
	suite.assert.True(f.Dirty()) // Handle should be dirty since it was not created in storage

	os.Chmod(suite.cache_path+"/"+path, 0331)

	// Path should be added to the file cache
	_, err = os.Stat(suite.cache_path + "/" + path)
	suite.assert.True(err == nil || os.IsExist(err))
	// Path should not be in fake storage
	_, err = os.Stat(suite.fake_storage_path + "/" + path)
	suite.assert.True(os.IsNotExist(err))
	err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: f})
	suite.assert.Nil(err)
	info, _ := os.Stat(suite.cache_path + "/" + path)
	suite.assert.Equal(info.Mode(), fs.FileMode(0331))
}

func (suite *fileCacheTestSuite) TestCreateFileInDir() {
	defer suite.cleanupTest()
	// Default is to not create empty files on create file to support immutable storage.
go.mod (20 changes)
@@ -1,9 +1,11 @@
module github.com/Azure/azure-storage-fuse/v2

go 1.20
go 1.21.0

toolchain go1.22.4

require (
	github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0
	github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0
	github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0
	github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0
	github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.2.0

@@ -38,7 +40,7 @@ require (
	github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
	github.com/kylelemons/godebug v1.1.0 // indirect
	github.com/magiconair/properties v1.8.7 // indirect
	github.com/pelletier/go-toml/v2 v2.2.2 // indirect
	github.com/pelletier/go-toml/v2 v2.2.3 // indirect
	github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
	github.com/russross/blackfriday/v2 v2.1.0 // indirect

@@ -46,14 +48,14 @@ require (
	github.com/sagikazarmark/slog-shim v0.1.0 // indirect
	github.com/sourcegraph/conc v0.3.0 // indirect
	github.com/spf13/afero v1.11.0 // indirect
	github.com/spf13/cast v1.6.0 // indirect
	github.com/spf13/cast v1.7.0 // indirect
	github.com/subosito/gotenv v1.6.0 // indirect
	go.uber.org/multierr v1.11.0 // indirect
	golang.org/x/crypto v0.25.0 // indirect
	golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
	golang.org/x/net v0.27.0 // indirect
	golang.org/x/sys v0.22.0 // indirect
	golang.org/x/text v0.16.0 // indirect
	golang.org/x/crypto v0.26.0 // indirect
	golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect
	golang.org/x/net v0.28.0 // indirect
	golang.org/x/sys v0.24.0 // indirect
	golang.org/x/text v0.17.0 // indirect
)

replace github.com/spf13/cobra => github.com/gapra-msft/cobra v1.4.1-0.20220411185530-5b83e8ba06dd
go.sum (49 changes)
|
@ -1,10 +1,11 @@
|
|||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 h1:GJHeeA2N7xrG3q30L2UXDyuWRzDM900/65j70wcM4Ww=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 h1:nyQWyZvwGTvunIMxi1Y9uXkcyr+I7TeNrr/foo4Kpk8=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 h1:Be6KInmFEKV81c0pOAEbRYehLMwmmGI1exuFj248AMk=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0/go.mod h1:WCPBHsOXfBVnivScjs2ypRfimjEW0qPVLGgJkZlrIOA=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.2.0 h1:gXpwp0sGZz2FY9lVpSdM1rMpsP9PUtevHQyFhGoqHxY=
|
||||
|
@ -16,11 +17,11 @@ github.com/JeffreyRichter/enum v0.0.0-20180725232043-2567042f9cda/go.mod h1:2CaS
|
|||
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
||||
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
|
||||
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
|
||||
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
|
||||
github.com/gapra-msft/cobra v1.4.1-0.20220411185530-5b83e8ba06dd h1:U3d5Jlb0ANsyxk2lnlhYh7/Ov4bZpIBUxJTsVuJM9G0=
|
||||
|
@ -30,6 +31,7 @@ github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVI
|
|||
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
|
||||
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
|
||||
|
@ -40,7 +42,9 @@ github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf
|
|||
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA=
|
||||
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
|
||||
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
|
||||
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
|
||||
|
@ -51,16 +55,16 @@ github.com/montanaflynn/stats v0.7.0 h1:r3y12KyNxj/Sb/iOE46ws+3mS1+MZca1wlHQFPsY
|
|||
github.com/montanaflynn/stats v0.7.0/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
||||
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
|
||||
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
|
||||
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
|
||||
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/radovskyb/watcher v1.0.7 h1:AYePLih6dpmS32vlHfhCeli8127LzkIgwJGcwwe8tUE=
|
||||
github.com/radovskyb/watcher v1.0.7/go.mod h1:78okwvY5wPdzcb1UYnip1pvrZNIVEIh/Cm+ZuvsUYIg=
|
||||
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
|
||||
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
|
||||
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/sagikazarmark/locafero v0.6.0 h1:ON7AQg37yzcRPU69mt7gwhFEBwxI6P9T4Qu3N51bwOk=
|
||||
|
@ -73,19 +77,12 @@ github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9yS
|
|||
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
|
||||
github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
|
||||
github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY=
|
||||
github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
|
||||
github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
|
||||
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
|
||||
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
|
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI=
|
||||
github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
|
||||
|
@@ -99,16 +96,16 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 h1:kx6Ds3MlpiUHKj7syVnbp57++8WpuKPcR5yjLBjvLEA=
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -117,13 +114,13 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
@@ -132,10 +129,10 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
@@ -24,3 +24,5 @@ Mariner-2.0-x86_64,marinerFuse3AmdRpm,cbl-mariner-2.0-prod-Microsoft-x86_64-yum,
Mariner-2.0-aarch64,marinerFuse3AarchRpm,cbl-mariner-2.0-prod-Microsoft-aarch64-yum,
Rocky-8.0,fuse3AmdRpm,microsoft-el8-prod-yum,
Rocky-9.0,fuse3AmdRpm,microsoft-el9-prod-yum,
Mariner-3.0-x86_64,marinerFuse3AmdRpm,azurelinux-3.0-prod-ms-oss-x86_64-yum,
Mariner-3.0-aarch64,marinerFuse3AarchRpm,azurelinux-3.0-prod-ms-oss-aarch64-yum,
@@ -47,6 +47,7 @@ import (
    "path/filepath"
    "strconv"
    "strings"
    "syscall"
    "testing"
    "time"
@@ -59,6 +60,7 @@ var dataValidationAdlsPtr string
var quickTest string
var streamDirectTest string
var distro string
var blockSizeMB int = 16

var minBuff, medBuff, largeBuff, hugeBuff []byte
@@ -94,6 +96,7 @@ func initDataValidationFlags() {
    quickTest = getDataValidationTestFlag("quick-test")
    streamDirectTest = getDataValidationTestFlag("stream-direct-test")
    distro = getDataValidationTestFlag("distro-name")
    blockSizeMB = flag.Lookup("block-size-mb").Value.(flag.Getter).Get().(int)
}

func getDataValidationTestDirName(n int) string {
@@ -120,15 +123,96 @@ func (suite *dataValidationTestSuite) copyToMountDir(localFilePath string, remot
    suite.Equal(nil, err)
}

// Computes MD5 and returns the 16-byte slice which represents the hash value
func (suite *dataValidationTestSuite) computeMD5(filePath string) []byte {
    fh, err := os.Open(filePath)
    suite.Nil(err)

    fi, err := fh.Stat()
    suite.Nil(err)
    size := fi.Size()

    hash := md5.New()
    bytesCopied, err := io.Copy(hash, fh)
    suite.Nil(err)
    suite.Equal(size, bytesCopied)

    err = fh.Close()
    suite.Nil(err)

    return hash.Sum(nil)
}

func (suite *dataValidationTestSuite) validateData(localFilePath string, remoteFilePath string) {
    // compare the local and mounted files
    diffCmd := exec.Command("diff", localFilePath, remoteFilePath)
    cliOut, err := diffCmd.Output()
    if len(cliOut) != 0 {
        fmt.Println(string(cliOut))
        localMD5sum := suite.computeMD5(localFilePath)
        remoteMD5sum := suite.computeMD5(remoteFilePath)
        suite.Equal(localMD5sum, remoteMD5sum)
    }
    suite.Equal(0, len(cliOut))
    suite.Equal(nil, err)
}

//----------------Utility Functions-----------------------

// pass the file name and the function returns the LocalFilePath and MountedFilePath
func convertFileNameToFilePath(fileName string) (localFilePath string, remoteFilePath string) {
    localFilePath = tObj.testLocalPath + "/" + fileName
    remoteFilePath = tObj.testMntPath + "/" + fileName
    return localFilePath, remoteFilePath
}

// creates files in the local and mounted directories and returns their file handles; the associated fds have O_RDWR mode
func createFileHandleInLocalAndRemote(suite *dataValidationTestSuite, localFilePath, remoteFilePath string) (lfh *os.File, rfh *os.File) {
    lfh, err := os.Create(localFilePath)
    suite.Nil(err)

    rfh, err = os.Create(remoteFilePath)
    suite.Nil(err)

    return lfh, rfh
}

// closes the file handles; this ensures that data is flushed to disk/Azure Storage from the cache
func closeFileHandles(suite *dataValidationTestSuite, handles ...*os.File) {
    for _, h := range handles {
        err := h.Close()
        suite.Nil(err)
    }
}

// Writes the data at the given offsets for the given file
func writeSparseData(suite *dataValidationTestSuite, fh *os.File, offsets []int64) {
    ind := uint64(0)
    for _, o := range offsets {
        // write 1MB data at offset o
        n, err := fh.WriteAt(medBuff[ind*_1MB:(ind+1)*_1MB], o)
        suite.Nil(err)
        suite.Equal(n, int(_1MB))

        ind = (ind + 1) % 10
    }
}

func min(x, y int) int {
    if x < y {
        return x
    }
    return y
}

// Creates the file at filePath and fills it with `size` bytes of random data
func generateFileWithRandomData(suite *dataValidationTestSuite, filePath string, size int) {
    fh, err := os.Create(filePath)
    suite.Nil(err)
    bufferSize := 4 * 1024
    buffer := make([]byte, bufferSize)
    rand.Read(buffer)
    blocks := size / bufferSize
    for i := 0; i < blocks; i++ {
        bytesToWrite := min(bufferSize, size)
        bytesWritten, err := fh.Write(buffer[0:bytesToWrite])
        suite.Nil(err)
        suite.Equal(bytesToWrite, bytesWritten)
        size -= bytesWritten
    }
    closeFileHandles(suite, fh)
}

// -------------- Data Validation Tests -------------------

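Aside: the helpers above are designed to compose into a create → write → flush → validate flow. The sketch below is a hypothetical test body, not part of this commit, shown only to illustrate how the suite helpers fit together.

func (suite *dataValidationTestSuite) TestHelperComposition() {
    // hypothetical example: derive matching local and mounted paths
    localFilePath, remoteFilePath := convertFileNameToFilePath("helperCompositionExample")

    // create both files; the returned fds are opened O_RDWR
    lfh, rfh := createFileHandleInLocalAndRemote(suite, localFilePath, remoteFilePath)

    // apply the same sparse write pattern to both copies
    offsets := []int64{0, 10 * int64(_1MB)}
    writeSparseData(suite, lfh, offsets)
    writeSparseData(suite, rfh, offsets)

    // close to flush cached data to disk/Azure Storage
    closeFileHandles(suite, lfh, rfh)

    // byte-compare (diff) and MD5-compare the two copies
    suite.validateData(localFilePath, remoteFilePath)
}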
@@ -382,64 +466,10 @@ func (suite *dataValidationTestSuite) TestMultipleHugeFiles() {
    createThreadPool(noOfFiles, noOfWorkers, "huge", suite)
}

func computeMD5(filePath string) ([]byte, error) {
    fh, err := os.Open(filePath)
    if err != nil {
        return nil, err
    }

    fi, err := fh.Stat()
    fi.Size()

    hash := md5.New()
    if _, err := io.Copy(hash, fh); err != nil {
        return nil, err
    }

    err = fh.Close()
    if err != nil {
        return nil, err
    }

    return hash.Sum(nil), nil
}

func writeSparseData(suite *dataValidationTestSuite, fh *os.File, offsets []int64) {
    ind := uint64(0)
    for _, o := range offsets {
        // write 1MB data at offset o
        n, err := fh.WriteAt(medBuff[ind*_1MB:(ind+1)*_1MB], o)
        suite.Nil(err)
        suite.Equal(n, int(_1MB))

        ind = (ind + 1) % 10
    }

    // close the file handle
    err := fh.Close()
    suite.Nil(err)
}

func (suite *dataValidationTestSuite) TestSparseFileRandomWrite() {
    fileName := "sparseFile"
    localFilePath := tObj.testLocalPath + "/" + fileName
    remoteFilePath := tObj.testMntPath + "/" + fileName

    // create local file
    lfh, err := os.Create(localFilePath)
    suite.Nil(err)

    defer func(fh *os.File) {
        _ = fh.Close()
    }(lfh)

    // create remote file
    rfh, err := os.Create(remoteFilePath)
    suite.Nil(err)

    defer func(fh *os.File) {
        _ = fh.Close()
    }(rfh)
    localFilePath, remoteFilePath := convertFileNameToFilePath(fileName)
    lfh, rfh := createFileHandleInLocalAndRemote(suite, localFilePath, remoteFilePath)

    // write to local file
    writeSparseData(suite, lfh, []int64{0, 164 * int64(_1MB), 100 * int64(_1MB), 65 * int64(_1MB), 129 * int64(_1MB)})
@@ -447,44 +477,21 @@ func (suite *dataValidationTestSuite) TestSparseFileRandomWrite() {
    // write to remote file
    writeSparseData(suite, rfh, []int64{0, 164 * int64(_1MB), 100 * int64(_1MB), 65 * int64(_1MB), 129 * int64(_1MB)})

    closeFileHandles(suite, lfh, rfh)
    // check size of blob uploaded
    fi, err := os.Stat(remoteFilePath)
    suite.Nil(err)
    suite.Equal(fi.Size(), 165*int64(_1MB))

    localMD5, err := computeMD5(localFilePath)
    suite.Nil(err)
    suite.NotNil(localMD5)

    remoteMD5, err := computeMD5(remoteFilePath)
    suite.Nil(err)
    suite.NotNil(remoteMD5)

    suite.Equal(localMD5, remoteMD5)
    suite.validateData(localFilePath, remoteFilePath)

    suite.dataValidationTestCleanup([]string{localFilePath, remoteFilePath, tObj.testCachePath})
}

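Aside: the 165*int64(_1MB) expectation above follows from sparse-file semantics: the uploaded blob must extend to the highest write offset plus the 1MB written there, i.e. 164MB + 1MB. The helper below is an illustrative sketch of that rule and is not part of this commit.

// expectedSparseSize returns the file size writeSparseData should produce:
// the largest offset in the pattern plus the 1MB written at it.
func expectedSparseSize(offsets []int64) int64 {
    var maxOffset int64
    for _, o := range offsets {
        if o > maxOffset {
            maxOffset = o
        }
    }
    return maxOffset + int64(_1MB)
}

For the offset pattern {0, 164MB, 100MB, 65MB, 129MB} this returns 165MB, matching the os.Stat check above.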
func (suite *dataValidationTestSuite) TestSparseFileRandomWriteBlockOverlap() {
    fileName := "sparseFileBlockOverlap"
    localFilePath := tObj.testLocalPath + "/" + fileName
    remoteFilePath := tObj.testMntPath + "/" + fileName

    // create local file
    lfh, err := os.Create(localFilePath)
    suite.Nil(err)

    defer func(fh *os.File) {
        _ = fh.Close()
    }(lfh)

    // create remote file
    rfh, err := os.Create(remoteFilePath)
    suite.Nil(err)

    defer func(fh *os.File) {
        _ = fh.Close()
    }(rfh)
    localFilePath, remoteFilePath := convertFileNameToFilePath(fileName)
    lfh, rfh := createFileHandleInLocalAndRemote(suite, localFilePath, remoteFilePath)

    // write to local file
    writeSparseData(suite, lfh, []int64{0, 170 * int64(_1MB), 63*int64(_1MB) + 1024*512, 129 * int64(_1MB), 100 * int64(_1MB)})
@@ -492,44 +499,22 @@ func (suite *dataValidationTestSuite) TestSparseFileRandomWriteBlockOverlap() {
    // write to remote file
    writeSparseData(suite, rfh, []int64{0, 170 * int64(_1MB), 63*int64(_1MB) + 1024*512, 129 * int64(_1MB), 100 * int64(_1MB)})

    closeFileHandles(suite, lfh, rfh)

    // check size of blob uploaded
    fi, err := os.Stat(remoteFilePath)
    suite.Nil(err)
    suite.Equal(fi.Size(), 171*int64(_1MB))

    localMD5, err := computeMD5(localFilePath)
    suite.Nil(err)
    suite.NotNil(localMD5)

    remoteMD5, err := computeMD5(remoteFilePath)
    suite.Nil(err)
    suite.NotNil(remoteMD5)

    suite.Equal(localMD5, remoteMD5)
    suite.validateData(localFilePath, remoteFilePath)

    suite.dataValidationTestCleanup([]string{localFilePath, remoteFilePath, tObj.testCachePath})
}

func (suite *dataValidationTestSuite) TestFileReadBytesMultipleBlocks() {
    fileName := "bytesReadMultipleBlock"
    localFilePath := tObj.testLocalPath + "/" + fileName
    remoteFilePath := tObj.testMntPath + "/" + fileName

    // create local file
    lfh, err := os.Create(localFilePath)
    suite.Nil(err)

    defer func(fh *os.File) {
        _ = fh.Close()
    }(lfh)

    // create remote file
    rfh, err := os.Create(remoteFilePath)
    suite.Nil(err)

    defer func(fh *os.File) {
        _ = fh.Close()
    }(rfh)
    localFilePath, remoteFilePath := convertFileNameToFilePath(fileName)
    lfh, rfh := createFileHandleInLocalAndRemote(suite, localFilePath, remoteFilePath)

    // write 65MB data
    n, err := lfh.WriteAt(largeBuff[0:65*_1MB], 0)
@@ -541,9 +526,6 @@ func (suite *dataValidationTestSuite) TestFileReadBytesMultipleBlocks() {
    suite.Nil(err)
    suite.Equal(n, 7)

    err = lfh.Close()
    suite.Nil(err)

    // write 65MB data
    n, err = rfh.WriteAt(largeBuff[0:65*_1MB], 0)
    suite.Nil(err)
@@ -554,8 +536,7 @@ func (suite *dataValidationTestSuite) TestFileReadBytesMultipleBlocks() {
    suite.Nil(err)
    suite.Equal(n, 7)

    err = rfh.Close()
    suite.Nil(err)
    closeFileHandles(suite, lfh, rfh)

    // check size of blob uploaded using os.Stat()
    fi, err := os.Stat(remoteFilePath)
@@ -578,58 +559,29 @@ func (suite *dataValidationTestSuite) TestFileReadBytesMultipleBlocks() {
    }
    suite.Equal(totalBytesread, 65*int64(_1MB)+7)

    err = fh.Close()
    suite.Nil(err)
    closeFileHandles(suite, fh)

    localMD5, err := computeMD5(localFilePath)
    suite.Nil(err)
    suite.NotNil(localMD5)

    remoteMD5, err := computeMD5(remoteFilePath)
    suite.Nil(err)
    suite.NotNil(remoteMD5)

    suite.Equal(localMD5, remoteMD5)
    suite.validateData(localFilePath, remoteFilePath)

    suite.dataValidationTestCleanup([]string{localFilePath, remoteFilePath, tObj.testCachePath})
}

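Aside: the totalBytesread loop asserted above is the standard chunked-read-until-EOF pattern. The standalone sketch below, which is not part of this commit and uses only the standard library, shows the same bookkeeping.

// readFileInChunks reads a file in 1MB chunks until EOF and returns the
// total number of bytes read, mirroring the totalBytesread accounting above.
func readFileInChunks(path string) (int64, error) {
    fh, err := os.Open(path)
    if err != nil {
        return 0, err
    }
    defer fh.Close()

    var total int64
    buf := make([]byte, 1024*1024)
    for {
        // Read may return n > 0 together with io.EOF, so count n first
        n, err := fh.Read(buf)
        total += int64(n)
        if err == io.EOF {
            return total, nil
        }
        if err != nil {
            return total, err
        }
    }
}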
func (suite *dataValidationTestSuite) TestFileReadBytesOneBlock() {
    fileName := "bytesReadOneBlock"
    localFilePath := tObj.testLocalPath + "/" + fileName
    remoteFilePath := tObj.testMntPath + "/" + fileName

    // create local file
    lfh, err := os.Create(localFilePath)
    suite.Nil(err)

    defer func(fh *os.File) {
        _ = fh.Close()
    }(lfh)

    // create remote file
    rfh, err := os.Create(remoteFilePath)
    suite.Nil(err)

    defer func(fh *os.File) {
        _ = fh.Close()
    }(rfh)
    localFilePath, remoteFilePath := convertFileNameToFilePath(fileName)
    lfh, rfh := createFileHandleInLocalAndRemote(suite, localFilePath, remoteFilePath)

    // write 13 bytes data to local file
    n, err := lfh.WriteAt(largeBuff[0:13], 0)
    suite.Nil(err)
    suite.Equal(n, 13)

    err = lfh.Close()
    suite.Nil(err)

    // write 13 bytes data to remote file
    n, err = rfh.WriteAt(largeBuff[0:13], 0)
    suite.Nil(err)
    suite.Equal(n, 13)

    err = rfh.Close()
    suite.Nil(err)
    closeFileHandles(suite, lfh, rfh)

    // check size of blob uploaded using os.Stat()
    fi, err := os.Stat(remoteFilePath)
@@ -652,42 +604,17 @@ func (suite *dataValidationTestSuite) TestFileReadBytesOneBlock() {
    }
    suite.Equal(totalBytesread, int64(13))

    err = fh.Close()
    suite.Nil(err)
    closeFileHandles(suite, fh)

    localMD5, err := computeMD5(localFilePath)
    suite.Nil(err)
    suite.NotNil(localMD5)

    remoteMD5, err := computeMD5(remoteFilePath)
    suite.Nil(err)
    suite.NotNil(remoteMD5)

    suite.Equal(localMD5, remoteMD5)
    suite.validateData(localFilePath, remoteFilePath)

    suite.dataValidationTestCleanup([]string{localFilePath, remoteFilePath, tObj.testCachePath})
}

func (suite *dataValidationTestSuite) TestRandomWriteRaceCondition() {
    fileName := "randomWriteRaceCondition"
    localFilePath := tObj.testLocalPath + "/" + fileName
    remoteFilePath := tObj.testMntPath + "/" + fileName

    // create local file
    lfh, err := os.Create(localFilePath)
    suite.Nil(err)

    defer func(fh *os.File) {
        _ = fh.Close()
    }(lfh)

    // create remote file
    rfh, err := os.Create(remoteFilePath)
    suite.Nil(err)

    defer func(fh *os.File) {
        _ = fh.Close()
    }(rfh)
    localFilePath, remoteFilePath := convertFileNameToFilePath(fileName)
    lfh, rfh := createFileHandleInLocalAndRemote(suite, localFilePath, remoteFilePath)

    offsetList := []int64{}
    for i := 0; i < 10; i++ {
@@ -702,24 +629,96 @@ func (suite *dataValidationTestSuite) TestRandomWriteRaceCondition() {
    // write to remote file
    writeSparseData(suite, rfh, offsetList)

    closeFileHandles(suite, lfh, rfh)

    // check size of blob uploaded
    fi, err := os.Stat(remoteFilePath)
    suite.Nil(err)
    suite.Equal(fi.Size(), 145*int64(_1MB))

    localMD5, err := computeMD5(localFilePath)
    suite.Nil(err)
    suite.NotNil(localMD5)

    remoteMD5, err := computeMD5(remoteFilePath)
    suite.Nil(err)
    suite.NotNil(remoteMD5)

    suite.Equal(localMD5, remoteMD5)
    suite.validateData(localFilePath, remoteFilePath)

    suite.dataValidationTestCleanup([]string{localFilePath, remoteFilePath, tObj.testCachePath})
}

func (suite *dataValidationTestSuite) TestPanicOnClosingFile() {
    fileName := "panicOnClosingFile"
    _, remoteFilePath := convertFileNameToFilePath(fileName)
    blockSizeBytes := blockSizeMB * int(_1MB)
    buffer := make([]byte, blockSizeBytes)
    generateFileWithRandomData(suite, remoteFilePath, blockSizeBytes*10)

    rfh, err := os.OpenFile(remoteFilePath, syscall.O_RDWR, 0666)
    suite.Nil(err)

    // Read 1st block
    bytes_read, err := rfh.Read(buffer)
    suite.Nil(err)
    suite.Equal(bytes_read, blockSizeBytes)

    // Write to 2nd block
    bytes_written, err := rfh.Write(buffer)
    suite.Equal(bytes_written, blockSizeBytes)
    suite.Nil(err)

    closeFileHandles(suite, rfh)
}

// This test takes the default 1MB blockSize, as other block sizes could cause the test to take some time.
func (suite *dataValidationTestSuite) TestPanicOnWritingToFile() {
    fileName := "panicOnWritingToFile"
    _, remoteFilePath := convertFileNameToFilePath(fileName)
    blockSizeBytes := blockSizeMB * int(_1MB)
    buffer := make([]byte, blockSizeBytes)
    generateFileWithRandomData(suite, remoteFilePath, blockSizeBytes*20)

    rfh, err := os.OpenFile(remoteFilePath, syscall.O_RDWR, 0666)
    suite.Nil(err)

    // Read enough blocks so that cooking+cooked equals prefetchCount
    for i := 0; i < 3; i++ {
        offset := 4 * int64(i) * int64(_1MB)
        bytes_read, err := rfh.ReadAt(buffer, offset)
        suite.Nil(err)
        suite.Equal(bytes_read, blockSizeBytes)
    }

    for i := 18; i < 20; i++ {
        bytes_written, err := rfh.WriteAt(buffer, 18*int64(blockSizeBytes))
        suite.Equal(bytes_written, blockSizeBytes)
        suite.Nil(err)
    }

    closeFileHandles(suite, rfh)
}

// This test takes the default 1MB blockSize, as other block sizes could cause the test to take some time.
func (suite *dataValidationTestSuite) TestPanicOnReadingFileInRandReadMode() {
    fileName := "panicOnReadingFileInRandReadMode"
    _, remoteFilePath := convertFileNameToFilePath(fileName)
    blockSizeBytes := int(_1MB)
    buffer := make([]byte, blockSizeBytes)
    generateFileWithRandomData(suite, remoteFilePath, blockSizeBytes*84)

    rfh, err := os.OpenFile(remoteFilePath, syscall.O_RDWR, 0666)
    suite.Nil(err)

    // Write at some offset
    bytes_written, err := rfh.WriteAt(buffer, 0)
    suite.Equal(bytes_written, blockSizeBytes)
    suite.Nil(err)

    // Make the file handle go to random-read mode in block cache (this was causing the panic)
    for i := 0; i < 14; i++ {
        offset := int64(_1MB) * 6 * int64(i)
        bytes_read, err := rfh.ReadAt(buffer, offset)
        suite.Nil(err)
        suite.Equal(bytes_read, blockSizeBytes)
    }

    closeFileHandles(suite, rfh)
}

// -------------- Main Method -------------------
func TestDataValidationTestSuite(t *testing.T) {
    initDataValidationFlags()
@@ -762,6 +761,8 @@ func TestDataValidationTestSuite(t *testing.T) {
        fmt.Println("BLOCK Blob Testing...")
    }

    fmt.Println("Block Size MB Used for the tests ", blockSizeMB)

    // Sanity check in the off chance the same random name was generated twice and was still around somehow
    err := os.RemoveAll(tObj.testMntPath)
    if err != nil {
@@ -795,4 +796,5 @@ func init() {
    regDataValidationTestFlag(&quickTest, "quick-test", "true", "Run quick tests")
    regDataValidationTestFlag(&streamDirectTest, "stream-direct-test", "false", "Run stream direct tests")
    regDataValidationTestFlag(&distro, "distro-name", "", "Name of the distro")
    flag.IntVar(&blockSizeMB, "block-size-mb", 16, "Block size MB in block cache")
}
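Aside: the flags registered in init() above are ordinary go test flags, so a typical invocation of this suite would look something like go test -run TestDataValidationTestSuite -block-size-mb=16 -quick-test=true -distro-name="Ubuntu-22.04". The values shown here are illustrative only; the mount and output paths are configured elsewhere in the suite.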