// azure-storage-fuse/component/azstorage/datalake_test.go


//go:build !authtest
// +build !authtest

/*
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
Author : <blobfusedev@microsoft.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
*/
package azstorage

import (
"bytes"
"container/list"
"context"
"crypto/rand"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
"syscall"
"testing"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/directory"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/filesystem"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service"
"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/Azure/azure-storage-fuse/v2/internal/handlemap"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
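
// datalakeTestSuite runs integration tests for the Datalake (ADLS Gen2)
// storage backend against a live storage account; credentials are read from
// ~/azuretest.json in SetupTest.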
type datalakeTestSuite struct {
suite.Suite
assert *assert.Assertions
az *AzStorage
serviceClient *service.Client
containerClient *filesystem.Client
config string
container string
}
func (s *datalakeTestSuite) SetupTest() {
// Logging config
cfg := common.LogConfig{
FilePath: "./logfile.txt",
MaxFileSize: 10,
FileCount: 10,
Level: common.ELogLevel.LOG_DEBUG(),
}
log.SetDefaultLogger("base", cfg)
homeDir, err := os.UserHomeDir()
if err != nil {
fmt.Println("Unable to get home directory")
os.Exit(1)
}
cfgFile, err := os.Open(homeDir + "/azuretest.json")
if err != nil {
fmt.Println("Unable to open config file")
os.Exit(1)
}
cfgData, _ := io.ReadAll(cfgFile)
err = json.Unmarshal(cfgData, &storageTestConfigurationParameters)
if err != nil {
fmt.Println("Failed to parse the config file")
os.Exit(1)
}
cfgFile.Close()
s.setupTestHelper("", "", true)
}
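
// setupTestHelper wires up an AzStorage instance for the given config string
// (or a default key-auth ADLS config), points the suite's clients at the test
// filesystem, and optionally creates that filesystem. The default config it
// renders looks roughly like this (sketch with placeholder values):
//
//	azstorage:
//	  account-name: <account>
//	  endpoint: https://<account>.dfs.core.windows.net/
//	  type: adls
//	  account-key: <key>
//	  mode: key
//	  container: <container>
//	  fail-unsupported-op: true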
func (s *datalakeTestSuite) setupTestHelper(configuration string, container string, create bool) {
if container == "" {
container = generateContainerName()
}
s.container = container
if configuration == "" {
configuration = fmt.Sprintf("azstorage:\n account-name: %s\n endpoint: https://%s.dfs.core.windows.net/\n type: adls\n account-key: %s\n mode: key\n container: %s\n fail-unsupported-op: true",
storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container)
}
s.config = configuration
s.assert = assert.New(s.T())
s.az, _ = newTestAzStorage(configuration)
s.az.Start(ctx) // Note: Start->TestValidation will fail but it doesn't matter. We are creating the container a few lines below anyway.
// We could create the container before but that requires rewriting the code to new up a service client.
s.serviceClient = s.az.storage.(*Datalake).Service // Grab the service client to do some validation
s.containerClient = s.serviceClient.NewFileSystemClient(s.container)
if create {
s.containerClient.Create(ctx, nil)
}
}
func (s *datalakeTestSuite) tearDownTestHelper(delete bool) {
s.az.Stop()
if delete {
s.containerClient.Delete(ctx, nil)
}
}
func (s *datalakeTestSuite) cleanupTest() {
s.tearDownTestHelper(true)
log.Destroy()
}
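
// TestDefault checks that a minimal key-auth ADLS config yields the expected
// defaults for endpoint, auth mode, retry policy, and tuning parameters.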
func (s *datalakeTestSuite) TestDefault() {
defer s.cleanupTest()
s.assert.Equal(storageTestConfigurationParameters.AdlsAccount, s.az.stConfig.authConfig.AccountName)
s.assert.Equal(EAccountType.ADLS(), s.az.stConfig.authConfig.AccountType)
s.assert.False(s.az.stConfig.authConfig.UseHTTP)
s.assert.Equal(storageTestConfigurationParameters.AdlsKey, s.az.stConfig.authConfig.AccountKey)
s.assert.Empty(s.az.stConfig.authConfig.SASKey)
s.assert.Empty(s.az.stConfig.authConfig.ApplicationID)
s.assert.Empty(s.az.stConfig.authConfig.ResourceID)
s.assert.Empty(s.az.stConfig.authConfig.ActiveDirectoryEndpoint)
s.assert.Empty(s.az.stConfig.authConfig.ClientSecret)
s.assert.Empty(s.az.stConfig.authConfig.TenantID)
s.assert.Empty(s.az.stConfig.authConfig.ClientID)
s.assert.EqualValues("https://"+s.az.stConfig.authConfig.AccountName+".dfs.core.windows.net/", s.az.stConfig.authConfig.Endpoint)
s.assert.Equal(EAuthType.KEY(), s.az.stConfig.authConfig.AuthMode)
s.assert.Equal(s.container, s.az.stConfig.container)
s.assert.Empty(s.az.stConfig.prefixPath)
s.assert.EqualValues(0, s.az.stConfig.blockSize)
s.assert.EqualValues(32, s.az.stConfig.maxConcurrency)
s.assert.EqualValues((*blob.AccessTier)(nil), s.az.stConfig.defaultTier)
s.assert.EqualValues(0, s.az.stConfig.cancelListForSeconds)
s.assert.EqualValues(5, s.az.stConfig.maxRetries)
s.assert.EqualValues(900, s.az.stConfig.maxTimeout)
s.assert.EqualValues(4, s.az.stConfig.backoffTime)
s.assert.EqualValues(60, s.az.stConfig.maxRetryDelay)
s.assert.Empty(s.az.stConfig.proxyAddress)
}
func (s *datalakeTestSuite) TestModifyEndpoint() {
defer s.cleanupTest()
// Setup
s.tearDownTestHelper(false) // Don't delete the generated container.
config := fmt.Sprintf("azstorage:\n account-name: %s\n endpoint: https://%s.blob.core.windows.net/\n type: adls\n account-key: %s\n mode: key\n container: %s\n fail-unsupported-op: true",
storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container)
s.setupTestHelper(config, s.container, true)
err := s.az.storage.TestPipeline()
s.assert.Nil(err)
}
func (s *datalakeTestSuite) TestNoEndpoint() {
defer s.cleanupTest()
// Setup
s.tearDownTestHelper(false) // Don't delete the generated container.
config := fmt.Sprintf("azstorage:\n account-name: %s\n type: adls\n account-key: %s\n mode: key\n container: %s\n fail-unsupported-op: true",
storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container)
s.setupTestHelper(config, s.container, true)
err := s.az.storage.TestPipeline()
s.assert.Nil(err)
}
func (s *datalakeTestSuite) TestListContainers() {
defer s.cleanupTest()
// Setup
num := 10
prefix := generateContainerName()
for i := 0; i < num; i++ {
f := s.serviceClient.NewFileSystemClient(prefix + fmt.Sprint(i))
f.Create(ctx, nil)
defer f.Delete(ctx, nil)
}
containers, err := s.az.ListContainers()
s.assert.Nil(err)
s.assert.NotNil(containers)
s.assert.True(len(containers) >= num)
count := 0
for _, c := range containers {
if strings.HasPrefix(c, prefix) {
count++
}
}
s.assert.EqualValues(num, count)
}
// TODO : ListContainersHuge: Maybe this is overkill?
func (s *datalakeTestSuite) TestCreateDir() {
defer s.cleanupTest()
// Testing dir and dir/
var paths = []string{generateDirectoryName(), generateDirectoryName() + "/"}
for _, path := range paths {
log.Debug(path)
s.Run(path, func() {
err := s.az.CreateDir(internal.CreateDirOptions{Name: path})
s.assert.Nil(err)
// Directory should be in the account
dir := s.containerClient.NewDirectoryClient(internal.TruncateDirName(path))
_, err = dir.GetProperties(ctx, nil)
s.assert.Nil(err)
})
}
}
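
// When customer-provided keys (CPK) are enabled, every request against the
// path must carry the same key: a GetProperties call without the CPKInfo is
// expected to fail, while one that supplies it succeeds.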
func (s *datalakeTestSuite) TestCreateDirWithCPKEnabled() {
defer s.cleanupTest()
CPKEncryptionKey, CPKEncryptionKeySHA256 := generateCPKInfo()
config := fmt.Sprintf("azstorage:\n account-name: %s\n endpoint: https://%s.dfs.core.windows.net/\n type: adls\n account-key: %s\n mode: key\n container: %s\n cpk-enabled: true\n cpk-encryption-key: %s\n cpk-encryption-key-sha256: %s\n",
storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container, CPKEncryptionKey, CPKEncryptionKeySHA256)
s.setupTestHelper(config, s.container, false)
datalakeCPKOpt := &file.CPKInfo{
EncryptionKey: &CPKEncryptionKey,
EncryptionKeySHA256: &CPKEncryptionKeySHA256,
EncryptionAlgorithm: to.Ptr(file.EncryptionAlgorithmTypeAES256),
}
// Only testing "dir" here; the CPK variant skips the trailing-slash case
var paths = []string{generateDirectoryName()}
for _, path := range paths {
log.Debug(path)
s.Run(path, func() {
err := s.az.CreateDir(internal.CreateDirOptions{Name: path})
s.assert.Nil(err)
// Directory should not be accessible without CPK
dir := s.containerClient.NewDirectoryClient(internal.TruncateDirName(path))
_, err = dir.GetProperties(ctx, nil)
s.assert.NotNil(err)
//Directory should exist
dir = s.containerClient.NewDirectoryClient(internal.TruncateDirName(path))
_, err = dir.GetProperties(ctx, &directory.GetPropertiesOptions{
CPKInfo: datalakeCPKOpt,
})
s.assert.Nil(err)
})
}
}
func (s *datalakeTestSuite) TestDeleteDir() {
defer s.cleanupTest()
// Testing dir and dir/
var paths = []string{generateDirectoryName(), generateDirectoryName() + "/"}
for _, path := range paths {
log.Debug(path)
s.Run(path, func() {
s.az.CreateDir(internal.CreateDirOptions{Name: path})
err := s.az.DeleteDir(internal.DeleteDirOptions{Name: path})
s.assert.Nil(err)
// Directory should not be in the account
dir := s.containerClient.NewDirectoryClient(internal.TruncateDirName(path))
_, err = dir.GetProperties(ctx, nil)
s.assert.NotNil(err)
})
}
}
// Directory structure
// a/
// a/c1/
// a/c1/gc1
// a/c2
// ab/
// ab/c1
// ac
func (s *datalakeTestSuite) setupHierarchy(base string) (*list.List, *list.List, *list.List) {
s.az.CreateDir(internal.CreateDirOptions{Name: base})
c1 := base + "/c1"
s.az.CreateDir(internal.CreateDirOptions{Name: c1})
gc1 := c1 + "/gc1"
s.az.CreateFile(internal.CreateFileOptions{Name: gc1})
c2 := base + "/c2"
s.az.CreateFile(internal.CreateFileOptions{Name: c2})
abPath := base + "b"
s.az.CreateDir(internal.CreateDirOptions{Name: abPath})
abc1 := abPath + "/c1"
s.az.CreateFile(internal.CreateFileOptions{Name: abc1})
acPath := base + "c"
s.az.CreateFile(internal.CreateFileOptions{Name: acPath})
a, ab, ac := generateNestedDirectory(base)
// Validate the paths were setup correctly and all paths exist
for p := a.Front(); p != nil; p = p.Next() {
_, err := s.containerClient.NewDirectoryClient(p.Value.(string)).GetProperties(ctx, nil)
s.assert.Nil(err)
}
for p := ab.Front(); p != nil; p = p.Next() {
_, err := s.containerClient.NewDirectoryClient(p.Value.(string)).GetProperties(ctx, nil)
s.assert.Nil(err)
}
for p := ac.Front(); p != nil; p = p.Next() {
_, err := s.containerClient.NewDirectoryClient(p.Value.(string)).GetProperties(ctx, nil)
s.assert.Nil(err)
}
return a, ab, ac
}
func (s *datalakeTestSuite) TestDeleteDirHierarchy() {
defer s.cleanupTest()
// Setup
base := generateDirectoryName()
a, ab, ac := s.setupHierarchy(base)
err := s.az.DeleteDir(internal.DeleteDirOptions{Name: base})
s.assert.Nil(err)
// a paths should be deleted
for p := a.Front(); p != nil; p = p.Next() {
_, err = s.containerClient.NewDirectoryClient(p.Value.(string)).GetProperties(ctx, nil)
s.assert.NotNil(err)
}
ab.PushBackList(ac) // ab and ac paths should exist
for p := ab.Front(); p != nil; p = p.Next() {
_, err = s.containerClient.NewDirectoryClient(p.Value.(string)).GetProperties(ctx, nil)
s.assert.Nil(err)
}
}
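
// Once SetPrefixPath(base) is applied, component paths resolve relative to
// base, so deleting "c1" removes base/c1 and leaves base's other children
// untouched.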
func (s *datalakeTestSuite) TestDeleteSubDirPrefixPath() {
defer s.cleanupTest()
// Setup
base := generateDirectoryName()
a, ab, ac := s.setupHierarchy(base)
s.az.storage.SetPrefixPath(base)
err := s.az.DeleteDir(internal.DeleteDirOptions{Name: "c1"})
s.assert.Nil(err)
// a paths under c1 should be deleted
for p := a.Front(); p != nil; p = p.Next() {
path := p.Value.(string)
_, err = s.containerClient.NewDirectoryClient(path).GetProperties(ctx, nil)
if strings.HasPrefix(path, base+"/c1") {
s.assert.NotNil(err)
} else {
s.assert.Nil(err)
}
}
ab.PushBackList(ac) // ab and ac paths should exist
for p := ab.Front(); p != nil; p = p.Next() {
_, err = s.containerClient.NewDirectoryClient(p.Value.(string)).GetProperties(ctx, nil)
s.assert.Nil(err)
}
}
func (s *datalakeTestSuite) TestDeleteDirError() {
defer s.cleanupTest()
// Setup
name := generateDirectoryName()
err := s.az.DeleteDir(internal.DeleteDirOptions{Name: name})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOENT, err)
// Directory should not be in the account
dir := s.containerClient.NewDirectoryClient(name)
_, err = dir.GetProperties(ctx, nil)
s.assert.NotNil(err)
}
func (s *datalakeTestSuite) TestIsDirEmpty() {
defer s.cleanupTest()
// Setup
name := generateDirectoryName()
s.az.CreateDir(internal.CreateDirOptions{Name: name})
// Testing dir and dir/
var paths = []string{name, name + "/"}
for _, path := range paths {
log.Debug(path)
s.Run(path, func() {
empty := s.az.IsDirEmpty(internal.IsDirEmptyOptions{Name: path})
s.assert.True(empty)
})
}
}
func (s *datalakeTestSuite) TestIsDirEmptyFalse() {
defer s.cleanupTest()
// Setup
name := generateDirectoryName()
s.az.CreateDir(internal.CreateDirOptions{Name: name})
file := name + "/" + generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: file})
empty := s.az.IsDirEmpty(internal.IsDirEmptyOptions{Name: name})
s.assert.False(empty)
}
func (s *datalakeTestSuite) TestIsDirEmptyError() {
defer s.cleanupTest()
// Setup
name := generateDirectoryName()
empty := s.az.IsDirEmpty(internal.IsDirEmptyOptions{Name: name})
s.assert.False(empty) // Note: See comment in BlockBlob.List. BlockBlob behaves differently from Datalake
// Directory should not be in the account
dir := s.containerClient.NewDirectoryClient(name)
_, err := dir.GetProperties(ctx, nil)
s.assert.NotNil(err)
}
func (s *datalakeTestSuite) TestReadDir() {
defer s.cleanupTest()
// This tests the default listBlocked = 0. It should return the expected paths.
// Setup
name := generateDirectoryName()
s.az.CreateDir(internal.CreateDirOptions{Name: name})
childName := name + "/" + generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: childName})
// Testing dir and dir/
var paths = []string{name, name + "/"}
for _, path := range paths {
log.Debug(path)
s.Run(path, func() {
entries, err := s.az.ReadDir(internal.ReadDirOptions{Name: path})
s.assert.Nil(err)
s.assert.EqualValues(1, len(entries))
})
}
}
func (s *datalakeTestSuite) TestReadDirHierarchy() {
defer s.cleanupTest()
// Setup
base := generateDirectoryName()
s.setupHierarchy(base)
// ReadDir only reads the first level of the hierarchy
entries, err := s.az.ReadDir(internal.ReadDirOptions{Name: base})
s.assert.Nil(err)
s.assert.EqualValues(2, len(entries))
// Check the dir
s.assert.EqualValues(base+"/c1", entries[0].Path)
s.assert.EqualValues("c1", entries[0].Name)
s.assert.True(entries[0].IsDir())
s.assert.False(entries[0].IsModeDefault())
// Check the file
s.assert.EqualValues(base+"/c2", entries[1].Path)
s.assert.EqualValues("c2", entries[1].Name)
s.assert.False(entries[1].IsDir())
s.assert.False(entries[1].IsModeDefault())
}
func (s *datalakeTestSuite) TestReadDirRoot() {
defer s.cleanupTest()
// Setup
base := generateDirectoryName()
s.setupHierarchy(base)
// Testing dir and dir/
var paths = []string{"", "/"}
for _, path := range paths {
log.Debug(path)
s.Run(path, func() {
// ReadDir only reads the first level of the hierarchy
entries, err := s.az.ReadDir(internal.ReadDirOptions{Name: path})
s.assert.Nil(err)
s.assert.EqualValues(3, len(entries))
// Check the base dir
s.assert.EqualValues(base, entries[0].Path)
s.assert.EqualValues(base, entries[0].Name)
s.assert.True(entries[0].IsDir())
s.assert.False(entries[0].IsModeDefault())
// Check the baseb dir
s.assert.EqualValues(base+"b", entries[1].Path)
s.assert.EqualValues(base+"b", entries[1].Name)
s.assert.True(entries[1].IsDir())
s.assert.False(entries[1].IsModeDefault())
// Check the basec file
s.assert.EqualValues(base+"c", entries[2].Path)
s.assert.EqualValues(base+"c", entries[2].Name)
s.assert.False(entries[2].IsDir())
s.assert.False(entries[2].IsModeDefault())
})
}
}
func (s *datalakeTestSuite) TestReadDirSubDir() {
defer s.cleanupTest()
// Setup
base := generateDirectoryName()
s.setupHierarchy(base)
// ReadDir only reads the first level of the hierarchy
entries, err := s.az.ReadDir(internal.ReadDirOptions{Name: base + "/c1"})
s.assert.Nil(err)
s.assert.EqualValues(1, len(entries))
// Check the dir
s.assert.EqualValues(base+"/c1"+"/gc1", entries[0].Path)
s.assert.EqualValues("gc1", entries[0].Name)
s.assert.False(entries[0].IsDir())
s.assert.False(entries[0].IsModeDefault())
}
func (s *datalakeTestSuite) TestReadDirSubDirPrefixPath() {
defer s.cleanupTest()
// Setup
base := generateDirectoryName()
s.setupHierarchy(base)
s.az.storage.SetPrefixPath(base)
// ReadDir only reads the first level of the hierarchy
entries, err := s.az.ReadDir(internal.ReadDirOptions{Name: "/c1"})
s.assert.Nil(err)
s.assert.EqualValues(1, len(entries))
// Check the dir
s.assert.EqualValues(base+"/c1"+"/gc1", entries[0].Path)
s.assert.EqualValues("gc1", entries[0].Name)
s.assert.False(entries[0].IsDir())
s.assert.False(entries[0].IsModeDefault())
}
func (s *datalakeTestSuite) TestReadDirError() {
defer s.cleanupTest()
// Setup
name := generateDirectoryName()
entries, err := s.az.ReadDir(internal.ReadDirOptions{Name: name})
s.assert.NotNil(err) // Note: See comment in BlockBlob.List. BlockBlob behaves differently from Datalake
s.assert.Empty(entries)
// Directory should not be in the account
dir := s.containerClient.NewDirectoryClient(name)
_, err = dir.GetProperties(ctx, nil)
s.assert.NotNil(err)
}
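
// block-list-on-mount-sec blocks list calls for the configured number of
// seconds after mount, so a ReadDir issued immediately should come back empty.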
func (s *datalakeTestSuite) TestReadDirListBlocked() {
defer s.cleanupTest()
// Setup
s.tearDownTestHelper(false) // Don't delete the generated container.
listBlockedTime := 10
config := fmt.Sprintf("azstorage:\n account-name: %s\n endpoint: https://%s.dfs.core.windows.net/\n type: adls\n account-key: %s\n mode: key\n container: %s\n block-list-on-mount-sec: %d\n fail-unsupported-op: true\n",
storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container, listBlockedTime)
s.setupTestHelper(config, s.container, true)
name := generateDirectoryName()
s.az.CreateDir(internal.CreateDirOptions{Name: name})
childName := name + "/" + generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: childName})
entries, err := s.az.ReadDir(internal.ReadDirOptions{Name: name})
s.assert.Nil(err)
s.assert.EqualValues(0, len(entries)) // Since we block the list, it will return an empty list.
}
func (s *datalakeTestSuite) TestRenameDir() {
defer s.cleanupTest()
// Test handling "dir" and "dir/"
var inputs = []struct {
src string
dst string
}{
{src: generateDirectoryName(), dst: generateDirectoryName()},
{src: generateDirectoryName() + "/", dst: generateDirectoryName()},
{src: generateDirectoryName(), dst: generateDirectoryName() + "/"},
{src: generateDirectoryName() + "/", dst: generateDirectoryName() + "/"},
}
for _, input := range inputs {
s.Run(input.src+"->"+input.dst, func() {
// Setup
s.az.CreateDir(internal.CreateDirOptions{Name: input.src})
err := s.az.RenameDir(internal.RenameDirOptions{Src: input.src, Dst: input.dst})
s.assert.Nil(err)
// Src should not be in the account
dir := s.containerClient.NewDirectoryClient(internal.TruncateDirName(input.src))
_, err = dir.GetProperties(ctx, nil)
s.assert.NotNil(err)
// Dst should be in the account
dir = s.containerClient.NewDirectoryClient(internal.TruncateDirName(input.dst))
_, err = dir.GetProperties(ctx, nil)
s.assert.Nil(err)
})
}
}
func (s *datalakeTestSuite) TestRenameDirWithCPKEnabled() {
defer s.cleanupTest()
CPKEncryptionKey, CPKEncryptionKeySHA256 := generateCPKInfo()
config := fmt.Sprintf("azstorage:\n account-name: %s\n endpoint: https://%s.dfs.core.windows.net/\n type: adls\n account-key: %s\n mode: key\n container: %s\n cpk-enabled: true\n cpk-encryption-key: %s\n cpk-encryption-key-sha256: %s\n",
storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container, CPKEncryptionKey, CPKEncryptionKeySHA256)
s.setupTestHelper(config, s.container, false)
datalakeCPKOpt := &file.CPKInfo{
EncryptionKey: &CPKEncryptionKey,
EncryptionKeySHA256: &CPKEncryptionKeySHA256,
EncryptionAlgorithm: to.Ptr(file.EncryptionAlgorithmTypeAES256),
}
// Test handling "dir" and "dir/"
var inputs = []struct {
src string
dst string
}{
{src: generateDirectoryName(), dst: generateDirectoryName()},
{src: generateDirectoryName() + "/", dst: generateDirectoryName()},
{src: generateDirectoryName(), dst: generateDirectoryName() + "/"},
{src: generateDirectoryName() + "/", dst: generateDirectoryName() + "/"},
}
for _, input := range inputs {
s.Run(input.src+"->"+input.dst, func() {
// Setup
s.az.CreateDir(internal.CreateDirOptions{Name: input.src})
err := s.az.RenameDir(internal.RenameDirOptions{Src: input.src, Dst: input.dst})
s.assert.Nil(err)
// Src should not be in the account
dir := s.containerClient.NewDirectoryClient(internal.TruncateDirName(input.src))
_, err = dir.GetProperties(ctx, &directory.GetPropertiesOptions{
CPKInfo: datalakeCPKOpt,
})
s.assert.NotNil(err)
// Dst should be in the account
dir = s.containerClient.NewDirectoryClient(internal.TruncateDirName(input.dst))
_, err = dir.GetProperties(ctx, &directory.GetPropertiesOptions{
CPKInfo: datalakeCPKOpt,
})
s.assert.Nil(err)
})
}
}
func (s *datalakeTestSuite) TestRenameDirHierarchy() {
defer s.cleanupTest()
// Setup
baseSrc := generateDirectoryName()
aSrc, abSrc, acSrc := s.setupHierarchy(baseSrc)
baseDst := generateDirectoryName()
aDst, abDst, acDst := generateNestedDirectory(baseDst)
err := s.az.RenameDir(internal.RenameDirOptions{Src: baseSrc, Dst: baseDst})
s.assert.Nil(err)
// Source
// aSrc paths should be deleted
for p := aSrc.Front(); p != nil; p = p.Next() {
_, err = s.containerClient.NewDirectoryClient(p.Value.(string)).GetProperties(ctx, nil)
s.assert.NotNil(err)
}
abSrc.PushBackList(acSrc) // abSrc and acSrc paths should exist
for p := abSrc.Front(); p != nil; p = p.Next() {
_, err = s.containerClient.NewDirectoryClient(p.Value.(string)).GetProperties(ctx, nil)
s.assert.Nil(err)
}
// Destination
// aDst paths should exist
for p := aDst.Front(); p != nil; p = p.Next() {
_, err = s.containerClient.NewDirectoryClient(p.Value.(string)).GetProperties(ctx, nil)
s.assert.Nil(err)
}
abDst.PushBackList(acDst) // abDst and acDst paths should not exist
for p := abDst.Front(); p != nil; p = p.Next() {
_, err = s.containerClient.NewDirectoryClient(p.Value.(string)).GetProperties(ctx, nil)
s.assert.NotNil(err)
}
}
func (s *datalakeTestSuite) TestRenameDirSubDirPrefixPath() {
defer s.cleanupTest()
// Setup
baseSrc := generateDirectoryName()
aSrc, abSrc, acSrc := s.setupHierarchy(baseSrc)
baseDst := generateDirectoryName()
s.az.storage.SetPrefixPath(baseSrc)
err := s.az.RenameDir(internal.RenameDirOptions{Src: "c1", Dst: baseDst})
s.assert.Nil(err)
// Source
// aSrc paths under c1 should be deleted
for p := aSrc.Front(); p != nil; p = p.Next() {
path := p.Value.(string)
_, err = s.containerClient.NewDirectoryClient(path).GetProperties(ctx, nil)
if strings.HasPrefix(path, baseSrc+"/c1") {
s.assert.NotNil(err)
} else {
s.assert.Nil(err)
}
}
abSrc.PushBackList(acSrc) // abSrc and acSrc paths should exist
for p := abSrc.Front(); p != nil; p = p.Next() {
_, err = s.containerClient.NewDirectoryClient(p.Value.(string)).GetProperties(ctx, nil)
s.assert.Nil(err)
}
// Destination
// aDst paths should exist -> aDst and aDst/gc1
_, err = s.containerClient.NewDirectoryClient(baseSrc+"/"+baseDst).GetProperties(ctx, nil)
s.assert.Nil(err)
_, err = s.containerClient.NewDirectoryClient(baseSrc+"/"+baseDst+"/gc1").GetProperties(ctx, nil)
s.assert.Nil(err)
}
func (s *datalakeTestSuite) TestRenameDirError() {
defer s.cleanupTest()
// Setup
src := generateDirectoryName()
dst := generateDirectoryName()
err := s.az.RenameDir(internal.RenameDirOptions{Src: src, Dst: dst})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOENT, err)
// Neither directory should be in the account
dir := s.containerClient.NewDirectoryClient(src)
_, err = dir.GetProperties(ctx, nil)
s.assert.NotNil(err)
dir = s.containerClient.NewDirectoryClient(dst)
_, err = dir.GetProperties(ctx, nil)
s.assert.NotNil(err)
}
func (s *datalakeTestSuite) TestCreateFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, err := s.az.CreateFile(internal.CreateFileOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(h)
s.assert.EqualValues(name, h.Path)
s.assert.EqualValues(0, h.Size)
// File should be in the account
file := s.containerClient.NewDirectoryClient(name)
props, err := file.GetProperties(ctx, nil)
s.assert.Nil(err)
s.assert.NotNil(props)
s.assert.Empty(props.Metadata)
}
func (s *datalakeTestSuite) TestWriteSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
dataLen := len(data)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
s.assert.Nil(err)
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
output := make([]byte, len(data))
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(testData, output)
f.Close()
}
func (s *datalakeTestSuite) TestOverwriteSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-replace-data"
data := []byte(testData)
dataLen := len(data)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
s.assert.Nil(err)
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 5, Data: newTestData})
s.assert.Nil(err)
currentData := []byte("test-newdata-data")
output := make([]byte, len(currentData))
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestOverwriteAndAppendToSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
s.assert.Nil(err)
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 5, Data: newTestData})
s.assert.Nil(err)
currentData := []byte("test-newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
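
// Writing at an offset past the current end of the file should zero-fill the
// gap: a 9-byte file written at offset 12 picks up three NUL bytes before the
// appended data.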
func (s *datalakeTestSuite) TestAppendOffsetLargerThanSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
s.assert.Nil(err)
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 12, Data: newTestData})
s.assert.Nil(err)
currentData := []byte("test-data\x00\x00\x00newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestAppendToSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
s.assert.Nil(err)
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("-newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 9, Data: newTestData})
s.assert.Nil(err)
currentData := []byte("test-data-newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
// This test starts with a regular blob (no committed blocks); the appended data forces it to be re-uploaded as blocks
func (s *datalakeTestSuite) TestAppendBlocksToSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test-data"
data := []byte(testData)
// use our helper to force the max single-shot upload size (the size before a blob is broken into blocks) down to 9 bytes
err := uploadReaderAtToBlockBlob(
ctx, bytes.NewReader(data),
int64(len(data)),
9,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name),
&blockblob.UploadBufferOptions{
BlockSize: 8,
})
s.assert.Nil(err)
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("-newdata-newdata-newdata")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 9, Data: newTestData})
s.assert.Nil(err)
currentData := []byte("test-data-newdata-newdata-newdata")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
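
// The blob below is uploaded in 4-byte blocks, so writing "cake" at offset 16
// should replace exactly the fifth block (bytes 16-19) and leave the rest
// intact.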
func (s *datalakeTestSuite) TestOverwriteBlocks() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to force the max single-shot upload size (the size before a blob is broken into blocks) down to 4 bytes
err := uploadReaderAtToBlockBlob(
ctx,
bytes.NewReader(data),
int64(len(data)),
4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name),
&blockblob.UploadBufferOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 16, Data: newTestData})
s.assert.Nil(err)
currentData := []byte("testdatates1dat1cakedat2tes3dat3tes4dat4")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestOverwriteAndAppendBlocks() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to force the max single-shot upload size (the size before a blob is broken into blocks) down to 4 bytes
err := uploadReaderAtToBlockBlob(
ctx,
bytes.NewReader(data),
int64(len(data)),
4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name),
&blockblob.UploadBufferOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("43211234cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 32, Data: newTestData})
s.assert.Nil(err)
currentData := []byte("testdatates1dat1tes2dat2tes3dat343211234cake")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, _ := f.Read(output)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestAppendBlocks() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to force the max single-shot upload size (the size before a blob is broken into blocks) down to 4 bytes
err := uploadReaderAtToBlockBlob(ctx,
bytes.NewReader(data),
int64(len(data)),
4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name),
&blockblob.UploadBufferOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("43211234cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: newTestData})
s.assert.Nil(err)
currentData := []byte("43211234cakedat1tes2dat2tes3dat3tes4dat4")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, _ := f.Read(output)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
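
// Writing at offset 45 past a 40-byte blob should zero-fill the five-byte gap
// (bytes 40-44) before the new data, as the expected output below reflects.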
func (s *datalakeTestSuite) TestAppendOffsetLargerThanSize() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// use our helper to force the max single-shot upload size (the size before a blob is broken into blocks) down to 4 bytes
err := uploadReaderAtToBlockBlob(ctx,
bytes.NewReader(data),
int64(len(data)),
4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name),
&blockblob.UploadBufferOptions{
BlockSize: 4,
})
s.assert.Nil(err)
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
newTestData := []byte("43211234cake")
_, err = s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 45, Data: newTestData})
s.assert.Nil(err)
currentData := []byte("testdatates1dat1tes2dat2tes3dat3tes4dat4\x00\x00\x00\x00\x0043211234cake")
dataLen := len(currentData)
output := make([]byte, dataLen)
err = s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
f, _ = os.Open(f.Name())
len, _ := f.Read(output)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(currentData, output)
f.Close()
}
func (s *datalakeTestSuite) TestOpenFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
h, err := s.az.OpenFile(internal.OpenFileOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(h)
s.assert.EqualValues(name, h.Path)
s.assert.EqualValues(0, h.Size)
}
func (s *datalakeTestSuite) TestOpenFileError() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, err := s.az.OpenFile(internal.OpenFileOptions{Name: name})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOENT, err)
s.assert.Nil(h)
}
func (s *datalakeTestSuite) TestOpenFileSize() {
defer s.cleanupTest()
// Setup
name := generateFileName()
size := 10
s.az.CreateFile(internal.CreateFileOptions{Name: name})
s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(size)})
h, err := s.az.OpenFile(internal.OpenFileOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(h)
s.assert.EqualValues(name, h.Path)
s.assert.EqualValues(size, h.Size)
}
func (s *datalakeTestSuite) TestCloseFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
// This method does nothing.
err := s.az.CloseFile(internal.CloseFileOptions{Handle: h})
s.assert.Nil(err)
}
func (s *datalakeTestSuite) TestCloseFileFakeHandle() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h := handlemap.NewHandle(name)
// This method does nothing.
err := s.az.CloseFile(internal.CloseFileOptions{Handle: h})
s.assert.Nil(err)
}
func (s *datalakeTestSuite) TestDeleteFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
err := s.az.DeleteFile(internal.DeleteFileOptions{Name: name})
s.assert.Nil(err)
// File should not be in the account
file := s.containerClient.NewDirectoryClient(name)
_, err = file.GetProperties(ctx, nil)
s.assert.NotNil(err)
}
func (s *datalakeTestSuite) TestDeleteFileError() {
defer s.cleanupTest()
// Setup
name := generateFileName()
err := s.az.DeleteFile(internal.DeleteFileOptions{Name: name})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOENT, err)
// File should not be in the account
file := s.containerClient.NewDirectoryClient(name)
_, err = file.GetProperties(ctx, nil)
s.assert.NotNil(err)
}
func (s *datalakeTestSuite) TestRenameFile() {
defer s.cleanupTest()
// Setup
src := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: src})
dst := generateFileName()
err := s.az.RenameFile(internal.RenameFileOptions{Src: src, Dst: dst})
s.assert.Nil(err)
// Src should not be in the account
source := s.containerClient.NewDirectoryClient(src)
_, err = source.GetProperties(ctx, nil)
s.assert.NotNil(err)
// Dst should be in the account
destination := s.containerClient.NewDirectoryClient(dst)
_, err = destination.GetProperties(ctx, nil)
s.assert.Nil(err)
}
func (s *datalakeTestSuite) TestRenameFileWithCPKenabled() {
defer s.cleanupTest()
CPKEncryptionKey, CPKEncryptionKeySHA256 := generateCPKInfo()
config := fmt.Sprintf("azstorage:\n account-name: %s\n endpoint: https://%s.dfs.core.windows.net/\n type: adls\n account-key: %s\n mode: key\n container: %s\n cpk-enabled: true\n cpk-encryption-key: %s\n cpk-encryption-key-sha256: %s\n",
storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container, CPKEncryptionKey, CPKEncryptionKeySHA256)
s.setupTestHelper(config, s.container, false)
datalakeCPKOpt := &file.CPKInfo{
EncryptionKey: &CPKEncryptionKey,
EncryptionKeySHA256: &CPKEncryptionKeySHA256,
EncryptionAlgorithm: to.Ptr(file.EncryptionAlgorithmTypeAES256),
}
src := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: src})
dst := generateFileName()
testData := "test data"
data := []byte(testData)
err := uploadReaderAtToBlockBlob(
ctx, bytes.NewReader(data),
int64(len(data)),
100,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(src),
&blockblob.UploadBufferOptions{
CPKInfo: &blob.CPKInfo{
EncryptionKey: &CPKEncryptionKey,
EncryptionKeySHA256: &CPKEncryptionKeySHA256,
EncryptionAlgorithm: to.Ptr(blob.EncryptionAlgorithmTypeAES256),
},
})
s.assert.Nil(err)
err = s.az.RenameFile(internal.RenameFileOptions{Src: src, Dst: dst})
s.assert.Nil(err)
// Src should not be in the account
source := s.containerClient.NewDirectoryClient(src)
_, err = source.GetProperties(ctx, &file.GetPropertiesOptions{
CPKInfo: datalakeCPKOpt,
})
s.assert.NotNil(err)
// Dst should be in the account
destination := s.containerClient.NewDirectoryClient(dst)
_, err = destination.GetProperties(ctx, &file.GetPropertiesOptions{
CPKInfo: datalakeCPKOpt,
})
s.assert.Nil(err)
}
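
// Rename should carry user-defined metadata over from the source to the
// destination file.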
func (s *datalakeTestSuite) TestRenameFileMetadataConservation() {
defer s.cleanupTest()
// Setup
src := generateFileName()
source := s.containerClient.NewFileClient(src)
s.az.CreateFile(internal.CreateFileOptions{Name: src})
// Add srcMeta to source
srcMeta := make(map[string]*string)
srcMeta["foo"] = to.Ptr("bar")
source.SetMetadata(ctx, srcMeta, nil)
dst := generateFileName()
err := s.az.RenameFile(internal.RenameFileOptions{Src: src, Dst: dst})
s.assert.Nil(err)
// Src should not be in the account
_, err = source.GetProperties(ctx, nil)
s.assert.NotNil(err)
// Dst should be in the account
destination := s.containerClient.NewFileClient(dst)
props, err := destination.GetProperties(ctx, nil)
s.assert.Nil(err)
// Dst should have metadata
s.assert.True(checkMetadata(props.Metadata, "foo", "bar"))
}
func (s *datalakeTestSuite) TestRenameFileError() {
defer s.cleanupTest()
// Setup
src := generateFileName()
dst := generateFileName()
err := s.az.RenameFile(internal.RenameFileOptions{Src: src, Dst: dst})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOENT, err)
// Src and destination should not be in the account
source := s.containerClient.NewDirectoryClient(src)
_, err = source.GetProperties(ctx, nil)
s.assert.NotNil(err)
destination := s.containerClient.NewDirectoryClient(dst)
_, err = destination.GetProperties(ctx, nil)
s.assert.NotNil(err)
}
func (s *datalakeTestSuite) TestReadFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.Nil(err)
s.assert.EqualValues(testData, output)
}
func (s *datalakeTestSuite) TestReadFileError() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h := handlemap.NewHandle(name)
_, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOENT, err)
}
func (s *datalakeTestSuite) TestReadInBuffer() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})
output := make([]byte, 5)
len, err := s.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: output})
s.assert.Nil(err)
s.assert.EqualValues(5, len)
s.assert.EqualValues(testData[:5], output)
}
func (s *datalakeTestSuite) TestReadInBufferLargeBuffer() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})
output := make([]byte, 1000) // Testing that passing in a super large buffer will still work
len, err := s.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: output})
s.assert.Nil(err)
s.assert.EqualValues(h.Size, len)
s.assert.EqualValues(testData, output[:h.Size])
}
func (s *datalakeTestSuite) TestReadInBufferEmpty() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
output := make([]byte, 10)
len, err := s.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: output})
s.assert.Nil(err)
s.assert.EqualValues(0, len)
}
func (s *datalakeTestSuite) TestReadInBufferBadRange() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h := handlemap.NewHandle(name)
h.Size = 10
_, err := s.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 20, Data: make([]byte, 2)})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ERANGE, err)
}
func (s *datalakeTestSuite) TestReadInBufferError() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h := handlemap.NewHandle(name)
h.Size = 10
_, err := s.az.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: make([]byte, 2)})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOENT, err)
}
func (s *datalakeTestSuite) TestWriteFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
count, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
s.assert.Nil(err)
s.assert.EqualValues(len(data), count)
// Blob should have updated data
fileClient := s.containerClient.NewFileClient(name)
resp, err := fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(len(data))},
})
s.assert.Nil(err)
output, _ := io.ReadAll(resp.Body)
s.assert.EqualValues(testData, output)
}
func (s *datalakeTestSuite) TestTruncateSmallFileSmaller() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
truncatedLength := 5
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
err := s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
// Blob should have updated data
fileClient := s.containerClient.NewFileClient(name)
resp, err := fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(truncatedLength)},
})
s.assert.Nil(err)
s.assert.EqualValues(truncatedLength, *resp.ContentLength)
output, _ := io.ReadAll(resp.Body)
s.assert.EqualValues(testData[:truncatedLength], output)
}
func (s *datalakeTestSuite) TestTruncateChunkedFileSmaller() {
defer s.cleanupTest()
// Setup
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
truncatedLength := 5
// use our helper to force the max single-shot upload size (the size before a blob is broken into blocks) down to 4 bytes
err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name), &blockblob.UploadBufferOptions{
BlockSize: 4,
})
s.assert.Nil(err)
err = s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
// Blob should have updated data
fileClient := s.containerClient.NewFileClient(name)
resp, err := fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(truncatedLength)},
})
s.assert.Nil(err)
s.assert.EqualValues(truncatedLength, *resp.ContentLength)
output, _ := io.ReadAll(resp.Body)
s.assert.EqualValues(testData[:truncatedLength], output)
}
func (s *datalakeTestSuite) TestTruncateSmallFileEqual() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
truncatedLength := 9
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
err := s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
// Blob should have updated data
fileClient := s.containerClient.NewFileClient(name)
resp, err := fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(truncatedLength)},
})
s.assert.Nil(err)
s.assert.EqualValues(truncatedLength, *resp.ContentLength)
output, _ := io.ReadAll(resp.Body)
s.assert.EqualValues(testData, output)
}
func (s *datalakeTestSuite) TestTruncateChunkedFileEqual() {
defer s.cleanupTest()
// Setup
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
truncatedLength := 9
// use our helper to force the max single-shot upload size (the size before a blob is broken into blocks) down to 4 bytes
err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name), &blockblob.UploadBufferOptions{
BlockSize: 4,
})
s.assert.Nil(err)
err = s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
// Blob should have updated data
fileClient := s.containerClient.NewFileClient(name)
resp, err := fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(truncatedLength)},
})
s.assert.Nil(err)
s.assert.EqualValues(truncatedLength, *resp.ContentLength)
output, _ := io.ReadAll(resp.Body)
s.assert.EqualValues(testData, output)
}
func (s *datalakeTestSuite) TestTruncateSmallFileBigger() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
truncatedLength := 15
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
err := s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
// Blob should have updated data
fileClient := s.containerClient.NewFileClient(name)
resp, err := fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(truncatedLength)},
})
s.assert.Nil(err)
s.assert.EqualValues(truncatedLength, *resp.ContentLength)
output, _ := io.ReadAll(resp.Body)
s.assert.EqualValues(testData, output[:len(data)])
}
func (s *datalakeTestSuite) TestTruncateChunkedFileBigger() {
defer s.cleanupTest()
// Setup
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
truncatedLength := 15
// use our helper to force the max single-shot upload size (the size before a blob is broken into blocks) down to 4 bytes
err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name), &blockblob.UploadBufferOptions{
BlockSize: 4,
})
s.assert.Nil(err)
err = s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: int64(truncatedLength)})
s.assert.Nil(err)
// Blob should have updated data
fileClient := s.containerClient.NewFileClient(name)
resp, err := fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(truncatedLength)},
})
s.assert.Nil(err)
s.assert.EqualValues(truncatedLength, *resp.ContentLength)
output, _ := io.ReadAll(resp.Body)
s.assert.EqualValues(testData, output[:len(data)])
}
func (s *datalakeTestSuite) TestTruncateFileError() {
defer s.cleanupTest()
// Setup
name := generateFileName()
err := s.az.TruncateFile(internal.TruncateFileOptions{Name: name})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOENT, err)
}
func (s *datalakeTestSuite) TestCopyToFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
dataLen := len(data)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
err := s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.Nil(err)
output := make([]byte, len(data))
f, _ = os.Open(f.Name())
len, err := f.Read(output)
s.assert.Nil(err)
s.assert.EqualValues(dataLen, len)
s.assert.EqualValues(testData, output)
f.Close()
}
func (s *datalakeTestSuite) TestCopyToFileError() {
defer s.cleanupTest()
// Setup
name := generateFileName()
f, _ := os.CreateTemp("", name+".tmp")
defer os.Remove(f.Name())
err := s.az.CopyToFile(internal.CopyToFileOptions{Name: name, File: f})
s.assert.NotNil(err)
}
func (s *datalakeTestSuite) TestCopyFromFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
homeDir, _ := os.UserHomeDir()
f, _ := os.CreateTemp(homeDir, name+".tmp")
defer os.Remove(f.Name())
f.Write(data)
err := s.az.CopyFromFile(internal.CopyFromFileOptions{Name: name, File: f})
s.assert.Nil(err)
// Blob should have updated data
fileClient := s.containerClient.NewFileClient(name)
resp, err := fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(len(data))},
})
s.assert.Nil(err)
output, _ := io.ReadAll(resp.Body)
s.assert.EqualValues(testData, output)
}
func (s *datalakeTestSuite) TestCreateLink() {
defer s.cleanupTest()
// Setup
target := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: target})
name := generateFileName()
err := s.az.CreateLink(internal.CreateLinkOptions{Name: name, Target: target})
s.assert.Nil(err)
// Link should be in the account
link := s.containerClient.NewFileClient(name)
props, err := link.GetProperties(ctx, nil)
s.assert.Nil(err)
s.assert.NotNil(props)
s.assert.NotEmpty(props.Metadata)
s.assert.True(checkMetadata(props.Metadata, symlinkKey, "true"))
resp, err := link.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: *props.ContentLength},
})
s.assert.Nil(err)
data, _ := io.ReadAll(resp.Body)
s.assert.EqualValues(target, data)
}
func (s *datalakeTestSuite) TestReadLink() {
defer s.cleanupTest()
// Setup
target := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: target})
name := generateFileName()
s.az.CreateLink(internal.CreateLinkOptions{Name: name, Target: target})
read, err := s.az.ReadLink(internal.ReadLinkOptions{Name: name})
s.assert.Nil(err)
s.assert.EqualValues(target, read)
}
func (s *datalakeTestSuite) TestReadLinkError() {
defer s.cleanupTest()
// Setup
name := generateFileName()
_, err := s.az.ReadLink(internal.ReadLinkOptions{Name: name})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOENT, err)
}
func (s *datalakeTestSuite) TestGetAttrDir() {
defer s.cleanupTest()
// Setup
name := generateDirectoryName()
s.az.CreateDir(internal.CreateDirOptions{Name: name})
props, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(props)
s.assert.True(props.IsDir())
}
func (s *datalakeTestSuite) TestGetAttrFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
props, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(props)
s.assert.False(props.IsDir())
s.assert.False(props.IsSymlink())
}
func (s *datalakeTestSuite) TestGetAttrLink() {
defer s.cleanupTest()
// Setup
target := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: target})
name := generateFileName()
s.az.CreateLink(internal.CreateLinkOptions{Name: name, Target: target})
props, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(props)
s.assert.True(props.IsSymlink())
s.assert.NotEmpty(props.Metadata)
s.assert.True(checkMetadata(props.Metadata, symlinkKey, "true"))
}
func (s *datalakeTestSuite) TestGetAttrFileSize() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
props, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(props)
s.assert.False(props.IsDir())
s.assert.False(props.IsSymlink())
s.assert.EqualValues(len(testData), props.Size)
}
func (s *datalakeTestSuite) TestGetAttrFileTime() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
before, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(before.Mtime)
time.Sleep(time.Second * 3) // Wait 3 seconds and then modify the file again
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
time.Sleep(time.Second * 1)
after, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(after.Mtime)
s.assert.True(after.Mtime.After(before.Mtime))
}
func (s *datalakeTestSuite) TestGetAttrError() {
defer s.cleanupTest()
// Setup
name := generateFileName()
_, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOENT, err)
}
func (s *datalakeTestSuite) TestChmod() {
defer s.cleanupTest()
// Setup
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
err := s.az.Chmod(internal.ChmodOptions{Name: name, Mode: 0666})
s.assert.Nil(err)
// File's ACL info should have changed
file := s.containerClient.NewFileClient(name)
acl, err := file.GetAccessControl(ctx, nil)
s.assert.Nil(err)
s.assert.NotNil(acl.ACL)
s.assert.EqualValues("user::rw-,group::rw-,other::rw-", *acl.ACL)
}
func (s *datalakeTestSuite) TestChmodError() {
defer s.cleanupTest()
// Setup
name := generateFileName()
err := s.az.Chmod(internal.ChmodOptions{Name: name, Mode: 0666})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOENT, err)
}
// If support for chown is ever added to adls, add tests for error cases and modify the following tests.
func (s *datalakeTestSuite) TestChown() {
defer s.cleanupTest()
// Setup
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
err := s.az.Chown(internal.ChownOptions{Name: name, Owner: 6, Group: 5})
s.assert.NotNil(err)
s.assert.EqualValues(syscall.ENOTSUP, err)
}
func (s *datalakeTestSuite) TestChownIgnore() {
defer s.cleanupTest()
// Setup
s.tearDownTestHelper(false) // Don't delete the generated container.
config := fmt.Sprintf("azstorage:\n account-name: %s\n endpoint: https://%s.dfs.core.windows.net/\n type: adls\n account-key: %s\n mode: key\n container: %s\n fail-unsupported-op: false\n",
storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container)
s.setupTestHelper(config, s.container, true)
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
err := s.az.Chown(internal.ChownOptions{Name: name, Owner: 6, Group: 5})
s.assert.Nil(err)
}
func (s *datalakeTestSuite) TestGetFileBlockOffsetsSmallFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
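// A small file is uploaded in a single shot, so it should have no committed block list: expect an empty offset list flagged as a small file.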
// GetFileBlockOffsets
offsetList, err := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
s.assert.Nil(err)
s.assert.Len(offsetList.BlockList, 0)
s.assert.True(offsetList.SmallFile())
s.assert.EqualValues(0, offsetList.BlockIdLength)
}
func (s *datalakeTestSuite) TestGetFileBlockOffsetsChunkedFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "testdatates1dat1tes2dat2tes3dat3tes4dat4"
data := []byte(testData)
// force a chunked upload: set the max single-shot upload size (above which a blob is broken into blocks) to 4 bytes
err := uploadReaderAtToBlockBlob(
ctx, bytes.NewReader(data),
int64(len(data)),
4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name),
&blockblob.UploadBufferOptions{
BlockSize: 4,
})
s.assert.Nil(err)
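// 40 bytes of data at a 4 byte block size should produce 10 committed blocks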
// GetFileBlockOffsets
offsetList, err := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
s.assert.Nil(err)
s.assert.Len(offsetList.BlockList, 10)
s.assert.Zero(offsetList.Flags)
s.assert.EqualValues(16, offsetList.BlockIdLength)
}
func (s *datalakeTestSuite) TestGetFileBlockOffsetsError() {
defer s.cleanupTest()
// Setup
name := generateFileName()
// GetFileBlockOffsets
_, err := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
s.assert.NotNil(err)
}
func (s *datalakeTestSuite) TestCustomEndpoint() {
defer s.cleanupTest()
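// transformAccountEndpoint only rewrites standard *.dfs.* endpoints to their blob equivalents; a custom endpoint should be returned unchanged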
dfsEndpoint := "https://mycustom.endpoint"
blobEndpoint := transformAccountEndpoint(dfsEndpoint)
s.assert.EqualValues(dfsEndpoint, blobEndpoint)
}
func (s *datalakeTestSuite) TestFlushFileEmptyFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
bol, _ := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
handlemap.CreateCacheObject(int64(16*MB), h)
h.CacheObj.BlockOffsetList = bol
err := s.az.FlushFile(internal.FlushFileOptions{Handle: h})
s.assert.Nil(err)
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.Nil(err)
s.assert.EqualValues("", output)
}
func (s *datalakeTestSuite) TestFlushFileChunkedFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
data := make([]byte, 16*MB)
rand.Read(data)
// force a chunked upload: set the max single-shot upload size (above which a blob is broken into blocks) to 4 bytes
err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name), &blockblob.UploadBufferOptions{
BlockSize: 4 * MB,
})
s.assert.Nil(err)
bol, _ := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
handlemap.CreateCacheObject(int64(16*MB), h)
h.CacheObj.BlockOffsetList = bol
err = s.az.FlushFile(internal.FlushFileOptions{Handle: h})
s.assert.Nil(err)
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.Nil(err)
s.assert.EqualValues(data, output)
}
func (s *datalakeTestSuite) TestFlushFileUpdateChunkedFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
blockSize := 4 * MB
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
data := make([]byte, 16*MB)
rand.Read(data)
// force a chunked upload: set the max single-shot upload size (above which a blob is broken into blocks) to 4 bytes
err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name), &blockblob.UploadBufferOptions{
BlockSize: int64(blockSize),
})
s.assert.Nil(err)
bol, _ := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
handlemap.CreateCacheObject(int64(16*MB), h)
h.CacheObj.BlockOffsetList = bol
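// Overwrite the middle 2MB of the second block (file offsets 5MB-7MB) and mark it dirty so FlushFile re-stages it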
updatedBlock := make([]byte, 2*MB)
rand.Read(updatedBlock)
h.CacheObj.BlockOffsetList.BlockList[1].Data = make([]byte, blockSize)
s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize), h.CacheObj.BlockOffsetList.BlockList[1].Data)
copy(h.CacheObj.BlockOffsetList.BlockList[1].Data[MB:2*MB+MB], updatedBlock)
h.CacheObj.BlockOffsetList.BlockList[1].Flags.Set(common.DirtyBlock)
err = s.az.FlushFile(internal.FlushFileOptions{Handle: h})
s.assert.Nil(err)
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.Nil(err)
s.assert.NotEqualValues(data, output)
s.assert.EqualValues(data[:5*MB], output[:5*MB])
s.assert.EqualValues(updatedBlock, output[5*MB:5*MB+2*MB])
s.assert.EqualValues(data[7*MB:], output[7*MB:])
}
func (s *datalakeTestSuite) TestFlushFileTruncateUpdateChunkedFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
blockSize := 4 * MB
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
data := make([]byte, 16*MB)
rand.Read(data)
// force a chunked upload: set the max single-shot upload size (above which a blob is broken into blocks) to 4 bytes
err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name), &blockblob.UploadBufferOptions{
BlockSize: int64(blockSize),
})
s.assert.Nil(err)
bol, _ := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
handlemap.CreateCacheObject(int64(16*MB), h)
h.CacheObj.BlockOffsetList = bol
// truncate block
h.CacheObj.BlockOffsetList.BlockList[1].Data = make([]byte, blockSize/2)
h.CacheObj.BlockOffsetList.BlockList[1].EndIndex = int64(blockSize + blockSize/2)
s.az.storage.ReadInBuffer(name, int64(blockSize), int64(blockSize)/2, h.CacheObj.BlockOffsetList.BlockList[1].Data)
h.CacheObj.BlockOffsetList.BlockList[1].Flags.Set(common.DirtyBlock)
// remove 2 blocks
h.CacheObj.BlockOffsetList.BlockList = h.CacheObj.BlockOffsetList.BlockList[:2]
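// After the flush the file should shrink to 6MB: the untouched first block plus the 2MB truncated block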
err = s.az.FlushFile(internal.FlushFileOptions{Handle: h})
s.assert.Nil(err)
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.Nil(err)
s.assert.NotEqualValues(data, output)
s.assert.EqualValues(data[:6*MB], output[:6*MB])
}
func (s *datalakeTestSuite) TestFlushFileAppendBlocksEmptyFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
blockSize := 2 * MB
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
bol, _ := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
handlemap.CreateCacheObject(int64(12*MB), h)
h.CacheObj.BlockOffsetList = bol
h.CacheObj.BlockIdLength = 16
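// Simulate three freshly appended dirty blocks with new block IDs; FlushFile should stage and commit them in order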
data1 := make([]byte, blockSize)
rand.Read(data1)
blk1 := &common.Block{
StartIndex: 0,
EndIndex: int64(blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
Data: data1,
}
blk1.Flags.Set(common.DirtyBlock)
data2 := make([]byte, blockSize)
rand.Read(data2)
blk2 := &common.Block{
StartIndex: int64(blockSize),
EndIndex: 2 * int64(blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
Data: data2,
}
blk2.Flags.Set(common.DirtyBlock)
data3 := make([]byte, blockSize)
rand.Read(data3)
blk3 := &common.Block{
StartIndex: 2 * int64(blockSize),
EndIndex: 3 * int64(blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
Data: data3,
}
blk3.Flags.Set(common.DirtyBlock)
h.CacheObj.BlockOffsetList.BlockList = append(h.CacheObj.BlockOffsetList.BlockList, blk1, blk2, blk3)
bol.Flags.Clear(common.SmallFile)
err := s.az.FlushFile(internal.FlushFileOptions{Handle: h})
s.assert.Nil(err)
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.Nil(err)
s.assert.EqualValues(blk1.Data, output[0:blockSize])
s.assert.EqualValues(blk2.Data, output[blockSize:2*blockSize])
s.assert.EqualValues(blk3.Data, output[2*blockSize:3*blockSize])
}
func (s *datalakeTestSuite) TestFlushFileAppendBlocksChunkedFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
blockSize := 2 * MB
fileSize := 16 * MB
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
data := make([]byte, fileSize)
rand.Read(data)
// force a chunked upload: set the max single-shot upload size (above which a blob is broken into blocks) to 4 bytes
err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name), &blockblob.UploadBufferOptions{
BlockSize: int64(blockSize),
})
s.assert.Nil(err)
bol, _ := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
handlemap.CreateCacheObject(int64(16*MB), h)
h.CacheObj.BlockOffsetList = bol
h.CacheObj.BlockIdLength = 16
data1 := make([]byte, blockSize)
rand.Read(data1)
blk1 := &common.Block{
StartIndex: int64(fileSize),
EndIndex: int64(fileSize + blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
Data: data1,
}
blk1.Flags.Set(common.DirtyBlock)
data2 := make([]byte, blockSize)
rand.Read(data2)
blk2 := &common.Block{
StartIndex: int64(fileSize + blockSize),
EndIndex: int64(fileSize + 2*blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
Data: data2,
}
blk2.Flags.Set(common.DirtyBlock)
data3 := make([]byte, blockSize)
rand.Read(data3)
blk3 := &common.Block{
StartIndex: int64(fileSize + 2*blockSize),
EndIndex: int64(fileSize + 3*blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
Data: data3,
}
blk3.Flags.Set(common.DirtyBlock)
h.CacheObj.BlockOffsetList.BlockList = append(h.CacheObj.BlockOffsetList.BlockList, blk1, blk2, blk3)
bol.Flags.Clear(common.SmallFile)
err = s.az.FlushFile(internal.FlushFileOptions{Handle: h})
s.assert.Nil(err)
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.Nil(err)
s.assert.EqualValues(data, output[0:fileSize])
s.assert.EqualValues(blk1.Data, output[fileSize:fileSize+blockSize])
s.assert.EqualValues(blk2.Data, output[fileSize+blockSize:fileSize+2*blockSize])
s.assert.EqualValues(blk3.Data, output[fileSize+2*blockSize:fileSize+3*blockSize])
}
func (s *datalakeTestSuite) TestFlushFileTruncateBlocksEmptyFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
blockSize := 4 * MB
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
bol, _ := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
handlemap.CreateCacheObject(int64(12*MB), h)
h.CacheObj.BlockOffsetList = bol
h.CacheObj.BlockIdLength = 16
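// Truncated blocks carry no data; FlushFile should commit them as zero-filled ranges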
blk1 := &common.Block{
StartIndex: 0,
EndIndex: int64(blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
}
blk1.Flags.Set(common.TruncatedBlock)
blk1.Flags.Set(common.DirtyBlock)
blk2 := &common.Block{
StartIndex: int64(blockSize),
EndIndex: 2 * int64(blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
}
blk2.Flags.Set(common.TruncatedBlock)
blk2.Flags.Set(common.DirtyBlock)
blk3 := &common.Block{
StartIndex: 2 * int64(blockSize),
EndIndex: 3 * int64(blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
}
blk3.Flags.Set(common.TruncatedBlock)
blk3.Flags.Set(common.DirtyBlock)
h.CacheObj.BlockOffsetList.BlockList = append(h.CacheObj.BlockOffsetList.BlockList, blk1, blk2, blk3)
bol.Flags.Clear(common.SmallFile)
err := s.az.FlushFile(internal.FlushFileOptions{Handle: h})
s.assert.Nil(err)
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.Nil(err)
data := make([]byte, 3*blockSize)
s.assert.EqualValues(data, output)
}
func (s *datalakeTestSuite) TestFlushFileTruncateBlocksChunkedFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
blockSize := 4 * MB
fileSize := 16 * MB
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
data := make([]byte, fileSize)
rand.Read(data)
// force a chunked upload: set the max single-shot upload size (above which a blob is broken into blocks) to 4 bytes
err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name), &blockblob.UploadBufferOptions{
BlockSize: int64(blockSize),
})
s.assert.Nil(err)
bol, _ := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
handlemap.CreateCacheObject(int64(16*MB), h)
h.CacheObj.BlockOffsetList = bol
h.CacheObj.BlockIdLength = 16
blk1 := &common.Block{
StartIndex: int64(fileSize),
EndIndex: int64(fileSize + blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
}
blk1.Flags.Set(common.TruncatedBlock)
blk1.Flags.Set(common.DirtyBlock)
blk2 := &common.Block{
StartIndex: int64(fileSize + blockSize),
EndIndex: int64(fileSize + 2*blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
}
blk2.Flags.Set(common.TruncatedBlock)
blk2.Flags.Set(common.DirtyBlock)
blk3 := &common.Block{
StartIndex: int64(fileSize + 2*blockSize),
EndIndex: int64(fileSize + 3*blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
}
blk3.Flags.Set(common.TruncatedBlock)
blk3.Flags.Set(common.DirtyBlock)
h.CacheObj.BlockOffsetList.BlockList = append(h.CacheObj.BlockOffsetList.BlockList, blk1, blk2, blk3)
bol.Flags.Clear(common.SmallFile)
err = s.az.FlushFile(internal.FlushFileOptions{Handle: h})
s.assert.Nil(err)
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.Nil(err)
s.assert.EqualValues(data, output[:fileSize])
emptyData := make([]byte, 3*blockSize)
s.assert.EqualValues(emptyData, output[fileSize:])
}
func (s *datalakeTestSuite) TestFlushFileAppendAndTruncateBlocksEmptyFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
blockSize := 7 * MB
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
bol, _ := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
handlemap.CreateCacheObject(int64(12*MB), h)
h.CacheObj.BlockOffsetList = bol
h.CacheObj.BlockIdLength = 16
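// Mix one dirty data block with two truncated blocks; the flushed file should contain the data followed by zeroed ranges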
data1 := make([]byte, blockSize)
rand.Read(data1)
blk1 := &common.Block{
StartIndex: 0,
EndIndex: int64(blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
Data: data1,
}
blk1.Flags.Set(common.DirtyBlock)
blk2 := &common.Block{
StartIndex: int64(blockSize),
EndIndex: 2 * int64(blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
}
blk2.Flags.Set(common.DirtyBlock)
blk2.Flags.Set(common.TruncatedBlock)
blk3 := &common.Block{
StartIndex: 2 * int64(blockSize),
EndIndex: 3 * int64(blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
}
blk3.Flags.Set(common.DirtyBlock)
blk3.Flags.Set(common.TruncatedBlock)
h.CacheObj.BlockOffsetList.BlockList = append(h.CacheObj.BlockOffsetList.BlockList, blk1, blk2, blk3)
bol.Flags.Clear(common.SmallFile)
err := s.az.FlushFile(internal.FlushFileOptions{Handle: h})
s.assert.Nil(err)
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.Nil(err)
data := make([]byte, blockSize)
s.assert.EqualValues(blk1.Data, output[0:blockSize])
s.assert.EqualValues(data, output[blockSize:2*blockSize])
s.assert.EqualValues(data, output[2*blockSize:3*blockSize])
}
func (s *datalakeTestSuite) TestFlushFileAppendAndTruncateBlocksChunkedFile() {
defer s.cleanupTest()
// Setup
name := generateFileName()
blockSize := 7 * MB
fileSize := 16 * MB
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
data := make([]byte, fileSize)
rand.Read(data)
// force a chunked upload: set the max single-shot upload size (above which a blob is broken into blocks) to 4 bytes
err := uploadReaderAtToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)), 4,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name), &blockblob.UploadBufferOptions{
BlockSize: int64(blockSize),
})
s.assert.Nil(err)
bol, _ := s.az.GetFileBlockOffsets(internal.GetFileBlockOffsetsOptions{Name: name})
handlemap.CreateCacheObject(int64(16*MB), h)
h.CacheObj.BlockOffsetList = bol
h.CacheObj.BlockIdLength = 16
data1 := make([]byte, blockSize)
rand.Read(data1)
blk1 := &common.Block{
StartIndex: int64(fileSize),
EndIndex: int64(fileSize + blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
Data: data1,
}
blk1.Flags.Set(common.DirtyBlock)
blk2 := &common.Block{
StartIndex: int64(fileSize + blockSize),
EndIndex: int64(fileSize + 2*blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
}
blk2.Flags.Set(common.DirtyBlock)
blk2.Flags.Set(common.TruncatedBlock)
blk3 := &common.Block{
StartIndex: int64(fileSize + 2*blockSize),
EndIndex: int64(fileSize + 3*blockSize),
Id: base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(h.CacheObj.BlockIdLength)),
}
blk3.Flags.Set(common.DirtyBlock)
blk3.Flags.Set(common.TruncatedBlock)
h.CacheObj.BlockOffsetList.BlockList = append(h.CacheObj.BlockOffsetList.BlockList, blk1, blk2, blk3)
bol.Flags.Clear(common.SmallFile)
err = s.az.FlushFile(internal.FlushFileOptions{Handle: h})
s.assert.Nil(err)
// file should contain the original data, the appended block, and zeroed ranges for the truncated blocks
output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
s.assert.Nil(err)
s.assert.EqualValues(data, output[:fileSize])
emptyData := make([]byte, blockSize)
s.assert.EqualValues(blk1.Data, output[fileSize:fileSize+blockSize])
s.assert.EqualValues(emptyData, output[fileSize+blockSize:fileSize+2*blockSize])
s.assert.EqualValues(emptyData, output[fileSize+2*blockSize:fileSize+3*blockSize])
}
func (s *datalakeTestSuite) TestUpdateConfig() {
defer s.cleanupTest()
s.az.storage.UpdateConfig(AzStorageConfig{
blockSize: 7 * MB,
maxConcurrency: 4,
defaultTier: to.Ptr(blob.AccessTierArchive),
ignoreAccessModifiers: true,
})
s.assert.EqualValues(7*MB, s.az.storage.(*Datalake).Config.blockSize)
s.assert.EqualValues(4, s.az.storage.(*Datalake).Config.maxConcurrency)
s.assert.EqualValues(blob.AccessTierArchive, *s.az.storage.(*Datalake).Config.defaultTier)
s.assert.True(s.az.storage.(*Datalake).Config.ignoreAccessModifiers)
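// The Datalake component wraps a BlockBlob client, so config updates must propagate to it as well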
s.assert.EqualValues(7*MB, s.az.storage.(*Datalake).BlockBlob.Config.blockSize)
s.assert.EqualValues(4, s.az.storage.(*Datalake).BlockBlob.Config.maxConcurrency)
s.assert.EqualValues(blob.AccessTierArchive, *s.az.storage.(*Datalake).BlockBlob.Config.defaultTier)
s.assert.True(s.az.storage.(*Datalake).BlockBlob.Config.ignoreAccessModifiers)
}
func (s *datalakeTestSuite) TestDownloadWithCPKEnabled() {
defer s.cleanupTest()
s.tearDownTestHelper(false)
CPKEncryptionKey, CPKEncryptionKeySHA256 := generateCPKInfo()
config := fmt.Sprintf("azstorage:\n account-name: %s\n endpoint: https://%s.dfs.core.windows.net/\n type: adls\n account-key: %s\n mode: key\n container: %s\n cpk-enabled: true\n cpk-encryption-key: %s\n cpk-encryption-key-sha256: %s\n",
storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container, CPKEncryptionKey, CPKEncryptionKeySHA256)
s.setupTestHelper(config, s.container, false)
blobCPKOpt := &blob.CPKInfo{
EncryptionKey: &CPKEncryptionKey,
EncryptionKeySHA256: &CPKEncryptionKeySHA256,
EncryptionAlgorithm: to.Ptr(blob.EncryptionAlgorithmTypeAES256),
}
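// Upload directly with the CPK; the client under test is configured with the same key, so its reads should decrypt successfully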
name := generateFileName()
s.az.CreateFile(internal.CreateFileOptions{Name: name})
testData := "test data"
data := []byte(testData)
err := uploadReaderAtToBlockBlob(
ctx, bytes.NewReader(data),
int64(len(data)),
100,
s.az.storage.(*Datalake).BlockBlob.Container.NewBlockBlobClient(name),
&blockblob.UploadBufferOptions{
CPKInfo: blobCPKOpt,
})
s.assert.Nil(err)
f, err := os.Create(name)
s.assert.Nil(err)
s.assert.NotNil(f)
err = s.az.storage.ReadToFile(name, 0, int64(len(data)), f)
s.assert.Nil(err)
fileData, err := os.ReadFile(name)
s.assert.Nil(err)
s.assert.EqualValues(data, fileData)
buf := make([]byte, len(data))
err = s.az.storage.ReadInBuffer(name, 0, int64(len(data)), buf)
s.assert.Nil(err)
s.assert.EqualValues(data, buf)
rbuf, err := s.az.storage.ReadBuffer(name, 0, int64(len(data)))
s.assert.Nil(err)
s.assert.EqualValues(data, rbuf)
_ = s.az.storage.DeleteFile(name)
_ = os.Remove(name)
}
func (s *datalakeTestSuite) TestUploadWithCPKEnabled() {
defer s.cleanupTest()
s.tearDownTestHelper(false)
CPKEncryptionKey, CPKEncryptionKeySHA256 := generateCPKInfo()
config := fmt.Sprintf("azstorage:\n account-name: %s\n endpoint: https://%s.dfs.core.windows.net/\n type: adls\n account-key: %s\n mode: key\n container: %s\n cpk-enabled: true\n cpk-encryption-key: %s\n cpk-encryption-key-sha256: %s\n",
storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container, CPKEncryptionKey, CPKEncryptionKeySHA256)
s.setupTestHelper(config, s.container, false)
datalakeCPKOpt := &file.CPKInfo{
EncryptionKey: &CPKEncryptionKey,
EncryptionKeySHA256: &CPKEncryptionKeySHA256,
EncryptionAlgorithm: to.Ptr(file.EncryptionAlgorithmTypeAES256),
}
name1 := generateFileName()
f, err := os.Create(name1)
s.assert.Nil(err)
s.assert.NotNil(f)
testData := "test data"
data := []byte(testData)
_, err = f.Write(data)
s.assert.Nil(err)
_, _ = f.Seek(0, 0)
err = s.az.storage.WriteFromFile(name1, nil, f)
s.assert.Nil(err)
// File should have updated data
fileClient := s.containerClient.NewFileClient(name1)
attr, err := s.az.storage.(*Datalake).GetAttr(name1)
s.assert.Nil(err)
s.assert.NotNil(attr)
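// Downloading without supplying the CPK should fail because the file is encrypted with a customer-provided key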
resp, err := fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(len(data))},
})
s.assert.NotNil(err)
s.assert.Nil(resp.RequestID)
resp, err = fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(len(data))},
CPKInfo: datalakeCPKOpt,
})
s.assert.Nil(err)
s.assert.NotNil(resp.RequestID)
name2 := generateFileName()
err = s.az.storage.WriteFromBuffer(name2, nil, data)
s.assert.Nil(err)
fileClient = s.containerClient.NewFileClient(name2)
resp, err = fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(len(data))},
})
s.assert.NotNil(err)
s.assert.Nil(resp.RequestID)
resp, err = fileClient.DownloadStream(ctx, &file.DownloadStreamOptions{
Range: &file.HTTPRange{Offset: 0, Count: int64(len(data))},
CPKInfo: datalakeCPKOpt,
})
s.assert.Nil(err)
s.assert.NotNil(resp.RequestID)
_ = s.az.storage.DeleteFile(name1)
_ = s.az.storage.DeleteFile(name2)
_ = os.Remove(name1)
}
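// getACL is a test helper that fetches the POSIX ACL string for the given path through a datalake file client.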
func getACL(dl *Datalake, name string) (string, error) {
fileClient := dl.Filesystem.NewFileClient(filepath.Join(dl.Config.prefixPath, name))
acl, err := fileClient.GetAccessControl(context.Background(), nil)
if err != nil || acl.ACL == nil {
return "", err
}
return *acl.ACL, nil
}
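// createFileWithData creates a file, writes the given data, applies the mode, and closes the handle, asserting each step succeeds.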
func (s *datalakeTestSuite) createFileWithData(name string, data []byte, mode os.FileMode) {
h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
_, err := s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
s.assert.Nil(err)
err = s.az.Chmod(internal.ChmodOptions{Name: name, Mode: mode})
s.assert.Nil(err)
err = s.az.CloseFile(internal.CloseFileOptions{Handle: h})
s.assert.Nil(err)
}
func (s *datalakeTestSuite) TestPermissionPreservationWithoutFlag() {
defer s.cleanupTest()
name := generateFileName()
data := []byte("test data")
mode := fs.FileMode(0764)
s.createFileWithData(name, data, mode)
// Simulate file copy and permission checks
_ = os.WriteFile(name+"_local", []byte("123123"), mode)
f, err := os.OpenFile(name+"_local", os.O_RDWR, mode)
s.assert.Nil(err)
err = s.az.CopyFromFile(internal.CopyFromFileOptions{Name: name, File: f, Metadata: nil})
s.assert.Nil(err)
attr, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(attr)
s.assert.NotEqual(os.FileMode(0764), attr.Mode)
acl, err := getACL(s.az.storage.(*Datalake), name)
s.assert.Nil(err)
s.assert.Contains(acl, "user::rw-")
s.assert.Contains(acl, "group::r--")
s.assert.Contains(acl, "other::---")
os.Remove(name + "_local")
}
func (s *datalakeTestSuite) TestPermissionPreservationWithFlag() {
defer s.cleanupTest()
// Setup
conf := fmt.Sprintf("azstorage:\n preserve-acl: true\n account-name: %s\n endpoint: https://%s.dfs.core.windows.net/\n type: adls\n account-key: %s\n mode: key\n container: %s\n fail-unsupported-op: true",
storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container)
s.setupTestHelper(conf, s.container, false)
name := generateFileName()
data := []byte("test data")
mode := fs.FileMode(0764)
s.createFileWithData(name, data, mode)
// Simulate file copy and permission checks
_ = os.WriteFile(name+"_local", []byte("123123"), mode)
f, err := os.OpenFile(name+"_local", os.O_RDWR, mode)
s.assert.Nil(err)
err = s.az.CopyFromFile(internal.CopyFromFileOptions{Name: name, File: f, Metadata: nil})
s.assert.Nil(err)
attr, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(attr)
s.assert.Equal(os.FileMode(0764), attr.Mode)
acl, err := getACL(s.az.storage.(*Datalake), name)
s.assert.Nil(err)
s.assert.Contains(acl, "user::rwx")
s.assert.Contains(acl, "group::rw-")
s.assert.Contains(acl, "other::r--")
os.Remove(name + "_local")
}
func (s *datalakeTestSuite) TestPermissionPreservationWithCommit() {
defer s.cleanupTest()
// Setup
s.setupTestHelper("", s.container, false)
name := generateFileName()
s.createFileWithData(name, []byte("test data"), fs.FileMode(0767))
data := []byte("123123")
id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
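// Stage a block and commit it; the commit path should preserve the file's existing mode and ACL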
err := s.az.StageData(internal.StageDataOptions{
Name: name,
Id: id,
Data: data,
Offset: 0,
})
s.assert.Nil(err)
ids := []string{}
ids = append(ids, id)
err = s.az.CommitData(internal.CommitDataOptions{
Name: name,
List: ids,
BlockSize: 1,
})
s.assert.Nil(err)
attr, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
s.assert.Nil(err)
s.assert.NotNil(attr)
s.assert.EqualValues(os.FileMode(0767), attr.Mode)
acl, err := getACL(s.az.storage.(*Datalake), name)
s.assert.Nil(err)
s.assert.Contains(acl, "user::rwx")
s.assert.Contains(acl, "group::rw-")
s.assert.Contains(acl, "other::rwx")
}
// func (s *datalakeTestSuite) TestRAGRS() {
// defer s.cleanupTest()
// // Setup
// name := generateFileName()
// h, _ := s.az.CreateFile(internal.CreateFileOptions{Name: name})
// testData := "test data"
// data := []byte(testData)
// s.az.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data})
// h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})
// s.az.CloseFile(internal.CloseFileOptions{Handle: h})
// // This can be flaky since it may take time to replicate the data. We could hardcode a container and file for this test
// time.Sleep(time.Second * time.Duration(10))
// s.tearDownTestHelper(false) // Don't delete the generated container.
// config := fmt.Sprintf("azstorage:\n account-name: %s\n type: adls\n account-key: %s\n mode: key\n container: %s\n endpoint: https://%s-secondary.dfs.core.windows.net\n",
// storageTestConfigurationParameters.AdlsAccount, storageTestConfigurationParameters.AdlsKey, s.container, storageTestConfigurationParameters.AdlsAccount)
// s.setupTestHelper(config, s.container, false) // Don't create a new container
// h, _ = s.az.OpenFile(internal.OpenFileOptions{Name: name})
// output, err := s.az.ReadFile(internal.ReadFileOptions{Handle: h})
// s.assert.Nil(err)
// s.assert.EqualValues(testData, output)
// s.az.CloseFile(internal.CloseFileOptions{Handle: h})
// }
// In order for 'go test' to run this suite, we need to create
// a normal test function and pass our suite to suite.Run
func TestDatalake(t *testing.T) {
suite.Run(t, new(datalakeTestSuite))
}