hdfs permission mode: add chmod support in hdfs fuse (#9)
1. use SetattrRequest in bazil.org/fuse for permmode 2. test setattr in Dir_test.go
This commit is contained in:
Родитель
c7f326ac90
Коммит
8aeb131e78
25
Dir.go
25
Dir.go
|
@ -245,3 +245,28 @@ func (this *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Responds on FUSE Chmod request
|
||||
func (this *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
|
||||
// Get the filepath, so chmod in hdfs can work
|
||||
path := this.AbsolutePath()
|
||||
var err error
|
||||
|
||||
if req.Valid.Mode() {
|
||||
log.Printf("Chmod [%s] to [%d]", path, req.Mode)
|
||||
(func() {
|
||||
err = this.FileSystem.HdfsAccessor.Chmod(path, req.Mode)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
})()
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Chmod failed with error: %v", err)
|
||||
} else {
|
||||
this.Attrs.Mode = req.Mode
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
15
Dir_test.go
15
Dir_test.go
|
@ -140,3 +140,18 @@ func TestMkdir(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
assert.Equal(t, "foo", node.(*Dir).Attrs.Name)
|
||||
}
|
||||
|
||||
// Tesing Chmod and Chown
|
||||
func TestSetattr(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
mockClock := &MockClock{}
|
||||
hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
|
||||
fs, _ := NewFileSystem(hdfsAccessor, "/tmp/x", []string{"foo", "bar"}, false, NewDefaultRetryPolicy(mockClock), mockClock)
|
||||
root, _ := fs.Root()
|
||||
hdfsAccessor.EXPECT().Mkdir("/foo", os.FileMode(0757)|os.ModeDir).Return(nil)
|
||||
node, _ := root.(*Dir).Mkdir(nil, &fuse.MkdirRequest{Name: "foo", Mode: os.FileMode(0757) | os.ModeDir})
|
||||
hdfsAccessor.EXPECT().Chmod("/foo", os.FileMode(0777)).Return(nil)
|
||||
err := node.(*Dir).Setattr(nil, &fuse.SetattrRequest{Mode: os.FileMode(0777), Valid: fuse.SetattrMode}, &fuse.SetattrResponse{})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, os.FileMode(0777), node.(*Dir).Attrs.Mode)
|
||||
}
|
||||
|
|
|
@ -107,3 +107,25 @@ func (this *FaultTolerantHdfsAccessor) Rename(oldPath string, newPath string) er
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Chmod file or directory
|
||||
func (this *FaultTolerantHdfsAccessor) Chmod(path string, mode os.FileMode) error {
|
||||
op := this.RetryPolicy.StartOperation()
|
||||
for {
|
||||
err := this.Impl.Chmod(path, mode)
|
||||
if IsSuccessOrBenignError(err) || !op.ShouldRetry("Chmod [%s] to [%d]: %s", path, mode, err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Chown file or directory
|
||||
func (this *FaultTolerantHdfsAccessor) Chown(path string, user, group string) error {
|
||||
op := this.RetryPolicy.StartOperation()
|
||||
for {
|
||||
err := this.Impl.Chown(path, user, group)
|
||||
if IsSuccessOrBenignError(err) || !op.ShouldRetry("Chown [%s] to [%s:%s]: %s", path, user, group, err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
25
File.go
25
File.go
|
@ -124,3 +124,28 @@ func (this *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
|
|||
func (this *File) InvalidateMetadataCache() {
|
||||
this.Attrs.Expires = this.FileSystem.Clock.Now().Add(-1 * time.Second)
|
||||
}
|
||||
|
||||
// Responds on FUSE Chmod request
|
||||
func (this *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
|
||||
// Get the filepath, so chmod in hdfs can work
|
||||
path := this.AbsolutePath()
|
||||
var err error
|
||||
|
||||
if req.Valid.Mode() {
|
||||
log.Printf("Chmod [%s] to [%d]", path, req.Mode)
|
||||
(func() {
|
||||
err = this.FileSystem.HdfsAccessor.Chmod(path, req.Mode)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
})()
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Chmod failed with error: %v", err)
|
||||
} else {
|
||||
this.Attrs.Mode = req.Mode
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -29,6 +29,8 @@ type HdfsAccessor interface {
|
|||
Remove(path string) error // Removes a file or directory
|
||||
Rename(oldPath string, newPath string) error // Renames a file or directory
|
||||
EnsureConnected() error // Ensures HDFS accessor is connected to the HDFS name node
|
||||
Chown(path string, owner, group string) error // Changes the owner and group of the file
|
||||
Chmod(path string, mode os.FileMode) error // Changes the mode of the file
|
||||
}
|
||||
|
||||
type hdfsAccessorImpl struct {
|
||||
|
@ -299,3 +301,27 @@ func (this *hdfsAccessorImpl) Rename(oldPath string, newPath string) error {
|
|||
}
|
||||
return this.MetadataClient.Rename(oldPath, newPath)
|
||||
}
|
||||
|
||||
// Changes the mode of the file
|
||||
func (this *hdfsAccessorImpl) Chmod(path string, mode os.FileMode) error {
|
||||
this.MetadataClientMutex.Lock()
|
||||
defer this.MetadataClientMutex.Unlock()
|
||||
if this.MetadataClient == nil {
|
||||
if err := this.ConnectMetadataClient(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return this.MetadataClient.Chmod(path, mode)
|
||||
}
|
||||
|
||||
// Changes the owner and group of the file
|
||||
func (this *hdfsAccessorImpl) Chown(path string, user, group string) error {
|
||||
this.MetadataClientMutex.Lock()
|
||||
defer this.MetadataClientMutex.Unlock()
|
||||
if this.MetadataClient == nil {
|
||||
if err := this.ConnectMetadataClient(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return this.MetadataClient.Chown(path, user, group)
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче