Parent: f07c7f7cf9
Commit: 5b2e0d31de
Dir_test.go: 36 changed lines
@@ -18,7 +18,7 @@ func TestAttributeCaching(t *testing.T) {
     mockClock := &MockClock{}
     InitLogger(os.Stdout, os.Stdout, os.Stdout, os.Stderr)
     hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
-    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, false, NewDefaultRetryPolicy(mockClock), mockClock)
+    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, false, false, NewDefaultRetryPolicy(mockClock), mockClock)
     root, _ := fs.Root()
     hdfsAccessor.EXPECT().Stat("/testDir").Return(Attrs{Name: "testDir", Mode: os.ModeDir | 0757}, nil)
     dir, err := root.(*Dir).Lookup(nil, "testDir")

@@ -57,14 +57,14 @@ func TestReadDirWithFiltering(t *testing.T) {
     mockCtrl := gomock.NewController(t)
     mockClock := &MockClock{}
     hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
-    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"foo", "bar"}, false, NewDefaultRetryPolicy(mockClock), mockClock)
+    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"foo", "bar"}, false, false, NewDefaultRetryPolicy(mockClock), mockClock)
     root, _ := fs.Root()
     hdfsAccessor.EXPECT().ReadDir("/").Return([]Attrs{
-        Attrs{Name: "quz", Mode: os.ModeDir},
-        Attrs{Name: "foo", Mode: os.ModeDir},
-        Attrs{Name: "bar", Mode: os.ModeDir},
-        Attrs{Name: "foobar", Mode: os.ModeDir},
-        Attrs{Name: "baz", Mode: os.ModeDir},
+        {Name: "quz", Mode: os.ModeDir},
+        {Name: "foo", Mode: os.ModeDir},
+        {Name: "bar", Mode: os.ModeDir},
+        {Name: "foobar", Mode: os.ModeDir},
+        {Name: "baz", Mode: os.ModeDir},
     }, nil)
     dirents, err := root.(*Dir).ReadDirAll(nil)
     assert.Nil(t, err)

@@ -78,12 +78,12 @@ func TestReadDirWithZipExpansionDisabled(t *testing.T) {
    mockCtrl := gomock.NewController(t)
     mockClock := &MockClock{}
     hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
-    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, false, NewDefaultRetryPolicy(mockClock), mockClock)
+    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, false, false, NewDefaultRetryPolicy(mockClock), mockClock)
     root, _ := fs.Root()
     hdfsAccessor.EXPECT().ReadDir("/").Return([]Attrs{
-        Attrs{Name: "foo.zipx"},
-        Attrs{Name: "dir.zip", Mode: os.ModeDir},
-        Attrs{Name: "bar.zip"},
+        {Name: "foo.zipx"},
+        {Name: "dir.zip", Mode: os.ModeDir},
+        {Name: "bar.zip"},
     }, nil)
     dirents, err := root.(*Dir).ReadDirAll(nil)
     assert.Nil(t, err)

@@ -98,12 +98,12 @@ func TestReadDirWithZipExpansionEnabled(t *testing.T) {
     mockCtrl := gomock.NewController(t)
     mockClock := &MockClock{}
     hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
-    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, true, NewDefaultRetryPolicy(mockClock), mockClock)
+    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, true, false, NewDefaultRetryPolicy(mockClock), mockClock)
     root, _ := fs.Root()
     hdfsAccessor.EXPECT().ReadDir("/").Return([]Attrs{
-        Attrs{Name: "foo.zipx"},
-        Attrs{Name: "dir.zip", Mode: os.ModeDir},
-        Attrs{Name: "bar.zip"},
+        {Name: "foo.zipx"},
+        {Name: "dir.zip", Mode: os.ModeDir},
+        {Name: "bar.zip"},
     }, nil)
     dirents, err := root.(*Dir).ReadDirAll(nil)
     assert.Nil(t, err)

@@ -121,7 +121,7 @@ func TestLookupWithFiltering(t *testing.T) {
     mockCtrl := gomock.NewController(t)
     mockClock := &MockClock{}
     hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
-    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"foo", "bar"}, false, NewDefaultRetryPolicy(mockClock), mockClock)
+    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"foo", "bar"}, false, false, NewDefaultRetryPolicy(mockClock), mockClock)
     root, _ := fs.Root()
     hdfsAccessor.EXPECT().Stat("/foo").Return(Attrs{Name: "foo", Mode: os.ModeDir}, nil)
     _, err := root.(*Dir).Lookup(nil, "foo")

@@ -135,7 +135,7 @@ func TestMkdir(t *testing.T) {
     mockCtrl := gomock.NewController(t)
     mockClock := &MockClock{}
     hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
-    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"foo", "bar"}, false, NewDefaultRetryPolicy(mockClock), mockClock)
+    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"foo", "bar"}, false, false, NewDefaultRetryPolicy(mockClock), mockClock)
     root, _ := fs.Root()
     hdfsAccessor.EXPECT().Mkdir("/foo", os.FileMode(0757)|os.ModeDir).Return(nil)
     node, err := root.(*Dir).Mkdir(nil, &fuse.MkdirRequest{Name: "foo", Mode: os.FileMode(0757) | os.ModeDir})

@@ -148,7 +148,7 @@ func TestSetattr(t *testing.T) {
     mockCtrl := gomock.NewController(t)
     mockClock := &MockClock{}
     hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
-    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"foo", "bar"}, false, NewDefaultRetryPolicy(mockClock), mockClock)
+    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"foo", "bar"}, false, false, NewDefaultRetryPolicy(mockClock), mockClock)
     root, _ := fs.Root()
     hdfsAccessor.EXPECT().Mkdir("/foo", os.FileMode(0757)|os.ModeDir).Return(nil)
     node, _ := root.(*Dir).Mkdir(nil, &fuse.MkdirRequest{Name: "foo", Mode: os.FileMode(0757) | os.ModeDir})

Hunks from other changed files in this commit:

@@ -145,7 +145,7 @@ func createTestHandle(t *testing.T, mockCtrl *gomock.Controller, hdfsReader Read
     hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
     hdfsAccessor.EXPECT().Stat("/test.dat").Return(Attrs{Name: "test.dat"}, nil)
     hdfsAccessor.EXPECT().OpenRead("/test.dat").Return(hdfsReader, nil)
-    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, false, NewDefaultRetryPolicy(&MockClock{}), &MockClock{})
+    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, false, false, NewDefaultRetryPolicy(&MockClock{}), &MockClock{})
     root, _ := fs.Root()
     file, _ := root.(*Dir).Lookup(nil, "test.dat")
     h, _ := file.(*File).Open(nil, &fuse.OpenRequest{Flags: fuse.OpenReadOnly}, nil)

@@ -14,7 +14,7 @@ func TestWriteFile(t *testing.T) {
     mockClock := &MockClock{}
     hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
     fileName := "/testWriteFile_1"
-    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, false, NewDefaultRetryPolicy(mockClock), mockClock)
+    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, false, false, NewDefaultRetryPolicy(mockClock), mockClock)

     hdfswriter := NewMockHdfsWriter(mockCtrl)
     hdfsAccessor.EXPECT().Remove(fileName).Return(nil)

@@ -20,6 +20,7 @@ type FileSystem struct {
     HdfsAccessor    HdfsAccessor // Interface to access HDFS
     AllowedPrefixes []string     // List of allowed path prefixes (only those prefixes are exposed via mountpoint)
     ExpandZips      bool         // Indicates whether ZIP expansion feature is enabled
+    ReadOnly        bool         // Indicates whether mount filesystem with readonly
     Mounted         bool         // True if filesystem is mounted
     RetryPolicy     *RetryPolicy // Retry policy
     Clock           Clock        // interface to get wall clock time

@@ -34,27 +35,43 @@ var _ fs.FS = (*FileSystem)(nil)
 var _ fs.FSStatfser = (*FileSystem)(nil)

 // Creates an instance of mountable file system
-func NewFileSystem(hdfsAccessor HdfsAccessor, mountPoint string, allowedPrefixes []string, expandZips bool, retryPolicy *RetryPolicy, clock Clock) (*FileSystem, error) {
+func NewFileSystem(hdfsAccessor HdfsAccessor, mountPoint string, allowedPrefixes []string, expandZips bool, readOnly bool, retryPolicy *RetryPolicy, clock Clock) (*FileSystem, error) {
     return &FileSystem{
         HdfsAccessor:    hdfsAccessor,
         MountPoint:      mountPoint,
         Mounted:         false,
         AllowedPrefixes: allowedPrefixes,
         ExpandZips:      expandZips,
+        ReadOnly:        readOnly,
         RetryPolicy:     retryPolicy,
         Clock:           clock}, nil
 }

 // Mounts the filesystem
 func (this *FileSystem) Mount() (*fuse.Conn, error) {
-    conn, err := fuse.Mount(
-        this.MountPoint,
-        fuse.FSName("hdfs"),
-        fuse.Subtype("hdfs"),
-        fuse.VolumeName("HDFS filesystem"),
-        fuse.AllowOther(),
-        fuse.WritebackCache(),
-        fuse.MaxReadahead(1024*64)) //TODO: make configurable
+    var conn *fuse.Conn
+    var err error
+    //TODO: make configurable
+    if this.ReadOnly {
+        conn, err = fuse.Mount(
+            this.MountPoint,
+            fuse.FSName("hdfs"),
+            fuse.Subtype("hdfs"),
+            fuse.VolumeName("HDFS filesystem"),
+            fuse.AllowOther(),
+            fuse.WritebackCache(),
+            fuse.MaxReadahead(1024*64),
+            fuse.ReadOnly())
+    } else {
+        conn, err = fuse.Mount(
+            this.MountPoint,
+            fuse.FSName("hdfs"),
+            fuse.Subtype("hdfs"),
+            fuse.VolumeName("HDFS filesystem"),
+            fuse.AllowOther(),
+            fuse.WritebackCache(),
+            fuse.MaxReadahead(1024*64))
+    }
     if err != nil {
         return nil, err
     }
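
Note: the two branches above differ only in the trailing fuse.ReadOnly() option. A minimal sketch of an equivalent approach that builds the option list once and appends the read-only option conditionally (mountOptions is a hypothetical helper, not part of this commit; it assumes the variadic bazil.org/fuse MountOption API already used in this file):

// mountOptions assembles the fuse.MountOption list, appending
// fuse.ReadOnly() only when the filesystem was created with readOnly=true.
func (this *FileSystem) mountOptions() []fuse.MountOption {
    opts := []fuse.MountOption{
        fuse.FSName("hdfs"),
        fuse.Subtype("hdfs"),
        fuse.VolumeName("HDFS filesystem"),
        fuse.AllowOther(),
        fuse.WritebackCache(),
        fuse.MaxReadahead(1024 * 64), // TODO: make configurable
    }
    if this.ReadOnly {
        opts = append(opts, fuse.ReadOnly())
    }
    return opts
}

// Mount() could then collapse both branches into a single call:
//     conn, err := fuse.Mount(this.MountPoint, this.mountOptions()...)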

@@ -10,14 +10,14 @@ import (
 )

 func TestIsPathAllowedForStarPrefix(t *testing.T) {
-    fs, _ := NewFileSystem(nil, "/tmp", []string{"*"}, false, NewDefaultRetryPolicy(WallClock{}), WallClock{})
+    fs, _ := NewFileSystem(nil, "/tmp", []string{"*"}, false, false, NewDefaultRetryPolicy(WallClock{}), WallClock{})
     assert.True(t, fs.IsPathAllowed("/"))
     assert.True(t, fs.IsPathAllowed("/foo"))
     assert.True(t, fs.IsPathAllowed("/foo/bar"))
 }

 func TestIsPathAllowedForMiscPrefixes(t *testing.T) {
-    fs, _ := NewFileSystem(nil, "/tmp", []string{"foo", "bar", "baz/qux"}, false, NewDefaultRetryPolicy(WallClock{}), WallClock{})
+    fs, _ := NewFileSystem(nil, "/tmp", []string{"foo", "bar", "baz/qux"}, false, false, NewDefaultRetryPolicy(WallClock{}), WallClock{})
     assert.True(t, fs.IsPathAllowed("/"))
     assert.True(t, fs.IsPathAllowed("/foo"))
     assert.True(t, fs.IsPathAllowed("/bar"))

@@ -41,7 +41,7 @@ func TestZipDirReadArchive(t *testing.T) {
     mockCtrl := gomock.NewController(t)
     mockClock := &MockClock{}
     hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
-    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, true, NewDefaultRetryPolicy(mockClock), mockClock)
+    fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"*"}, true, false, NewDefaultRetryPolicy(mockClock), mockClock)
     zipFile, err := os.Open(testZipPath())
     assert.Nil(t, err)
     zipFileInfo, err := zipFile.Stat()

main.go: 9 changed lines

@@ -35,6 +35,7 @@ func main() {
     allowedPrefixesString := flag.String("allowedPrefixes", "*", "Comma-separated list of allowed path prefixes on the remote file system, "+
         "if specified the mount point will expose access to those prefixes only")
     expandZips := flag.Bool("expandZips", false, "Enables automatic expansion of ZIP archives")
+    readOnly := flag.Bool("readOnly", false, "Enables mount with readonly")
     logLevel := flag.Int("logLevel", 0, "logs to be printed. 0: only fatal/err logs; 1: +warning logs; 2: +info logs")

     flag.Usage = Usage
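
With the new flag, a read-only mount would be requested at startup, for example (binary name, namenode address, and mount point are illustrative, not taken from this commit):

    hdfs-mount -readOnly namenode:8020 /mnt/hdfs

Because the fuse.ReadOnly() mount option shown above is passed to fuse.Mount, write attempts should be rejected by the kernel at the VFS layer rather than reaching HDFS.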

@@ -72,7 +73,7 @@ func main() {
     }

     // Creating the virtual file system
-    fileSystem, err := NewFileSystem(ftHdfsAccessor, flag.Arg(1), allowedPrefixes, *expandZips, retryPolicy, WallClock{})
+    fileSystem, err := NewFileSystem(ftHdfsAccessor, flag.Arg(1), allowedPrefixes, *expandZips, *readOnly, retryPolicy, WallClock{})
     if err != nil {
         log.Fatal("Error/NewFileSystem: ", err)
     }

@@ -84,9 +85,9 @@ func main() {
     log.Print("Mounted successfully")

     // Increase the maximum number of file descriptor from 1K to 1M in Linux
-    rLimit := syscall.Rlimit {
-        Cur: 1024*1024,
-        Max: 1024*1024}
+    rLimit := syscall.Rlimit{
+        Cur: 1024 * 1024,
+        Max: 1024 * 1024}
     err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit)
     if err != nil {
         Error.Printf("Failed to update the maximum number of file descriptors from 1K to 1M, %v", err)